/*
 * linux/kernel/workqueue.c
 *
 * Generic mechanism for defining kernel helper threads for running
 * arbitrary tasks in process context.
 *
 * Started by Ingo Molnar, Copyright (C) 2002
 *
 * Derived from the taskqueue/keventd code by:
 *
 *   David Woodhouse <dwmw2@infradead.org>
 *   Andrew Morton <andrewm@uow.edu.au>
 *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
 *   Theodore Ts'o <tytso@mit.edu>
 *
 * Made to use alloc_percpu by Christoph Lameter <clameter@sgi.com>.
 */

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h>
#include <linux/hardirq.h>
#include <linux/mempolicy.h>

/*
 * The per-CPU workqueue (if single thread, we always use the first
 * possible cpu).
 *
 * The sequence counters are for flush_scheduled_work().  It wants to wait
 * until all currently-scheduled works are completed, but it doesn't
 * want to be livelocked by new, incoming ones.  So it waits until
 * remove_sequence is >= the insert_sequence which pertained when
 * flush_scheduled_work() was called.
 */
struct cpu_workqueue_struct {

        spinlock_t lock;

        long remove_sequence;   /* Least-recently added (next to run) */
        long insert_sequence;   /* Next to add */

        struct list_head worklist;
        wait_queue_head_t more_work;
        wait_queue_head_t work_done;

        struct workqueue_struct *wq;
        struct task_struct *thread;

        int run_depth;          /* Detect run_workqueue() recursion depth */
} ____cacheline_aligned;

/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
        struct cpu_workqueue_struct *cpu_wq;
        const char *name;
        struct list_head list;  /* Empty if single thread */
};

/* All the per-cpu workqueues on the system, for hotplug cpu to add/remove
   threads to each one as cpus come/go. */
static DEFINE_MUTEX(workqueue_mutex);
static LIST_HEAD(workqueues);

static int singlethread_cpu;

/* If it's single threaded, it isn't in the list of workqueues. */
static inline int is_single_threaded(struct workqueue_struct *wq)
{
        return list_empty(&wq->list);
}

static inline void set_wq_data(struct work_struct *work, void *wq)
{
        unsigned long new, old, res;

        /* assume the pending flag is already set and that the work has already
         * been queued on this workqueue */
        new = (unsigned long) wq | (1UL << WORK_STRUCT_PENDING);
        res = work->management;
        if (res != new) {
                do {
                        old = res;
                        new = (unsigned long) wq;
                        new |= (old & WORK_STRUCT_FLAG_MASK);
                        res = cmpxchg(&work->management, old, new);
                } while (res != old);
        }
}

static inline void *get_wq_data(struct work_struct *work)
{
        return (void *) (work->management & WORK_STRUCT_WQ_DATA_MASK);
}

/* Preempt must be disabled. */
static void __queue_work(struct cpu_workqueue_struct *cwq,
                         struct work_struct *work)
{
        unsigned long flags;

        spin_lock_irqsave(&cwq->lock, flags);
        set_wq_data(work, cwq);
        list_add_tail(&work->entry, &cwq->worklist);
        cwq->insert_sequence++;
        wake_up(&cwq->more_work);
        spin_unlock_irqrestore(&cwq->lock, flags);
}

/**
 * queue_work - queue work on a workqueue
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to the CPU on which it was submitted, but there is no
 * guarantee that it will be processed by that CPU.
 */
int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
        int ret = 0, cpu = get_cpu();

        if (!test_and_set_bit(WORK_STRUCT_PENDING, &work->management)) {
                if (unlikely(is_single_threaded(wq)))
                        cpu = singlethread_cpu;
                BUG_ON(!list_empty(&work->entry));
                __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
                ret = 1;
        }
        put_cpu();
        return ret;
}
EXPORT_SYMBOL_GPL(queue_work);
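
/*
 * Illustrative usage sketch (not part of this file): a caller with its own
 * workqueue typically embeds a work_struct in its state, initializes it with
 * INIT_WORK() and hands it to queue_work().  The names below (my_dev,
 * my_dev_workfn, my_wq) are hypothetical.
 *
 *      struct my_dev {
 *              struct work_struct work;
 *      };
 *
 *      static void my_dev_workfn(struct work_struct *work)
 *      {
 *              struct my_dev *dev = container_of(work, struct my_dev, work);
 *              // runs in process context in a workqueue thread
 *      }
 *
 *      // setup, e.g. at probe time:
 *      //      INIT_WORK(&dev->work, my_dev_workfn);
 *      // later, often from an interrupt handler:
 *      //      queue_work(my_wq, &dev->work);
 */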

static void delayed_work_timer_fn(unsigned long __data)
{
        struct delayed_work *dwork = (struct delayed_work *)__data;
        struct workqueue_struct *wq = get_wq_data(&dwork->work);
        int cpu = smp_processor_id();

        if (unlikely(is_single_threaded(wq)))
                cpu = singlethread_cpu;

        __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), &dwork->work);
}

/**
 * queue_delayed_work - queue work on a workqueue after delay
 * @wq: workqueue to use
 * @dwork: delayable work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @dwork was already on a queue, non-zero otherwise.
 */
int fastcall queue_delayed_work(struct workqueue_struct *wq,
                        struct delayed_work *dwork, unsigned long delay)
{
        int ret = 0;
        struct timer_list *timer = &dwork->timer;
        struct work_struct *work = &dwork->work;

        if (delay == 0)
                return queue_work(wq, work);

        if (!test_and_set_bit(WORK_STRUCT_PENDING, &work->management)) {
                BUG_ON(timer_pending(timer));
                BUG_ON(!list_empty(&work->entry));

                /* This stores wq for the moment, for the timer_fn */
                set_wq_data(work, wq);
                timer->expires = jiffies + delay;
                timer->data = (unsigned long)dwork;
                timer->function = delayed_work_timer_fn;
                add_timer(timer);
                ret = 1;
        }
        return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work);

/**
 * queue_delayed_work_on - queue work on specific CPU after delay
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @dwork: delayable work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @dwork was already on a queue, non-zero otherwise.
 */
int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
                        struct delayed_work *dwork, unsigned long delay)
{
        int ret = 0;
        struct timer_list *timer = &dwork->timer;
        struct work_struct *work = &dwork->work;

        if (!test_and_set_bit(WORK_STRUCT_PENDING, &work->management)) {
                BUG_ON(timer_pending(timer));
                BUG_ON(!list_empty(&work->entry));

                /* This stores wq for the moment, for the timer_fn */
                set_wq_data(work, wq);
                timer->expires = jiffies + delay;
                timer->data = (unsigned long)dwork;
                timer->function = delayed_work_timer_fn;
                add_timer_on(timer, cpu);
                ret = 1;
        }
        return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);
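
/*
 * Illustrative sketch (not part of this file): delayed work pairs a
 * work_struct with a timer, so it must be declared as a delayed_work and
 * initialized with INIT_DELAYED_WORK().  The names (my_poll, my_poll_fn,
 * my_wq) are hypothetical; HZ jiffies is roughly one second.
 *
 *      static struct delayed_work my_poll;
 *
 *      static void my_poll_fn(struct work_struct *work)
 *      {
 *              // do the deferred/periodic processing here
 *      }
 *
 *      INIT_DELAYED_WORK(&my_poll, my_poll_fn);
 *      queue_delayed_work(my_wq, &my_poll, HZ);        // run in ~1 second
 *      // or, to run on a particular CPU instead:
 *      //      queue_delayed_work_on(0, my_wq, &my_poll, HZ);
 */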

static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
        unsigned long flags;

        /*
         * Keep taking off work from the queue until
         * done.
         */
        spin_lock_irqsave(&cwq->lock, flags);
        cwq->run_depth++;
        if (cwq->run_depth > 3) {
                /* morton gets to eat his hat */
                printk("%s: recursion depth exceeded: %d\n",
                        __FUNCTION__, cwq->run_depth);
                dump_stack();
        }
        while (!list_empty(&cwq->worklist)) {
                struct work_struct *work = list_entry(cwq->worklist.next,
                                                struct work_struct, entry);
                work_func_t f = work->func;

                list_del_init(cwq->worklist.next);
                spin_unlock_irqrestore(&cwq->lock, flags);

                BUG_ON(get_wq_data(work) != cwq);
                if (!test_bit(WORK_STRUCT_NOAUTOREL, &work->management))
                        work_release(work);
                f(work);

                spin_lock_irqsave(&cwq->lock, flags);
                cwq->remove_sequence++;
                wake_up(&cwq->work_done);
        }
        cwq->run_depth--;
        spin_unlock_irqrestore(&cwq->lock, flags);
}

static int worker_thread(void *__cwq)
{
        struct cpu_workqueue_struct *cwq = __cwq;
        DECLARE_WAITQUEUE(wait, current);
        struct k_sigaction sa;
        sigset_t blocked;

        current->flags |= PF_NOFREEZE;

        set_user_nice(current, -5);

        /* Block and flush all signals */
        sigfillset(&blocked);
        sigprocmask(SIG_BLOCK, &blocked, NULL);
        flush_signals(current);

        /*
         * We inherited MPOL_INTERLEAVE from the booting kernel.
         * Set MPOL_DEFAULT to ensure node-local allocations.
         */
        numa_default_policy();

        /* SIG_IGN makes children autoreap: see do_notify_parent(). */
        sa.sa.sa_handler = SIG_IGN;
        sa.sa.sa_flags = 0;
        siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
        do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);

        set_current_state(TASK_INTERRUPTIBLE);
        while (!kthread_should_stop()) {
                add_wait_queue(&cwq->more_work, &wait);
                if (list_empty(&cwq->worklist))
                        schedule();
                else
                        __set_current_state(TASK_RUNNING);
                remove_wait_queue(&cwq->more_work, &wait);

                if (!list_empty(&cwq->worklist))
                        run_workqueue(cwq);
                set_current_state(TASK_INTERRUPTIBLE);
        }
        __set_current_state(TASK_RUNNING);
        return 0;
}

static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
        if (cwq->thread == current) {
                /*
                 * Probably keventd trying to flush its own queue.  So simply
                 * run it by hand rather than deadlocking.
                 */
                run_workqueue(cwq);
        } else {
                DEFINE_WAIT(wait);
                long sequence_needed;

                spin_lock_irq(&cwq->lock);
                sequence_needed = cwq->insert_sequence;

                while (sequence_needed - cwq->remove_sequence > 0) {
                        prepare_to_wait(&cwq->work_done, &wait,
                                        TASK_UNINTERRUPTIBLE);
                        spin_unlock_irq(&cwq->lock);
                        schedule();
                        spin_lock_irq(&cwq->lock);
                }
                finish_wait(&cwq->work_done, &wait);
                spin_unlock_irq(&cwq->lock);
        }
}

/**
 * flush_workqueue - ensure that any scheduled work has run to completion.
 * @wq: workqueue to flush
 *
 * Forces execution of the workqueue and blocks until its completion.
 * This is typically used in driver shutdown handlers.
 *
 * This function will sample each workqueue's current insert_sequence number and
 * will sleep until the head sequence is greater than or equal to that.  This
 * means that we sleep until all works which were queued on entry have been
 * handled, but we are not livelocked by new incoming ones.
 *
 * This function used to run the workqueues itself.  Now we just wait for the
 * helper threads to do it.
 */
void fastcall flush_workqueue(struct workqueue_struct *wq)
{
        might_sleep();

        if (is_single_threaded(wq)) {
                /* Always use first cpu's area. */
                flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
        } else {
                int cpu;

                mutex_lock(&workqueue_mutex);
                for_each_online_cpu(cpu)
                        flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
                mutex_unlock(&workqueue_mutex);
        }
}
EXPORT_SYMBOL_GPL(flush_workqueue);

static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
                                                   int cpu)
{
        struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
        struct task_struct *p;

        spin_lock_init(&cwq->lock);
        cwq->wq = wq;
        cwq->thread = NULL;
        cwq->insert_sequence = 0;
        cwq->remove_sequence = 0;
        INIT_LIST_HEAD(&cwq->worklist);
        init_waitqueue_head(&cwq->more_work);
        init_waitqueue_head(&cwq->work_done);

        if (is_single_threaded(wq))
                p = kthread_create(worker_thread, cwq, "%s", wq->name);
        else
                p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
        if (IS_ERR(p))
                return NULL;
        cwq->thread = p;
        return p;
}

struct workqueue_struct *__create_workqueue(const char *name,
                                            int singlethread)
{
        int cpu, destroy = 0;
        struct workqueue_struct *wq;
        struct task_struct *p;

        wq = kzalloc(sizeof(*wq), GFP_KERNEL);
        if (!wq)
                return NULL;

        wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
        if (!wq->cpu_wq) {
                kfree(wq);
                return NULL;
        }

        wq->name = name;
        mutex_lock(&workqueue_mutex);
        if (singlethread) {
                INIT_LIST_HEAD(&wq->list);
                p = create_workqueue_thread(wq, singlethread_cpu);
                if (!p)
                        destroy = 1;
                else
                        wake_up_process(p);
        } else {
                list_add(&wq->list, &workqueues);
                for_each_online_cpu(cpu) {
                        p = create_workqueue_thread(wq, cpu);
                        if (p) {
                                kthread_bind(p, cpu);
                                wake_up_process(p);
                        } else
                                destroy = 1;
                }
        }
        mutex_unlock(&workqueue_mutex);

        /*
         * Was there any error during startup? If yes then clean up:
         */
        if (destroy) {
                destroy_workqueue(wq);
                wq = NULL;
        }
        return wq;
}
EXPORT_SYMBOL_GPL(__create_workqueue);

static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
{
        struct cpu_workqueue_struct *cwq;
        unsigned long flags;
        struct task_struct *p;

        cwq = per_cpu_ptr(wq->cpu_wq, cpu);
        spin_lock_irqsave(&cwq->lock, flags);
        p = cwq->thread;
        cwq->thread = NULL;
        spin_unlock_irqrestore(&cwq->lock, flags);
        if (p)
                kthread_stop(p);
}

/**
 * destroy_workqueue - safely terminate a workqueue
 * @wq: target workqueue
 *
 * Safely destroy a workqueue.  All work currently pending will be done first.
 */
void destroy_workqueue(struct workqueue_struct *wq)
{
        int cpu;

        flush_workqueue(wq);

        /* We don't need the distraction of CPUs appearing and vanishing. */
        mutex_lock(&workqueue_mutex);
        if (is_single_threaded(wq))
                cleanup_workqueue_thread(wq, singlethread_cpu);
        else {
                for_each_online_cpu(cpu)
                        cleanup_workqueue_thread(wq, cpu);
                list_del(&wq->list);
        }
        mutex_unlock(&workqueue_mutex);
        free_percpu(wq->cpu_wq);
        kfree(wq);
}
EXPORT_SYMBOL_GPL(destroy_workqueue);
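
/*
 * Illustrative lifecycle sketch (not part of this file): a typical driver
 * creates its workqueue at init time, flushes it before tearing down the
 * objects its work items touch, and destroys it on exit.  my_wq is a
 * hypothetical name.
 *
 *      static struct workqueue_struct *my_wq;
 *
 *      // init:
 *      my_wq = create_workqueue("my_wq");      // one thread per online CPU
 *      // or: my_wq = create_singlethread_workqueue("my_wq");
 *
 *      // exit:
 *      flush_workqueue(my_wq);         // optional; destroy flushes as well
 *      destroy_workqueue(my_wq);
 */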

static struct workqueue_struct *keventd_wq;

/**
 * schedule_work - put work task in global workqueue
 * @work: job to be done
 *
 * This puts a job in the kernel-global workqueue.
 */
int fastcall schedule_work(struct work_struct *work)
{
        return queue_work(keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work);
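
/*
 * Illustrative sketch (not part of this file): most callers do not create a
 * private workqueue at all; they declare a work item statically and push it
 * onto the global keventd queue via schedule_work().  my_work and my_workfn
 * are hypothetical names.
 *
 *      static void my_workfn(struct work_struct *work)
 *      {
 *              // deferred processing in keventd's process context
 *      }
 *      static DECLARE_WORK(my_work, my_workfn);
 *
 *      schedule_work(&my_work);        // e.g. from an interrupt handler
 */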

/**
 * schedule_delayed_work - put work task in global workqueue after delay
 * @dwork: job to be done
 * @delay: number of jiffies to wait or 0 for immediate execution
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue.
 */
int fastcall schedule_delayed_work(struct delayed_work *dwork, unsigned long delay)
{
        return queue_delayed_work(keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work);

/**
 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
 * @cpu: cpu to use
 * @dwork: job to be done
 * @delay: number of jiffies to wait
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue on the specified CPU.
 */
int schedule_delayed_work_on(int cpu,
                        struct delayed_work *dwork, unsigned long delay)
{
        return queue_delayed_work_on(cpu, keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work_on);

/**
 * schedule_on_each_cpu - call a function on each online CPU from keventd
 * @func: the function to call
 *
 * Returns zero on success.
 * Returns -ve errno on failure.
 *
 * Appears to be racy against CPU hotplug.
 *
 * schedule_on_each_cpu() is very slow.
 */
int schedule_on_each_cpu(work_func_t func)
{
        int cpu;
        struct work_struct *works;

        works = alloc_percpu(struct work_struct);
        if (!works)
                return -ENOMEM;

        mutex_lock(&workqueue_mutex);
        for_each_online_cpu(cpu) {
                INIT_WORK(per_cpu_ptr(works, cpu), func);
                __queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu),
                                per_cpu_ptr(works, cpu));
        }
        mutex_unlock(&workqueue_mutex);
        flush_workqueue(keventd_wq);
        free_percpu(works);
        return 0;
}
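
/*
 * Illustrative sketch (not part of this file): schedule_on_each_cpu() runs
 * @func once on every online CPU and only returns after all instances have
 * completed, which makes it handy for draining per-cpu state.  drain_local
 * is a hypothetical name.
 *
 *      static void drain_local(struct work_struct *unused)
 *      {
 *              // touches only this CPU's data, from keventd on that CPU
 *      }
 *
 *      int err = schedule_on_each_cpu(drain_local);
 */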

void flush_scheduled_work(void)
{
        flush_workqueue(keventd_wq);
}
EXPORT_SYMBOL(flush_scheduled_work);

/**
 * cancel_rearming_delayed_workqueue - reliably kill off a delayed
 *                      work whose handler rearms the delayed work.
 * @wq:    the controlling workqueue structure
 * @dwork: the delayed work struct
 */
void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq,
                                       struct delayed_work *dwork)
{
        while (!cancel_delayed_work(dwork))
                flush_workqueue(wq);
}
EXPORT_SYMBOL(cancel_rearming_delayed_workqueue);

/**
 * cancel_rearming_delayed_work - reliably kill off a delayed keventd
 *                      work whose handler rearms the delayed work.
 * @dwork: the delayed work struct
 */
void cancel_rearming_delayed_work(struct delayed_work *dwork)
{
        cancel_rearming_delayed_workqueue(keventd_wq, dwork);
}
EXPORT_SYMBOL(cancel_rearming_delayed_work);
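
/*
 * Illustrative sketch (not part of this file): a self-rearming handler
 * requeues itself each time it runs, so a plain cancel_delayed_work() can
 * lose the race against the requeue; the helpers above loop, flushing the
 * queue, until the cancel sticks.  my_poll and my_poll_fn are hypothetical.
 *
 *      static struct delayed_work my_poll;     // INIT_DELAYED_WORK'd at init
 *
 *      static void my_poll_fn(struct work_struct *work)
 *      {
 *              // ... do the periodic work ...
 *              schedule_delayed_work(&my_poll, HZ);    // rearm
 *      }
 *
 *      // teardown:
 *      cancel_rearming_delayed_work(&my_poll);
 */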

/**
 * execute_in_process_context - reliably execute the routine with user context
 * @fn:         the function to execute
 * @ew:         guaranteed storage for the execute work structure (must
 *              be available when the work executes)
 *
 * Executes the function immediately if process context is available,
 * otherwise schedules the function for delayed execution.
 *
 * Returns:     0 - function was executed
 *              1 - function was scheduled for execution
 */
int execute_in_process_context(work_func_t fn, struct execute_work *ew)
{
        if (!in_interrupt()) {
                fn(&ew->work);
                return 0;
        }

        INIT_WORK(&ew->work, fn);
        schedule_work(&ew->work);

        return 1;
}
EXPORT_SYMBOL_GPL(execute_in_process_context);
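
/*
 * Illustrative sketch (not part of this file): a caller that may or may not
 * already be in process context can use execute_in_process_context(), as
 * long as the execute_work storage stays alive until the work runs.  The
 * names my_obj and my_release are hypothetical.
 *
 *      struct my_obj {
 *              struct execute_work ew;
 *              // ... other fields ...
 *      };
 *
 *      static void my_release(struct work_struct *work)
 *      {
 *              struct my_obj *obj = container_of(work, struct my_obj,
 *                                                ew.work);
 *              kfree(obj);
 *      }
 *
 *      execute_in_process_context(my_release, &obj->ew);
 */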

int keventd_up(void)
{
        return keventd_wq != NULL;
}

int current_is_keventd(void)
{
        struct cpu_workqueue_struct *cwq;
        int cpu = smp_processor_id();   /* preempt-safe: keventd is per-cpu */
        int ret = 0;

        BUG_ON(!keventd_wq);

        cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
        if (current == cwq->thread)
                ret = 1;

        return ret;
}

#ifdef CONFIG_HOTPLUG_CPU
/* Take the work from this (downed) CPU. */
static void take_over_work(struct workqueue_struct *wq, unsigned int cpu)
{
        struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
        struct list_head list;
        struct work_struct *work;

        spin_lock_irq(&cwq->lock);
        list_replace_init(&cwq->worklist, &list);

        while (!list_empty(&list)) {
                printk("Taking work for %s\n", wq->name);
                work = list_entry(list.next, struct work_struct, entry);
                list_del(&work->entry);
                __queue_work(per_cpu_ptr(wq->cpu_wq, smp_processor_id()), work);
        }
        spin_unlock_irq(&cwq->lock);
}

/* We're holding the cpucontrol mutex here */
static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
                                            unsigned long action,
                                            void *hcpu)
{
        unsigned int hotcpu = (unsigned long)hcpu;
        struct workqueue_struct *wq;

        switch (action) {
        case CPU_UP_PREPARE:
                mutex_lock(&workqueue_mutex);
                /* Create a new workqueue thread for it. */
                list_for_each_entry(wq, &workqueues, list) {
                        if (!create_workqueue_thread(wq, hotcpu)) {
                                printk("workqueue for %i failed\n", hotcpu);
                                return NOTIFY_BAD;
                        }
                }
                break;

        case CPU_ONLINE:
                /* Kick off worker threads. */
                list_for_each_entry(wq, &workqueues, list) {
                        struct cpu_workqueue_struct *cwq;

                        cwq = per_cpu_ptr(wq->cpu_wq, hotcpu);
                        kthread_bind(cwq->thread, hotcpu);
                        wake_up_process(cwq->thread);
                }
                mutex_unlock(&workqueue_mutex);
                break;

        case CPU_UP_CANCELED:
                list_for_each_entry(wq, &workqueues, list) {
                        if (!per_cpu_ptr(wq->cpu_wq, hotcpu)->thread)
                                continue;
                        /* Unbind so it can run. */
                        kthread_bind(per_cpu_ptr(wq->cpu_wq, hotcpu)->thread,
                                     any_online_cpu(cpu_online_map));
                        cleanup_workqueue_thread(wq, hotcpu);
                }
                mutex_unlock(&workqueue_mutex);
                break;

        case CPU_DOWN_PREPARE:
                mutex_lock(&workqueue_mutex);
                break;

        case CPU_DOWN_FAILED:
                mutex_unlock(&workqueue_mutex);
                break;

        case CPU_DEAD:
                list_for_each_entry(wq, &workqueues, list)
                        cleanup_workqueue_thread(wq, hotcpu);
                list_for_each_entry(wq, &workqueues, list)
                        take_over_work(wq, hotcpu);
                mutex_unlock(&workqueue_mutex);
                break;
        }

        return NOTIFY_OK;
}
#endif

void init_workqueues(void)
{
        singlethread_cpu = first_cpu(cpu_possible_map);
        hotcpu_notifier(workqueue_cpu_callback, 0);
        keventd_wq = create_workqueue("events");
        BUG_ON(!keventd_wq);
}