workqueue.c

/*
 * linux/kernel/workqueue.c
 *
 * Generic mechanism for defining kernel helper threads for running
 * arbitrary tasks in process context.
 *
 * Started by Ingo Molnar, Copyright (C) 2002
 *
 * Derived from the taskqueue/keventd code by:
 *
 *   David Woodhouse <dwmw2@infradead.org>
 *   Andrew Morton <andrewm@uow.edu.au>
 *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
 *   Theodore Ts'o <tytso@mit.edu>
 *
 * Made to use alloc_percpu by Christoph Lameter <clameter@sgi.com>.
 */

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h>
#include <linux/hardirq.h>
#include <linux/mempolicy.h>
#include <linux/freezer.h>
#include <linux/kallsyms.h>
#include <linux/debug_locks.h>

/*
 * The per-CPU workqueue (if single thread, we always use the first
 * possible cpu).
 */
struct cpu_workqueue_struct {
	spinlock_t lock;
	struct list_head worklist;
	wait_queue_head_t more_work;
	struct workqueue_struct *wq;
	struct task_struct *thread;
	struct work_struct *current_work;
	int run_depth;		/* Detect run_workqueue() recursion depth */
} ____cacheline_aligned;

/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
	struct cpu_workqueue_struct *cpu_wq;
	const char *name;
	struct list_head list;	/* Empty if single thread */
	int freezeable;		/* Freeze threads during suspend */
};

/* All the per-cpu workqueues on the system, for hotplug cpu to add/remove
   threads to each one as cpus come/go. */
static long migrate_sequence __read_mostly;
static DEFINE_MUTEX(workqueue_mutex);
static LIST_HEAD(workqueues);

static int singlethread_cpu;

/* If it's single threaded, it isn't in the list of workqueues. */
static inline int is_single_threaded(struct workqueue_struct *wq)
{
	return list_empty(&wq->list);
}

/*
 * Set the workqueue on which a work item is to be run
 * - Must *only* be called if the pending flag is set
 */
static inline void set_wq_data(struct work_struct *work, void *wq)
{
	unsigned long new;

	BUG_ON(!work_pending(work));

	new = (unsigned long) wq | (1UL << WORK_STRUCT_PENDING);
	new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work);
	atomic_long_set(&work->data, new);
}

static inline void *get_wq_data(struct work_struct *work)
{
	return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK);
}

static int __run_work(struct cpu_workqueue_struct *cwq, struct work_struct *work)
{
	int ret = 0;
	unsigned long flags;

	spin_lock_irqsave(&cwq->lock, flags);
	/*
	 * We need to re-validate the work info after we've gotten
	 * the cpu_workqueue lock. We can run the work now iff:
	 *
	 *  - the wq_data still matches the cpu_workqueue_struct
	 *  - AND the work is still marked pending
	 *  - AND the work is still on a list (which will be this
	 *    workqueue_struct list)
	 *
	 * All these conditions are important, because we
	 * need to protect against the work being run right
	 * now on another CPU (all but the last one might be
	 * true if it's currently running and has not been
	 * released yet, for example).
	 */
	if (get_wq_data(work) == cwq
			&& work_pending(work)
			&& !list_empty(&work->entry)) {
		work_func_t f = work->func;

		cwq->current_work = work;
		list_del_init(&work->entry);
		spin_unlock_irqrestore(&cwq->lock, flags);

		if (!test_bit(WORK_STRUCT_NOAUTOREL, work_data_bits(work)))
			work_release(work);
		f(work);

		spin_lock_irqsave(&cwq->lock, flags);
		cwq->current_work = NULL;
		ret = 1;
	}
	spin_unlock_irqrestore(&cwq->lock, flags);
	return ret;
}

/**
 * run_scheduled_work - run scheduled work synchronously
 * @work: work to run
 *
 * This checks if the work was pending, and runs it
 * synchronously if so. It returns a boolean to indicate
 * whether it had any scheduled work to run or not.
 *
 * NOTE! This _only_ works for normal work_structs. You
 * CANNOT use this for delayed work, because the wq data
 * for delayed work will not point properly to the per-
 * CPU workqueue struct, but will change!
 */
int fastcall run_scheduled_work(struct work_struct *work)
{
	for (;;) {
		struct cpu_workqueue_struct *cwq;

		if (!work_pending(work))
			return 0;
		if (list_empty(&work->entry))
			return 0;
		/* NOTE! This depends intimately on __queue_work! */
		cwq = get_wq_data(work);
		if (!cwq)
			return 0;
		if (__run_work(cwq, work))
			return 1;
	}
}
EXPORT_SYMBOL(run_scheduled_work);

static void insert_work(struct cpu_workqueue_struct *cwq,
				struct work_struct *work, int tail)
{
	set_wq_data(work, cwq);
	if (tail)
		list_add_tail(&work->entry, &cwq->worklist);
	else
		list_add(&work->entry, &cwq->worklist);
	wake_up(&cwq->more_work);
}

/* Preempt must be disabled. */
static void __queue_work(struct cpu_workqueue_struct *cwq,
			 struct work_struct *work)
{
	unsigned long flags;

	spin_lock_irqsave(&cwq->lock, flags);
	insert_work(cwq, work, 1);
	spin_unlock_irqrestore(&cwq->lock, flags);
}

/**
 * queue_work - queue work on a workqueue
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to the CPU on which it was submitted, but there is no
 * guarantee that it will be processed by that CPU.
 */
int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
	int ret = 0, cpu = get_cpu();

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
		if (unlikely(is_single_threaded(wq)))
			cpu = singlethread_cpu;
		BUG_ON(!list_empty(&work->entry));
		__queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
		ret = 1;
	}
	put_cpu();
	return ret;
}
EXPORT_SYMBOL_GPL(queue_work);
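
/*
 * Illustrative sketch (not part of workqueue.c): typical driver-side use of
 * queue_work() with a dedicated workqueue.  The names my_wq, my_work_handler,
 * my_work and my_driver_notify are hypothetical.
 */
#if 0
static struct workqueue_struct *my_wq;	/* e.g. created with create_workqueue("mydrv") */

static void my_work_handler(struct work_struct *work);
static DECLARE_WORK(my_work, my_work_handler);

static void my_work_handler(struct work_struct *work)
{
	/* Runs in process context on one of my_wq's worker threads. */
	printk(KERN_DEBUG "my_work ran\n");
}

static void my_driver_notify(void)
{
	/* Returns 1 if queued, 0 if my_work was already pending. */
	queue_work(my_wq, &my_work);
}
#endif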

void delayed_work_timer_fn(unsigned long __data)
{
	struct delayed_work *dwork = (struct delayed_work *)__data;
	struct workqueue_struct *wq = get_wq_data(&dwork->work);
	int cpu = smp_processor_id();

	if (unlikely(is_single_threaded(wq)))
		cpu = singlethread_cpu;

	__queue_work(per_cpu_ptr(wq->cpu_wq, cpu), &dwork->work);
}

/**
 * queue_delayed_work - queue work on a workqueue after delay
 * @wq: workqueue to use
 * @dwork: delayable work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 */
int fastcall queue_delayed_work(struct workqueue_struct *wq,
			struct delayed_work *dwork, unsigned long delay)
{
	int ret = 0;
	struct timer_list *timer = &dwork->timer;
	struct work_struct *work = &dwork->work;

	timer_stats_timer_set_start_info(timer);
	if (delay == 0)
		return queue_work(wq, work);

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
		BUG_ON(timer_pending(timer));
		BUG_ON(!list_empty(&work->entry));

		/* This stores wq for the moment, for the timer_fn */
		set_wq_data(work, wq);
		timer->expires = jiffies + delay;
		timer->data = (unsigned long)dwork;
		timer->function = delayed_work_timer_fn;
		add_timer(timer);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work);
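
/*
 * Illustrative sketch (not part of workqueue.c): arming and later cancelling
 * a delayed work item.  my_wq, my_poll, my_poll_fn and MY_POLL_INTERVAL are
 * hypothetical names.
 */
#if 0
#define MY_POLL_INTERVAL	(HZ / 2)	/* half a second */

static struct workqueue_struct *my_wq;

static void my_poll_fn(struct work_struct *work);
static DECLARE_DELAYED_WORK(my_poll, my_poll_fn);

static void my_poll_fn(struct work_struct *work)
{
	/* ... poll the hardware in process context ... */
}

static void my_start_polling(void)
{
	/* The timer fires after MY_POLL_INTERVAL jiffies, then the work runs. */
	queue_delayed_work(my_wq, &my_poll, MY_POLL_INTERVAL);
}

static void my_stop_polling(void)
{
	/* Returns non-zero if the pending timer was deleted before it fired. */
	cancel_delayed_work(&my_poll);
	flush_workqueue(my_wq);
}
#endif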

/**
 * queue_delayed_work_on - queue work on specific CPU after delay
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @dwork: work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 */
int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
			struct delayed_work *dwork, unsigned long delay)
{
	int ret = 0;
	struct timer_list *timer = &dwork->timer;
	struct work_struct *work = &dwork->work;

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
		BUG_ON(timer_pending(timer));
		BUG_ON(!list_empty(&work->entry));

		/* This stores wq for the moment, for the timer_fn */
		set_wq_data(work, wq);
		timer->expires = jiffies + delay;
		timer->data = (unsigned long)dwork;
		timer->function = delayed_work_timer_fn;
		add_timer_on(timer, cpu);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);

static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
	unsigned long flags;

	/*
	 * Keep taking off work from the queue until
	 * done.
	 */
	spin_lock_irqsave(&cwq->lock, flags);
	cwq->run_depth++;
	if (cwq->run_depth > 3) {
		/* morton gets to eat his hat */
		printk("%s: recursion depth exceeded: %d\n",
			__FUNCTION__, cwq->run_depth);
		dump_stack();
	}
	while (!list_empty(&cwq->worklist)) {
		struct work_struct *work = list_entry(cwq->worklist.next,
						struct work_struct, entry);
		work_func_t f = work->func;

		cwq->current_work = work;
		list_del_init(cwq->worklist.next);
		spin_unlock_irqrestore(&cwq->lock, flags);

		BUG_ON(get_wq_data(work) != cwq);
		if (!test_bit(WORK_STRUCT_NOAUTOREL, work_data_bits(work)))
			work_release(work);
		f(work);

		if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
			printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
					"%s/0x%08x/%d\n",
					current->comm, preempt_count(),
					current->pid);
			printk(KERN_ERR "    last function: ");
			print_symbol("%s\n", (unsigned long)f);
			debug_show_held_locks(current);
			dump_stack();
		}

		spin_lock_irqsave(&cwq->lock, flags);
		cwq->current_work = NULL;
	}
	cwq->run_depth--;
	spin_unlock_irqrestore(&cwq->lock, flags);
}

static int worker_thread(void *__cwq)
{
	struct cpu_workqueue_struct *cwq = __cwq;
	DECLARE_WAITQUEUE(wait, current);
	struct k_sigaction sa;
	sigset_t blocked;

	if (!cwq->wq->freezeable)
		current->flags |= PF_NOFREEZE;

	set_user_nice(current, -5);

	/* Block and flush all signals */
	sigfillset(&blocked);
	sigprocmask(SIG_BLOCK, &blocked, NULL);
	flush_signals(current);

	/*
	 * We inherited MPOL_INTERLEAVE from the booting kernel.
	 * Set MPOL_DEFAULT to ensure node local allocations.
	 */
	numa_default_policy();
	/* SIG_IGN makes children autoreap: see do_notify_parent(). */
	sa.sa.sa_handler = SIG_IGN;
	sa.sa.sa_flags = 0;
	siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
	do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);

	set_current_state(TASK_INTERRUPTIBLE);
	while (!kthread_should_stop()) {
		if (cwq->wq->freezeable)
			try_to_freeze();

		add_wait_queue(&cwq->more_work, &wait);
		if (list_empty(&cwq->worklist))
			schedule();
		else
			__set_current_state(TASK_RUNNING);
		remove_wait_queue(&cwq->more_work, &wait);

		if (!list_empty(&cwq->worklist))
			run_workqueue(cwq);
		set_current_state(TASK_INTERRUPTIBLE);
	}
	__set_current_state(TASK_RUNNING);
	return 0;
}

struct wq_barrier {
	struct work_struct	work;
	struct completion	done;
};

static void wq_barrier_func(struct work_struct *work)
{
	struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
	complete(&barr->done);
}

static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
					struct wq_barrier *barr, int tail)
{
	INIT_WORK(&barr->work, wq_barrier_func);
	__set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work));

	init_completion(&barr->done);

	insert_work(cwq, &barr->work, tail);
}

static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
	if (cwq->thread == current) {
		/*
		 * Probably keventd trying to flush its own queue. So simply run
		 * it by hand rather than deadlocking.
		 */
		run_workqueue(cwq);
	} else {
		struct wq_barrier barr;
		int active = 0;

		spin_lock_irq(&cwq->lock);
		if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) {
			insert_wq_barrier(cwq, &barr, 1);
			active = 1;
		}
		spin_unlock_irq(&cwq->lock);

		if (active)
			wait_for_completion(&barr.done);
	}
}

/**
 * flush_workqueue - ensure that any scheduled work has run to completion.
 * @wq: workqueue to flush
 *
 * Forces execution of the workqueue and blocks until its completion.
 * This is typically used in driver shutdown handlers.
 *
 * We sleep until all work items which were queued on entry have been handled,
 * but we are not livelocked by new incoming ones.
 *
 * This function used to run the workqueues itself.  Now we just wait for the
 * helper threads to do it.
 */
void fastcall flush_workqueue(struct workqueue_struct *wq)
{
	if (is_single_threaded(wq)) {
		/* Always use first cpu's area. */
		flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
	} else {
		long sequence;
		int cpu;
again:
		sequence = migrate_sequence;

		for_each_possible_cpu(cpu)
			flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));

		if (unlikely(sequence != migrate_sequence))
			goto again;
	}
}
EXPORT_SYMBOL_GPL(flush_workqueue);
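
/*
 * Illustrative sketch (not part of workqueue.c): making sure every work item
 * queued so far has finished before tearing shared state down.  my_wq and
 * my_shared_buf are hypothetical.
 */
#if 0
static struct workqueue_struct *my_wq;
static void *my_shared_buf;

static void my_shutdown(void)
{
	/*
	 * After flush_workqueue() returns, every work item that was queued on
	 * my_wq before this call has completed, so the buffer is unused.
	 */
	flush_workqueue(my_wq);
	kfree(my_shared_buf);
	my_shared_buf = NULL;
}
#endif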

static void wait_on_work(struct cpu_workqueue_struct *cwq,
				struct work_struct *work)
{
	struct wq_barrier barr;
	int running = 0;

	spin_lock_irq(&cwq->lock);
	if (unlikely(cwq->current_work == work)) {
		insert_wq_barrier(cwq, &barr, 0);
		running = 1;
	}
	spin_unlock_irq(&cwq->lock);

	if (unlikely(running)) {
		mutex_unlock(&workqueue_mutex);
		wait_for_completion(&barr.done);
		mutex_lock(&workqueue_mutex);
	}
}

/**
 * flush_work - block until a work_struct's callback has terminated
 * @wq: the workqueue on which the work is queued
 * @work: the work which is to be flushed
 *
 * flush_work() will attempt to cancel the work if it is queued.  If the work's
 * callback appears to be running, flush_work() will block until it has
 * completed.
 *
 * flush_work() is designed to be used when the caller is tearing down data
 * structures which the callback function operates upon.  It is expected that,
 * prior to calling flush_work(), the caller has arranged for the work to not
 * be requeued.
 */
void flush_work(struct workqueue_struct *wq, struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq;

	mutex_lock(&workqueue_mutex);
	cwq = get_wq_data(work);
	/* Was it ever queued ? */
	if (!cwq)
		goto out;

	/*
	 * This work can't be re-queued, and the lock above protects us
	 * from take_over_work(), no need to re-check that get_wq_data()
	 * is still the same when we take cwq->lock.
	 */
	spin_lock_irq(&cwq->lock);
	list_del_init(&work->entry);
	work_release(work);
	spin_unlock_irq(&cwq->lock);

	if (is_single_threaded(wq)) {
		/* Always use first cpu's area. */
		wait_on_work(per_cpu_ptr(wq->cpu_wq, singlethread_cpu), work);
	} else {
		int cpu;

		for_each_online_cpu(cpu)
			wait_on_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
	}
out:
	mutex_unlock(&workqueue_mutex);
}
EXPORT_SYMBOL_GPL(flush_work);
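
/*
 * Illustrative sketch (not part of workqueue.c): flushing one specific work
 * item before freeing the object it operates on.  struct my_obj and
 * my_obj_destroy are hypothetical; the caller must already have arranged for
 * the work not to be requeued.
 */
#if 0
struct my_obj {
	struct work_struct work;
	/* ... data the handler touches ... */
};

static void my_obj_destroy(struct workqueue_struct *my_wq, struct my_obj *obj)
{
	/* Cancel it if still queued, or wait for the running callback. */
	flush_work(my_wq, &obj->work);
	kfree(obj);
}
#endif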

static void init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
{
	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);

	cwq->wq = wq;
	spin_lock_init(&cwq->lock);
	INIT_LIST_HEAD(&cwq->worklist);
	init_waitqueue_head(&cwq->more_work);
}

static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
						   int cpu)
{
	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
	struct task_struct *p;

	if (is_single_threaded(wq))
		p = kthread_create(worker_thread, cwq, "%s", wq->name);
	else
		p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
	if (IS_ERR(p))
		return NULL;
	cwq->thread = p;
	return p;
}

struct workqueue_struct *__create_workqueue(const char *name,
					    int singlethread, int freezeable)
{
	int cpu, destroy = 0;
	struct workqueue_struct *wq;
	struct task_struct *p;

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return NULL;

	wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
	if (!wq->cpu_wq) {
		kfree(wq);
		return NULL;
	}

	wq->name = name;
	wq->freezeable = freezeable;

	mutex_lock(&workqueue_mutex);
	if (singlethread) {
		INIT_LIST_HEAD(&wq->list);
		init_cpu_workqueue(wq, singlethread_cpu);
		p = create_workqueue_thread(wq, singlethread_cpu);
		if (!p)
			destroy = 1;
		else
			wake_up_process(p);
	} else {
		list_add(&wq->list, &workqueues);
		for_each_possible_cpu(cpu) {
			init_cpu_workqueue(wq, cpu);
			if (!cpu_online(cpu))
				continue;

			p = create_workqueue_thread(wq, cpu);
			if (p) {
				kthread_bind(p, cpu);
				wake_up_process(p);
			} else
				destroy = 1;
		}
	}
	mutex_unlock(&workqueue_mutex);

	/*
	 * Was there any error during startup? If yes then clean up:
	 */
	if (destroy) {
		destroy_workqueue(wq);
		wq = NULL;
	}
	return wq;
}
EXPORT_SYMBOL_GPL(__create_workqueue);
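
/*
 * Illustrative sketch (not part of workqueue.c): callers normally reach
 * __create_workqueue() through the create_workqueue() and
 * create_singlethread_workqueue() wrappers from <linux/workqueue.h>.
 * my_wq, my_init, my_exit and the "mydrv" name are hypothetical.
 */
#if 0
static struct workqueue_struct *my_wq;

static int my_init(void)
{
	/* One worker thread per CPU, named "mydrv/<cpu>". */
	my_wq = create_workqueue("mydrv");
	if (!my_wq)
		return -ENOMEM;
	return 0;
}

static void my_exit(void)
{
	/* Runs all pending work, stops the threads, frees the structure. */
	destroy_workqueue(my_wq);
}
#endif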

static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
{
	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);

	if (cwq->thread) {
		kthread_stop(cwq->thread);
		cwq->thread = NULL;
	}
}

/**
 * destroy_workqueue - safely terminate a workqueue
 * @wq: target workqueue
 *
 * Safely destroy a workqueue. All work currently pending will be done first.
 */
void destroy_workqueue(struct workqueue_struct *wq)
{
	int cpu;

	flush_workqueue(wq);

	/* We don't need the distraction of CPUs appearing and vanishing. */
	mutex_lock(&workqueue_mutex);
	if (is_single_threaded(wq))
		cleanup_workqueue_thread(wq, singlethread_cpu);
	else {
		for_each_online_cpu(cpu)
			cleanup_workqueue_thread(wq, cpu);
		list_del(&wq->list);
	}
	mutex_unlock(&workqueue_mutex);
	free_percpu(wq->cpu_wq);
	kfree(wq);
}
EXPORT_SYMBOL_GPL(destroy_workqueue);

static struct workqueue_struct *keventd_wq;

/**
 * schedule_work - put work task in global workqueue
 * @work: job to be done
 *
 * This puts a job in the kernel-global workqueue.
 */
int fastcall schedule_work(struct work_struct *work)
{
	return queue_work(keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work);
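
/*
 * Illustrative sketch (not part of workqueue.c): deferring work from an
 * interrupt handler to the kernel-global keventd queue.  my_bh, my_deferred
 * and my_irq_handler are hypothetical; <linux/interrupt.h> is assumed.
 */
#if 0
static void my_bh(struct work_struct *work)
{
	/* Process context: may sleep, take mutexes, allocate with GFP_KERNEL. */
}
static DECLARE_WORK(my_deferred, my_bh);

static irqreturn_t my_irq_handler(int irq, void *dev_id)
{
	/* Interrupt context: just ack the hardware and defer the rest. */
	schedule_work(&my_deferred);
	return IRQ_HANDLED;
}
#endif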

/**
 * schedule_delayed_work - put work task in global workqueue after delay
 * @dwork: job to be done
 * @delay: number of jiffies to wait or 0 for immediate execution
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue.
 */
int fastcall schedule_delayed_work(struct delayed_work *dwork,
					unsigned long delay)
{
	timer_stats_timer_set_start_info(&dwork->timer);
	return queue_delayed_work(keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work);

/**
 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
 * @cpu: cpu to use
 * @dwork: job to be done
 * @delay: number of jiffies to wait
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue on the specified CPU.
 */
int schedule_delayed_work_on(int cpu,
			struct delayed_work *dwork, unsigned long delay)
{
	return queue_delayed_work_on(cpu, keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work_on);

/**
 * schedule_on_each_cpu - call a function on each online CPU from keventd
 * @func: the function to call
 *
 * Returns zero on success.
 * Returns -ve errno on failure.
 *
 * Appears to be racy against CPU hotplug.
 *
 * schedule_on_each_cpu() is very slow.
 */
int schedule_on_each_cpu(work_func_t func)
{
	int cpu;
	struct work_struct *works;

	works = alloc_percpu(struct work_struct);
	if (!works)
		return -ENOMEM;

	preempt_disable();		/* CPU hotplug */
	for_each_online_cpu(cpu) {
		struct work_struct *work = per_cpu_ptr(works, cpu);

		INIT_WORK(work, func);
		set_bit(WORK_STRUCT_PENDING, work_data_bits(work));
		__queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu), work);
	}
	preempt_enable();
	flush_workqueue(keventd_wq);
	free_percpu(works);
	return 0;
}
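
/*
 * Illustrative sketch (not part of workqueue.c): running a function once on
 * every online CPU and waiting for all of them.  my_per_cpu_sync and
 * my_sync_all_cpus are hypothetical.
 */
#if 0
static void my_per_cpu_sync(struct work_struct *unused)
{
	/* Executes on each online CPU's keventd thread, in process context. */
	printk(KERN_DEBUG "synced on cpu %d\n", smp_processor_id());
}

static int my_sync_all_cpus(void)
{
	/* Blocks until the handler has run everywhere; returns 0 or -ENOMEM. */
	return schedule_on_each_cpu(my_per_cpu_sync);
}
#endif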

void flush_scheduled_work(void)
{
	flush_workqueue(keventd_wq);
}
EXPORT_SYMBOL(flush_scheduled_work);

void flush_work_keventd(struct work_struct *work)
{
	flush_work(keventd_wq, work);
}
EXPORT_SYMBOL(flush_work_keventd);

/**
 * cancel_rearming_delayed_workqueue - reliably kill off a delayed work whose handler rearms the delayed work.
 * @wq:    the controlling workqueue structure
 * @dwork: the delayed work struct
 */
void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq,
				       struct delayed_work *dwork)
{
	while (!cancel_delayed_work(dwork))
		flush_workqueue(wq);
}
EXPORT_SYMBOL(cancel_rearming_delayed_workqueue);

/**
 * cancel_rearming_delayed_work - reliably kill off a delayed keventd work whose handler rearms the delayed work.
 * @dwork: the delayed work struct
 */
void cancel_rearming_delayed_work(struct delayed_work *dwork)
{
	cancel_rearming_delayed_workqueue(keventd_wq, dwork);
}
EXPORT_SYMBOL(cancel_rearming_delayed_work);
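
/*
 * Illustrative sketch (not part of workqueue.c): a self-rearming delayed work
 * item on keventd and the matching reliable teardown.  my_tick, my_tick_fn,
 * my_stop_tick and MY_TICK_PERIOD are hypothetical.
 */
#if 0
#define MY_TICK_PERIOD	HZ		/* rearm once per second */

static void my_tick_fn(struct work_struct *work);
static DECLARE_DELAYED_WORK(my_tick, my_tick_fn);

static void my_tick_fn(struct work_struct *work)
{
	/* ... periodic housekeeping ... */
	schedule_delayed_work(&my_tick, MY_TICK_PERIOD);	/* rearm */
}

static void my_stop_tick(void)
{
	/*
	 * A plain cancel_delayed_work() could race with the handler rearming
	 * itself; this helper loops cancel + flush until the work is gone.
	 */
	cancel_rearming_delayed_work(&my_tick);
}
#endif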

/**
 * execute_in_process_context - reliably execute the routine with user context
 * @fn:		the function to execute
 * @ew:		guaranteed storage for the execute work structure (must
 *		be available when the work executes)
 *
 * Executes the function immediately if process context is available,
 * otherwise schedules the function for delayed execution.
 *
 * Returns:	0 - function was executed
 *		1 - function was scheduled for execution
 */
int execute_in_process_context(work_func_t fn, struct execute_work *ew)
{
	if (!in_interrupt()) {
		fn(&ew->work);
		return 0;
	}

	INIT_WORK(&ew->work, fn);
	schedule_work(&ew->work);

	return 1;
}
EXPORT_SYMBOL_GPL(execute_in_process_context);
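
/*
 * Illustrative sketch (not part of workqueue.c): a release routine that may be
 * called from either process or interrupt context.  struct my_res, my_release
 * and my_put are hypothetical; struct execute_work comes from
 * <linux/workqueue.h>.
 */
#if 0
struct my_res {
	struct execute_work ew;		/* must stay valid until the work runs */
	/* ... resources to release ... */
};

static void my_release(struct work_struct *work)
{
	struct my_res *res = container_of(work, struct my_res, ew.work);

	/* Always reached in process context. */
	kfree(res);
}

static void my_put(struct my_res *res)
{
	/* Runs my_release() now, or defers it to keventd if in_interrupt(). */
	execute_in_process_context(my_release, &res->ew);
}
#endif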

int keventd_up(void)
{
	return keventd_wq != NULL;
}

int current_is_keventd(void)
{
	struct cpu_workqueue_struct *cwq;
	int cpu = smp_processor_id();	/* preempt-safe: keventd is per-cpu */
	int ret = 0;

	BUG_ON(!keventd_wq);

	cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
	if (current == cwq->thread)
		ret = 1;

	return ret;
}

/* Take the work from this (downed) CPU. */
static void take_over_work(struct workqueue_struct *wq, unsigned int cpu)
{
	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
	struct list_head list;
	struct work_struct *work;

	spin_lock_irq(&cwq->lock);
	list_replace_init(&cwq->worklist, &list);
	migrate_sequence++;

	while (!list_empty(&list)) {
		printk("Taking work for %s\n", wq->name);
		work = list_entry(list.next, struct work_struct, entry);
		list_del(&work->entry);
		__queue_work(per_cpu_ptr(wq->cpu_wq, smp_processor_id()), work);
	}
	spin_unlock_irq(&cwq->lock);
}

/* We're holding the cpucontrol mutex here */
static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
					    unsigned long action,
					    void *hcpu)
{
	unsigned int hotcpu = (unsigned long)hcpu;
	struct workqueue_struct *wq;

	switch (action) {
	case CPU_UP_PREPARE:
		mutex_lock(&workqueue_mutex);
		/* Create a new workqueue thread for it. */
		list_for_each_entry(wq, &workqueues, list) {
			if (!create_workqueue_thread(wq, hotcpu)) {
				printk("workqueue for %i failed\n", hotcpu);
				return NOTIFY_BAD;
			}
		}
		break;

	case CPU_ONLINE:
		/* Kick off worker threads. */
		list_for_each_entry(wq, &workqueues, list) {
			struct cpu_workqueue_struct *cwq;

			cwq = per_cpu_ptr(wq->cpu_wq, hotcpu);
			kthread_bind(cwq->thread, hotcpu);
			wake_up_process(cwq->thread);
		}
		mutex_unlock(&workqueue_mutex);
		break;

	case CPU_UP_CANCELED:
		list_for_each_entry(wq, &workqueues, list) {
			if (!per_cpu_ptr(wq->cpu_wq, hotcpu)->thread)
				continue;
			/* Unbind so it can run. */
			kthread_bind(per_cpu_ptr(wq->cpu_wq, hotcpu)->thread,
				     any_online_cpu(cpu_online_map));
			cleanup_workqueue_thread(wq, hotcpu);
		}
		mutex_unlock(&workqueue_mutex);
		break;

	case CPU_DOWN_PREPARE:
		mutex_lock(&workqueue_mutex);
		break;

	case CPU_DOWN_FAILED:
		mutex_unlock(&workqueue_mutex);
		break;

	case CPU_DEAD:
		list_for_each_entry(wq, &workqueues, list)
			cleanup_workqueue_thread(wq, hotcpu);
		list_for_each_entry(wq, &workqueues, list)
			take_over_work(wq, hotcpu);
		mutex_unlock(&workqueue_mutex);
		break;
	}

	return NOTIFY_OK;
}

void init_workqueues(void)
{
	singlethread_cpu = first_cpu(cpu_possible_map);
	hotcpu_notifier(workqueue_cpu_callback, 0);
	keventd_wq = create_workqueue("events");
	BUG_ON(!keventd_wq);
}