/*
 * linux/kernel/workqueue.c
 *
 * Generic mechanism for defining kernel helper threads for running
 * arbitrary tasks in process context.
 *
 * Started by Ingo Molnar, Copyright (C) 2002
 *
 * Derived from the taskqueue/keventd code by:
 *
 *   David Woodhouse <dwmw2@infradead.org>
 *   Andrew Morton <andrewm@uow.edu.au>
 *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
 *   Theodore Ts'o <tytso@mit.edu>
 */
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h>

/*
 * The per-CPU workqueue (if single thread, we always use cpu 0's).
 *
 * The sequence counters are for flush_scheduled_work().  It wants to wait
 * until all currently-scheduled works are completed, but it doesn't
 * want to be livelocked by new, incoming ones.  So it waits until
 * remove_sequence is >= the insert_sequence which pertained when
 * flush_scheduled_work() was called.
 */
struct cpu_workqueue_struct {
        spinlock_t lock;

        long remove_sequence;           /* Least-recently added (next to run) */
        long insert_sequence;           /* Next to add */

        struct list_head worklist;
        wait_queue_head_t more_work;
        wait_queue_head_t work_done;

        struct workqueue_struct *wq;
        task_t *thread;

        int run_depth;                  /* Detect run_workqueue() recursion depth */
} ____cacheline_aligned;

/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
        struct cpu_workqueue_struct cpu_wq[NR_CPUS];
        const char *name;
        struct list_head list;          /* Empty if single thread */
};

/* All the per-cpu workqueues on the system, for hotplug cpu to add/remove
   threads to each one as cpus come/go. */
static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);

/* If it's single threaded, it isn't in the list of workqueues. */
static inline int is_single_threaded(struct workqueue_struct *wq)
{
        return list_empty(&wq->list);
}

/* Preempt must be disabled. */
static void __queue_work(struct cpu_workqueue_struct *cwq,
                         struct work_struct *work)
{
        unsigned long flags;

        spin_lock_irqsave(&cwq->lock, flags);
        work->wq_data = cwq;
        list_add_tail(&work->entry, &cwq->worklist);
        cwq->insert_sequence++;
        wake_up(&cwq->more_work);
        spin_unlock_irqrestore(&cwq->lock, flags);
}

/*
 * Queue work on a workqueue.  Return non-zero if it was successfully
 * added.
 *
 * We queue the work to the CPU on which it was submitted, but there is no
 * guarantee that it will be processed by that CPU.
 */
int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
        int ret = 0, cpu = get_cpu();

        if (!test_and_set_bit(0, &work->pending)) {
                if (unlikely(is_single_threaded(wq)))
                        cpu = 0;
                BUG_ON(!list_empty(&work->entry));
                __queue_work(wq->cpu_wq + cpu, work);
                ret = 1;
        }
        put_cpu();
        return ret;
}

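/*
 * Timer callback for queue_delayed_work(): the delay has expired, so queue
 * the work on the workqueue that was stashed in work->wq_data.
 */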
static void delayed_work_timer_fn(unsigned long __data)
{
        struct work_struct *work = (struct work_struct *)__data;
        struct workqueue_struct *wq = work->wq_data;
        int cpu = smp_processor_id();

        if (unlikely(is_single_threaded(wq)))
                cpu = 0;

        __queue_work(wq->cpu_wq + cpu, work);
}

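/*
 * Queue work on a workqueue after a delay (in jiffies).  Returns non-zero
 * if the work was not already pending; the work is actually queued by
 * delayed_work_timer_fn() when the timer fires.
 */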
int fastcall queue_delayed_work(struct workqueue_struct *wq,
                        struct work_struct *work, unsigned long delay)
{
        int ret = 0;
        struct timer_list *timer = &work->timer;

        if (!test_and_set_bit(0, &work->pending)) {
                BUG_ON(timer_pending(timer));
                BUG_ON(!list_empty(&work->entry));

                /* This stores wq for the moment, for the timer_fn */
                work->wq_data = wq;
                timer->expires = jiffies + delay;
                timer->data = (unsigned long)work;
                timer->function = delayed_work_timer_fn;
                add_timer(timer);
                ret = 1;
        }
        return ret;
}

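/*
 * Run every work item currently on this per-CPU worklist.  The lock is
 * dropped around each callback, so work functions may sleep; the pending
 * bit is cleared before the callback runs, which allows the work to be
 * requeued while it is still executing.
 */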
static inline void run_workqueue(struct cpu_workqueue_struct *cwq)
{
        unsigned long flags;

        /*
         * Keep taking off work from the queue until
         * done.
         */
        spin_lock_irqsave(&cwq->lock, flags);
        cwq->run_depth++;
        if (cwq->run_depth > 3) {
                /* morton gets to eat his hat */
                printk("%s: recursion depth exceeded: %d\n",
                        __FUNCTION__, cwq->run_depth);
                dump_stack();
        }
        while (!list_empty(&cwq->worklist)) {
                struct work_struct *work = list_entry(cwq->worklist.next,
                                                struct work_struct, entry);
                void (*f) (void *) = work->func;
                void *data = work->data;

                list_del_init(cwq->worklist.next);
                spin_unlock_irqrestore(&cwq->lock, flags);

                BUG_ON(work->wq_data != cwq);
                clear_bit(0, &work->pending);
                f(data);

                spin_lock_irqsave(&cwq->lock, flags);
                cwq->remove_sequence++;
                wake_up(&cwq->work_done);
        }
        cwq->run_depth--;
        spin_unlock_irqrestore(&cwq->lock, flags);
}

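/*
 * The per-CPU worker thread.  It blocks all signals, marks itself
 * non-freezable and slightly higher priority, then loops: run the worklist
 * whenever there is work, otherwise sleep on more_work, until
 * kthread_stop() is called.
 */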
static int worker_thread(void *__cwq)
{
        struct cpu_workqueue_struct *cwq = __cwq;
        DECLARE_WAITQUEUE(wait, current);
        struct k_sigaction sa;
        sigset_t blocked;

        current->flags |= PF_NOFREEZE;

        set_user_nice(current, -5);

        /* Block and flush all signals */
        sigfillset(&blocked);
        sigprocmask(SIG_BLOCK, &blocked, NULL);
        flush_signals(current);

        /* SIG_IGN makes children autoreap: see do_notify_parent(). */
        sa.sa.sa_handler = SIG_IGN;
        sa.sa.sa_flags = 0;
        siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
        do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);

        set_current_state(TASK_INTERRUPTIBLE);
        while (!kthread_should_stop()) {
                add_wait_queue(&cwq->more_work, &wait);
                if (list_empty(&cwq->worklist))
                        schedule();
                else
                        __set_current_state(TASK_RUNNING);
                remove_wait_queue(&cwq->more_work, &wait);

                if (!list_empty(&cwq->worklist))
                        run_workqueue(cwq);
                set_current_state(TASK_INTERRUPTIBLE);
        }
        __set_current_state(TASK_RUNNING);
        return 0;
}

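/*
 * Flush a single per-CPU workqueue: wait until remove_sequence catches up
 * with the insert_sequence sampled on entry.  If called from the worker
 * thread itself (e.g. keventd flushing its own queue), run the queue by
 * hand instead of deadlocking on work_done.
 */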
static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
        if (cwq->thread == current) {
                /*
                 * Probably keventd trying to flush its own queue. So simply run
                 * it by hand rather than deadlocking.
                 */
                run_workqueue(cwq);
        } else {
                DEFINE_WAIT(wait);
                long sequence_needed;

                spin_lock_irq(&cwq->lock);
                sequence_needed = cwq->insert_sequence;

                while (sequence_needed - cwq->remove_sequence > 0) {
                        prepare_to_wait(&cwq->work_done, &wait,
                                        TASK_UNINTERRUPTIBLE);
                        spin_unlock_irq(&cwq->lock);
                        schedule();
                        spin_lock_irq(&cwq->lock);
                }
                finish_wait(&cwq->work_done, &wait);
                spin_unlock_irq(&cwq->lock);
        }
}

/*
 * flush_workqueue - ensure that any scheduled work has run to completion.
 *
 * Forces execution of the workqueue and blocks until its completion.
 * This is typically used in driver shutdown handlers.
 *
 * This function will sample each workqueue's current insert_sequence number and
 * will sleep until the head sequence is greater than or equal to that.  This
 * means that we sleep until all works which were queued on entry have been
 * handled, but we are not livelocked by new incoming ones.
 *
 * This function used to run the workqueues itself.  Now we just wait for the
 * helper threads to do it.
 */
void fastcall flush_workqueue(struct workqueue_struct *wq)
{
        might_sleep();

        if (is_single_threaded(wq)) {
                /* Always use cpu 0's area. */
                flush_cpu_workqueue(wq->cpu_wq + 0);
        } else {
                int cpu;

                lock_cpu_hotplug();
                for_each_online_cpu(cpu)
                        flush_cpu_workqueue(wq->cpu_wq + cpu);
                unlock_cpu_hotplug();
        }
}

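/*
 * Initialise one per-CPU workqueue and create (but do not start) its
 * worker thread.  Returns the task, or NULL if kthread_create() failed.
 */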
static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
                                                   int cpu)
{
        struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;
        struct task_struct *p;

        spin_lock_init(&cwq->lock);
        cwq->wq = wq;
        cwq->thread = NULL;
        cwq->insert_sequence = 0;
        cwq->remove_sequence = 0;
        INIT_LIST_HEAD(&cwq->worklist);
        init_waitqueue_head(&cwq->more_work);
        init_waitqueue_head(&cwq->work_done);

        if (is_single_threaded(wq))
                p = kthread_create(worker_thread, cwq, "%s", wq->name);
        else
                p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
        if (IS_ERR(p))
                return NULL;
        cwq->thread = p;
        return p;
}

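/*
 * Create a workqueue.  A single-threaded workqueue gets one worker (using
 * cpu 0's area) and stays off the global list; otherwise one worker per
 * online CPU is created and bound, and the workqueue goes on the global
 * list so CPU hotplug can add/remove threads later.
 */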
struct workqueue_struct *__create_workqueue(const char *name,
                                            int singlethread)
{
        int cpu, destroy = 0;
        struct workqueue_struct *wq;
        struct task_struct *p;

        BUG_ON(strlen(name) > 10);

        wq = kmalloc(sizeof(*wq), GFP_KERNEL);
        if (!wq)
                return NULL;
        memset(wq, 0, sizeof(*wq));

        wq->name = name;
        /* We don't need the distraction of CPUs appearing and vanishing. */
        lock_cpu_hotplug();
        if (singlethread) {
                INIT_LIST_HEAD(&wq->list);
                p = create_workqueue_thread(wq, 0);
                if (!p)
                        destroy = 1;
                else
                        wake_up_process(p);
        } else {
                spin_lock(&workqueue_lock);
                list_add(&wq->list, &workqueues);
                spin_unlock(&workqueue_lock);
                for_each_online_cpu(cpu) {
                        p = create_workqueue_thread(wq, cpu);
                        if (p) {
                                kthread_bind(p, cpu);
                                wake_up_process(p);
                        } else
                                destroy = 1;
                }
        }
        unlock_cpu_hotplug();

        /*
         * Was there any error during startup? If yes then clean up:
         */
        if (destroy) {
                destroy_workqueue(wq);
                wq = NULL;
        }
        return wq;
}

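/*
 * Detach and stop the worker thread of one per-CPU workqueue, if it was
 * ever created.
 */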
static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
{
        struct cpu_workqueue_struct *cwq;
        unsigned long flags;
        struct task_struct *p;

        cwq = wq->cpu_wq + cpu;
        spin_lock_irqsave(&cwq->lock, flags);
        p = cwq->thread;
        cwq->thread = NULL;
        spin_unlock_irqrestore(&cwq->lock, flags);
        if (p)
                kthread_stop(p);
}

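/*
 * Destroy a workqueue: flush any remaining work, stop all worker threads,
 * unlink it from the global list (multi-threaded case) and free it.
 */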
void destroy_workqueue(struct workqueue_struct *wq)
{
        int cpu;

        flush_workqueue(wq);

        /* We don't need the distraction of CPUs appearing and vanishing. */
        lock_cpu_hotplug();
        if (is_single_threaded(wq))
                cleanup_workqueue_thread(wq, 0);
        else {
                for_each_online_cpu(cpu)
                        cleanup_workqueue_thread(wq, cpu);
                spin_lock(&workqueue_lock);
                list_del(&wq->list);
                spin_unlock(&workqueue_lock);
        }
        unlock_cpu_hotplug();
        kfree(wq);
}

static struct workqueue_struct *keventd_wq;

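/* Convenience wrappers that queue work on the shared keventd workqueue. */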
int fastcall schedule_work(struct work_struct *work)
{
        return queue_work(keventd_wq, work);
}

int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay)
{
        return queue_delayed_work(keventd_wq, work, delay);
}

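/*
 * Like schedule_delayed_work(), but the timer (and hence the work) is
 * queued on a particular CPU.
 */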
int schedule_delayed_work_on(int cpu,
                        struct work_struct *work, unsigned long delay)
{
        int ret = 0;
        struct timer_list *timer = &work->timer;

        if (!test_and_set_bit(0, &work->pending)) {
                BUG_ON(timer_pending(timer));
                BUG_ON(!list_empty(&work->entry));
                /* This stores keventd_wq for the moment, for the timer_fn */
                work->wq_data = keventd_wq;
                timer->expires = jiffies + delay;
                timer->data = (unsigned long)work;
                timer->function = delayed_work_timer_fn;
                add_timer_on(timer, cpu);
                ret = 1;
        }
        return ret;
}

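/* Wait until all work queued on the keventd workqueue so far has run. */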
void flush_scheduled_work(void)
{
        flush_workqueue(keventd_wq);
}

/**
 * cancel_rearming_delayed_workqueue - reliably kill off a delayed
 *			work whose handler rearms the delayed work.
 * @wq:   the controlling workqueue structure
 * @work: the delayed work struct
 */
void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq,
                                       struct work_struct *work)
{
        while (!cancel_delayed_work(work))
                flush_workqueue(wq);
}
EXPORT_SYMBOL(cancel_rearming_delayed_workqueue);

/**
 * cancel_rearming_delayed_work - reliably kill off a delayed keventd
 *			work whose handler rearms the delayed work.
 * @work: the delayed work struct
 */
void cancel_rearming_delayed_work(struct work_struct *work)
{
        cancel_rearming_delayed_workqueue(keventd_wq, work);
}
EXPORT_SYMBOL(cancel_rearming_delayed_work);

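/* Non-zero once the keventd workqueue has been created. */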
int keventd_up(void)
{
        return keventd_wq != NULL;
}

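/* Returns non-zero if the current task is this CPU's keventd worker thread. */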
int current_is_keventd(void)
{
        struct cpu_workqueue_struct *cwq;
        int cpu = smp_processor_id();   /* preempt-safe: keventd is per-cpu */
        int ret = 0;

        BUG_ON(!keventd_wq);

        cwq = keventd_wq->cpu_wq + cpu;
        if (current == cwq->thread)
                ret = 1;

        return ret;
}

#ifdef CONFIG_HOTPLUG_CPU
/* Take the work from this (downed) CPU. */
static void take_over_work(struct workqueue_struct *wq, unsigned int cpu)
{
        struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;
        LIST_HEAD(list);
        struct work_struct *work;

        spin_lock_irq(&cwq->lock);
        list_splice_init(&cwq->worklist, &list);

        while (!list_empty(&list)) {
                printk("Taking work for %s\n", wq->name);
                work = list_entry(list.next, struct work_struct, entry);
                list_del(&work->entry);
                __queue_work(wq->cpu_wq + smp_processor_id(), work);
        }
        spin_unlock_irq(&cwq->lock);
}

/* We're holding the cpucontrol mutex here */
static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
                                            unsigned long action,
                                            void *hcpu)
{
        unsigned int hotcpu = (unsigned long)hcpu;
        struct workqueue_struct *wq;

        switch (action) {
        case CPU_UP_PREPARE:
                /* Create a new workqueue thread for it. */
                list_for_each_entry(wq, &workqueues, list) {
                        /* create_workqueue_thread() returns NULL on failure,
                           never a negative value. */
                        if (!create_workqueue_thread(wq, hotcpu)) {
                                printk("workqueue for %i failed\n", hotcpu);
                                return NOTIFY_BAD;
                        }
                }
                break;

        case CPU_ONLINE:
                /* Kick off worker threads. */
                list_for_each_entry(wq, &workqueues, list) {
                        kthread_bind(wq->cpu_wq[hotcpu].thread, hotcpu);
                        wake_up_process(wq->cpu_wq[hotcpu].thread);
                }
                break;

        case CPU_UP_CANCELED:
                list_for_each_entry(wq, &workqueues, list) {
                        /* Unbind so it can run. */
                        kthread_bind(wq->cpu_wq[hotcpu].thread,
                                     smp_processor_id());
                        cleanup_workqueue_thread(wq, hotcpu);
                }
                break;

        case CPU_DEAD:
                list_for_each_entry(wq, &workqueues, list)
                        cleanup_workqueue_thread(wq, hotcpu);
                list_for_each_entry(wq, &workqueues, list)
                        take_over_work(wq, hotcpu);
                break;
        }

        return NOTIFY_OK;
}
#endif

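/*
 * Register the CPU-hotplug notifier and create the shared keventd
 * ("events") workqueue.
 */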
void init_workqueues(void)
{
        hotcpu_notifier(workqueue_cpu_callback, 0);
        keventd_wq = create_workqueue("events");
        BUG_ON(!keventd_wq);
}

EXPORT_SYMBOL_GPL(__create_workqueue);
EXPORT_SYMBOL_GPL(queue_work);
EXPORT_SYMBOL_GPL(queue_delayed_work);
EXPORT_SYMBOL_GPL(flush_workqueue);
EXPORT_SYMBOL_GPL(destroy_workqueue);

EXPORT_SYMBOL(schedule_work);
EXPORT_SYMBOL(schedule_delayed_work);
EXPORT_SYMBOL(schedule_delayed_work_on);
EXPORT_SYMBOL(flush_scheduled_work);
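/*
 * Example driver-side usage (illustrative sketch only, not part of this
 * file).  "my_wq", "my_work" and my_work_fn() are made-up names, and the
 * three-argument INIT_WORK() reflects the work_struct API of this era,
 * with create_singlethread_workqueue() from <linux/workqueue.h>:
 *
 *	static struct workqueue_struct *my_wq;
 *	static struct work_struct my_work;
 *
 *	static void my_work_fn(void *data)
 *	{
 *		// runs in process context, may sleep
 *	}
 *
 *	my_wq = create_singlethread_workqueue("mywq");
 *	INIT_WORK(&my_work, my_work_fn, NULL);
 *	queue_work(my_wq, &my_work);
 *	...
 *	flush_workqueue(my_wq);
 *	destroy_workqueue(my_wq);
 */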