Pārlūkot izejas kodu

RDS/IB+IW: Move recv processing to a tasklet

Move receive processing from event handler to a tasklet.
This should help prevent hangcheck timer from going off
when RDS is under heavy load.

Signed-off-by: Andy Grover <andy.grover@oracle.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
Andy Grover 15 gadi atpakaļ
vecāks
revīzija
d521b63b27
6 mainītis faili ar 52 papildinājumiem un 12 dzēšanām
  1. 2 0
      net/rds/ib.h
  2. 2 0
      net/rds/ib_cm.c
  3. 22 6
      net/rds/ib_recv.c
  4. 2 0
      net/rds/iw.h
  5. 2 0
      net/rds/iw_cm.c
  6. 22 6
      net/rds/iw_recv.c

+ 2 - 0
net/rds/ib.h

@@ -98,6 +98,7 @@ struct rds_ib_connection {
 	struct rds_ib_send_work *i_sends;
 	struct rds_ib_send_work *i_sends;
 
 
 	/* rx */
 	/* rx */
+	struct tasklet_struct	i_recv_tasklet;
 	struct mutex		i_recv_mutex;
 	struct mutex		i_recv_mutex;
 	struct rds_ib_work_ring	i_recv_ring;
 	struct rds_ib_work_ring	i_recv_ring;
 	struct rds_ib_incoming	*i_ibinc;
 	struct rds_ib_incoming	*i_ibinc;
@@ -303,6 +304,7 @@ void rds_ib_inc_free(struct rds_incoming *inc);
 int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iovec *iov,
 int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iovec *iov,
 			     size_t size);
 			     size_t size);
 void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context);
 void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context);
+void rds_ib_recv_tasklet_fn(unsigned long data);
 void rds_ib_recv_init_ring(struct rds_ib_connection *ic);
 void rds_ib_recv_init_ring(struct rds_ib_connection *ic);
 void rds_ib_recv_clear_ring(struct rds_ib_connection *ic);
 void rds_ib_recv_clear_ring(struct rds_ib_connection *ic);
 void rds_ib_recv_init_ack(struct rds_ib_connection *ic);
 void rds_ib_recv_init_ack(struct rds_ib_connection *ic);

+ 2 - 0
net/rds/ib_cm.c

@@ -694,6 +694,8 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 		return -ENOMEM;
 		return -ENOMEM;
 
 
 	INIT_LIST_HEAD(&ic->ib_node);
 	INIT_LIST_HEAD(&ic->ib_node);
+	tasklet_init(&ic->i_recv_tasklet, rds_ib_recv_tasklet_fn,
+		     (unsigned long) ic);
 	mutex_init(&ic->i_recv_mutex);
 	mutex_init(&ic->i_recv_mutex);
 #ifndef KERNEL_HAS_ATOMIC64
 #ifndef KERNEL_HAS_ATOMIC64
 	spin_lock_init(&ic->i_ack_lock);
 	spin_lock_init(&ic->i_ack_lock);

+ 22 - 6
net/rds/ib_recv.c

@@ -825,17 +825,22 @@ void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context)
 {
 {
 	struct rds_connection *conn = context;
 	struct rds_connection *conn = context;
 	struct rds_ib_connection *ic = conn->c_transport_data;
 	struct rds_ib_connection *ic = conn->c_transport_data;
-	struct ib_wc wc;
-	struct rds_ib_ack_state state = { 0, };
-	struct rds_ib_recv_work *recv;
 
 
 	rdsdebug("conn %p cq %p\n", conn, cq);
 	rdsdebug("conn %p cq %p\n", conn, cq);
 
 
 	rds_ib_stats_inc(s_ib_rx_cq_call);
 	rds_ib_stats_inc(s_ib_rx_cq_call);
 
 
-	ib_req_notify_cq(cq, IB_CQ_SOLICITED);
+	tasklet_schedule(&ic->i_recv_tasklet);
+}
 
 
-	while (ib_poll_cq(cq, 1, &wc) > 0) {
+static inline void rds_poll_cq(struct rds_ib_connection *ic,
+			       struct rds_ib_ack_state *state)
+{
+	struct rds_connection *conn = ic->conn;
+	struct ib_wc wc;
+	struct rds_ib_recv_work *recv;
+
+	while (ib_poll_cq(ic->i_recv_cq, 1, &wc) > 0) {
 		rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
 		rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
 			 (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
 			 (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
 			 be32_to_cpu(wc.ex.imm_data));
 			 be32_to_cpu(wc.ex.imm_data));
@@ -853,7 +858,7 @@ void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context)
 		if (rds_conn_up(conn) || rds_conn_connecting(conn)) {
 		if (rds_conn_up(conn) || rds_conn_connecting(conn)) {
 			/* We expect errors as the qp is drained during shutdown */
 			/* We expect errors as the qp is drained during shutdown */
 			if (wc.status == IB_WC_SUCCESS) {
 			if (wc.status == IB_WC_SUCCESS) {
-				rds_ib_process_recv(conn, recv, wc.byte_len, &state);
+				rds_ib_process_recv(conn, recv, wc.byte_len, state);
 			} else {
 			} else {
 				rds_ib_conn_error(conn, "recv completion on "
 				rds_ib_conn_error(conn, "recv completion on "
 				       "%pI4 had status %u, disconnecting and "
 				       "%pI4 had status %u, disconnecting and "
@@ -864,6 +869,17 @@ void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context)
 
 
 		rds_ib_ring_free(&ic->i_recv_ring, 1);
 		rds_ib_ring_free(&ic->i_recv_ring, 1);
 	}
 	}
+}
+
+void rds_ib_recv_tasklet_fn(unsigned long data)
+{
+	struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
+	struct rds_connection *conn = ic->conn;
+	struct rds_ib_ack_state state = { 0, };
+
+	rds_poll_cq(ic, &state);
+	ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
+	rds_poll_cq(ic, &state);
 
 
 	if (state.ack_next_valid)
 	if (state.ack_next_valid)
 		rds_ib_set_ack(ic, state.ack_next, state.ack_required);
 		rds_ib_set_ack(ic, state.ack_next, state.ack_required);

+ 2 - 0
net/rds/iw.h

@@ -119,6 +119,7 @@ struct rds_iw_connection {
 	struct rds_iw_send_work *i_sends;
 	struct rds_iw_send_work *i_sends;
 
 
 	/* rx */
 	/* rx */
+	struct tasklet_struct	i_recv_tasklet;
 	struct mutex		i_recv_mutex;
 	struct mutex		i_recv_mutex;
 	struct rds_iw_work_ring	i_recv_ring;
 	struct rds_iw_work_ring	i_recv_ring;
 	struct rds_iw_incoming	*i_iwinc;
 	struct rds_iw_incoming	*i_iwinc;
@@ -330,6 +331,7 @@ void rds_iw_inc_free(struct rds_incoming *inc);
 int rds_iw_inc_copy_to_user(struct rds_incoming *inc, struct iovec *iov,
 int rds_iw_inc_copy_to_user(struct rds_incoming *inc, struct iovec *iov,
 			     size_t size);
 			     size_t size);
 void rds_iw_recv_cq_comp_handler(struct ib_cq *cq, void *context);
 void rds_iw_recv_cq_comp_handler(struct ib_cq *cq, void *context);
+void rds_iw_recv_tasklet_fn(unsigned long data);
 void rds_iw_recv_init_ring(struct rds_iw_connection *ic);
 void rds_iw_recv_init_ring(struct rds_iw_connection *ic);
 void rds_iw_recv_clear_ring(struct rds_iw_connection *ic);
 void rds_iw_recv_clear_ring(struct rds_iw_connection *ic);
 void rds_iw_recv_init_ack(struct rds_iw_connection *ic);
 void rds_iw_recv_init_ack(struct rds_iw_connection *ic);

+ 2 - 0
net/rds/iw_cm.c

@@ -696,6 +696,8 @@ int rds_iw_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 		return -ENOMEM;
 		return -ENOMEM;
 
 
 	INIT_LIST_HEAD(&ic->iw_node);
 	INIT_LIST_HEAD(&ic->iw_node);
+	tasklet_init(&ic->i_recv_tasklet, rds_iw_recv_tasklet_fn,
+		     (unsigned long) ic);
 	mutex_init(&ic->i_recv_mutex);
 	mutex_init(&ic->i_recv_mutex);
 #ifndef KERNEL_HAS_ATOMIC64
 #ifndef KERNEL_HAS_ATOMIC64
 	spin_lock_init(&ic->i_ack_lock);
 	spin_lock_init(&ic->i_ack_lock);

+ 22 - 6
net/rds/iw_recv.c

@@ -784,17 +784,22 @@ void rds_iw_recv_cq_comp_handler(struct ib_cq *cq, void *context)
 {
 {
 	struct rds_connection *conn = context;
 	struct rds_connection *conn = context;
 	struct rds_iw_connection *ic = conn->c_transport_data;
 	struct rds_iw_connection *ic = conn->c_transport_data;
-	struct ib_wc wc;
-	struct rds_iw_ack_state state = { 0, };
-	struct rds_iw_recv_work *recv;
 
 
 	rdsdebug("conn %p cq %p\n", conn, cq);
 	rdsdebug("conn %p cq %p\n", conn, cq);
 
 
 	rds_iw_stats_inc(s_iw_rx_cq_call);
 	rds_iw_stats_inc(s_iw_rx_cq_call);
 
 
-	ib_req_notify_cq(cq, IB_CQ_SOLICITED);
+	tasklet_schedule(&ic->i_recv_tasklet);
+}
 
 
-	while (ib_poll_cq(cq, 1, &wc) > 0) {
+static inline void rds_poll_cq(struct rds_iw_connection *ic,
+			       struct rds_iw_ack_state *state)
+{
+	struct rds_connection *conn = ic->conn;
+	struct ib_wc wc;
+	struct rds_iw_recv_work *recv;
+
+	while (ib_poll_cq(ic->i_recv_cq, 1, &wc) > 0) {
 		rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
 		rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
 			 (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
 			 (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
 			 be32_to_cpu(wc.ex.imm_data));
 			 be32_to_cpu(wc.ex.imm_data));
@@ -812,7 +817,7 @@ void rds_iw_recv_cq_comp_handler(struct ib_cq *cq, void *context)
 		if (rds_conn_up(conn) || rds_conn_connecting(conn)) {
 		if (rds_conn_up(conn) || rds_conn_connecting(conn)) {
 			/* We expect errors as the qp is drained during shutdown */
 			/* We expect errors as the qp is drained during shutdown */
 			if (wc.status == IB_WC_SUCCESS) {
 			if (wc.status == IB_WC_SUCCESS) {
-				rds_iw_process_recv(conn, recv, wc.byte_len, &state);
+				rds_iw_process_recv(conn, recv, wc.byte_len, state);
 			} else {
 			} else {
 				rds_iw_conn_error(conn, "recv completion on "
 				rds_iw_conn_error(conn, "recv completion on "
 				       "%pI4 had status %u, disconnecting and "
 				       "%pI4 had status %u, disconnecting and "
@@ -823,6 +828,17 @@ void rds_iw_recv_cq_comp_handler(struct ib_cq *cq, void *context)
 
 
 		rds_iw_ring_free(&ic->i_recv_ring, 1);
 		rds_iw_ring_free(&ic->i_recv_ring, 1);
 	}
 	}
+}
+
+void rds_iw_recv_tasklet_fn(unsigned long data)
+{
+	struct rds_iw_connection *ic = (struct rds_iw_connection *) data;
+	struct rds_connection *conn = ic->conn;
+	struct rds_iw_ack_state state = { 0, };
+
+	rds_poll_cq(ic, &state);
+	ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
+	rds_poll_cq(ic, &state);
 
 
 	if (state.ack_next_valid)
 	if (state.ack_next_valid)
 		rds_iw_set_ack(ic, state.ack_next, state.ack_required);
 		rds_iw_set_ack(ic, state.ack_next, state.ack_required);