Explorar o código

RDS: Use a generation counter to avoid rds_send_xmit loop

rds_send_xmit is required to loop around after it releases the lock
because someone else could done a trylock, found someone working on the
list and backed off.

But, once we drop our lock, it is possible that someone else does come
in and make progress on the list.  We should detect this and not loop
around if another process is actually working on the list.

This patch adds a generation counter that is bumped every time we
get the lock and do some send work.  If the retry notices someone else
has bumped the generation counter, it does not need to loop around and
continue working.

Signed-off-by: Chris Mason <chris.mason@oracle.com>
Signed-off-by: Andy Grover <andy.grover@oracle.com>
Chris Mason %!s(int64=15) %!d(string=hai) anos
pai
achega
9e29db0e36
Modificáronse 3 ficheiros con 9 adicións e 4 borrados
  1. 1 0
      net/rds/connection.c
  2. 1 0
      net/rds/rds.h
  3. 7 4
      net/rds/send.c

+ 1 - 0
net/rds/connection.c

@@ -147,6 +147,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
 	conn->c_next_tx_seq = 1;
 	conn->c_next_tx_seq = 1;
 
 
 	spin_lock_init(&conn->c_send_lock);
 	spin_lock_init(&conn->c_send_lock);
+	atomic_set(&conn->c_send_generation, 1);
 	INIT_LIST_HEAD(&conn->c_send_queue);
 	INIT_LIST_HEAD(&conn->c_send_queue);
 	INIT_LIST_HEAD(&conn->c_retrans);
 	INIT_LIST_HEAD(&conn->c_retrans);
 
 

+ 1 - 0
net/rds/rds.h

@@ -92,6 +92,7 @@ struct rds_connection {
 	struct rds_cong_map	*c_fcong;
 	struct rds_cong_map	*c_fcong;
 
 
 	spinlock_t		c_send_lock;	/* protect send ring */
 	spinlock_t		c_send_lock;	/* protect send ring */
+	atomic_t		c_send_generation;
 	struct rds_message	*c_xmit_rm;
 	struct rds_message	*c_xmit_rm;
 	unsigned long		c_xmit_sg;
 	unsigned long		c_xmit_sg;
 	unsigned int		c_xmit_hdr_off;
 	unsigned int		c_xmit_hdr_off;

+ 7 - 4
net/rds/send.c

@@ -112,6 +112,7 @@ int rds_send_xmit(struct rds_connection *conn)
 	unsigned int tmp;
 	unsigned int tmp;
 	struct scatterlist *sg;
 	struct scatterlist *sg;
 	int ret = 0;
 	int ret = 0;
+	int gen = 0;
 	LIST_HEAD(to_be_dropped);
 	LIST_HEAD(to_be_dropped);
 
 
 restart:
 restart:
@@ -134,6 +135,8 @@ restart:
 	if (conn->c_trans->xmit_prepare)
 	if (conn->c_trans->xmit_prepare)
 		conn->c_trans->xmit_prepare(conn);
 		conn->c_trans->xmit_prepare(conn);
 
 
+	gen = atomic_inc_return(&conn->c_send_generation);
+
 	/*
 	/*
 	 * spin trying to push headers and data down the connection until
 	 * spin trying to push headers and data down the connection until
 	 * the connection doesn't make forward progress.
 	 * the connection doesn't make forward progress.
@@ -359,13 +362,13 @@ restart:
 	if (ret == 0) {
 	if (ret == 0) {
 		/* A simple bit test would be way faster than taking the
 		/* A simple bit test would be way faster than taking the
 		 * spin lock */
 		 * spin lock */
-		spin_lock_irqsave(&conn->c_lock, flags);
+		smp_mb();
 		if (!list_empty(&conn->c_send_queue)) {
 		if (!list_empty(&conn->c_send_queue)) {
 			rds_stats_inc(s_send_lock_queue_raced);
 			rds_stats_inc(s_send_lock_queue_raced);
-			spin_unlock_irqrestore(&conn->c_lock, flags);
-			goto restart;
+			if (gen == atomic_read(&conn->c_send_generation)) {
+				goto restart;
+			}
 		}
 		}
-		spin_unlock_irqrestore(&conn->c_lock, flags);
 	}
 	}
 out:
 out:
 	return ret;
 	return ret;