kcopyd.c

/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */
#include <asm/atomic.h>
#include <linux/blkdev.h>
#include <linux/config.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>

#include "kcopyd.h"
static struct workqueue_struct *_kcopyd_wq;
static struct work_struct _kcopyd_work;

static inline void wake(void)
{
	queue_work(_kcopyd_wq, &_kcopyd_work);
}

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct kcopyd_client {
	struct list_head list;

	spinlock_t lock;
	struct page_list *pages;
	unsigned int nr_pages;
	unsigned int nr_free_pages;
};
static struct page_list *alloc_pl(void)
{
	struct page_list *pl;

	pl = kmalloc(sizeof(*pl), GFP_KERNEL);
	if (!pl)
		return NULL;

	pl->page = alloc_page(GFP_KERNEL);
	if (!pl->page) {
		kfree(pl);
		return NULL;
	}

	return pl;
}

static void free_pl(struct page_list *pl)
{
	__free_page(pl->page);
	kfree(pl);
}
static int kcopyd_get_pages(struct kcopyd_client *kc,
			    unsigned int nr, struct page_list **pages)
{
	struct page_list *pl;

	spin_lock(&kc->lock);
	if (kc->nr_free_pages < nr) {
		spin_unlock(&kc->lock);
		return -ENOMEM;
	}

	kc->nr_free_pages -= nr;

	/* walk to the nr'th entry and detach the head of the chain */
	for (*pages = pl = kc->pages; --nr; pl = pl->next)
		;

	kc->pages = pl->next;
	pl->next = NULL;

	spin_unlock(&kc->lock);

	return 0;
}

static void kcopyd_put_pages(struct kcopyd_client *kc, struct page_list *pl)
{
	struct page_list *cursor;

	spin_lock(&kc->lock);
	/* count the returned pages while finding the tail of the chain */
	for (cursor = pl; cursor->next; cursor = cursor->next)
		kc->nr_free_pages++;

	kc->nr_free_pages++;
	cursor->next = kc->pages;
	kc->pages = pl;
	spin_unlock(&kc->lock);
}
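
/*
 * Worked example of the list handling above: if the pool holds pages
 * A -> B -> C -> D and a caller asks for two, kcopyd_get_pages()
 * returns the chain A -> B and leaves the pool as C -> D;
 * kcopyd_put_pages() later splices a returned chain back onto the
 * front of the pool.
 */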
/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
	struct page_list *next;

	while (pl) {
		next = pl->next;
		free_pl(pl);
		pl = next;
	}
}

static int client_alloc_pages(struct kcopyd_client *kc, unsigned int nr)
{
	unsigned int i;
	struct page_list *pl = NULL, *next;

	for (i = 0; i < nr; i++) {
		next = alloc_pl();
		if (!next) {
			if (pl)
				drop_pages(pl);
			return -ENOMEM;
		}
		next->next = pl;
		pl = next;
	}

	kcopyd_put_pages(kc, pl);
	kc->nr_pages += nr;
	return 0;
}

static void client_free_pages(struct kcopyd_client *kc)
{
	BUG_ON(kc->nr_free_pages != kc->nr_pages);
	drop_pages(kc->pages);
	kc->pages = NULL;
	kc->nr_free_pages = kc->nr_pages = 0;
}
/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
 * for this reason we use a mempool to prevent the client from
 * ever having to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
struct kcopyd_job {
	struct kcopyd_client *kc;
	struct list_head list;
	unsigned long flags;

	/*
	 * Error state of the job.
	 */
	int read_err;
	unsigned int write_err;

	/*
	 * Either READ or WRITE
	 */
	int rw;
	struct io_region source;

	/*
	 * The destinations for the transfer.
	 */
	unsigned int num_dests;
	struct io_region dests[KCOPYD_MAX_REGIONS];

	sector_t offset;
	unsigned int nr_pages;
	struct page_list *pages;

	/*
	 * Set this to ensure you are notified when the job has
	 * completed.  'context' is for callback to use.
	 */
	kcopyd_notify_fn fn;
	void *context;

	/*
	 * These fields are only used if the job has been split
	 * into more manageable parts.
	 */
	struct semaphore lock;
	atomic_t sub_jobs;
	sector_t progress;
};
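
/*
 * Job lifecycle, as implemented by do_work() and complete_io() below:
 * a new job waits on the pages list until pool pages are available,
 * moves to the io list for the READ, is re-queued on the io list as a
 * WRITE once the read completes, and finally lands on the complete
 * list, where the client's notify fn is called.
 */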
/* FIXME: this should scale with the number of pages */
#define MIN_JOBS 512

static kmem_cache_t *_job_cache;
static mempool_t *_job_pool;

/*
 * We maintain three lists of jobs:
 *
 * i)   jobs waiting for pages
 * ii)  jobs that have pages, and are waiting for the io to be issued.
 * iii) jobs that have completed.
 *
 * All three of these are protected by job_lock.
 */
static DEFINE_SPINLOCK(_job_lock);

static LIST_HEAD(_complete_jobs);
static LIST_HEAD(_io_jobs);
static LIST_HEAD(_pages_jobs);

static int jobs_init(void)
{
	_job_cache = kmem_cache_create("kcopyd-jobs",
				       sizeof(struct kcopyd_job),
				       __alignof__(struct kcopyd_job),
				       0, NULL, NULL);
	if (!_job_cache)
		return -ENOMEM;

	_job_pool = mempool_create(MIN_JOBS, mempool_alloc_slab,
				   mempool_free_slab, _job_cache);
	if (!_job_pool) {
		kmem_cache_destroy(_job_cache);
		return -ENOMEM;
	}

	return 0;
}
static void jobs_exit(void)
{
	BUG_ON(!list_empty(&_complete_jobs));
	BUG_ON(!list_empty(&_io_jobs));
	BUG_ON(!list_empty(&_pages_jobs));

	mempool_destroy(_job_pool);
	kmem_cache_destroy(_job_cache);
	_job_pool = NULL;
	_job_cache = NULL;
}

/*
 * Functions to push a job onto the tail of a given job list, and to
 * pop one off the head.
 */
static inline struct kcopyd_job *pop(struct list_head *jobs)
{
	struct kcopyd_job *job = NULL;
	unsigned long flags;

	spin_lock_irqsave(&_job_lock, flags);

	if (!list_empty(jobs)) {
		job = list_entry(jobs->next, struct kcopyd_job, list);
		list_del(&job->list);
	}
	spin_unlock_irqrestore(&_job_lock, flags);

	return job;
}

static inline void push(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;

	spin_lock_irqsave(&_job_lock, flags);
	list_add_tail(&job->list, jobs);
	spin_unlock_irqrestore(&_job_lock, flags);
}
/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
	void *context = job->context;
	int read_err = job->read_err;
	unsigned int write_err = job->write_err;
	kcopyd_notify_fn fn = job->fn;

	kcopyd_put_pages(job->kc, job->pages);
	mempool_free(job, _job_pool);
	fn(read_err, write_err, context);
	return 0;
}
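
/*
 * Completion callback handed to dm_io_async().  Note that each job
 * passes through here twice: once when its READ finishes (it is then
 * flipped to a WRITE and re-queued) and once when the WRITE finishes.
 */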
static void complete_io(unsigned long error, void *context)
{
	struct kcopyd_job *job = (struct kcopyd_job *) context;

	if (error) {
		if (job->rw == WRITE)
			/* accumulate the error bits; &= would discard them */
			job->write_err |= error;
		else
			job->read_err = 1;

		if (!test_bit(KCOPYD_IGNORE_ERROR, &job->flags)) {
			push(&_complete_jobs, job);
			wake();
			return;
		}
	}

	if (job->rw == WRITE)
		push(&_complete_jobs, job);

	else {
		job->rw = WRITE;
		push(&_io_jobs, job);
	}

	wake();
}
/*
 * Issue the io for a job using the pages already reserved for it.
 */
static int run_io_job(struct kcopyd_job *job)
{
	int r;

	if (job->rw == READ)
		r = dm_io_async(1, &job->source, job->rw,
				job->pages,
				job->offset, complete_io, job);

	else
		r = dm_io_async(job->num_dests, job->dests, job->rw,
				job->pages,
				job->offset, complete_io, job);

	return r;
}
static int run_pages_job(struct kcopyd_job *job)
{
	int r;

	job->nr_pages = dm_div_up(job->dests[0].count + job->offset,
				  PAGE_SIZE >> 9);
	r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages);
	if (!r) {
		/* this job is ready for io */
		push(&_io_jobs, job);
		return 0;
	}

	if (r == -ENOMEM)
		/* can't complete now */
		return 1;

	return r;
}
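
/*
 * Worked example of the sizing above: with 4k pages, PAGE_SIZE >> 9 is
 * 8 sectors per page, so a 1024-sector destination at a zero offset
 * needs dm_div_up(1024, 8) = 128 pages from the client's pool.
 */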
/*
 * Run through a list for as long as possible.  Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, int (*fn) (struct kcopyd_job *))
{
	struct kcopyd_job *job;
	int r, count = 0;

	while ((job = pop(jobs))) {

		r = fn(job);

		if (r < 0) {
			/* error this rogue job */
			if (job->rw == WRITE)
				job->write_err = (unsigned int) -1;
			else
				job->read_err = 1;
			push(&_complete_jobs, job);
			break;
		}

		if (r > 0) {
			/*
			 * We couldn't service this job ATM, so
			 * push this job back onto the list.
			 */
			push(jobs, job);
			break;
		}

		count++;
	}

	return count;
}
/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(void *ignored)
{
	/*
	 * The order that these are called is *very* important.
	 * complete jobs can free some pages for pages jobs.
	 * Pages jobs when successful will jump onto the io jobs
	 * list.  io jobs call wake when they complete and it all
	 * starts again.
	 */
	process_jobs(&_complete_jobs, run_complete_job);
	process_jobs(&_pages_jobs, run_pages_job);
	process_jobs(&_io_jobs, run_io_job);
}

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
	push(&_pages_jobs, job);
	wake();
}
#define SUB_JOB_SIZE 128
static void segment_complete(int read_err,
			     unsigned int write_err, void *context)
{
	/* FIXME: tidy this function */
	sector_t progress = 0;
	sector_t count = 0;
	struct kcopyd_job *job = (struct kcopyd_job *) context;

	down(&job->lock);

	/* update the error */
	if (read_err)
		job->read_err = 1;

	if (write_err)
		job->write_err |= write_err;	/* accumulate, don't mask */

	/*
	 * Only dispatch more work if there hasn't been an error.
	 */
	if ((!job->read_err && !job->write_err) ||
	    test_bit(KCOPYD_IGNORE_ERROR, &job->flags)) {
		/* get the next chunk of work */
		progress = job->progress;
		count = job->source.count - progress;
		if (count) {
			if (count > SUB_JOB_SIZE)
				count = SUB_JOB_SIZE;

			job->progress += count;
		}
	}
	up(&job->lock);

	if (count) {
		int i;
		struct kcopyd_job *sub_job = mempool_alloc(_job_pool, GFP_NOIO);

		*sub_job = *job;
		sub_job->source.sector += progress;
		sub_job->source.count = count;

		for (i = 0; i < job->num_dests; i++) {
			sub_job->dests[i].sector += progress;
			sub_job->dests[i].count = count;
		}

		sub_job->fn = segment_complete;
		sub_job->context = job;
		dispatch_job(sub_job);

	} else if (atomic_dec_and_test(&job->sub_jobs)) {

		/*
		 * To avoid a race we must keep the job around
		 * until after the notify function has completed.
		 * Otherwise the client may try and stop the job
		 * after we've completed.
		 */
		job->fn(read_err, write_err, job->context);
		mempool_free(job, _job_pool);
	}
}
/*
 * Split the job into SPLIT_COUNT smaller sub-jobs so several pieces
 * of the copy are in flight at once.
 */
#define SPLIT_COUNT 8
static void split_job(struct kcopyd_job *job)
{
	int i;

	atomic_set(&job->sub_jobs, SPLIT_COUNT);
	for (i = 0; i < SPLIT_COUNT; i++)
		segment_complete(0, 0u, job);
}
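
/*
 * Worked example: a 2048-sector copy with SUB_JOB_SIZE 128 becomes
 * sixteen 128-sector chunks.  split_job() seeds SPLIT_COUNT (8)
 * sub-jobs by calling segment_complete() with no error; each
 * completing sub-job then claims the next chunk via job->progress
 * until the whole source range has been covered.
 */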
int kcopyd_copy(struct kcopyd_client *kc, struct io_region *from,
		unsigned int num_dests, struct io_region *dests,
		unsigned int flags, kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;

	/*
	 * Allocate a new job.
	 */
	job = mempool_alloc(_job_pool, GFP_NOIO);

	/*
	 * set up for the read.
	 */
	job->kc = kc;
	job->flags = flags;
	job->read_err = 0;
	job->write_err = 0;
	job->rw = READ;

	job->source = *from;

	job->num_dests = num_dests;
	memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

	job->offset = 0;
	job->nr_pages = 0;
	job->pages = NULL;

	job->fn = fn;
	job->context = context;

	if (job->source.count < SUB_JOB_SIZE)
		dispatch_job(job);

	else {
		init_MUTEX(&job->lock);
		job->progress = 0;
		split_job(job);
	}

	return 0;
}
/*
 * Cancels a kcopyd job, e.g. someone might be deactivating a
 * mirror.
 */
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
	/* FIXME: finish */
	return -1;
}
/*-----------------------------------------------------------------
 * Unit setup
 *---------------------------------------------------------------*/
static DECLARE_MUTEX(_client_lock);
static LIST_HEAD(_clients);

static void client_add(struct kcopyd_client *kc)
{
	down(&_client_lock);
	list_add(&kc->list, &_clients);
	up(&_client_lock);
}

static void client_del(struct kcopyd_client *kc)
{
	down(&_client_lock);
	list_del(&kc->list);
	up(&_client_lock);
}

static DECLARE_MUTEX(kcopyd_init_lock);
static int kcopyd_clients = 0;

static int kcopyd_init(void)
{
	int r;

	down(&kcopyd_init_lock);

	if (kcopyd_clients) {
		/* Already initialized. */
		kcopyd_clients++;
		up(&kcopyd_init_lock);
		return 0;
	}

	r = jobs_init();
	if (r) {
		up(&kcopyd_init_lock);
		return r;
	}

	_kcopyd_wq = create_singlethread_workqueue("kcopyd");
	if (!_kcopyd_wq) {
		jobs_exit();
		up(&kcopyd_init_lock);
		return -ENOMEM;
	}

	kcopyd_clients++;
	INIT_WORK(&_kcopyd_work, do_work, NULL);
	up(&kcopyd_init_lock);
	return 0;
}

static void kcopyd_exit(void)
{
	down(&kcopyd_init_lock);
	kcopyd_clients--;
	if (!kcopyd_clients) {
		jobs_exit();
		destroy_workqueue(_kcopyd_wq);
		_kcopyd_wq = NULL;
	}
	up(&kcopyd_init_lock);
}
int kcopyd_client_create(unsigned int nr_pages, struct kcopyd_client **result)
{
	int r = 0;
	struct kcopyd_client *kc;

	r = kcopyd_init();
	if (r)
		return r;

	kc = kmalloc(sizeof(*kc), GFP_KERNEL);
	if (!kc) {
		kcopyd_exit();
		return -ENOMEM;
	}

	spin_lock_init(&kc->lock);
	kc->pages = NULL;
	kc->nr_pages = kc->nr_free_pages = 0;
	r = client_alloc_pages(kc, nr_pages);
	if (r) {
		kfree(kc);
		kcopyd_exit();
		return r;
	}

	r = dm_io_get(nr_pages);
	if (r) {
		client_free_pages(kc);
		kfree(kc);
		kcopyd_exit();
		return r;
	}

	client_add(kc);
	*result = kc;
	return 0;
}

void kcopyd_client_destroy(struct kcopyd_client *kc)
{
	dm_io_put(kc->nr_pages);
	client_free_pages(kc);
	client_del(kc);
	kfree(kc);
	kcopyd_exit();
}

EXPORT_SYMBOL(kcopyd_client_create);
EXPORT_SYMBOL(kcopyd_client_destroy);
EXPORT_SYMBOL(kcopyd_copy);
EXPORT_SYMBOL(kcopyd_cancel);
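
/*
 * Example usage, illustrative only and not part of kcopyd itself: a
 * minimal sketch of a client copying the start of one device to
 * another.  The 32-page pool size, the region values and the use of a
 * completion to wait synchronously are all assumptions made up for the
 * example (a real caller would also need <linux/completion.h>).
 */
#if 0
static void copy_done(int read_err, unsigned int write_err, void *context)
{
	/* runs in kcopyd's workqueue once every destination write is done */
	complete((struct completion *) context);
}

static int example_copy(struct block_device *src, struct block_device *dst,
			sector_t nr_sectors)
{
	struct kcopyd_client *kc;
	struct io_region from, to;
	struct completion done;
	int r;

	r = kcopyd_client_create(32, &kc);	/* preallocate 32 pages */
	if (r)
		return r;

	from.bdev = src;
	from.sector = 0;
	from.count = nr_sectors;

	to.bdev = dst;
	to.sector = 0;
	to.count = nr_sectors;

	init_completion(&done);
	kcopyd_copy(kc, &from, 1, &to, 0, copy_done, &done);
	wait_for_completion(&done);

	kcopyd_client_destroy(kc);
	return 0;
}
#endif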