Browse Source

drbd: allow to dequeue batches of work at a time

cherry-picked and adapted from drbd 9 devel branch

In 8.4, we still use drbd_queue_work_front(),
so in normal operation, we can not dequeue batches,
but only single items.

Still, followup commits will wake the worker
without explicitly queueing a work item,
so up() is replaced by a simple wake_up().

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
Lars Ellenberg 13 years ago
parent
commit
8c0785a5c9
3 changed files with 43 additions and 55 deletions
  1. 3 5
      drivers/block/drbd/drbd_int.h
  2. 1 1
      drivers/block/drbd/drbd_main.c
  3. 39 49
      drivers/block/drbd/drbd_worker.c

+ 3 - 5
drivers/block/drbd/drbd_int.h

@@ -735,8 +735,8 @@ enum bm_flag {
 
 struct drbd_work_queue {
 	struct list_head q;
-	struct semaphore s; /* producers up it, worker down()s it */
 	spinlock_t q_lock;  /* to protect the list. */
+	wait_queue_head_t q_wait;
 };
 
 struct drbd_socket {
@@ -1832,9 +1832,8 @@ drbd_queue_work_front(struct drbd_work_queue *q, struct drbd_work *w)
 	unsigned long flags;
 	spin_lock_irqsave(&q->q_lock, flags);
 	list_add(&w->list, &q->q);
-	up(&q->s); /* within the spinlock,
-		      see comment near end of drbd_worker() */
 	spin_unlock_irqrestore(&q->q_lock, flags);
+	wake_up(&q->q_wait);
 }
 
 static inline void
@@ -1843,9 +1842,8 @@ drbd_queue_work(struct drbd_work_queue *q, struct drbd_work *w)
 	unsigned long flags;
 	spin_lock_irqsave(&q->q_lock, flags);
 	list_add_tail(&w->list, &q->q);
-	up(&q->s); /* within the spinlock,
-		      see comment near end of drbd_worker() */
 	spin_unlock_irqrestore(&q->q_lock, flags);
+	wake_up(&q->q_wait);
 }
 
 static inline void wake_asender(struct drbd_tconn *tconn)

+ 1 - 1
drivers/block/drbd/drbd_main.c

@@ -2535,9 +2535,9 @@ out:
 
 static void drbd_init_workqueue(struct drbd_work_queue* wq)
 {
-	sema_init(&wq->s, 0);
 	spin_lock_init(&wq->q_lock);
 	INIT_LIST_HEAD(&wq->q);
+	init_waitqueue_head(&wq->q_wait);
 }
 
 struct drbd_tconn *conn_get_by_name(const char *name)

+ 39 - 49
drivers/block/drbd/drbd_worker.c

@@ -1673,6 +1673,23 @@ void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
 	mutex_unlock(mdev->state_mutex);
 }
 
+bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
+{
+	spin_lock_irq(&queue->q_lock);
+	list_splice_init(&queue->q, work_list);
+	spin_unlock_irq(&queue->q_lock);
+	return !list_empty(work_list);
+}
+
+bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list)
+{
+	spin_lock_irq(&queue->q_lock);
+	if (!list_empty(&queue->q))
+		list_move(queue->q.next, work_list);
+	spin_unlock_irq(&queue->q_lock);
+	return !list_empty(work_list);
+}
+
 int drbd_worker(struct drbd_thread *thi)
 {
 	struct drbd_tconn *tconn = thi->tconn;
@@ -1680,15 +1697,21 @@ int drbd_worker(struct drbd_thread *thi)
 	struct drbd_conf *mdev;
 	struct net_conf *nc;
 	LIST_HEAD(work_list);
-	int vnr, intr = 0;
+	int vnr;
 	int cork;
 
 	while (get_t_state(thi) == RUNNING) {
 		drbd_thread_current_set_cpu(thi);
 
-		if (down_trylock(&tconn->data.work.s)) {
-			mutex_lock(&tconn->data.mutex);
+		/* as long as we use drbd_queue_work_front(),
+		 * we may only dequeue single work items here, not batches. */
+		if (list_empty(&work_list))
+			dequeue_work_item(&tconn->data.work, &work_list);
 
+		/* Still nothing to do? Poke TCP, just in case,
+		 * then wait for new work (or signal). */
+		if (list_empty(&work_list)) {
+			mutex_lock(&tconn->data.mutex);
 			rcu_read_lock();
 			nc = rcu_dereference(tconn->net_conf);
 			cork = nc ? nc->tcp_cork : 0;
@@ -1698,15 +1721,16 @@ int drbd_worker(struct drbd_thread *thi)
 				drbd_tcp_uncork(tconn->data.socket);
 			mutex_unlock(&tconn->data.mutex);
 
-			intr = down_interruptible(&tconn->data.work.s);
+			wait_event_interruptible(tconn->data.work.q_wait,
+				dequeue_work_item(&tconn->data.work, &work_list));
 
 			mutex_lock(&tconn->data.mutex);
-			if (tconn->data.socket  && cork)
+			if (tconn->data.socket && cork)
 				drbd_tcp_cork(tconn->data.socket);
 			mutex_unlock(&tconn->data.mutex);
 		}
 
-		if (intr) {
+		if (signal_pending(current)) {
 			flush_signals(current);
 			if (get_t_state(thi) == RUNNING) {
 				conn_warn(tconn, "Worker got an unexpected signal\n");
@@ -1717,59 +1741,25 @@ int drbd_worker(struct drbd_thread *thi)
 
 		if (get_t_state(thi) != RUNNING)
 			break;
-		/* With this break, we have done a down() but not consumed
-		   the entry from the list. The cleanup code takes care of
-		   this...   */
-
-		w = NULL;
-		spin_lock_irq(&tconn->data.work.q_lock);
-		if (list_empty(&tconn->data.work.q)) {
-			/* something terribly wrong in our logic.
-			 * we were able to down() the semaphore,
-			 * but the list is empty... doh.
-			 *
-			 * what is the best thing to do now?
-			 * try again from scratch, restarting the receiver,
-			 * asender, whatnot? could break even more ugly,
-			 * e.g. when we are primary, but no good local data.
-			 *
-			 * I'll try to get away just starting over this loop.
-			 */
-			conn_warn(tconn, "Work list unexpectedly empty\n");
-			spin_unlock_irq(&tconn->data.work.q_lock);
-			continue;
-		}
-		w = list_entry(tconn->data.work.q.next, struct drbd_work, list);
-		list_del_init(&w->list);
-		spin_unlock_irq(&tconn->data.work.q_lock);
 
-		if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS)) {
-			/* dev_warn(DEV, "worker: a callback failed! \n"); */
+		while (!list_empty(&work_list)) {
+			w = list_first_entry(&work_list, struct drbd_work, list);
+			list_del_init(&w->list);
+			if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS) == 0)
+				continue;
 			if (tconn->cstate >= C_WF_REPORT_PARAMS)
 				conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
 		}
 	}
 
-	spin_lock_irq(&tconn->data.work.q_lock);
-	while (!list_empty(&tconn->data.work.q)) {
-		list_splice_init(&tconn->data.work.q, &work_list);
-		spin_unlock_irq(&tconn->data.work.q_lock);
-
+	do {
 		while (!list_empty(&work_list)) {
-			w = list_entry(work_list.next, struct drbd_work, list);
+			w = list_first_entry(&work_list, struct drbd_work, list);
 			list_del_init(&w->list);
 			w->cb(w, 1);
 		}
-
-		spin_lock_irq(&tconn->data.work.q_lock);
-	}
-	sema_init(&tconn->data.work.s, 0);
-	/* DANGEROUS race: if someone did queue his work within the spinlock,
-	 * but up() ed outside the spinlock, we could get an up() on the
-	 * semaphore without corresponding list entry.
-	 * So don't do that.
-	 */
-	spin_unlock_irq(&tconn->data.work.q_lock);
+		dequeue_work_batch(&tconn->data.work, &work_list);
+	} while (!list_empty(&work_list));
 
 	rcu_read_lock();
 	idr_for_each_entry(&tconn->volumes, mdev, vnr) {