/*
 * linux/net/sunrpc/sched.c
 *
 * Scheduling for synchronous and asynchronous RPC requests.
 *
 * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
 *
 * TCP NFS related read + write fixes
 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 */

#include <linux/module.h>

#include <linux/sched.h>
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/mempool.h>
#include <linux/smp.h>
#include <linux/smp_lock.h>
#include <linux/spinlock.h>
#include <linux/mutex.h>

#include <linux/sunrpc/clnt.h>

#ifdef RPC_DEBUG
#define RPCDBG_FACILITY		RPCDBG_SCHED
#define RPC_TASK_MAGIC_ID	0xf00baa
static int			rpc_task_id;
#endif

/*
 * RPC slabs and memory pools
 */
#define RPC_BUFFER_MAXSIZE	(2048)
#define RPC_BUFFER_POOLSIZE	(8)
#define RPC_TASK_POOLSIZE	(8)
static struct kmem_cache	*rpc_task_slabp __read_mostly;
static struct kmem_cache	*rpc_buffer_slabp __read_mostly;
static mempool_t		*rpc_task_mempool __read_mostly;
static mempool_t		*rpc_buffer_mempool __read_mostly;

static void			__rpc_default_timer(struct rpc_task *task);
static void			rpciod_killall(void);
static void			rpc_async_schedule(struct work_struct *);
static void			rpc_release_task(struct rpc_task *task);

/*
 * RPC tasks sit here while waiting for conditions to improve.
 */
static RPC_WAITQ(delay_queue, "delayq");

/*
 * All RPC tasks are linked into this list
 */
static LIST_HEAD(all_tasks);

/*
 * rpciod-related stuff
 */
static DEFINE_MUTEX(rpciod_mutex);
static unsigned int		rpciod_users;
struct workqueue_struct *rpciod_workqueue;

/*
 * Spinlock for other critical sections of code.
 */
static DEFINE_SPINLOCK(rpc_sched_lock);

/*
 * Disable the timer for a given RPC task. Should be called with
 * queue->lock and bh_disabled in order to avoid races within
 * rpc_run_timer().
 */
static inline void
__rpc_disable_timer(struct rpc_task *task)
{
	dprintk("RPC: %4d disabling timer\n", task->tk_pid);
	task->tk_timeout_fn = NULL;
	task->tk_timeout = 0;
}

/*
 * Run a timeout function.
 * We use the callback in order to allow __rpc_wake_up_task()
 * and friends to disable the timer synchronously on SMP systems
 * without calling del_timer_sync(). The latter could cause a
 * deadlock if called while we're holding spinlocks...
 */
static void rpc_run_timer(struct rpc_task *task)
{
	void (*callback)(struct rpc_task *);

	callback = task->tk_timeout_fn;
	task->tk_timeout_fn = NULL;
	if (callback && RPC_IS_QUEUED(task)) {
		dprintk("RPC: %4d running timer\n", task->tk_pid);
		callback(task);
	}
	smp_mb__before_clear_bit();
	clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
	smp_mb__after_clear_bit();
}

/*
 * Set up a timer for the current task.
 */
static inline void
__rpc_add_timer(struct rpc_task *task, rpc_action timer)
{
	if (!task->tk_timeout)
		return;

	dprintk("RPC: %4d setting alarm for %lu ms\n",
			task->tk_pid, task->tk_timeout * 1000 / HZ);

	if (timer)
		task->tk_timeout_fn = timer;
	else
		task->tk_timeout_fn = __rpc_default_timer;
	set_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
	mod_timer(&task->tk_timer, jiffies + task->tk_timeout);
}

/*
 * Delete any timer for the current task. Because we use del_timer_sync(),
 * this function should never be called while holding queue->lock.
 */
static void
rpc_delete_timer(struct rpc_task *task)
{
	if (RPC_IS_QUEUED(task))
		return;
	if (test_and_clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate)) {
		del_singleshot_timer_sync(&task->tk_timer);
		dprintk("RPC: %4d deleting timer\n", task->tk_pid);
	}
}

/*
 * Add new request to a priority queue.
 */
static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	struct list_head *q;
	struct rpc_task *t;

	INIT_LIST_HEAD(&task->u.tk_wait.links);
	q = &queue->tasks[task->tk_priority];
	if (unlikely(task->tk_priority > queue->maxpriority))
		q = &queue->tasks[queue->maxpriority];
	list_for_each_entry(t, q, u.tk_wait.list) {
		if (t->tk_cookie == task->tk_cookie) {
			list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
			return;
		}
	}
	list_add_tail(&task->u.tk_wait.list, q);
}

/*
 * Add new request to wait queue.
 *
 * Swapper tasks always get inserted at the head of the queue.
 * This should avoid many nasty memory deadlocks and hopefully
 * improve overall performance.
 * Everyone else gets appended to the queue to ensure proper FIFO behavior.
 */
static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	BUG_ON (RPC_IS_QUEUED(task));

	if (RPC_IS_PRIORITY(queue))
		__rpc_add_wait_queue_priority(queue, task);
	else if (RPC_IS_SWAPPER(task))
		list_add(&task->u.tk_wait.list, &queue->tasks[0]);
	else
		list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
	task->u.tk_wait.rpc_waitq = queue;
	queue->qlen++;
	rpc_set_queued(task);

	dprintk("RPC: %4d added to queue %p \"%s\"\n",
			task->tk_pid, queue, rpc_qname(queue));
}

/*
 * Remove request from a priority queue.
 */
static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
{
	struct rpc_task *t;

	if (!list_empty(&task->u.tk_wait.links)) {
		t = list_entry(task->u.tk_wait.links.next, struct rpc_task, u.tk_wait.list);
		list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
		list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
	}
	list_del(&task->u.tk_wait.list);
}

/*
 * Remove request from queue.
 * Note: must be called with spin lock held.
 */
static void __rpc_remove_wait_queue(struct rpc_task *task)
{
	struct rpc_wait_queue *queue;
	queue = task->u.tk_wait.rpc_waitq;

	if (RPC_IS_PRIORITY(queue))
		__rpc_remove_wait_queue_priority(task);
	else
		list_del(&task->u.tk_wait.list);
	queue->qlen--;
	dprintk("RPC: %4d removed from queue %p \"%s\"\n",
			task->tk_pid, queue, rpc_qname(queue));
}

static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
{
	queue->priority = priority;
	queue->count = 1 << (priority * 2);
}

static inline void rpc_set_waitqueue_cookie(struct rpc_wait_queue *queue, unsigned long cookie)
{
	queue->cookie = cookie;
	queue->nr = RPC_BATCH_COUNT;
}

static inline void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
{
	rpc_set_waitqueue_priority(queue, queue->maxpriority);
	rpc_set_waitqueue_cookie(queue, 0);
}

static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, int maxprio)
{
	int i;

	spin_lock_init(&queue->lock);
	for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
		INIT_LIST_HEAD(&queue->tasks[i]);
	queue->maxpriority = maxprio;
	rpc_reset_waitqueue_priority(queue);
#ifdef RPC_DEBUG
	queue->name = qname;
#endif
}

void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, RPC_PRIORITY_HIGH);
}

void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, 0);
}
EXPORT_SYMBOL(rpc_init_wait_queue);
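
/*
 * Usage sketch (illustrative, not part of the original file): owners of an
 * rpc_wait_queue initialize it once before use, either statically via
 * RPC_WAITQ() as done for delay_queue above, or at run time:
 *
 *	struct rpc_wait_queue pending;
 *
 *	rpc_init_wait_queue(&pending, "pending");
 *
 * The qname string is only kept when RPC_DEBUG is defined and feeds the
 * dprintk() diagnostics in this file.
 */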
static int rpc_wait_bit_interruptible(void *word)
{
	if (signal_pending(current))
		return -ERESTARTSYS;
	schedule();
	return 0;
}

static void rpc_set_active(struct rpc_task *task)
{
	if (test_and_set_bit(RPC_TASK_ACTIVE, &task->tk_runstate) != 0)
		return;
	spin_lock(&rpc_sched_lock);
#ifdef RPC_DEBUG
	task->tk_magic = RPC_TASK_MAGIC_ID;
	task->tk_pid = rpc_task_id++;
#endif
	/* Add to global list of all tasks */
	list_add_tail(&task->tk_task, &all_tasks);
	spin_unlock(&rpc_sched_lock);
}

/*
 * Mark an RPC call as having completed by clearing the 'active' bit
 */
static void rpc_mark_complete_task(struct rpc_task *task)
{
	smp_mb__before_clear_bit();
	clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
	smp_mb__after_clear_bit();
	wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
}

/*
 * Allow callers to wait for completion of an RPC call
 */
int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
{
	if (action == NULL)
		action = rpc_wait_bit_interruptible;
	return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
			action, TASK_INTERRUPTIBLE);
}
EXPORT_SYMBOL(__rpc_wait_for_completion_task);
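
/*
 * Usage sketch (illustrative, not part of the original file): a caller
 * holding a reference on a task can block until rpc_mark_complete_task()
 * clears RPC_TASK_ACTIVE:
 *
 *	error = __rpc_wait_for_completion_task(task, NULL);
 *
 * Passing a NULL action selects rpc_wait_bit_interruptible(), so the wait
 * returns -ERESTARTSYS if a signal arrives before the task completes.
 */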
/*
 * Make an RPC task runnable.
 *
 * Note: If the task is ASYNC, this must be called with
 * the spinlock held to protect the wait queue operation.
 */
static void rpc_make_runnable(struct rpc_task *task)
{
	BUG_ON(task->tk_timeout_fn);
	rpc_clear_queued(task);
	if (rpc_test_and_set_running(task))
		return;
	/* We might have raced */
	if (RPC_IS_QUEUED(task)) {
		rpc_clear_running(task);
		return;
	}
	if (RPC_IS_ASYNC(task)) {
		int status;

		INIT_WORK(&task->u.tk_work, rpc_async_schedule);
		status = queue_work(task->tk_workqueue, &task->u.tk_work);
		if (status < 0) {
			printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
			task->tk_status = status;
			return;
		}
	} else
		wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
}

/*
 * Prepare for sleeping on a wait queue.
 * By always appending tasks to the list we ensure FIFO behavior.
 * NB: An RPC task will only receive interrupt-driven events as long
 * as it's on a wait queue.
 */
static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
			rpc_action action, rpc_action timer)
{
	dprintk("RPC: %4d sleep_on(queue \"%s\" time %ld)\n", task->tk_pid,
			rpc_qname(q), jiffies);

	if (!RPC_IS_ASYNC(task) && !RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive synchronous task put to sleep!\n");
		return;
	}

	__rpc_add_wait_queue(q, task);

	BUG_ON(task->tk_callback != NULL);
	task->tk_callback = action;
	__rpc_add_timer(task, timer);
}

void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
			rpc_action action, rpc_action timer)
{
	/* Mark the task as being activated if so needed */
	rpc_set_active(task);

	/*
	 * Protect the queue operations.
	 */
	spin_lock_bh(&q->lock);
	__rpc_sleep_on(q, task, action, timer);
	spin_unlock_bh(&q->lock);
}
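
/*
 * Usage sketch (illustrative, not part of the original file): a tk_action
 * state routine typically sets up its next state, then puts the task to
 * sleep and returns to __rpc_execute():
 *
 *	task->tk_action = next_state;
 *	task->tk_timeout = some_timeout;
 *	rpc_sleep_on(&some_queue, task, NULL, NULL);
 *
 * next_state, some_timeout and some_queue are placeholders. The action
 * argument, if non-NULL, is stored in tk_callback and run once by
 * __rpc_execute() after the wakeup; a NULL timer installs
 * __rpc_default_timer(), which wakes the task with -ETIMEDOUT.
 */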
/**
 * __rpc_do_wake_up_task - wake up a single rpc_task
 * @task: task to be woken up
 *
 * Caller must hold queue->lock, and have cleared the task queued flag.
 */
static void __rpc_do_wake_up_task(struct rpc_task *task)
{
	dprintk("RPC: %4d __rpc_wake_up_task (now %ld)\n", task->tk_pid, jiffies);

#ifdef RPC_DEBUG
	BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
#endif
	/* Has the task been executed yet? If not, we cannot wake it up! */
	if (!RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
		return;
	}

	__rpc_disable_timer(task);
	__rpc_remove_wait_queue(task);

	rpc_make_runnable(task);

	dprintk("RPC: __rpc_wake_up_task done\n");
}

/*
 * Wake up the specified task
 */
static void __rpc_wake_up_task(struct rpc_task *task)
{
	if (rpc_start_wakeup(task)) {
		if (RPC_IS_QUEUED(task))
			__rpc_do_wake_up_task(task);
		rpc_finish_wakeup(task);
	}
}

/*
 * Default timeout handler if none specified by user
 */
static void
__rpc_default_timer(struct rpc_task *task)
{
	dprintk("RPC: %d timeout (default timer)\n", task->tk_pid);
	task->tk_status = -ETIMEDOUT;
	rpc_wake_up_task(task);
}

/*
 * Wake up the specified task
 */
void rpc_wake_up_task(struct rpc_task *task)
{
	rcu_read_lock_bh();
	if (rpc_start_wakeup(task)) {
		if (RPC_IS_QUEUED(task)) {
			struct rpc_wait_queue *queue = task->u.tk_wait.rpc_waitq;

			/* Note: we're already in a bh-safe context */
			spin_lock(&queue->lock);
			__rpc_do_wake_up_task(task);
			spin_unlock(&queue->lock);
		}
		rpc_finish_wakeup(task);
	}
	rcu_read_unlock_bh();
}

/*
 * Wake up the next task on a priority queue.
 */
static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queue)
{
	struct list_head *q;
	struct rpc_task *task;

	/*
	 * Service a batch of tasks from a single cookie.
	 */
	q = &queue->tasks[queue->priority];
	if (!list_empty(q)) {
		task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
		if (queue->cookie == task->tk_cookie) {
			if (--queue->nr)
				goto out;
			list_move_tail(&task->u.tk_wait.list, q);
		}
		/*
		 * Check if we need to switch queues.
		 */
		if (--queue->count)
			goto new_cookie;
	}

	/*
	 * Service the next queue.
	 */
	do {
		if (q == &queue->tasks[0])
			q = &queue->tasks[queue->maxpriority];
		else
			q = q - 1;
		if (!list_empty(q)) {
			task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
			goto new_queue;
		}
	} while (q != &queue->tasks[queue->priority]);

	rpc_reset_waitqueue_priority(queue);
	return NULL;

new_queue:
	rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
new_cookie:
	rpc_set_waitqueue_cookie(queue, task->tk_cookie);
out:
	__rpc_wake_up_task(task);
	return task;
}
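
/*
 * In short: tasks sharing a cookie are woken in batches of up to
 * RPC_BATCH_COUNT, each priority level is allotted 1 << (priority * 2)
 * such batches before the scan moves on, and the scan rotates downward
 * through the priority lists, wrapping from tasks[0] back to
 * tasks[maxpriority], until it either finds a task or resets the queue.
 */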
/*
 * Wake up the next task on the wait queue.
 */
struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
{
	struct rpc_task *task = NULL;

	dprintk("RPC: wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue));
	rcu_read_lock_bh();
	spin_lock(&queue->lock);
	if (RPC_IS_PRIORITY(queue))
		task = __rpc_wake_up_next_priority(queue);
	else {
		task_for_first(task, &queue->tasks[0])
			__rpc_wake_up_task(task);
	}
	spin_unlock(&queue->lock);
	rcu_read_unlock_bh();

	return task;
}

/**
 * rpc_wake_up - wake up all rpc_tasks
 * @queue: rpc_wait_queue on which the tasks are sleeping
 *
 * Grabs queue->lock
 */
void rpc_wake_up(struct rpc_wait_queue *queue)
{
	struct rpc_task *task, *next;
	struct list_head *head;

	rcu_read_lock_bh();
	spin_lock(&queue->lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		list_for_each_entry_safe(task, next, head, u.tk_wait.list)
			__rpc_wake_up_task(task);
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock(&queue->lock);
	rcu_read_unlock_bh();
}

/**
 * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
 * @queue: rpc_wait_queue on which the tasks are sleeping
 * @status: status value to set
 *
 * Grabs queue->lock
 */
void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{
	struct rpc_task *task, *next;
	struct list_head *head;

	rcu_read_lock_bh();
	spin_lock(&queue->lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		list_for_each_entry_safe(task, next, head, u.tk_wait.list) {
			task->tk_status = status;
			__rpc_wake_up_task(task);
		}
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock(&queue->lock);
	rcu_read_unlock_bh();
}

static void __rpc_atrun(struct rpc_task *task)
{
	rpc_wake_up_task(task);
}

/*
 * Run a task at a later time
 */
void rpc_delay(struct rpc_task *task, unsigned long delay)
{
	task->tk_timeout = delay;
	rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
}
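
/*
 * Usage sketch (illustrative, not part of the original file): a state
 * routine that wants to back off and retry later sets its next state and
 * calls rpc_delay() with a delay in jiffies before returning:
 *
 *	task->tk_action = some_retry_state;
 *	rpc_delay(task, 3 * HZ);
 *
 * some_retry_state is a placeholder; the task sleeps on delay_queue and
 * __rpc_atrun() wakes it again once the timer fires.
 */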
/*
 * Helper to call task->tk_ops->rpc_call_prepare
 */
static void rpc_prepare_task(struct rpc_task *task)
{
	lock_kernel();
	task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
	unlock_kernel();
}

/*
 * Helper that calls task->tk_ops->rpc_call_done if it exists
 */
void rpc_exit_task(struct rpc_task *task)
{
	task->tk_action = NULL;
	if (task->tk_ops->rpc_call_done != NULL) {
		lock_kernel();
		task->tk_ops->rpc_call_done(task, task->tk_calldata);
		unlock_kernel();
		if (task->tk_action != NULL) {
			WARN_ON(RPC_ASSASSINATED(task));
			/* Always release the RPC slot and buffer memory */
			xprt_release(task);
		}
	}
}
EXPORT_SYMBOL(rpc_exit_task);
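
/*
 * Note on restarts: rpc_call_done may set task->tk_action to a new state
 * routine instead of letting the call terminate. The branch above then
 * releases the RPC slot and buffer via xprt_release(), and __rpc_execute()
 * keeps driving the state machine with the new tk_action.
 */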
void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
{
	if (ops->rpc_release != NULL) {
		lock_kernel();
		ops->rpc_release(calldata);
		unlock_kernel();
	}
}

/*
 * This is the RPC `scheduler' (or rather, the finite state machine).
 */
static int __rpc_execute(struct rpc_task *task)
{
	int status = 0;

	dprintk("RPC: %4d rpc_execute flgs %x\n",
			task->tk_pid, task->tk_flags);

	BUG_ON(RPC_IS_QUEUED(task));

	for (;;) {
		/*
		 * Garbage collection of pending timers...
		 */
		rpc_delete_timer(task);

		/*
		 * Execute any pending callback.
		 */
		if (RPC_DO_CALLBACK(task)) {
			/* Define a callback save pointer */
			void (*save_callback)(struct rpc_task *);

			/*
			 * If a callback exists, save it, reset it,
			 * call it.
			 * The save is needed to stop from resetting
			 * another callback set within the callback handler
			 *   - Dave
			 */
			save_callback = task->tk_callback;
			task->tk_callback = NULL;
			save_callback(task);
		}

		/*
		 * Perform the next FSM step.
		 * tk_action may be NULL when the task has been killed
		 * by someone else.
		 */
		if (!RPC_IS_QUEUED(task)) {
			if (task->tk_action == NULL)
				break;
			task->tk_action(task);
		}

		/*
		 * Lockless check for whether task is sleeping or not.
		 */
		if (!RPC_IS_QUEUED(task))
			continue;
		rpc_clear_running(task);
		if (RPC_IS_ASYNC(task)) {
			/* Careful! we may have raced... */
			if (RPC_IS_QUEUED(task))
				return 0;
			if (rpc_test_and_set_running(task))
				return 0;
			continue;
		}

		/* sync task: sleep here */
		dprintk("RPC: %4d sync task going to sleep\n", task->tk_pid);
		/* Note: Caller should be using rpc_clnt_sigmask() */
		status = out_of_line_wait_on_bit(&task->tk_runstate,
				RPC_TASK_QUEUED, rpc_wait_bit_interruptible,
				TASK_INTERRUPTIBLE);
		if (status == -ERESTARTSYS) {
			/*
			 * When a sync task receives a signal, it exits with
			 * -ERESTARTSYS. In order to catch any callbacks that
			 * clean up after sleeping on some queue, we don't
			 * break the loop here, but go around once more.
			 */
			dprintk("RPC: %4d got signal\n", task->tk_pid);
			task->tk_flags |= RPC_TASK_KILLED;
			rpc_exit(task, -ERESTARTSYS);
			rpc_wake_up_task(task);
		}
		rpc_set_running(task);
		dprintk("RPC: %4d sync task resuming\n", task->tk_pid);
	}

	dprintk("RPC: %4d, return %d, status %d\n", task->tk_pid, status, task->tk_status);
	/* Release all resources associated with the task */
	rpc_release_task(task);
	return status;
}

/*
 * User-visible entry point to the scheduler.
 *
 * This may be called recursively if e.g. an async NFS task updates
 * the attributes and finds that dirty pages must be flushed.
 * NOTE: Upon exit of this function the task is guaranteed to be
 *	 released. In particular note that tk_release() will have
 *	 been called, so your task memory may have been freed.
 */
int
rpc_execute(struct rpc_task *task)
{
	rpc_set_active(task);
	rpc_set_running(task);
	return __rpc_execute(task);
}
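
/*
 * Usage sketch (illustrative, not part of the original file): a
 * synchronous caller that wants to inspect the result after the run takes
 * an extra reference first, since rpc_execute() ends in rpc_release_task():
 *
 *	atomic_inc(&task->tk_count);
 *	status = rpc_execute(task);
 *	if (status == 0)
 *		status = task->tk_status;
 *	rpc_put_task(task);
 *
 * This mirrors the reference handling rpc_run_task() uses below.
 */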
static void rpc_async_schedule(struct work_struct *work)
{
	__rpc_execute(container_of(work, struct rpc_task, u.tk_work));
}

/**
 * rpc_malloc - allocate an RPC buffer
 * @task: RPC task that will use this buffer
 * @size: requested byte size
 *
 * We try to ensure that some NFS reads and writes can always proceed
 * by using a mempool when allocating 'small' buffers.
 * In order to avoid memory starvation triggering more writebacks of
 * NFS requests, we use GFP_NOFS rather than GFP_KERNEL.
 */
void * rpc_malloc(struct rpc_task *task, size_t size)
{
	struct rpc_rqst *req = task->tk_rqstp;
	gfp_t gfp;

	if (task->tk_flags & RPC_TASK_SWAPPER)
		gfp = GFP_ATOMIC;
	else
		gfp = GFP_NOFS;

	if (size > RPC_BUFFER_MAXSIZE) {
		req->rq_buffer = kmalloc(size, gfp);
		if (req->rq_buffer)
			req->rq_bufsize = size;
	} else {
		req->rq_buffer = mempool_alloc(rpc_buffer_mempool, gfp);
		if (req->rq_buffer)
			req->rq_bufsize = RPC_BUFFER_MAXSIZE;
	}
	return req->rq_buffer;
}
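
/*
 * Usage sketch (illustrative, not part of the original file): callers must
 * check the return value, since the GFP_ATOMIC path used for swapper tasks
 * can fail:
 *
 *	if (rpc_malloc(task, bufsiz) == NULL)
 *		return -ENOMEM;
 *
 * bufsiz is a placeholder. Requests of up to RPC_BUFFER_MAXSIZE (2048)
 * bytes are served from rpc_buffer_mempool, so a small number of them can
 * always make progress; larger requests fall back to kmalloc().
 */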
/**
 * rpc_free - free buffer allocated via rpc_malloc
 * @task: RPC task with a buffer to be freed
 *
 */
void rpc_free(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (req->rq_buffer) {
		if (req->rq_bufsize == RPC_BUFFER_MAXSIZE)
			mempool_free(req->rq_buffer, rpc_buffer_mempool);
		else
			kfree(req->rq_buffer);
		req->rq_buffer = NULL;
		req->rq_bufsize = 0;
	}
}

/*
 * Creation and deletion of RPC task structures
 */
void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
{
	memset(task, 0, sizeof(*task));
	init_timer(&task->tk_timer);
	task->tk_timer.data     = (unsigned long) task;
	task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
	atomic_set(&task->tk_count, 1);
	task->tk_client = clnt;
	task->tk_flags  = flags;
	task->tk_ops = tk_ops;
	if (tk_ops->rpc_call_prepare != NULL)
		task->tk_action = rpc_prepare_task;
	task->tk_calldata = calldata;

	/* Initialize retry counters */
	task->tk_garb_retry = 2;
	task->tk_cred_retry = 2;

	task->tk_priority = RPC_PRIORITY_NORMAL;
	task->tk_cookie = (unsigned long)current;

	/* Initialize workqueue for async tasks */
	task->tk_workqueue = rpciod_workqueue;

	if (clnt) {
		atomic_inc(&clnt->cl_users);
		if (clnt->cl_softrtry)
			task->tk_flags |= RPC_TASK_SOFT;
		if (!clnt->cl_intr)
			task->tk_flags |= RPC_TASK_NOINTR;
	}

	BUG_ON(task->tk_ops == NULL);

	/* starting timestamp */
	task->tk_start = jiffies;

	dprintk("RPC: %4d new task procpid %d\n", task->tk_pid,
			current->pid);
}

static struct rpc_task *
rpc_alloc_task(void)
{
	return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
}

static void rpc_free_task(struct rcu_head *rcu)
{
	struct rpc_task *task = container_of(rcu, struct rpc_task, u.tk_rcu);
	dprintk("RPC: %4d freeing task\n", task->tk_pid);
	mempool_free(task, rpc_task_mempool);
}

/*
 * Create a new task for the specified client.  We have to
 * clean up after an allocation failure, as the client may
 * have specified "oneshot".
 */
struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
{
	struct rpc_task *task;

	task = rpc_alloc_task();
	if (!task)
		goto cleanup;

	rpc_init_task(task, clnt, flags, tk_ops, calldata);

	dprintk("RPC: %4d allocated task\n", task->tk_pid);
	task->tk_flags |= RPC_TASK_DYNAMIC;
out:
	return task;

cleanup:
	/* Check whether to release the client */
	if (clnt) {
		printk("rpc_new_task: failed, users=%d, oneshot=%d\n",
			atomic_read(&clnt->cl_users), clnt->cl_oneshot);
		atomic_inc(&clnt->cl_users); /* pretend we were used ... */
		rpc_release_client(clnt);
	}
	goto out;
}

void rpc_put_task(struct rpc_task *task)
{
	const struct rpc_call_ops *tk_ops = task->tk_ops;
	void *calldata = task->tk_calldata;

	if (!atomic_dec_and_test(&task->tk_count))
		return;
	/* Release resources */
	if (task->tk_rqstp)
		xprt_release(task);
	if (task->tk_msg.rpc_cred)
		rpcauth_unbindcred(task);
	if (task->tk_client) {
		rpc_release_client(task->tk_client);
		task->tk_client = NULL;
	}
	if (task->tk_flags & RPC_TASK_DYNAMIC)
		call_rcu_bh(&task->u.tk_rcu, rpc_free_task);
	rpc_release_calldata(tk_ops, calldata);
}
EXPORT_SYMBOL(rpc_put_task);
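
/*
 * Reference counting summary: rpc_init_task() starts tk_count at 1, and
 * that reference is consumed by rpc_release_task() when __rpc_execute()
 * finishes. Any other holder (for example rpc_run_task() below) must take
 * its own reference with atomic_inc(&task->tk_count) and drop it with
 * rpc_put_task(); the memory of RPC_TASK_DYNAMIC tasks is then freed via
 * call_rcu_bh() once the last reference goes away.
 */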
static void rpc_release_task(struct rpc_task *task)
{
#ifdef RPC_DEBUG
	BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
#endif
	dprintk("RPC: %4d release task\n", task->tk_pid);

	/* Remove from global task list */
	spin_lock(&rpc_sched_lock);
	list_del(&task->tk_task);
	spin_unlock(&rpc_sched_lock);

	BUG_ON (RPC_IS_QUEUED(task));

	/* Synchronously delete any running timer */
	rpc_delete_timer(task);

#ifdef RPC_DEBUG
	task->tk_magic = 0;
#endif
	/* Wake up anyone who is waiting for task completion */
	rpc_mark_complete_task(task);

	rpc_put_task(task);
}

/**
 * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
 * @clnt: pointer to RPC client
 * @flags: RPC flags
 * @ops: RPC call ops
 * @data: user call data
 */
struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags,
				const struct rpc_call_ops *ops,
				void *data)
{
	struct rpc_task *task;

	task = rpc_new_task(clnt, flags, ops, data);
	if (task == NULL) {
		rpc_release_calldata(ops, data);
		return ERR_PTR(-ENOMEM);
	}
	atomic_inc(&task->tk_count);
	rpc_execute(task);
	return task;
}
EXPORT_SYMBOL(rpc_run_task);
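
/*
 * Usage sketch (illustrative, not part of the original file): a
 * fire-and-forget async caller starts a task and immediately drops the
 * reference that rpc_run_task() handed back:
 *
 *	task = rpc_run_task(clnt, RPC_TASK_ASYNC, &my_call_ops, calldata);
 *	if (IS_ERR(task))
 *		return PTR_ERR(task);
 *	rpc_put_task(task);
 *	return 0;
 *
 * my_call_ops and calldata are placeholders; rpciod then drives the task
 * via rpc_async_schedule() and the rpc_call_done/rpc_release callbacks.
 */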
/*
 * Kill all tasks for the given client.
 * XXX: kill their descendants as well?
 */
void rpc_killall_tasks(struct rpc_clnt *clnt)
{
	struct rpc_task *rovr;
	struct list_head *le;

	dprintk("RPC: killing all tasks for client %p\n", clnt);

	/*
	 * Spin lock all_tasks to prevent changes...
	 */
	spin_lock(&rpc_sched_lock);
	alltask_for_each(rovr, le, &all_tasks) {
		if (! RPC_IS_ACTIVATED(rovr))
			continue;
		if (!clnt || rovr->tk_client == clnt) {
			rovr->tk_flags |= RPC_TASK_KILLED;
			rpc_exit(rovr, -EIO);
			rpc_wake_up_task(rovr);
		}
	}
	spin_unlock(&rpc_sched_lock);
}

static DECLARE_MUTEX_LOCKED(rpciod_running);

static void rpciod_killall(void)
{
	unsigned long flags;

	while (!list_empty(&all_tasks)) {
		clear_thread_flag(TIF_SIGPENDING);
		rpc_killall_tasks(NULL);
		flush_workqueue(rpciod_workqueue);
		if (!list_empty(&all_tasks)) {
			dprintk("rpciod_killall: waiting for tasks to exit\n");
			yield();
		}
	}

	spin_lock_irqsave(&current->sighand->siglock, flags);
	recalc_sigpending();
	spin_unlock_irqrestore(&current->sighand->siglock, flags);
}

/*
 * Start up the rpciod process if it's not already running.
 */
int
rpciod_up(void)
{
	struct workqueue_struct *wq;
	int error = 0;

	mutex_lock(&rpciod_mutex);
	dprintk("rpciod_up: users %d\n", rpciod_users);
	rpciod_users++;
	if (rpciod_workqueue)
		goto out;
	/*
	 * If there's no pid, we should be the first user.
	 */
	if (rpciod_users > 1)
		printk(KERN_WARNING "rpciod_up: no workqueue, %d users??\n", rpciod_users);
	/*
	 * Create the rpciod thread and wait for it to start.
	 */
	error = -ENOMEM;
	wq = create_workqueue("rpciod");
	if (wq == NULL) {
		printk(KERN_WARNING "rpciod_up: create workqueue failed, error=%d\n", error);
		rpciod_users--;
		goto out;
	}
	rpciod_workqueue = wq;
	error = 0;
out:
	mutex_unlock(&rpciod_mutex);
	return error;
}

void
rpciod_down(void)
{
	mutex_lock(&rpciod_mutex);
	dprintk("rpciod_down sema %d\n", rpciod_users);
	if (rpciod_users) {
		if (--rpciod_users)
			goto out;
	} else
		printk(KERN_WARNING "rpciod_down: no users??\n");

	if (!rpciod_workqueue) {
		dprintk("rpciod_down: Nothing to do!\n");
		goto out;
	}
	rpciod_killall();

	destroy_workqueue(rpciod_workqueue);
	rpciod_workqueue = NULL;
out:
	mutex_unlock(&rpciod_mutex);
}
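
/*
 * Usage sketch (illustrative, not part of the original file): rpciod_up()
 * and rpciod_down() are reference counted, so users pair them around the
 * lifetime of whatever needs rpciod, roughly:
 *
 *	if (rpciod_up() != 0)
 *		goto out_no_rpciod;
 *	...
 *	rpciod_down();
 *
 * The first rpciod_up() call creates the "rpciod" workqueue and the last
 * rpciod_down() kills any remaining tasks and destroys it.
 */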
#ifdef RPC_DEBUG
void rpc_show_tasks(void)
{
	struct list_head *le;
	struct rpc_task *t;

	spin_lock(&rpc_sched_lock);
	if (list_empty(&all_tasks)) {
		spin_unlock(&rpc_sched_lock);
		return;
	}
	printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
		"-rpcwait -action- ---ops--\n");
	alltask_for_each(t, le, &all_tasks) {
		const char *rpc_waitq = "none";

		if (RPC_IS_QUEUED(t))
			rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);

		printk("%05d %04d %04x %06d %8p %6d %8p %08ld %8s %8p %8p\n",
			t->tk_pid,
			(t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1),
			t->tk_flags, t->tk_status,
			t->tk_client,
			(t->tk_client ? t->tk_client->cl_prog : 0),
			t->tk_rqstp, t->tk_timeout,
			rpc_waitq,
			t->tk_action, t->tk_ops);
	}
	spin_unlock(&rpc_sched_lock);
}
#endif

void
rpc_destroy_mempool(void)
{
	if (rpc_buffer_mempool)
		mempool_destroy(rpc_buffer_mempool);
	if (rpc_task_mempool)
		mempool_destroy(rpc_task_mempool);
	if (rpc_task_slabp)
		kmem_cache_destroy(rpc_task_slabp);
	if (rpc_buffer_slabp)
		kmem_cache_destroy(rpc_buffer_slabp);
}

int
rpc_init_mempool(void)
{
	rpc_task_slabp = kmem_cache_create("rpc_tasks",
					     sizeof(struct rpc_task),
					     0, SLAB_HWCACHE_ALIGN,
					     NULL, NULL);
	if (!rpc_task_slabp)
		goto err_nomem;
	rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
					     RPC_BUFFER_MAXSIZE,
					     0, SLAB_HWCACHE_ALIGN,
					     NULL, NULL);
	if (!rpc_buffer_slabp)
		goto err_nomem;
	rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
						    rpc_task_slabp);
	if (!rpc_task_mempool)
		goto err_nomem;
	rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE,
						      rpc_buffer_slabp);
	if (!rpc_buffer_mempool)
		goto err_nomem;
	return 0;
err_nomem:
	rpc_destroy_mempool();
	return -ENOMEM;
}
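
/*
 * Note (based on how the pools above are used, exact init call site is an
 * assumption): the slab caches and mempools guarantee that at least
 * RPC_TASK_POOLSIZE tasks and RPC_BUFFER_POOLSIZE small buffers can be
 * allocated even under memory pressure. rpc_init_mempool() is expected to
 * run once when the sunrpc module initializes, paired with
 * rpc_destroy_mempool() on unload.
 */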