/*
 * linux/kernel/workqueue.c
 *
 * Generic mechanism for defining kernel helper threads for running
 * arbitrary tasks in process context.
 *
 * Started by Ingo Molnar, Copyright (C) 2002
 *
 * Derived from the taskqueue/keventd code by:
 *
 *   David Woodhouse <dwmw2@infradead.org>
 *   Andrew Morton <andrewm@uow.edu.au>
 *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
 *   Theodore Ts'o <tytso@mit.edu>
 */

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h>

/*
 * The per-CPU workqueue (if single thread, we always use cpu 0's).
 *
 * The sequence counters are for flush_scheduled_work().  It wants to wait
 * until all currently-scheduled works are completed, but it doesn't
 * want to be livelocked by new, incoming ones.  So it waits until
 * remove_sequence is >= the insert_sequence which pertained when
 * flush_scheduled_work() was called.
 */
struct cpu_workqueue_struct {
	spinlock_t lock;

	long remove_sequence;	/* Least-recently added (next to run) */
	long insert_sequence;	/* Next to add */

	struct list_head worklist;
	wait_queue_head_t more_work;
	wait_queue_head_t work_done;

	struct workqueue_struct *wq;
	task_t *thread;

	int run_depth;		/* Detect run_workqueue() recursion depth */
} ____cacheline_aligned;

/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
	struct cpu_workqueue_struct cpu_wq[NR_CPUS];
	const char *name;
	struct list_head list;	/* Empty if single thread */
};

/* All the per-cpu workqueues on the system, for hotplug cpu to add/remove
   threads to each one as cpus come/go. */
static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);

/* If it's single threaded, it isn't in the list of workqueues. */
static inline int is_single_threaded(struct workqueue_struct *wq)
{
	return list_empty(&wq->list);
}

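/*
 * Illustrative sketch (not part of this file): the two flavours of
 * workqueue that the comments above distinguish.  create_workqueue() and
 * create_singlethread_workqueue() are the usual <linux/workqueue.h>
 * wrappers around __create_workqueue(); "example_setup" and the "mydrv"
 * names are made up for the example.
 */
#if 0
static int example_setup(void)
{
	struct workqueue_struct *per_cpu_wq;
	struct workqueue_struct *single_wq;

	per_cpu_wq = create_workqueue("mydrv");		/* one thread per online CPU */
	single_wq = create_singlethread_workqueue("mydrv1"); /* one thread; cpu 0's cwq only */
	if (!per_cpu_wq || !single_wq) {
		if (per_cpu_wq)
			destroy_workqueue(per_cpu_wq);
		if (single_wq)
			destroy_workqueue(single_wq);
		return -ENOMEM;
	}
	return 0;
}
#endif
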
/* Preempt must be disabled. */
static void __queue_work(struct cpu_workqueue_struct *cwq,
			 struct work_struct *work)
{
	unsigned long flags;

	spin_lock_irqsave(&cwq->lock, flags);
	work->wq_data = cwq;
	list_add_tail(&work->entry, &cwq->worklist);
	cwq->insert_sequence++;
	wake_up(&cwq->more_work);
	spin_unlock_irqrestore(&cwq->lock, flags);
}

/*
 * Queue work on a workqueue.  Return non-zero if it was successfully
 * added.
 *
 * We queue the work to the CPU it was submitted on, but there is no
 * guarantee that it will be processed by that CPU.
 */
int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
	int ret = 0, cpu = get_cpu();

	if (!test_and_set_bit(0, &work->pending)) {
		if (unlikely(is_single_threaded(wq)))
			cpu = 0;
		BUG_ON(!list_empty(&work->entry));
		__queue_work(wq->cpu_wq + cpu, work);
		ret = 1;
	}
	put_cpu();
	return ret;
}

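/*
 * Illustrative usage sketch (not part of this file): queueing a handler
 * onto a workqueue through the function above.  "example_func",
 * "example_work" and "example_submit" are made-up names; the handler
 * runs in process context in one of the workqueue's worker threads.
 */
#if 0
static void example_func(void *data)
{
	/* Process context; may sleep. */
}

static DECLARE_WORK(example_work, example_func, NULL);

static void example_submit(struct workqueue_struct *wq)
{
	/* Returns 1 if newly queued, 0 if example_work was still pending. */
	queue_work(wq, &example_work);
}
#endif
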
static void delayed_work_timer_fn(unsigned long __data)
{
	struct work_struct *work = (struct work_struct *)__data;
	struct workqueue_struct *wq = work->wq_data;
	int cpu = smp_processor_id();

	if (unlikely(is_single_threaded(wq)))
		cpu = 0;

	__queue_work(wq->cpu_wq + cpu, work);
}

int fastcall queue_delayed_work(struct workqueue_struct *wq,
			struct work_struct *work, unsigned long delay)
{
	int ret = 0;
	struct timer_list *timer = &work->timer;

	if (!test_and_set_bit(0, &work->pending)) {
		BUG_ON(timer_pending(timer));
		BUG_ON(!list_empty(&work->entry));

		/* This stores wq for the moment, for the timer_fn */
		work->wq_data = wq;
		timer->expires = jiffies + delay;
		timer->data = (unsigned long)work;
		timer->function = delayed_work_timer_fn;
		add_timer(timer);
		ret = 1;
	}
	return ret;
}

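/*
 * Illustrative sketch (not part of this file): arming a delayed work so
 * that it is queued roughly HZ jiffies (about one second) from now.
 * "example_poll", "example_poll_work" and "example_kick" are made up.
 */
#if 0
static void example_poll(void *data);
static DECLARE_WORK(example_poll_work, example_poll, NULL);

static void example_kick(struct workqueue_struct *wq)
{
	/* Returns 0 if example_poll_work was already pending. */
	queue_delayed_work(wq, &example_poll_work, HZ);
}
#endif
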
static inline void run_workqueue(struct cpu_workqueue_struct *cwq)
{
	unsigned long flags;

	/*
	 * Keep taking off work from the queue until
	 * done.
	 */
	spin_lock_irqsave(&cwq->lock, flags);
	cwq->run_depth++;
	if (cwq->run_depth > 3) {
		/* morton gets to eat his hat */
		printk("%s: recursion depth exceeded: %d\n",
			__FUNCTION__, cwq->run_depth);
		dump_stack();
	}
	while (!list_empty(&cwq->worklist)) {
		struct work_struct *work = list_entry(cwq->worklist.next,
						struct work_struct, entry);
		void (*f) (void *) = work->func;
		void *data = work->data;

		list_del_init(cwq->worklist.next);
		spin_unlock_irqrestore(&cwq->lock, flags);

		BUG_ON(work->wq_data != cwq);
		clear_bit(0, &work->pending);
		f(data);

		spin_lock_irqsave(&cwq->lock, flags);
		cwq->remove_sequence++;
		wake_up(&cwq->work_done);
	}
	cwq->run_depth--;
	spin_unlock_irqrestore(&cwq->lock, flags);
}

static int worker_thread(void *__cwq)
{
	struct cpu_workqueue_struct *cwq = __cwq;
	DECLARE_WAITQUEUE(wait, current);
	struct k_sigaction sa;
	sigset_t blocked;

	current->flags |= PF_NOFREEZE;

	set_user_nice(current, -5);

	/* Block and flush all signals */
	sigfillset(&blocked);
	sigprocmask(SIG_BLOCK, &blocked, NULL);
	flush_signals(current);

	/* SIG_IGN makes children autoreap: see do_notify_parent(). */
	sa.sa.sa_handler = SIG_IGN;
	sa.sa.sa_flags = 0;
	siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
	do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);

	set_current_state(TASK_INTERRUPTIBLE);
	while (!kthread_should_stop()) {
		add_wait_queue(&cwq->more_work, &wait);
		if (list_empty(&cwq->worklist))
			schedule();
		else
			__set_current_state(TASK_RUNNING);
		remove_wait_queue(&cwq->more_work, &wait);

		if (!list_empty(&cwq->worklist))
			run_workqueue(cwq);
		set_current_state(TASK_INTERRUPTIBLE);
	}
	__set_current_state(TASK_RUNNING);
	return 0;
}

static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
	if (cwq->thread == current) {
		/*
		 * Probably keventd trying to flush its own queue. So simply run
		 * it by hand rather than deadlocking.
		 */
		run_workqueue(cwq);
	} else {
		DEFINE_WAIT(wait);
		long sequence_needed;

		spin_lock_irq(&cwq->lock);
		sequence_needed = cwq->insert_sequence;

		while (sequence_needed - cwq->remove_sequence > 0) {
			prepare_to_wait(&cwq->work_done, &wait,
					TASK_UNINTERRUPTIBLE);
			spin_unlock_irq(&cwq->lock);
			schedule();
			spin_lock_irq(&cwq->lock);
		}
		finish_wait(&cwq->work_done, &wait);
		spin_unlock_irq(&cwq->lock);
	}
}

/*
 * flush_workqueue - ensure that any scheduled work has run to completion.
 *
 * Forces execution of the workqueue and blocks until its completion.
 * This is typically used in driver shutdown handlers.
 *
 * This function will sample each workqueue's current insert_sequence number and
 * will sleep until remove_sequence is greater than or equal to that.  This
 * means that we sleep until all works which were queued on entry have been
 * handled, but we are not livelocked by new incoming ones.
 *
 * This function used to run the workqueues itself.  Now we just wait for the
 * helper threads to do it.
 */
void fastcall flush_workqueue(struct workqueue_struct *wq)
{
	might_sleep();

	if (is_single_threaded(wq)) {
		/* Always use cpu 0's area. */
		flush_cpu_workqueue(wq->cpu_wq + 0);
	} else {
		int cpu;

		lock_cpu_hotplug();
		for_each_online_cpu(cpu)
			flush_cpu_workqueue(wq->cpu_wq + cpu);
		unlock_cpu_hotplug();
	}
}

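/*
 * Illustrative sketch (not part of this file): the shutdown ordering the
 * comment above has in mind, for a driver that owns a workqueue and a
 * delayed work.  "example_teardown" and its parameters are made up.
 */
#if 0
static void example_teardown(struct workqueue_struct *wq,
			     struct work_struct *delayed_work)
{
	cancel_delayed_work(delayed_work);	/* stop the timer if it is armed */
	flush_workqueue(wq);			/* wait for already-queued handlers */
	destroy_workqueue(wq);			/* flushes once more, then stops the threads */
}
#endif
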
static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
						   int cpu)
{
	struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;
	struct task_struct *p;

	spin_lock_init(&cwq->lock);
	cwq->wq = wq;
	cwq->thread = NULL;
	cwq->insert_sequence = 0;
	cwq->remove_sequence = 0;
	INIT_LIST_HEAD(&cwq->worklist);
	init_waitqueue_head(&cwq->more_work);
	init_waitqueue_head(&cwq->work_done);

	if (is_single_threaded(wq))
		p = kthread_create(worker_thread, cwq, "%s", wq->name);
	else
		p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
	if (IS_ERR(p))
		return NULL;
	cwq->thread = p;
	return p;
}

struct workqueue_struct *__create_workqueue(const char *name,
					    int singlethread)
{
	int cpu, destroy = 0;
	struct workqueue_struct *wq;
	struct task_struct *p;

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return NULL;

	wq->name = name;
	/* We don't need the distraction of CPUs appearing and vanishing. */
	lock_cpu_hotplug();
	if (singlethread) {
		INIT_LIST_HEAD(&wq->list);
		p = create_workqueue_thread(wq, 0);
		if (!p)
			destroy = 1;
		else
			wake_up_process(p);
	} else {
		spin_lock(&workqueue_lock);
		list_add(&wq->list, &workqueues);
		spin_unlock(&workqueue_lock);
		for_each_online_cpu(cpu) {
			p = create_workqueue_thread(wq, cpu);
			if (p) {
				kthread_bind(p, cpu);
				wake_up_process(p);
			} else
				destroy = 1;
		}
	}
	unlock_cpu_hotplug();

	/*
	 * Was there any error during startup? If yes then clean up:
	 */
	if (destroy) {
		destroy_workqueue(wq);
		wq = NULL;
	}
	return wq;
}

static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
{
	struct cpu_workqueue_struct *cwq;
	unsigned long flags;
	struct task_struct *p;

	cwq = wq->cpu_wq + cpu;
	spin_lock_irqsave(&cwq->lock, flags);
	p = cwq->thread;
	cwq->thread = NULL;
	spin_unlock_irqrestore(&cwq->lock, flags);
	if (p)
		kthread_stop(p);
}

void destroy_workqueue(struct workqueue_struct *wq)
{
	int cpu;

	flush_workqueue(wq);

	/* We don't need the distraction of CPUs appearing and vanishing. */
	lock_cpu_hotplug();
	if (is_single_threaded(wq))
		cleanup_workqueue_thread(wq, 0);
	else {
		for_each_online_cpu(cpu)
			cleanup_workqueue_thread(wq, cpu);
		spin_lock(&workqueue_lock);
		list_del(&wq->list);
		spin_unlock(&workqueue_lock);
	}
	unlock_cpu_hotplug();
	kfree(wq);
}

static struct workqueue_struct *keventd_wq;

int fastcall schedule_work(struct work_struct *work)
{
	return queue_work(keventd_wq, work);
}

int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay)
{
	return queue_delayed_work(keventd_wq, work, delay);
}

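/*
 * Illustrative sketch (not part of this file): deferring work to keventd
 * (the shared "events/N" threads) instead of creating a private
 * workqueue.  "example_event", "example_event_work" and
 * "example_defer_from_irq" are made-up names.
 */
#if 0
static void example_event(void *data)
{
	/* Process context, in one of keventd's "events/N" threads. */
}

static DECLARE_WORK(example_event_work, example_event, NULL);

static void example_defer_from_irq(void)
{
	/* Safe from interrupt context: queue_work() only takes
	 * cwq->lock with interrupts disabled. */
	schedule_work(&example_event_work);
}
#endif
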
int schedule_delayed_work_on(int cpu,
			struct work_struct *work, unsigned long delay)
{
	int ret = 0;
	struct timer_list *timer = &work->timer;

	if (!test_and_set_bit(0, &work->pending)) {
		BUG_ON(timer_pending(timer));
		BUG_ON(!list_empty(&work->entry));
		/* This stores keventd_wq for the moment, for the timer_fn */
		work->wq_data = keventd_wq;
		timer->expires = jiffies + delay;
		timer->data = (unsigned long)work;
		timer->function = delayed_work_timer_fn;
		add_timer_on(timer, cpu);
		ret = 1;
	}
	return ret;
}

void flush_scheduled_work(void)
{
	flush_workqueue(keventd_wq);
}

/**
 * cancel_rearming_delayed_workqueue - reliably kill off a delayed
 *			work whose handler rearms the delayed work.
 * @wq:   the controlling workqueue structure
 * @work: the delayed work struct
 */
void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq,
				       struct work_struct *work)
{
	while (!cancel_delayed_work(work))
		flush_workqueue(wq);
}
EXPORT_SYMBOL(cancel_rearming_delayed_workqueue);

/**
 * cancel_rearming_delayed_work - reliably kill off a delayed keventd
 *			work whose handler rearms the delayed work.
 * @work: the delayed work struct
 */
void cancel_rearming_delayed_work(struct work_struct *work)
{
	cancel_rearming_delayed_workqueue(keventd_wq, work);
}
EXPORT_SYMBOL(cancel_rearming_delayed_work);

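/*
 * Illustrative sketch (not part of this file): a self-rearming keventd
 * work and the teardown that the two cancel_rearming_* helpers above are
 * for.  "example_rearm", "example_rearm_work" and "example_rearm_stop"
 * are made-up names.
 */
#if 0
static void example_rearm(void *data);
static DECLARE_WORK(example_rearm_work, example_rearm, NULL);

static void example_rearm(void *data)
{
	/* ... periodic work ... */
	schedule_delayed_work(&example_rearm_work, HZ);	/* rearm */
}

static void example_rearm_stop(void)
{
	/*
	 * A plain cancel_delayed_work() can race with the handler rearming
	 * itself; cancel_rearming_delayed_work() keeps cancelling and
	 * flushing until the timer stays dead.
	 */
	cancel_rearming_delayed_work(&example_rearm_work);
}
#endif
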
int keventd_up(void)
{
	return keventd_wq != NULL;
}

int current_is_keventd(void)
{
	struct cpu_workqueue_struct *cwq;
	int cpu = smp_processor_id();	/* preempt-safe: keventd is per-cpu */
	int ret = 0;

	BUG_ON(!keventd_wq);

	cwq = keventd_wq->cpu_wq + cpu;
	if (current == cwq->thread)
		ret = 1;

	return ret;
}

#ifdef CONFIG_HOTPLUG_CPU
/* Take the work from this (downed) CPU. */
static void take_over_work(struct workqueue_struct *wq, unsigned int cpu)
{
	struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;
	LIST_HEAD(list);
	struct work_struct *work;

	spin_lock_irq(&cwq->lock);
	list_splice_init(&cwq->worklist, &list);

	while (!list_empty(&list)) {
		printk("Taking work for %s\n", wq->name);
		work = list_entry(list.next, struct work_struct, entry);
		list_del(&work->entry);
		__queue_work(wq->cpu_wq + smp_processor_id(), work);
	}
	spin_unlock_irq(&cwq->lock);
}

/* We're holding the cpucontrol mutex here */
static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
					    unsigned long action,
					    void *hcpu)
{
	unsigned int hotcpu = (unsigned long)hcpu;
	struct workqueue_struct *wq;

	switch (action) {
	case CPU_UP_PREPARE:
		/* Create a new workqueue thread for it. */
		list_for_each_entry(wq, &workqueues, list) {
			if (!create_workqueue_thread(wq, hotcpu)) {
				printk("workqueue for %i failed\n", hotcpu);
				return NOTIFY_BAD;
			}
		}
		break;

	case CPU_ONLINE:
		/* Kick off worker threads. */
		list_for_each_entry(wq, &workqueues, list) {
			kthread_bind(wq->cpu_wq[hotcpu].thread, hotcpu);
			wake_up_process(wq->cpu_wq[hotcpu].thread);
		}
		break;

	case CPU_UP_CANCELED:
		list_for_each_entry(wq, &workqueues, list) {
			/* Unbind so it can run. */
			kthread_bind(wq->cpu_wq[hotcpu].thread,
				     smp_processor_id());
			cleanup_workqueue_thread(wq, hotcpu);
		}
		break;

	case CPU_DEAD:
		list_for_each_entry(wq, &workqueues, list)
			cleanup_workqueue_thread(wq, hotcpu);
		list_for_each_entry(wq, &workqueues, list)
			take_over_work(wq, hotcpu);
		break;
	}

	return NOTIFY_OK;
}
#endif

void init_workqueues(void)
{
	hotcpu_notifier(workqueue_cpu_callback, 0);
	keventd_wq = create_workqueue("events");
	BUG_ON(!keventd_wq);
}

EXPORT_SYMBOL_GPL(__create_workqueue);
EXPORT_SYMBOL_GPL(queue_work);
EXPORT_SYMBOL_GPL(queue_delayed_work);
EXPORT_SYMBOL_GPL(flush_workqueue);
EXPORT_SYMBOL_GPL(destroy_workqueue);

EXPORT_SYMBOL(schedule_work);
EXPORT_SYMBOL(schedule_delayed_work);
EXPORT_SYMBOL(schedule_delayed_work_on);
EXPORT_SYMBOL(flush_scheduled_work);