/*
 * padata.c - generic interface to process data streams in parallel
 *
 * Copyright (C) 2008, 2009 secunet Security Networks AG
 * Copyright (C) 2008, 2009 Steffen Klassert <steffen.klassert@secunet.com>
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms and conditions of the GNU General Public License,
 * version 2, as published by the Free Software Foundation.
 *
 * This program is distributed in the hope it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
 */
#include <linux/module.h>
#include <linux/cpumask.h>
#include <linux/err.h>
#include <linux/cpu.h>
#include <linux/padata.h>
#include <linux/mutex.h>
#include <linux/sched.h>
#include <linux/rcupdate.h>

#define MAX_SEQ_NR (INT_MAX - NR_CPUS)
#define MAX_OBJ_NUM (10000 * NR_CPUS)
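
/*
 * Rationale for the two limits, inferred from their use below: MAX_SEQ_NR
 * leaves NR_CPUS of headroom below INT_MAX because the wrap check in
 * padata_do_parallel() is racy, so several cpus may increment seq_nr past
 * max_seq_nr before one of them resets it; the headroom keeps those extra
 * increments from overflowing a signed int. MAX_OBJ_NUM caps the number of
 * objects in flight; padata_do_parallel() refuses new objects once
 * pd->refcnt reaches it, which keeps the reorder queues bounded.
 */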
static int padata_index_to_cpu(struct parallel_data *pd, int cpu_index)
{
	int cpu, target_cpu;

	target_cpu = cpumask_first(pd->cpumask);
	for (cpu = 0; cpu < cpu_index; cpu++)
		target_cpu = cpumask_next(target_cpu, pd->cpumask);

	return target_cpu;
}
static int padata_cpu_hash(struct padata_priv *padata)
{
	int cpu_index;
	struct parallel_data *pd;

	pd = padata->pd;

	/*
	 * Hash the sequence numbers to the cpus by taking
	 * seq_nr mod. number of cpus in use.
	 */
	cpu_index = padata->seq_nr % cpumask_weight(pd->cpumask);

	return padata_index_to_cpu(pd, cpu_index);
}
static void padata_parallel_worker(struct work_struct *work)
{
	struct padata_queue *queue;
	struct parallel_data *pd;
	struct padata_instance *pinst;
	LIST_HEAD(local_list);

	local_bh_disable();
	queue = container_of(work, struct padata_queue, pwork);
	pd = queue->pd;
	pinst = pd->pinst;

	spin_lock(&queue->parallel.lock);
	list_replace_init(&queue->parallel.list, &local_list);
	spin_unlock(&queue->parallel.lock);

	while (!list_empty(&local_list)) {
		struct padata_priv *padata;

		padata = list_entry(local_list.next,
				    struct padata_priv, list);

		list_del_init(&padata->list);

		padata->parallel(padata);
	}

	local_bh_enable();
}
/*
 * padata_do_parallel - padata parallelization function
 *
 * @pinst: padata instance
 * @padata: object to be parallelized
 * @cb_cpu: cpu the serialization callback function will run on,
 *          must be in the cpumask of padata.
 *
 * The parallelization callback function will run with BHs off.
 * Returns -EINPROGRESS when the object was queued successfully, zero
 * when the instance is not started, or a negative error code.
 * Note: Every object which is parallelized by padata_do_parallel
 * must be seen by padata_do_serial.
 */
int padata_do_parallel(struct padata_instance *pinst,
		       struct padata_priv *padata, int cb_cpu)
{
	int target_cpu, err;
	struct padata_queue *queue;
	struct parallel_data *pd;

	rcu_read_lock_bh();

	pd = rcu_dereference(pinst->pd);

	err = 0;
	if (!(pinst->flags & PADATA_INIT))
		goto out;

	err = -EBUSY;
	if ((pinst->flags & PADATA_RESET))
		goto out;

	if (atomic_read(&pd->refcnt) >= MAX_OBJ_NUM)
		goto out;

	err = -EINVAL;
	if (!cpumask_test_cpu(cb_cpu, pd->cpumask))
		goto out;

	err = -EINPROGRESS;
	atomic_inc(&pd->refcnt);
	padata->pd = pd;
	padata->cb_cpu = cb_cpu;

	if (unlikely(atomic_read(&pd->seq_nr) == pd->max_seq_nr))
		atomic_set(&pd->seq_nr, -1);

	padata->seq_nr = atomic_inc_return(&pd->seq_nr);

	target_cpu = padata_cpu_hash(padata);
	queue = per_cpu_ptr(pd->queue, target_cpu);

	spin_lock(&queue->parallel.lock);
	list_add_tail(&padata->list, &queue->parallel.list);
	spin_unlock(&queue->parallel.lock);

	queue_work_on(target_cpu, pinst->wq, &queue->pwork);

out:
	rcu_read_unlock_bh();

	return err;
}
EXPORT_SYMBOL(padata_do_parallel);
/*
 * padata_get_next - get the next object that needs serialization
 *
 * Return values are:
 *
 *  A pointer to the control struct of the next object that needs
 *  serialization, if it is present in one of the percpu reorder queues.
 *
 *  NULL, if all percpu reorder queues are empty.
 *
 *  -EINPROGRESS, if the next object that needs serialization is still
 *  being parallel processed by another cpu and has not yet arrived in
 *  its reorder queue.
 *
 *  -ENODATA, if this cpu has to do the parallel processing for the
 *  next object.
 */
static struct padata_priv *padata_get_next(struct parallel_data *pd)
{
	int cpu, num_cpus, empty, calc_seq_nr;
	int seq_nr, next_nr, overrun, next_overrun;
	struct padata_queue *queue, *next_queue;
	struct padata_priv *padata;
	struct padata_list *reorder;

	empty = 0;
	next_nr = -1;
	next_overrun = 0;
	next_queue = NULL;

	num_cpus = cpumask_weight(pd->cpumask);

	for_each_cpu(cpu, pd->cpumask) {
		queue = per_cpu_ptr(pd->queue, cpu);
		reorder = &queue->reorder;

		/*
		 * Calculate the seq_nr of the object that should be
		 * next in this reorder queue.
		 */
		overrun = 0;
		calc_seq_nr = (atomic_read(&queue->num_obj) * num_cpus)
			       + queue->cpu_index;

		if (unlikely(calc_seq_nr > pd->max_seq_nr)) {
			calc_seq_nr = calc_seq_nr - pd->max_seq_nr - 1;
			overrun = 1;
		}

		if (!list_empty(&reorder->list)) {
			padata = list_entry(reorder->list.next,
					    struct padata_priv, list);

			seq_nr = padata->seq_nr;
			BUG_ON(calc_seq_nr != seq_nr);
		} else {
			seq_nr = calc_seq_nr;
			empty++;
		}

		if (next_nr < 0 || seq_nr < next_nr
		    || (next_overrun && !overrun)) {
			next_nr = seq_nr;
			next_overrun = overrun;
			next_queue = queue;
		}
	}

	padata = NULL;

	if (empty == num_cpus)
		goto out;

	reorder = &next_queue->reorder;

	if (!list_empty(&reorder->list)) {
		padata = list_entry(reorder->list.next,
				    struct padata_priv, list);

		if (unlikely(next_overrun)) {
			for_each_cpu(cpu, pd->cpumask) {
				queue = per_cpu_ptr(pd->queue, cpu);
				atomic_set(&queue->num_obj, 0);
			}
		}

		spin_lock(&reorder->lock);
		list_del_init(&padata->list);
		atomic_dec(&pd->reorder_objects);
		spin_unlock(&reorder->lock);

		atomic_inc(&next_queue->num_obj);

		goto out;
	}

	if (next_nr % num_cpus == next_queue->cpu_index) {
		padata = ERR_PTR(-ENODATA);
		goto out;
	}

	padata = ERR_PTR(-EINPROGRESS);
out:
	return padata;
}
static void padata_reorder(struct parallel_data *pd)
{
	struct padata_priv *padata;
	struct padata_queue *queue;
	struct padata_instance *pinst = pd->pinst;

try_again:
	/* Only one cpu at a time may enter the reorder logic. */
	if (!spin_trylock_bh(&pd->lock))
		goto out;

	while (1) {
		padata = padata_get_next(pd);

		if (!padata || PTR_ERR(padata) == -EINPROGRESS)
			break;

		if (PTR_ERR(padata) == -ENODATA) {
			spin_unlock_bh(&pd->lock);
			goto out;
		}

		queue = per_cpu_ptr(pd->queue, padata->cb_cpu);

		spin_lock(&queue->serial.lock);
		list_add_tail(&padata->list, &queue->serial.list);
		spin_unlock(&queue->serial.lock);

		queue_work_on(padata->cb_cpu, pinst->wq, &queue->swork);
	}

	spin_unlock_bh(&pd->lock);

	/* Objects may have arrived meanwhile; retry so none get stuck. */
	if (atomic_read(&pd->reorder_objects))
		goto try_again;

out:
	return;
}
static void padata_serial_worker(struct work_struct *work)
{
	struct padata_queue *queue;
	struct parallel_data *pd;
	LIST_HEAD(local_list);

	local_bh_disable();
	queue = container_of(work, struct padata_queue, swork);
	pd = queue->pd;

	spin_lock(&queue->serial.lock);
	list_replace_init(&queue->serial.list, &local_list);
	spin_unlock(&queue->serial.lock);

	while (!list_empty(&local_list)) {
		struct padata_priv *padata;

		padata = list_entry(local_list.next,
				    struct padata_priv, list);

		list_del_init(&padata->list);

		padata->serial(padata);
		atomic_dec(&pd->refcnt);
	}

	local_bh_enable();
}
/*
 * padata_do_serial - padata serialization function
 *
 * @padata: object to be serialized.
 *
 * padata_do_serial must be called for every parallelized object.
 * The serialization callback function will run with BHs off.
 */
void padata_do_serial(struct padata_priv *padata)
{
	int cpu;
	struct padata_queue *queue;
	struct parallel_data *pd;

	pd = padata->pd;

	cpu = get_cpu();
	queue = per_cpu_ptr(pd->queue, cpu);

	spin_lock(&queue->reorder.lock);
	atomic_inc(&pd->reorder_objects);
	list_add_tail(&padata->list, &queue->reorder.list);
	spin_unlock(&queue->reorder.lock);

	put_cpu();

	padata_reorder(pd);
}
EXPORT_SYMBOL(padata_do_serial);
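
/*
 * Illustrative sketch (kept under #if 0, not part of the build): one way a
 * client of the two exported functions above might look. All names below
 * (my_request, my_parallel, my_serial, my_submit) are hypothetical. The
 * firm requirements are that struct padata_priv is embedded in the
 * client's object, that the parallel/serial callbacks are set before
 * submission, and that every object handed to padata_do_parallel()
 * eventually reaches padata_do_serial().
 */
#if 0
struct my_request {
	struct padata_priv padata;	/* embedded control structure */
	void *data;			/* client payload */
};

static void my_parallel(struct padata_priv *padata)
{
	struct my_request *req = container_of(padata, struct my_request,
					      padata);

	/* The cpu-intensive per-object work runs here, with BHs off. */

	/* Hand the object back so it can be serialized in order. */
	padata_do_serial(&req->padata);
}

static void my_serial(struct padata_priv *padata)
{
	struct my_request *req = container_of(padata, struct my_request,
					      padata);

	/* Runs on cb_cpu, in the original submission order. */
}

static int my_submit(struct padata_instance *pinst, struct my_request *req,
		     int cb_cpu)
{
	req->padata.parallel = my_parallel;
	req->padata.serial = my_serial;

	/* -EINPROGRESS signals that padata took ownership of the object. */
	return padata_do_parallel(pinst, &req->padata, cb_cpu);
}
#endif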
/*
 * padata_alloc_pd - allocate and initialize a parallel_data object
 *
 * @pinst: padata instance the parallel_data belongs to
 * @cpumask: cpumask that padata uses for parallelization
 */
static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst,
					     const struct cpumask *cpumask)
{
	int cpu, cpu_index, num_cpus;
	struct padata_queue *queue;
	struct parallel_data *pd;

	cpu_index = 0;

	pd = kzalloc(sizeof(struct parallel_data), GFP_KERNEL);
	if (!pd)
		goto err;

	pd->queue = alloc_percpu(struct padata_queue);
	if (!pd->queue)
		goto err_free_pd;

	if (!alloc_cpumask_var(&pd->cpumask, GFP_KERNEL))
		goto err_free_queue;

	for_each_possible_cpu(cpu) {
		queue = per_cpu_ptr(pd->queue, cpu);

		queue->pd = pd;

		if (cpumask_test_cpu(cpu, cpumask)
		    && cpumask_test_cpu(cpu, cpu_active_mask)) {
			queue->cpu_index = cpu_index;
			cpu_index++;
		} else
			queue->cpu_index = -1;

		INIT_LIST_HEAD(&queue->reorder.list);
		INIT_LIST_HEAD(&queue->parallel.list);
		INIT_LIST_HEAD(&queue->serial.list);
		spin_lock_init(&queue->reorder.lock);
		spin_lock_init(&queue->parallel.lock);
		spin_lock_init(&queue->serial.lock);

		INIT_WORK(&queue->pwork, padata_parallel_worker);
		INIT_WORK(&queue->swork, padata_serial_worker);
		atomic_set(&queue->num_obj, 0);
	}

	cpumask_and(pd->cpumask, cpumask, cpu_active_mask);

	num_cpus = cpumask_weight(pd->cpumask);
	pd->max_seq_nr = (MAX_SEQ_NR / num_cpus) * num_cpus - 1;

	atomic_set(&pd->seq_nr, -1);
	atomic_set(&pd->reorder_objects, 0);
	atomic_set(&pd->refcnt, 0);
	pd->pinst = pinst;
	spin_lock_init(&pd->lock);

	return pd;

err_free_queue:
	free_percpu(pd->queue);
err_free_pd:
	kfree(pd);
err:
	return NULL;
}
static void padata_free_pd(struct parallel_data *pd)
{
	free_cpumask_var(pd->cpumask);
	free_percpu(pd->queue);
	kfree(pd);
}
/*
 * padata_replace - exchange the internal control structure
 *
 * Publishes the new parallel_data via RCU, waits until all objects that
 * are still in flight on the old one have been serialized, then frees it.
 */
static void padata_replace(struct padata_instance *pinst,
			   struct parallel_data *pd_new)
{
	struct parallel_data *pd_old = pinst->pd;

	pinst->flags |= PADATA_RESET;

	rcu_assign_pointer(pinst->pd, pd_new);

	synchronize_rcu();

	while (atomic_read(&pd_old->refcnt) != 0)
		yield();

	flush_workqueue(pinst->wq);

	padata_free_pd(pd_old);

	pinst->flags &= ~PADATA_RESET;
}
/*
 * padata_set_cpumask - set the cpumask that padata should use
 *
 * @pinst: padata instance
 * @cpumask: the cpumask to use
 */
int padata_set_cpumask(struct padata_instance *pinst,
		       cpumask_var_t cpumask)
{
	struct parallel_data *pd;
	int err = 0;

	might_sleep();

	mutex_lock(&pinst->lock);

	pd = padata_alloc_pd(pinst, cpumask);
	if (!pd) {
		err = -ENOMEM;
		goto out;
	}

	cpumask_copy(pinst->cpumask, cpumask);

	padata_replace(pinst, pd);

out:
	mutex_unlock(&pinst->lock);

	return err;
}
EXPORT_SYMBOL(padata_set_cpumask);
static int __padata_add_cpu(struct padata_instance *pinst, int cpu)
{
	struct parallel_data *pd;

	if (cpumask_test_cpu(cpu, cpu_active_mask)) {
		pd = padata_alloc_pd(pinst, pinst->cpumask);
		if (!pd)
			return -ENOMEM;

		padata_replace(pinst, pd);
	}

	return 0;
}
/*
 * padata_add_cpu - add a cpu to the padata cpumask
 *
 * @pinst: padata instance
 * @cpu: cpu to add
 */
int padata_add_cpu(struct padata_instance *pinst, int cpu)
{
	int err;

	might_sleep();

	mutex_lock(&pinst->lock);

	cpumask_set_cpu(cpu, pinst->cpumask);
	err = __padata_add_cpu(pinst, cpu);

	mutex_unlock(&pinst->lock);

	return err;
}
EXPORT_SYMBOL(padata_add_cpu);
static int __padata_remove_cpu(struct padata_instance *pinst, int cpu)
{
	struct parallel_data *pd;

	if (cpumask_test_cpu(cpu, cpu_online_mask)) {
		pd = padata_alloc_pd(pinst, pinst->cpumask);
		if (!pd)
			return -ENOMEM;

		padata_replace(pinst, pd);
	}

	return 0;
}
/*
 * padata_remove_cpu - remove a cpu from the padata cpumask
 *
 * @pinst: padata instance
 * @cpu: cpu to remove
 */
int padata_remove_cpu(struct padata_instance *pinst, int cpu)
{
	int err;

	might_sleep();

	mutex_lock(&pinst->lock);

	cpumask_clear_cpu(cpu, pinst->cpumask);
	err = __padata_remove_cpu(pinst, cpu);

	mutex_unlock(&pinst->lock);

	return err;
}
EXPORT_SYMBOL(padata_remove_cpu);
/*
 * padata_start - start the parallel processing
 *
 * @pinst: padata instance to start
 */
void padata_start(struct padata_instance *pinst)
{
	might_sleep();

	mutex_lock(&pinst->lock);
	pinst->flags |= PADATA_INIT;
	mutex_unlock(&pinst->lock);
}
EXPORT_SYMBOL(padata_start);

/*
 * padata_stop - stop the parallel processing
 *
 * @pinst: padata instance to stop
 */
void padata_stop(struct padata_instance *pinst)
{
	might_sleep();

	mutex_lock(&pinst->lock);
	pinst->flags &= ~PADATA_INIT;
	mutex_unlock(&pinst->lock);
}
EXPORT_SYMBOL(padata_stop);
static int __cpuinit padata_cpu_callback(struct notifier_block *nfb,
					 unsigned long action, void *hcpu)
{
	int err;
	struct padata_instance *pinst;
	int cpu = (unsigned long)hcpu;

	pinst = container_of(nfb, struct padata_instance, cpu_notifier);

	switch (action) {
	case CPU_ONLINE:
	case CPU_ONLINE_FROZEN:
		if (!cpumask_test_cpu(cpu, pinst->cpumask))
			break;
		mutex_lock(&pinst->lock);
		err = __padata_add_cpu(pinst, cpu);
		mutex_unlock(&pinst->lock);
		if (err)
			return NOTIFY_BAD;
		break;

	case CPU_DOWN_PREPARE:
	case CPU_DOWN_PREPARE_FROZEN:
		if (!cpumask_test_cpu(cpu, pinst->cpumask))
			break;
		mutex_lock(&pinst->lock);
		err = __padata_remove_cpu(pinst, cpu);
		mutex_unlock(&pinst->lock);
		if (err)
			return NOTIFY_BAD;
		break;

	case CPU_UP_CANCELED:
	case CPU_UP_CANCELED_FROZEN:
		if (!cpumask_test_cpu(cpu, pinst->cpumask))
			break;
		mutex_lock(&pinst->lock);
		__padata_remove_cpu(pinst, cpu);
		mutex_unlock(&pinst->lock);
		break;

	case CPU_DOWN_FAILED:
	case CPU_DOWN_FAILED_FROZEN:
		if (!cpumask_test_cpu(cpu, pinst->cpumask))
			break;
		mutex_lock(&pinst->lock);
		__padata_add_cpu(pinst, cpu);
		mutex_unlock(&pinst->lock);
	}

	return NOTIFY_OK;
}
/*
 * padata_alloc - allocate and initialize a padata instance
 *
 * @cpumask: cpumask that padata uses for parallelization
 * @wq: workqueue to use for the allocated padata instance
 */
struct padata_instance *padata_alloc(const struct cpumask *cpumask,
				     struct workqueue_struct *wq)
{
	int err;
	struct padata_instance *pinst;
	struct parallel_data *pd;

	pinst = kzalloc(sizeof(struct padata_instance), GFP_KERNEL);
	if (!pinst)
		goto err;

	pd = padata_alloc_pd(pinst, cpumask);
	if (!pd)
		goto err_free_inst;

	if (!alloc_cpumask_var(&pinst->cpumask, GFP_KERNEL))
		goto err_free_pd;

	rcu_assign_pointer(pinst->pd, pd);

	pinst->wq = wq;

	cpumask_copy(pinst->cpumask, cpumask);

	pinst->flags = 0;

	pinst->cpu_notifier.notifier_call = padata_cpu_callback;
	pinst->cpu_notifier.priority = 0;
	err = register_hotcpu_notifier(&pinst->cpu_notifier);
	if (err)
		goto err_free_cpumask;

	mutex_init(&pinst->lock);

	return pinst;

err_free_cpumask:
	free_cpumask_var(pinst->cpumask);
err_free_pd:
	padata_free_pd(pd);
err_free_inst:
	kfree(pinst);
err:
	return NULL;
}
EXPORT_SYMBOL(padata_alloc);
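
/*
 * Illustrative sketch (under #if 0, not part of the build): typical
 * instance setup and teardown around padata_alloc()/padata_free(). The
 * workqueue name and function names are hypothetical. Note that
 * padata_free() does not destroy the workqueue; the caller owns it.
 */
#if 0
static struct padata_instance *my_padata_setup(void)
{
	struct workqueue_struct *wq;
	struct padata_instance *pinst;

	wq = create_workqueue("my_padata");
	if (!wq)
		return NULL;

	pinst = padata_alloc(cpu_possible_mask, wq);
	if (!pinst) {
		destroy_workqueue(wq);
		return NULL;
	}

	/* No objects are accepted until the instance is started. */
	padata_start(pinst);

	return pinst;
}

static void my_padata_teardown(struct padata_instance *pinst,
			       struct workqueue_struct *wq)
{
	padata_free(pinst);	/* stops the instance and waits for objects */
	destroy_workqueue(wq);
}
#endif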
/*
 * padata_free - free a padata instance
 *
 * @pinst: padata instance to free
 */
void padata_free(struct padata_instance *pinst)
{
	padata_stop(pinst);

	synchronize_rcu();

	while (atomic_read(&pinst->pd->refcnt) != 0)
		yield();

	unregister_hotcpu_notifier(&pinst->cpu_notifier);
	padata_free_pd(pinst->pd);
	free_cpumask_var(pinst->cpumask);
	kfree(pinst);
}
EXPORT_SYMBOL(padata_free);