async-thread.c

/*
 * Copyright (C) 2007 Oracle. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */
#include <linux/kthread.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include "async-thread.h"
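
/*
 * Overview: a struct btrfs_workers is a small pool of kthreads. The
 * lifecycle is btrfs_init_workers() to set up the pool,
 * btrfs_start_workers() to spawn threads, btrfs_queue_worker() to hand
 * work items to a thread, and btrfs_stop_workers() to shut everything
 * down. Work items go onto per-thread pending lists rather than a single
 * shared queue.
 */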

/*
 * container for the kthread task pointer and the list of pending work
 * One of these is allocated per thread.
 */
struct btrfs_worker_thread {
        /* list of struct btrfs_work that are waiting for service */
        struct list_head pending;

        /* list of worker threads from struct btrfs_workers */
        struct list_head worker_list;

        /* kthread */
        struct task_struct *task;

        /* number of things on the pending list */
        atomic_t num_pending;

        /* protects the pending list. */
        spinlock_t lock;

        /* set to non-zero when this thread is already awake and kicking */
        int working;
};
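
/*
 * Conventions used throughout this file: bit 0 of btrfs_work->flags acts
 * as an "on a list" marker. btrfs_queue_worker() and btrfs_requeue_work()
 * test-and-set it before adding the item to a pending list, and
 * worker_loop() clears it once the item has been removed for processing,
 * so a given item is never queued twice. worker->lock protects the
 * pending list; num_pending is an atomic so next_worker() can read it
 * without taking the lock.
 */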

/*
 * main loop for servicing work items
 */
static int worker_loop(void *arg)
{
        struct btrfs_worker_thread *worker = arg;
        struct list_head *cur;
        struct btrfs_work *work;

        do {
                spin_lock_irq(&worker->lock);
                while (!list_empty(&worker->pending)) {
                        cur = worker->pending.next;
                        work = list_entry(cur, struct btrfs_work, list);
                        list_del(&work->list);
                        clear_bit(0, &work->flags);

                        work->worker = worker;

                        /* drop the lock while we run the work function */
                        spin_unlock_irq(&worker->lock);
                        work->func(work);
                        atomic_dec(&worker->num_pending);
                        spin_lock_irq(&worker->lock);
                }
                worker->working = 0;
                if (freezing(current)) {
                        /* don't sleep in the refrigerator with the
                         * spinlock held */
                        spin_unlock_irq(&worker->lock);
                        refrigerator();
                } else {
                        /*
                         * set the task state before dropping the lock so
                         * a wakeup from btrfs_queue_worker() can't be
                         * lost between the unlock and schedule()
                         */
                        set_current_state(TASK_INTERRUPTIBLE);
                        spin_unlock_irq(&worker->lock);
                        schedule();
                        __set_current_state(TASK_RUNNING);
                }
        } while (!kthread_should_stop());
        return 0;
}
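
/*
 * The "working" flag above pairs with btrfs_queue_worker(): the queueing
 * side only calls wake_up_process() when it flips working from 0 to 1,
 * and the loop resets it to 0 (under worker->lock) each time the pending
 * list drains. That keeps redundant wakeups off the fast path when a
 * thread is already awake and chewing through its list.
 */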

/*
 * this will wait for all the worker threads to shut down
 */
int btrfs_stop_workers(struct btrfs_workers *workers)
{
        struct list_head *cur;
        struct btrfs_worker_thread *worker;

        while (!list_empty(&workers->worker_list)) {
                cur = workers->worker_list.next;
                worker = list_entry(cur, struct btrfs_worker_thread,
                                    worker_list);
                kthread_stop(worker->task);
                list_del(&worker->worker_list);
                kfree(worker);
        }
        return 0;
}
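
/*
 * Note: kthread_stop() waits for worker_loop() to return, but the loop
 * checks kthread_should_stop() only after it wakes and exits without
 * draining its pending list again, so callers are expected to make sure
 * all queued work has finished before shutting the pool down.
 */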

/*
 * simple init on struct btrfs_workers
 */
void btrfs_init_workers(struct btrfs_workers *workers, int max)
{
        workers->num_workers = 0;
        INIT_LIST_HEAD(&workers->worker_list);
        workers->last = NULL;
        spin_lock_init(&workers->lock);
        workers->max_workers = max;
}
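
/*
 * A minimal usage sketch (not from the original file; the pool size of 8
 * and initial thread count of 1 are arbitrary example values):
 *
 *	struct btrfs_workers workers;
 *	int ret;
 *
 *	btrfs_init_workers(&workers, 8);
 *	ret = btrfs_start_workers(&workers, 1);
 *	if (ret)
 *		return ret;
 *	...
 *	btrfs_stop_workers(&workers);
 */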

/*
 * starts new worker threads. This does not enforce the max worker
 * count in case you need to temporarily go past it.
 */
int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
{
        struct btrfs_worker_thread *worker;
        int ret = 0;
        int i;

        for (i = 0; i < num_workers; i++) {
                worker = kzalloc(sizeof(*worker), GFP_NOFS);
                if (!worker) {
                        ret = -ENOMEM;
                        goto fail;
                }

                INIT_LIST_HEAD(&worker->pending);
                INIT_LIST_HEAD(&worker->worker_list);
                spin_lock_init(&worker->lock);
                atomic_set(&worker->num_pending, 0);
                worker->task = kthread_run(worker_loop, worker, "btrfs");
                if (IS_ERR(worker->task)) {
                        ret = PTR_ERR(worker->task);
                        /*
                         * this worker was never added to worker_list, so
                         * btrfs_stop_workers() below won't free it
                         */
                        kfree(worker);
                        goto fail;
                }

                spin_lock_irq(&workers->lock);
                list_add_tail(&worker->worker_list, &workers->worker_list);
                workers->last = worker;
                workers->num_workers++;
                spin_unlock_irq(&workers->lock);
        }
        return 0;
fail:
        btrfs_stop_workers(workers);
        return ret;
}

/*
 * run through the list and find a worker thread that doesn't have a lot
 * to do right now. This can return NULL if we aren't yet at the thread
 * count limit and all of the threads are busy.
 */
static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
{
        struct btrfs_worker_thread *worker;
        struct list_head *next;
        struct list_head *start;

        /*
         * while we're below max_workers, refuse to reuse a busy thread
         * (>= 64 pending items) so the caller spawns a new one instead
         */
        int enforce_min = workers->num_workers < workers->max_workers;

        /* start with the last thread if it isn't busy */
        worker = workers->last;
        if (atomic_read(&worker->num_pending) < 64)
                goto done;

        next = worker->worker_list.next;
        start = &worker->worker_list;

        /*
         * check all the workers for someone that is bored. FIXME, do
         * something smart here
         */
        while (next != start) {
                if (next == &workers->worker_list) {
                        next = workers->worker_list.next;
                        continue;
                }
                worker = list_entry(next, struct btrfs_worker_thread,
                                    worker_list);
                if (atomic_read(&worker->num_pending) < 64 || !enforce_min)
                        goto done;
                next = next->next;
        }

        /*
         * nobody was bored, if we're already at the max thread count,
         * use the last thread
         */
        if (!enforce_min || atomic_read(&workers->last->num_pending) < 64)
                return workers->last;

        return NULL;
done:
        workers->last = worker;
        return worker;
}
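
/*
 * The threshold of 64 pending items above is this file's working
 * definition of "busy". Below max_workers a busy thread is passed over so
 * find_worker() will spawn a new one; once the pool is full, enforce_min
 * goes false and the search falls back to rotating through the loaded
 * threads.
 */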

static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
        struct btrfs_worker_thread *worker;
        unsigned long flags;

again:
        spin_lock_irqsave(&workers->lock, flags);
        worker = next_worker(workers);
        spin_unlock_irqrestore(&workers->lock, flags);

        if (!worker) {
                spin_lock_irqsave(&workers->lock, flags);
                if (workers->num_workers >= workers->max_workers) {
                        /*
                         * we have failed to find any workers, just
                         * return the first one
                         */
                        worker = list_entry(workers->worker_list.next,
                                            struct btrfs_worker_thread,
                                            worker_list);
                        spin_unlock_irqrestore(&workers->lock, flags);
                } else {
                        spin_unlock_irqrestore(&workers->lock, flags);
                        /* we're below the limit, start another worker */
                        btrfs_start_workers(workers, 1);
                        goto again;
                }
        }
        return worker;
}
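
/*
 * Note that the lock is dropped between next_worker() and the recheck, so
 * num_workers can change in between; the recheck under the lock plus the
 * goto keep retrying until a thread is found or the pool is full. Two
 * racing callers can briefly push the pool past max_workers, which
 * btrfs_start_workers() explicitly permits.
 */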

/*
 * btrfs_requeue_work just puts the work item back on the tail of the list
 * it was taken from. It is intended for use with long running work functions
 * that make some progress and want to give the cpu up for others.
 */
int btrfs_requeue_work(struct btrfs_work *work)
{
        struct btrfs_worker_thread *worker = work->worker;
        unsigned long flags;

        if (test_and_set_bit(0, &work->flags))
                goto out;

        spin_lock_irqsave(&worker->lock, flags);
        atomic_inc(&worker->num_pending);
        list_add_tail(&work->list, &worker->pending);
        spin_unlock_irqrestore(&worker->lock, flags);
out:
        return 0;
}

/*
 * places a struct btrfs_work into the pending queue of one of the kthreads
 */
int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
{
        struct btrfs_worker_thread *worker;
        unsigned long flags;
        int wake = 0;

        /* don't requeue something already on a list */
        if (test_and_set_bit(0, &work->flags))
                goto out;

        worker = find_worker(workers);

        spin_lock_irqsave(&worker->lock, flags);
        atomic_inc(&worker->num_pending);
        list_add_tail(&work->list, &worker->pending);

        /*
         * avoid calling into wake_up_process if this thread has already
         * been kicked
         */
        if (!worker->working)
                wake = 1;
        worker->working = 1;

        spin_unlock_irqrestore(&worker->lock, flags);

        if (wake)
                wake_up_process(worker->task);
out:
        return 0;
}
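
/*
 * A minimal sketch of queueing a work item (not from the original file;
 * the my_work wrapper, my_work_func() and do_something() are illustrative,
 * and assume struct btrfs_work carries the func, flags, list and worker
 * fields used above):
 *
 *	struct my_work {
 *		struct btrfs_work work;
 *		int arg;
 *	};
 *
 *	static void my_work_func(struct btrfs_work *work)
 *	{
 *		struct my_work *mw = container_of(work, struct my_work, work);
 *		do_something(mw->arg);
 *	}
 *
 *	mw->work.func = my_work_func;
 *	mw->work.flags = 0;
 *	btrfs_queue_worker(&workers, &mw->work);
 */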