/*
 * linux/kernel/workqueue.c
 *
 * Generic mechanism for defining kernel helper threads for running
 * arbitrary tasks in process context.
 *
 * Started by Ingo Molnar, Copyright (C) 2002
 *
 * Derived from the taskqueue/keventd code by:
 *
 *   David Woodhouse <dwmw2@infradead.org>
 *   Andrew Morton <andrewm@uow.edu.au>
 *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
 *   Theodore Ts'o <tytso@mit.edu>
 *
 * Made to use alloc_percpu by Christoph Lameter <clameter@sgi.com>.
 */

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h>
#include <linux/hardirq.h>
#include <linux/mempolicy.h>
#include <linux/freezer.h>
#include <linux/kallsyms.h>
#include <linux/debug_locks.h>

/*
 * The per-CPU workqueue (if single thread, we always use the first
 * possible cpu).
 */
struct cpu_workqueue_struct {

        spinlock_t lock;

        struct list_head worklist;
        wait_queue_head_t more_work;

        struct workqueue_struct *wq;
        struct task_struct *thread;

        int run_depth;          /* Detect run_workqueue() recursion depth */

        int freezeable;         /* Freeze the thread during suspend */
} ____cacheline_aligned;

/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
        struct cpu_workqueue_struct *cpu_wq;
        const char *name;
        struct list_head list;  /* Empty if single thread */
};

/* All the per-cpu workqueues on the system, for hotplug cpu to add/remove
   threads to each one as cpus come/go. */
static DEFINE_MUTEX(workqueue_mutex);
static LIST_HEAD(workqueues);

static int singlethread_cpu;

/* If it's single threaded, it isn't in the list of workqueues. */
static inline int is_single_threaded(struct workqueue_struct *wq)
{
        return list_empty(&wq->list);
}

/*
 * Set the workqueue on which a work item is to be run
 * - Must *only* be called if the pending flag is set
 */
static inline void set_wq_data(struct work_struct *work, void *wq)
{
        unsigned long new;

        BUG_ON(!work_pending(work));

        new = (unsigned long) wq | (1UL << WORK_STRUCT_PENDING);
        new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work);
        atomic_long_set(&work->data, new);
}

static inline void *get_wq_data(struct work_struct *work)
{
        return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK);
}

static int __run_work(struct cpu_workqueue_struct *cwq, struct work_struct *work)
{
        int ret = 0;
        unsigned long flags;

        spin_lock_irqsave(&cwq->lock, flags);
        /*
         * We need to re-validate the work info after we've gotten
         * the cpu_workqueue lock. We can run the work now iff:
         *
         *  - the wq_data still matches the cpu_workqueue_struct
         *  - AND the work is still marked pending
         *  - AND the work is still on a list (which will be this
         *    workqueue_struct list)
         *
         * All these conditions are important, because we
         * need to protect against the work being run right
         * now on another CPU (all but the last one might be
         * true if it's currently running and has not been
         * released yet, for example).
         */
        if (get_wq_data(work) == cwq
            && work_pending(work)
            && !list_empty(&work->entry)) {
                work_func_t f = work->func;
                list_del_init(&work->entry);
                spin_unlock_irqrestore(&cwq->lock, flags);

                if (!test_bit(WORK_STRUCT_NOAUTOREL, work_data_bits(work)))
                        work_release(work);
                f(work);

                spin_lock_irqsave(&cwq->lock, flags);
                ret = 1;
        }
        spin_unlock_irqrestore(&cwq->lock, flags);

        return ret;
}

/**
 * run_scheduled_work - run scheduled work synchronously
 * @work: work to run
 *
 * This checks if the work was pending, and runs it
 * synchronously if so. It returns a boolean to indicate
 * whether it had any scheduled work to run or not.
 *
 * NOTE! This _only_ works for normal work_structs. You
 * CANNOT use this for delayed work, because the wq data
 * for delayed work will not point properly to the per-
 * CPU workqueue struct, but will change!
 */
int fastcall run_scheduled_work(struct work_struct *work)
{
        for (;;) {
                struct cpu_workqueue_struct *cwq;

                if (!work_pending(work))
                        return 0;
                if (list_empty(&work->entry))
                        return 0;
                /* NOTE! This depends intimately on __queue_work! */
                cwq = get_wq_data(work);
                if (!cwq)
                        return 0;
                if (__run_work(cwq, work))
                        return 1;
        }
}
EXPORT_SYMBOL(run_scheduled_work);
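
/*
 * Example (editor's sketch, not part of the original source): a caller
 * that has queued a plain work_struct on keventd and now needs it done
 * could try running it synchronously and fall back to a flush if the
 * work is no longer pending (it may already be running on another CPU).
 * "example_finish_work" and its parameter are hypothetical.
 */
static void example_finish_work(struct work_struct *queued_work)
{
        if (!run_scheduled_work(queued_work))
                flush_scheduled_work();
}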

/* Preempt must be disabled. */
static void __queue_work(struct cpu_workqueue_struct *cwq,
                         struct work_struct *work)
{
        unsigned long flags;

        spin_lock_irqsave(&cwq->lock, flags);
        set_wq_data(work, cwq);
        list_add_tail(&work->entry, &cwq->worklist);
        wake_up(&cwq->more_work);
        spin_unlock_irqrestore(&cwq->lock, flags);
}

/**
 * queue_work - queue work on a workqueue
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to the CPU on which it was submitted, but there is no
 * guarantee that it will be processed by that CPU.
 */
int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
        int ret = 0, cpu = get_cpu();

        if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
                if (unlikely(is_single_threaded(wq)))
                        cpu = singlethread_cpu;
                BUG_ON(!list_empty(&work->entry));
                __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
                ret = 1;
        }
        put_cpu();
        return ret;
}
EXPORT_SYMBOL_GPL(queue_work);
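
/*
 * Example (editor's sketch, not part of the original source): the usual
 * pattern is to embed a work_struct in the caller's own object, recover
 * that object with container_of() in the handler, and queue the work on
 * a private workqueue.  "example_dev" and the function names below are
 * hypothetical.
 */
struct example_dev {
        struct workqueue_struct *wq;
        struct work_struct work;
};

static void example_work_handler(struct work_struct *work)
{
        struct example_dev *dev = container_of(work, struct example_dev, work);

        /* runs in process context on one of dev->wq's worker threads */
        (void)dev;
}

static void example_submit(struct example_dev *dev)
{
        INIT_WORK(&dev->work, example_work_handler);
        /* returns 0 and does nothing if the work is already pending */
        queue_work(dev->wq, &dev->work);
}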

void delayed_work_timer_fn(unsigned long __data)
{
        struct delayed_work *dwork = (struct delayed_work *)__data;
        struct workqueue_struct *wq = get_wq_data(&dwork->work);
        int cpu = smp_processor_id();

        if (unlikely(is_single_threaded(wq)))
                cpu = singlethread_cpu;

        __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), &dwork->work);
}

/**
 * queue_delayed_work - queue work on a workqueue after delay
 * @wq: workqueue to use
 * @dwork: delayable work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @dwork was already on a queue, non-zero otherwise.
 */
int fastcall queue_delayed_work(struct workqueue_struct *wq,
                        struct delayed_work *dwork, unsigned long delay)
{
        int ret = 0;
        struct timer_list *timer = &dwork->timer;
        struct work_struct *work = &dwork->work;

        timer_stats_timer_set_start_info(timer);
        if (delay == 0)
                return queue_work(wq, work);

        if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
                BUG_ON(timer_pending(timer));
                BUG_ON(!list_empty(&work->entry));

                /* This stores wq for the moment, for the timer_fn */
                set_wq_data(work, wq);
                timer->expires = jiffies + delay;
                timer->data = (unsigned long)dwork;
                timer->function = delayed_work_timer_fn;
                add_timer(timer);
                ret = 1;
        }
        return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work);
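
/*
 * Example (editor's sketch, not part of the original source): arming a
 * one-shot delayed work item.  DECLARE_DELAYED_WORK() is the static
 * initializer from <linux/workqueue.h>; the handler name and the
 * two-second delay are hypothetical.
 */
static void example_retry_fn(struct work_struct *work)
{
        /* runs roughly two seconds after example_arm_retry() */
}
static DECLARE_DELAYED_WORK(example_retry, example_retry_fn);

static void example_arm_retry(struct workqueue_struct *wq)
{
        /* returns 0 and changes nothing if the timer is already armed or
         * the work is already sitting on a worklist */
        queue_delayed_work(wq, &example_retry, 2 * HZ);
}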

/**
 * queue_delayed_work_on - queue work on specific CPU after delay
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @dwork: work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @dwork was already on a queue, non-zero otherwise.
 */
int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
                        struct delayed_work *dwork, unsigned long delay)
{
        int ret = 0;
        struct timer_list *timer = &dwork->timer;
        struct work_struct *work = &dwork->work;

        if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
                BUG_ON(timer_pending(timer));
                BUG_ON(!list_empty(&work->entry));

                /* This stores wq for the moment, for the timer_fn */
                set_wq_data(work, wq);
                timer->expires = jiffies + delay;
                timer->data = (unsigned long)dwork;
                timer->function = delayed_work_timer_fn;
                add_timer_on(timer, cpu);
                ret = 1;
        }
        return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);

static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
        unsigned long flags;

        /*
         * Keep taking off work from the queue until
         * done.
         */
        spin_lock_irqsave(&cwq->lock, flags);
        cwq->run_depth++;
        if (cwq->run_depth > 3) {
                /* morton gets to eat his hat */
                printk("%s: recursion depth exceeded: %d\n",
                        __FUNCTION__, cwq->run_depth);
                dump_stack();
        }
        while (!list_empty(&cwq->worklist)) {
                struct work_struct *work = list_entry(cwq->worklist.next,
                                                struct work_struct, entry);
                work_func_t f = work->func;

                list_del_init(cwq->worklist.next);
                spin_unlock_irqrestore(&cwq->lock, flags);

                BUG_ON(get_wq_data(work) != cwq);
                if (!test_bit(WORK_STRUCT_NOAUTOREL, work_data_bits(work)))
                        work_release(work);
                f(work);

                if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
                        printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
                                        "%s/0x%08x/%d\n",
                                        current->comm, preempt_count(),
                                        current->pid);
                        printk(KERN_ERR "    last function: ");
                        print_symbol("%s\n", (unsigned long)f);
                        debug_show_held_locks(current);
                        dump_stack();
                }

                spin_lock_irqsave(&cwq->lock, flags);
        }
        cwq->run_depth--;
        spin_unlock_irqrestore(&cwq->lock, flags);
}

static int worker_thread(void *__cwq)
{
        struct cpu_workqueue_struct *cwq = __cwq;
        DECLARE_WAITQUEUE(wait, current);
        struct k_sigaction sa;
        sigset_t blocked;

        if (!cwq->freezeable)
                current->flags |= PF_NOFREEZE;

        set_user_nice(current, -5);

        /* Block and flush all signals */
        sigfillset(&blocked);
        sigprocmask(SIG_BLOCK, &blocked, NULL);
        flush_signals(current);

        /*
         * We inherited MPOL_INTERLEAVE from the booting kernel.
         * Set MPOL_DEFAULT to ensure node local allocations.
         */
        numa_default_policy();

        /* SIG_IGN makes children autoreap: see do_notify_parent(). */
        sa.sa.sa_handler = SIG_IGN;
        sa.sa.sa_flags = 0;
        siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
        do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);

        set_current_state(TASK_INTERRUPTIBLE);
        while (!kthread_should_stop()) {
                if (cwq->freezeable)
                        try_to_freeze();

                add_wait_queue(&cwq->more_work, &wait);
                if (list_empty(&cwq->worklist))
                        schedule();
                else
                        __set_current_state(TASK_RUNNING);
                remove_wait_queue(&cwq->more_work, &wait);

                if (!list_empty(&cwq->worklist))
                        run_workqueue(cwq);
                set_current_state(TASK_INTERRUPTIBLE);
        }
        __set_current_state(TASK_RUNNING);
        return 0;
}

struct wq_barrier {
        struct work_struct work;
        struct completion done;
};

static void wq_barrier_func(struct work_struct *work)
{
        struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
        complete(&barr->done);
}

static inline void init_wq_barrier(struct wq_barrier *barr)
{
        INIT_WORK(&barr->work, wq_barrier_func);
        __set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work));

        init_completion(&barr->done);
}

static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
        if (cwq->thread == current) {
                /*
                 * Probably keventd trying to flush its own queue. So simply run
                 * it by hand rather than deadlocking.
                 */
                mutex_unlock(&workqueue_mutex);
                run_workqueue(cwq);
                mutex_lock(&workqueue_mutex);
        } else {
                struct wq_barrier barr;

                init_wq_barrier(&barr);
                __queue_work(cwq, &barr.work);

                mutex_unlock(&workqueue_mutex);
                wait_for_completion(&barr.done);
                mutex_lock(&workqueue_mutex);
        }
}

/**
 * flush_workqueue - ensure that any scheduled work has run to completion.
 * @wq: workqueue to flush
 *
 * Forces execution of the workqueue and blocks until its completion.
 * This is typically used in driver shutdown handlers.
 *
 * We sleep until all works which were queued on entry have been handled,
 * but we are not livelocked by new incoming ones.
 *
 * This function used to run the workqueues itself. Now we just wait for the
 * helper threads to do it.
 */
void fastcall flush_workqueue(struct workqueue_struct *wq)
{
        mutex_lock(&workqueue_mutex);
        if (is_single_threaded(wq)) {
                /* Always use first cpu's area. */
                flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
        } else {
                int cpu;

                for_each_online_cpu(cpu)
                        flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
        }
        mutex_unlock(&workqueue_mutex);
}
EXPORT_SYMBOL_GPL(flush_workqueue);
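
/*
 * Example (editor's sketch, not part of the original source): a typical
 * caller flushes in its teardown path so that no handler queued earlier
 * can still touch state that is about to be freed.  The name below is
 * hypothetical.
 */
static void example_quiesce(struct workqueue_struct *wq)
{
        /* the caller must ensure no new work is queued from here on */
        flush_workqueue(wq);
        /* every handler queued before the flush has now completed */
}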

static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
                                                   int cpu, int freezeable)
{
        struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
        struct task_struct *p;

        spin_lock_init(&cwq->lock);
        cwq->wq = wq;
        cwq->thread = NULL;
        cwq->freezeable = freezeable;
        INIT_LIST_HEAD(&cwq->worklist);
        init_waitqueue_head(&cwq->more_work);

        if (is_single_threaded(wq))
                p = kthread_create(worker_thread, cwq, "%s", wq->name);
        else
                p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
        if (IS_ERR(p))
                return NULL;
        cwq->thread = p;
        return p;
}

struct workqueue_struct *__create_workqueue(const char *name,
                                            int singlethread, int freezeable)
{
        int cpu, destroy = 0;
        struct workqueue_struct *wq;
        struct task_struct *p;

        wq = kzalloc(sizeof(*wq), GFP_KERNEL);
        if (!wq)
                return NULL;

        wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
        if (!wq->cpu_wq) {
                kfree(wq);
                return NULL;
        }

        wq->name = name;
        mutex_lock(&workqueue_mutex);
        if (singlethread) {
                INIT_LIST_HEAD(&wq->list);
                p = create_workqueue_thread(wq, singlethread_cpu, freezeable);
                if (!p)
                        destroy = 1;
                else
                        wake_up_process(p);
        } else {
                list_add(&wq->list, &workqueues);
                for_each_online_cpu(cpu) {
                        p = create_workqueue_thread(wq, cpu, freezeable);
                        if (p) {
                                kthread_bind(p, cpu);
                                wake_up_process(p);
                        } else
                                destroy = 1;
                }
        }
        mutex_unlock(&workqueue_mutex);

        /*
         * Was there any error during startup? If yes then clean up:
         */
        if (destroy) {
                destroy_workqueue(wq);
                wq = NULL;
        }
        return wq;
}
EXPORT_SYMBOL_GPL(__create_workqueue);
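
/*
 * Example (editor's sketch, not part of the original source): callers do
 * not normally invoke __create_workqueue() directly; in this kernel the
 * create_workqueue(), create_singlethread_workqueue() and
 * create_freezeable_workqueue() macros in <linux/workqueue.h> supply the
 * singlethread/freezeable arguments.  "example_wq" and the functions
 * below are hypothetical.
 */
static struct workqueue_struct *example_wq;

static int example_start(void)
{
        example_wq = create_singlethread_workqueue("example");
        if (!example_wq)
                return -ENOMEM;
        return 0;
}

static void example_stop(void)
{
        /* flushes pending work, stops the worker thread(s), frees the wq */
        destroy_workqueue(example_wq);
}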

static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
{
        struct cpu_workqueue_struct *cwq;
        unsigned long flags;
        struct task_struct *p;

        cwq = per_cpu_ptr(wq->cpu_wq, cpu);
        spin_lock_irqsave(&cwq->lock, flags);
        p = cwq->thread;
        cwq->thread = NULL;
        spin_unlock_irqrestore(&cwq->lock, flags);
        if (p)
                kthread_stop(p);
}

/**
 * destroy_workqueue - safely terminate a workqueue
 * @wq: target workqueue
 *
 * Safely destroy a workqueue. All work currently pending will be done first.
 */
void destroy_workqueue(struct workqueue_struct *wq)
{
        int cpu;

        flush_workqueue(wq);

        /* We don't need the distraction of CPUs appearing and vanishing. */
        mutex_lock(&workqueue_mutex);
        if (is_single_threaded(wq))
                cleanup_workqueue_thread(wq, singlethread_cpu);
        else {
                for_each_online_cpu(cpu)
                        cleanup_workqueue_thread(wq, cpu);
                list_del(&wq->list);
        }
        mutex_unlock(&workqueue_mutex);
        free_percpu(wq->cpu_wq);
        kfree(wq);
}
EXPORT_SYMBOL_GPL(destroy_workqueue);

static struct workqueue_struct *keventd_wq;

/**
 * schedule_work - put work task in global workqueue
 * @work: job to be done
 *
 * This puts a job in the kernel-global workqueue.
 */
int fastcall schedule_work(struct work_struct *work)
{
        return queue_work(keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work);
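
/*
 * Example (editor's sketch, not part of the original source): deferring
 * a small piece of process-context work to keventd from a context that
 * cannot sleep, such as an interrupt handler.  DECLARE_WORK() is from
 * <linux/workqueue.h>; the names below are hypothetical.
 */
static void example_deferred_fn(struct work_struct *work)
{
        /* may sleep: runs on a keventd worker thread ("events/N") */
}
static DECLARE_WORK(example_deferred, example_deferred_fn);

static void example_irq_path(void)
{
        /* cheap and non-blocking: sets the pending bit and wakes keventd */
        schedule_work(&example_deferred);
}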

/**
 * schedule_delayed_work - put work task in global workqueue after delay
 * @dwork: job to be done
 * @delay: number of jiffies to wait or 0 for immediate execution
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue.
 */
int fastcall schedule_delayed_work(struct delayed_work *dwork,
                                        unsigned long delay)
{
        timer_stats_timer_set_start_info(&dwork->timer);
        return queue_delayed_work(keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work);

/**
 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
 * @cpu: cpu to use
 * @dwork: job to be done
 * @delay: number of jiffies to wait
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue on the specified CPU.
 */
int schedule_delayed_work_on(int cpu,
                        struct delayed_work *dwork, unsigned long delay)
{
        return queue_delayed_work_on(cpu, keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work_on);

/**
 * schedule_on_each_cpu - call a function on each online CPU from keventd
 * @func: the function to call
 *
 * Returns zero on success.
 * Returns -ve errno on failure.
 *
 * Appears to be racy against CPU hotplug.
 *
 * schedule_on_each_cpu() is very slow.
 */
int schedule_on_each_cpu(work_func_t func)
{
        int cpu;
        struct work_struct *works;

        works = alloc_percpu(struct work_struct);
        if (!works)
                return -ENOMEM;

        preempt_disable();              /* CPU hotplug */
        for_each_online_cpu(cpu) {
                struct work_struct *work = per_cpu_ptr(works, cpu);

                INIT_WORK(work, func);
                set_bit(WORK_STRUCT_PENDING, work_data_bits(work));
                __queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu), work);
        }
        preempt_enable();
        flush_workqueue(keventd_wq);
        free_percpu(works);
        return 0;
}
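
/*
 * Example (editor's sketch, not part of the original source): running a
 * function once on every online CPU and waiting for all invocations to
 * finish, e.g. to drain per-cpu caches.  "example_drain_cpu" is a
 * hypothetical handler.
 */
static void example_drain_cpu(struct work_struct *unused)
{
        /* executes on each online CPU in turn, in keventd context */
}

static int example_drain_all(void)
{
        /* sleeps until the handler has run everywhere; may return -ENOMEM */
        return schedule_on_each_cpu(example_drain_cpu);
}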

void flush_scheduled_work(void)
{
        flush_workqueue(keventd_wq);
}
EXPORT_SYMBOL(flush_scheduled_work);

/**
 * cancel_rearming_delayed_workqueue - reliably kill off a delayed work whose handler rearms the delayed work.
 * @wq: the controlling workqueue structure
 * @dwork: the delayed work struct
 */
void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq,
                                       struct delayed_work *dwork)
{
        while (!cancel_delayed_work(dwork))
                flush_workqueue(wq);
}
EXPORT_SYMBOL(cancel_rearming_delayed_workqueue);

/**
 * cancel_rearming_delayed_work - reliably kill off a delayed keventd work whose handler rearms the delayed work.
 * @dwork: the delayed work struct
 */
void cancel_rearming_delayed_work(struct delayed_work *dwork)
{
        cancel_rearming_delayed_workqueue(keventd_wq, dwork);
}
EXPORT_SYMBOL(cancel_rearming_delayed_work);
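
/*
 * Example (editor's sketch, not part of the original source): a handler
 * that rearms itself cannot be stopped reliably with a single
 * cancel_delayed_work(), because it may requeue between the cancel and
 * the return; the helpers above loop cancel+flush until the work is
 * really gone.  The names below are hypothetical.
 */
static void example_poll_fn(struct work_struct *work);
static DECLARE_DELAYED_WORK(example_poll, example_poll_fn);

static void example_poll_fn(struct work_struct *work)
{
        /* ... do the periodic job, then rearm ... */
        schedule_delayed_work(&example_poll, HZ);
}

static void example_stop_polling(void)
{
        /* after this returns the handler can neither run nor rearm */
        cancel_rearming_delayed_work(&example_poll);
}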

/**
 * execute_in_process_context - reliably execute the routine with user context
 * @fn: the function to execute
 * @ew: guaranteed storage for the execute work structure (must
 *      be available when the work executes)
 *
 * Executes the function immediately if process context is available,
 * otherwise schedules the function for delayed execution.
 *
 * Returns: 0 - function was executed
 *          1 - function was scheduled for execution
 */
int execute_in_process_context(work_func_t fn, struct execute_work *ew)
{
        if (!in_interrupt()) {
                fn(&ew->work);
                return 0;
        }

        INIT_WORK(&ew->work, fn);
        schedule_work(&ew->work);

        return 1;
}
EXPORT_SYMBOL_GPL(execute_in_process_context);
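
/*
 * Example (editor's sketch, not part of the original source): a release
 * path that may be reached from either process or interrupt context can
 * hand its final cleanup to execute_in_process_context(); the
 * execute_work storage must stay valid until the handler has run, so it
 * is embedded in the object itself.  "example_obj" and the functions
 * below are hypothetical.
 */
struct example_obj {
        struct execute_work ew;
        /* ... payload ... */
};

static void example_release(struct work_struct *work)
{
        struct example_obj *obj = container_of(work, struct example_obj,
                                               ew.work);
        kfree(obj);
}

static void example_put(struct example_obj *obj)
{
        /* runs example_release() immediately, or via keventd if in IRQ */
        execute_in_process_context(example_release, &obj->ew);
}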

int keventd_up(void)
{
        return keventd_wq != NULL;
}

int current_is_keventd(void)
{
        struct cpu_workqueue_struct *cwq;
        int cpu = smp_processor_id();   /* preempt-safe: keventd is per-cpu */
        int ret = 0;

        BUG_ON(!keventd_wq);

        cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
        if (current == cwq->thread)
                ret = 1;

        return ret;
}

/* Take the work from this (downed) CPU. */
static void take_over_work(struct workqueue_struct *wq, unsigned int cpu)
{
        struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
        struct list_head list;
        struct work_struct *work;

        spin_lock_irq(&cwq->lock);
        list_replace_init(&cwq->worklist, &list);

        while (!list_empty(&list)) {
                printk("Taking work for %s\n", wq->name);
                work = list_entry(list.next, struct work_struct, entry);
                list_del(&work->entry);
                __queue_work(per_cpu_ptr(wq->cpu_wq, smp_processor_id()), work);
        }
        spin_unlock_irq(&cwq->lock);
}

/* We're holding the cpucontrol mutex here */
static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
                                            unsigned long action,
                                            void *hcpu)
{
        unsigned int hotcpu = (unsigned long)hcpu;
        struct workqueue_struct *wq;

        switch (action) {
        case CPU_UP_PREPARE:
                mutex_lock(&workqueue_mutex);
                /* Create a new workqueue thread for it. */
                list_for_each_entry(wq, &workqueues, list) {
                        if (!create_workqueue_thread(wq, hotcpu, 0)) {
                                printk("workqueue for %i failed\n", hotcpu);
                                return NOTIFY_BAD;
                        }
                }
                break;

        case CPU_ONLINE:
                /* Kick off worker threads. */
                list_for_each_entry(wq, &workqueues, list) {
                        struct cpu_workqueue_struct *cwq;

                        cwq = per_cpu_ptr(wq->cpu_wq, hotcpu);
                        kthread_bind(cwq->thread, hotcpu);
                        wake_up_process(cwq->thread);
                }
                mutex_unlock(&workqueue_mutex);
                break;

        case CPU_UP_CANCELED:
                list_for_each_entry(wq, &workqueues, list) {
                        if (!per_cpu_ptr(wq->cpu_wq, hotcpu)->thread)
                                continue;
                        /* Unbind so it can run. */
                        kthread_bind(per_cpu_ptr(wq->cpu_wq, hotcpu)->thread,
                                     any_online_cpu(cpu_online_map));
                        cleanup_workqueue_thread(wq, hotcpu);
                }
                mutex_unlock(&workqueue_mutex);
                break;

        case CPU_DOWN_PREPARE:
                mutex_lock(&workqueue_mutex);
                break;

        case CPU_DOWN_FAILED:
                mutex_unlock(&workqueue_mutex);
                break;

        case CPU_DEAD:
                list_for_each_entry(wq, &workqueues, list)
                        cleanup_workqueue_thread(wq, hotcpu);
                list_for_each_entry(wq, &workqueues, list)
                        take_over_work(wq, hotcpu);
                mutex_unlock(&workqueue_mutex);
                break;
        }

        return NOTIFY_OK;
}

void init_workqueues(void)
{
        singlethread_cpu = first_cpu(cpu_possible_map);
        hotcpu_notifier(workqueue_cpu_callback, 0);
        keventd_wq = create_workqueue("events");
        BUG_ON(!keventd_wq);
}