spin_lock_init(&conn->c_lock);
conn->c_next_tx_seq = 1;
- spin_lock_init(&conn->c_send_lock);
- atomic_set(&conn->c_send_generation, 1);
- atomic_set(&conn->c_senders, 0);
+ init_waitqueue_head(&conn->c_waitq);
INIT_LIST_HEAD(&conn->c_send_queue);
INIT_LIST_HEAD(&conn->c_retrans);
}
mutex_unlock(&conn->c_cm_lock);
- /* verify everybody's out of rds_send_xmit() */
- spin_lock_irq(&conn->c_send_lock);
- spin_unlock_irq(&conn->c_send_lock);
-
- while(atomic_read(&conn->c_senders)) {
- schedule_timeout(1);
- spin_lock_irq(&conn->c_send_lock);
- spin_unlock_irq(&conn->c_send_lock);
- }
+ wait_event(conn->c_waitq,
+ !test_bit(RDS_IN_XMIT, &conn->c_flags));
conn->c_trans->conn_shutdown(conn);
rds_conn_reset(conn);
sizeof(cinfo->transport));
cinfo->flags = 0;
- rds_conn_info_set(cinfo->flags,
- spin_is_locked(&conn->c_send_lock), SENDING);
+ rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &conn->c_flags),
+ SENDING);
/* XXX Future: return the state rather than these funky bits */
rds_conn_info_set(cinfo->flags,
atomic_read(&conn->c_state) == RDS_CONN_CONNECTING,
MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
/*
- * Reset the send state. Caller must hold c_send_lock when calling here.
+ * Reset the send state. Callers must ensure that this doesn't race with
+ * rds_send_xmit().
*/
void rds_send_reset(struct rds_connection *conn)
{
struct rds_message *rm, *tmp;
unsigned long flags;
- spin_lock_irqsave(&conn->c_send_lock, flags);
if (conn->c_xmit_rm) {
rm = conn->c_xmit_rm;
conn->c_xmit_rm = NULL;
* independently) but as the connection is down, there's
* no ongoing RDMA to/from that memory */
rds_message_unmapped(rm);
- spin_unlock_irqrestore(&conn->c_send_lock, flags);
-
rds_message_put(rm);
- } else {
- spin_unlock_irqrestore(&conn->c_send_lock, flags);
}
conn->c_xmit_sg = 0;
spin_unlock_irqrestore(&conn->c_lock, flags);
}
+static int acquire_in_xmit(struct rds_connection *conn)
+{
+ return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
+}
+
+static void release_in_xmit(struct rds_connection *conn)
+{
+ clear_bit(RDS_IN_XMIT, &conn->c_flags);
+ smp_mb__after_clear_bit();
+ /*
+ * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
+ * hot path and finding waiters is very rare. We don't want to walk
+ * the system-wide hashed waitqueue buckets in the fast path only to
+ * almost never find waiters.
+ */
+ if (waitqueue_active(&conn->c_waitq))
+ wake_up_all(&conn->c_waitq);
+}
+
/*
* We're making the concious trade-off here to only send one message
* down the connection at a time.
unsigned int tmp;
struct scatterlist *sg;
int ret = 0;
- int gen = 0;
LIST_HEAD(to_be_dropped);
restart:
- if (!rds_conn_up(conn))
- goto out;
/*
* sendmsg calls here after having queued its message on the send
* avoids blocking the caller and trading per-connection data between
* caches per message.
*/
- if (!spin_trylock_irqsave(&conn->c_send_lock, flags)) {
+ if (!acquire_in_xmit(conn)) {
rds_stats_inc(s_send_lock_contention);
ret = -ENOMEM;
goto out;
}
- atomic_inc(&conn->c_senders);
+
+ /*
+ * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
+ * we do the opposite to avoid races.
+ */
+ if (!rds_conn_up(conn)) {
+ release_in_xmit(conn);
+ ret = 0;
+ goto out;
+ }
if (conn->c_trans->xmit_prepare)
conn->c_trans->xmit_prepare(conn);
- gen = atomic_inc_return(&conn->c_send_generation);
-
/*
* spin trying to push headers and data down the connection until
* the connection doesn't make forward progress.
if (!rm) {
unsigned int len;
- spin_lock(&conn->c_lock);
+ spin_lock_irqsave(&conn->c_lock, flags);
if (!list_empty(&conn->c_send_queue)) {
rm = list_entry(conn->c_send_queue.next,
list_move_tail(&rm->m_conn_item, &conn->c_retrans);
}
- spin_unlock(&conn->c_lock);
+ spin_unlock_irqrestore(&conn->c_lock, flags);
if (!rm)
break;
*/
if (rm->rdma.op_active &&
test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
- spin_lock(&conn->c_lock);
+ spin_lock_irqsave(&conn->c_lock, flags);
if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
list_move(&rm->m_conn_item, &to_be_dropped);
- spin_unlock(&conn->c_lock);
+ spin_unlock_irqrestore(&conn->c_lock, flags);
continue;
}
if (conn->c_trans->xmit_complete)
conn->c_trans->xmit_complete(conn);
- /*
- * We might be racing with another sender who queued a message but
- * backed off on noticing that we held the c_send_lock. If we check
- * for queued messages after dropping the sem then either we'll
- * see the queued message or the queuer will get the sem. If we
- * notice the queued message then we trigger an immediate retry.
- *
- * We need to be careful only to do this when we stopped processing
- * the send queue because it was empty. It's the only way we
- * stop processing the loop when the transport hasn't taken
- * responsibility for forward progress.
- */
- spin_unlock_irqrestore(&conn->c_send_lock, flags);
+ release_in_xmit(conn);
/* Nuke any messages we decided not to retransmit. */
if (!list_empty(&to_be_dropped)) {
rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
}
- atomic_dec(&conn->c_senders);
-
/*
- * Other senders will see we have c_send_lock and exit. We
- * need to recheck the send queue and race again for c_send_lock
- * to make sure messages don't just sit on the send queue, if
- * somebody hasn't already beat us into the loop.
+ * Other senders can queue a message after we last test the send queue
+ * but before we clear RDS_IN_XMIT. In that case they'd back off and
+ * not try and send their newly queued message. We need to check the
+ * send queue after having cleared RDS_IN_XMIT so that their message
+ * doesn't get stuck on the send queue.
*
* If the transport cannot continue (i.e ret != 0), then it must
* call us when more room is available, such as from the tx
smp_mb();
if (!list_empty(&conn->c_send_queue)) {
rds_stats_inc(s_send_lock_queue_raced);
- if (gen == atomic_read(&conn->c_send_generation)) {
- goto restart;
- }
+ goto restart;
}
}
out: