/*
 * linux/net/sunrpc/sched.c
 *
 * Scheduling for synchronous and asynchronous RPC requests.
 *
 * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
 *
 * TCP NFS related read + write fixes
 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 */

#include <linux/module.h>

#include <linux/sched.h>
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/mempool.h>
#include <linux/smp.h>
#include <linux/spinlock.h>
#include <linux/mutex.h>
#include <linux/freezer.h>

#include <linux/sunrpc/clnt.h>

#include "sunrpc.h"

#ifdef RPC_DEBUG
#define RPCDBG_FACILITY		RPCDBG_SCHED
#endif

#define CREATE_TRACE_POINTS
#include <trace/events/sunrpc.h>

/*
 * RPC slabs and memory pools
 */
#define RPC_BUFFER_MAXSIZE	(2048)
#define RPC_BUFFER_POOLSIZE	(8)
#define RPC_TASK_POOLSIZE	(8)
static struct kmem_cache	*rpc_task_slabp __read_mostly;
static struct kmem_cache	*rpc_buffer_slabp __read_mostly;
static mempool_t	*rpc_task_mempool __read_mostly;
static mempool_t	*rpc_buffer_mempool __read_mostly;

static void			rpc_async_schedule(struct work_struct *);
static void			rpc_release_task(struct rpc_task *task);
static void __rpc_queue_timer_fn(unsigned long ptr);

/*
 * RPC tasks sit here while waiting for conditions to improve.
 */
static struct rpc_wait_queue delay_queue;

/*
 * rpciod-related stuff
 */
struct workqueue_struct *rpciod_workqueue;

/*
 * Disable the timer for a given RPC task. Should be called with
 * queue->lock and bh_disabled in order to avoid races within
 * rpc_run_timer().
 */
static void
__rpc_disable_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (task->tk_timeout == 0)
		return;
	dprintk("RPC: %5u disabling timer\n", task->tk_pid);
	task->tk_timeout = 0;
	list_del(&task->u.tk_wait.timer_list);
	if (list_empty(&queue->timer_list.list))
		del_timer(&queue->timer_list.timer);
}

static void
rpc_set_queue_timer(struct rpc_wait_queue *queue, unsigned long expires)
{
	queue->timer_list.expires = expires;
	mod_timer(&queue->timer_list.timer, expires);
}

/*
 * Set up a timer for the current task.
 */
static void
__rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (!task->tk_timeout)
		return;

	dprintk("RPC: %5u setting alarm for %lu ms\n",
			task->tk_pid, task->tk_timeout * 1000 / HZ);

	task->u.tk_wait.expires = jiffies + task->tk_timeout;
	if (list_empty(&queue->timer_list.list) || time_before(task->u.tk_wait.expires, queue->timer_list.expires))
		rpc_set_queue_timer(queue, task->u.tk_wait.expires);
	list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list);
}

static void rpc_rotate_queue_owner(struct rpc_wait_queue *queue)
{
	struct list_head *q = &queue->tasks[queue->priority];
	struct rpc_task *task;

	if (!list_empty(q)) {
		task = list_first_entry(q, struct rpc_task, u.tk_wait.list);
		if (task->tk_owner == queue->owner)
			list_move_tail(&task->u.tk_wait.list, q);
	}
}

static void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
{
	if (queue->priority != priority) {
		/* Fairness: rotate the list when changing priority */
		rpc_rotate_queue_owner(queue);
		queue->priority = priority;
	}
}

static void rpc_set_waitqueue_owner(struct rpc_wait_queue *queue, pid_t pid)
{
	queue->owner = pid;
	queue->nr = RPC_BATCH_COUNT;
}

static void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
{
	rpc_set_waitqueue_priority(queue, queue->maxpriority);
	rpc_set_waitqueue_owner(queue, 0);
}

/*
 * Add new request to a priority queue.
 */
static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue,
		struct rpc_task *task,
		unsigned char queue_priority)
{
	struct list_head *q;
	struct rpc_task *t;

	INIT_LIST_HEAD(&task->u.tk_wait.links);
	if (unlikely(queue_priority > queue->maxpriority))
		queue_priority = queue->maxpriority;
	if (queue_priority > queue->priority)
		rpc_set_waitqueue_priority(queue, queue_priority);
	q = &queue->tasks[queue_priority];
	list_for_each_entry(t, q, u.tk_wait.list) {
		if (t->tk_owner == task->tk_owner) {
			list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
			return;
		}
	}
	list_add_tail(&task->u.tk_wait.list, q);
}

/*
 * Add new request to wait queue.
 *
 * Swapper tasks always get inserted at the head of the queue.
 * This should avoid many nasty memory deadlocks and hopefully
 * improve overall performance.
 * Everyone else gets appended to the queue to ensure proper FIFO behavior.
 */
static void __rpc_add_wait_queue(struct rpc_wait_queue *queue,
		struct rpc_task *task,
		unsigned char queue_priority)
{
	WARN_ON_ONCE(RPC_IS_QUEUED(task));
	if (RPC_IS_QUEUED(task))
		return;

	if (RPC_IS_PRIORITY(queue))
		__rpc_add_wait_queue_priority(queue, task, queue_priority);
	else if (RPC_IS_SWAPPER(task))
		list_add(&task->u.tk_wait.list, &queue->tasks[0]);
	else
		list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
	task->tk_waitqueue = queue;
	queue->qlen++;
	rpc_set_queued(task);

	dprintk("RPC: %5u added to queue %p \"%s\"\n",
			task->tk_pid, queue, rpc_qname(queue));
}

/*
 * Remove request from a priority queue.
 */
static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
{
	struct rpc_task *t;

	if (!list_empty(&task->u.tk_wait.links)) {
		t = list_entry(task->u.tk_wait.links.next, struct rpc_task, u.tk_wait.list);
		list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
		list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
	}
}

/*
 * Remove request from queue.
 * Note: must be called with spin lock held.
 */
static void __rpc_remove_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	__rpc_disable_timer(queue, task);
	if (RPC_IS_PRIORITY(queue))
		__rpc_remove_wait_queue_priority(task);
	list_del(&task->u.tk_wait.list);
	queue->qlen--;
	dprintk("RPC: %5u removed from queue %p \"%s\"\n",
			task->tk_pid, queue, rpc_qname(queue));
}

static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, unsigned char nr_queues)
{
	int i;

	spin_lock_init(&queue->lock);
	for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
		INIT_LIST_HEAD(&queue->tasks[i]);
	queue->maxpriority = nr_queues - 1;
	rpc_reset_waitqueue_priority(queue);
	queue->qlen = 0;
	setup_timer(&queue->timer_list.timer, __rpc_queue_timer_fn, (unsigned long)queue);
	INIT_LIST_HEAD(&queue->timer_list.list);
	rpc_assign_waitqueue_name(queue, qname);
}

void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, RPC_NR_PRIORITY);
}
EXPORT_SYMBOL_GPL(rpc_init_priority_wait_queue);

void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, 1);
}
EXPORT_SYMBOL_GPL(rpc_init_wait_queue);

void rpc_destroy_wait_queue(struct rpc_wait_queue *queue)
{
	del_timer_sync(&queue->timer_list.timer);
}
EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue);

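/*
 * Illustrative sketch (not part of the original file): a typical consumer
 * embeds an rpc_wait_queue in its own state, initializes it once, and tears
 * it down with rpc_destroy_wait_queue() so any pending queue timer is
 * cancelled.  The structure and function names below are hypothetical.
 *
 *	struct foo_server {
 *		struct rpc_wait_queue	backlog;
 *	};
 *
 *	static void foo_server_init(struct foo_server *srv)
 *	{
 *		rpc_init_wait_queue(&srv->backlog, "foo_backlog");
 *	}
 *
 *	static void foo_server_shutdown(struct foo_server *srv)
 *	{
 *		rpc_destroy_wait_queue(&srv->backlog);
 *	}
 */
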
static int rpc_wait_bit_killable(void *word)
{
	if (fatal_signal_pending(current))
		return -ERESTARTSYS;
	freezable_schedule();
	return 0;
}

#ifdef RPC_DEBUG
static void rpc_task_set_debuginfo(struct rpc_task *task)
{
	static atomic_t rpc_pid;

	task->tk_pid = atomic_inc_return(&rpc_pid);
}
#else
static inline void rpc_task_set_debuginfo(struct rpc_task *task)
{
}
#endif

static void rpc_set_active(struct rpc_task *task)
{
	trace_rpc_task_begin(task->tk_client, task, NULL);

	rpc_task_set_debuginfo(task);
	set_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
}

/*
 * Mark an RPC call as having completed by clearing the 'active' bit
 * and then waking up all tasks that were sleeping.
 */
static int rpc_complete_task(struct rpc_task *task)
{
	void *m = &task->tk_runstate;
	wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE);
	struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE);
	unsigned long flags;
	int ret;

	trace_rpc_task_complete(task->tk_client, task, NULL);

	spin_lock_irqsave(&wq->lock, flags);
	clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
	ret = atomic_dec_and_test(&task->tk_count);
	if (waitqueue_active(wq))
		__wake_up_locked_key(wq, TASK_NORMAL, &k);
	spin_unlock_irqrestore(&wq->lock, flags);
	return ret;
}

/*
 * Allow callers to wait for completion of an RPC call
 *
 * Note the use of out_of_line_wait_on_bit() rather than wait_on_bit()
 * to enforce taking of the wq->lock and hence avoid races with
 * rpc_complete_task().
 */
int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
{
	if (action == NULL)
		action = rpc_wait_bit_killable;
	return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
			action, TASK_KILLABLE);
}
EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);

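/*
 * Illustrative sketch (not part of the original file): a caller holding an
 * extra reference on an async task can block until the task has finished.
 * The rpc_wait_for_completion_task() form used here is the wrapper from
 * include/linux/sunrpc/sched.h that passes a NULL action (i.e. the default
 * rpc_wait_bit_killable); the surrounding code is hypothetical.
 *
 *	task = rpc_run_task(&task_setup_data);
 *	if (IS_ERR(task))
 *		return PTR_ERR(task);
 *	status = rpc_wait_for_completion_task(task);
 *	if (status == 0)
 *		status = task->tk_status;
 *	rpc_put_task(task);
 */
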
/*
 * Make an RPC task runnable.
 *
 * Note: If the task is ASYNC, and is being made runnable after sitting on an
 * rpc_wait_queue, this must be called with the queue spinlock held to protect
 * the wait queue operation.
 */
static void rpc_make_runnable(struct rpc_task *task)
{
	rpc_clear_queued(task);
	if (rpc_test_and_set_running(task))
		return;
	if (RPC_IS_ASYNC(task)) {
		INIT_WORK(&task->u.tk_work, rpc_async_schedule);
		queue_work(rpciod_workqueue, &task->u.tk_work);
	} else
		wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
}

/*
 * Prepare for sleeping on a wait queue.
 * By always appending tasks to the list we ensure FIFO behavior.
 * NB: An RPC task will only receive interrupt-driven events as long
 * as it's on a wait queue.
 */
static void __rpc_sleep_on_priority(struct rpc_wait_queue *q,
		struct rpc_task *task,
		rpc_action action,
		unsigned char queue_priority)
{
	dprintk("RPC: %5u sleep_on(queue \"%s\" time %lu)\n",
			task->tk_pid, rpc_qname(q), jiffies);

	trace_rpc_task_sleep(task->tk_client, task, q);

	__rpc_add_wait_queue(q, task, queue_priority);

	WARN_ON_ONCE(task->tk_callback != NULL);
	task->tk_callback = action;
	__rpc_add_timer(q, task);
}

void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
				rpc_action action)
{
	/* We shouldn't ever put an inactive task to sleep */
	WARN_ON_ONCE(!RPC_IS_ACTIVATED(task));
	if (!RPC_IS_ACTIVATED(task)) {
		task->tk_status = -EIO;
		rpc_put_task_async(task);
		return;
	}

	/*
	 * Protect the queue operations.
	 */
	spin_lock_bh(&q->lock);
	__rpc_sleep_on_priority(q, task, action, task->tk_priority);
	spin_unlock_bh(&q->lock);
}
EXPORT_SYMBOL_GPL(rpc_sleep_on);

void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task,
		rpc_action action, int priority)
{
	/* We shouldn't ever put an inactive task to sleep */
	WARN_ON_ONCE(!RPC_IS_ACTIVATED(task));
	if (!RPC_IS_ACTIVATED(task)) {
		task->tk_status = -EIO;
		rpc_put_task_async(task);
		return;
	}

	/*
	 * Protect the queue operations.
	 */
	spin_lock_bh(&q->lock);
	__rpc_sleep_on_priority(q, task, action, priority - RPC_PRIORITY_LOW);
	spin_unlock_bh(&q->lock);
}
EXPORT_SYMBOL_GPL(rpc_sleep_on_priority);

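/*
 * Illustrative sketch (not part of the original file): the usual pattern in
 * a tk_action state routine is to set a timeout, put the task to sleep on a
 * queue with a callback that runs when it is woken, and wake it later with
 * rpc_wake_up_queued_task().  The queue and function names are hypothetical.
 *
 *	static void foo_wait_for_slot(struct rpc_task *task)
 *	{
 *		task->tk_timeout = 5 * HZ;
 *		rpc_sleep_on(&foo_slot_queue, task, foo_slot_ready);
 *	}
 *
 *	Elsewhere, once a slot frees up:
 *		rpc_wake_up_queued_task(&foo_slot_queue, task);
 */
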
/**
 * __rpc_do_wake_up_task - wake up a single rpc_task
 * @queue: wait queue
 * @task: task to be woken up
 *
 * Caller must hold queue->lock, and have cleared the task queued flag.
 */
static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
			task->tk_pid, jiffies);

	/* Has the task been executed yet? If not, we cannot wake it up! */
	if (!RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
		return;
	}

	trace_rpc_task_wakeup(task->tk_client, task, queue);

	__rpc_remove_wait_queue(queue, task);

	rpc_make_runnable(task);

	dprintk("RPC: __rpc_wake_up_task done\n");
}

/*
 * Wake up a queued task while the queue lock is being held
 */
static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (RPC_IS_QUEUED(task) && task->tk_waitqueue == queue)
		__rpc_do_wake_up_task(queue, task);
}

/*
 * Tests whether rpc queue is empty
 */
int rpc_queue_empty(struct rpc_wait_queue *queue)
{
	int res;

	spin_lock_bh(&queue->lock);
	res = queue->qlen;
	spin_unlock_bh(&queue->lock);
	return res == 0;
}
EXPORT_SYMBOL_GPL(rpc_queue_empty);

/*
 * Wake up a task on a specific queue
 */
void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	spin_lock_bh(&queue->lock);
	rpc_wake_up_task_queue_locked(queue, task);
	spin_unlock_bh(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);

/*
 * Wake up the next task on a priority queue.
 */
static struct rpc_task *__rpc_find_next_queued_priority(struct rpc_wait_queue *queue)
{
	struct list_head *q;
	struct rpc_task *task;

	/*
	 * Service a batch of tasks from a single owner.
	 */
	q = &queue->tasks[queue->priority];
	if (!list_empty(q)) {
		task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
		if (queue->owner == task->tk_owner) {
			if (--queue->nr)
				goto out;
			list_move_tail(&task->u.tk_wait.list, q);
		}
		/*
		 * Check if we need to switch queues.
		 */
		goto new_owner;
	}

	/*
	 * Service the next queue.
	 */
	do {
		if (q == &queue->tasks[0])
			q = &queue->tasks[queue->maxpriority];
		else
			q = q - 1;
		if (!list_empty(q)) {
			task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
			goto new_queue;
		}
	} while (q != &queue->tasks[queue->priority]);

	rpc_reset_waitqueue_priority(queue);
	return NULL;

new_queue:
	rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
new_owner:
	rpc_set_waitqueue_owner(queue, task->tk_owner);
out:
	return task;
}

static struct rpc_task *__rpc_find_next_queued(struct rpc_wait_queue *queue)
{
	if (RPC_IS_PRIORITY(queue))
		return __rpc_find_next_queued_priority(queue);
	if (!list_empty(&queue->tasks[0]))
		return list_first_entry(&queue->tasks[0], struct rpc_task, u.tk_wait.list);
	return NULL;
}

/*
 * Wake up the first task on the wait queue.
 */
struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
		bool (*func)(struct rpc_task *, void *), void *data)
{
	struct rpc_task	*task = NULL;

	dprintk("RPC: wake_up_first(%p \"%s\")\n",
			queue, rpc_qname(queue));
	spin_lock_bh(&queue->lock);
	task = __rpc_find_next_queued(queue);
	if (task != NULL) {
		if (func(task, data))
			rpc_wake_up_task_queue_locked(queue, task);
		else
			task = NULL;
	}
	spin_unlock_bh(&queue->lock);

	return task;
}
EXPORT_SYMBOL_GPL(rpc_wake_up_first);

static bool rpc_wake_up_next_func(struct rpc_task *task, void *data)
{
	return true;
}

/*
 * Wake up the next task on the wait queue.
 */
struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *queue)
{
	return rpc_wake_up_first(queue, rpc_wake_up_next_func, NULL);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_next);

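/*
 * Illustrative sketch (not part of the original file): rpc_wake_up_first()
 * lets the caller veto the wakeup.  The hypothetical filter below would wake
 * the first queued task only if it belongs to a given client; returning
 * false leaves the task on the queue.
 *
 *	static bool foo_match_client(struct rpc_task *task, void *data)
 *	{
 *		return task->tk_client == data;
 *	}
 *
 *	task = rpc_wake_up_first(&foo_queue, foo_match_client, clnt);
 */
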
/**
 * rpc_wake_up - wake up all rpc_tasks
 * @queue: rpc_wait_queue on which the tasks are sleeping
 *
 * Grabs queue->lock
 */
void rpc_wake_up(struct rpc_wait_queue *queue)
{
	struct list_head *head;

	spin_lock_bh(&queue->lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		while (!list_empty(head)) {
			struct rpc_task *task;
			task = list_first_entry(head,
					struct rpc_task,
					u.tk_wait.list);
			rpc_wake_up_task_queue_locked(queue, task);
		}
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock_bh(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up);

/**
 * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
 * @queue: rpc_wait_queue on which the tasks are sleeping
 * @status: status value to set
 *
 * Grabs queue->lock
 */
void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{
	struct list_head *head;

	spin_lock_bh(&queue->lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		while (!list_empty(head)) {
			struct rpc_task *task;
			task = list_first_entry(head,
					struct rpc_task,
					u.tk_wait.list);
			task->tk_status = status;
			rpc_wake_up_task_queue_locked(queue, task);
		}
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock_bh(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_status);

static void __rpc_queue_timer_fn(unsigned long ptr)
{
	struct rpc_wait_queue *queue = (struct rpc_wait_queue *)ptr;
	struct rpc_task *task, *n;
	unsigned long expires, now, timeo;

	spin_lock(&queue->lock);
	expires = now = jiffies;
	list_for_each_entry_safe(task, n, &queue->timer_list.list, u.tk_wait.timer_list) {
		timeo = task->u.tk_wait.expires;
		if (time_after_eq(now, timeo)) {
			dprintk("RPC: %5u timeout\n", task->tk_pid);
			task->tk_status = -ETIMEDOUT;
			rpc_wake_up_task_queue_locked(queue, task);
			continue;
		}
		if (expires == now || time_after(expires, timeo))
			expires = timeo;
	}
	if (!list_empty(&queue->timer_list.list))
		rpc_set_queue_timer(queue, expires);
	spin_unlock(&queue->lock);
}

static void __rpc_atrun(struct rpc_task *task)
{
	task->tk_status = 0;
}

/*
 * Run a task at a later time
 */
void rpc_delay(struct rpc_task *task, unsigned long delay)
{
	task->tk_timeout = delay;
	rpc_sleep_on(&delay_queue, task, __rpc_atrun);
}
EXPORT_SYMBOL_GPL(rpc_delay);

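/*
 * Illustrative sketch (not part of the original file): a caller's state
 * routine typically uses rpc_delay() to back off before retrying; the state
 * function names below are hypothetical.
 *
 *	static void foo_retry_later(struct rpc_task *task)
 *	{
 *		task->tk_action = foo_send_request;
 *		rpc_delay(task, HZ >> 4);	// sleep on delay_queue, then re-run
 *	}
 */
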
/*
 * Helper to call task->tk_ops->rpc_call_prepare
 */
void rpc_prepare_task(struct rpc_task *task)
{
	task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
}

static void
rpc_init_task_statistics(struct rpc_task *task)
{
	/* Initialize retry counters */
	task->tk_garb_retry = 2;
	task->tk_cred_retry = 2;
	task->tk_rebind_retry = 2;

	/* starting timestamp */
	task->tk_start = ktime_get();
}

static void
rpc_reset_task_statistics(struct rpc_task *task)
{
	task->tk_timeouts = 0;
	task->tk_flags &= ~(RPC_CALL_MAJORSEEN|RPC_TASK_KILLED|RPC_TASK_SENT);

	rpc_init_task_statistics(task);
}

/*
 * Helper that calls task->tk_ops->rpc_call_done if it exists
 */
void rpc_exit_task(struct rpc_task *task)
{
	task->tk_action = NULL;
	if (task->tk_ops->rpc_call_done != NULL) {
		task->tk_ops->rpc_call_done(task, task->tk_calldata);
		if (task->tk_action != NULL) {
			WARN_ON(RPC_ASSASSINATED(task));
			/* Always release the RPC slot and buffer memory */
			xprt_release(task);
			rpc_reset_task_statistics(task);
		}
	}
}

void rpc_exit(struct rpc_task *task, int status)
{
	task->tk_status = status;
	task->tk_action = rpc_exit_task;
	if (RPC_IS_QUEUED(task))
		rpc_wake_up_queued_task(task->tk_waitqueue, task);
}
EXPORT_SYMBOL_GPL(rpc_exit);

void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
{
	if (ops->rpc_release != NULL)
		ops->rpc_release(calldata);
}

/*
 * This is the RPC `scheduler' (or rather, the finite state machine).
 */
static void __rpc_execute(struct rpc_task *task)
{
	struct rpc_wait_queue *queue;
	int task_is_async = RPC_IS_ASYNC(task);
	int status = 0;

	dprintk("RPC: %5u __rpc_execute flags=0x%x\n",
			task->tk_pid, task->tk_flags);

	WARN_ON_ONCE(RPC_IS_QUEUED(task));
	if (RPC_IS_QUEUED(task))
		return;

	for (;;) {
		void (*do_action)(struct rpc_task *);

		/*
		 * Execute any pending callback first.
		 */
		do_action = task->tk_callback;
		task->tk_callback = NULL;
		if (do_action == NULL) {
			/*
			 * Perform the next FSM step.
			 * tk_action may be NULL if the task has been killed.
			 * In particular, note that rpc_killall_tasks may
			 * do this at any time, so beware when dereferencing.
			 */
			do_action = task->tk_action;
			if (do_action == NULL)
				break;
		}
		trace_rpc_task_run_action(task->tk_client, task, task->tk_action);
		do_action(task);

		/*
		 * Lockless check for whether task is sleeping or not.
		 */
		if (!RPC_IS_QUEUED(task))
			continue;
		/*
		 * The queue->lock protects against races with
		 * rpc_make_runnable().
		 *
		 * Note that once we clear RPC_TASK_RUNNING on an asynchronous
		 * rpc_task, rpc_make_runnable() can assign it to a
		 * different workqueue. We therefore cannot assume that the
		 * rpc_task pointer may still be dereferenced.
		 */
		queue = task->tk_waitqueue;
		spin_lock_bh(&queue->lock);
		if (!RPC_IS_QUEUED(task)) {
			spin_unlock_bh(&queue->lock);
			continue;
		}
		rpc_clear_running(task);
		spin_unlock_bh(&queue->lock);
		if (task_is_async)
			return;

		/* sync task: sleep here */
		dprintk("RPC: %5u sync task going to sleep\n", task->tk_pid);
		status = out_of_line_wait_on_bit(&task->tk_runstate,
				RPC_TASK_QUEUED, rpc_wait_bit_killable,
				TASK_KILLABLE);
		if (status == -ERESTARTSYS) {
			/*
			 * When a sync task receives a signal, it exits with
			 * -ERESTARTSYS. In order to catch any callbacks that
			 * clean up after sleeping on some queue, we don't
			 * break the loop here, but go around once more.
			 */
			dprintk("RPC: %5u got signal\n", task->tk_pid);
			task->tk_flags |= RPC_TASK_KILLED;
			rpc_exit(task, -ERESTARTSYS);
		}
		rpc_set_running(task);
		dprintk("RPC: %5u sync task resuming\n", task->tk_pid);
	}

	dprintk("RPC: %5u return %d, status %d\n", task->tk_pid, status,
			task->tk_status);
	/* Release all resources associated with the task */
	rpc_release_task(task);
}

/*
 * User-visible entry point to the scheduler.
 *
 * This may be called recursively if e.g. an async NFS task updates
 * the attributes and finds that dirty pages must be flushed.
 * NOTE: Upon exit of this function the task is guaranteed to be
 *	 released. In particular note that tk_release() will have
 *	 been called, so your task memory may have been freed.
 */
void rpc_execute(struct rpc_task *task)
{
	rpc_set_active(task);
	rpc_make_runnable(task);
	if (!RPC_IS_ASYNC(task))
		__rpc_execute(task);
}

static void rpc_async_schedule(struct work_struct *work)
{
	current->flags |= PF_FSTRANS;
	__rpc_execute(container_of(work, struct rpc_task, u.tk_work));
	current->flags &= ~PF_FSTRANS;
}

/**
 * rpc_malloc - allocate an RPC buffer
 * @task: RPC task that will use this buffer
 * @size: requested byte size
 *
 * To prevent rpciod from hanging, this allocator never sleeps,
 * returning NULL if the request cannot be serviced immediately.
 * The caller can arrange to sleep in a way that is safe for rpciod.
 *
 * Most requests are 'small' (under 2KiB) and can be serviced from a
 * mempool, ensuring that NFS reads and writes can always proceed,
 * and that there is good locality of reference for these buffers.
 *
 * In order to avoid memory starvation triggering more writebacks of
 * NFS requests, we avoid using GFP_KERNEL.
 */
void *rpc_malloc(struct rpc_task *task, size_t size)
{
	struct rpc_buffer *buf;
	gfp_t gfp = GFP_NOWAIT;

	if (RPC_IS_SWAPPER(task))
		gfp |= __GFP_MEMALLOC;

	size += sizeof(struct rpc_buffer);
	if (size <= RPC_BUFFER_MAXSIZE)
		buf = mempool_alloc(rpc_buffer_mempool, gfp);
	else
		buf = kmalloc(size, gfp);

	if (!buf)
		return NULL;

	buf->len = size;
	dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
			task->tk_pid, size, buf);
	return &buf->data;
}
EXPORT_SYMBOL_GPL(rpc_malloc);

/**
 * rpc_free - free buffer allocated via rpc_malloc
 * @buffer: buffer to free
 *
 */
void rpc_free(void *buffer)
{
	size_t size;
	struct rpc_buffer *buf;

	if (!buffer)
		return;

	buf = container_of(buffer, struct rpc_buffer, data);
	size = buf->len;

	dprintk("RPC: freeing buffer of size %zu at %p\n",
			size, buf);

	if (size <= RPC_BUFFER_MAXSIZE)
		mempool_free(buf, rpc_buffer_mempool);
	else
		kfree(buf);
}
EXPORT_SYMBOL_GPL(rpc_free);

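/*
 * Illustrative sketch (not part of the original file): these two are
 * normally reached through the transport's buffer allocation ops rather
 * than called directly.  A hypothetical direct use would pair them and
 * cope with the non-sleeping allocation failing:
 *
 *	void *p = rpc_malloc(task, 512);
 *	if (p == NULL)
 *		return -ENOMEM;		// or rpc_delay() and retry later
 *	...
 *	rpc_free(p);
 */
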
/*
 * Creation and deletion of RPC task structures
 */
static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *task_setup_data)
{
	memset(task, 0, sizeof(*task));
	atomic_set(&task->tk_count, 1);
	task->tk_flags = task_setup_data->flags;
	task->tk_ops = task_setup_data->callback_ops;
	task->tk_calldata = task_setup_data->callback_data;
	INIT_LIST_HEAD(&task->tk_task);

	task->tk_priority = task_setup_data->priority - RPC_PRIORITY_LOW;
	task->tk_owner = current->tgid;

	/* Initialize workqueue for async tasks */
	task->tk_workqueue = task_setup_data->workqueue;

	if (task->tk_ops->rpc_call_prepare != NULL)
		task->tk_action = rpc_prepare_task;

	rpc_init_task_statistics(task);

	dprintk("RPC: new task initialized, procpid %u\n",
			task_pid_nr(current));
}

static struct rpc_task *
rpc_alloc_task(void)
{
	return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOIO);
}

/*
 * Create a new task for the specified client.
 */
struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data)
{
	struct rpc_task	*task = setup_data->task;
	unsigned short flags = 0;

	if (task == NULL) {
		task = rpc_alloc_task();
		if (task == NULL) {
			rpc_release_calldata(setup_data->callback_ops,
					setup_data->callback_data);
			return ERR_PTR(-ENOMEM);
		}
		flags = RPC_TASK_DYNAMIC;
	}

	rpc_init_task(task, setup_data);
	task->tk_flags |= flags;
	dprintk("RPC: allocated task %p\n", task);
	return task;
}

/*
 * rpc_free_task - release rpc task and perform cleanups
 *
 * Note that we free up the rpc_task _after_ rpc_release_calldata()
 * in order to work around a workqueue dependency issue.
 *
 * Tejun Heo states:
 * "Workqueue currently considers two work items to be the same if they're
 * on the same address and won't execute them concurrently - ie. it
 * makes a work item which is queued again while being executed wait
 * for the previous execution to complete.
 *
 * If a work function frees the work item, and then waits for an event
 * which should be performed by another work item and *that* work item
 * recycles the freed work item, it can create a false dependency loop.
 * There really is no reliable way to detect this short of verifying
 * every memory free."
 *
 */
static void rpc_free_task(struct rpc_task *task)
{
	unsigned short tk_flags = task->tk_flags;

	rpc_release_calldata(task->tk_ops, task->tk_calldata);

	if (tk_flags & RPC_TASK_DYNAMIC) {
		dprintk("RPC: %5u freeing task\n", task->tk_pid);
		mempool_free(task, rpc_task_mempool);
	}
}

static void rpc_async_release(struct work_struct *work)
{
	rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
}

static void rpc_release_resources_task(struct rpc_task *task)
{
	xprt_release(task);
	if (task->tk_msg.rpc_cred) {
		put_rpccred(task->tk_msg.rpc_cred);
		task->tk_msg.rpc_cred = NULL;
	}
	rpc_task_release_client(task);
}

static void rpc_final_put_task(struct rpc_task *task,
		struct workqueue_struct *q)
{
	if (q != NULL) {
		INIT_WORK(&task->u.tk_work, rpc_async_release);
		queue_work(q, &task->u.tk_work);
	} else
		rpc_free_task(task);
}

static void rpc_do_put_task(struct rpc_task *task, struct workqueue_struct *q)
{
	if (atomic_dec_and_test(&task->tk_count)) {
		rpc_release_resources_task(task);
		rpc_final_put_task(task, q);
	}
}

void rpc_put_task(struct rpc_task *task)
{
	rpc_do_put_task(task, NULL);
}
EXPORT_SYMBOL_GPL(rpc_put_task);

void rpc_put_task_async(struct rpc_task *task)
{
	rpc_do_put_task(task, task->tk_workqueue);
}
EXPORT_SYMBOL_GPL(rpc_put_task_async);

static void rpc_release_task(struct rpc_task *task)
{
	dprintk("RPC: %5u release task\n", task->tk_pid);

	WARN_ON_ONCE(RPC_IS_QUEUED(task));

	rpc_release_resources_task(task);

	/*
	 * Note: at this point we have been removed from rpc_clnt->cl_tasks,
	 * so it should be safe to use task->tk_count as a test for whether
	 * or not any other processes still hold references to our rpc_task.
	 */
	if (atomic_read(&task->tk_count) != 1 + !RPC_IS_ASYNC(task)) {
		/* Wake up anyone who may be waiting for task completion */
		if (!rpc_complete_task(task))
			return;
	} else {
		if (!atomic_dec_and_test(&task->tk_count))
			return;
	}
	rpc_final_put_task(task, task->tk_workqueue);
}

int rpciod_up(void)
{
	return try_module_get(THIS_MODULE) ? 0 : -EINVAL;
}

void rpciod_down(void)
{
	module_put(THIS_MODULE);
}

/*
 * Start up the rpciod workqueue.
 */
static int rpciod_start(void)
{
	struct workqueue_struct *wq;

	/*
	 * Create the rpciod thread and wait for it to start.
	 */
	dprintk("RPC: creating workqueue rpciod\n");
	wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 1);
	rpciod_workqueue = wq;
	return rpciod_workqueue != NULL;
}

static void rpciod_stop(void)
{
	struct workqueue_struct *wq = NULL;

	if (rpciod_workqueue == NULL)
		return;
	dprintk("RPC: destroying workqueue rpciod\n");

	wq = rpciod_workqueue;
	rpciod_workqueue = NULL;
	destroy_workqueue(wq);
}

void
rpc_destroy_mempool(void)
{
	rpciod_stop();
	if (rpc_buffer_mempool)
		mempool_destroy(rpc_buffer_mempool);
	if (rpc_task_mempool)
		mempool_destroy(rpc_task_mempool);
	if (rpc_task_slabp)
		kmem_cache_destroy(rpc_task_slabp);
	if (rpc_buffer_slabp)
		kmem_cache_destroy(rpc_buffer_slabp);
	rpc_destroy_wait_queue(&delay_queue);
}

int
rpc_init_mempool(void)
{
	/*
	 * The following is not strictly a mempool initialisation,
	 * but there is no harm in doing it here
	 */
	rpc_init_wait_queue(&delay_queue, "delayq");
	if (!rpciod_start())
		goto err_nomem;

	rpc_task_slabp = kmem_cache_create("rpc_tasks",
					sizeof(struct rpc_task),
					0, SLAB_HWCACHE_ALIGN,
					NULL);
	if (!rpc_task_slabp)
		goto err_nomem;
	rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
					RPC_BUFFER_MAXSIZE,
					0, SLAB_HWCACHE_ALIGN,
					NULL);
	if (!rpc_buffer_slabp)
		goto err_nomem;
	rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
					rpc_task_slabp);
	if (!rpc_task_mempool)
		goto err_nomem;
	rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE,
					rpc_buffer_slabp);
	if (!rpc_buffer_mempool)
		goto err_nomem;
	return 0;
err_nomem:
	rpc_destroy_mempool();
	return -ENOMEM;
}