xprtrdma: Backchannel can use GFP_KERNEL allocations
authorChuck Lever <chuck.lever@oracle.com>
Wed, 24 Apr 2019 13:39:37 +0000 (09:39 -0400)
committerAnna Schumaker <Anna.Schumaker@Netapp.com>
Thu, 25 Apr 2019 19:18:58 +0000 (15:18 -0400)
The Receive handler runs in process context, thus can use on-demand
GFP_KERNEL allocations instead of pre-allocation.

This makes the xprtrdma backchannel independent of the number of
backchannel session slots provisioned by the Upper Layer protocol.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
net/sunrpc/xprtrdma/backchannel.c

index e1a125ad888dafd241b38095f180cc41bf69a399..ae51ef6a897a26dff31913702b7f209def797336 100644 (file)
 
 #undef RPCRDMA_BACKCHANNEL_DEBUG
 
-static int rpcrdma_bc_setup_reqs(struct rpcrdma_xprt *r_xprt,
-                                unsigned int count)
-{
-       struct rpc_xprt *xprt = &r_xprt->rx_xprt;
-       struct rpcrdma_req *req;
-       struct rpc_rqst *rqst;
-       unsigned int i;
-
-       for (i = 0; i < (count << 1); i++) {
-               size_t size;
-
-               size = min_t(size_t, r_xprt->rx_data.inline_rsize, PAGE_SIZE);
-               req = rpcrdma_req_create(r_xprt, size, GFP_KERNEL);
-               if (!req)
-                       return -ENOMEM;
-               rqst = &req->rl_slot;
-
-               rqst->rq_xprt = xprt;
-               INIT_LIST_HEAD(&rqst->rq_bc_list);
-               __set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
-               spin_lock(&xprt->bc_pa_lock);
-               list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
-               spin_unlock(&xprt->bc_pa_lock);
-               xdr_buf_init(&rqst->rq_snd_buf, rdmab_data(req->rl_sendbuf),
-                            size);
-       }
-       return 0;
-}
-
 /**
  * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
  * @xprt: transport associated with these backchannel resources
@@ -58,34 +29,10 @@ static int rpcrdma_bc_setup_reqs(struct rpcrdma_xprt *r_xprt,
 int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
 {
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
-       int rc;
-
-       /* The backchannel reply path returns each rpc_rqst to the
-        * bc_pa_list _after_ the reply is sent. If the server is
-        * faster than the client, it can send another backward
-        * direction request before the rpc_rqst is returned to the
-        * list. The client rejects the request in this case.
-        *
-        * Twice as many rpc_rqsts are prepared to ensure there is
-        * always an rpc_rqst available as soon as a reply is sent.
-        */
-       if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
-               goto out_err;
-
-       rc = rpcrdma_bc_setup_reqs(r_xprt, reqs);
-       if (rc)
-               goto out_free;
 
-       r_xprt->rx_buf.rb_bc_srv_max_requests = reqs;
+       r_xprt->rx_buf.rb_bc_srv_max_requests = RPCRDMA_BACKWARD_WRS >> 1;
        trace_xprtrdma_cb_setup(r_xprt, reqs);
        return 0;
-
-out_free:
-       xprt_rdma_bc_destroy(xprt, reqs);
-
-out_err:
-       pr_err("RPC:       %s: setup backchannel transport failed\n", __func__);
-       return -ENOMEM;
 }
 
 /**
@@ -213,6 +160,43 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
        spin_unlock(&xprt->bc_pa_lock);
 }
 
+static struct rpc_rqst *rpcrdma_bc_rqst_get(struct rpcrdma_xprt *r_xprt)
+{
+       struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+       struct rpcrdma_req *req;
+       struct rpc_rqst *rqst;
+       size_t size;
+
+       spin_lock(&xprt->bc_pa_lock);
+       rqst = list_first_entry_or_null(&xprt->bc_pa_list, struct rpc_rqst,
+                                       rq_bc_pa_list);
+       if (!rqst)
+               goto create_req;
+       list_del(&rqst->rq_bc_pa_list);
+       spin_unlock(&xprt->bc_pa_lock);
+       return rqst;
+
+create_req:
+       spin_unlock(&xprt->bc_pa_lock);
+
+       /* Set a limit to prevent a remote from overrunning our resources.
+        */
+       if (xprt->bc_alloc_count >= RPCRDMA_BACKWARD_WRS)
+               return NULL;
+
+       size = min_t(size_t, r_xprt->rx_data.inline_rsize, PAGE_SIZE);
+       req = rpcrdma_req_create(r_xprt, size, GFP_KERNEL);
+       if (!req)
+               return NULL;
+
+       xprt->bc_alloc_count++;
+       rqst = &req->rl_slot;
+       rqst->rq_xprt = xprt;
+       __set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
+       xdr_buf_init(&rqst->rq_snd_buf, rdmab_data(req->rl_sendbuf), size);
+       return rqst;
+}
+
 /**
  * rpcrdma_bc_receive_call - Handle a backward direction call
  * @r_xprt: transport receiving the call
@@ -244,18 +228,10 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
        pr_info("RPC:       %s: %*ph\n", __func__, size, p);
 #endif
 
-       /* Grab a free bc rqst */
-       spin_lock(&xprt->bc_pa_lock);
-       if (list_empty(&xprt->bc_pa_list)) {
-               spin_unlock(&xprt->bc_pa_lock);
+       rqst = rpcrdma_bc_rqst_get(r_xprt);
+       if (!rqst)
                goto out_overflow;
-       }
-       rqst = list_first_entry(&xprt->bc_pa_list,
-                               struct rpc_rqst, rq_bc_pa_list);
-       list_del(&rqst->rq_bc_pa_list);
-       spin_unlock(&xprt->bc_pa_lock);
 
-       /* Prepare rqst */
        rqst->rq_reply_bytes_recvd = 0;
        rqst->rq_xid = *p;