async-thread.c

/*
 * Copyright (C) 2007 Oracle. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 021110-1307, USA.
 */

#include <linux/kthread.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include "async-thread.h"
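
/*
 * flag bits used in btrfs_work->flags: QUEUED means the item sits on a
 * pending list, HIGH_PRIO sends it to the prio lists instead.  DONE and
 * ORDER_DONE are only used for ordered queues and record that func and
 * ordered_func have run.
 */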
#define WORK_QUEUED_BIT 0
#define WORK_DONE_BIT 1
#define WORK_ORDER_DONE_BIT 2
#define WORK_HIGH_PRIO_BIT 3

/*
 * container for the kthread task pointer and the list of pending work
 * One of these is allocated per thread.
 */
struct btrfs_worker_thread {
	/* pool we belong to */
	struct btrfs_workers *workers;

	/* list of struct btrfs_work that are waiting for service */
	struct list_head pending;
	struct list_head prio_pending;

	/* list of worker threads from struct btrfs_workers */
	struct list_head worker_list;

	/* kthread */
	struct task_struct *task;

	/* number of things on the pending list */
	atomic_t num_pending;

	/* reference counter for this struct */
	atomic_t refs;

	unsigned long sequence;

	/* protects the pending list. */
	spinlock_t lock;

	/* set to non-zero when this thread is already awake and kicking */
	int working;

	/* are we currently idle */
	int idle;
};

/*
 * btrfs_start_workers uses kthread_run, which can block waiting for memory
 * for a very long time.  It will actually throttle on page writeback,
 * and so it may not make progress until after our btrfs worker threads
 * process all of the pending work structs in their queue.
 *
 * This means we can't use btrfs_start_workers from inside a btrfs worker
 * thread that is used as part of cleaning dirty memory, which pretty much
 * involves all of the worker threads.
 *
 * Instead we have a helper queue that never has more than one thread,
 * where we schedule thread start operations.  This worker_start struct
 * is used to contain the work and hold a pointer to the queue that needs
 * another worker.
 */
struct worker_start {
	struct btrfs_work work;
	struct btrfs_workers *queue;
};

static void start_new_worker_func(struct btrfs_work *work)
{
	struct worker_start *start;
	start = container_of(work, struct worker_start, work);
	btrfs_start_workers(start->queue, 1);
	kfree(start);
}
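
/*
 * hand a worker_start item to the pool's helper queue so the new thread
 * is created outside of the caller's context.
 */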
static int start_new_worker(struct btrfs_workers *queue)
{
	struct worker_start *start;
	int ret;

	start = kzalloc(sizeof(*start), GFP_NOFS);
	if (!start)
		return -ENOMEM;

	start->work.func = start_new_worker_func;
	start->queue = queue;
	ret = btrfs_queue_worker(queue->atomic_worker_start, &start->work);
	if (ret)
		kfree(start);
	return ret;
}

/*
 * helper function to move a thread onto the idle list after it
 * has finished some requests.
 */
static void check_idle_worker(struct btrfs_worker_thread *worker)
{
	if (!worker->idle && atomic_read(&worker->num_pending) <
	    worker->workers->idle_thresh / 2) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 1;

		/* the list may be empty if the worker is just starting */
		if (!list_empty(&worker->worker_list)) {
			list_move(&worker->worker_list,
				  &worker->workers->idle_list);
		}
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

/*
 * helper function to move a thread off the idle list after new
 * pending work is added.
 */
static void check_busy_worker(struct btrfs_worker_thread *worker)
{
	if (worker->idle && atomic_read(&worker->num_pending) >=
	    worker->workers->idle_thresh) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 0;

		if (!list_empty(&worker->worker_list)) {
			list_move_tail(&worker->worker_list,
				       &worker->workers->worker_list);
		}
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}
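
/*
 * called from worker_loop after servicing work: if find_worker flagged
 * atomic_start_pending because it could not start a thread itself, kick
 * off a new worker now via the helper queue (up to max_workers).
 */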
static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
{
	struct btrfs_workers *workers = worker->workers;
	unsigned long flags;

	rmb();
	if (!workers->atomic_start_pending)
		return;

	spin_lock_irqsave(&workers->lock, flags);
	if (!workers->atomic_start_pending)
		goto out;

	workers->atomic_start_pending = 0;
	if (workers->num_workers + workers->num_workers_starting >=
	    workers->max_workers)
		goto out;

	workers->num_workers_starting += 1;
	spin_unlock_irqrestore(&workers->lock, flags);
	start_new_worker(workers);
	return;

out:
	spin_unlock_irqrestore(&workers->lock, flags);
}
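
/*
 * for ordered queues, mark this work item done and then walk the order
 * lists from the front, running ordered_func and ordered_free for each
 * item whose func has already finished, strictly in submission order.
 */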
static noinline int run_ordered_completions(struct btrfs_workers *workers,
					    struct btrfs_work *work)
{
	if (!workers->ordered)
		return 0;

	set_bit(WORK_DONE_BIT, &work->flags);

	spin_lock(&workers->order_lock);

	while (1) {
		if (!list_empty(&workers->prio_order_list)) {
			work = list_entry(workers->prio_order_list.next,
					  struct btrfs_work, order_list);
		} else if (!list_empty(&workers->order_list)) {
			work = list_entry(workers->order_list.next,
					  struct btrfs_work, order_list);
		} else {
			break;
		}
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/* we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;

		spin_unlock(&workers->order_lock);

		work->ordered_func(work);

		/* now take the lock again and call the freeing code */
		spin_lock(&workers->order_lock);
		list_del(&work->order_list);
		work->ordered_free(work);
	}

	spin_unlock(&workers->order_lock);
	return 0;
}
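
/* drop a reference on the worker struct; the last put frees it */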
static void put_worker(struct btrfs_worker_thread *worker)
{
	if (atomic_dec_and_test(&worker->refs))
		kfree(worker);
}
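
/*
 * called after a long idle period: if this is not the last worker in the
 * pool and it has nothing queued, unhook it from the pool so worker_loop
 * can exit.
 */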
static int try_worker_shutdown(struct btrfs_worker_thread *worker)
{
	int freeit = 0;

	spin_lock_irq(&worker->lock);
	spin_lock(&worker->workers->lock);
	if (worker->workers->num_workers > 1 &&
	    worker->idle &&
	    !worker->working &&
	    !list_empty(&worker->worker_list) &&
	    list_empty(&worker->prio_pending) &&
	    list_empty(&worker->pending) &&
	    atomic_read(&worker->num_pending) == 0) {
		freeit = 1;
		list_del_init(&worker->worker_list);
		worker->workers->num_workers--;
	}
	spin_unlock(&worker->workers->lock);
	spin_unlock_irq(&worker->lock);

	if (freeit)
		put_worker(worker);
	return freeit;
}
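
/*
 * fetch the next work item for worker_loop.  prio_head and head are
 * per-thread lists that pending work gets spliced onto under
 * worker->lock, so the common case can pop items without taking the
 * lock for every single one.
 */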
static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
					struct list_head *prio_head,
					struct list_head *head)
{
	struct btrfs_work *work = NULL;
	struct list_head *cur = NULL;

	if (!list_empty(prio_head))
		cur = prio_head->next;

	smp_mb();
	if (!list_empty(&worker->prio_pending))
		goto refill;

	if (!list_empty(head))
		cur = head->next;

	if (cur)
		goto out;

refill:
	spin_lock_irq(&worker->lock);
	list_splice_tail_init(&worker->prio_pending, prio_head);
	list_splice_tail_init(&worker->pending, head);

	if (!list_empty(prio_head))
		cur = prio_head->next;
	else if (!list_empty(head))
		cur = head->next;
	spin_unlock_irq(&worker->lock);

	if (!cur)
		goto out_fail;

out:
	work = list_entry(cur, struct btrfs_work, list);

out_fail:
	return work;
}

/*
 * main loop for servicing work items
 */
static int worker_loop(void *arg)
{
	struct btrfs_worker_thread *worker = arg;
	struct list_head head;
	struct list_head prio_head;
	struct btrfs_work *work;

	INIT_LIST_HEAD(&head);
	INIT_LIST_HEAD(&prio_head);

	do {
again:
		while (1) {
			work = get_next_work(worker, &prio_head, &head);
			if (!work)
				break;

			list_del(&work->list);
			clear_bit(WORK_QUEUED_BIT, &work->flags);

			work->worker = worker;

			work->func(work);
			atomic_dec(&worker->num_pending);
			/*
			 * unless this is an ordered work queue,
			 * 'work' was probably freed by func above.
			 */
			run_ordered_completions(worker->workers, work);

			check_pending_worker_creates(worker);
		}

		spin_lock_irq(&worker->lock);
		check_idle_worker(worker);

		if (freezing(current)) {
			worker->working = 0;
			spin_unlock_irq(&worker->lock);
			refrigerator();
		} else {
			spin_unlock_irq(&worker->lock);
			if (!kthread_should_stop()) {
				cpu_relax();
				/*
				 * we've dropped the lock, did someone else
				 * jump in?
				 */
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				/*
				 * this short schedule allows more work to
				 * come in without the queue functions
				 * needing to go through wake_up_process()
				 *
				 * worker->working is still 1, so nobody
				 * is going to try and wake us up
				 */
				schedule_timeout(1);
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				if (kthread_should_stop())
					break;

				/* still no more work? sleep for real */
				spin_lock_irq(&worker->lock);
				set_current_state(TASK_INTERRUPTIBLE);
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending)) {
					spin_unlock_irq(&worker->lock);
					goto again;
				}

				/*
				 * this makes sure we get a wakeup when someone
				 * adds something new to the queue
				 */
				worker->working = 0;
				spin_unlock_irq(&worker->lock);

				if (!kthread_should_stop()) {
					schedule_timeout(HZ * 120);
					if (!worker->working &&
					    try_worker_shutdown(worker)) {
						return 0;
					}
				}
			}
			__set_current_state(TASK_RUNNING);
		}
	} while (!kthread_should_stop());
	return 0;
}

/*
 * this will wait for all the worker threads to shut down
 */
int btrfs_stop_workers(struct btrfs_workers *workers)
{
	struct list_head *cur;
	struct btrfs_worker_thread *worker;
	int can_stop;

	spin_lock_irq(&workers->lock);
	list_splice_init(&workers->idle_list, &workers->worker_list);
	while (!list_empty(&workers->worker_list)) {
		cur = workers->worker_list.next;
		worker = list_entry(cur, struct btrfs_worker_thread,
				    worker_list);

		atomic_inc(&worker->refs);
		workers->num_workers -= 1;
		if (!list_empty(&worker->worker_list)) {
			list_del_init(&worker->worker_list);
			put_worker(worker);
			can_stop = 1;
		} else
			can_stop = 0;
		spin_unlock_irq(&workers->lock);
		if (can_stop)
			kthread_stop(worker->task);
		spin_lock_irq(&workers->lock);
		put_worker(worker);
	}
	spin_unlock_irq(&workers->lock);
	return 0;
}

/*
 * simple init on struct btrfs_workers
 */
void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max,
			struct btrfs_workers *async_helper)
{
	workers->num_workers = 0;
	workers->num_workers_starting = 0;
	INIT_LIST_HEAD(&workers->worker_list);
	INIT_LIST_HEAD(&workers->idle_list);
	INIT_LIST_HEAD(&workers->order_list);
	INIT_LIST_HEAD(&workers->prio_order_list);
	spin_lock_init(&workers->lock);
	spin_lock_init(&workers->order_lock);
	workers->max_workers = max;
	workers->idle_thresh = 32;
	workers->name = name;
	workers->ordered = 0;
	workers->atomic_start_pending = 0;
	workers->atomic_worker_start = async_helper;
}

/*
 * starts new worker threads.  This does not enforce the max worker
 * count in case you need to temporarily go past it.
 */
static int __btrfs_start_workers(struct btrfs_workers *workers,
				 int num_workers)
{
	struct btrfs_worker_thread *worker;
	int ret = 0;
	int i;

	for (i = 0; i < num_workers; i++) {
		worker = kzalloc(sizeof(*worker), GFP_NOFS);
		if (!worker) {
			ret = -ENOMEM;
			goto fail;
		}

		INIT_LIST_HEAD(&worker->pending);
		INIT_LIST_HEAD(&worker->prio_pending);
		INIT_LIST_HEAD(&worker->worker_list);
		spin_lock_init(&worker->lock);

		atomic_set(&worker->num_pending, 0);
		atomic_set(&worker->refs, 1);
		worker->workers = workers;
		worker->task = kthread_run(worker_loop, worker,
					   "btrfs-%s-%d", workers->name,
					   workers->num_workers + i);
		if (IS_ERR(worker->task)) {
			ret = PTR_ERR(worker->task);
			kfree(worker);
			goto fail;
		}
		spin_lock_irq(&workers->lock);
		list_add_tail(&worker->worker_list, &workers->idle_list);
		worker->idle = 1;
		workers->num_workers++;
		workers->num_workers_starting--;
		WARN_ON(workers->num_workers_starting < 0);
		spin_unlock_irq(&workers->lock);
	}
	return 0;
fail:
	btrfs_stop_workers(workers);
	return ret;
}
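
/*
 * public entry point: account for the new threads in num_workers_starting
 * before creating them, so anyone checking the pool size already counts
 * the workers that are on the way.
 */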
int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
{
	spin_lock_irq(&workers->lock);
	workers->num_workers_starting += num_workers;
	spin_unlock_irq(&workers->lock);
	return __btrfs_start_workers(workers, num_workers);
}

/*
 * run through the list and find a worker thread that doesn't have a lot
 * to do right now.  This can return NULL if we aren't yet at the thread
 * count limit and all of the threads are busy.
 */
static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	struct list_head *next;
	int enforce_min;

	enforce_min = (workers->num_workers + workers->num_workers_starting) <
		workers->max_workers;

	/*
	 * if we find an idle thread, don't move it to the end of the
	 * idle list.  This improves the chance that the next submission
	 * will reuse the same thread, and maybe catch it while it is still
	 * working
	 */
	if (!list_empty(&workers->idle_list)) {
		next = workers->idle_list.next;
		worker = list_entry(next, struct btrfs_worker_thread,
				    worker_list);
		return worker;
	}
	if (enforce_min || list_empty(&workers->worker_list))
		return NULL;

	/*
	 * if we pick a busy task, move the task to the end of the list.
	 * hopefully this will keep things somewhat evenly balanced.
	 * Do the move in batches based on the sequence number.  This groups
	 * requests submitted at roughly the same time onto the same worker.
	 */
	next = workers->worker_list.next;
	worker = list_entry(next, struct btrfs_worker_thread, worker_list);
	worker->sequence++;

	if (worker->sequence % workers->idle_thresh == 0)
		list_move_tail(next, &workers->worker_list);
	return worker;
}

/*
 * selects a worker thread to take the next job.  This will either find
 * an idle worker, start a new worker up to the max count, or just return
 * one of the existing busy workers.
 */
static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	struct list_head *fallback;

again:
	spin_lock_irqsave(&workers->lock, flags);
	worker = next_worker(workers);

	if (!worker) {
		if (workers->num_workers + workers->num_workers_starting >=
		    workers->max_workers) {
			goto fallback;
		} else if (workers->atomic_worker_start) {
			workers->atomic_start_pending = 1;
			goto fallback;
		} else {
			workers->num_workers_starting++;
			spin_unlock_irqrestore(&workers->lock, flags);
			/* we're below the limit, start another worker */
			__btrfs_start_workers(workers, 1);
			goto again;
		}
	}
	goto found;

fallback:
	fallback = NULL;
	/*
	 * we have failed to find any workers, just
	 * return the first one we can find.
	 */
	if (!list_empty(&workers->worker_list))
		fallback = workers->worker_list.next;
	if (!list_empty(&workers->idle_list))
		fallback = workers->idle_list.next;
	BUG_ON(!fallback);
	worker = list_entry(fallback,
			    struct btrfs_worker_thread, worker_list);
found:
	/*
	 * this makes sure the worker doesn't exit before it is placed
	 * onto a busy/idle list
	 */
	atomic_inc(&worker->num_pending);
	spin_unlock_irqrestore(&workers->lock, flags);
	return worker;
}

/*
 * btrfs_requeue_work just puts the work item back on the tail of the list
 * it was taken from.  It is intended for use with long running work functions
 * that make some progress and want to give the cpu up for others.
 */
int btrfs_requeue_work(struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker = work->worker;
	unsigned long flags;
	int wake = 0;

	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		goto out;

	spin_lock_irqsave(&worker->lock, flags);
	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	atomic_inc(&worker->num_pending);

	/* by definition we're busy, take ourselves off the idle
	 * list
	 */
	if (worker->idle) {
		spin_lock(&worker->workers->lock);
		worker->idle = 0;
		list_move_tail(&worker->worker_list,
			       &worker->workers->worker_list);
		spin_unlock(&worker->workers->lock);
	}
	if (!worker->working) {
		wake = 1;
		worker->working = 1;
	}

	if (wake)
		wake_up_process(worker->task);
	spin_unlock_irqrestore(&worker->lock, flags);
out:
	return 0;
}
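
/*
 * mark a work item as high priority before it is queued; it will then be
 * placed on the prio_pending/prio_order_list lists and serviced first.
 */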
void btrfs_set_work_high_prio(struct btrfs_work *work)
{
	set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}
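
/*
 * Typical use of the queueing entry point below (an illustrative sketch,
 * not code from this file): embed a struct btrfs_work in your own
 * context, set work->func (plus ordered_func/ordered_free when the pool
 * is marked ordered), then hand it to btrfs_queue_worker():
 *
 *	struct btrfs_workers workers;
 *
 *	btrfs_init_workers(&workers, "example", 8, NULL);
 *	btrfs_start_workers(&workers, 1);
 *	my_work->work.func = my_work_func;
 *	btrfs_queue_worker(&workers, &my_work->work);
 *
 * "example", my_work and my_work_func are hypothetical names used only
 * for illustration.
 */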

/*
 * places a struct btrfs_work into the pending queue of one of the kthreads
 */
int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	int wake = 0;

	/* don't requeue something already on a list */
	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		goto out;

	worker = find_worker(workers);
	if (workers->ordered) {
		/*
		 * you're not allowed to do ordered queues from an
		 * interrupt handler
		 */
		spin_lock(&workers->order_lock);
		if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
			list_add_tail(&work->order_list,
				      &workers->prio_order_list);
		} else {
			list_add_tail(&work->order_list, &workers->order_list);
		}
		spin_unlock(&workers->order_lock);
	} else {
		INIT_LIST_HEAD(&work->order_list);
	}

	spin_lock_irqsave(&worker->lock, flags);

	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	check_busy_worker(worker);

	/*
	 * avoid calling into wake_up_process if this thread has already
	 * been kicked
	 */
	if (!worker->working)
		wake = 1;
	worker->working = 1;

	if (wake)
		wake_up_process(worker->task);
	spin_unlock_irqrestore(&worker->lock, flags);
out:
	return 0;
}