}
/**
- * xprt_transmit - send an RPC request on a transport
- * @task: controlling RPC task
+ * xprt_request_transmit - send an RPC request on a transport
+ * @req: pointer to request to transmit
+ * @snd_task: RPC task that owns the transport lock
*
- * We have to copy the iovec because sendmsg fiddles with its contents.
+ * This performs the transmission of a single request.
+ * Note that if the request is not the same as snd_task, then it
+ * does need to be pinned.
+ * Returns '0' on success.
*/
-void xprt_transmit(struct rpc_task *task)
+static int
+xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
{
- struct rpc_rqst *req = task->tk_rqstp;
- struct rpc_xprt *xprt = req->rq_xprt;
+ struct rpc_xprt *xprt = req->rq_xprt;
+ struct rpc_task *task = req->rq_task;
unsigned int connect_cookie;
int is_retrans = RPC_WAS_SENT(task);
int status;
dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);
if (!req->rq_bytes_sent) {
- if (xprt_request_data_received(task))
+ if (xprt_request_data_received(task)) {
+ status = 0;
goto out_dequeue;
+ }
/* Verify that our message lies in the RPCSEC_GSS window */
if (rpcauth_xmit_need_reencode(task)) {
- task->tk_status = -EBADMSG;
+ status = -EBADMSG;
goto out_dequeue;
}
}
req->rq_ntrans++;
connect_cookie = xprt->connect_cookie;
- status = xprt->ops->send_request(req, task);
+ status = xprt->ops->send_request(req, snd_task);
trace_xprt_transmit(xprt, req->rq_xid, status);
if (status != 0) {
req->rq_ntrans--;
- task->tk_status = status;
- return;
+ return status;
}
if (is_retrans)
req->rq_connect_cookie = connect_cookie;
out_dequeue:
xprt_request_dequeue_transmit(task);
+ rpc_wake_up_queued_task_set_status(&xprt->sending, task, status);
+ return status;
+}
+
+/**
+ * xprt_transmit - send an RPC request on a transport
+ * @task: controlling RPC task
+ *
+ * Attempts to drain the transmit queue. On exit, either the transport
+ * signalled an error that needs to be handled before transmission can
+ * resume, or @task finished transmitting, and detected that it already
+ * received a reply.
+ */
+void
+xprt_transmit(struct rpc_task *task)
+{
+ struct rpc_rqst *next, *req = task->tk_rqstp;
+ struct rpc_xprt *xprt = req->rq_xprt;
+ int status;
+
+ spin_lock(&xprt->queue_lock);
+ while (!list_empty(&xprt->xmit_queue)) {
+ next = list_first_entry(&xprt->xmit_queue,
+ struct rpc_rqst, rq_xmit);
+ xprt_pin_rqst(next);
+ spin_unlock(&xprt->queue_lock);
+ status = xprt_request_transmit(next, task);
+ if (status == -EBADMSG && next != req)
+ status = 0;
+ cond_resched();
+ spin_lock(&xprt->queue_lock);
+ xprt_unpin_rqst(next);
+ if (status == 0) {
+ if (!xprt_request_data_received(task) ||
+ test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
+ continue;
+ } else if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
+ rpc_wake_up_queued_task(&xprt->pending, task);
+ else
+ task->tk_status = status;
+ break;
+ }
+ spin_unlock(&xprt->queue_lock);
}
static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)