/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -	When a process places a call, it allocates a request slot if
 *	one is available. Otherwise, it sleeps on the backlog queue
 *	(xprt_reserve).
 *  -	Next, the caller puts together the RPC message, stuffs it into
 *	the request struct, and calls xprt_transmit().
 *  -	xprt_transmit sends the message and installs the caller on the
 *	transport's wait list. At the same time, it installs a timer that
 *	is run after the packet's timeout has expired.
 *  -	When a packet arrives, the data_ready handler walks the list of
 *	pending requests for that transport. If a matching XID is found, the
 *	caller is woken up, and the timer removed.
 *  -	When no reply arrives within the timeout interval, the timer is
 *	fired by the kernel and runs xprt_timer(). It either adjusts the
 *	timeout values (minor timeout) or wakes up the caller with a status
 *	of -ETIMEDOUT.
 *  -	When the caller receives a notification from RPC that a reply arrived,
 *	it should release the RPC slot, and process the reply.
 *	If the call timed out, it may choose to retry the operation by
 *	adjusting the initial timeout value, and simply calling rpc_call
 *	again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
 */
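
/*
 * Overview sketch (editorial illustration, not in the original source):
 * a synchronous call effectively drives the functions defined below in
 * this order; rpc_task setup and error handling are elided.
 *
 *	xprt_reserve(task);		allocate a slot or sleep on backlog
 *	xprt_prepare_transmit(task);	serialize writers, check connection
 *	xprt_transmit(task);		queue for reply, send, arm xprt_timer
 *	... reply matched by xprt_lookup_rqst() / xprt_complete_rqst() ...
 *	xprt_release(task);		return the slot, kick the backlog
 */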

#include <linux/module.h>

#include <linux/types.h>
#include <linux/interrupt.h>
#include <linux/workqueue.h>
#include <linux/random.h>

#include <linux/sunrpc/clnt.h>

/*
 * Local variables
 */

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY	RPCDBG_XPRT
#endif

/*
 * pprintk() is called by xprt_adjust_timeout() below, but its definition
 * was missing from this listing; restored here (an assumption, modeled on
 * the historical tree) as a no-op unless profiling is compiled in.
 */
#ifdef RPC_PROFILE
# define pprintk(args...)	printk(args)
#else
# define pprintk(args...)	do { } while (0)
#endif

/*
 * Local functions
 */
static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static inline void	do_xprt_reserve(struct rpc_task *);
static void	xprt_connect_status(struct rpc_task *task);
static int	__xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
static int	xprt_clear_backlog(struct rpc_xprt *xprt);

/*
 * Serialize write access to transports, in order to prevent different
 * requests from interfering with each other.
 * Also prevents transport connects from colliding with writes.
 */
static int
__xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
		if (task == xprt->snd_task)
			return 1;
		goto out_sleep;
	}
	if (xprt->nocong || __xprt_get_cong(xprt, task)) {
		xprt->snd_task = task;
		if (req) {
			req->rq_bytes_sent = 0;
			req->rq_ntrans++;
		}
		return 1;
	}
	smp_mb__before_clear_bit();
	clear_bit(XPRT_LOCKED, &xprt->state);
	smp_mb__after_clear_bit();
out_sleep:
	dprintk("RPC: %4d failed to lock transport %p\n", task->tk_pid, xprt);
	task->tk_timeout = 0;
	task->tk_status = -EAGAIN;
	if (req && req->rq_ntrans)
		rpc_sleep_on(&xprt->resend, task, NULL, NULL);
	else
		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
	return 0;
}

static inline int
xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	int retval;

	spin_lock_bh(&xprt->transport_lock);
	retval = __xprt_lock_write(xprt, task);
	spin_unlock_bh(&xprt->transport_lock);
	return retval;
}

static void
__xprt_lock_write_next(struct rpc_xprt *xprt)
{
	struct rpc_task *task;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
	if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
		goto out_unlock;
	task = rpc_wake_up_next(&xprt->resend);
	if (!task) {
		task = rpc_wake_up_next(&xprt->sending);
		if (!task)
			goto out_unlock;
	}
	if (xprt->nocong || __xprt_get_cong(xprt, task)) {
		struct rpc_rqst *req = task->tk_rqstp;
		xprt->snd_task = task;
		if (req) {
			req->rq_bytes_sent = 0;
			req->rq_ntrans++;
		}
		return;
	}
out_unlock:
	smp_mb__before_clear_bit();
	clear_bit(XPRT_LOCKED, &xprt->state);
	smp_mb__after_clear_bit();
}

/*
 * Releases the transport for use by other requests.
 */
static void
__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task) {
		xprt->snd_task = NULL;
		smp_mb__before_clear_bit();
		clear_bit(XPRT_LOCKED, &xprt->state);
		smp_mb__after_clear_bit();
		__xprt_lock_write_next(xprt);
	}
}

static inline void
xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	spin_lock_bh(&xprt->transport_lock);
	__xprt_release_write(xprt, task);
	spin_unlock_bh(&xprt->transport_lock);
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (req->rq_cong)
		return 1;
	dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n",
			task->tk_pid, xprt->cong, xprt->cwnd);
	if (RPCXPRT_CONGESTED(xprt))
		return 0;
	req->rq_cong = 1;
	xprt->cong += RPC_CWNDSCALE;
	return 1;
}
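
/*
 * Editorial note (not in the original source): cong and cwnd are kept in
 * fixed point, scaled by RPC_CWNDSCALE, so every request in flight
 * accounts for exactly RPC_CWNDSCALE units of congestion.
 * RPCXPRT_CONGESTED() (defined in the transport header) tests
 * cong >= cwnd; e.g. with cwnd = 4 * RPC_CWNDSCALE and four requests
 * holding congestion units, a fifth caller fails __xprt_get_cong() and
 * its task sleeps until __xprt_put_cong() below frees a unit.
 */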

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (!req->rq_cong)
		return;
	req->rq_cong = 0;
	xprt->cong -= RPC_CWNDSCALE;
	__xprt_lock_write_next(xprt);
}

/*
 * Adjust RPC congestion window
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 */
static void
xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
{
	unsigned long cwnd;

	cwnd = xprt->cwnd;
	if (result >= 0 && cwnd <= xprt->cong) {
		/* The (cwnd >> 1) term makes sure
		 * the result gets rounded properly. */
		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
		if (cwnd > RPC_MAXCWND(xprt))
			cwnd = RPC_MAXCWND(xprt);
		__xprt_lock_write_next(xprt);
	} else if (result == -ETIMEDOUT) {
		cwnd >>= 1;
		if (cwnd < RPC_CWNDSCALE)
			cwnd = RPC_CWNDSCALE;
	}
	dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
			xprt->cong, xprt->cwnd, cwnd);
	xprt->cwnd = cwnd;
}
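
/*
 * Worked example (editorial, not in the original source): this is the
 * classic AIMD rule. On success the window grows by roughly
 * RPC_CWNDSCALE^2 / cwnd. Assuming RPC_CWNDSCALE = 256 (its historical
 * value) and cwnd = 512, the increment is
 * (256*256 + 256) / 512 = 128, i.e. half a request slot per reply, so
 * across a full window's worth of replies cwnd grows by about one slot,
 * mirroring TCP's one-segment-per-RTT additive increase. On -ETIMEDOUT
 * the window halves, but never drops below one request (RPC_CWNDSCALE).
 */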

/**
 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
 * @xprt: transport with waiting tasks
 * @status: result code to plant in each task before waking it
 *
 */
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
{
	if (status < 0)
		rpc_wake_up_status(&xprt->pending, status);
	else
		rpc_wake_up(&xprt->pending);
}

/**
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 * @task: task to be put to sleep
 *
 */
void xprt_wait_for_buffer_space(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	task->tk_timeout = req->rq_timeout;
	rpc_sleep_on(&xprt->pending, task, NULL, NULL);
}

/**
 * xprt_write_space - wake the task waiting for transport output buffer space
 * @xprt: transport with waiting tasks
 *
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 */
void xprt_write_space(struct rpc_xprt *xprt)
{
	if (unlikely(xprt->shutdown))
		return;

	spin_lock_bh(&xprt->transport_lock);
	if (xprt->snd_task) {
		dprintk("RPC: write space: waking waiting task on xprt %p\n",
				xprt);
		rpc_wake_up_task(xprt->snd_task);
	}
	spin_unlock_bh(&xprt->transport_lock);
}

/**
 * xprt_set_retrans_timeout_def - set a request's retransmit timeout
 * @task: task whose timeout is to be set
 *
 * Set a request's retransmit timeout based on the transport's
 * default timeout parameters. Used by transports that don't adjust
 * the retransmit timeout based on round-trip time estimation.
 */
void xprt_set_retrans_timeout_def(struct rpc_task *task)
{
	task->tk_timeout = task->tk_rqstp->rq_timeout;
}

/*
 * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout
 * @task: task whose timeout is to be set
 *
 * Set a request's retransmit timeout using the RTT estimator.
 */
void xprt_set_retrans_timeout_rtt(struct rpc_task *task)
{
	int timer = task->tk_msg.rpc_proc->p_timer;
	struct rpc_rtt *rtt = task->tk_client->cl_rtt;
	struct rpc_rqst *req = task->tk_rqstp;
	unsigned long max_timeout = req->rq_xprt->timeout.to_maxval;

	task->tk_timeout = rpc_calc_rto(rtt, timer);
	task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
	if (task->tk_timeout > max_timeout || task->tk_timeout == 0)
		task->tk_timeout = max_timeout;
}

static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
	struct rpc_timeout *to = &req->rq_xprt->timeout;

	req->rq_majortimeo = req->rq_timeout;
	if (to->to_exponential)
		req->rq_majortimeo <<= to->to_retries;
	else
		req->rq_majortimeo += to->to_increment * to->to_retries;
	if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
		req->rq_majortimeo = to->to_maxval;
	req->rq_majortimeo += jiffies;
}
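
/*
 * Worked example (editorial, not in the original source): the major
 * timeout is the deadline past which xprt_adjust_timeout() reports
 * -ETIMEDOUT instead of silently retransmitting. With linear backoff
 * (to_exponential == 0), to_initval = 2 * HZ, to_increment = 2 * HZ and
 * to_retries = 5, the deadline lands 2s + 2s * 5 = 12s from now; with
 * exponential backoff it would be 2s << 5 = 64s, clamped to to_maxval.
 */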

/**
 * xprt_adjust_timeout - adjust timeout values for next retransmit
 * @req: RPC request containing parameters to use for the adjustment
 *
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	struct rpc_timeout *to = &xprt->timeout;
	int status = 0;

	if (time_before(jiffies, req->rq_majortimeo)) {
		if (to->to_exponential)
			req->rq_timeout <<= 1;
		else
			req->rq_timeout += to->to_increment;
		if (to->to_maxval && req->rq_timeout >= to->to_maxval)
			req->rq_timeout = to->to_maxval;
		req->rq_retries++;
		pprintk("RPC: %lu retrans\n", jiffies);
	} else {
		req->rq_timeout = to->to_initval;
		req->rq_retries = 0;
		xprt_reset_majortimeo(req);
		/* Reset the RTT counters == "slow start" */
		spin_lock_bh(&xprt->transport_lock);
		rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
		spin_unlock_bh(&xprt->transport_lock);
		pprintk("RPC: %lu timeout\n", jiffies);
		status = -ETIMEDOUT;
	}

	if (req->rq_timeout == 0) {
		printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
		req->rq_timeout = 5 * HZ;
	}
	return status;
}
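
/*
 * Worked schedule (editorial, not in the original source): with the
 * linear parameters from the xprt_set_timeout() example further below
 * (initval = increment = 2s, retries = 5), a request that never hears
 * back first waits 2s, then retransmits with rq_timeout bumped by 2s on
 * each minor timeout (4s, 6s, ...) until rq_majortimeo, 12s after the
 * first transmission, passes; at that point this function returns
 * -ETIMEDOUT and resets the request to "slow start".
 */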

static void xprt_autoclose(void *args)
{
	struct rpc_xprt *xprt = (struct rpc_xprt *)args;

	xprt_disconnect(xprt);
	xprt->ops->close(xprt);
	xprt_release_write(xprt, NULL);
}

/**
 * xprt_disconnect - mark a transport as disconnected
 * @xprt: transport to flag for disconnect
 *
 */
void xprt_disconnect(struct rpc_xprt *xprt)
{
	dprintk("RPC: disconnected transport %p\n", xprt);
	spin_lock_bh(&xprt->transport_lock);
	xprt_clear_connected(xprt);
	xprt_wake_pending_tasks(xprt, -ENOTCONN);
	spin_unlock_bh(&xprt->transport_lock);
}

static void
xprt_init_autodisconnect(unsigned long data)
{
	struct rpc_xprt *xprt = (struct rpc_xprt *)data;

	spin_lock(&xprt->transport_lock);
	if (!list_empty(&xprt->recv) || xprt->shutdown)
		goto out_abort;
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		goto out_abort;
	spin_unlock(&xprt->transport_lock);
	if (xprt_connecting(xprt))
		xprt_release_write(xprt, NULL);
	else
		schedule_work(&xprt->task_cleanup);
	return;
out_abort:
	spin_unlock(&xprt->transport_lock);
}

/**
 * xprt_connect - schedule a transport connect operation
 * @task: RPC task that is requesting the connect
 *
 */
void xprt_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	dprintk("RPC: %4d xprt_connect xprt %p %s connected\n", task->tk_pid,
			xprt, (xprt_connected(xprt) ? "is" : "is not"));

	if (xprt->shutdown) {
		task->tk_status = -EIO;
		return;
	}
	if (!xprt->addr.sin_port) {
		task->tk_status = -EIO;
		return;
	}
	if (!xprt_lock_write(xprt, task))
		return;
	if (xprt_connected(xprt))
		xprt_release_write(xprt, task);
	else {
		if (task->tk_rqstp)
			task->tk_rqstp->rq_bytes_sent = 0;

		task->tk_timeout = RPC_CONNECT_TIMEOUT;
		rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL);
		xprt->ops->connect(task);
	}
	return;
}

static void xprt_connect_status(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	if (task->tk_status >= 0) {
		dprintk("RPC: %4d xprt_connect_status: connection established\n",
				task->tk_pid);
		return;
	}

	switch (task->tk_status) {
	case -ECONNREFUSED:
	case -ECONNRESET:
		dprintk("RPC: %4d xprt_connect_status: server %s refused connection\n",
				task->tk_pid, task->tk_client->cl_server);
		break;
	case -ENOTCONN:
		dprintk("RPC: %4d xprt_connect_status: connection broken\n",
				task->tk_pid);
		break;
	case -ETIMEDOUT:
		dprintk("RPC: %4d xprt_connect_status: connect attempt timed out\n",
				task->tk_pid);
		break;
	default:
		dprintk("RPC: %4d xprt_connect_status: error %d connecting to server %s\n",
				task->tk_pid, -task->tk_status, task->tk_client->cl_server);
		xprt_release_write(xprt, task);
		task->tk_status = -EIO;
		return;
	}

	/* if soft mounted, just cause this RPC to fail */
	if (RPC_IS_SOFT(task)) {
		xprt_release_write(xprt, task);
		task->tk_status = -EIO;
	}
}

/**
 * xprt_lookup_rqst - find an RPC request corresponding to an XID
 * @xprt: transport on which the original request was transmitted
 * @xid: RPC XID of incoming reply
 *
 */
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{
	struct list_head *pos;
	struct rpc_rqst *req = NULL;

	list_for_each(pos, &xprt->recv) {
		struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
		if (entry->rq_xid == xid) {
			req = entry;
			break;
		}
	}
	return req;
}

/**
 * xprt_complete_rqst - called when reply processing is complete
 * @xprt: controlling transport
 * @req: RPC request that just completed
 * @copied: actual number of bytes received from the transport
 *
 */
void xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
{
	struct rpc_task *task = req->rq_task;
	struct rpc_clnt *clnt = task->tk_client;

	/* Adjust congestion window */
	if (!xprt->nocong) {
		unsigned timer = task->tk_msg.rpc_proc->p_timer;
		xprt_adjust_cwnd(xprt, copied);
		__xprt_put_cong(xprt, req);
		if (timer) {
			if (req->rq_ntrans == 1)
				rpc_update_rtt(clnt->cl_rtt, timer,
						(long)jiffies - req->rq_xtime);
			rpc_set_timeo(clnt->cl_rtt, timer, req->rq_ntrans - 1);
		}
	}

#ifdef RPC_PROFILE
	/* Profile only reads for now */
	if (copied > 1024) {
		static unsigned long nextstat;
		static unsigned long pkt_rtt, pkt_len, pkt_cnt;

		pkt_cnt++;
		pkt_len += req->rq_slen + copied;
		pkt_rtt += jiffies - req->rq_xtime;
		if (time_before(nextstat, jiffies)) {
			printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
			printk("RPC: %ld %ld %ld %ld stat\n",
					jiffies, pkt_cnt, pkt_len, pkt_rtt);
			pkt_rtt = pkt_len = pkt_cnt = 0;
			nextstat = jiffies + 5 * HZ;
		}
	}
#endif

	dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
	list_del_init(&req->rq_list);
	req->rq_received = req->rq_private_buf.len = copied;

	/* ... and wake up the process. */
	rpc_wake_up_task(task);
	return;
}

/*
 * RPC receive timeout handler.
 */
static void
xprt_timer(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	spin_lock(&xprt->transport_lock);
	if (req->rq_received)
		goto out;

	xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT);
	__xprt_put_cong(xprt, req);

	dprintk("RPC: %4d xprt_timer (%s request)\n",
			task->tk_pid, req ? "pending" : "backlogged");

	task->tk_status = -ETIMEDOUT;
out:
	task->tk_timeout = 0;
	rpc_wake_up_task(task);
	spin_unlock(&xprt->transport_lock);
}

/**
 * xprt_prepare_transmit - reserve the transport before sending a request
 * @task: RPC task about to send a request
 *
 */
int xprt_prepare_transmit(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	int err = 0;

	dprintk("RPC: %4d xprt_prepare_transmit\n", task->tk_pid);

	if (xprt->shutdown)
		return -EIO;

	spin_lock_bh(&xprt->transport_lock);
	if (req->rq_received && !req->rq_bytes_sent) {
		err = req->rq_received;
		goto out_unlock;
	}
	if (!__xprt_lock_write(xprt, task)) {
		err = -EAGAIN;
		goto out_unlock;
	}

	if (!xprt_connected(xprt)) {
		err = -ENOTCONN;
		goto out_unlock;
	}
out_unlock:
	spin_unlock_bh(&xprt->transport_lock);
	return err;
}

/**
 * xprt_transmit - send an RPC request on a transport
 * @task: controlling RPC task
 *
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
void xprt_transmit(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	int status;

	dprintk("RPC: %4d xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);

	smp_rmb();
	if (!req->rq_received) {
		if (list_empty(&req->rq_list)) {
			spin_lock_bh(&xprt->transport_lock);
			/* Update the softirq receive buffer */
			memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
					sizeof(req->rq_private_buf));
			/* Add request to the receive list */
			list_add_tail(&req->rq_list, &xprt->recv);
			spin_unlock_bh(&xprt->transport_lock);
			xprt_reset_majortimeo(req);
			/* Turn off autodisconnect */
			del_singleshot_timer_sync(&xprt->timer);
		}
	} else if (!req->rq_bytes_sent)
		return;

	status = xprt->ops->send_request(task);
	if (status == 0) {
		dprintk("RPC: %4d xmit complete\n", task->tk_pid);
		spin_lock_bh(&xprt->transport_lock);
		xprt->ops->set_retrans_timeout(task);
		/* Don't race with disconnect */
		if (!xprt_connected(xprt))
			task->tk_status = -ENOTCONN;
		else if (!req->rq_received)
			rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
		__xprt_release_write(xprt, task);
		spin_unlock_bh(&xprt->transport_lock);
		return;
	}

	/* Note: at this point, task->tk_sleeping has not yet been set,
	 *	 hence there is no danger of the waking up task being put on
	 *	 schedq, and being picked up by a parallel run of rpciod().
	 */
	task->tk_status = status;

	switch (status) {
	case -ECONNREFUSED:
		task->tk_timeout = RPC_REESTABLISH_TIMEOUT;
		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
		/* fall through: the task keeps the write lock while it
		 * waits for the connection to be re-established */
	case -EAGAIN:
	case -ENOTCONN:
		return;
	default:
		break;
	}
	xprt_release_write(xprt, task);
	return;
}

static inline void do_xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	task->tk_status = 0;
	if (task->tk_rqstp)
		return;
	if (!list_empty(&xprt->free)) {
		struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
		list_del_init(&req->rq_list);
		task->tk_rqstp = req;
		xprt_request_init(task, xprt);
		return;
	}
	dprintk("RPC: waiting for request slot\n");
	task->tk_status = -EAGAIN;
	task->tk_timeout = 0;
	rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
}

/**
 * xprt_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If no more slots are available, place the task on the transport's
 * backlog queue.
 */
void xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	task->tk_status = -EIO;
	if (!xprt->shutdown) {
		spin_lock(&xprt->reserve_lock);
		do_xprt_reserve(task);
		spin_unlock(&xprt->reserve_lock);
	}
}

static inline u32 xprt_alloc_xid(struct rpc_xprt *xprt)
{
	return xprt->xid++;
}

static inline void xprt_init_xid(struct rpc_xprt *xprt)
{
	get_random_bytes(&xprt->xid, sizeof(xprt->xid));
}

static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
	struct rpc_rqst *req = task->tk_rqstp;

	req->rq_timeout = xprt->timeout.to_initval;
	req->rq_task = task;
	req->rq_xprt = xprt;
	req->rq_xid = xprt_alloc_xid(xprt);
	dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid,
			req, ntohl(req->rq_xid));
}

/**
 * xprt_release - release an RPC request slot
 * @task: task which is finished with the slot
 *
 */
void xprt_release(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct rpc_rqst *req;

	if (!(req = task->tk_rqstp))
		return;
	spin_lock_bh(&xprt->transport_lock);
	__xprt_release_write(xprt, task);
	__xprt_put_cong(xprt, req);
	if (!list_empty(&req->rq_list))
		list_del(&req->rq_list);
	xprt->last_used = jiffies;
	if (list_empty(&xprt->recv) && !xprt->shutdown)
		mod_timer(&xprt->timer,
				xprt->last_used + RPC_IDLE_DISCONNECT_TIMEOUT);
	spin_unlock_bh(&xprt->transport_lock);
	task->tk_rqstp = NULL;
	memset(req, 0, sizeof(*req));	/* mark unused */

	dprintk("RPC: %4d release request %p\n", task->tk_pid, req);

	spin_lock(&xprt->reserve_lock);
	list_add(&req->rq_list, &xprt->free);
	xprt_clear_backlog(xprt);
	spin_unlock(&xprt->reserve_lock);
}

/**
 * xprt_set_timeout - set constant RPC timeout
 * @to: RPC timeout parameters to set up
 * @retr: number of retries
 * @incr: amount of increase after each retry
 *
 */
void xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
{
	to->to_initval =
	to->to_increment = incr;
	to->to_maxval = to->to_initval + (incr * retr);
	to->to_retries = retr;
	to->to_exponential = 0;
}
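
/*
 * Example (editorial, not in the original source): a caller wanting a
 * fixed 2-second retransmit interval with 5 retries would do
 *
 *	struct rpc_timeout to;
 *
 *	xprt_set_timeout(&to, 5, 2 * HZ);
 *
 * which yields to_initval = to_increment = 2s, to_maxval = 12s, and
 * linear (non-exponential) backoff.
 */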

static struct rpc_xprt *xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
{
	int result;
	struct rpc_xprt *xprt;
	struct rpc_rqst *req;

	if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
		return ERR_PTR(-ENOMEM);
	memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */
	xprt->addr = *ap;

	switch (proto) {
	case IPPROTO_UDP:
		result = xs_setup_udp(xprt, to);
		break;
	case IPPROTO_TCP:
		result = xs_setup_tcp(xprt, to);
		break;
	default:
		printk(KERN_ERR "RPC: unrecognized transport protocol: %d\n",
				proto);
		result = -EIO;
		break;
	}
	if (result) {
		kfree(xprt);
		return ERR_PTR(result);
	}

	spin_lock_init(&xprt->transport_lock);
	spin_lock_init(&xprt->reserve_lock);
	init_waitqueue_head(&xprt->cong_wait);

	INIT_LIST_HEAD(&xprt->free);
	INIT_LIST_HEAD(&xprt->recv);
	INIT_WORK(&xprt->task_cleanup, xprt_autoclose, xprt);
	init_timer(&xprt->timer);
	xprt->timer.function = xprt_init_autodisconnect;
	xprt->timer.data = (unsigned long) xprt;
	xprt->last_used = jiffies;

	rpc_init_wait_queue(&xprt->pending, "xprt_pending");
	rpc_init_wait_queue(&xprt->sending, "xprt_sending");
	rpc_init_wait_queue(&xprt->resend, "xprt_resend");
	rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");

	/* initialize free list */
	for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--)
		list_add(&req->rq_list, &xprt->free);

	xprt_init_xid(xprt);

	dprintk("RPC: created transport %p with %u slots\n", xprt,
			xprt->max_reqs);

	return xprt;
}

/**
 * xprt_create_proto - create an RPC client transport
 * @proto: requested transport protocol
 * @sap: remote peer's address
 * @to: timeout parameters for new transport
 *
 */
struct rpc_xprt *xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
{
	struct rpc_xprt *xprt;

	xprt = xprt_setup(proto, sap, to);
	if (IS_ERR(xprt))
		dprintk("RPC: xprt_create_proto failed\n");
	else
		dprintk("RPC: xprt_create_proto created xprt %p\n", xprt);

	return xprt;
}
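
/*
 * Example (editorial, not in the original source): a typical client
 * combines xprt_set_timeout() and xprt_create_proto(); the address
 * setup shown here is illustrative, and sin_port must be nonzero or
 * xprt_connect() above will fail with -EIO.
 *
 *	struct rpc_timeout timeo;
 *	struct sockaddr_in addr = { .sin_family = AF_INET };
 *	struct rpc_xprt *xprt;
 *
 *	... fill in addr.sin_addr and addr.sin_port for the server ...
 *	xprt_set_timeout(&timeo, 5, 2 * HZ);
 *	xprt = xprt_create_proto(IPPROTO_UDP, &addr, &timeo);
 *	if (IS_ERR(xprt))
 *		return PTR_ERR(xprt);
 */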

static void xprt_shutdown(struct rpc_xprt *xprt)
{
	xprt->shutdown = 1;
	rpc_wake_up(&xprt->sending);
	rpc_wake_up(&xprt->resend);
	xprt_wake_pending_tasks(xprt, -EIO);
	rpc_wake_up(&xprt->backlog);
	wake_up(&xprt->cong_wait);
	del_timer_sync(&xprt->timer);
}

static int xprt_clear_backlog(struct rpc_xprt *xprt)
{
	rpc_wake_up_next(&xprt->backlog);
	wake_up(&xprt->cong_wait);
	return 1;
}

/**
 * xprt_destroy - destroy an RPC transport, killing off all requests.
 * @xprt: transport to destroy
 *
 */
int xprt_destroy(struct rpc_xprt *xprt)
{
	dprintk("RPC: destroying transport %p\n", xprt);
	xprt_shutdown(xprt);
	xprt->ops->destroy(xprt);
	kfree(xprt);

	return 0;
}