xprtrdma: ->send_request returns -EAGAIN when there are no free MRs
authorChuck Lever <chuck.lever@oracle.com>
Wed, 28 Feb 2018 20:30:44 +0000 (15:30 -0500)
committerAnna Schumaker <Anna.Schumaker@Netapp.com>
Tue, 10 Apr 2018 20:06:22 +0000 (16:06 -0400)
Currently, when the MR free list is exhausted during marshaling, the
RPC/RDMA transport places the RPC task on the delayq, which forces a
wait for HZ >> 2 before the marshal and send is retried.

With this change, the transport now places such an RPC task on the
pending queue, and wakes it just as soon as more MRs have been
created. Creating more MRs typically takes less than a millisecond,
and this waking mechanism is less deadlock-prone.

Moreover, the waiting RPC task is holding the transport's write
lock, which blocks the transport from sending RPCs. Therefore faster
recovery from MR exhaustion is desirable.

This is the same mechanism that the TCP transport utilizes when
handling write buffer space exhaustion.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
net/sunrpc/xprtrdma/fmr_ops.c
net/sunrpc/xprtrdma/frwr_ops.c
net/sunrpc/xprtrdma/rpc_rdma.c
net/sunrpc/xprtrdma/transport.c
net/sunrpc/xprtrdma/verbs.c

index d5f95bb39300e61ac2e3e4f9ddd83e947135473f..629e5397332185dc4ba2ec9550b7f9a05b8e189e 100644 (file)
@@ -191,7 +191,7 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 
        mr = rpcrdma_mr_get(r_xprt);
        if (!mr)
-               return ERR_PTR(-ENOBUFS);
+               return ERR_PTR(-EAGAIN);
 
        pageoff = offset_in_page(seg1->mr_offset);
        seg1->mr_offset -= pageoff;     /* start of page */
index 90f688f19783d089fe90fffc40f104bb6e43a94c..e21781cb20be4a6364216df9cd8b05e8fe4de8d8 100644 (file)
@@ -367,7 +367,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
                        rpcrdma_mr_defer_recovery(mr);
                mr = rpcrdma_mr_get(r_xprt);
                if (!mr)
-                       return ERR_PTR(-ENOBUFS);
+                       return ERR_PTR(-EAGAIN);
        } while (mr->frwr.fr_state != FRWR_IS_INVALID);
        frwr = &mr->frwr;
        frwr->fr_state = FRWR_IS_VALID;
index 4bc0f4d94a0168e4dadd970a6049b16a1c466008..e8adad33d0bb71671625c4da2b29ca05d8ce7b2a 100644 (file)
@@ -365,7 +365,7 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
                seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
                                                   false, &mr);
                if (IS_ERR(seg))
-                       return PTR_ERR(seg);
+                       goto out_maperr;
                rpcrdma_mr_push(mr, &req->rl_registered);
 
                if (encode_read_segment(xdr, mr, pos) < 0)
@@ -377,6 +377,11 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
        } while (nsegs);
 
        return 0;
+
+out_maperr:
+       if (PTR_ERR(seg) == -EAGAIN)
+               xprt_wait_for_buffer_space(rqst->rq_task, NULL);
+       return PTR_ERR(seg);
 }
 
 /* Register and XDR encode the Write list. Supports encoding a list
@@ -423,7 +428,7 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
                seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
                                                   true, &mr);
                if (IS_ERR(seg))
-                       return PTR_ERR(seg);
+                       goto out_maperr;
                rpcrdma_mr_push(mr, &req->rl_registered);
 
                if (encode_rdma_segment(xdr, mr) < 0)
@@ -440,6 +445,11 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
        *segcount = cpu_to_be32(nchunks);
 
        return 0;
+
+out_maperr:
+       if (PTR_ERR(seg) == -EAGAIN)
+               xprt_wait_for_buffer_space(rqst->rq_task, NULL);
+       return PTR_ERR(seg);
 }
 
 /* Register and XDR encode the Reply chunk. Supports encoding an array
@@ -481,7 +491,7 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
                seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
                                                   true, &mr);
                if (IS_ERR(seg))
-                       return PTR_ERR(seg);
+                       goto out_maperr;
                rpcrdma_mr_push(mr, &req->rl_registered);
 
                if (encode_rdma_segment(xdr, mr) < 0)
@@ -498,6 +508,11 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
        *segcount = cpu_to_be32(nchunks);
 
        return 0;
+
+out_maperr:
+       if (PTR_ERR(seg) == -EAGAIN)
+               xprt_wait_for_buffer_space(rqst->rq_task, NULL);
+       return PTR_ERR(seg);
 }
 
 /**
@@ -724,8 +739,8 @@ rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
  * Returns:
  *     %0 if the RPC was sent successfully,
  *     %-ENOTCONN if the connection was lost,
- *     %-EAGAIN if not enough pages are available for on-demand reply buffer,
- *     %-ENOBUFS if no MRs are available to register chunks,
+ *     %-EAGAIN if the caller should call again with the same arguments,
+ *     %-ENOBUFS if the caller should call again after a delay,
  *     %-EMSGSIZE if the transport header is too small,
  *     %-EIO if a permanent problem occurred while marshaling.
  */
@@ -868,10 +883,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
        return 0;
 
 out_err:
-       if (ret != -ENOBUFS) {
-               pr_err("rpcrdma: header marshaling failed (%d)\n", ret);
-               r_xprt->rx_stats.failed_marshal_count++;
-       }
+       r_xprt->rx_stats.failed_marshal_count++;
        return ret;
 }
 
index 47b4604e820a7bc977c92b35d1db9715954cdd4d..08196896953db706ff1c6517bb1aae912848ca66 100644 (file)
@@ -689,7 +689,8 @@ xprt_rdma_free(struct rpc_task *task)
  * Returns:
  *     %0 if the RPC message has been sent
  *     %-ENOTCONN if the caller should reconnect and call again
- *     %-ENOBUFS if the caller should call again later
+ *     %-EAGAIN if the caller should call again
+ *     %-ENOBUFS if the caller should call again after a delay
  *     %-EIO if a permanent error occurred and the request was not
  *             sent. Do not try to send this message again.
  */
index 520e7e4fe2037a9ed32caa9eb0b008b39fff99da..d36c18f26cd3188081cad44878748c7be82129ca 100644 (file)
@@ -1048,8 +1048,9 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
        list_splice(&all, &buf->rb_all);
        r_xprt->rx_stats.mrs_allocated += count;
        spin_unlock(&buf->rb_mrlock);
-
        trace_xprtrdma_createmrs(r_xprt, count);
+
+       xprt_write_space(&r_xprt->rx_xprt);
 }
 
 static void