/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -	When a process places a call, it allocates a request slot if
 *	one is available. Otherwise, it sleeps on the backlog queue
 *	(xprt_reserve).
 *  -	Next, the caller puts together the RPC message, stuffs it into
 *	the request struct, and calls xprt_call().
 *  -	xprt_call transmits the message and installs the caller on the
 *	socket's wait list. At the same time, it installs a timer that
 *	is run after the packet's timeout has expired.
 *  -	When a packet arrives, the data_ready handler walks the list of
 *	pending requests for that socket. If a matching XID is found, the
 *	caller is woken up, and the timer removed.
 *  -	When no reply arrives within the timeout interval, the timer is
 *	fired by the kernel and runs xprt_timer(). It either adjusts the
 *	timeout values (minor timeout) or wakes up the caller with a status
 *	of -ETIMEDOUT.
 *  -	When the caller receives a notification from RPC that a reply arrived,
 *	it should release the RPC slot, and process the reply.
 *	If the call timed out, it may choose to retry the operation by
 *	adjusting the initial timeout value, and simply calling rpc_call
 *	again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 */
#include <linux/module.h>

#include <linux/types.h>
#include <linux/interrupt.h>
#include <linux/workqueue.h>
#include <linux/random.h>

#include <linux/sunrpc/clnt.h>
/*
 * Local variables
 */

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY	RPCDBG_XPRT
#endif

/*
 * pprintk() is used by the profiling/timing messages in
 * xprt_adjust_timeout() below; make it a no-op unless RPC_PROFILE
 * is defined, otherwise those call sites will not compile.
 */
#ifdef RPC_PROFILE
# define pprintk(args...)	printk(args)
#else
# define pprintk(args...)	do { } while (0)
#endif

#define XPRT_MAX_BACKOFF	(8)
/*
 * Local functions
 */
static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static inline void	do_xprt_reserve(struct rpc_task *);
static void	xprt_connect_status(struct rpc_task *task);
static int	__xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
static int	xprt_clear_backlog(struct rpc_xprt *xprt);
/*
 * Serialize write access to sockets, in order to prevent different
 * requests from interfering with each other.
 * Also prevents TCP socket connects from colliding with writes.
 */
static int
__xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate)) {
		if (task == xprt->snd_task)
			return 1;
		goto out_sleep;
	}
	if (xprt->nocong || __xprt_get_cong(xprt, task)) {
		xprt->snd_task = task;
		if (req) {
			req->rq_bytes_sent = 0;
			req->rq_ntrans++;
		}
		return 1;
	}
	smp_mb__before_clear_bit();
	clear_bit(XPRT_LOCKED, &xprt->sockstate);
	smp_mb__after_clear_bit();
out_sleep:
	dprintk("RPC: %4d failed to lock socket %p\n", task->tk_pid, xprt);
	task->tk_timeout = 0;
	task->tk_status = -EAGAIN;
	if (req && req->rq_ntrans)
		rpc_sleep_on(&xprt->resend, task, NULL, NULL);
	else
		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
	return 0;
}
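
/*
 * Same as __xprt_lock_write(), but takes the transport's sock_lock
 * (bottom-half safe) around the attempt.
 */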
static inline int
xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	int retval;

	spin_lock_bh(&xprt->sock_lock);
	retval = __xprt_lock_write(xprt, task);
	spin_unlock_bh(&xprt->sock_lock);
	return retval;
}
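
/*
 * Hand the write lock to the next waiting task. Requests with data
 * left to resend take priority over fresh sends, and congestion
 * limits apply unless the transport disables congestion control.
 */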
static void
__xprt_lock_write_next(struct rpc_xprt *xprt)
{
	struct rpc_task *task;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate))
		return;
	if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
		goto out_unlock;
	task = rpc_wake_up_next(&xprt->resend);
	if (!task) {
		task = rpc_wake_up_next(&xprt->sending);
		if (!task)
			goto out_unlock;
	}
	if (xprt->nocong || __xprt_get_cong(xprt, task)) {
		struct rpc_rqst *req = task->tk_rqstp;
		xprt->snd_task = task;
		if (req) {
			req->rq_bytes_sent = 0;
			req->rq_ntrans++;
		}
		return;
	}
out_unlock:
	smp_mb__before_clear_bit();
	clear_bit(XPRT_LOCKED, &xprt->sockstate);
	smp_mb__after_clear_bit();
}
/*
 * Releases the socket for use by other requests.
 */
static void
__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task) {
		xprt->snd_task = NULL;
		smp_mb__before_clear_bit();
		clear_bit(XPRT_LOCKED, &xprt->sockstate);
		smp_mb__after_clear_bit();
		__xprt_lock_write_next(xprt);
	}
}
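
/*
 * Locked wrapper around __xprt_release_write().
 */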
static inline void
xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	spin_lock_bh(&xprt->sock_lock);
	__xprt_release_write(xprt, task);
	spin_unlock_bh(&xprt->sock_lock);
}
/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * has overflowed. If it has, return 0 so that the caller puts the
 * task to sleep; otherwise charge the request against the window.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (req->rq_cong)
		return 1;
	dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n",
			task->tk_pid, xprt->cong, xprt->cwnd);
	if (RPCXPRT_CONGESTED(xprt))
		return 0;
	req->rq_cong = 1;
	xprt->cong += RPC_CWNDSCALE;
	return 1;
}
/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion.
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (!req->rq_cong)
		return;
	req->rq_cong = 0;
	xprt->cong -= RPC_CWNDSCALE;
	__xprt_lock_write_next(xprt);
}
/*
 * Adjust RPC congestion window.
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 */
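/*
 * On success the window grows by roughly RPC_CWNDSCALE^2 / cwnd per
 * reply, i.e. about one RPC_CWNDSCALE unit per window's worth of
 * replies (additive increase); a major timeout halves it, with a
 * floor of one unit (multiplicative decrease), the classic AIMD
 * pattern.
 */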
static void
xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
{
	unsigned long cwnd;

	cwnd = xprt->cwnd;
	if (result >= 0 && cwnd <= xprt->cong) {
		/* The (cwnd >> 1) term makes sure
		 * the result gets rounded properly. */
		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
		if (cwnd > RPC_MAXCWND(xprt))
			cwnd = RPC_MAXCWND(xprt);
		__xprt_lock_write_next(xprt);
	} else if (result == -ETIMEDOUT) {
		cwnd >>= 1;
		if (cwnd < RPC_CWNDSCALE)
			cwnd = RPC_CWNDSCALE;
	}
	dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
			xprt->cong, xprt->cwnd, cwnd);
	xprt->cwnd = cwnd;
}
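
/*
 * Compute the deadline (in jiffies) at which the current request
 * suffers a major timeout, based on the transport's retry settings.
 */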
static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
	struct rpc_timeout *to = &req->rq_xprt->timeout;

	req->rq_majortimeo = req->rq_timeout;
	if (to->to_exponential)
		req->rq_majortimeo <<= to->to_retries;
	else
		req->rq_majortimeo += to->to_increment * to->to_retries;
	if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
		req->rq_majortimeo = to->to_maxval;
	req->rq_majortimeo += jiffies;
}
/**
 * xprt_adjust_timeout - adjust timeout values for next retransmit
 * @req: RPC request containing parameters to use for the adjustment
 *
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	struct rpc_timeout *to = &xprt->timeout;
	int status = 0;

	if (time_before(jiffies, req->rq_majortimeo)) {
		if (to->to_exponential)
			req->rq_timeout <<= 1;
		else
			req->rq_timeout += to->to_increment;
		if (to->to_maxval && req->rq_timeout >= to->to_maxval)
			req->rq_timeout = to->to_maxval;
		req->rq_retries++;
		pprintk("RPC: %lu retrans\n", jiffies);
	} else {
		req->rq_timeout = to->to_initval;
		req->rq_retries = 0;
		xprt_reset_majortimeo(req);
		/* Reset the RTT counters == "slow start" */
		spin_lock_bh(&xprt->sock_lock);
		rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
		spin_unlock_bh(&xprt->sock_lock);
		pprintk("RPC: %lu timeout\n", jiffies);
		status = -ETIMEDOUT;
	}

	if (req->rq_timeout == 0) {
		printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
		req->rq_timeout = 5 * HZ;
	}
	return status;
}
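
/*
 * Workqueue callback: disconnect and close an idle socket, then
 * release the write lock that was taken on its behalf.
 */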
static void
xprt_socket_autoclose(void *args)
{
	struct rpc_xprt *xprt = (struct rpc_xprt *)args;

	xprt_disconnect(xprt);
	xprt->ops->close(xprt);
	xprt_release_write(xprt, NULL);
}
/**
 * xprt_disconnect - mark a transport as disconnected
 * @xprt: transport to flag for disconnect
 *
 */
void xprt_disconnect(struct rpc_xprt *xprt)
{
	dprintk("RPC: disconnected transport %p\n", xprt);
	spin_lock_bh(&xprt->sock_lock);
	xprt_clear_connected(xprt);
	rpc_wake_up_status(&xprt->pending, -ENOTCONN);
	spin_unlock_bh(&xprt->sock_lock);
}
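
/*
 * Timer callback: fires after the transport has been idle for
 * RPC_IDLE_DISCONNECT_TIMEOUT. The actual close is handed off to
 * keventd via the task_cleanup work item.
 */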
static void
xprt_init_autodisconnect(unsigned long data)
{
	struct rpc_xprt *xprt = (struct rpc_xprt *)data;

	spin_lock(&xprt->sock_lock);
	if (!list_empty(&xprt->recv) || xprt->shutdown)
		goto out_abort;
	if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate))
		goto out_abort;
	spin_unlock(&xprt->sock_lock);
	/* Let keventd close the socket */
	if (test_bit(XPRT_CONNECTING, &xprt->sockstate) != 0)
		xprt_release_write(xprt, NULL);
	else
		schedule_work(&xprt->task_cleanup);
	return;
out_abort:
	spin_unlock(&xprt->sock_lock);
}
/**
 * xprt_connect - schedule a transport connect operation
 * @task: RPC task that is requesting the connect
 *
 */
void xprt_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	dprintk("RPC: %4d xprt_connect xprt %p %s connected\n", task->tk_pid,
			xprt, (xprt_connected(xprt) ? "is" : "is not"));

	if (xprt->shutdown) {
		task->tk_status = -EIO;
		return;
	}
	if (!xprt->addr.sin_port) {
		task->tk_status = -EIO;
		return;
	}
	if (!xprt_lock_write(xprt, task))
		return;
	if (xprt_connected(xprt))
		xprt_release_write(xprt, task);
	else {
		if (task->tk_rqstp)
			task->tk_rqstp->rq_bytes_sent = 0;

		task->tk_timeout = RPC_CONNECT_TIMEOUT;
		rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL);
		xprt->ops->connect(task);
	}
	return;
}
static void xprt_connect_status(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	if (task->tk_status >= 0) {
		dprintk("RPC: %4d xprt_connect_status: connection established\n",
				task->tk_pid);
		return;
	}

	switch (task->tk_status) {
	case -ECONNREFUSED:
	case -ECONNRESET:
		dprintk("RPC: %4d xprt_connect_status: server %s refused connection\n",
				task->tk_pid, task->tk_client->cl_server);
		break;
	case -ENOTCONN:
		dprintk("RPC: %4d xprt_connect_status: connection broken\n",
				task->tk_pid);
		break;
	case -ETIMEDOUT:
		dprintk("RPC: %4d xprt_connect_status: connect attempt timed out\n",
				task->tk_pid);
		break;
	default:
		dprintk("RPC: %4d xprt_connect_status: error %d connecting to server %s\n",
				task->tk_pid, -task->tk_status, task->tk_client->cl_server);
		xprt_release_write(xprt, task);
		task->tk_status = -EIO;
		return;
	}

	/* if soft mounted, just cause this RPC to fail */
	if (RPC_IS_SOFT(task)) {
		xprt_release_write(xprt, task);
		task->tk_status = -EIO;
	}
}
/**
 * xprt_lookup_rqst - find an RPC request corresponding to an XID
 * @xprt: transport on which the original request was transmitted
 * @xid: RPC XID of incoming reply
 *
 */
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{
	struct list_head *pos;
	struct rpc_rqst *req = NULL;

	list_for_each(pos, &xprt->recv) {
		struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
		if (entry->rq_xid == xid) {
			req = entry;
			break;
		}
	}
	return req;
}
/**
 * xprt_complete_rqst - called when reply processing is complete
 * @xprt: controlling transport
 * @req: RPC request that just completed
 * @copied: actual number of bytes received from the transport
 *
 */
void xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
{
	struct rpc_task *task = req->rq_task;
	struct rpc_clnt *clnt = task->tk_client;

	/* Adjust congestion window */
	if (!xprt->nocong) {
		unsigned timer = task->tk_msg.rpc_proc->p_timer;
		xprt_adjust_cwnd(xprt, copied);
		__xprt_put_cong(xprt, req);
		if (timer) {
			if (req->rq_ntrans == 1)
				rpc_update_rtt(clnt->cl_rtt, timer,
						(long)jiffies - req->rq_xtime);
			rpc_set_timeo(clnt->cl_rtt, timer, req->rq_ntrans - 1);
		}
	}

#ifdef RPC_PROFILE
	/* Profile only reads for now */
	if (copied > 1024) {
		static unsigned long	nextstat;
		static unsigned long	pkt_rtt, pkt_len, pkt_cnt;

		pkt_cnt++;
		pkt_len += req->rq_slen + copied;
		pkt_rtt += jiffies - req->rq_xtime;
		if (time_before(nextstat, jiffies)) {
			printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
			printk("RPC: %ld %ld %ld %ld stat\n",
					jiffies, pkt_cnt, pkt_len, pkt_rtt);
			pkt_rtt = pkt_len = pkt_cnt = 0;
			nextstat = jiffies + 5 * HZ;
		}
	}
#endif

	dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
	list_del_init(&req->rq_list);
	req->rq_received = req->rq_private_buf.len = copied;

	/* ... and wake up the process. */
	rpc_wake_up_task(task);
	return;
}
/*
 * RPC receive timeout handler.
 */
static void
xprt_timer(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	spin_lock(&xprt->sock_lock);
	if (req->rq_received)
		goto out;

	xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT);
	__xprt_put_cong(xprt, req);

	dprintk("RPC: %4d xprt_timer (%s request)\n",
			task->tk_pid, req ? "pending" : "backlogged");

	task->tk_status = -ETIMEDOUT;
out:
	task->tk_timeout = 0;
	rpc_wake_up_task(task);
	spin_unlock(&xprt->sock_lock);
}
/**
 * xprt_prepare_transmit - reserve the transport before sending a request
 * @task: RPC task about to send a request
 *
 */
int xprt_prepare_transmit(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	int err = 0;

	dprintk("RPC: %4d xprt_prepare_transmit\n", task->tk_pid);

	if (xprt->shutdown)
		return -EIO;

	spin_lock_bh(&xprt->sock_lock);
	if (req->rq_received && !req->rq_bytes_sent) {
		err = req->rq_received;
		goto out_unlock;
	}
	if (!__xprt_lock_write(xprt, task)) {
		err = -EAGAIN;
		goto out_unlock;
	}

	if (!xprt_connected(xprt)) {
		err = -ENOTCONN;
		goto out_unlock;
	}
out_unlock:
	spin_unlock_bh(&xprt->sock_lock);
	return err;
}
/**
 * xprt_transmit - send an RPC request on a transport
 * @task: controlling RPC task
 *
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
void xprt_transmit(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	int status;

	dprintk("RPC: %4d xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);

	smp_rmb();
	if (!req->rq_received) {
		if (list_empty(&req->rq_list)) {
			spin_lock_bh(&xprt->sock_lock);
			/* Update the softirq receive buffer */
			memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
					sizeof(req->rq_private_buf));
			/* Add request to the receive list */
			list_add_tail(&req->rq_list, &xprt->recv);
			spin_unlock_bh(&xprt->sock_lock);
			xprt_reset_majortimeo(req);
			/* Turn off autodisconnect */
			del_singleshot_timer_sync(&xprt->timer);
		}
	} else if (!req->rq_bytes_sent)
		return;

	status = xprt->ops->send_request(task);
	if (!status)
		goto out_receive;

	/* Note: at this point, task->tk_sleeping has not yet been set,
	 *	 hence there is no danger of the waking up task being put on
	 *	 schedq, and being picked up by a parallel run of rpciod().
	 */
	task->tk_status = status;

	switch (status) {
	case -ECONNREFUSED:
		task->tk_timeout = RPC_REESTABLISH_TIMEOUT;
		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
		/* fall through: return while the request stays queued */
	case -EAGAIN:
	case -ENOTCONN:
		return;
	default:
		if (xprt->stream)
			xprt_disconnect(xprt);
	}
	xprt_release_write(xprt, task);
	return;
out_receive:
	dprintk("RPC: %4d xmit complete\n", task->tk_pid);
	/* Set the task's receive timeout value */
	spin_lock_bh(&xprt->sock_lock);
	if (!xprt->nocong) {
		int timer = task->tk_msg.rpc_proc->p_timer;
		task->tk_timeout = rpc_calc_rto(clnt->cl_rtt, timer);
		task->tk_timeout <<= rpc_ntimeo(clnt->cl_rtt, timer) + req->rq_retries;
		if (task->tk_timeout > xprt->timeout.to_maxval || task->tk_timeout == 0)
			task->tk_timeout = xprt->timeout.to_maxval;
	} else
		task->tk_timeout = req->rq_timeout;
	/* Don't race with disconnect */
	if (!xprt_connected(xprt))
		task->tk_status = -ENOTCONN;
	else if (!req->rq_received)
		rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
	__xprt_release_write(xprt, task);
	spin_unlock_bh(&xprt->sock_lock);
}
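
/*
 * Grab a free request slot for the task, or queue the task on the
 * transport's backlog. The caller must hold xprt->xprt_lock.
 */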
static inline void do_xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	task->tk_status = 0;
	if (task->tk_rqstp)
		return;
	if (!list_empty(&xprt->free)) {
		struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
		list_del_init(&req->rq_list);
		task->tk_rqstp = req;
		xprt_request_init(task, xprt);
		return;
	}
	dprintk("RPC: waiting for request slot\n");
	task->tk_status = -EAGAIN;
	task->tk_timeout = 0;
	rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
}
/**
 * xprt_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If no more slots are available, place the task on the transport's
 * backlog queue.
 */
void xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	task->tk_status = -EIO;
	if (!xprt->shutdown) {
		spin_lock(&xprt->xprt_lock);
		do_xprt_reserve(task);
		spin_unlock(&xprt->xprt_lock);
	}
}
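
/*
 * XIDs let replies be matched to outstanding requests: start from a
 * random 32-bit value and hand them out sequentially.
 */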
static inline u32 xprt_alloc_xid(struct rpc_xprt *xprt)
{
	return xprt->xid++;
}

static inline void xprt_init_xid(struct rpc_xprt *xprt)
{
	get_random_bytes(&xprt->xid, sizeof(xprt->xid));
}
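
/*
 * Initialize a freshly reserved slot: default timeout, owning task,
 * transport back-pointer, and a new XID.
 */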
static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
	struct rpc_rqst *req = task->tk_rqstp;

	req->rq_timeout = xprt->timeout.to_initval;
	req->rq_task = task;
	req->rq_xprt = xprt;
	req->rq_xid = xprt_alloc_xid(xprt);
	dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid,
			req, ntohl(req->rq_xid));
}
/**
 * xprt_release - release an RPC request slot
 * @task: task which is finished with the slot
 *
 */
void xprt_release(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct rpc_rqst *req;

	if (!(req = task->tk_rqstp))
		return;
	spin_lock_bh(&xprt->sock_lock);
	__xprt_release_write(xprt, task);
	__xprt_put_cong(xprt, req);
	if (!list_empty(&req->rq_list))
		list_del(&req->rq_list);
	xprt->last_used = jiffies;
	if (list_empty(&xprt->recv) && !xprt->shutdown)
		mod_timer(&xprt->timer,
				xprt->last_used + RPC_IDLE_DISCONNECT_TIMEOUT);
	spin_unlock_bh(&xprt->sock_lock);
	task->tk_rqstp = NULL;
	memset(req, 0, sizeof(*req));	/* mark unused */

	dprintk("RPC: %4d release request %p\n", task->tk_pid, req);

	spin_lock(&xprt->xprt_lock);
	list_add(&req->rq_list, &xprt->free);
	xprt_clear_backlog(xprt);
	spin_unlock(&xprt->xprt_lock);
}
/**
 * xprt_set_timeout - set constant RPC timeout
 * @to: RPC timeout parameters to set up
 * @retr: number of retries
 * @incr: amount of increase after each retry
 *
 * The initial timeout equals @incr and grows linearly by @incr per
 * retry; for example, retr=5 and incr=2*HZ gives timeouts of 2s, 4s,
 * 6s, ... capped at to_maxval = 2s + 5 * 2s = 12s.
 */
void xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
{
	to->to_initval   =
	to->to_increment = incr;
	to->to_maxval    = to->to_initval + (incr * retr);
	to->to_retries   = retr;
	to->to_exponential = 0;
}
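
/*
 * Allocate and initialize a transport instance for the given protocol
 * (UDP or TCP) and peer address: locks, wait queues, the idle timer,
 * the request slot free list, and an initial random XID.
 */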
static struct rpc_xprt *xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
{
	int result;
	struct rpc_xprt *xprt;
	struct rpc_rqst *req;

	if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
		return ERR_PTR(-ENOMEM);
	memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */

	xprt->addr = *ap;

	switch (proto) {
	case IPPROTO_UDP:
		result = xs_setup_udp(xprt, to);
		break;
	case IPPROTO_TCP:
		result = xs_setup_tcp(xprt, to);
		break;
	default:
		printk(KERN_ERR "RPC: unrecognized transport protocol: %d\n",
				proto);
		result = -EIO;
		break;
	}
	if (result) {
		kfree(xprt);
		return ERR_PTR(result);
	}

	spin_lock_init(&xprt->sock_lock);
	spin_lock_init(&xprt->xprt_lock);
	init_waitqueue_head(&xprt->cong_wait);

	INIT_LIST_HEAD(&xprt->free);
	INIT_LIST_HEAD(&xprt->recv);
	INIT_WORK(&xprt->task_cleanup, xprt_socket_autoclose, xprt);
	init_timer(&xprt->timer);
	xprt->timer.function = xprt_init_autodisconnect;
	xprt->timer.data = (unsigned long) xprt;
	xprt->last_used = jiffies;

	rpc_init_wait_queue(&xprt->pending, "xprt_pending");
	rpc_init_wait_queue(&xprt->sending, "xprt_sending");
	rpc_init_wait_queue(&xprt->resend, "xprt_resend");
	rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");

	/* initialize free list */
	for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--)
		list_add(&req->rq_list, &xprt->free);

	xprt_init_xid(xprt);

	dprintk("RPC: created transport %p with %u slots\n", xprt,
			xprt->max_reqs);

	return xprt;
}
/**
 * xprt_create_proto - create an RPC client transport
 * @proto: requested transport protocol
 * @sap: remote peer's address
 * @to: timeout parameters for new transport
 *
 */
struct rpc_xprt *xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
{
	struct rpc_xprt *xprt;

	xprt = xprt_setup(proto, sap, to);
	if (IS_ERR(xprt))
		dprintk("RPC: xprt_create_proto failed\n");
	else
		dprintk("RPC: xprt_create_proto created xprt %p\n", xprt);

	return xprt;
}
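
/*
 * Mark the transport as shutting down, wake every wait queue so that
 * sleeping tasks notice, and stop the autodisconnect timer.
 */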
static void xprt_shutdown(struct rpc_xprt *xprt)
{
	xprt->shutdown = 1;
	rpc_wake_up(&xprt->sending);
	rpc_wake_up(&xprt->resend);
	rpc_wake_up(&xprt->pending);
	rpc_wake_up(&xprt->backlog);
	wake_up(&xprt->cong_wait);
	del_timer_sync(&xprt->timer);
}
static int xprt_clear_backlog(struct rpc_xprt *xprt)
{
	rpc_wake_up_next(&xprt->backlog);
	wake_up(&xprt->cong_wait);
	return 1;
}
/**
 * xprt_destroy - destroy an RPC transport, killing off all requests.
 * @xprt: transport to destroy
 *
 */
int xprt_destroy(struct rpc_xprt *xprt)
{
	dprintk("RPC: destroying transport %p\n", xprt);
	xprt_shutdown(xprt);
	xprt->ops->destroy(xprt);
	kfree(xprt);

	return 0;
}