static void xprt_init(struct rpc_xprt *xprt, struct net *net);
static __be32 xprt_alloc_xid(struct rpc_xprt *xprt);
static void xprt_connect_status(struct rpc_task *task);
-static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
-static void __xprt_put_cong(struct rpc_xprt *, struct rpc_rqst *);
static void xprt_destroy(struct rpc_xprt *xprt);
static DEFINE_SPINLOCK(xprt_list_lock);
queue_work(xprtiod_workqueue, &xprt->task_cleanup);
}
+static bool
+xprt_need_congestion_window_wait(struct rpc_xprt *xprt)
+{
+ return test_bit(XPRT_CWND_WAIT, &xprt->state);
+}
+
+static void
+xprt_set_congestion_window_wait(struct rpc_xprt *xprt)
+{
+ if (!list_empty(&xprt->xmit_queue)) {
+ /* Peek at head of queue to see if it can make progress */
+ if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst,
+ rq_xmit)->rq_cong)
+ return;
+ }
+ set_bit(XPRT_CWND_WAIT, &xprt->state);
+}
+
+static void
+xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt)
+{
+ if (!RPCXPRT_CONGESTED(xprt))
+ clear_bit(XPRT_CWND_WAIT, &xprt->state);
+}
+
/*
* xprt_reserve_xprt_cong - serialize write access to transports
* @task: task that is requesting access to the transport
* Same as xprt_reserve_xprt, but Van Jacobson congestion control is
* integrated into the decision of whether a request is allowed to be
* woken up and given access to the transport.
+ * Note that the lock is only granted if we know there are free slots.
*/
int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
xprt->snd_task = task;
return 1;
}
- if (__xprt_get_cong(xprt, task)) {
+ if (!xprt_need_congestion_window_wait(xprt)) {
xprt->snd_task = task;
return 1;
}
xprt_clear_locked(xprt);
out_sleep:
- if (req)
- __xprt_put_cong(xprt, req);
dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
task->tk_timeout = 0;
task->tk_status = -EAGAIN;
xprt_clear_locked(xprt);
}
-static bool __xprt_lock_write_cong_func(struct rpc_task *task, void *data)
-{
- struct rpc_xprt *xprt = data;
- struct rpc_rqst *req;
-
- req = task->tk_rqstp;
- if (req == NULL) {
- xprt->snd_task = task;
- return true;
- }
- if (__xprt_get_cong(xprt, task)) {
- xprt->snd_task = task;
- req->rq_ntrans++;
- return true;
- }
- return false;
-}
-
static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
return;
- if (RPCXPRT_CONGESTED(xprt))
+ if (xprt_need_congestion_window_wait(xprt))
goto out_unlock;
if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
- __xprt_lock_write_cong_func, xprt))
+ __xprt_lock_write_func, xprt))
return;
out_unlock:
xprt_clear_locked(xprt);
* overflowed. Put the task to sleep if this is the case.
*/
static int
-__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
+__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
- struct rpc_rqst *req = task->tk_rqstp;
-
if (req->rq_cong)
return 1;
dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n",
- task->tk_pid, xprt->cong, xprt->cwnd);
- if (RPCXPRT_CONGESTED(xprt))
+ req->rq_task->tk_pid, xprt->cong, xprt->cwnd);
+ if (RPCXPRT_CONGESTED(xprt)) {
+ xprt_set_congestion_window_wait(xprt);
return 0;
+ }
req->rq_cong = 1;
xprt->cong += RPC_CWNDSCALE;
return 1;
return;
req->rq_cong = 0;
xprt->cong -= RPC_CWNDSCALE;
+ xprt_test_and_clear_congestion_window_wait(xprt);
__xprt_lock_write_next_cong(xprt);
}
+/**
+ * xprt_request_get_cong - Request congestion control credits
+ * @xprt: pointer to transport
+ * @req: pointer to RPC request
+ *
+ * Useful for transports that require congestion control.
+ */
+bool
+xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
+{
+ bool ret = false;
+
+ if (req->rq_cong)
+ return true;
+ spin_lock_bh(&xprt->transport_lock);
+ ret = __xprt_get_cong(xprt, req) != 0;
+ spin_unlock_bh(&xprt->transport_lock);
+ return ret;
+}
+EXPORT_SYMBOL_GPL(xprt_request_get_cong);
+
/**
* xprt_release_rqst_cong - housekeeping when request is complete
* @task: RPC request that recently completed
}
EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);
+/*
+ * Clear the congestion window wait flag and wake up the next
+ * entry on xprt->sending
+ */
+static void
+xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
+{
+ if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
+ spin_lock_bh(&xprt->transport_lock);
+ __xprt_lock_write_next_cong(xprt);
+ spin_unlock_bh(&xprt->transport_lock);
+ }
+}
+
/**
* xprt_adjust_cwnd - adjust transport congestion window
* @xprt: pointer to xprt
if (xprt_request_need_enqueue_transmit(task, req)) {
spin_lock(&xprt->queue_lock);
- list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
- if (pos->rq_task->tk_owner != task->tk_owner)
- continue;
- list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
- INIT_LIST_HEAD(&req->rq_xmit);
- goto out;
+ /*
+ * Requests that carry congestion control credits are added
+ * to the head of the list to avoid starvation issues.
+ */
+ if (req->rq_cong) {
+ xprt_clear_congestion_window_wait(xprt);
+ list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
+ if (pos->rq_cong)
+ continue;
+ /* Note: req is added _before_ pos */
+ list_add_tail(&req->rq_xmit, &pos->rq_xmit);
+ INIT_LIST_HEAD(&req->rq_xmit2);
+ goto out;
+ }
+ } else {
+ list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
+ if (pos->rq_task->tk_owner != task->tk_owner)
+ continue;
+ list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
+ INIT_LIST_HEAD(&req->rq_xmit);
+ goto out;
+ }
}
list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
INIT_LIST_HEAD(&req->rq_xmit2);