async-thread.c

/*
 * Copyright (C) 2007 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 021110-1307, USA.
 */

#include <linux/kthread.h>
#include <linux/slab.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include "async-thread.h"

#define WORK_QUEUED_BIT 0
#define WORK_DONE_BIT 1
#define WORK_ORDER_DONE_BIT 2
#define WORK_HIGH_PRIO_BIT 3

/*
 * container for the kthread task pointer and the list of pending work
 * One of these is allocated per thread.
 */
struct btrfs_worker_thread {
	/* pool we belong to */
	struct btrfs_workers *workers;

	/* list of struct btrfs_work that are waiting for service */
	struct list_head pending;
	struct list_head prio_pending;

	/* list of worker threads from struct btrfs_workers */
	struct list_head worker_list;

	/* kthread */
	struct task_struct *task;

	/* number of things on the pending list */
	atomic_t num_pending;

	/* reference counter for this struct */
	atomic_t refs;

	unsigned long sequence;

	/* protects the pending list. */
	spinlock_t lock;

	/* set to non-zero when this thread is already awake and kicking */
	int working;

	/* are we currently idle */
	int idle;
};

/*
 * btrfs_start_workers uses kthread_run, which can block waiting for memory
 * for a very long time.  It will actually throttle on page writeback,
 * and so it may not make progress until after our btrfs worker threads
 * process all of the pending work structs in their queue.
 *
 * This means we can't use btrfs_start_workers from inside a btrfs worker
 * thread that is used as part of cleaning dirty memory, which pretty much
 * involves all of the worker threads.
 *
 * Instead we have a helper queue that never has more than one thread,
 * where we schedule thread start operations.  This worker_start struct
 * is used to contain the work and hold a pointer to the queue that needs
 * another worker.
 */
struct worker_start {
	struct btrfs_work work;
	struct btrfs_workers *queue;
};
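
/*
 * A minimal sketch of how such a helper queue might be wired up (the names
 * "helper" and "pool" are illustrative, not from this file): the caller
 * creates a one-thread pool with no helper of its own and passes it as the
 * async_helper argument of the real pool, so thread starts get queued there.
 *
 *	struct btrfs_workers helper, pool;
 *
 *	btrfs_init_workers(&helper, "helper", 1, NULL);
 *	btrfs_init_workers(&pool, "worker", 8, &helper);
 *	btrfs_start_workers(&helper, 1);
 *	btrfs_start_workers(&pool, 1);
 */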

static void start_new_worker_func(struct btrfs_work *work)
{
	struct worker_start *start;
	start = container_of(work, struct worker_start, work);
	btrfs_start_workers(start->queue, 1);
	kfree(start);
}

static int start_new_worker(struct btrfs_workers *queue)
{
	struct worker_start *start;
	int ret;

	start = kzalloc(sizeof(*start), GFP_NOFS);
	if (!start)
		return -ENOMEM;

	start->work.func = start_new_worker_func;
	start->queue = queue;
	ret = btrfs_queue_worker(queue->atomic_worker_start, &start->work);
	if (ret)
		kfree(start);
	return ret;
}

/*
 * helper function to move a thread onto the idle list after it
 * has finished some requests.
 */
static void check_idle_worker(struct btrfs_worker_thread *worker)
{
	if (!worker->idle && atomic_read(&worker->num_pending) <
	    worker->workers->idle_thresh / 2) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 1;

		/* the list may be empty if the worker is just starting */
		if (!list_empty(&worker->worker_list)) {
			list_move(&worker->worker_list,
				  &worker->workers->idle_list);
		}
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

/*
 * helper function to move a thread off the idle list after new
 * pending work is added.
 */
static void check_busy_worker(struct btrfs_worker_thread *worker)
{
	if (worker->idle && atomic_read(&worker->num_pending) >=
	    worker->workers->idle_thresh) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 0;

		if (!list_empty(&worker->worker_list)) {
			list_move_tail(&worker->worker_list,
				       &worker->workers->worker_list);
		}
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

/*
 * called by workers between requests.  If find_worker flagged that the pool
 * wants another thread (atomic_start_pending), clear the flag and start one
 * via the async helper queue.
 */
static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
{
	struct btrfs_workers *workers = worker->workers;
	unsigned long flags;

	rmb();
	if (!workers->atomic_start_pending)
		return;

	spin_lock_irqsave(&workers->lock, flags);
	if (!workers->atomic_start_pending)
		goto out;

	workers->atomic_start_pending = 0;
	if (workers->num_workers + workers->num_workers_starting >=
	    workers->max_workers)
		goto out;

	workers->num_workers_starting += 1;
	spin_unlock_irqrestore(&workers->lock, flags);
	start_new_worker(workers);
	return;

out:
	spin_unlock_irqrestore(&workers->lock, flags);
}

static noinline int run_ordered_completions(struct btrfs_workers *workers,
					    struct btrfs_work *work)
{
	if (!workers->ordered)
		return 0;

	set_bit(WORK_DONE_BIT, &work->flags);

	spin_lock(&workers->order_lock);

	while (1) {
		if (!list_empty(&workers->prio_order_list)) {
			work = list_entry(workers->prio_order_list.next,
					  struct btrfs_work, order_list);
		} else if (!list_empty(&workers->order_list)) {
			work = list_entry(workers->order_list.next,
					  struct btrfs_work, order_list);
		} else {
			break;
		}
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/* we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;

		spin_unlock(&workers->order_lock);

		work->ordered_func(work);

		/* now take the lock again and call the freeing code */
		spin_lock(&workers->order_lock);
		list_del(&work->order_list);
		work->ordered_free(work);
	}

	spin_unlock(&workers->order_lock);
	return 0;
}
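
/*
 * Sketch of how an ordered pool is used (the csum_work/csum_done/csum_free
 * callbacks are hypothetical): ->func may run out of order and in parallel,
 * while ->ordered_func and ->ordered_free are called in submission order.
 *
 *	btrfs_init_workers(&workers, "ordered", 4, NULL);
 *	workers.ordered = 1;
 *	btrfs_start_workers(&workers, 1);
 *
 *	work->func = csum_work;
 *	work->ordered_func = csum_done;
 *	work->ordered_free = csum_free;
 *	btrfs_queue_worker(&workers, work);
 */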

/* drop a reference on a worker, freeing it once the last reference is gone */
static void put_worker(struct btrfs_worker_thread *worker)
{
	if (atomic_dec_and_test(&worker->refs))
		kfree(worker);
}

/*
 * called after a worker has slept for a long time with nothing to do.
 * If the worker is completely idle, take it off the pool's list and drop
 * its reference.  Returns 1 if the worker was torn down.
 */
static int try_worker_shutdown(struct btrfs_worker_thread *worker)
{
	int freeit = 0;

	spin_lock_irq(&worker->lock);
	spin_lock(&worker->workers->lock);
	if (worker->workers->num_workers > 1 &&
	    worker->idle &&
	    !worker->working &&
	    !list_empty(&worker->worker_list) &&
	    list_empty(&worker->prio_pending) &&
	    list_empty(&worker->pending) &&
	    atomic_read(&worker->num_pending) == 0) {
		freeit = 1;
		list_del_init(&worker->worker_list);
		worker->workers->num_workers--;
	}
	spin_unlock(&worker->workers->lock);
	spin_unlock_irq(&worker->lock);

	if (freeit)
		put_worker(worker);
	return freeit;
}

/*
 * grab the next work item, preferring high priority work.  prio_head and
 * head are the caller's local caches, refilled from the worker's
 * prio_pending/pending lists under the lock.
 */
static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
					struct list_head *prio_head,
					struct list_head *head)
{
	struct btrfs_work *work = NULL;
	struct list_head *cur = NULL;

	if (!list_empty(prio_head))
		cur = prio_head->next;

	smp_mb();
	if (!list_empty(&worker->prio_pending))
		goto refill;

	if (!list_empty(head))
		cur = head->next;

	if (cur)
		goto out;

refill:
	spin_lock_irq(&worker->lock);
	list_splice_tail_init(&worker->prio_pending, prio_head);
	list_splice_tail_init(&worker->pending, head);

	if (!list_empty(prio_head))
		cur = prio_head->next;
	else if (!list_empty(head))
		cur = head->next;
	spin_unlock_irq(&worker->lock);

	if (!cur)
		goto out_fail;

out:
	work = list_entry(cur, struct btrfs_work, list);

out_fail:
	return work;
}

/*
 * main loop for servicing work items
 */
static int worker_loop(void *arg)
{
	struct btrfs_worker_thread *worker = arg;
	struct list_head head;
	struct list_head prio_head;
	struct btrfs_work *work;

	INIT_LIST_HEAD(&head);
	INIT_LIST_HEAD(&prio_head);

	do {
again:
		while (1) {
			work = get_next_work(worker, &prio_head, &head);
			if (!work)
				break;

			list_del(&work->list);
			clear_bit(WORK_QUEUED_BIT, &work->flags);

			work->worker = worker;

			work->func(work);
			atomic_dec(&worker->num_pending);
			/*
			 * unless this is an ordered work queue,
			 * 'work' was probably freed by func above.
			 */
			run_ordered_completions(worker->workers, work);

			check_pending_worker_creates(worker);
			cond_resched();
		}

		spin_lock_irq(&worker->lock);
		check_idle_worker(worker);

		if (freezing(current)) {
			worker->working = 0;
			spin_unlock_irq(&worker->lock);
			refrigerator();
		} else {
			spin_unlock_irq(&worker->lock);
			if (!kthread_should_stop()) {
				cpu_relax();
				/*
				 * we've dropped the lock, did someone else
				 * jump in?
				 */
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				/*
				 * this short schedule allows more work to
				 * come in without the queue functions
				 * needing to go through wake_up_process()
				 *
				 * worker->working is still 1, so nobody
				 * is going to try and wake us up
				 */
				schedule_timeout(1);
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				if (kthread_should_stop())
					break;

				/* still no more work? sleep for real */
				spin_lock_irq(&worker->lock);
				set_current_state(TASK_INTERRUPTIBLE);
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending)) {
					spin_unlock_irq(&worker->lock);
					set_current_state(TASK_RUNNING);
					goto again;
				}

				/*
				 * this makes sure we get a wakeup when someone
				 * adds something new to the queue
				 */
				worker->working = 0;
				spin_unlock_irq(&worker->lock);

				if (!kthread_should_stop()) {
					schedule_timeout(HZ * 120);
					if (!worker->working &&
					    try_worker_shutdown(worker)) {
						return 0;
					}
				}
			}
			__set_current_state(TASK_RUNNING);
		}
	} while (!kthread_should_stop());
	return 0;
}

/*
 * this will wait for all the worker threads to shutdown
 */
int btrfs_stop_workers(struct btrfs_workers *workers)
{
	struct list_head *cur;
	struct btrfs_worker_thread *worker;
	int can_stop;

	spin_lock_irq(&workers->lock);
	list_splice_init(&workers->idle_list, &workers->worker_list);
	while (!list_empty(&workers->worker_list)) {
		cur = workers->worker_list.next;
		worker = list_entry(cur, struct btrfs_worker_thread,
				    worker_list);

		atomic_inc(&worker->refs);
		workers->num_workers -= 1;
		if (!list_empty(&worker->worker_list)) {
			list_del_init(&worker->worker_list);
			put_worker(worker);
			can_stop = 1;
		} else
			can_stop = 0;
		spin_unlock_irq(&workers->lock);
		if (can_stop)
			kthread_stop(worker->task);
		spin_lock_irq(&workers->lock);
		put_worker(worker);
	}
	spin_unlock_irq(&workers->lock);
	return 0;
}

/*
 * simple init on struct btrfs_workers
 */
void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max,
			struct btrfs_workers *async_helper)
{
	workers->num_workers = 0;
	workers->num_workers_starting = 0;
	INIT_LIST_HEAD(&workers->worker_list);
	INIT_LIST_HEAD(&workers->idle_list);
	INIT_LIST_HEAD(&workers->order_list);
	INIT_LIST_HEAD(&workers->prio_order_list);
	spin_lock_init(&workers->lock);
	spin_lock_init(&workers->order_lock);
	workers->max_workers = max;
	workers->idle_thresh = 32;
	workers->name = name;
	workers->ordered = 0;
	workers->atomic_start_pending = 0;
	workers->atomic_worker_start = async_helper;
}
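
/*
 * A minimal usage sketch, assuming a caller-side work item whose callback
 * my_func is hypothetical (not defined in this file): init the pool, start
 * one thread, fill in ->func and queue the item.  Extra threads are spawned
 * on demand by find_worker() up to max_workers.
 *
 *	struct btrfs_workers workers;
 *	struct btrfs_work *work;
 *
 *	btrfs_init_workers(&workers, "example", 8, NULL);
 *	btrfs_start_workers(&workers, 1);
 *
 *	work->func = my_func;
 *	btrfs_queue_worker(&workers, work);
 */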

/*
 * starts new worker threads.  This does not enforce the max worker
 * count in case you need to temporarily go past it.
 */
static int __btrfs_start_workers(struct btrfs_workers *workers,
				 int num_workers)
{
	struct btrfs_worker_thread *worker;
	int ret = 0;
	int i;

	for (i = 0; i < num_workers; i++) {
		worker = kzalloc(sizeof(*worker), GFP_NOFS);
		if (!worker) {
			ret = -ENOMEM;
			goto fail;
		}

		INIT_LIST_HEAD(&worker->pending);
		INIT_LIST_HEAD(&worker->prio_pending);
		INIT_LIST_HEAD(&worker->worker_list);
		spin_lock_init(&worker->lock);

		atomic_set(&worker->num_pending, 0);
		atomic_set(&worker->refs, 1);
		worker->workers = workers;
		worker->task = kthread_run(worker_loop, worker,
					   "btrfs-%s-%d", workers->name,
					   workers->num_workers + i);
		if (IS_ERR(worker->task)) {
			ret = PTR_ERR(worker->task);
			kfree(worker);
			goto fail;
		}
		spin_lock_irq(&workers->lock);
		list_add_tail(&worker->worker_list, &workers->idle_list);
		worker->idle = 1;
		workers->num_workers++;
		workers->num_workers_starting--;
		WARN_ON(workers->num_workers_starting < 0);
		spin_unlock_irq(&workers->lock);
	}
	return 0;
fail:
	btrfs_stop_workers(workers);
	return ret;
}

int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
{
	spin_lock_irq(&workers->lock);
	workers->num_workers_starting += num_workers;
	spin_unlock_irq(&workers->lock);
	return __btrfs_start_workers(workers, num_workers);
}

/*
 * run through the list and find a worker thread that doesn't have a lot
 * to do right now.  This can return NULL if we aren't yet at the thread
 * count limit and all of the threads are busy.
 */
static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	struct list_head *next;
	int enforce_min;

	enforce_min = (workers->num_workers + workers->num_workers_starting) <
		workers->max_workers;

	/*
	 * if we find an idle thread, don't move it to the end of the
	 * idle list.  This improves the chance that the next submission
	 * will reuse the same thread, and maybe catch it while it is still
	 * working
	 */
	if (!list_empty(&workers->idle_list)) {
		next = workers->idle_list.next;
		worker = list_entry(next, struct btrfs_worker_thread,
				    worker_list);
		return worker;
	}
	if (enforce_min || list_empty(&workers->worker_list))
		return NULL;

	/*
	 * if we pick a busy task, move the task to the end of the list.
	 * hopefully this will keep things somewhat evenly balanced.
	 * Do the move in batches based on the sequence number.  This groups
	 * requests submitted at roughly the same time onto the same worker.
	 */
	next = workers->worker_list.next;
	worker = list_entry(next, struct btrfs_worker_thread, worker_list);
	worker->sequence++;

	if (worker->sequence % workers->idle_thresh == 0)
		list_move_tail(next, &workers->worker_list);
	return worker;
}

/*
 * selects a worker thread to take the next job.  This will either find
 * an idle worker, start a new worker up to the max count, or just return
 * one of the existing busy workers.
 */
static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	struct list_head *fallback;

again:
	spin_lock_irqsave(&workers->lock, flags);
	worker = next_worker(workers);

	if (!worker) {
		if (workers->num_workers + workers->num_workers_starting >=
		    workers->max_workers) {
			goto fallback;
		} else if (workers->atomic_worker_start) {
			workers->atomic_start_pending = 1;
			goto fallback;
		} else {
			workers->num_workers_starting++;
			spin_unlock_irqrestore(&workers->lock, flags);
			/* we're below the limit, start another worker */
			__btrfs_start_workers(workers, 1);
			goto again;
		}
	}
	goto found;

fallback:
	fallback = NULL;
	/*
	 * we have failed to find any workers, just
	 * return the first one we can find.
	 */
	if (!list_empty(&workers->worker_list))
		fallback = workers->worker_list.next;
	if (!list_empty(&workers->idle_list))
		fallback = workers->idle_list.next;
	BUG_ON(!fallback);
	worker = list_entry(fallback,
			    struct btrfs_worker_thread, worker_list);
found:
	/*
	 * this makes sure the worker doesn't exit before it is placed
	 * onto a busy/idle list
	 */
	atomic_inc(&worker->num_pending);
	spin_unlock_irqrestore(&workers->lock, flags);
	return worker;
}

/*
 * btrfs_requeue_work just puts the work item back on the tail of the list
 * it was taken from.  It is intended for use with long running work functions
 * that make some progress and want to give the cpu up for others.
 */
int btrfs_requeue_work(struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker = work->worker;
	unsigned long flags;
	int wake = 0;

	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		goto out;

	spin_lock_irqsave(&worker->lock, flags);
	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	atomic_inc(&worker->num_pending);

	/* by definition we're busy, take ourselves off the idle
	 * list
	 */
	if (worker->idle) {
		spin_lock(&worker->workers->lock);
		worker->idle = 0;
		list_move_tail(&worker->worker_list,
			       &worker->workers->worker_list);
		spin_unlock(&worker->workers->lock);
	}
	if (!worker->working) {
		wake = 1;
		worker->working = 1;
	}

	if (wake)
		wake_up_process(worker->task);
	spin_unlock_irqrestore(&worker->lock, flags);
out:
	return 0;
}
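
/*
 * Sketch of the intended pattern, assuming a hypothetical callback and a
 * hypothetical still_has_work() helper: the function does a bounded amount
 * of work per invocation and requeues itself instead of hogging the worker.
 *
 *	static void my_long_func(struct btrfs_work *work)
 *	{
 *		do_one_chunk(work);
 *		if (still_has_work(work))
 *			btrfs_requeue_work(work);
 *	}
 */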

void btrfs_set_work_high_prio(struct btrfs_work *work)
{
	set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}
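
/*
 * Illustrative call order: the priority bit is consulted when the item is
 * queued, so mark the work before handing it to btrfs_queue_worker():
 *
 *	btrfs_set_work_high_prio(work);
 *	btrfs_queue_worker(&workers, work);
 */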

/*
 * places a struct btrfs_work into the pending queue of one of the kthreads
 */
int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	int wake = 0;

	/* don't requeue something already on a list */
	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		goto out;

	worker = find_worker(workers);
	if (workers->ordered) {
		/*
		 * you're not allowed to do ordered queues from an
		 * interrupt handler
		 */
		spin_lock(&workers->order_lock);
		if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
			list_add_tail(&work->order_list,
				      &workers->prio_order_list);
		} else {
			list_add_tail(&work->order_list, &workers->order_list);
		}
		spin_unlock(&workers->order_lock);
	} else {
		INIT_LIST_HEAD(&work->order_list);
	}

	spin_lock_irqsave(&worker->lock, flags);

	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	check_busy_worker(worker);

	/*
	 * avoid calling into wake_up_process if this thread has already
	 * been kicked
	 */
	if (!worker->working)
		wake = 1;
	worker->working = 1;

	if (wake)
		wake_up_process(worker->task);
	spin_unlock_irqrestore(&worker->lock, flags);

out:
	return 0;
}