/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -   When a process places a call, it allocates a request slot if
 *      one is available. Otherwise, it sleeps on the backlog queue
 *      (xprt_reserve).
 *  -   Next, the caller puts together the RPC message, stuffs it into
 *      the request struct, and calls xprt_transmit().
 *  -   xprt_transmit sends the message and installs the caller on the
 *      transport's wait list. At the same time, it installs a timer that
 *      is run after the packet's timeout has expired.
 *  -   When a packet arrives, the data_ready handler walks the list of
 *      pending requests for that transport. If a matching XID is found, the
 *      caller is woken up, and the timer removed.
 *  -   When no reply arrives within the timeout interval, the timer is
 *      fired by the kernel and runs xprt_timer(). It either adjusts the
 *      timeout values (minor timeout) or wakes up the caller with a status
 *      of -ETIMEDOUT.
 *  -   When the caller receives a notification from RPC that a reply arrived,
 *      it should release the RPC slot, and process the reply.
 *      If the call timed out, it may choose to retry the operation by
 *      adjusting the initial timeout value, and simply calling rpc_call
 *      again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
 */

#include <linux/module.h>

#include <linux/types.h>
#include <linux/interrupt.h>
#include <linux/workqueue.h>
#include <linux/net.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/metrics.h>

/*
 * Local variables
 */

#ifdef RPC_DEBUG
# define RPCDBG_FACILITY        RPCDBG_XPRT
#endif

/*
 * Local functions
 */
static void     xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static inline void      do_xprt_reserve(struct rpc_task *);
static void     xprt_connect_status(struct rpc_task *task);
static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);

/*
 * The transport code maintains an estimate on the maximum number of out-
 * standing RPC requests, using a smoothed version of the congestion
 * avoidance implemented in 44BSD. This is basically the Van Jacobson
 * congestion algorithm: If a retransmit occurs, the congestion window is
 * halved; otherwise, it is incremented by 1/cwnd when
 *
 *      -       a reply is received and
 *      -       a full number of requests are outstanding and
 *      -       the congestion window hasn't been updated recently.
 */
#define RPC_CWNDSHIFT           (8U)
#define RPC_CWNDSCALE           (1U << RPC_CWNDSHIFT)
#define RPC_INITCWND            RPC_CWNDSCALE
#define RPC_MAXCWND(xprt)       ((xprt)->max_reqs << RPC_CWNDSHIFT)
#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd)

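/*
 * Illustrative note (not part of the original source): the congestion
 * state is kept in fixed point, scaled by RPC_CWNDSCALE (1 << 8 = 256).
 * For example, a transport created with max_reqs = 16 starts with a
 * window of RPC_INITCWND = 256, i.e. one request "in flight", and may
 * grow up to RPC_MAXCWND = 16 << 8 = 4096, i.e. sixteen concurrent
 * requests. RPCXPRT_CONGESTED() reports congestion once the scaled
 * count of requests holding a slot (xprt->cong) reaches the window
 * (xprt->cwnd).
 */
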
/**
 * xprt_reserve_xprt - serialize write access to transports
 * @task: task that is requesting access to the transport
 *
 * This prevents mixing the payload of separate requests, and prevents
 * transport connects from colliding with writes. No congestion control
 * is provided.
 */
int xprt_reserve_xprt(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req = task->tk_rqstp;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
                if (task == xprt->snd_task)
                        return 1;
                if (task == NULL)
                        return 0;
                goto out_sleep;
        }
        xprt->snd_task = task;
        if (req) {
                req->rq_bytes_sent = 0;
                req->rq_ntrans++;
        }
        return 1;

out_sleep:
        dprintk("RPC: %5u failed to lock transport %p\n",
                        task->tk_pid, xprt);
        task->tk_timeout = 0;
        task->tk_status = -EAGAIN;
        if (req && req->rq_ntrans)
                rpc_sleep_on(&xprt->resend, task, NULL, NULL);
        else
                rpc_sleep_on(&xprt->sending, task, NULL, NULL);
        return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt);

static void xprt_clear_locked(struct rpc_xprt *xprt)
{
        xprt->snd_task = NULL;
        if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state) || xprt->shutdown) {
                smp_mb__before_clear_bit();
                clear_bit(XPRT_LOCKED, &xprt->state);
                smp_mb__after_clear_bit();
        } else
                queue_work(rpciod_workqueue, &xprt->task_cleanup);
}

/**
 * xprt_reserve_xprt_cong - serialize write access to transports
 * @task: task that is requesting access to the transport
 *
 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
 * integrated into the decision of whether a request is allowed to be
 * woken up and given access to the transport.
 */
int xprt_reserve_xprt_cong(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req = task->tk_rqstp;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
                if (task == xprt->snd_task)
                        return 1;
                goto out_sleep;
        }
        if (__xprt_get_cong(xprt, task)) {
                xprt->snd_task = task;
                if (req) {
                        req->rq_bytes_sent = 0;
                        req->rq_ntrans++;
                }
                return 1;
        }
        xprt_clear_locked(xprt);

out_sleep:
        dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
        task->tk_timeout = 0;
        task->tk_status = -EAGAIN;
        if (req && req->rq_ntrans)
                rpc_sleep_on(&xprt->resend, task, NULL, NULL);
        else
                rpc_sleep_on(&xprt->sending, task, NULL, NULL);
        return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);

static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        int retval;

        spin_lock_bh(&xprt->transport_lock);
        retval = xprt->ops->reserve_xprt(task);
        spin_unlock_bh(&xprt->transport_lock);
        return retval;
}

static void __xprt_lock_write_next(struct rpc_xprt *xprt)
{
        struct rpc_task *task;
        struct rpc_rqst *req;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;

        task = rpc_wake_up_next(&xprt->resend);
        if (!task) {
                task = rpc_wake_up_next(&xprt->sending);
                if (!task)
                        goto out_unlock;
        }

        req = task->tk_rqstp;
        xprt->snd_task = task;
        if (req) {
                req->rq_bytes_sent = 0;
                req->rq_ntrans++;
        }
        return;

out_unlock:
        xprt_clear_locked(xprt);
}

static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
        struct rpc_task *task;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;
        if (RPCXPRT_CONGESTED(xprt))
                goto out_unlock;
        task = rpc_wake_up_next(&xprt->resend);
        if (!task) {
                task = rpc_wake_up_next(&xprt->sending);
                if (!task)
                        goto out_unlock;
        }
        if (__xprt_get_cong(xprt, task)) {
                struct rpc_rqst *req = task->tk_rqstp;
                xprt->snd_task = task;
                if (req) {
                        req->rq_bytes_sent = 0;
                        req->rq_ntrans++;
                }
                return;
        }
out_unlock:
        xprt_clear_locked(xprt);
}

/**
 * xprt_release_xprt - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL. No congestion control is provided.
 */
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task == task) {
                xprt_clear_locked(xprt);
                __xprt_lock_write_next(xprt);
        }
}
EXPORT_SYMBOL_GPL(xprt_release_xprt);

/**
 * xprt_release_xprt_cong - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL. Another task is awoken to use the
 * transport if the transport's congestion window allows it.
 */
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task == task) {
                xprt_clear_locked(xprt);
                __xprt_lock_write_next_cong(xprt);
        }
}
EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);

static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        spin_lock_bh(&xprt->transport_lock);
        xprt->ops->release_xprt(xprt, task);
        spin_unlock_bh(&xprt->transport_lock);
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        if (req->rq_cong)
                return 1;
        dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n",
                        task->tk_pid, xprt->cong, xprt->cwnd);
        if (RPCXPRT_CONGESTED(xprt))
                return 0;
        req->rq_cong = 1;
        xprt->cong += RPC_CWNDSCALE;
        return 1;
}

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        if (!req->rq_cong)
                return;
        req->rq_cong = 0;
        xprt->cong -= RPC_CWNDSCALE;
        __xprt_lock_write_next_cong(xprt);
}

/**
 * xprt_release_rqst_cong - housekeeping when request is complete
 * @task: RPC request that recently completed
 *
 * Useful for transports that require congestion control.
 */
void xprt_release_rqst_cong(struct rpc_task *task)
{
        __xprt_put_cong(task->tk_xprt, task->tk_rqstp);
}
EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);

/**
 * xprt_adjust_cwnd - adjust transport congestion window
 * @task: recently completed RPC request used to adjust window
 * @result: result code of completed RPC request
 *
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 */
void xprt_adjust_cwnd(struct rpc_task *task, int result)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = task->tk_xprt;
        unsigned long cwnd = xprt->cwnd;

        if (result >= 0 && cwnd <= xprt->cong) {
                /* The (cwnd >> 1) term makes sure
                 * the result gets rounded properly. */
                cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
                if (cwnd > RPC_MAXCWND(xprt))
                        cwnd = RPC_MAXCWND(xprt);
                __xprt_lock_write_next_cong(xprt);
        } else if (result == -ETIMEDOUT) {
                cwnd >>= 1;
                if (cwnd < RPC_CWNDSCALE)
                        cwnd = RPC_CWNDSCALE;
        }
        dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
                        xprt->cong, xprt->cwnd, cwnd);
        xprt->cwnd = cwnd;
        __xprt_put_cong(xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);

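/*
 * Worked example (illustrative, not part of the original source), with
 * RPC_CWNDSCALE = 256 so that RPC_CWNDSCALE * RPC_CWNDSCALE = 65536:
 *
 *      cwnd = 512  (two slots):     512 + (65536 + 256) / 512   = 512 + 128
 *      cwnd = 4096 (sixteen slots): 4096 + (65536 + 2048) / 4096 = 4096 + 16
 *
 * i.e. each reply grows the window by roughly RPC_CWNDSCALE/cwnd of a
 * slot, the additive-increase half of Van Jacobson. On -ETIMEDOUT the
 * window is halved (multiplicative decrease), but never drops below
 * one slot (RPC_CWNDSCALE).
 */
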
/**
 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
 * @xprt: transport with waiting tasks
 * @status: result code to plant in each task before waking it
 *
 */
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
{
        if (status < 0)
                rpc_wake_up_status(&xprt->pending, status);
        else
                rpc_wake_up(&xprt->pending);
}
EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);

/**
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 * @task: task to be put to sleep
 *
 */
void xprt_wait_for_buffer_space(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        task->tk_timeout = req->rq_timeout;
        rpc_sleep_on(&xprt->pending, task, NULL, NULL);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);

/**
 * xprt_write_space - wake the task waiting for transport output buffer space
 * @xprt: transport with waiting tasks
 *
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 */
void xprt_write_space(struct rpc_xprt *xprt)
{
        if (unlikely(xprt->shutdown))
                return;

        spin_lock_bh(&xprt->transport_lock);
        if (xprt->snd_task) {
                dprintk("RPC: write space: waking waiting task on "
                                "xprt %p\n", xprt);
                rpc_wake_up_task(xprt->snd_task);
        }
        spin_unlock_bh(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_write_space);

/**
 * xprt_set_retrans_timeout_def - set a request's retransmit timeout
 * @task: task whose timeout is to be set
 *
 * Set a request's retransmit timeout based on the transport's
 * default timeout parameters. Used by transports that don't adjust
 * the retransmit timeout based on round-trip time estimation.
 */
void xprt_set_retrans_timeout_def(struct rpc_task *task)
{
        task->tk_timeout = task->tk_rqstp->rq_timeout;
}
EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def);

/**
 * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout
 * @task: task whose timeout is to be set
 *
 * Set a request's retransmit timeout using the RTT estimator.
 */
void xprt_set_retrans_timeout_rtt(struct rpc_task *task)
{
        int timer = task->tk_msg.rpc_proc->p_timer;
        struct rpc_rtt *rtt = task->tk_client->cl_rtt;
        struct rpc_rqst *req = task->tk_rqstp;
        unsigned long max_timeout = req->rq_xprt->timeout.to_maxval;

        task->tk_timeout = rpc_calc_rto(rtt, timer);
        task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
        if (task->tk_timeout > max_timeout || task->tk_timeout == 0)
                task->tk_timeout = max_timeout;
}
EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_rtt);

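/*
 * Illustrative example (not part of the original source): if the
 * estimator returns an RTO of HZ/4 for this procedure's timer class,
 * the request has already been retried twice (rq_retries == 2), and
 * rpc_ntimeo() reports no additional backoff, the timeout becomes
 * (HZ/4) << 2 = HZ. The shift doubles the timeout per retransmission,
 * clamped to the transport's to_maxval.
 */
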
static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
        struct rpc_timeout *to = &req->rq_xprt->timeout;

        req->rq_majortimeo = req->rq_timeout;
        if (to->to_exponential)
                req->rq_majortimeo <<= to->to_retries;
        else
                req->rq_majortimeo += to->to_increment * to->to_retries;
        if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
                req->rq_majortimeo = to->to_maxval;
        req->rq_majortimeo += jiffies;
}

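/*
 * Illustrative example (not part of the original source): with linear
 * backoff (to_exponential == 0), to_retries = 3 and to_increment equal
 * to the current rq_timeout T, the major timeout lands at T + 3T = 4T
 * from now; with exponential backoff it would be T << 3 = 8T. Either
 * way the value is clamped to to_maxval before jiffies is added.
 */
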
/**
 * xprt_adjust_timeout - adjust timeout values for next retransmit
 * @req: RPC request containing parameters to use for the adjustment
 *
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
        struct rpc_xprt *xprt = req->rq_xprt;
        struct rpc_timeout *to = &xprt->timeout;
        int status = 0;

        if (time_before(jiffies, req->rq_majortimeo)) {
                if (to->to_exponential)
                        req->rq_timeout <<= 1;
                else
                        req->rq_timeout += to->to_increment;
                if (to->to_maxval && req->rq_timeout >= to->to_maxval)
                        req->rq_timeout = to->to_maxval;
                req->rq_retries++;
        } else {
                req->rq_timeout = to->to_initval;
                req->rq_retries = 0;
                xprt_reset_majortimeo(req);
                /* Reset the RTT counters == "slow start" */
                spin_lock_bh(&xprt->transport_lock);
                rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
                spin_unlock_bh(&xprt->transport_lock);
                status = -ETIMEDOUT;
        }

        if (req->rq_timeout == 0) {
                printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
                req->rq_timeout = 5 * HZ;
        }
        return status;
}

static void xprt_autoclose(struct work_struct *work)
{
        struct rpc_xprt *xprt =
                container_of(work, struct rpc_xprt, task_cleanup);

        xprt_disconnect(xprt);
        xprt->ops->close(xprt);
        xprt_release_write(xprt, NULL);
}

/**
 * xprt_disconnect - mark a transport as disconnected
 * @xprt: transport to flag for disconnect
 *
 */
void xprt_disconnect(struct rpc_xprt *xprt)
{
        dprintk("RPC: disconnected transport %p\n", xprt);
        spin_lock_bh(&xprt->transport_lock);
        xprt_clear_connected(xprt);
        xprt_wake_pending_tasks(xprt, -ENOTCONN);
        spin_unlock_bh(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_disconnect);

static void
xprt_init_autodisconnect(unsigned long data)
{
        struct rpc_xprt *xprt = (struct rpc_xprt *)data;

        spin_lock(&xprt->transport_lock);
        if (!list_empty(&xprt->recv) || xprt->shutdown)
                goto out_abort;
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                goto out_abort;
        spin_unlock(&xprt->transport_lock);
        if (xprt_connecting(xprt))
                xprt_release_write(xprt, NULL);
        else
                queue_work(rpciod_workqueue, &xprt->task_cleanup);
        return;
out_abort:
        spin_unlock(&xprt->transport_lock);
}

/**
 * xprt_connect - schedule a transport connect operation
 * @task: RPC task that is requesting the connect
 *
 */
void xprt_connect(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid,
                        xprt, (xprt_connected(xprt) ? "is" : "is not"));

        if (!xprt_bound(xprt)) {
                task->tk_status = -EIO;
                return;
        }
        if (!xprt_lock_write(xprt, task))
                return;
        if (xprt_connected(xprt))
                xprt_release_write(xprt, task);
        else {
                if (task->tk_rqstp)
                        task->tk_rqstp->rq_bytes_sent = 0;

                task->tk_timeout = xprt->connect_timeout;
                rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL);
                xprt->stat.connect_start = jiffies;
                xprt->ops->connect(task);
        }
        return;
}

static void xprt_connect_status(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        if (task->tk_status >= 0) {
                xprt->stat.connect_count++;
                xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start;
                dprintk("RPC: %5u xprt_connect_status: connection established\n",
                                task->tk_pid);
                return;
        }

        switch (task->tk_status) {
        case -ECONNREFUSED:
        case -ECONNRESET:
                dprintk("RPC: %5u xprt_connect_status: server %s refused "
                                "connection\n", task->tk_pid,
                                task->tk_client->cl_server);
                break;
        case -ENOTCONN:
                dprintk("RPC: %5u xprt_connect_status: connection broken\n",
                                task->tk_pid);
                break;
        case -ETIMEDOUT:
                dprintk("RPC: %5u xprt_connect_status: connect attempt timed "
                                "out\n", task->tk_pid);
                break;
        default:
                dprintk("RPC: %5u xprt_connect_status: error %d connecting to "
                                "server %s\n", task->tk_pid, -task->tk_status,
                                task->tk_client->cl_server);
                xprt_release_write(xprt, task);
                task->tk_status = -EIO;
        }
}

/**
 * xprt_lookup_rqst - find an RPC request corresponding to an XID
 * @xprt: transport on which the original request was transmitted
 * @xid: RPC XID of incoming reply
 *
 */
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
{
        struct list_head *pos;

        list_for_each(pos, &xprt->recv) {
                struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
                if (entry->rq_xid == xid)
                        return entry;
        }

        dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n",
                        ntohl(xid));
        xprt->stat.bad_xids++;
        return NULL;
}
EXPORT_SYMBOL_GPL(xprt_lookup_rqst);

/**
 * xprt_update_rtt - update an RPC client's RTT state after receiving a reply
 * @task: RPC request that recently completed
 *
 */
void xprt_update_rtt(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_rtt *rtt = task->tk_client->cl_rtt;
        unsigned timer = task->tk_msg.rpc_proc->p_timer;

        if (timer) {
                if (req->rq_ntrans == 1)
                        rpc_update_rtt(rtt, timer,
                                        (long)jiffies - req->rq_xtime);
                rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
        }
}
EXPORT_SYMBOL_GPL(xprt_update_rtt);

/**
 * xprt_complete_rqst - called when reply processing is complete
 * @task: RPC request that recently completed
 * @copied: actual number of bytes received from the transport
 *
 * Caller holds transport lock.
 */
void xprt_complete_rqst(struct rpc_task *task, int copied)
{
        struct rpc_rqst *req = task->tk_rqstp;

        dprintk("RPC: %5u xid %08x complete (%d bytes received)\n",
                        task->tk_pid, ntohl(req->rq_xid), copied);

        task->tk_xprt->stat.recvs++;
        task->tk_rtt = (long)jiffies - req->rq_xtime;

        list_del_init(&req->rq_list);
        /* Ensure all writes are done before we update req->rq_received */
        smp_wmb();
        req->rq_received = req->rq_private_buf.len = copied;
        rpc_wake_up_task(task);
}
EXPORT_SYMBOL_GPL(xprt_complete_rqst);

static void xprt_timer(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        dprintk("RPC: %5u xprt_timer\n", task->tk_pid);

        spin_lock(&xprt->transport_lock);
        if (!req->rq_received) {
                if (xprt->ops->timer)
                        xprt->ops->timer(task);
                task->tk_status = -ETIMEDOUT;
        }
        task->tk_timeout = 0;
        rpc_wake_up_task(task);
        spin_unlock(&xprt->transport_lock);
}

/**
 * xprt_prepare_transmit - reserve the transport before sending a request
 * @task: RPC task about to send a request
 *
 */
int xprt_prepare_transmit(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int err = 0;

        dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid);

        spin_lock_bh(&xprt->transport_lock);
        if (req->rq_received && !req->rq_bytes_sent) {
                err = req->rq_received;
                goto out_unlock;
        }
        if (!xprt->ops->reserve_xprt(task)) {
                err = -EAGAIN;
                goto out_unlock;
        }

        if (!xprt_connected(xprt)) {
                err = -ENOTCONN;
                goto out_unlock;
        }
out_unlock:
        spin_unlock_bh(&xprt->transport_lock);
        return err;
}

void xprt_end_transmit(struct rpc_task *task)
{
        xprt_release_write(task->tk_xprt, task);
}

/**
 * xprt_transmit - send an RPC request on a transport
 * @task: controlling RPC task
 *
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
void xprt_transmit(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int status;

        dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);

        if (!req->rq_received) {
                if (list_empty(&req->rq_list)) {
                        spin_lock_bh(&xprt->transport_lock);
                        /* Update the softirq receive buffer */
                        memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
                                        sizeof(req->rq_private_buf));
                        /* Add request to the receive list */
                        list_add_tail(&req->rq_list, &xprt->recv);
                        spin_unlock_bh(&xprt->transport_lock);
                        xprt_reset_majortimeo(req);
                        /* Turn off autodisconnect */
                        del_singleshot_timer_sync(&xprt->timer);
                }
        } else if (!req->rq_bytes_sent)
                return;

        status = xprt->ops->send_request(task);
        if (status == 0) {
                dprintk("RPC: %5u xmit complete\n", task->tk_pid);
                spin_lock_bh(&xprt->transport_lock);

                xprt->ops->set_retrans_timeout(task);

                xprt->stat.sends++;
                xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
                xprt->stat.bklog_u += xprt->backlog.qlen;

                /* Don't race with disconnect */
                if (!xprt_connected(xprt))
                        task->tk_status = -ENOTCONN;
                else if (!req->rq_received)
                        rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
                spin_unlock_bh(&xprt->transport_lock);
                return;
        }

        /* Note: at this point, task->tk_sleeping has not yet been set,
         *       hence there is no danger of the waking up task being put on
         *       schedq, and being picked up by a parallel run of rpciod().
         */
        task->tk_status = status;
        if (status == -ECONNREFUSED)
                rpc_sleep_on(&xprt->sending, task, NULL, NULL);
}

static inline void do_xprt_reserve(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        task->tk_status = 0;
        if (task->tk_rqstp)
                return;
        if (!list_empty(&xprt->free)) {
                struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
                list_del_init(&req->rq_list);
                task->tk_rqstp = req;
                xprt_request_init(task, xprt);
                return;
        }
        dprintk("RPC: waiting for request slot\n");
        task->tk_status = -EAGAIN;
        task->tk_timeout = 0;
        rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
}

/**
 * xprt_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If no more slots are available, place the task on the transport's
 * backlog queue.
 */
void xprt_reserve(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        task->tk_status = -EIO;
        spin_lock(&xprt->reserve_lock);
        do_xprt_reserve(task);
        spin_unlock(&xprt->reserve_lock);
}

static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
{
        return xprt->xid++;
}

static inline void xprt_init_xid(struct rpc_xprt *xprt)
{
        xprt->xid = net_random();
}

static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
        struct rpc_rqst *req = task->tk_rqstp;

        req->rq_timeout = xprt->timeout.to_initval;
        req->rq_task = task;
        req->rq_xprt = xprt;
        req->rq_buffer = NULL;
        req->rq_xid = xprt_alloc_xid(xprt);
        req->rq_release_snd_buf = NULL;
        xprt_reset_majortimeo(req);
        dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
                        req, ntohl(req->rq_xid));
}

/**
 * xprt_release - release an RPC request slot
 * @task: task which is finished with the slot
 *
 */
void xprt_release(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req;

        if (!(req = task->tk_rqstp))
                return;
        rpc_count_iostats(task);
        spin_lock_bh(&xprt->transport_lock);
        xprt->ops->release_xprt(xprt, task);
        if (xprt->ops->release_request)
                xprt->ops->release_request(task);
        if (!list_empty(&req->rq_list))
                list_del(&req->rq_list);
        xprt->last_used = jiffies;
        if (list_empty(&xprt->recv))
                mod_timer(&xprt->timer,
                                xprt->last_used + xprt->idle_timeout);
        spin_unlock_bh(&xprt->transport_lock);
        xprt->ops->buf_free(req->rq_buffer);
        task->tk_rqstp = NULL;
        if (req->rq_release_snd_buf)
                req->rq_release_snd_buf(req);
        memset(req, 0, sizeof(*req));   /* mark unused */

        dprintk("RPC: %5u release request %p\n", task->tk_pid, req);

        spin_lock(&xprt->reserve_lock);
        list_add(&req->rq_list, &xprt->free);
        rpc_wake_up_next(&xprt->backlog);
        spin_unlock(&xprt->reserve_lock);
}

/**
 * xprt_set_timeout - set constant RPC timeout
 * @to: RPC timeout parameters to set up
 * @retr: number of retries
 * @incr: amount of increase after each retry
 *
 */
void xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
{
        to->to_initval =
        to->to_increment = incr;
        to->to_maxval = to->to_initval + (incr * retr);
        to->to_retries = retr;
        to->to_exponential = 0;
}

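/*
 * Illustrative example (not part of the original source): a call such
 * as xprt_set_timeout(to, 5, 2 * HZ) yields linear backoff with
 * to_initval = to_increment = 2 s, to_retries = 5 and
 * to_maxval = 2 s + (2 s * 5) = 12 s. Since to_exponential is cleared,
 * xprt_adjust_timeout() will add to_increment on each retransmission
 * instead of doubling the timeout.
 */
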
/**
 * xprt_create_transport - create an RPC transport
 * @args: rpc transport creation arguments
 *
 */
struct rpc_xprt *xprt_create_transport(struct rpc_xprtsock_create *args)
{
        struct rpc_xprt *xprt;
        struct rpc_rqst *req;

        switch (args->proto) {
        case IPPROTO_UDP:
                xprt = xs_setup_udp(args);
                break;
        case IPPROTO_TCP:
                xprt = xs_setup_tcp(args);
                break;
        default:
                printk(KERN_ERR "RPC: unrecognized transport protocol: %d\n",
                                args->proto);
                return ERR_PTR(-EIO);
        }
        if (IS_ERR(xprt)) {
                dprintk("RPC: xprt_create_transport: failed, %ld\n",
                                -PTR_ERR(xprt));
                return xprt;
        }

        kref_init(&xprt->kref);
        spin_lock_init(&xprt->transport_lock);
        spin_lock_init(&xprt->reserve_lock);

        INIT_LIST_HEAD(&xprt->free);
        INIT_LIST_HEAD(&xprt->recv);
        INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
        init_timer(&xprt->timer);
        xprt->timer.function = xprt_init_autodisconnect;
        xprt->timer.data = (unsigned long) xprt;
        xprt->last_used = jiffies;
        xprt->cwnd = RPC_INITCWND;
        xprt->bind_index = 0;

        rpc_init_wait_queue(&xprt->binding, "xprt_binding");
        rpc_init_wait_queue(&xprt->pending, "xprt_pending");
        rpc_init_wait_queue(&xprt->sending, "xprt_sending");
        rpc_init_wait_queue(&xprt->resend, "xprt_resend");
        rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");

        /* initialize free list */
        for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--)
                list_add(&req->rq_list, &xprt->free);

        xprt_init_xid(xprt);

        dprintk("RPC: created transport %p with %u slots\n", xprt,
                        xprt->max_reqs);

        return xprt;
}

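/*
 * Illustrative usage (not part of the original source; a minimal sketch
 * assuming the proto, dstaddr, addrlen and timeout fields of struct
 * rpc_xprtsock_create). A caller such as rpc_create() builds the
 * argument block and checks the result with IS_ERR():
 *
 *      struct rpc_xprtsock_create xprtargs = {
 *              .proto   = IPPROTO_TCP,
 *              .dstaddr = (struct sockaddr *) &server_addr,
 *              .addrlen = sizeof(server_addr),
 *              .timeout = &my_timeout,
 *      };
 *      struct rpc_xprt *xprt = xprt_create_transport(&xprtargs);
 *
 *      if (IS_ERR(xprt))
 *              return PTR_ERR(xprt);
 */
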
/**
 * xprt_destroy - destroy an RPC transport, killing off all requests.
 * @kref: kref for the transport to destroy
 *
 */
static void xprt_destroy(struct kref *kref)
{
        struct rpc_xprt *xprt = container_of(kref, struct rpc_xprt, kref);

        dprintk("RPC: destroying transport %p\n", xprt);
        xprt->shutdown = 1;
        del_timer_sync(&xprt->timer);

        /*
         * Tear down transport state and free the rpc_xprt
         */
        xprt->ops->destroy(xprt);
}

/**
 * xprt_put - release a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
void xprt_put(struct rpc_xprt *xprt)
{
        kref_put(&xprt->kref, xprt_destroy);
}

/**
 * xprt_get - return a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
{
        kref_get(&xprt->kref);
        return xprt;
}