@@ -53,14 +53,14 @@ module_param(send_batch_count, int, 0444);
 MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
 
 /*
- * Reset the send state. Caller must hold c_send_lock when calling here.
+ * Reset the send state. Callers must ensure that this doesn't race with
+ * rds_send_xmit().
  */
 void rds_send_reset(struct rds_connection *conn)
 {
 	struct rds_message *rm, *tmp;
 	unsigned long flags;
 
-	spin_lock_irqsave(&conn->c_send_lock, flags);
 	if (conn->c_xmit_rm) {
 		rm = conn->c_xmit_rm;
 		conn->c_xmit_rm = NULL;
@@ -69,11 +69,7 @@ void rds_send_reset(struct rds_connection *conn)
 		 * independently) but as the connection is down, there's
 		 * no ongoing RDMA to/from that memory */
 		rds_message_unmapped(rm);
-		spin_unlock_irqrestore(&conn->c_send_lock, flags);
-
 		rds_message_put(rm);
-	} else {
-		spin_unlock_irqrestore(&conn->c_send_lock, flags);
 	}
 
 	conn->c_xmit_sg = 0;
@@ -98,6 +94,25 @@ void rds_send_reset(struct rds_connection *conn)
 	spin_unlock_irqrestore(&conn->c_lock, flags);
 }
 
+static int acquire_in_xmit(struct rds_connection *conn)
+{
+	return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
+}
+
+static void release_in_xmit(struct rds_connection *conn)
+{
+	clear_bit(RDS_IN_XMIT, &conn->c_flags);
+	smp_mb__after_clear_bit();
+	/*
+	 * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
+	 * hot path and finding waiters is very rare. We don't want to walk
+	 * the system-wide hashed waitqueue buckets in the fast path only to
+	 * almost never find waiters.
+	 */
+	if (waitqueue_active(&conn->c_waitq))
+		wake_up_all(&conn->c_waitq);
+}
+
 /*
  * We're making the concious trade-off here to only send one message
  * down the connection at a time.
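
The wake_up_all() in release_in_xmit() pairs with a sleeper on c_waitq. A minimal sketch of that waiter side, assuming the caller has already moved the connection state out of RDS_CONN_UP (the helper name here is illustrative, not part of the patch):

	/*
	 * Sketch: block until any in-flight rds_send_xmit() has dropped
	 * RDS_IN_XMIT. wait_event() re-checks the condition after every
	 * wakeup, so the rare wake_up_all() above is sufficient.
	 */
	static void wait_for_xmit_done(struct rds_connection *conn)
	{
		wait_event(conn->c_waitq,
			   !test_bit(RDS_IN_XMIT, &conn->c_flags));
	}

Combined with the ordering in rds_send_xmit() below, this is what lets rds_send_reset() callers guarantee they don't race with the send path.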
@@ -119,12 +134,9 @@ int rds_send_xmit(struct rds_connection *conn)
 	unsigned int tmp;
 	struct scatterlist *sg;
 	int ret = 0;
-	int gen = 0;
 	LIST_HEAD(to_be_dropped);
 
 restart:
-	if (!rds_conn_up(conn))
-		goto out;
 
 	/*
 	 * sendmsg calls here after having queued its message on the send
@@ -133,18 +145,25 @@ restart:
 	 * avoids blocking the caller and trading per-connection data between
 	 * caches per message.
 	 */
-	if (!spin_trylock_irqsave(&conn->c_send_lock, flags)) {
+	if (!acquire_in_xmit(conn)) {
 		rds_stats_inc(s_send_lock_contention);
 		ret = -ENOMEM;
 		goto out;
 	}
-	atomic_inc(&conn->c_senders);
+
+	/*
+	 * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
+	 * we do the opposite to avoid races.
+	 */
+	if (!rds_conn_up(conn)) {
+		release_in_xmit(conn);
+		ret = 0;
+		goto out;
+	}
 
 	if (conn->c_trans->xmit_prepare)
 		conn->c_trans->xmit_prepare(conn);
 
-	gen = atomic_inc_return(&conn->c_send_generation);
-
 	/*
 	 * spin trying to push headers and data down the connection until
 	 * the connection doesn't make forward progress.
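
The comment in the hunk above describes a Dekker-style store/load pairing; a sketch of the two sides (not literal RDS code):

	/*
	 *   rds_send_xmit()                  shutdown path
	 *   ---------------                  -------------
	 *   test_and_set_bit(RDS_IN_XMIT)    move state out of RDS_CONN_UP
	 *   <barrier>                        <barrier>
	 *   load conn state                  test_bit(RDS_IN_XMIT)
	 *
	 * Each side stores its own flag before loading the other's, so at
	 * least one side always observes the other: either the sender sees
	 * the connection going down and backs out via release_in_xmit(), or
	 * shutdown sees RDS_IN_XMIT set and waits on c_waitq. A successful
	 * test_and_set_bit() implies a full memory barrier, which provides
	 * the required ordering on the sender side.
	 */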
@@ -178,7 +197,7 @@ restart:
 		if (!rm) {
 			unsigned int len;
 
-			spin_lock(&conn->c_lock);
+			spin_lock_irqsave(&conn->c_lock, flags);
 
 			if (!list_empty(&conn->c_send_queue)) {
 				rm = list_entry(conn->c_send_queue.next,
@@ -193,7 +212,7 @@ restart:
 				list_move_tail(&rm->m_conn_item, &conn->c_retrans);
 			}
 
-			spin_unlock(&conn->c_lock);
+			spin_unlock_irqrestore(&conn->c_lock, flags);
 
 			if (!rm)
 				break;
@@ -207,10 +226,10 @@ restart:
 		 */
 		if (rm->rdma.op_active &&
 		    test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
-			spin_lock(&conn->c_lock);
+			spin_lock_irqsave(&conn->c_lock, flags);
 			if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
 				list_move(&rm->m_conn_item, &to_be_dropped);
-			spin_unlock(&conn->c_lock);
+			spin_unlock_irqrestore(&conn->c_lock, flags);
 			continue;
 		}
 
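These spin_lock() to spin_lock_irqsave() conversions follow from dropping c_send_lock: rds_send_xmit() no longer runs with interrupts disabled, so each c_lock acquisition must save and restore the interrupt state itself (c_lock is also taken from interrupt-driven completion paths elsewhere in RDS, so a plain spin_lock() here risks a self-deadlock). The rule being applied, as a hedged sketch with a hypothetical helper:

	/*
	 * Sketch (not in the patch): a lock that is also taken from
	 * interrupt context must be acquired with local interrupts off
	 * in process context, or an interrupt arriving on this CPU while
	 * the lock is held could try to retake it and deadlock.
	 */
	static void walk_queues_safely(struct rds_connection *conn)
	{
		unsigned long flags;

		spin_lock_irqsave(&conn->c_lock, flags);  /* IRQs off, state saved */
		/* ... manipulate c_send_queue / c_retrans ... */
		spin_unlock_irqrestore(&conn->c_lock, flags);  /* restore prior state */
	}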
@@ -336,19 +355,7 @@ restart:
 	if (conn->c_trans->xmit_complete)
 		conn->c_trans->xmit_complete(conn);
 
-	/*
-	 * We might be racing with another sender who queued a message but
-	 * backed off on noticing that we held the c_send_lock. If we check
-	 * for queued messages after dropping the sem then either we'll
-	 * see the queued message or the queuer will get the sem. If we
-	 * notice the queued message then we trigger an immediate retry.
-	 *
-	 * We need to be careful only to do this when we stopped processing
-	 * the send queue because it was empty. It's the only way we
-	 * stop processing the loop when the transport hasn't taken
-	 * responsibility for forward progress.
-	 */
-	spin_unlock_irqrestore(&conn->c_send_lock, flags);
+	release_in_xmit(conn);
 
 	/* Nuke any messages we decided not to retransmit. */
 	if (!list_empty(&to_be_dropped)) {
@@ -358,13 +365,12 @@ restart:
 		rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
 	}
 
-	atomic_dec(&conn->c_senders);
-
 	/*
-	 * Other senders will see we have c_send_lock and exit. We
-	 * need to recheck the send queue and race again for c_send_lock
-	 * to make sure messages don't just sit on the send queue, if
-	 * somebody hasn't already beat us into the loop.
+	 * Other senders can queue a message after we last test the send queue
+	 * but before we clear RDS_IN_XMIT. In that case they'd back off and
+	 * not try and send their newly queued message. We need to check the
+	 * send queue after having cleared RDS_IN_XMIT so that their message
+	 * doesn't get stuck on the send queue.
 	 *
 	 * If the transport cannot continue (i.e ret != 0), then it must
 	 * call us when more room is available, such as from the tx
@@ -374,9 +380,7 @@ restart:
 	smp_mb();
 	if (!list_empty(&conn->c_send_queue)) {
 		rds_stats_inc(s_send_lock_queue_raced);
-		if (gen == atomic_read(&conn->c_send_generation)) {
-			goto restart;
-		}
+		goto restart;
 	}
 }
 out:
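
The smp_mb() plus queue recheck above pairs with a sender that queues a message and then backs off on seeing RDS_IN_XMIT set. A hypothetical sketch of that queuing side (the helper name and kick logic are illustrative; the real enqueue path is rds_send_queue_rm() and its callers):

	/*
	 * Sketch: publish the message on the queue first, then test the
	 * bit. Paired with "clear the bit, then test the queue" above,
	 * the two smp_mb()s guarantee at least one side sees the other,
	 * so a newly queued message can't be stranded on c_send_queue.
	 */
	static void queue_and_kick(struct rds_connection *conn,
				   struct rds_message *rm)
	{
		unsigned long flags;

		spin_lock_irqsave(&conn->c_lock, flags);
		list_add_tail(&rm->m_conn_item, &conn->c_send_queue);
		spin_unlock_irqrestore(&conn->c_lock, flags);

		smp_mb();	/* pairs with smp_mb() before the recheck */
		if (!test_bit(RDS_IN_XMIT, &conn->c_flags))
			rds_send_xmit(conn);	/* no active sender; send now */
	}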