SUNRPC: Support dynamic slot allocation for TCP connections
authorTrond Myklebust <Trond.Myklebust@netapp.com>
Sun, 17 Jul 2011 22:11:30 +0000 (18:11 -0400)
committerTrond Myklebust <Trond.Myklebust@netapp.com>
Sun, 17 Jul 2011 22:11:30 +0000 (18:11 -0400)
Allow the number of available slots to grow with the TCP window size.

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
include/linux/sunrpc/xprt.h
net/sunrpc/xprt.c
net/sunrpc/xprtrdma/transport.c
net/sunrpc/xprtsock.c

index 05047250f2180f94b268e4ed94df32e0b407a042..d02762d1de279a600cac7251219fef8403f6cbcc 100644 (file)
@@ -22,6 +22,7 @@
 #define RPC_MIN_SLOT_TABLE     (2U)
 #define RPC_DEF_SLOT_TABLE     (16U)
 #define RPC_MAX_SLOT_TABLE     (128U)
+#define RPC_MAX_SLOT_TABLE_LIMIT       (65536U)
 
 /*
  * This describes a timeout strategy
@@ -168,7 +169,9 @@ struct rpc_xprt {
        struct rpc_wait_queue   pending;        /* requests in flight */
        struct rpc_wait_queue   backlog;        /* waiting for slot */
        struct list_head        free;           /* free slots */
-       unsigned int            max_reqs;       /* total slots */
+       unsigned int            max_reqs;       /* max number of slots */
+       unsigned int            min_reqs;       /* min number of slots */
+       atomic_t                num_reqs;       /* total slots */
        unsigned long           state;          /* transport state */
        unsigned char           shutdown   : 1, /* being shut down */
                                resvport   : 1; /* use a reserved port */
@@ -281,7 +284,9 @@ void                        xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task);
 void                   xprt_release(struct rpc_task *task);
 struct rpc_xprt *      xprt_get(struct rpc_xprt *xprt);
 void                   xprt_put(struct rpc_xprt *xprt);
-struct rpc_xprt *      xprt_alloc(struct net *net, int size, int max_req);
+struct rpc_xprt *      xprt_alloc(struct net *net, size_t size,
+                               unsigned int num_prealloc,
+                               unsigned int max_req);
 void                   xprt_free(struct rpc_xprt *);
 
 static inline __be32 *xprt_skip_transport_header(struct rpc_xprt *xprt, __be32 *p)
index ea7b3c16cddde369885251c6ad781fae378aa863..be85cf04a47990b42f824a0d4852c2826cadca47 100644 (file)
@@ -935,25 +935,66 @@ void xprt_transmit(struct rpc_task *task)
        spin_unlock_bh(&xprt->transport_lock);
 }
 
+static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt, gfp_t gfp_flags)
+{
+       struct rpc_rqst *req = ERR_PTR(-EAGAIN);
+
+       if (!atomic_add_unless(&xprt->num_reqs, 1, xprt->max_reqs))
+               goto out;
+       req = kzalloc(sizeof(struct rpc_rqst), gfp_flags);
+       if (req != NULL)
+               goto out;
+       atomic_dec(&xprt->num_reqs);
+       req = ERR_PTR(-ENOMEM);
+out:
+       return req;
+}
+
+static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
+{
+       if (atomic_add_unless(&xprt->num_reqs, -1, xprt->min_reqs)) {
+               kfree(req);
+               return true;
+       }
+       return false;
+}
+
 static void xprt_alloc_slot(struct rpc_task *task)
 {
        struct rpc_xprt *xprt = task->tk_xprt;
+       struct rpc_rqst *req;
 
-       task->tk_status = 0;
        if (!list_empty(&xprt->free)) {
-               struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
-               list_del_init(&req->rq_list);
-               task->tk_rqstp = req;
-               xprt_request_init(task, xprt);
-               return;
+               req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
+               list_del(&req->rq_list);
+               goto out_init_req;
+       }
+       req = xprt_dynamic_alloc_slot(xprt, GFP_NOWAIT);
+       if (!IS_ERR(req))
+               goto out_init_req;
+       switch (PTR_ERR(req)) {
+       case -ENOMEM:
+               rpc_delay(task, HZ >> 2);
+               dprintk("RPC:       dynamic allocation of request slot "
+                               "failed! Retrying\n");
+               break;
+       case -EAGAIN:
+               rpc_sleep_on(&xprt->backlog, task, NULL);
+               dprintk("RPC:       waiting for request slot\n");
        }
-       dprintk("RPC:       waiting for request slot\n");
        task->tk_status = -EAGAIN;
-       rpc_sleep_on(&xprt->backlog, task, NULL);
+       return;
+out_init_req:
+       task->tk_status = 0;
+       task->tk_rqstp = req;
+       xprt_request_init(task, xprt);
 }
 
 static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
 {
+       if (xprt_dynamic_free_slot(xprt, req))
+               return;
+
        memset(req, 0, sizeof(*req));   /* mark unused */
 
        spin_lock(&xprt->reserve_lock);
@@ -972,7 +1013,9 @@ static void xprt_free_all_slots(struct rpc_xprt *xprt)
        }
 }
 
-struct rpc_xprt *xprt_alloc(struct net *net, int size, int num_prealloc)
+struct rpc_xprt *xprt_alloc(struct net *net, size_t size,
+               unsigned int num_prealloc,
+               unsigned int max_alloc)
 {
        struct rpc_xprt *xprt;
        struct rpc_rqst *req;
@@ -992,7 +1035,12 @@ struct rpc_xprt *xprt_alloc(struct net *net, int size, int num_prealloc)
        }
        if (i < num_prealloc)
                goto out_free;
-       xprt->max_reqs = num_prealloc;
+       if (max_alloc > num_prealloc)
+               xprt->max_reqs = max_alloc;
+       else
+               xprt->max_reqs = num_prealloc;
+       xprt->min_reqs = num_prealloc;
+       atomic_set(&xprt->num_reqs, num_prealloc);
 
        return xprt;
 
@@ -1036,7 +1084,6 @@ void xprt_reserve(struct rpc_task *task)
        if (!xprt_lock_write(xprt, task))
                return;
 
-       task->tk_status = -EIO;
        spin_lock(&xprt->reserve_lock);
        xprt_alloc_slot(task);
        spin_unlock(&xprt->reserve_lock);
@@ -1057,6 +1104,7 @@ static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
 {
        struct rpc_rqst *req = task->tk_rqstp;
 
+       INIT_LIST_HEAD(&req->rq_list);
        req->rq_timeout = task->tk_client->cl_timeout->to_initval;
        req->rq_task    = task;
        req->rq_xprt    = xprt;
index 674a49224450ac63c8069641f1df852e153e3255..b446e100286f7aca96f81ca10d6cb176471d55d5 100644 (file)
@@ -283,6 +283,7 @@ xprt_setup_rdma(struct xprt_create *args)
        }
 
        xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt),
+                       xprt_rdma_slot_table_entries,
                        xprt_rdma_slot_table_entries);
        if (xprt == NULL) {
                dprintk("RPC:       %s: couldn't allocate rpcrdma_xprt\n",
index adaa54c6a09adb172bbf31fe07855bf86d18b450..d7f97ef265904f0ddc0d691360f1e920dc63210b 100644 (file)
@@ -54,7 +54,8 @@ static void xs_close(struct rpc_xprt *xprt);
  * xprtsock tunables
  */
 unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
-unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;
+unsigned int xprt_tcp_slot_table_entries = RPC_MIN_SLOT_TABLE;
+unsigned int xprt_max_tcp_slot_table_entries = RPC_MAX_SLOT_TABLE;
 
 unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
 unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;
@@ -75,6 +76,7 @@ static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
 
 static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
 static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
+static unsigned int max_tcp_slot_table_limit = RPC_MAX_SLOT_TABLE_LIMIT;
 static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
 static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;
 
@@ -103,6 +105,15 @@ static ctl_table xs_tunables_table[] = {
                .extra1         = &min_slot_table_size,
                .extra2         = &max_slot_table_size
        },
+       {
+               .procname       = "tcp_max_slot_table_entries",
+               .data           = &xprt_max_tcp_slot_table_entries,
+               .maxlen         = sizeof(unsigned int),
+               .mode           = 0644,
+               .proc_handler   = proc_dointvec_minmax,
+               .extra1         = &min_slot_table_size,
+               .extra2         = &max_tcp_slot_table_limit
+       },
        {
                .procname       = "min_resvport",
                .data           = &xprt_min_resvport,
@@ -2491,7 +2502,8 @@ static int xs_init_anyaddr(const int family, struct sockaddr *sap)
 }
 
 static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
-                                     unsigned int slot_table_size)
+                                     unsigned int slot_table_size,
+                                     unsigned int max_slot_table_size)
 {
        struct rpc_xprt *xprt;
        struct sock_xprt *new;
@@ -2501,7 +2513,8 @@ static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
                return ERR_PTR(-EBADF);
        }
 
-       xprt = xprt_alloc(args->net, sizeof(*new), slot_table_size);
+       xprt = xprt_alloc(args->net, sizeof(*new), slot_table_size,
+                       max_slot_table_size);
        if (xprt == NULL) {
                dprintk("RPC:       xs_setup_xprt: couldn't allocate "
                                "rpc_xprt\n");
@@ -2543,7 +2556,8 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
        struct rpc_xprt *xprt;
        struct rpc_xprt *ret;
 
-       xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
+       xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
+                       xprt_max_tcp_slot_table_entries);
        if (IS_ERR(xprt))
                return xprt;
        transport = container_of(xprt, struct sock_xprt, xprt);
@@ -2607,7 +2621,8 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
        struct sock_xprt *transport;
        struct rpc_xprt *ret;
 
-       xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries);
+       xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries,
+                       xprt_udp_slot_table_entries);
        if (IS_ERR(xprt))
                return xprt;
        transport = container_of(xprt, struct sock_xprt, xprt);
@@ -2683,7 +2698,8 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
        struct sock_xprt *transport;
        struct rpc_xprt *ret;
 
-       xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
+       xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
+                       xprt_max_tcp_slot_table_entries);
        if (IS_ERR(xprt))
                return xprt;
        transport = container_of(xprt, struct sock_xprt, xprt);
@@ -2762,7 +2778,8 @@ static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)
                 */
                 return args->bc_xprt->xpt_bc_xprt;
        }
-       xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
+       xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
+                       xprt_tcp_slot_table_entries);
        if (IS_ERR(xprt))
                return xprt;
        transport = container_of(xprt, struct sock_xprt, xprt);
@@ -2949,8 +2966,26 @@ static struct kernel_param_ops param_ops_slot_table_size = {
 #define param_check_slot_table_size(name, p) \
        __param_check(name, p, unsigned int);
 
+static int param_set_max_slot_table_size(const char *val,
+                                    const struct kernel_param *kp)
+{
+       return param_set_uint_minmax(val, kp,
+                       RPC_MIN_SLOT_TABLE,
+                       RPC_MAX_SLOT_TABLE_LIMIT);
+}
+
+static struct kernel_param_ops param_ops_max_slot_table_size = {
+       .set = param_set_max_slot_table_size,
+       .get = param_get_uint,
+};
+
+#define param_check_max_slot_table_size(name, p) \
+       __param_check(name, p, unsigned int);
+
 module_param_named(tcp_slot_table_entries, xprt_tcp_slot_table_entries,
                   slot_table_size, 0644);
+module_param_named(tcp_max_slot_table_entries, xprt_max_tcp_slot_table_entries,
+                  max_slot_table_size, 0644);
 module_param_named(udp_slot_table_entries, xprt_udp_slot_table_entries,
                   slot_table_size, 0644);