/*
 * linux/net/sunrpc/sched.c
 *
 * Scheduling for synchronous and asynchronous RPC requests.
 *
 * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
 *
 * TCP NFS related read + write fixes
 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 */

#include <linux/module.h>

#include <linux/sched.h>
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/mempool.h>
#include <linux/smp.h>
#include <linux/smp_lock.h>
#include <linux/spinlock.h>
#include <linux/mutex.h>

#include <linux/sunrpc/clnt.h>

#ifdef RPC_DEBUG
#define RPCDBG_FACILITY         RPCDBG_SCHED
#define RPC_TASK_MAGIC_ID       0xf00baa
#endif

/*
 * RPC slabs and memory pools
 */
#define RPC_BUFFER_MAXSIZE      (2048)
#define RPC_BUFFER_POOLSIZE     (8)
#define RPC_TASK_POOLSIZE       (8)
static struct kmem_cache *rpc_task_slabp __read_mostly;
static struct kmem_cache *rpc_buffer_slabp __read_mostly;
static mempool_t *rpc_task_mempool __read_mostly;
static mempool_t *rpc_buffer_mempool __read_mostly;

static void __rpc_default_timer(struct rpc_task *task);
static void rpciod_killall(void);
static void rpc_async_schedule(struct work_struct *);
static void rpc_release_task(struct rpc_task *task);

/*
 * RPC tasks sit here while waiting for conditions to improve.
 */
static RPC_WAITQ(delay_queue, "delayq");

/*
 * All RPC clients are linked into this list
 */
static LIST_HEAD(all_clients);
static DECLARE_WAIT_QUEUE_HEAD(client_kill_wait);

/*
 * rpciod-related stuff
 */
static DEFINE_MUTEX(rpciod_mutex);
static unsigned int rpciod_users;
struct workqueue_struct *rpciod_workqueue;

/*
 * Spinlock for other critical sections of code.
 */
static DEFINE_SPINLOCK(rpc_sched_lock);

/*
 * Disable the timer for a given RPC task. Should be called with
 * queue->lock and bh_disabled in order to avoid races within
 * rpc_run_timer().
 */
static inline void
__rpc_disable_timer(struct rpc_task *task)
{
        dprintk("RPC: %5u disabling timer\n", task->tk_pid);
        task->tk_timeout_fn = NULL;
        task->tk_timeout = 0;
}

/*
 * Run a timeout function.
 * We use the callback in order to allow __rpc_wake_up_task()
 * and friends to disable the timer synchronously on SMP systems
 * without calling del_timer_sync(). The latter could cause a
 * deadlock if called while we're holding spinlocks...
 */
static void rpc_run_timer(struct rpc_task *task)
{
        void (*callback)(struct rpc_task *);

        callback = task->tk_timeout_fn;
        task->tk_timeout_fn = NULL;
        if (callback && RPC_IS_QUEUED(task)) {
                dprintk("RPC: %5u running timer\n", task->tk_pid);
                callback(task);
        }
        smp_mb__before_clear_bit();
        clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
        smp_mb__after_clear_bit();
}

/*
 * Set up a timer for the current task.
 */
static inline void
__rpc_add_timer(struct rpc_task *task, rpc_action timer)
{
        if (!task->tk_timeout)
                return;

        dprintk("RPC: %5u setting alarm for %lu ms\n",
                        task->tk_pid, task->tk_timeout * 1000 / HZ);

        if (timer)
                task->tk_timeout_fn = timer;
        else
                task->tk_timeout_fn = __rpc_default_timer;
        set_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
        mod_timer(&task->tk_timer, jiffies + task->tk_timeout);
}

/*
 * Delete any timer for the current task. Because we use del_timer_sync(),
 * this function should never be called while holding queue->lock.
 */
static void
rpc_delete_timer(struct rpc_task *task)
{
        if (RPC_IS_QUEUED(task))
                return;
        if (test_and_clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate)) {
                del_singleshot_timer_sync(&task->tk_timer);
                dprintk("RPC: %5u deleting timer\n", task->tk_pid);
        }
}

/*
 * Add new request to a priority queue.
 */
static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct rpc_task *task)
{
        struct list_head *q;
        struct rpc_task *t;

        INIT_LIST_HEAD(&task->u.tk_wait.links);
        q = &queue->tasks[task->tk_priority];
        if (unlikely(task->tk_priority > queue->maxpriority))
                q = &queue->tasks[queue->maxpriority];
        list_for_each_entry(t, q, u.tk_wait.list) {
                if (t->tk_cookie == task->tk_cookie) {
                        list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
                        return;
                }
        }
        list_add_tail(&task->u.tk_wait.list, q);
}
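
/*
 * Note on the layout built above: tasks that share a tk_cookie are not
 * queued individually. The first task with a given cookie sits on the
 * priority list itself and later arrivals hang off its u.tk_wait.links.
 * __rpc_wake_up_next_priority() relies on this grouping to hand out a
 * batch of wakeups per cookie, and __rpc_remove_wait_queue_priority()
 * promotes the next member of the group when the group head is dequeued.
 */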

/*
 * Add new request to wait queue.
 *
 * Swapper tasks always get inserted at the head of the queue.
 * This should avoid many nasty memory deadlocks and hopefully
 * improve overall performance.
 * Everyone else gets appended to the queue to ensure proper FIFO behavior.
 */
static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
        BUG_ON(RPC_IS_QUEUED(task));

        if (RPC_IS_PRIORITY(queue))
                __rpc_add_wait_queue_priority(queue, task);
        else if (RPC_IS_SWAPPER(task))
                list_add(&task->u.tk_wait.list, &queue->tasks[0]);
        else
                list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
        task->u.tk_wait.rpc_waitq = queue;
        queue->qlen++;
        rpc_set_queued(task);

        dprintk("RPC: %5u added to queue %p \"%s\"\n",
                        task->tk_pid, queue, rpc_qname(queue));
}

/*
 * Remove request from a priority queue.
 */
static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
{
        struct rpc_task *t;

        if (!list_empty(&task->u.tk_wait.links)) {
                t = list_entry(task->u.tk_wait.links.next, struct rpc_task, u.tk_wait.list);
                list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
                list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
        }
        list_del(&task->u.tk_wait.list);
}

/*
 * Remove request from queue.
 * Note: must be called with spin lock held.
 */
static void __rpc_remove_wait_queue(struct rpc_task *task)
{
        struct rpc_wait_queue *queue;
        queue = task->u.tk_wait.rpc_waitq;

        if (RPC_IS_PRIORITY(queue))
                __rpc_remove_wait_queue_priority(task);
        else
                list_del(&task->u.tk_wait.list);
        queue->qlen--;
        dprintk("RPC: %5u removed from queue %p \"%s\"\n",
                        task->tk_pid, queue, rpc_qname(queue));
}

static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
{
        queue->priority = priority;
        queue->count = 1 << (priority * 2);
}

static inline void rpc_set_waitqueue_cookie(struct rpc_wait_queue *queue, unsigned long cookie)
{
        queue->cookie = cookie;
        queue->nr = RPC_BATCH_COUNT;
}

static inline void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
{
        rpc_set_waitqueue_priority(queue, queue->maxpriority);
        rpc_set_waitqueue_cookie(queue, 0);
}

static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, int maxprio)
{
        int i;

        spin_lock_init(&queue->lock);
        for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
                INIT_LIST_HEAD(&queue->tasks[i]);
        queue->maxpriority = maxprio;
        rpc_reset_waitqueue_priority(queue);
#ifdef RPC_DEBUG
        queue->name = qname;
#endif
}

void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
        __rpc_init_priority_wait_queue(queue, qname, RPC_PRIORITY_HIGH);
}

void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
        __rpc_init_priority_wait_queue(queue, qname, 0);
}
EXPORT_SYMBOL(rpc_init_wait_queue);

static int rpc_wait_bit_interruptible(void *word)
{
        if (signal_pending(current))
                return -ERESTARTSYS;
        schedule();
        return 0;
}

#ifdef RPC_DEBUG
static void rpc_task_set_debuginfo(struct rpc_task *task)
{
        static atomic_t rpc_pid;

        task->tk_magic = RPC_TASK_MAGIC_ID;
        task->tk_pid = atomic_inc_return(&rpc_pid);
}
#else
static inline void rpc_task_set_debuginfo(struct rpc_task *task)
{
}
#endif

static void rpc_set_active(struct rpc_task *task)
{
        struct rpc_clnt *clnt;

        if (test_and_set_bit(RPC_TASK_ACTIVE, &task->tk_runstate) != 0)
                return;
        rpc_task_set_debuginfo(task);
        /* Add to global list of all tasks */
        clnt = task->tk_client;
        if (clnt != NULL) {
                spin_lock(&clnt->cl_lock);
                list_add_tail(&task->tk_task, &clnt->cl_tasks);
                spin_unlock(&clnt->cl_lock);
        }
}

/*
 * Mark an RPC call as having completed by clearing the 'active' bit
 */
static void rpc_mark_complete_task(struct rpc_task *task)
{
        smp_mb__before_clear_bit();
        clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
        smp_mb__after_clear_bit();
        wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
}

/*
 * Allow callers to wait for completion of an RPC call
 */
int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
{
        if (action == NULL)
                action = rpc_wait_bit_interruptible;
        return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
                        action, TASK_INTERRUPTIBLE);
}
EXPORT_SYMBOL(__rpc_wait_for_completion_task);
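
/*
 * Completion is implemented as a wait_on_bit()/wake_up_bit() pair on the
 * RPC_TASK_ACTIVE bit: rpc_mark_complete_task() clears the bit and wakes
 * any waiters queued by __rpc_wait_for_completion_task(). Illustrative
 * caller sketch (my_call_ops and calldata are hypothetical; the
 * rpc_wait_for_completion_task() convenience wrapper passes a NULL action):
 *
 *      task = rpc_run_task(clnt, RPC_TASK_ASYNC, &my_call_ops, calldata);
 *      if (!IS_ERR(task)) {
 *              rpc_wait_for_completion_task(task);
 *              status = task->tk_status;
 *              rpc_put_task(task);
 *      }
 */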

/*
 * Make an RPC task runnable.
 *
 * Note: If the task is ASYNC, this must be called with
 * the spinlock held to protect the wait queue operation.
 */
static void rpc_make_runnable(struct rpc_task *task)
{
        BUG_ON(task->tk_timeout_fn);
        rpc_clear_queued(task);
        if (rpc_test_and_set_running(task))
                return;
        /* We might have raced */
        if (RPC_IS_QUEUED(task)) {
                rpc_clear_running(task);
                return;
        }
        if (RPC_IS_ASYNC(task)) {
                int status;

                INIT_WORK(&task->u.tk_work, rpc_async_schedule);
                status = queue_work(task->tk_workqueue, &task->u.tk_work);
                if (status < 0) {
                        printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
                        task->tk_status = status;
                        return;
                }
        } else
                wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
}
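
/*
 * Note: an ASYNC task made runnable above is handed to rpciod through
 * task->tk_workqueue, whereas a synchronous task is resumed by clearing
 * RPC_TASK_QUEUED and kicking the bit-waitqueue that __rpc_execute()
 * sleeps on via out_of_line_wait_on_bit().
 */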

/*
 * Prepare for sleeping on a wait queue.
 * By always appending tasks to the list we ensure FIFO behavior.
 * NB: An RPC task will only receive interrupt-driven events as long
 * as it's on a wait queue.
 */
static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
                        rpc_action action, rpc_action timer)
{
        dprintk("RPC: %5u sleep_on(queue \"%s\" time %lu)\n",
                        task->tk_pid, rpc_qname(q), jiffies);

        if (!RPC_IS_ASYNC(task) && !RPC_IS_ACTIVATED(task)) {
                printk(KERN_ERR "RPC: Inactive synchronous task put to sleep!\n");
                return;
        }

        __rpc_add_wait_queue(q, task);

        BUG_ON(task->tk_callback != NULL);
        task->tk_callback = action;
        __rpc_add_timer(task, timer);
}

void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
                        rpc_action action, rpc_action timer)
{
        /* Mark the task as being activated if so needed */
        rpc_set_active(task);

        /*
         * Protect the queue operations.
         */
        spin_lock_bh(&q->lock);
        __rpc_sleep_on(q, task, action, timer);
        spin_unlock_bh(&q->lock);
}
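
/*
 * Typical use from a tk_action state function (illustrative sketch, not
 * code from this file; the wait queue and the next state are hypothetical):
 *
 *      task->tk_action = call_resume_state;
 *      rpc_sleep_on(&some_waitq, task, NULL, NULL);
 *      return;
 *
 * The "action" argument becomes tk_callback and runs exactly once on
 * wake-up, before the next tk_action step; "timer" overrides
 * __rpc_default_timer() when tk_timeout is non-zero.
 */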

/**
 * __rpc_do_wake_up_task - wake up a single rpc_task
 * @task: task to be woken up
 *
 * Caller must hold queue->lock, and have cleared the task queued flag.
 */
static void __rpc_do_wake_up_task(struct rpc_task *task)
{
        dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
                        task->tk_pid, jiffies);

#ifdef RPC_DEBUG
        BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
#endif
        /* Has the task been executed yet? If not, we cannot wake it up! */
        if (!RPC_IS_ACTIVATED(task)) {
                printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
                return;
        }

        __rpc_disable_timer(task);
        __rpc_remove_wait_queue(task);

        rpc_make_runnable(task);

        dprintk("RPC: __rpc_wake_up_task done\n");
}

/*
 * Wake up the specified task
 */
static void __rpc_wake_up_task(struct rpc_task *task)
{
        if (rpc_start_wakeup(task)) {
                if (RPC_IS_QUEUED(task))
                        __rpc_do_wake_up_task(task);
                rpc_finish_wakeup(task);
        }
}

/*
 * Default timeout handler if none specified by user
 */
static void
__rpc_default_timer(struct rpc_task *task)
{
        dprintk("RPC: %5u timeout (default timer)\n", task->tk_pid);
        task->tk_status = -ETIMEDOUT;
        rpc_wake_up_task(task);
}

/*
 * Wake up the specified task
 */
void rpc_wake_up_task(struct rpc_task *task)
{
        rcu_read_lock_bh();
        if (rpc_start_wakeup(task)) {
                if (RPC_IS_QUEUED(task)) {
                        struct rpc_wait_queue *queue = task->u.tk_wait.rpc_waitq;

                        /* Note: we're already in a bh-safe context */
                        spin_lock(&queue->lock);
                        __rpc_do_wake_up_task(task);
                        spin_unlock(&queue->lock);
                }
                rpc_finish_wakeup(task);
        }
        rcu_read_unlock_bh();
}

/*
 * Wake up the next task on a priority queue.
 */
static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queue)
{
        struct list_head *q;
        struct rpc_task *task;

        /*
         * Service a batch of tasks from a single cookie.
         */
        q = &queue->tasks[queue->priority];
        if (!list_empty(q)) {
                task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
                if (queue->cookie == task->tk_cookie) {
                        if (--queue->nr)
                                goto out;
                        list_move_tail(&task->u.tk_wait.list, q);
                }
                /*
                 * Check if we need to switch queues.
                 */
                if (--queue->count)
                        goto new_cookie;
        }

        /*
         * Service the next queue.
         */
        do {
                if (q == &queue->tasks[0])
                        q = &queue->tasks[queue->maxpriority];
                else
                        q = q - 1;
                if (!list_empty(q)) {
                        task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
                        goto new_queue;
                }
        } while (q != &queue->tasks[queue->priority]);

        rpc_reset_waitqueue_priority(queue);
        return NULL;

new_queue:
        rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
new_cookie:
        rpc_set_waitqueue_cookie(queue, task->tk_cookie);
out:
        __rpc_wake_up_task(task);
        return task;
}
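
/*
 * Summary of the fairness scheme above: up to RPC_BATCH_COUNT tasks that
 * share the current cookie are woken before the cookie changes, and
 * queue->count (initialised to 1 << (priority * 2) by
 * rpc_set_waitqueue_priority()) roughly bounds how many cookie batches a
 * priority level may consume before the scan rotates to the next list,
 * wrapping from tasks[0] back to tasks[maxpriority].
 */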

/*
 * Wake up the next task on the wait queue.
 */
struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
{
        struct rpc_task *task = NULL;

        dprintk("RPC: wake_up_next(%p \"%s\")\n",
                        queue, rpc_qname(queue));
        rcu_read_lock_bh();
        spin_lock(&queue->lock);
        if (RPC_IS_PRIORITY(queue))
                task = __rpc_wake_up_next_priority(queue);
        else {
                task_for_first(task, &queue->tasks[0])
                        __rpc_wake_up_task(task);
        }
        spin_unlock(&queue->lock);
        rcu_read_unlock_bh();

        return task;
}

/**
 * rpc_wake_up - wake up all rpc_tasks
 * @queue: rpc_wait_queue on which the tasks are sleeping
 *
 * Grabs queue->lock
 */
void rpc_wake_up(struct rpc_wait_queue *queue)
{
        struct rpc_task *task, *next;
        struct list_head *head;

        rcu_read_lock_bh();
        spin_lock(&queue->lock);
        head = &queue->tasks[queue->maxpriority];
        for (;;) {
                list_for_each_entry_safe(task, next, head, u.tk_wait.list)
                        __rpc_wake_up_task(task);
                if (head == &queue->tasks[0])
                        break;
                head--;
        }
        spin_unlock(&queue->lock);
        rcu_read_unlock_bh();
}

/**
 * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
 * @queue: rpc_wait_queue on which the tasks are sleeping
 * @status: status value to set
 *
 * Grabs queue->lock
 */
void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{
        struct rpc_task *task, *next;
        struct list_head *head;

        rcu_read_lock_bh();
        spin_lock(&queue->lock);
        head = &queue->tasks[queue->maxpriority];
        for (;;) {
                list_for_each_entry_safe(task, next, head, u.tk_wait.list) {
                        task->tk_status = status;
                        __rpc_wake_up_task(task);
                }
                if (head == &queue->tasks[0])
                        break;
                head--;
        }
        spin_unlock(&queue->lock);
        rcu_read_unlock_bh();
}

static void __rpc_atrun(struct rpc_task *task)
{
        rpc_wake_up_task(task);
}

/*
 * Run a task at a later time
 */
void rpc_delay(struct rpc_task *task, unsigned long delay)
{
        task->tk_timeout = delay;
        rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
}
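
/*
 * rpc_delay() is the usual way for a state function to back off and retry,
 * e.g. after a transient allocation failure (illustrative sketch, not code
 * from this file; call_retry_state is hypothetical):
 *
 *      if (status == -ENOMEM) {
 *              rpc_delay(task, HZ >> 4);
 *              task->tk_action = call_retry_state;
 *              return;
 *      }
 *
 * The task sleeps on delay_queue; when the timer fires, __rpc_atrun()
 * simply wakes it and the state machine resumes at tk_action.
 */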

/*
 * Helper to call task->tk_ops->rpc_call_prepare
 */
static void rpc_prepare_task(struct rpc_task *task)
{
        lock_kernel();
        task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
        unlock_kernel();
}

/*
 * Helper that calls task->tk_ops->rpc_call_done if it exists
 */
void rpc_exit_task(struct rpc_task *task)
{
        task->tk_action = NULL;
        if (task->tk_ops->rpc_call_done != NULL) {
                lock_kernel();
                task->tk_ops->rpc_call_done(task, task->tk_calldata);
                unlock_kernel();
                if (task->tk_action != NULL) {
                        WARN_ON(RPC_ASSASSINATED(task));
                        /* Always release the RPC slot and buffer memory */
                        xprt_release(task);
                }
        }
}
EXPORT_SYMBOL(rpc_exit_task);

void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
{
        if (ops->rpc_release != NULL) {
                lock_kernel();
                ops->rpc_release(calldata);
                unlock_kernel();
        }
}

/*
 * This is the RPC `scheduler' (or rather, the finite state machine).
 */
static void __rpc_execute(struct rpc_task *task)
{
        int status = 0;

        dprintk("RPC: %5u __rpc_execute flags=0x%x\n",
                        task->tk_pid, task->tk_flags);

        BUG_ON(RPC_IS_QUEUED(task));

        for (;;) {
                /*
                 * Garbage collection of pending timers...
                 */
                rpc_delete_timer(task);

                /*
                 * Execute any pending callback.
                 */
                if (RPC_DO_CALLBACK(task)) {
                        /* Define a callback save pointer */
                        void (*save_callback)(struct rpc_task *);

                        /*
                         * If a callback exists, save it, reset it,
                         * call it.
                         * The save is needed to stop from resetting
                         * another callback set within the callback handler
                         *   - Dave
                         */
                        save_callback = task->tk_callback;
                        task->tk_callback = NULL;
                        save_callback(task);
                }

                /*
                 * Perform the next FSM step.
                 * tk_action may be NULL when the task has been killed
                 * by someone else.
                 */
                if (!RPC_IS_QUEUED(task)) {
                        if (task->tk_action == NULL)
                                break;
                        task->tk_action(task);
                }

                /*
                 * Lockless check for whether task is sleeping or not.
                 */
                if (!RPC_IS_QUEUED(task))
                        continue;
                rpc_clear_running(task);
                if (RPC_IS_ASYNC(task)) {
                        /* Careful! we may have raced... */
                        if (RPC_IS_QUEUED(task))
                                return;
                        if (rpc_test_and_set_running(task))
                                return;
                        continue;
                }

                /* sync task: sleep here */
                dprintk("RPC: %5u sync task going to sleep\n", task->tk_pid);
                /* Note: Caller should be using rpc_clnt_sigmask() */
                status = out_of_line_wait_on_bit(&task->tk_runstate,
                                RPC_TASK_QUEUED, rpc_wait_bit_interruptible,
                                TASK_INTERRUPTIBLE);
                if (status == -ERESTARTSYS) {
                        /*
                         * When a sync task receives a signal, it exits with
                         * -ERESTARTSYS. In order to catch any callbacks that
                         * clean up after sleeping on some queue, we don't
                         * break the loop here, but go around once more.
                         */
                        dprintk("RPC: %5u got signal\n", task->tk_pid);
                        task->tk_flags |= RPC_TASK_KILLED;
                        rpc_exit(task, -ERESTARTSYS);
                        rpc_wake_up_task(task);
                }
                rpc_set_running(task);
                dprintk("RPC: %5u sync task resuming\n", task->tk_pid);
        }

        dprintk("RPC: %5u return %d, status %d\n", task->tk_pid, status,
                        task->tk_status);
        /* Release all resources associated with the task */
        rpc_release_task(task);
}

/*
 * User-visible entry point to the scheduler.
 *
 * This may be called recursively if e.g. an async NFS task updates
 * the attributes and finds that dirty pages must be flushed.
 * NOTE: Upon exit of this function the task is guaranteed to be
 * released. In particular note that tk_release() will have
 * been called, so your task memory may have been freed.
 */
void rpc_execute(struct rpc_task *task)
{
        rpc_set_active(task);
        rpc_set_running(task);
        __rpc_execute(task);
}

static void rpc_async_schedule(struct work_struct *work)
{
        __rpc_execute(container_of(work, struct rpc_task, u.tk_work));
}

struct rpc_buffer {
        size_t len;
        char data[];
};

/**
 * rpc_malloc - allocate an RPC buffer
 * @task: RPC task that will use this buffer
 * @size: requested byte size
 *
 * To prevent rpciod from hanging, this allocator never sleeps,
 * returning NULL if the request cannot be serviced immediately.
 * The caller can arrange to sleep in a way that is safe for rpciod.
 *
 * Most requests are 'small' (under 2KiB) and can be serviced from a
 * mempool, ensuring that NFS reads and writes can always proceed,
 * and that there is good locality of reference for these buffers.
 *
 * In order to avoid memory starvation triggering more writebacks of
 * NFS requests, we avoid using GFP_KERNEL.
 */
void *rpc_malloc(struct rpc_task *task, size_t size)
{
        struct rpc_buffer *buf;
        gfp_t gfp = RPC_IS_SWAPPER(task) ? GFP_ATOMIC : GFP_NOWAIT;

        size += sizeof(struct rpc_buffer);
        if (size <= RPC_BUFFER_MAXSIZE)
                buf = mempool_alloc(rpc_buffer_mempool, gfp);
        else
                buf = kmalloc(size, gfp);

        if (!buf)
                return NULL;

        buf->len = size;
        dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
                        task->tk_pid, size, buf);
        return &buf->data;
}
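
/*
 * rpc_malloc() and rpc_free() are not normally called directly by the
 * state machine; the transport wires them up as its buf_alloc/buf_free
 * methods in struct rpc_xprt_ops (see e.g. xprtsock.c), so the client's
 * buffer allocation step reaches them through the transport ops vector.
 */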

/**
 * rpc_free - free buffer allocated via rpc_malloc
 * @buffer: buffer to free
 *
 */
void rpc_free(void *buffer)
{
        size_t size;
        struct rpc_buffer *buf;

        if (!buffer)
                return;

        buf = container_of(buffer, struct rpc_buffer, data);
        size = buf->len;

        dprintk("RPC: freeing buffer of size %zu at %p\n",
                        size, buf);

        if (size <= RPC_BUFFER_MAXSIZE)
                mempool_free(buf, rpc_buffer_mempool);
        else
                kfree(buf);
}

/*
 * Creation and deletion of RPC task structures
 */
void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
{
        memset(task, 0, sizeof(*task));
        init_timer(&task->tk_timer);
        task->tk_timer.data = (unsigned long) task;
        task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
        atomic_set(&task->tk_count, 1);
        task->tk_client = clnt;
        task->tk_flags = flags;
        task->tk_ops = tk_ops;
        if (tk_ops->rpc_call_prepare != NULL)
                task->tk_action = rpc_prepare_task;
        task->tk_calldata = calldata;
        INIT_LIST_HEAD(&task->tk_task);

        /* Initialize retry counters */
        task->tk_garb_retry = 2;
        task->tk_cred_retry = 2;

        task->tk_priority = RPC_PRIORITY_NORMAL;
        task->tk_cookie = (unsigned long)current;

        /* Initialize workqueue for async tasks */
        task->tk_workqueue = rpciod_workqueue;

        if (clnt) {
                kref_get(&clnt->cl_kref);
                if (clnt->cl_softrtry)
                        task->tk_flags |= RPC_TASK_SOFT;
                if (!clnt->cl_intr)
                        task->tk_flags |= RPC_TASK_NOINTR;
        }

        BUG_ON(task->tk_ops == NULL);

        /* starting timestamp */
        task->tk_start = jiffies;

        dprintk("RPC: new task initialized, procpid %u\n",
                        current->pid);
}

static struct rpc_task *
rpc_alloc_task(void)
{
        return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
}

static void rpc_free_task(struct rcu_head *rcu)
{
        struct rpc_task *task = container_of(rcu, struct rpc_task, u.tk_rcu);
        dprintk("RPC: %5u freeing task\n", task->tk_pid);
        mempool_free(task, rpc_task_mempool);
}

/*
 * Create a new task for the specified client.
 */
struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
{
        struct rpc_task *task;

        task = rpc_alloc_task();
        if (!task)
                goto out;

        rpc_init_task(task, clnt, flags, tk_ops, calldata);

        dprintk("RPC: allocated task %p\n", task);
        task->tk_flags |= RPC_TASK_DYNAMIC;
out:
        return task;
}

void rpc_put_task(struct rpc_task *task)
{
        const struct rpc_call_ops *tk_ops = task->tk_ops;
        void *calldata = task->tk_calldata;

        if (!atomic_dec_and_test(&task->tk_count))
                return;
        /* Release resources */
        if (task->tk_rqstp)
                xprt_release(task);
        if (task->tk_msg.rpc_cred)
                rpcauth_unbindcred(task);
        if (task->tk_client) {
                rpc_release_client(task->tk_client);
                task->tk_client = NULL;
        }
        if (task->tk_flags & RPC_TASK_DYNAMIC)
                call_rcu_bh(&task->u.tk_rcu, rpc_free_task);
        rpc_release_calldata(tk_ops, calldata);
}
EXPORT_SYMBOL(rpc_put_task);

static void rpc_release_task(struct rpc_task *task)
{
#ifdef RPC_DEBUG
        BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
#endif
        dprintk("RPC: %5u release task\n", task->tk_pid);

        if (!list_empty(&task->tk_task)) {
                struct rpc_clnt *clnt = task->tk_client;
                /* Remove from client task list */
                spin_lock(&clnt->cl_lock);
                list_del(&task->tk_task);
                spin_unlock(&clnt->cl_lock);
        }
        BUG_ON(RPC_IS_QUEUED(task));

        /* Synchronously delete any running timer */
        rpc_delete_timer(task);

#ifdef RPC_DEBUG
        task->tk_magic = 0;
#endif
        /* Wake up anyone who is waiting for task completion */
        rpc_mark_complete_task(task);

        rpc_put_task(task);
}

/**
 * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
 * @clnt: pointer to RPC client
 * @flags: RPC flags
 * @ops: RPC call ops
 * @data: user call data
 */
struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags,
                                const struct rpc_call_ops *ops,
                                void *data)
{
        struct rpc_task *task;

        task = rpc_new_task(clnt, flags, ops, data);
        if (task == NULL) {
                rpc_release_calldata(ops, data);
                return ERR_PTR(-ENOMEM);
        }
        atomic_inc(&task->tk_count);
        rpc_execute(task);
        return task;
}
EXPORT_SYMBOL(rpc_run_task);
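
/*
 * rpc_run_task() returns with an extra reference held on the task (the
 * atomic_inc() above), so callers are expected to drop it with
 * rpc_put_task() once they have looked at the result. Illustrative
 * sketch (my_call_ops and calldata are hypothetical):
 *
 *      task = rpc_run_task(clnt, RPC_TASK_ASYNC, &my_call_ops, calldata);
 *      if (IS_ERR(task))
 *              return PTR_ERR(task);
 *      rpc_put_task(task);
 *      return 0;
 */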

/*
 * Kill all tasks for the given client.
 * XXX: kill their descendants as well?
 */
void rpc_killall_tasks(struct rpc_clnt *clnt)
{
        struct rpc_task *rovr;

        if (list_empty(&clnt->cl_tasks))
                return;
        dprintk("RPC: killing all tasks for client %p\n", clnt);
        /*
         * Spin lock all_tasks to prevent changes...
         */
        spin_lock(&clnt->cl_lock);
        list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
                if (!RPC_IS_ACTIVATED(rovr))
                        continue;
                if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
                        rovr->tk_flags |= RPC_TASK_KILLED;
                        rpc_exit(rovr, -EIO);
                        rpc_wake_up_task(rovr);
                }
        }
        spin_unlock(&clnt->cl_lock);
}

static void rpciod_killall(void)
{
        struct rpc_clnt *clnt;
        unsigned long flags;

        for(;;) {
                clear_thread_flag(TIF_SIGPENDING);

                spin_lock(&rpc_sched_lock);
                list_for_each_entry(clnt, &all_clients, cl_clients)
                        rpc_killall_tasks(clnt);
                spin_unlock(&rpc_sched_lock);
                flush_workqueue(rpciod_workqueue);
                if (list_empty(&all_clients))
                        break;
                dprintk("RPC: rpciod_killall: waiting for tasks "
                                "to exit\n");
                wait_event_timeout(client_kill_wait,
                                list_empty(&all_clients), 1*HZ);
        }

        spin_lock_irqsave(&current->sighand->siglock, flags);
        recalc_sigpending();
        spin_unlock_irqrestore(&current->sighand->siglock, flags);
}

void rpc_register_client(struct rpc_clnt *clnt)
{
        spin_lock(&rpc_sched_lock);
        list_add(&clnt->cl_clients, &all_clients);
        spin_unlock(&rpc_sched_lock);
}

void rpc_unregister_client(struct rpc_clnt *clnt)
{
        spin_lock(&rpc_sched_lock);
        list_del(&clnt->cl_clients);
        if (list_empty(&all_clients))
                wake_up(&client_kill_wait);
        spin_unlock(&rpc_sched_lock);
}

/*
 * Start up the rpciod process if it's not already running.
 */
int
rpciod_up(void)
{
        struct workqueue_struct *wq;
        int error = 0;

        mutex_lock(&rpciod_mutex);
        dprintk("RPC: rpciod_up: users %u\n", rpciod_users);
        rpciod_users++;
        if (rpciod_workqueue)
                goto out;
        /*
         * If there's no workqueue, we should be the first user.
         */
        if (rpciod_users > 1)
                printk(KERN_WARNING "rpciod_up: no workqueue, %u users??\n", rpciod_users);
        /*
         * Create the rpciod workqueue.
         */
        error = -ENOMEM;
        wq = create_workqueue("rpciod");
        if (wq == NULL) {
                printk(KERN_WARNING "rpciod_up: create workqueue failed, error=%d\n", error);
                rpciod_users--;
                goto out;
        }
        rpciod_workqueue = wq;
        error = 0;
out:
        mutex_unlock(&rpciod_mutex);
        return error;
}

void
rpciod_down(void)
{
        mutex_lock(&rpciod_mutex);
        dprintk("RPC: rpciod_down sema %u\n", rpciod_users);
        if (rpciod_users) {
                if (--rpciod_users)
                        goto out;
        } else
                printk(KERN_WARNING "rpciod_down: no users??\n");

        if (!rpciod_workqueue) {
                dprintk("RPC: rpciod_down: Nothing to do!\n");
                goto out;
        }
        rpciod_killall();

        destroy_workqueue(rpciod_workqueue);
        rpciod_workqueue = NULL;
out:
        mutex_unlock(&rpciod_mutex);
}

#ifdef RPC_DEBUG
void rpc_show_tasks(void)
{
        struct rpc_clnt *clnt;
        struct rpc_task *t;

        spin_lock(&rpc_sched_lock);
        if (list_empty(&all_clients))
                goto out;
        printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
                "-rpcwait -action- ---ops--\n");
        list_for_each_entry(clnt, &all_clients, cl_clients) {
                if (list_empty(&clnt->cl_tasks))
                        continue;
                spin_lock(&clnt->cl_lock);
                list_for_each_entry(t, &clnt->cl_tasks, tk_task) {
                        const char *rpc_waitq = "none";

                        if (RPC_IS_QUEUED(t))
                                rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);

                        printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n",
                                t->tk_pid,
                                (t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1),
                                t->tk_flags, t->tk_status,
                                t->tk_client,
                                (t->tk_client ? t->tk_client->cl_prog : 0),
                                t->tk_rqstp, t->tk_timeout,
                                rpc_waitq,
                                t->tk_action, t->tk_ops);
                }
                spin_unlock(&clnt->cl_lock);
        }
out:
        spin_unlock(&rpc_sched_lock);
}
#endif

void
rpc_destroy_mempool(void)
{
        if (rpc_buffer_mempool)
                mempool_destroy(rpc_buffer_mempool);
        if (rpc_task_mempool)
                mempool_destroy(rpc_task_mempool);
        if (rpc_task_slabp)
                kmem_cache_destroy(rpc_task_slabp);
        if (rpc_buffer_slabp)
                kmem_cache_destroy(rpc_buffer_slabp);
}

int
rpc_init_mempool(void)
{
        rpc_task_slabp = kmem_cache_create("rpc_tasks",
                        sizeof(struct rpc_task),
                        0, SLAB_HWCACHE_ALIGN,
                        NULL, NULL);
        if (!rpc_task_slabp)
                goto err_nomem;
        rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
                        RPC_BUFFER_MAXSIZE,
                        0, SLAB_HWCACHE_ALIGN,
                        NULL, NULL);
        if (!rpc_buffer_slabp)
                goto err_nomem;
        rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
                        rpc_task_slabp);
        if (!rpc_task_mempool)
                goto err_nomem;
        rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE,
                        rpc_buffer_slabp);
        if (!rpc_buffer_mempool)
                goto err_nomem;
        return 0;
err_nomem:
        rpc_destroy_mempool();
        return -ENOMEM;
}