xprtrdma: Refactor rpcrdma_reply_handler some more
authorChuck Lever <chuck.lever@oracle.com>
Mon, 16 Oct 2017 19:01:22 +0000 (15:01 -0400)
committerAnna Schumaker <Anna.Schumaker@Netapp.com>
Fri, 17 Nov 2017 18:47:54 +0000 (13:47 -0500)
Clean up: I'd like to be able to invoke the tail of
rpcrdma_reply_handler in two different places. Split the tail out
into its own helper function.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
net/sunrpc/xprtrdma/rpc_rdma.c
net/sunrpc/xprtrdma/xprt_rdma.h

index e355cd322a329b413c0f9e5ef20cfe68848120c1..418bcc6b3e1d29b69d2854e2d7e1dbff13310372 100644 (file)
@@ -1211,6 +1211,60 @@ rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
        return -EREMOTEIO;
 }
 
+/* Perform XID lookup, reconstruction of the RPC reply, and
+ * RPC completion while holding the transport lock to ensure
+ * the rep, rqst, and rq_task pointers remain stable.
+ */
+void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
+{
+       struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
+       struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+       struct rpc_rqst *rqst = rep->rr_rqst;
+       unsigned long cwnd;
+       int status;
+
+       xprt->reestablish_timeout = 0;
+
+       switch (rep->rr_proc) {
+       case rdma_msg:
+               status = rpcrdma_decode_msg(r_xprt, rep, rqst);
+               break;
+       case rdma_nomsg:
+               status = rpcrdma_decode_nomsg(r_xprt, rep);
+               break;
+       case rdma_error:
+               status = rpcrdma_decode_error(r_xprt, rep, rqst);
+               break;
+       default:
+               status = -EIO;
+       }
+       if (status < 0)
+               goto out_badheader;
+
+out:
+       spin_lock(&xprt->recv_lock);
+       cwnd = xprt->cwnd;
+       xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
+       if (xprt->cwnd > cwnd)
+               xprt_release_rqst_cong(rqst->rq_task);
+
+       xprt_complete_rqst(rqst->rq_task, status);
+       xprt_unpin_rqst(rqst);
+       spin_unlock(&xprt->recv_lock);
+       return;
+
+/* If the incoming reply terminated a pending RPC, the next
+ * RPC call will post a replacement receive buffer as it is
+ * being marshaled.
+ */
+out_badheader:
+       dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
+               rqst->rq_task->tk_pid, __func__, be32_to_cpu(rep->rr_proc));
+       r_xprt->rx_stats.bad_reply_count++;
+       status = -EIO;
+       goto out;
+}
+
 /* Process received RPC/RDMA messages.
  *
  * Errors must result in the RPC task either being awakened, or
@@ -1225,8 +1279,6 @@ rpcrdma_reply_handler(struct work_struct *work)
        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
        struct rpcrdma_req *req;
        struct rpc_rqst *rqst;
-       unsigned long cwnd;
-       int status;
        __be32 *p;
 
        dprintk("RPC:       %s: incoming rep %p\n", __func__, rep);
@@ -1263,6 +1315,7 @@ rpcrdma_reply_handler(struct work_struct *work)
        spin_unlock(&xprt->recv_lock);
        req = rpcr_to_rdmar(rqst);
        req->rl_reply = rep;
+       rep->rr_rqst = rqst;
 
        dprintk("RPC:       %s: reply %p completes request %p (xid 0x%08x)\n",
                __func__, rep, req, be32_to_cpu(rep->rr_xid));
@@ -1280,36 +1333,7 @@ rpcrdma_reply_handler(struct work_struct *work)
                                                    &req->rl_registered);
        }
 
-       xprt->reestablish_timeout = 0;
-
-       switch (rep->rr_proc) {
-       case rdma_msg:
-               status = rpcrdma_decode_msg(r_xprt, rep, rqst);
-               break;
-       case rdma_nomsg:
-               status = rpcrdma_decode_nomsg(r_xprt, rep);
-               break;
-       case rdma_error:
-               status = rpcrdma_decode_error(r_xprt, rep, rqst);
-               break;
-       default:
-               status = -EIO;
-       }
-       if (status < 0)
-               goto out_badheader;
-
-out:
-       spin_lock(&xprt->recv_lock);
-       cwnd = xprt->cwnd;
-       xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
-       if (xprt->cwnd > cwnd)
-               xprt_release_rqst_cong(rqst->rq_task);
-
-       xprt_complete_rqst(rqst->rq_task, status);
-       xprt_unpin_rqst(rqst);
-       spin_unlock(&xprt->recv_lock);
-       dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
-               __func__, xprt, rqst, status);
+       rpcrdma_complete_rqst(rep);
        return;
 
 out_badstatus:
@@ -1325,20 +1349,8 @@ out_badversion:
                __func__, be32_to_cpu(rep->rr_vers));
        goto repost;
 
-/* If the incoming reply terminated a pending RPC, the next
- * RPC call will post a replacement receive buffer as it is
- * being marshaled.
- */
-out_badheader:
-       dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
-               rqst->rq_task->tk_pid, __func__, be32_to_cpu(rep->rr_proc));
-       r_xprt->rx_stats.bad_reply_count++;
-       status = -EIO;
-       goto out;
-
-/* The req was still available, but by the time the recv_lock
- * was acquired, the rqst and task had been released. Thus the RPC
- * has already been terminated.
+/* The RPC transaction has already been terminated, or the header
+ * is corrupt.
  */
 out_norqst:
        spin_unlock(&xprt->recv_lock);
@@ -1348,7 +1360,6 @@ out_norqst:
 
 out_shortreply:
        dprintk("RPC:       %s: short/invalid reply\n", __func__);
-       goto repost;
 
 /* If no pending RPC transaction was matched, post a replacement
  * receive buffer before returning.
index 858b4c52047d94da4eb18646b19f45a406cc056a..d68a1351d95e1207dd7e065590c6450db36ae826 100644 (file)
@@ -202,18 +202,17 @@ enum {
 };
 
 /*
- * struct rpcrdma_rep -- this structure encapsulates state required to recv
- * and complete a reply, asychronously. It needs several pieces of
- * state:
- *   o recv buffer (posted to provider)
- *   o ib_sge (also donated to provider)
- *   o status of reply (length, success or not)
- *   o bookkeeping state to get run by reply handler (list, etc)
+ * struct rpcrdma_rep -- this structure encapsulates state required
+ * to receive and complete an RPC Reply, asychronously. It needs
+ * several pieces of state:
  *
- * These are allocated during initialization, per-transport instance.
+ *   o receive buffer and ib_sge (donated to provider)
+ *   o status of receive (success or not, length, inv rkey)
+ *   o bookkeeping state to get run by reply handler (XDR stream)
  *
- * N of these are associated with a transport instance, and stored in
- * struct rpcrdma_buffer. N is the max number of outstanding requests.
+ * These structures are allocated during transport initialization.
+ * N of these are associated with a transport instance, managed by
+ * struct rpcrdma_buffer. N is the max number of outstanding RPCs.
  */
 
 struct rpcrdma_rep {
@@ -228,6 +227,7 @@ struct rpcrdma_rep {
        struct work_struct      rr_work;
        struct xdr_buf          rr_hdrbuf;
        struct xdr_stream       rr_stream;
+       struct rpc_rqst         *rr_rqst;
        struct list_head        rr_list;
        struct ib_recv_wr       rr_recv_wr;
 };
@@ -616,6 +616,7 @@ bool rpcrdma_prepare_send_sges(struct rpcrdma_ia *, struct rpcrdma_req *,
 void rpcrdma_unmap_sges(struct rpcrdma_ia *, struct rpcrdma_req *);
 int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
 void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
+void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);
 void rpcrdma_reply_handler(struct work_struct *work);
 
 static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)