/*
 * linux/kernel/workqueue.c
 *
 * Generic mechanism for defining kernel helper threads for running
 * arbitrary tasks in process context.
 *
 * Started by Ingo Molnar, Copyright (C) 2002
 *
 * Derived from the taskqueue/keventd code by:
 *
 *   David Woodhouse <dwmw2@infradead.org>
 *   Andrew Morton <andrewm@uow.edu.au>
 *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
 *   Theodore Ts'o <tytso@mit.edu>
 *
 * Made to use alloc_percpu by Christoph Lameter <clameter@sgi.com>.
 */

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h>

/*
 * The per-CPU workqueue (if single thread, we always use the first
 * possible cpu).
 *
 * The sequence counters are for flush_scheduled_work().  It wants to wait
 * until all currently-scheduled works are completed, but it doesn't
 * want to be livelocked by new, incoming ones.  So it waits until
 * remove_sequence is >= the insert_sequence which pertained when
 * flush_scheduled_work() was called.
 */
struct cpu_workqueue_struct {
        spinlock_t lock;

        long remove_sequence;   /* Least-recently added (next to run) */
        long insert_sequence;   /* Next to add */

        struct list_head worklist;
        wait_queue_head_t more_work;
        wait_queue_head_t work_done;

        struct workqueue_struct *wq;
        task_t *thread;

        int run_depth;          /* Detect run_workqueue() recursion depth */
} ____cacheline_aligned;

/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
        struct cpu_workqueue_struct *cpu_wq;
        const char *name;
        struct list_head list;  /* Empty if single thread */
};

/* All the per-cpu workqueues on the system, for hotplug cpu to add/remove
   threads to each one as cpus come/go. */
static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);

static int singlethread_cpu;

/* If it's single threaded, it isn't in the list of workqueues. */
static inline int is_single_threaded(struct workqueue_struct *wq)
{
        return list_empty(&wq->list);
}

/* Preempt must be disabled. */
static void __queue_work(struct cpu_workqueue_struct *cwq,
                         struct work_struct *work)
{
        unsigned long flags;

        spin_lock_irqsave(&cwq->lock, flags);
        work->wq_data = cwq;
        list_add_tail(&work->entry, &cwq->worklist);
        cwq->insert_sequence++;
        wake_up(&cwq->more_work);
        spin_unlock_irqrestore(&cwq->lock, flags);
}

/*
 * Queue work on a workqueue. Return non-zero if it was successfully
 * added.
 *
 * We queue the work to the CPU it was submitted on, but there is no
 * guarantee that it will be processed by that CPU.
 */
int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
        int ret = 0, cpu = get_cpu();

        if (!test_and_set_bit(0, &work->pending)) {
                if (unlikely(is_single_threaded(wq)))
                        cpu = singlethread_cpu;
                BUG_ON(!list_empty(&work->entry));
                __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
                ret = 1;
        }
        put_cpu();
        return ret;
}
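
/*
 * Editor's illustration (not part of the original file): a minimal,
 * hypothetical caller of queue_work().  my_work_handler, my_work and
 * my_bottom_half are placeholder names; the workqueue passed in would
 * come from create_workqueue(), defined further below.
 */
static void my_work_handler(void *data)
{
        /* Runs in process context on the per-CPU worker thread. */
}

static DECLARE_WORK(my_work, my_work_handler, NULL);

static void my_bottom_half(struct workqueue_struct *my_wq)
{
        /* queue_work() returns 1 if newly queued, 0 if already pending. */
        if (!queue_work(my_wq, &my_work))
                printk(KERN_DEBUG "my_work already pending\n");
}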

static void delayed_work_timer_fn(unsigned long __data)
{
        struct work_struct *work = (struct work_struct *)__data;
        struct workqueue_struct *wq = work->wq_data;
        int cpu = smp_processor_id();

        if (unlikely(is_single_threaded(wq)))
                cpu = singlethread_cpu;

        __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
}

int fastcall queue_delayed_work(struct workqueue_struct *wq,
                        struct work_struct *work, unsigned long delay)
{
        int ret = 0;
        struct timer_list *timer = &work->timer;

        if (!test_and_set_bit(0, &work->pending)) {
                BUG_ON(timer_pending(timer));
                BUG_ON(!list_empty(&work->entry));

                /* This stores wq for the moment, for the timer_fn */
                work->wq_data = wq;
                timer->expires = jiffies + delay;
                timer->data = (unsigned long)work;
                timer->function = delayed_work_timer_fn;
                add_timer(timer);
                ret = 1;
        }
        return ret;
}
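
/*
 * Editor's illustration (not part of the original file): a hypothetical
 * self-rearming poller built on queue_delayed_work().  my_poll_wq,
 * my_poll_work and my_poll are placeholder names; my_poll_wq is assumed
 * to have been created elsewhere.
 */
static struct workqueue_struct *my_poll_wq;

static void my_poll(void *data);
static DECLARE_WORK(my_poll_work, my_poll, NULL);

static void my_poll(void *data)
{
        /* ... do the periodic work ... */

        /* Re-arm: run again roughly one second from now. */
        queue_delayed_work(my_poll_wq, &my_poll_work, HZ);
}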

static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
        unsigned long flags;

        /*
         * Keep taking off work from the queue until
         * done.
         */
        spin_lock_irqsave(&cwq->lock, flags);
        cwq->run_depth++;
        if (cwq->run_depth > 3) {
                /* morton gets to eat his hat */
                printk("%s: recursion depth exceeded: %d\n",
                        __FUNCTION__, cwq->run_depth);
                dump_stack();
        }
        while (!list_empty(&cwq->worklist)) {
                struct work_struct *work = list_entry(cwq->worklist.next,
                                                struct work_struct, entry);
                void (*f) (void *) = work->func;
                void *data = work->data;

                list_del_init(cwq->worklist.next);
                spin_unlock_irqrestore(&cwq->lock, flags);

                BUG_ON(work->wq_data != cwq);
                clear_bit(0, &work->pending);
                f(data);

                spin_lock_irqsave(&cwq->lock, flags);
                cwq->remove_sequence++;
                wake_up(&cwq->work_done);
        }
        cwq->run_depth--;
        spin_unlock_irqrestore(&cwq->lock, flags);
}

static int worker_thread(void *__cwq)
{
        struct cpu_workqueue_struct *cwq = __cwq;
        DECLARE_WAITQUEUE(wait, current);
        struct k_sigaction sa;
        sigset_t blocked;

        current->flags |= PF_NOFREEZE;

        set_user_nice(current, -5);

        /* Block and flush all signals */
        sigfillset(&blocked);
        sigprocmask(SIG_BLOCK, &blocked, NULL);
        flush_signals(current);

        /* SIG_IGN makes children autoreap: see do_notify_parent(). */
        sa.sa.sa_handler = SIG_IGN;
        sa.sa.sa_flags = 0;
        siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
        do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);

        set_current_state(TASK_INTERRUPTIBLE);
        while (!kthread_should_stop()) {
                add_wait_queue(&cwq->more_work, &wait);
                if (list_empty(&cwq->worklist))
                        schedule();
                else
                        __set_current_state(TASK_RUNNING);
                remove_wait_queue(&cwq->more_work, &wait);

                if (!list_empty(&cwq->worklist))
                        run_workqueue(cwq);
                set_current_state(TASK_INTERRUPTIBLE);
        }
        __set_current_state(TASK_RUNNING);
        return 0;
}

static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
        if (cwq->thread == current) {
                /*
                 * Probably keventd trying to flush its own queue. So simply run
                 * it by hand rather than deadlocking.
                 */
                run_workqueue(cwq);
        } else {
                DEFINE_WAIT(wait);
                long sequence_needed;

                spin_lock_irq(&cwq->lock);
                sequence_needed = cwq->insert_sequence;

                while (sequence_needed - cwq->remove_sequence > 0) {
                        prepare_to_wait(&cwq->work_done, &wait,
                                        TASK_UNINTERRUPTIBLE);
                        spin_unlock_irq(&cwq->lock);
                        schedule();
                        spin_lock_irq(&cwq->lock);
                }
                finish_wait(&cwq->work_done, &wait);
                spin_unlock_irq(&cwq->lock);
        }
}

/*
 * flush_workqueue - ensure that any scheduled work has run to completion.
 *
 * Forces execution of the workqueue and blocks until its completion.
 * This is typically used in driver shutdown handlers.
 *
 * This function will sample each workqueue's current insert_sequence number and
 * will sleep until the head sequence is greater than or equal to that.  This
 * means that we sleep until all works which were queued on entry have been
 * handled, but we are not livelocked by new incoming ones.
 *
 * This function used to run the workqueues itself.  Now we just wait for the
 * helper threads to do it.
 */
void fastcall flush_workqueue(struct workqueue_struct *wq)
{
        might_sleep();

        if (is_single_threaded(wq)) {
                /* Always use first cpu's area. */
                flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
        } else {
                int cpu;

                lock_cpu_hotplug();
                for_each_online_cpu(cpu)
                        flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
                unlock_cpu_hotplug();
        }
}
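
/*
 * Editor's illustration (not part of the original file): the typical
 * driver shutdown-path use mentioned in the comment above.  my_driver_remove
 * and its parameters are placeholder names.
 */
static void my_driver_remove(struct workqueue_struct *my_wq, void *my_state)
{
        /* Wait for everything already queued on my_wq to finish running... */
        flush_workqueue(my_wq);
        /* ...before freeing the data those work handlers touch. */
        kfree(my_state);
}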

static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
                                                   int cpu)
{
        struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
        struct task_struct *p;

        spin_lock_init(&cwq->lock);
        cwq->wq = wq;
        cwq->thread = NULL;
        cwq->insert_sequence = 0;
        cwq->remove_sequence = 0;
        INIT_LIST_HEAD(&cwq->worklist);
        init_waitqueue_head(&cwq->more_work);
        init_waitqueue_head(&cwq->work_done);

        if (is_single_threaded(wq))
                p = kthread_create(worker_thread, cwq, "%s", wq->name);
        else
                p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
        if (IS_ERR(p))
                return NULL;
        cwq->thread = p;
        return p;
}

struct workqueue_struct *__create_workqueue(const char *name,
                                            int singlethread)
{
        int cpu, destroy = 0;
        struct workqueue_struct *wq;
        struct task_struct *p;

        wq = kzalloc(sizeof(*wq), GFP_KERNEL);
        if (!wq)
                return NULL;

        wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
        if (!wq->cpu_wq) {
                kfree(wq);
                return NULL;
        }

        wq->name = name;
        /* We don't need the distraction of CPUs appearing and vanishing. */
        lock_cpu_hotplug();
        if (singlethread) {
                INIT_LIST_HEAD(&wq->list);
                p = create_workqueue_thread(wq, singlethread_cpu);
                if (!p)
                        destroy = 1;
                else
                        wake_up_process(p);
        } else {
                spin_lock(&workqueue_lock);
                list_add(&wq->list, &workqueues);
                spin_unlock(&workqueue_lock);
                for_each_online_cpu(cpu) {
                        p = create_workqueue_thread(wq, cpu);
                        if (p) {
                                kthread_bind(p, cpu);
                                wake_up_process(p);
                        } else
                                destroy = 1;
                }
        }
        unlock_cpu_hotplug();

        /*
         * Was there any error during startup? If yes then clean up:
         */
        if (destroy) {
                destroy_workqueue(wq);
                wq = NULL;
        }
        return wq;
}
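
/*
 * Editor's illustration (not part of the original file): __create_workqueue()
 * is normally reached through the create_workqueue() and
 * create_singlethread_workqueue() wrappers in <linux/workqueue.h>.
 * my_events_wq and my_module_init are placeholder names.
 */
static struct workqueue_struct *my_events_wq;

static int __init my_module_init(void)
{
        /* One worker thread total, bound to the first possible CPU. */
        my_events_wq = create_singlethread_workqueue("my_events");
        if (!my_events_wq)
                return -ENOMEM;
        return 0;
}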

static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
{
        struct cpu_workqueue_struct *cwq;
        unsigned long flags;
        struct task_struct *p;

        cwq = per_cpu_ptr(wq->cpu_wq, cpu);
        spin_lock_irqsave(&cwq->lock, flags);
        p = cwq->thread;
        cwq->thread = NULL;
        spin_unlock_irqrestore(&cwq->lock, flags);
        if (p)
                kthread_stop(p);
}

void destroy_workqueue(struct workqueue_struct *wq)
{
        int cpu;

        flush_workqueue(wq);

        /* We don't need the distraction of CPUs appearing and vanishing. */
        lock_cpu_hotplug();
        if (is_single_threaded(wq))
                cleanup_workqueue_thread(wq, singlethread_cpu);
        else {
                for_each_online_cpu(cpu)
                        cleanup_workqueue_thread(wq, cpu);
                spin_lock(&workqueue_lock);
                list_del(&wq->list);
                spin_unlock(&workqueue_lock);
        }
        unlock_cpu_hotplug();
        free_percpu(wq->cpu_wq);
        kfree(wq);
}

static struct workqueue_struct *keventd_wq;

int fastcall schedule_work(struct work_struct *work)
{
        return queue_work(keventd_wq, work);
}

int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay)
{
        return queue_delayed_work(keventd_wq, work, delay);
}
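
/*
 * Editor's illustration (not part of the original file): deferring work
 * from interrupt context to the shared keventd queue.  my_event_handler,
 * my_event_work and my_interrupt are placeholder names; the irqreturn_t
 * handler type comes from <linux/interrupt.h>.
 */
static void my_event_handler(void *data)
{
        /* Process-context follow-up for something noticed in the IRQ. */
}

static DECLARE_WORK(my_event_work, my_event_handler, NULL);

static irqreturn_t my_interrupt(int irq, void *dev_id, struct pt_regs *regs)
{
        schedule_work(&my_event_work);          /* run soon on keventd */
        /* or: schedule_delayed_work(&my_event_work, HZ / 10); */
        return IRQ_HANDLED;
}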

int schedule_delayed_work_on(int cpu,
                        struct work_struct *work, unsigned long delay)
{
        int ret = 0;
        struct timer_list *timer = &work->timer;

        if (!test_and_set_bit(0, &work->pending)) {
                BUG_ON(timer_pending(timer));
                BUG_ON(!list_empty(&work->entry));
                /* This stores keventd_wq for the moment, for the timer_fn */
                work->wq_data = keventd_wq;
                timer->expires = jiffies + delay;
                timer->data = (unsigned long)work;
                timer->function = delayed_work_timer_fn;
                add_timer_on(timer, cpu);
                ret = 1;
        }
        return ret;
}

int schedule_on_each_cpu(void (*func) (void *info), void *info)
{
        int cpu;
        struct work_struct *work;

        work = kmalloc(NR_CPUS * sizeof(struct work_struct), GFP_KERNEL);

        if (!work)
                return -ENOMEM;
        for_each_online_cpu(cpu) {
                INIT_WORK(work + cpu, func, info);
                __queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu),
                                work + cpu);
        }
        flush_workqueue(keventd_wq);
        kfree(work);
        return 0;
}
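
/*
 * Editor's illustration (not part of the original file): a hypothetical
 * schedule_on_each_cpu() caller.  The handler runs once on every online
 * CPU's keventd thread, and the flush above makes the call synchronous.
 */
static void my_drain_cpu(void *info)
{
        /* e.g. drain this CPU's private cache into a shared pool */
}

static int my_drain_all_cpus(void)
{
        return schedule_on_each_cpu(my_drain_cpu, NULL);        /* may sleep */
}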

void flush_scheduled_work(void)
{
        flush_workqueue(keventd_wq);
}

/**
 * cancel_rearming_delayed_workqueue - reliably kill off a delayed
 *                      work whose handler rearms the delayed work.
 * @wq:   the controlling workqueue structure
 * @work: the delayed work struct
 */
void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq,
                                       struct work_struct *work)
{
        while (!cancel_delayed_work(work))
                flush_workqueue(wq);
}
EXPORT_SYMBOL(cancel_rearming_delayed_workqueue);

/**
 * cancel_rearming_delayed_work - reliably kill off a delayed keventd
 *                      work whose handler rearms the delayed work.
 * @work: the delayed work struct
 */
void cancel_rearming_delayed_work(struct work_struct *work)
{
        cancel_rearming_delayed_workqueue(keventd_wq, work);
}
EXPORT_SYMBOL(cancel_rearming_delayed_work);
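
/*
 * Editor's illustration (not part of the original file): tearing down a
 * self-rearming delayed work item such as the poller sketched after
 * queue_delayed_work() above.  A bare cancel_delayed_work() can race with
 * the handler re-queueing itself; the cancel-then-flush loop above closes
 * that window.  my_stop_poller and its parameters are placeholder names.
 */
static void my_stop_poller(struct workqueue_struct *my_poll_wq,
                           struct work_struct *my_poll_work)
{
        /* For work queued on a private workqueue; keventd users would
         * call cancel_rearming_delayed_work() instead. */
        cancel_rearming_delayed_workqueue(my_poll_wq, my_poll_work);
}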

int keventd_up(void)
{
        return keventd_wq != NULL;
}

int current_is_keventd(void)
{
        struct cpu_workqueue_struct *cwq;
        int cpu = smp_processor_id();   /* preempt-safe: keventd is per-cpu */
        int ret = 0;

        BUG_ON(!keventd_wq);

        cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
        if (current == cwq->thread)
                ret = 1;

        return ret;
}

#ifdef CONFIG_HOTPLUG_CPU
/* Take the work from this (downed) CPU. */
static void take_over_work(struct workqueue_struct *wq, unsigned int cpu)
{
        struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
        LIST_HEAD(list);
        struct work_struct *work;

        spin_lock_irq(&cwq->lock);
        list_splice_init(&cwq->worklist, &list);

        while (!list_empty(&list)) {
                printk("Taking work for %s\n", wq->name);
                work = list_entry(list.next, struct work_struct, entry);
                list_del(&work->entry);
                __queue_work(per_cpu_ptr(wq->cpu_wq, smp_processor_id()), work);
        }
        spin_unlock_irq(&cwq->lock);
}

/* We're holding the cpucontrol mutex here */
static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
                                  unsigned long action,
                                  void *hcpu)
{
        unsigned int hotcpu = (unsigned long)hcpu;
        struct workqueue_struct *wq;

        switch (action) {
        case CPU_UP_PREPARE:
                /* Create a new workqueue thread for it. */
                list_for_each_entry(wq, &workqueues, list) {
                        if (!create_workqueue_thread(wq, hotcpu)) {
                                printk("workqueue for %i failed\n", hotcpu);
                                return NOTIFY_BAD;
                        }
                }
                break;

        case CPU_ONLINE:
                /* Kick off worker threads. */
                list_for_each_entry(wq, &workqueues, list) {
                        struct cpu_workqueue_struct *cwq;

                        cwq = per_cpu_ptr(wq->cpu_wq, hotcpu);
                        kthread_bind(cwq->thread, hotcpu);
                        wake_up_process(cwq->thread);
                }
                break;

        case CPU_UP_CANCELED:
                list_for_each_entry(wq, &workqueues, list) {
                        /* Unbind so it can run. */
                        kthread_bind(per_cpu_ptr(wq->cpu_wq, hotcpu)->thread,
                                     any_online_cpu(cpu_online_map));
                        cleanup_workqueue_thread(wq, hotcpu);
                }
                break;

        case CPU_DEAD:
                list_for_each_entry(wq, &workqueues, list)
                        cleanup_workqueue_thread(wq, hotcpu);
                list_for_each_entry(wq, &workqueues, list)
                        take_over_work(wq, hotcpu);
                break;
        }

        return NOTIFY_OK;
}
#endif

void init_workqueues(void)
{
        singlethread_cpu = first_cpu(cpu_possible_map);
        hotcpu_notifier(workqueue_cpu_callback, 0);
        keventd_wq = create_workqueue("events");
        BUG_ON(!keventd_wq);
}

EXPORT_SYMBOL_GPL(__create_workqueue);
EXPORT_SYMBOL_GPL(queue_work);
EXPORT_SYMBOL_GPL(queue_delayed_work);
EXPORT_SYMBOL_GPL(flush_workqueue);
EXPORT_SYMBOL_GPL(destroy_workqueue);

EXPORT_SYMBOL(schedule_work);
EXPORT_SYMBOL(schedule_delayed_work);
EXPORT_SYMBOL(schedule_delayed_work_on);
EXPORT_SYMBOL(flush_scheduled_work);