/*
 *  linux/net/sunrpc/sched.c
 *
 *  Scheduling for synchronous and asynchronous RPC requests.
 *
 *  Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
 *
 *  TCP NFS related read + write fixes
 *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 */

#include <linux/module.h>

#include <linux/sched.h>
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/mempool.h>
#include <linux/smp.h>
#include <linux/smp_lock.h>
#include <linux/spinlock.h>
#include <linux/mutex.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/xprt.h>

#ifdef RPC_DEBUG
#define RPCDBG_FACILITY		RPCDBG_SCHED
#define RPC_TASK_MAGIC_ID	0xf00baa
static int			rpc_task_id;
#endif

/*
 * RPC slabs and memory pools
 */
#define RPC_BUFFER_MAXSIZE	(2048)
#define RPC_BUFFER_POOLSIZE	(8)
#define RPC_TASK_POOLSIZE	(8)
static kmem_cache_t	*rpc_task_slabp __read_mostly;
static kmem_cache_t	*rpc_buffer_slabp __read_mostly;
static mempool_t	*rpc_task_mempool __read_mostly;
static mempool_t	*rpc_buffer_mempool __read_mostly;

static void		__rpc_default_timer(struct rpc_task *task);
static void		rpciod_killall(void);
static void		rpc_async_schedule(void *);

/*
 * RPC tasks sit here while waiting for conditions to improve.
 */
static RPC_WAITQ(delay_queue, "delayq");

/*
 * All RPC tasks are linked into this list
 */
static LIST_HEAD(all_tasks);

/*
 * rpciod-related stuff
 */
static DEFINE_MUTEX(rpciod_mutex);
static unsigned int	rpciod_users;
struct workqueue_struct *rpciod_workqueue;

/*
 * Spinlock for other critical sections of code.
 */
static DEFINE_SPINLOCK(rpc_sched_lock);

/*
 * Disable the timer for a given RPC task. Should be called with
 * queue->lock and bh_disabled in order to avoid races within
 * rpc_run_timer().
 */
static inline void
__rpc_disable_timer(struct rpc_task *task)
{
	dprintk("RPC: %4d disabling timer\n", task->tk_pid);
	task->tk_timeout_fn = NULL;
	task->tk_timeout = 0;
}

/*
 * Run a timeout function.
 * We use the callback in order to allow __rpc_wake_up_task()
 * and friends to disable the timer synchronously on SMP systems
 * without calling del_timer_sync(). The latter could cause a
 * deadlock if called while we're holding spinlocks...
 */
static void rpc_run_timer(struct rpc_task *task)
{
	void (*callback)(struct rpc_task *);

	callback = task->tk_timeout_fn;
	task->tk_timeout_fn = NULL;
	if (callback && RPC_IS_QUEUED(task)) {
		dprintk("RPC: %4d running timer\n", task->tk_pid);
		callback(task);
	}
	smp_mb__before_clear_bit();
	clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
	smp_mb__after_clear_bit();
}

/*
 * Set up a timer for the current task.
 */
static inline void
__rpc_add_timer(struct rpc_task *task, rpc_action timer)
{
	if (!task->tk_timeout)
		return;

	dprintk("RPC: %4d setting alarm for %lu ms\n",
			task->tk_pid, task->tk_timeout * 1000 / HZ);

	if (timer)
		task->tk_timeout_fn = timer;
	else
		task->tk_timeout_fn = __rpc_default_timer;
	set_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
	mod_timer(&task->tk_timer, jiffies + task->tk_timeout);
}

/*
 * Delete any timer for the current task. Because we use del_timer_sync(),
 * this function should never be called while holding queue->lock.
 */
static void
rpc_delete_timer(struct rpc_task *task)
{
	if (RPC_IS_QUEUED(task))
		return;
	if (test_and_clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate)) {
		del_singleshot_timer_sync(&task->tk_timer);
		dprintk("RPC: %4d deleting timer\n", task->tk_pid);
	}
}
/*
 * Add new request to a priority queue.
 */
static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	struct list_head *q;
	struct rpc_task *t;

	INIT_LIST_HEAD(&task->u.tk_wait.links);
	q = &queue->tasks[task->tk_priority];
	if (unlikely(task->tk_priority > queue->maxpriority))
		q = &queue->tasks[queue->maxpriority];
	list_for_each_entry(t, q, u.tk_wait.list) {
		if (t->tk_cookie == task->tk_cookie) {
			list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
			return;
		}
	}
	list_add_tail(&task->u.tk_wait.list, q);
}

/*
 * Add new request to wait queue.
 *
 * Swapper tasks always get inserted at the head of the queue.
 * This should avoid many nasty memory deadlocks and hopefully
 * improve overall performance.
 * Everyone else gets appended to the queue to ensure proper FIFO behavior.
 */
static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	BUG_ON (RPC_IS_QUEUED(task));

	if (RPC_IS_PRIORITY(queue))
		__rpc_add_wait_queue_priority(queue, task);
	else if (RPC_IS_SWAPPER(task))
		list_add(&task->u.tk_wait.list, &queue->tasks[0]);
	else
		list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
	task->u.tk_wait.rpc_waitq = queue;
	queue->qlen++;
	rpc_set_queued(task);

	dprintk("RPC: %4d added to queue %p \"%s\"\n",
			task->tk_pid, queue, rpc_qname(queue));
}

/*
 * Remove request from a priority queue.
 */
static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
{
	struct rpc_task *t;

	if (!list_empty(&task->u.tk_wait.links)) {
		t = list_entry(task->u.tk_wait.links.next, struct rpc_task, u.tk_wait.list);
		list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
		list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
	}
	list_del(&task->u.tk_wait.list);
}

/*
 * Remove request from queue.
 * Note: must be called with spin lock held.
 */
static void __rpc_remove_wait_queue(struct rpc_task *task)
{
	struct rpc_wait_queue *queue;
	queue = task->u.tk_wait.rpc_waitq;

	if (RPC_IS_PRIORITY(queue))
		__rpc_remove_wait_queue_priority(task);
	else
		list_del(&task->u.tk_wait.list);
	queue->qlen--;
	dprintk("RPC: %4d removed from queue %p \"%s\"\n",
			task->tk_pid, queue, rpc_qname(queue));
}

static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
{
	queue->priority = priority;
	queue->count = 1 << (priority * 2);
}

static inline void rpc_set_waitqueue_cookie(struct rpc_wait_queue *queue, unsigned long cookie)
{
	queue->cookie = cookie;
	queue->nr = RPC_BATCH_COUNT;
}

static inline void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
{
	rpc_set_waitqueue_priority(queue, queue->maxpriority);
	rpc_set_waitqueue_cookie(queue, 0);
}

static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, int maxprio)
{
	int i;

	spin_lock_init(&queue->lock);
	for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
		INIT_LIST_HEAD(&queue->tasks[i]);
	queue->maxpriority = maxprio;
	rpc_reset_waitqueue_priority(queue);
#ifdef RPC_DEBUG
	queue->name = qname;
#endif
}

void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, RPC_PRIORITY_HIGH);
}

void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, 0);
}
EXPORT_SYMBOL(rpc_init_wait_queue);
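
/*
 * Illustrative sketch (not part of the original file): a caller that needs
 * its own wait queue can either declare one statically with the RPC_WAITQ()
 * macro, as the "delayq" above does, or embed a struct rpc_wait_queue and
 * initialize it at run time.  Every name below is hypothetical.
 *
 *	static RPC_WAITQ(foo_pending, "foo_pending");
 *
 *	struct foo_server {
 *		struct rpc_wait_queue	backlog;
 *	};
 *
 *	static void foo_server_init(struct foo_server *srv)
 *	{
 *		rpc_init_wait_queue(&srv->backlog, "foo_backlog");
 *	}
 */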
static int rpc_wait_bit_interruptible(void *word)
{
	if (signal_pending(current))
		return -ERESTARTSYS;
	schedule();
	return 0;
}

/*
 * Mark an RPC call as having completed by clearing the 'active' bit
 */
static inline void rpc_mark_complete_task(struct rpc_task *task)
{
	rpc_clear_active(task);
	wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
}

/*
 * Allow callers to wait for completion of an RPC call
 */
int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
{
	if (action == NULL)
		action = rpc_wait_bit_interruptible;
	return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
			action, TASK_INTERRUPTIBLE);
}
EXPORT_SYMBOL(__rpc_wait_for_completion_task);
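
/*
 * Illustrative sketch (not part of the original file): waiting for an
 * asynchronous task to run to completion.  Passing a NULL action selects
 * rpc_wait_bit_interruptible() above, so the wait can be broken by a signal
 * and returns -ERESTARTSYS in that case.
 *
 *	int ret = __rpc_wait_for_completion_task(task, NULL);
 *	if (ret < 0)
 *		return ret;
 */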
/*
 * Make an RPC task runnable.
 *
 * Note: If the task is ASYNC, this must be called with
 * the spinlock held to protect the wait queue operation.
 */
static void rpc_make_runnable(struct rpc_task *task)
{
	int do_ret;

	BUG_ON(task->tk_timeout_fn);
	do_ret = rpc_test_and_set_running(task);
	rpc_clear_queued(task);
	if (do_ret)
		return;
	if (RPC_IS_ASYNC(task)) {
		int status;

		INIT_WORK(&task->u.tk_work, rpc_async_schedule, (void *)task);
		status = queue_work(task->tk_workqueue, &task->u.tk_work);
		if (status < 0) {
			printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
			task->tk_status = status;
			return;
		}
	} else
		wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
}

/*
 * Prepare for sleeping on a wait queue.
 * By always appending tasks to the list we ensure FIFO behavior.
 * NB: An RPC task will only receive interrupt-driven events as long
 * as it's on a wait queue.
 */
static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
			rpc_action action, rpc_action timer)
{
	dprintk("RPC: %4d sleep_on(queue \"%s\" time %ld)\n", task->tk_pid,
			rpc_qname(q), jiffies);

	if (!RPC_IS_ASYNC(task) && !RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive synchronous task put to sleep!\n");
		return;
	}

	/* Mark the task as being activated if so needed */
	rpc_set_active(task);

	__rpc_add_wait_queue(q, task);

	BUG_ON(task->tk_callback != NULL);
	task->tk_callback = action;
	__rpc_add_timer(task, timer);
}

void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
			rpc_action action, rpc_action timer)
{
	/*
	 * Protect the queue operations.
	 */
	spin_lock_bh(&q->lock);
	__rpc_sleep_on(q, task, action, timer);
	spin_unlock_bh(&q->lock);
}
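
/*
 * Illustrative sketch (not part of the original file): a state-machine step
 * that puts a task to sleep until some resource becomes available, with a
 * bounded wait.  The queue, callback and timeout value are hypothetical;
 * the pattern is simply "set tk_timeout, pick the action to run on wake-up,
 * then rpc_sleep_on()".  Passing a NULL timer installs __rpc_default_timer()
 * below, which fails the task with -ETIMEDOUT when the timeout expires.
 *
 *	static void foo_wait_for_slot(struct rpc_task *task)
 *	{
 *		task->tk_timeout = 5 * HZ;
 *		rpc_sleep_on(&foo_backlog, task, foo_retry_slot, NULL);
 *	}
 */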
/**
 * __rpc_do_wake_up_task - wake up a single rpc_task
 * @task: task to be woken up
 *
 * Caller must hold queue->lock, and have cleared the task queued flag.
 */
static void __rpc_do_wake_up_task(struct rpc_task *task)
{
	dprintk("RPC: %4d __rpc_wake_up_task (now %ld)\n", task->tk_pid, jiffies);

#ifdef RPC_DEBUG
	BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
#endif
	/* Has the task been executed yet? If not, we cannot wake it up! */
	if (!RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
		return;
	}

	__rpc_disable_timer(task);
	__rpc_remove_wait_queue(task);

	rpc_make_runnable(task);

	dprintk("RPC: __rpc_wake_up_task done\n");
}

/*
 * Wake up the specified task
 */
static void __rpc_wake_up_task(struct rpc_task *task)
{
	if (rpc_start_wakeup(task)) {
		if (RPC_IS_QUEUED(task))
			__rpc_do_wake_up_task(task);
		rpc_finish_wakeup(task);
	}
}

/*
 * Default timeout handler if none specified by user
 */
static void
__rpc_default_timer(struct rpc_task *task)
{
	dprintk("RPC: %d timeout (default timer)\n", task->tk_pid);
	task->tk_status = -ETIMEDOUT;
	rpc_wake_up_task(task);
}

/*
 * Wake up the specified task
 */
void rpc_wake_up_task(struct rpc_task *task)
{
	if (rpc_start_wakeup(task)) {
		if (RPC_IS_QUEUED(task)) {
			struct rpc_wait_queue *queue = task->u.tk_wait.rpc_waitq;

			spin_lock_bh(&queue->lock);
			__rpc_do_wake_up_task(task);
			spin_unlock_bh(&queue->lock);
		}
		rpc_finish_wakeup(task);
	}
}
/*
 * Wake up the next task on a priority queue.
 */
static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queue)
{
	struct list_head *q;
	struct rpc_task *task;

	/*
	 * Service a batch of tasks from a single cookie.
	 */
	q = &queue->tasks[queue->priority];
	if (!list_empty(q)) {
		task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
		if (queue->cookie == task->tk_cookie) {
			if (--queue->nr)
				goto out;
			list_move_tail(&task->u.tk_wait.list, q);
		}
		/*
		 * Check if we need to switch queues.
		 */
		if (--queue->count)
			goto new_cookie;
	}

	/*
	 * Service the next queue.
	 */
	do {
		if (q == &queue->tasks[0])
			q = &queue->tasks[queue->maxpriority];
		else
			q = q - 1;
		if (!list_empty(q)) {
			task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
			goto new_queue;
		}
	} while (q != &queue->tasks[queue->priority]);

	rpc_reset_waitqueue_priority(queue);
	return NULL;

new_queue:
	rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
new_cookie:
	rpc_set_waitqueue_cookie(queue, task->tk_cookie);
out:
	__rpc_wake_up_task(task);
	return task;
}

/*
 * Wake up the next task on the wait queue.
 */
struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
{
	struct rpc_task *task = NULL;

	dprintk("RPC: wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue));
	spin_lock_bh(&queue->lock);
	if (RPC_IS_PRIORITY(queue))
		task = __rpc_wake_up_next_priority(queue);
	else {
		task_for_first(task, &queue->tasks[0])
			__rpc_wake_up_task(task);
	}
	spin_unlock_bh(&queue->lock);

	return task;
}

/**
 * rpc_wake_up - wake up all rpc_tasks
 * @queue: rpc_wait_queue on which the tasks are sleeping
 *
 * Grabs queue->lock
 */
void rpc_wake_up(struct rpc_wait_queue *queue)
{
	struct rpc_task *task, *next;
	struct list_head *head;

	spin_lock_bh(&queue->lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		list_for_each_entry_safe(task, next, head, u.tk_wait.list)
			__rpc_wake_up_task(task);
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock_bh(&queue->lock);
}

/**
 * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
 * @queue: rpc_wait_queue on which the tasks are sleeping
 * @status: status value to set
 *
 * Grabs queue->lock
 */
void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{
	struct rpc_task *task, *next;
	struct list_head *head;

	spin_lock_bh(&queue->lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		list_for_each_entry_safe(task, next, head, u.tk_wait.list) {
			task->tk_status = status;
			__rpc_wake_up_task(task);
		}
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock_bh(&queue->lock);
}
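
/*
 * Illustrative sketch (not part of the original file): the usual pairing of
 * these wake-up helpers with rpc_sleep_on().  The queue name is hypothetical.
 *
 *	rpc_wake_up_next(&foo_backlog);            releases the oldest waiter
 *	rpc_wake_up(&foo_backlog);                 releases every waiter
 *	rpc_wake_up_status(&foo_backlog, -EIO);    fails and releases every waiter
 */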
/*
 * Run a task at a later time
 */
static void	__rpc_atrun(struct rpc_task *);
void
rpc_delay(struct rpc_task *task, unsigned long delay)
{
	task->tk_timeout = delay;
	rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
}

static void
__rpc_atrun(struct rpc_task *task)
{
	task->tk_status = 0;
	rpc_wake_up_task(task);
}
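
/*
 * Illustrative sketch (not part of the original file): a typical back-off in
 * a tk_action state routine.  The next state and the delay value are
 * hypothetical; the point is that rpc_delay() parks the task on the
 * delay_queue above and __rpc_atrun() makes it runnable again once the
 * timeout fires.
 *
 *	static void foo_call_again(struct rpc_task *task)
 *	{
 *		task->tk_action = foo_call_transmit;
 *		rpc_delay(task, HZ >> 4);
 *	}
 */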
/*
 * Helper to call task->tk_ops->rpc_call_prepare
 */
static void rpc_prepare_task(struct rpc_task *task)
{
	task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
}

/*
 * Helper that calls task->tk_ops->rpc_call_done if it exists
 */
void rpc_exit_task(struct rpc_task *task)
{
	task->tk_action = NULL;
	if (task->tk_ops->rpc_call_done != NULL) {
		task->tk_ops->rpc_call_done(task, task->tk_calldata);
		if (task->tk_action != NULL) {
			WARN_ON(RPC_ASSASSINATED(task));
			/* Always release the RPC slot and buffer memory */
			xprt_release(task);
		}
	}
}
EXPORT_SYMBOL(rpc_exit_task);
/*
 * This is the RPC `scheduler' (or rather, the finite state machine).
 */
static int __rpc_execute(struct rpc_task *task)
{
	int		status = 0;

	dprintk("RPC: %4d rpc_execute flgs %x\n",
			task->tk_pid, task->tk_flags);

	BUG_ON(RPC_IS_QUEUED(task));

	for (;;) {
		/*
		 * Garbage collection of pending timers...
		 */
		rpc_delete_timer(task);

		/*
		 * Execute any pending callback.
		 */
		if (RPC_DO_CALLBACK(task)) {
			/* Define a callback save pointer */
			void (*save_callback)(struct rpc_task *);

			/*
			 * If a callback exists, save it, reset it,
			 * call it.
			 * The save is needed to stop from resetting
			 * another callback set within the callback handler
			 *   - Dave
			 */
			save_callback = task->tk_callback;
			task->tk_callback = NULL;
			lock_kernel();
			save_callback(task);
			unlock_kernel();
		}

		/*
		 * Perform the next FSM step.
		 * tk_action may be NULL when the task has been killed
		 * by someone else.
		 */
		if (!RPC_IS_QUEUED(task)) {
			if (task->tk_action == NULL)
				break;
			lock_kernel();
			task->tk_action(task);
			unlock_kernel();
		}

		/*
		 * Lockless check for whether task is sleeping or not.
		 */
		if (!RPC_IS_QUEUED(task))
			continue;
		rpc_clear_running(task);
		if (RPC_IS_ASYNC(task)) {
			/* Careful! we may have raced... */
			if (RPC_IS_QUEUED(task))
				return 0;
			if (rpc_test_and_set_running(task))
				return 0;
			continue;
		}

		/* sync task: sleep here */
		dprintk("RPC: %4d sync task going to sleep\n", task->tk_pid);
		/* Note: Caller should be using rpc_clnt_sigmask() */
		status = out_of_line_wait_on_bit(&task->tk_runstate,
				RPC_TASK_QUEUED, rpc_wait_bit_interruptible,
				TASK_INTERRUPTIBLE);
		if (status == -ERESTARTSYS) {
			/*
			 * When a sync task receives a signal, it exits with
			 * -ERESTARTSYS. In order to catch any callbacks that
			 * clean up after sleeping on some queue, we don't
			 * break the loop here, but go around once more.
			 */
			dprintk("RPC: %4d got signal\n", task->tk_pid);
			task->tk_flags |= RPC_TASK_KILLED;
			rpc_exit(task, -ERESTARTSYS);
			rpc_wake_up_task(task);
		}
		rpc_set_running(task);
		dprintk("RPC: %4d sync task resuming\n", task->tk_pid);
	}

	dprintk("RPC: %4d, return %d, status %d\n", task->tk_pid, status, task->tk_status);
	/* Wake up anyone who is waiting for task completion */
	rpc_mark_complete_task(task);
	/* Release all resources associated with the task */
	rpc_release_task(task);
	return status;
}

/*
 * User-visible entry point to the scheduler.
 *
 * This may be called recursively if e.g. an async NFS task updates
 * the attributes and finds that dirty pages must be flushed.
 * NOTE: Upon exit of this function the task is guaranteed to be
 *	 released. In particular note that tk_release() will have
 *	 been called, so your task memory may have been freed.
 */
int
rpc_execute(struct rpc_task *task)
{
	rpc_set_active(task);
	rpc_set_running(task);
	return __rpc_execute(task);
}

static void rpc_async_schedule(void *arg)
{
	__rpc_execute((struct rpc_task *)arg);
}
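
/*
 * Illustrative sketch (not part of the original file): the synchronous call
 * path.  A caller that does not set RPC_TASK_ASYNC hands the task to
 * rpc_execute() and sleeps inside __rpc_execute() until the state machine
 * finishes; on return the task has already been released, so the pointer
 * must not be touched again.  The ops structure and data are hypothetical.
 *
 *	struct rpc_task *task;
 *	int status;
 *
 *	task = rpc_new_task(clnt, 0, &foo_sync_ops, data);
 *	if (task == NULL)
 *		return -ENOMEM;
 *	status = rpc_execute(task);
 */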
/**
 * rpc_malloc - allocate an RPC buffer
 * @task: RPC task that will use this buffer
 * @size: requested byte size
 *
 * We try to ensure that some NFS reads and writes can always proceed
 * by using a mempool when allocating 'small' buffers.
 * In order to avoid memory starvation triggering more writebacks of
 * NFS requests, we use GFP_NOFS rather than GFP_KERNEL.
 */
void * rpc_malloc(struct rpc_task *task, size_t size)
{
	struct rpc_rqst *req = task->tk_rqstp;
	gfp_t	gfp;

	if (task->tk_flags & RPC_TASK_SWAPPER)
		gfp = GFP_ATOMIC;
	else
		gfp = GFP_NOFS;

	if (size > RPC_BUFFER_MAXSIZE) {
		req->rq_buffer = kmalloc(size, gfp);
		if (req->rq_buffer)
			req->rq_bufsize = size;
	} else {
		req->rq_buffer = mempool_alloc(rpc_buffer_mempool, gfp);
		if (req->rq_buffer)
			req->rq_bufsize = RPC_BUFFER_MAXSIZE;
	}
	return req->rq_buffer;
}

/**
 * rpc_free - free buffer allocated via rpc_malloc
 * @task: RPC task with a buffer to be freed
 *
 */
void rpc_free(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (req->rq_buffer) {
		if (req->rq_bufsize == RPC_BUFFER_MAXSIZE)
			mempool_free(req->rq_buffer, rpc_buffer_mempool);
		else
			kfree(req->rq_buffer);
		req->rq_buffer = NULL;
		req->rq_bufsize = 0;
	}
}
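
/*
 * Illustrative sketch (not part of the original file): how a caller might
 * obtain and release the per-request buffer.  rpc_malloc() stores the result
 * in task->tk_rqstp->rq_buffer and also returns it, so the caller only needs
 * to check for NULL; rpc_free() is safe to call even if no buffer was ever
 * allocated.  The size below is hypothetical.
 *
 *	if (rpc_malloc(task, 1024) == NULL)
 *		return -ENOMEM;
 *	...
 *	rpc_free(task);
 */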
/*
 * Creation and deletion of RPC task structures
 */
void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
{
	memset(task, 0, sizeof(*task));
	init_timer(&task->tk_timer);
	task->tk_timer.data     = (unsigned long) task;
	task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
	atomic_set(&task->tk_count, 1);
	task->tk_client = clnt;
	task->tk_flags  = flags;
	task->tk_ops = tk_ops;
	if (tk_ops->rpc_call_prepare != NULL)
		task->tk_action = rpc_prepare_task;
	task->tk_calldata = calldata;

	/* Initialize retry counters */
	task->tk_garb_retry = 2;
	task->tk_cred_retry = 2;

	task->tk_priority = RPC_PRIORITY_NORMAL;
	task->tk_cookie = (unsigned long)current;

	/* Initialize workqueue for async tasks */
	task->tk_workqueue = rpciod_workqueue;

	if (clnt) {
		atomic_inc(&clnt->cl_users);
		if (clnt->cl_softrtry)
			task->tk_flags |= RPC_TASK_SOFT;
		if (!clnt->cl_intr)
			task->tk_flags |= RPC_TASK_NOINTR;
	}

#ifdef RPC_DEBUG
	task->tk_magic = RPC_TASK_MAGIC_ID;
	task->tk_pid = rpc_task_id++;
#endif
	/* Add to global list of all tasks */
	spin_lock(&rpc_sched_lock);
	list_add_tail(&task->tk_task, &all_tasks);
	spin_unlock(&rpc_sched_lock);

	BUG_ON(task->tk_ops == NULL);

	/* starting timestamp */
	task->tk_start = jiffies;

	dprintk("RPC: %4d new task procpid %d\n", task->tk_pid,
			current->pid);
}

static struct rpc_task *
rpc_alloc_task(void)
{
	return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
}

static void rpc_free_task(struct rpc_task *task)
{
	dprintk("RPC: %4d freeing task\n", task->tk_pid);
	mempool_free(task, rpc_task_mempool);
}

/*
 * Create a new task for the specified client. We have to
 * clean up after an allocation failure, as the client may
 * have specified "oneshot".
 */
struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
{
	struct rpc_task	*task;

	task = rpc_alloc_task();
	if (!task)
		goto cleanup;

	rpc_init_task(task, clnt, flags, tk_ops, calldata);

	dprintk("RPC: %4d allocated task\n", task->tk_pid);
	task->tk_flags |= RPC_TASK_DYNAMIC;
out:
	return task;

cleanup:
	/* Check whether to release the client */
	if (clnt) {
		printk("rpc_new_task: failed, users=%d, oneshot=%d\n",
			atomic_read(&clnt->cl_users), clnt->cl_oneshot);
		atomic_inc(&clnt->cl_users); /* pretend we were used ... */
		rpc_release_client(clnt);
	}
	goto out;
}
void rpc_release_task(struct rpc_task *task)
{
	const struct rpc_call_ops *tk_ops = task->tk_ops;
	void *calldata = task->tk_calldata;

#ifdef RPC_DEBUG
	BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
#endif
	if (!atomic_dec_and_test(&task->tk_count))
		return;
	dprintk("RPC: %4d release task\n", task->tk_pid);

	/* Remove from global task list */
	spin_lock(&rpc_sched_lock);
	list_del(&task->tk_task);
	spin_unlock(&rpc_sched_lock);

	BUG_ON (RPC_IS_QUEUED(task));

	/* Synchronously delete any running timer */
	rpc_delete_timer(task);

	/* Release resources */
	if (task->tk_rqstp)
		xprt_release(task);
	if (task->tk_msg.rpc_cred)
		rpcauth_unbindcred(task);
	if (task->tk_client) {
		rpc_release_client(task->tk_client);
		task->tk_client = NULL;
	}

#ifdef RPC_DEBUG
	task->tk_magic = 0;
#endif
	if (task->tk_flags & RPC_TASK_DYNAMIC)
		rpc_free_task(task);
	if (tk_ops->rpc_release)
		tk_ops->rpc_release(calldata);
}

/**
 * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
 * @clnt: pointer to RPC client
 * @flags: RPC flags
 * @ops: RPC call ops
 * @data: user call data
 */
struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags,
				const struct rpc_call_ops *ops,
				void *data)
{
	struct rpc_task *task;
	task = rpc_new_task(clnt, flags, ops, data);
	if (task == NULL) {
		if (ops->rpc_release != NULL)
			ops->rpc_release(data);
		return ERR_PTR(-ENOMEM);
	}
	atomic_inc(&task->tk_count);
	rpc_execute(task);
	return task;
}
EXPORT_SYMBOL(rpc_run_task);
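
/*
 * Illustrative sketch (not part of the original file): firing off an
 * asynchronous call with rpc_run_task().  The rpc_call_ops members shown are
 * the hooks this file invokes (rpc_call_prepare via rpc_prepare_task,
 * rpc_call_done via rpc_exit_task, rpc_release from rpc_release_task);
 * everything named "foo_" is hypothetical.  Because rpc_run_task() takes an
 * extra reference on the task before running it, the caller drops that
 * reference with rpc_release_task() once it no longer needs the pointer.
 *
 *	static const struct rpc_call_ops foo_call_ops = {
 *		.rpc_call_prepare = foo_prepare,
 *		.rpc_call_done    = foo_done,
 *		.rpc_release      = foo_release,
 *	};
 *
 *	task = rpc_run_task(clnt, RPC_TASK_ASYNC, &foo_call_ops, calldata);
 *	if (IS_ERR(task))
 *		return PTR_ERR(task);
 *	rpc_release_task(task);
 */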
/*
 * Kill all tasks for the given client.
 * XXX: kill their descendants as well?
 */
void rpc_killall_tasks(struct rpc_clnt *clnt)
{
	struct rpc_task	*rovr;
	struct list_head *le;

	dprintk("RPC: killing all tasks for client %p\n", clnt);

	/*
	 * Spin lock all_tasks to prevent changes...
	 */
	spin_lock(&rpc_sched_lock);
	alltask_for_each(rovr, le, &all_tasks) {
		if (! RPC_IS_ACTIVATED(rovr))
			continue;
		if (!clnt || rovr->tk_client == clnt) {
			rovr->tk_flags |= RPC_TASK_KILLED;
			rpc_exit(rovr, -EIO);
			rpc_wake_up_task(rovr);
		}
	}
	spin_unlock(&rpc_sched_lock);
}

static DECLARE_MUTEX_LOCKED(rpciod_running);

static void rpciod_killall(void)
{
	unsigned long flags;

	while (!list_empty(&all_tasks)) {
		clear_thread_flag(TIF_SIGPENDING);
		rpc_killall_tasks(NULL);
		flush_workqueue(rpciod_workqueue);
		if (!list_empty(&all_tasks)) {
			dprintk("rpciod_killall: waiting for tasks to exit\n");
			yield();
		}
	}

	spin_lock_irqsave(&current->sighand->siglock, flags);
	recalc_sigpending();
	spin_unlock_irqrestore(&current->sighand->siglock, flags);
}

/*
 * Start up the rpciod process if it's not already running.
 */
int
rpciod_up(void)
{
	struct workqueue_struct *wq;
	int error = 0;

	mutex_lock(&rpciod_mutex);
	dprintk("rpciod_up: users %d\n", rpciod_users);
	rpciod_users++;
	if (rpciod_workqueue)
		goto out;
	/*
	 * If there's no workqueue yet, we ought to be the first user.
	 */
	if (rpciod_users > 1)
		printk(KERN_WARNING "rpciod_up: no workqueue, %d users??\n", rpciod_users);
	/*
	 * Create the rpciod thread and wait for it to start.
	 */
	error = -ENOMEM;
	wq = create_workqueue("rpciod");
	if (wq == NULL) {
		printk(KERN_WARNING "rpciod_up: create workqueue failed, error=%d\n", error);
		rpciod_users--;
		goto out;
	}
	rpciod_workqueue = wq;
	error = 0;
out:
	mutex_unlock(&rpciod_mutex);
	return error;
}

void
rpciod_down(void)
{
	mutex_lock(&rpciod_mutex);
	dprintk("rpciod_down sema %d\n", rpciod_users);
	if (rpciod_users) {
		if (--rpciod_users)
			goto out;
	} else
		printk(KERN_WARNING "rpciod_down: no users??\n");

	if (!rpciod_workqueue) {
		dprintk("rpciod_down: Nothing to do!\n");
		goto out;
	}
	rpciod_killall();

	destroy_workqueue(rpciod_workqueue);
	rpciod_workqueue = NULL;
out:
	mutex_unlock(&rpciod_mutex);
}
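
/*
 * Illustrative sketch (not part of the original file): rpciod_up() and
 * rpciod_down() are reference counted, so each user brackets the lifetime
 * over which it needs the rpciod workqueue.  A hypothetical user might do:
 *
 *	error = rpciod_up();
 *	if (error < 0)
 *		return error;
 *	...	(create RPC clients, run tasks)
 *	rpciod_down();
 */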
#ifdef RPC_DEBUG
void rpc_show_tasks(void)
{
	struct list_head *le;
	struct rpc_task *t;

	spin_lock(&rpc_sched_lock);
	if (list_empty(&all_tasks)) {
		spin_unlock(&rpc_sched_lock);
		return;
	}
	printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
		"-rpcwait -action- ---ops--\n");
	alltask_for_each(t, le, &all_tasks) {
		const char *rpc_waitq = "none";

		if (RPC_IS_QUEUED(t))
			rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);

		printk("%05d %04d %04x %06d %8p %6d %8p %08ld %8s %8p %8p\n",
			t->tk_pid,
			(t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1),
			t->tk_flags, t->tk_status,
			t->tk_client,
			(t->tk_client ? t->tk_client->cl_prog : 0),
			t->tk_rqstp, t->tk_timeout,
			rpc_waitq,
			t->tk_action, t->tk_ops);
	}
	spin_unlock(&rpc_sched_lock);
}
#endif

void
rpc_destroy_mempool(void)
{
	if (rpc_buffer_mempool)
		mempool_destroy(rpc_buffer_mempool);
	if (rpc_task_mempool)
		mempool_destroy(rpc_task_mempool);
	if (rpc_task_slabp && kmem_cache_destroy(rpc_task_slabp))
		printk(KERN_INFO "rpc_task: not all structures were freed\n");
	if (rpc_buffer_slabp && kmem_cache_destroy(rpc_buffer_slabp))
		printk(KERN_INFO "rpc_buffers: not all structures were freed\n");
}

int
rpc_init_mempool(void)
{
	rpc_task_slabp = kmem_cache_create("rpc_tasks",
					sizeof(struct rpc_task),
					0, SLAB_HWCACHE_ALIGN,
					NULL, NULL);
	if (!rpc_task_slabp)
		goto err_nomem;
	rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
					RPC_BUFFER_MAXSIZE,
					0, SLAB_HWCACHE_ALIGN,
					NULL, NULL);
	if (!rpc_buffer_slabp)
		goto err_nomem;
	rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
						rpc_task_slabp);
	if (!rpc_task_mempool)
		goto err_nomem;
	rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE,
						rpc_buffer_slabp);
	if (!rpc_buffer_mempool)
		goto err_nomem;
	return 0;
err_nomem:
	rpc_destroy_mempool();
	return -ENOMEM;
}
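
/*
 * Illustrative sketch (not part of the original file): the slab caches and
 * mempools above are meant to be set up once when the module is loaded and
 * torn down on unload.  A hypothetical init/exit pairing would be:
 *
 *	static int __init init_foo_rpc(void)
 *	{
 *		return rpc_init_mempool();
 *	}
 *
 *	static void __exit cleanup_foo_rpc(void)
 *	{
 *		rpc_destroy_mempool();
 *	}
 *
 *	module_init(init_foo_rpc);
 *	module_exit(cleanup_foo_rpc);
 */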