async-thread.c

/*
 * Copyright (C) 2007 Oracle. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 021110-1307, USA.
 */

#include <linux/kthread.h>
#include <linux/slab.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include "async-thread.h"

#define WORK_QUEUED_BIT 0
#define WORK_DONE_BIT 1
#define WORK_ORDER_DONE_BIT 2
#define WORK_HIGH_PRIO_BIT 3
/*
 * container for the kthread task pointer and the list of pending work.
 * One of these is allocated per thread.
 */
struct btrfs_worker_thread {
	/* pool we belong to */
	struct btrfs_workers *workers;

	/* list of struct btrfs_work that are waiting for service */
	struct list_head pending;
	struct list_head prio_pending;

	/* list of worker threads from struct btrfs_workers */
	struct list_head worker_list;

	/* kthread */
	struct task_struct *task;

	/* number of things on the pending list */
	atomic_t num_pending;

	/* reference counter for this struct */
	atomic_t refs;

	unsigned long sequence;

	/* protects the pending list. */
	spinlock_t lock;

	/* set to non-zero when this thread is already awake and kicking */
	int working;

	/* are we currently idle */
	int idle;
};
/*
 * btrfs_start_workers uses kthread_run, which can block waiting for memory
 * for a very long time.  It will actually throttle on page writeback,
 * and so it may not make progress until after our btrfs worker threads
 * process all of the pending work structs in their queue.
 *
 * This means we can't use btrfs_start_workers from inside a btrfs worker
 * thread that is used as part of cleaning dirty memory, which pretty much
 * involves all of the worker threads.
 *
 * Instead we have a helper queue that never has more than one thread,
 * where we schedule thread start operations.  This worker_start struct
 * is used to contain the work and hold a pointer to the queue that needs
 * another worker.
 */
struct worker_start {
	struct btrfs_work work;
	struct btrfs_workers *queue;
};

static void start_new_worker_func(struct btrfs_work *work)
{
	struct worker_start *start;
	start = container_of(work, struct worker_start, work);
	btrfs_start_workers(start->queue, 1);
	kfree(start);
}

static int start_new_worker(struct btrfs_workers *queue)
{
	struct worker_start *start;
	int ret;

	start = kzalloc(sizeof(*start), GFP_NOFS);
	if (!start)
		return -ENOMEM;

	start->work.func = start_new_worker_func;
	start->queue = queue;
	ret = btrfs_queue_worker(queue->atomic_worker_start, &start->work);
	if (ret)
		kfree(start);
	return ret;
}
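
/*
 * Illustrative sketch (added commentary, not part of the original file):
 * the deferred start path described above flows roughly like this, assuming
 * the pool was created with a one-thread helper queue passed as async_helper
 * to btrfs_init_workers():
 *
 *	worker_loop() notices atomic_start_pending
 *	  -> check_pending_worker_creates()
 *	       -> start_new_worker()            queues onto atomic_worker_start
 *	            -> start_new_worker_func()  runs in the helper thread
 *	                 -> btrfs_start_workers(queue, 1)  may block safely
 */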
/*
 * helper function to move a thread onto the idle list after it
 * has finished some requests.
 */
static void check_idle_worker(struct btrfs_worker_thread *worker)
{
	if (!worker->idle && atomic_read(&worker->num_pending) <
	    worker->workers->idle_thresh / 2) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 1;

		/* the list may be empty if the worker is just starting */
		if (!list_empty(&worker->worker_list)) {
			list_move(&worker->worker_list,
				  &worker->workers->idle_list);
		}
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

/*
 * helper function to move a thread off the idle list after new
 * pending work is added.
 */
static void check_busy_worker(struct btrfs_worker_thread *worker)
{
	if (worker->idle && atomic_read(&worker->num_pending) >=
	    worker->workers->idle_thresh) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 0;

		if (!list_empty(&worker->worker_list)) {
			list_move_tail(&worker->worker_list,
				       &worker->workers->worker_list);
		}
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}
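
/*
 * Added commentary (not in the original file): the two helpers above form a
 * simple hysteresis.  A worker is only moved back to the idle list once its
 * backlog drops below idle_thresh / 2, but it is marked busy as soon as the
 * backlog reaches idle_thresh, so a worker hovering near the threshold does
 * not bounce between the idle and busy lists on every queued item.
 */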
/*
 * a request to start a new worker may have been deferred by find_worker
 * (atomic_start_pending); if so, and we are still under max_workers,
 * kick off a start now from worker thread context.
 */
static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
{
	struct btrfs_workers *workers = worker->workers;
	unsigned long flags;

	rmb();
	if (!workers->atomic_start_pending)
		return;

	spin_lock_irqsave(&workers->lock, flags);
	if (!workers->atomic_start_pending)
		goto out;

	workers->atomic_start_pending = 0;
	if (workers->num_workers + workers->num_workers_starting >=
	    workers->max_workers)
		goto out;

	workers->num_workers_starting += 1;
	spin_unlock_irqrestore(&workers->lock, flags);
	start_new_worker(workers);
	return;

out:
	spin_unlock_irqrestore(&workers->lock, flags);
}
static noinline int run_ordered_completions(struct btrfs_workers *workers,
					    struct btrfs_work *work)
{
	if (!workers->ordered)
		return 0;

	set_bit(WORK_DONE_BIT, &work->flags);

	spin_lock(&workers->order_lock);

	while (1) {
		if (!list_empty(&workers->prio_order_list)) {
			work = list_entry(workers->prio_order_list.next,
					  struct btrfs_work, order_list);
		} else if (!list_empty(&workers->order_list)) {
			work = list_entry(workers->order_list.next,
					  struct btrfs_work, order_list);
		} else {
			break;
		}
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/* we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;

		spin_unlock(&workers->order_lock);

		work->ordered_func(work);

		/* now take the lock again and call the freeing code */
		spin_lock(&workers->order_lock);
		list_del(&work->order_list);
		work->ordered_free(work);
	}

	spin_unlock(&workers->order_lock);
	return 0;
}
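
/*
 * Illustrative example (added commentary, not in the original file): with an
 * ordered queue, if items A, B and C are queued in that order and B's
 * work->func happens to finish first, B only gets WORK_DONE_BIT set here.
 * Its ordered_func/ordered_free are not run until A has completed and been
 * handled, because A stays at the head of order_list as a barrier.
 */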
static void put_worker(struct btrfs_worker_thread *worker)
{
	if (atomic_dec_and_test(&worker->refs))
		kfree(worker);
}

/*
 * try to take this worker out of the pool if it is completely idle.
 * Returns non-zero when the worker was removed and the calling thread
 * should exit.
 */
static int try_worker_shutdown(struct btrfs_worker_thread *worker)
{
	int freeit = 0;

	spin_lock_irq(&worker->lock);
	spin_lock(&worker->workers->lock);
	if (worker->workers->num_workers > 1 &&
	    worker->idle &&
	    !worker->working &&
	    !list_empty(&worker->worker_list) &&
	    list_empty(&worker->prio_pending) &&
	    list_empty(&worker->pending) &&
	    atomic_read(&worker->num_pending) == 0) {
		freeit = 1;
		list_del_init(&worker->worker_list);
		worker->workers->num_workers--;
	}
	spin_unlock(&worker->workers->lock);
	spin_unlock_irq(&worker->lock);

	if (freeit)
		put_worker(worker);
	return freeit;
}
/*
 * grab the next work item, working from a local prio_head/head pair that is
 * refilled in batches from the spinlock protected pending lists.
 */
static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
					struct list_head *prio_head,
					struct list_head *head)
{
	struct btrfs_work *work = NULL;
	struct list_head *cur = NULL;

	if (!list_empty(prio_head))
		cur = prio_head->next;

	smp_mb();
	if (!list_empty(&worker->prio_pending))
		goto refill;

	if (!list_empty(head))
		cur = head->next;

	if (cur)
		goto out;

refill:
	spin_lock_irq(&worker->lock);
	list_splice_tail_init(&worker->prio_pending, prio_head);
	list_splice_tail_init(&worker->pending, head);

	if (!list_empty(prio_head))
		cur = prio_head->next;
	else if (!list_empty(head))
		cur = head->next;
	spin_unlock_irq(&worker->lock);

	if (!cur)
		goto out_fail;

out:
	work = list_entry(cur, struct btrfs_work, list);

out_fail:
	return work;
}
/*
 * main loop for servicing work items
 */
static int worker_loop(void *arg)
{
	struct btrfs_worker_thread *worker = arg;
	struct list_head head;
	struct list_head prio_head;
	struct btrfs_work *work;

	INIT_LIST_HEAD(&head);
	INIT_LIST_HEAD(&prio_head);

	do {
again:
		while (1) {
			work = get_next_work(worker, &prio_head, &head);
			if (!work)
				break;

			list_del(&work->list);
			clear_bit(WORK_QUEUED_BIT, &work->flags);

			work->worker = worker;

			work->func(work);
			atomic_dec(&worker->num_pending);
			/*
			 * unless this is an ordered work queue,
			 * 'work' was probably freed by func above.
			 */
			run_ordered_completions(worker->workers, work);

			check_pending_worker_creates(worker);
		}

		spin_lock_irq(&worker->lock);
		check_idle_worker(worker);

		if (freezing(current)) {
			worker->working = 0;
			spin_unlock_irq(&worker->lock);
			refrigerator();
		} else {
			spin_unlock_irq(&worker->lock);
			if (!kthread_should_stop()) {
				cpu_relax();
				/*
				 * we've dropped the lock, did someone else
				 * jump in?
				 */
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				/*
				 * this short schedule allows more work to
				 * come in without the queue functions
				 * needing to go through wake_up_process()
				 *
				 * worker->working is still 1, so nobody
				 * is going to try and wake us up
				 */
				schedule_timeout(1);
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				if (kthread_should_stop())
					break;

				/* still no more work? sleep for real */
				spin_lock_irq(&worker->lock);
				set_current_state(TASK_INTERRUPTIBLE);
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending)) {
					spin_unlock_irq(&worker->lock);
					goto again;
				}

				/*
				 * this makes sure we get a wakeup when someone
				 * adds something new to the queue
				 */
				worker->working = 0;
				spin_unlock_irq(&worker->lock);

				if (!kthread_should_stop()) {
					schedule_timeout(HZ * 120);
					if (!worker->working &&
					    try_worker_shutdown(worker)) {
						return 0;
					}
				}
			}
			__set_current_state(TASK_RUNNING);
		}
	} while (!kthread_should_stop());
	return 0;
}
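
/*
 * Added commentary (not in the original file): the idle path above backs off
 * in three stages -- a cpu_relax() recheck, a one-jiffy schedule_timeout()
 * with worker->working still set so queuers can skip wake_up_process(), and
 * finally a real TASK_INTERRUPTIBLE sleep of up to 120 seconds, after which a
 * still-idle worker offers to shut itself down via try_worker_shutdown().
 */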
/*
 * this will wait for all the worker threads to shutdown
 */
int btrfs_stop_workers(struct btrfs_workers *workers)
{
	struct list_head *cur;
	struct btrfs_worker_thread *worker;
	int can_stop;

	spin_lock_irq(&workers->lock);
	list_splice_init(&workers->idle_list, &workers->worker_list);
	while (!list_empty(&workers->worker_list)) {
		cur = workers->worker_list.next;
		worker = list_entry(cur, struct btrfs_worker_thread,
				    worker_list);

		atomic_inc(&worker->refs);
		workers->num_workers -= 1;
		if (!list_empty(&worker->worker_list)) {
			list_del_init(&worker->worker_list);
			put_worker(worker);
			can_stop = 1;
		} else
			can_stop = 0;
		spin_unlock_irq(&workers->lock);
		if (can_stop)
			kthread_stop(worker->task);
		spin_lock_irq(&workers->lock);
		put_worker(worker);
	}
	spin_unlock_irq(&workers->lock);
	return 0;
}
/*
 * simple init on struct btrfs_workers
 */
void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max,
			struct btrfs_workers *async_helper)
{
	workers->num_workers = 0;
	workers->num_workers_starting = 0;
	INIT_LIST_HEAD(&workers->worker_list);
	INIT_LIST_HEAD(&workers->idle_list);
	INIT_LIST_HEAD(&workers->order_list);
	INIT_LIST_HEAD(&workers->prio_order_list);
	spin_lock_init(&workers->lock);
	spin_lock_init(&workers->order_lock);
	workers->max_workers = max;
	workers->idle_thresh = 32;
	workers->name = name;
	workers->ordered = 0;
	workers->atomic_start_pending = 0;
	workers->atomic_worker_start = async_helper;
}

/*
 * starts new worker threads.  This does not enforce the max worker
 * count in case you need to temporarily go past it.
 */
static int __btrfs_start_workers(struct btrfs_workers *workers,
				 int num_workers)
{
	struct btrfs_worker_thread *worker;
	int ret = 0;
	int i;

	for (i = 0; i < num_workers; i++) {
		worker = kzalloc(sizeof(*worker), GFP_NOFS);
		if (!worker) {
			ret = -ENOMEM;
			goto fail;
		}

		INIT_LIST_HEAD(&worker->pending);
		INIT_LIST_HEAD(&worker->prio_pending);
		INIT_LIST_HEAD(&worker->worker_list);
		spin_lock_init(&worker->lock);

		atomic_set(&worker->num_pending, 0);
		atomic_set(&worker->refs, 1);
		worker->workers = workers;
		worker->task = kthread_run(worker_loop, worker,
					   "btrfs-%s-%d", workers->name,
					   workers->num_workers + i);
		if (IS_ERR(worker->task)) {
			ret = PTR_ERR(worker->task);
			kfree(worker);
			goto fail;
		}
		spin_lock_irq(&workers->lock);
		list_add_tail(&worker->worker_list, &workers->idle_list);
		worker->idle = 1;
		workers->num_workers++;
		workers->num_workers_starting--;
		WARN_ON(workers->num_workers_starting < 0);
		spin_unlock_irq(&workers->lock);
	}
	return 0;
fail:
	btrfs_stop_workers(workers);
	return ret;
}

int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
{
	spin_lock_irq(&workers->lock);
	workers->num_workers_starting += num_workers;
	spin_unlock_irq(&workers->lock);
	return __btrfs_start_workers(workers, num_workers);
}
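
/*
 * Usage sketch (added commentary, not in the original file): a pool is
 * typically set up, used and torn down roughly like this; the caller-side
 * names (pool, my_work, my_work_func) are hypothetical:
 *
 *	struct btrfs_workers pool;
 *
 *	btrfs_init_workers(&pool, "worker", 8, NULL);
 *	btrfs_start_workers(&pool, 1);
 *	...
 *	my_work->work.func = my_work_func;
 *	btrfs_queue_worker(&pool, &my_work->work);
 *	...
 *	btrfs_stop_workers(&pool);
 */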
/*
 * run through the list and find a worker thread that doesn't have a lot
 * to do right now.  This can return null if we aren't yet at the thread
 * count limit and all of the threads are busy.
 */
static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	struct list_head *next;
	int enforce_min;

	enforce_min = (workers->num_workers + workers->num_workers_starting) <
		workers->max_workers;

	/*
	 * if we find an idle thread, don't move it to the end of the
	 * idle list.  This improves the chance that the next submission
	 * will reuse the same thread, and maybe catch it while it is still
	 * working
	 */
	if (!list_empty(&workers->idle_list)) {
		next = workers->idle_list.next;
		worker = list_entry(next, struct btrfs_worker_thread,
				    worker_list);
		return worker;
	}
	if (enforce_min || list_empty(&workers->worker_list))
		return NULL;

	/*
	 * if we pick a busy task, move the task to the end of the list.
	 * hopefully this will keep things somewhat evenly balanced.
	 * Do the move in batches based on the sequence number.  This groups
	 * requests submitted at roughly the same time onto the same worker.
	 */
	next = workers->worker_list.next;
	worker = list_entry(next, struct btrfs_worker_thread, worker_list);
	worker->sequence++;

	if (worker->sequence % workers->idle_thresh == 0)
		list_move_tail(next, &workers->worker_list);
	return worker;
}
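
/*
 * Added commentary (not in the original file): with the default idle_thresh
 * of 32, the busy-list rotation above hands roughly 32 consecutive
 * submissions to the worker at the head of worker_list before moving it to
 * the tail, so bursts of related work tend to land on the same thread
 * instead of being sprayed across the pool.
 */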
/*
 * selects a worker thread to take the next job.  This will either find
 * an idle worker, start a new worker up to the max count, or just return
 * one of the existing busy workers.
 */
static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	struct list_head *fallback;

again:
	spin_lock_irqsave(&workers->lock, flags);
	worker = next_worker(workers);

	if (!worker) {
		if (workers->num_workers + workers->num_workers_starting >=
		    workers->max_workers) {
			goto fallback;
		} else if (workers->atomic_worker_start) {
			workers->atomic_start_pending = 1;
			goto fallback;
		} else {
			workers->num_workers_starting++;
			spin_unlock_irqrestore(&workers->lock, flags);
			/* we're below the limit, start another worker */
			__btrfs_start_workers(workers, 1);
			goto again;
		}
	}
	goto found;

fallback:
	fallback = NULL;
	/*
	 * we have failed to find any workers, just
	 * return the first one we can find.
	 */
	if (!list_empty(&workers->worker_list))
		fallback = workers->worker_list.next;
	if (!list_empty(&workers->idle_list))
		fallback = workers->idle_list.next;
	BUG_ON(!fallback);
	worker = list_entry(fallback,
			    struct btrfs_worker_thread, worker_list);
found:
	/*
	 * this makes sure the worker doesn't exit before it is placed
	 * onto a busy/idle list
	 */
	atomic_inc(&worker->num_pending);
	spin_unlock_irqrestore(&workers->lock, flags);
	return worker;
}
/*
 * btrfs_requeue_work just puts the work item back on the tail of the list
 * it was taken from.  It is intended for use with long running work functions
 * that make some progress and want to give the cpu up for others.
 */
int btrfs_requeue_work(struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker = work->worker;
	unsigned long flags;
	int wake = 0;

	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		goto out;

	spin_lock_irqsave(&worker->lock, flags);
	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	atomic_inc(&worker->num_pending);

	/* by definition we're busy, take ourselves off the idle
	 * list
	 */
	if (worker->idle) {
		spin_lock(&worker->workers->lock);
		worker->idle = 0;
		list_move_tail(&worker->worker_list,
			       &worker->workers->worker_list);
		spin_unlock(&worker->workers->lock);
	}
	if (!worker->working) {
		wake = 1;
		worker->working = 1;
	}

	if (wake)
		wake_up_process(worker->task);
	spin_unlock_irqrestore(&worker->lock, flags);
out:
	return 0;
}
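
/*
 * Usage sketch (added commentary, not in the original file): a long running
 * work function can cooperate with the pool by doing a bounded amount of
 * work and then requeueing itself.  The helpers below are hypothetical:
 *
 *	static void my_long_func(struct btrfs_work *work)
 *	{
 *		if (do_one_chunk(work) == MORE_TO_DO) {
 *			btrfs_requeue_work(work);
 *			return;
 *		}
 *		finish_up(work);
 *	}
 */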
void btrfs_set_work_high_prio(struct btrfs_work *work)
{
	set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}

/*
 * places a struct btrfs_work into the pending queue of one of the kthreads
 */
int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	int wake = 0;

	/* don't requeue something already on a list */
	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		goto out;

	worker = find_worker(workers);
	if (workers->ordered) {
		/*
		 * you're not allowed to do ordered queues from an
		 * interrupt handler
		 */
		spin_lock(&workers->order_lock);
		if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
			list_add_tail(&work->order_list,
				      &workers->prio_order_list);
		} else {
			list_add_tail(&work->order_list, &workers->order_list);
		}
		spin_unlock(&workers->order_lock);
	} else {
		INIT_LIST_HEAD(&work->order_list);
	}

	spin_lock_irqsave(&worker->lock, flags);

	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	check_busy_worker(worker);

	/*
	 * avoid calling into wake_up_process if this thread has already
	 * been kicked
	 */
	if (!worker->working)
		wake = 1;
	worker->working = 1;

	if (wake)
		wake_up_process(worker->task);
	spin_unlock_irqrestore(&worker->lock, flags);

out:
	return 0;
}
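
/*
 * Usage sketch (added commentary, not in the original file): an ordered pool
 * fills in all three callbacks before queueing.  The callback names below are
 * hypothetical; work->func may run on any worker in any order, while
 * ordered_func and ordered_free are always invoked in submission order by
 * run_ordered_completions():
 *
 *	pool.ordered = 1;
 *	work->func = do_the_heavy_lifting;
 *	work->ordered_func = complete_in_order;
 *	work->ordered_free = free_the_work;
 *	btrfs_queue_worker(&pool, work);
 */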