/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2006 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP addresses or
 * whatever it needs for inter-node communication.
 *
 * The comms level is two kernel threads that deal mainly with
 * the receiving of messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send thread which does all the setting up of connections
 * to remote nodes and the sending of data. Threads are not allowed
 * to send their own data because it may cause them to wait in times
 * of high load. Also, this way, the sending thread can collect together
 * messages bound for one node and send them in one block.
 *
 * I don't see any problem with the recv thread executing the locking
 * code on behalf of remote processes as the locking code is
 * short, efficient and never waits.
 *
 */
#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "config.h"

struct cbuf {
	unsigned base;
	unsigned len;
	unsigned mask;
};

#ifndef FALSE
#define FALSE 0
#define TRUE 1
#endif
#define NODE_INCREMENT 32

#define CBUF_INIT(cb, size) do { (cb)->base = (cb)->len = 0; (cb)->mask = ((size)-1); } while(0)
#define CBUF_ADD(cb, n) do { (cb)->len += n; } while(0)
#define CBUF_EMPTY(cb) ((cb)->len == 0)
#define CBUF_MAY_ADD(cb, n) (((cb)->len + (n)) < ((cb)->mask + 1))
#define CBUF_EAT(cb, n) do { (cb)->len  -= (n); \
			     (cb)->base += (n); \
			     (cb)->base &= (cb)->mask; } while(0)
#define CBUF_DATA(cb) (((cb)->base + (cb)->len) & (cb)->mask)
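
/*
 * A worked example of the circular buffer arithmetic above (editorial
 * note with illustrative values, not taken from the code): with
 * size = 16, mask = 15.  After CBUF_INIT, base = len = 0 and CBUF_DATA
 * is 0, so new data lands at the start of the page.  If base = 10 and
 * len = 4, CBUF_DATA gives (10 + 4) & 15 = 14, the next free slot;
 * CBUF_ADD(cb, 3) then grows len to 7 and the write position wraps to
 * (10 + 7) & 15 = 1.  CBUF_EAT(cb, 4) consumes from the front,
 * advancing base to 14 and leaving len = 3.
 */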
/* Maximum number of incoming messages to process before
   doing a schedule()
*/
#define MAX_RX_MSG_COUNT 25

struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	struct rw_semaphore sock_sem;	/* Stop connect races */
	struct list_head read_list;	/* On this list when ready for reading */
	struct list_head write_list;	/* On this list when ready for writing */
	struct list_head state_list;	/* On this list when ready to connect */
	unsigned long flags;	/* bit 1,2 = We are on the read/write lists */
#define CF_READ_PENDING 1
#define CF_WRITE_PENDING 2
#define CF_CONNECT_PENDING 3
#define CF_IS_OTHERCON 4
	struct list_head writequeue;	/* List of outgoing writequeue_entries */
	struct list_head listenlist;	/* List of allocated listening sockets */
	spinlock_t writequeue_lock;
	int (*rx_action) (struct connection *);	/* What to do when active */
	struct page *rx_page;
	struct cbuf cb;
	int retries;
	atomic_t waiting_requests;
#define MAX_CONNECT_RETRIES 3
	struct connection *othercon;
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)

/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;
	struct page *page;
	int offset;
	int len;
	int end;
	int users;
	struct connection *con;
};
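
/*
 * Editorial note on the writequeue_entry lifecycle (not part of the
 * original source): each entry owns one page.  "end" is the high-water
 * mark of space handed out by dlm_lowcomms_get_buffer, "offset"/"len"
 * bracket the bytes still to be sent, and "users" counts callers that
 * have been given space but have not yet called
 * dlm_lowcomms_commit_buffer.  The page is kmapped from the first
 * reservation until the last user commits, and the entry is only freed
 * once len == 0 and users == 0.
 */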
static struct sockaddr_storage dlm_local_addr;

/* Manage daemons */
static struct task_struct *recv_task;
static struct task_struct *send_task;

static wait_queue_t lowcomms_send_waitq_head;
static wait_queue_head_t lowcomms_send_waitq;
static wait_queue_t lowcomms_recv_waitq_head;
static wait_queue_head_t lowcomms_recv_waitq;

/* An array of pointers to connections, indexed by NODEID */
static struct connection **connections;
static struct semaphore connections_lock;
static kmem_cache_t *con_cache;
static int conn_array_size;
static atomic_t accepting;

/* List of sockets that have reads pending */
static struct list_head read_sockets;
static spinlock_t read_sockets_lock;

/* List of sockets which have writes pending */
static struct list_head write_sockets;
static spinlock_t write_sockets_lock;

/* List of sockets which have connects pending */
static struct list_head state_sockets;
static spinlock_t state_sockets_lock;
static struct connection *nodeid2con(int nodeid, gfp_t allocation)
{
	struct connection *con = NULL;

	down(&connections_lock);
	if (nodeid >= conn_array_size) {
		int new_size = nodeid + NODE_INCREMENT;
		struct connection **new_conns;

		new_conns = kmalloc(sizeof(struct connection *) *
				    new_size, allocation);
		if (!new_conns)
			goto finish;

		memset(new_conns, 0, sizeof(struct connection *) * new_size);
		memcpy(new_conns, connections,
		       sizeof(struct connection *) * conn_array_size);
		conn_array_size = new_size;

		kfree(connections);
		connections = new_conns;
	}

	con = connections[nodeid];
	if (con == NULL && allocation) {
		con = kmem_cache_alloc(con_cache, allocation);
		if (!con)
			goto finish;

		memset(con, 0, sizeof(*con));
		con->nodeid = nodeid;
		init_rwsem(&con->sock_sem);
		INIT_LIST_HEAD(&con->writequeue);
		spin_lock_init(&con->writequeue_lock);

		connections[nodeid] = con;
	}

finish:
	up(&connections_lock);
	return con;
}
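
/*
 * Editorial note (not from the original source): connections are kept
 * in a flat array indexed by nodeid and grown in NODE_INCREMENT steps,
 * so lookup is O(1) and the table only ever grows.  nodeid 0 is
 * reserved for the listening "connection" set up by listen_for_all().
 */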
/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk, int count_unused)
{
	struct connection *con = sock2con(sk);

	atomic_inc(&con->waiting_requests);
	if (test_and_set_bit(CF_READ_PENDING, &con->flags))
		return;

	spin_lock_bh(&read_sockets_lock);
	list_add_tail(&con->read_list, &read_sockets);
	spin_unlock_bh(&read_sockets_lock);

	wake_up_interruptible(&lowcomms_recv_waitq);
}

static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	if (test_and_set_bit(CF_WRITE_PENDING, &con->flags))
		return;

	spin_lock_bh(&write_sockets_lock);
	list_add_tail(&con->write_list, &write_sockets);
	spin_unlock_bh(&write_sockets_lock);

	wake_up_interruptible(&lowcomms_send_waitq);
}

static inline void lowcomms_connect_sock(struct connection *con)
{
	if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
		return;
	if (!atomic_read(&accepting))
		return;

	spin_lock_bh(&state_sockets_lock);
	list_add_tail(&con->state_list, &state_sockets);
	spin_unlock_bh(&state_sockets_lock);

	wake_up_interruptible(&lowcomms_send_waitq);
}
static void lowcomms_state_change(struct sock *sk)
{
	/* struct connection *con = sock2con(sk); */

	switch (sk->sk_state) {
	case TCP_ESTABLISHED:
		lowcomms_write_space(sk);
		break;

	case TCP_FIN_WAIT1:
	case TCP_FIN_WAIT2:
	case TCP_TIME_WAIT:
	case TCP_CLOSE:
	case TCP_CLOSE_WAIT:
	case TCP_LAST_ACK:
	case TCP_CLOSING:
		/* FIXME: I think this causes more trouble than it solves.
		   lowcomms will reconnect anyway when there is something to
		   send. This just attempts reconnection if a node goes down!
		*/
		/* lowcomms_connect_sock(con); */
		break;

	default:
		printk("dlm: lowcomms_state_change: state=%d\n", sk->sk_state);
		break;
	}
}
/* Make a socket active */
static int add_sock(struct socket *sock, struct connection *con)
{
	con->sock = sock;

	/* Install a data_ready callback */
	con->sock->sk->sk_data_ready = lowcomms_data_ready;
	con->sock->sk->sk_write_space = lowcomms_write_space;
	con->sock->sk->sk_state_change = lowcomms_state_change;

	return 0;
}

/* Add the port number to an IPv6 or IPv4 sockaddr and return the address
   length via addr_len */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr.ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
}
/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, int and_other)
{
	down_write(&con->sock_sem);

	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	}
	if (con->othercon && and_other) {
		/* Argh! recursion in kernel code!
		   Actually, this isn't a list so it
		   will only re-enter once.
		*/
		close_connection(con->othercon, FALSE);
	}
	if (con->rx_page) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}
	con->retries = 0;

	up_write(&con->sock_sem);
}
/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
	int ret = 0;
	struct msghdr msg;
	struct iovec iov[2];
	mm_segment_t fs;
	unsigned len;
	int r;
	int call_again_soon = 0;

	down_read(&con->sock_sem);

	if (con->sock == NULL)
		goto out;
	if (con->rx_page == NULL) {
		/*
		 * This doesn't need to be atomic, but I think it should
		 * improve performance if it is.
		 */
		con->rx_page = alloc_page(GFP_ATOMIC);
		if (con->rx_page == NULL)
			goto out_resched;
		CBUF_INIT(&con->cb, PAGE_CACHE_SIZE);
	}

	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_iovlen = 1;
	msg.msg_iov = iov;
	msg.msg_name = NULL;
	msg.msg_namelen = 0;
	msg.msg_flags = 0;

	/*
	 * iov[0] is the bit of the circular buffer between the current end
	 * point (cb.base + cb.len) and the end of the buffer.
	 */
	iov[0].iov_len = con->cb.base - CBUF_DATA(&con->cb);
	iov[0].iov_base = page_address(con->rx_page) + CBUF_DATA(&con->cb);
	iov[1].iov_len = 0;

	/*
	 * iov[1] is the bit of the circular buffer between the start of the
	 * buffer and the start of the currently used section (cb.base)
	 */
	if (CBUF_DATA(&con->cb) >= con->cb.base) {
		iov[0].iov_len = PAGE_CACHE_SIZE - CBUF_DATA(&con->cb);
		iov[1].iov_len = con->cb.base;
		iov[1].iov_base = page_address(con->rx_page);
		msg.msg_iovlen = 2;
	}
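
	/*
	 * Editorial illustration of the two cases above (not from the
	 * original source).  The rx page is used as a ring, where '#'
	 * marks bytes already buffered and '.' marks free space the
	 * recvmsg below may fill:
	 *
	 *   unwrapped (data >= base):       wrapped (data < base):
	 *   [....base####data....]          [##data......base####]
	 *     iov[0] = data..page end         iov[0] = data..base
	 *     iov[1] = page start..base       (single iovec)
	 */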
	len = iov[0].iov_len + iov[1].iov_len;

	fs = get_fs();
	set_fs(get_ds());
	r = ret = sock_recvmsg(con->sock, &msg, len,
			       MSG_DONTWAIT | MSG_NOSIGNAL);
	set_fs(fs);

	if (ret <= 0)
		goto out_close;
	if (ret == len)
		call_again_soon = 1;
	CBUF_ADD(&con->cb, ret);
	ret = dlm_process_incoming_buffer(con->nodeid,
					  page_address(con->rx_page),
					  con->cb.base, con->cb.len,
					  PAGE_CACHE_SIZE);
	if (ret == -EBADMSG) {
		printk(KERN_INFO "dlm: lowcomms: addr=%p, base=%u, len=%u, "
		       "iov_len=%u, iov_base[0]=%p, read=%d\n",
		       page_address(con->rx_page), con->cb.base, con->cb.len,
		       len, iov[0].iov_base, r);
	}
	if (ret < 0)
		goto out_close;
	CBUF_EAT(&con->cb, ret);

	if (CBUF_EMPTY(&con->cb) && !call_again_soon) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}

out:
	if (call_again_soon)
		goto out_resched;
	up_read(&con->sock_sem);
	ret = 0;
	goto out_ret;

out_resched:
	lowcomms_data_ready(con->sock->sk, 0);
	up_read(&con->sock_sem);
	ret = 0;
	schedule();
	goto out_ret;

out_close:
	up_read(&con->sock_sem);
	if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) {
		close_connection(con, FALSE);
		/* Reconnect when there is something to send */
	}

out_ret:
	return ret;
}
/* Listening socket is busy, accept a connection */
static int accept_from_sock(struct connection *con)
{
	int result;
	struct sockaddr_storage peeraddr;
	struct socket *newsock;
	int len;
	int nodeid;
	struct connection *newcon;

	memset(&peeraddr, 0, sizeof(peeraddr));
	result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &newsock);
	if (result < 0)
		return -ENOMEM;

	down_read(&con->sock_sem);

	result = -ENOTCONN;
	if (con->sock == NULL)
		goto accept_err;

	newsock->type = con->sock->type;
	newsock->ops = con->sock->ops;

	result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
	if (result < 0)
		goto accept_err;

	/* Get the connected socket's peer */
	memset(&peeraddr, 0, sizeof(peeraddr));
	if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
				  &len, 2)) {
		result = -ECONNABORTED;
		goto accept_err;
	}

	/* Get the new node's NODEID */
	make_sockaddr(&peeraddr, 0, &len);
	if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
		printk("dlm: connect from non cluster node\n");
		sock_release(newsock);
		up_read(&con->sock_sem);
		return -1;
	}

	log_print("got connection from %d", nodeid);

	/* Check to see if we already have a connection to this node. This
	 * could happen if the two nodes initiate a connection at roughly
	 * the same time and the connections cross on the wire.
	 * TEMPORARY FIX:
	 * In this case we store the incoming one in "othercon"
	 */
	newcon = nodeid2con(nodeid, GFP_KERNEL);
	if (!newcon) {
		result = -ENOMEM;
		goto accept_err;
	}
	down_write(&newcon->sock_sem);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kmem_cache_alloc(con_cache, GFP_KERNEL);
			if (!othercon) {
				printk("dlm: failed to allocate incoming socket\n");
				up_write(&newcon->sock_sem);
				result = -ENOMEM;
				goto accept_err;
			}
			memset(othercon, 0, sizeof(*othercon));
			othercon->nodeid = nodeid;
			othercon->rx_action = receive_from_sock;
			init_rwsem(&othercon->sock_sem);
			set_bit(CF_IS_OTHERCON, &othercon->flags);
			newcon->othercon = othercon;
		}
		othercon->sock = newsock;
		newsock->sk->sk_user_data = othercon;
		add_sock(newsock, othercon);
	}
	else {
		newsock->sk->sk_user_data = newcon;
		newcon->rx_action = receive_from_sock;
		add_sock(newsock, newcon);
	}

	up_write(&newcon->sock_sem);
	/*
	 * Add it to the active queue in case we got data
	 * between processing the accept and adding the socket
	 * to the read_sockets list
	 */
	lowcomms_data_ready(newsock->sk, 0);
	up_read(&con->sock_sem);

	return 0;

accept_err:
	up_read(&con->sock_sem);
	sock_release(newsock);

	if (result != -EAGAIN)
		printk("dlm: error accepting connection from node: %d\n",
		       result);
	return result;
}
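
/*
 * Editorial note on "othercon" (not from the original source): when two
 * nodes connect to each other simultaneously, each ends up with an
 * outbound socket plus an accepted inbound one.  Rather than picking a
 * winner, the accepted socket is parked on newcon->othercon and both are
 * read from; only the primary connection's socket is used for sending.
 * The "TEMPORARY FIX" comment above marks this as a stopgap.
 */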
/* Connect a new socket to its peer */
static int connect_to_sock(struct connection *con)
{
	int result = -EHOSTUNREACH;
	struct sockaddr_storage saddr;
	int addr_len;
	struct socket *sock;

	if (con->nodeid == 0) {
		log_print("attempt to connect sock 0 foiled");
		return 0;
	}

	down_write(&con->sock_sem);
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	/* Some odd races can cause double-connects, ignore them */
	if (con->sock) {
		result = 0;
		goto out;
	}

	/* Create a socket to communicate with */
	result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &sock);
	if (result < 0)
		goto out_err;

	memset(&saddr, 0, sizeof(saddr));
	if (dlm_nodeid_to_addr(con->nodeid, &saddr))
		goto out_err;

	sock->sk->sk_user_data = con;
	con->rx_action = receive_from_sock;

	make_sockaddr(&saddr, dlm_config.tcp_port, &addr_len);

	add_sock(sock, con);

	log_print("connecting to %d", con->nodeid);
	result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
				    O_NONBLOCK);
	if (result == -EINPROGRESS)
		result = 0;
	if (result != 0)
		goto out_err;
out:
	up_write(&con->sock_sem);
	/*
	 * Returning an error here means we've given up trying to connect to
	 * a remote node, otherwise we return 0 and reschedule the connection
	 * attempt
	 */
	return result;

out_err:
	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	}
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH && result != -ENETUNREACH &&
	    result != -ENETDOWN && result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		lowcomms_connect_sock(con);
		result = 0;
	}
	goto out;
}
static struct socket *create_listen_sock(struct connection *con,
					 struct sockaddr_storage *saddr)
{
	struct socket *sock = NULL;
	mm_segment_t fs;
	int result = 0;
	int one = 1;
	int addr_len;

	if (dlm_local_addr.ss_family == AF_INET)
		addr_len = sizeof(struct sockaddr_in);
	else
		addr_len = sizeof(struct sockaddr_in6);

	/* Create a socket to communicate with */
	result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &sock);
	if (result < 0) {
		printk("dlm: Can't create listening comms socket\n");
		goto create_out;
	}

	fs = get_fs();
	set_fs(get_ds());
	result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
				 (char *)&one, sizeof(one));
	set_fs(fs);
	if (result < 0) {
		printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n",
		       result);
	}
	sock->sk->sk_user_data = con;
	con->rx_action = accept_from_sock;
	con->sock = sock;

	/* Bind to our port */
	make_sockaddr(saddr, dlm_config.tcp_port, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *)saddr, addr_len);
	if (result < 0) {
		printk("dlm: Can't bind to port %d\n", dlm_config.tcp_port);
		sock_release(sock);
		sock = NULL;
		con->sock = NULL;
		goto create_out;
	}

	fs = get_fs();
	set_fs(get_ds());
	result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
				 (char *)&one, sizeof(one));
	set_fs(fs);
	if (result < 0) {
		printk("dlm: Set keepalive failed: %d\n", result);
	}

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		printk("dlm: Can't listen on port %d\n", dlm_config.tcp_port);
		sock_release(sock);
		sock = NULL;
		goto create_out;
	}

create_out:
	return sock;
}
/* Listen on all interfaces */
static int listen_for_all(void)
{
	struct socket *sock = NULL;
	struct connection *con = nodeid2con(0, GFP_KERNEL);
	int result = -EINVAL;

	/* We don't support multi-homed hosts */
	memset(con, 0, sizeof(*con));
	init_rwsem(&con->sock_sem);
	spin_lock_init(&con->writequeue_lock);
	INIT_LIST_HEAD(&con->writequeue);
	set_bit(CF_IS_OTHERCON, &con->flags);

	sock = create_listen_sock(con, &dlm_local_addr);
	if (sock) {
		add_sock(sock, con);
		result = 0;
	}
	else {
		result = -EADDRINUSE;
	}

	return result;
}
static struct writequeue_entry *new_writequeue_entry(struct connection *con,
						     gfp_t allocation)
{
	struct writequeue_entry *entry;

	entry = kmalloc(sizeof(struct writequeue_entry), allocation);
	if (!entry)
		return NULL;

	entry->page = alloc_page(allocation);
	if (!entry->page) {
		kfree(entry);
		return NULL;
	}

	entry->offset = 0;
	entry->len = 0;
	entry->end = 0;
	entry->users = 0;
	entry->con = con;

	return entry;
}
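
/*
 * Editorial note on the buffer API below: callers reserve space with
 * dlm_lowcomms_get_buffer, write their message through the returned
 * pointer, then hand the buffer back with dlm_lowcomms_commit_buffer.
 * A minimal usage sketch (hypothetical caller, not from this file):
 *
 *	char *p;
 *	void *mh = dlm_lowcomms_get_buffer(nodeid, len, GFP_KERNEL, &p);
 *	if (!mh)
 *		return -ENOBUFS;
 *	memcpy(p, msg, len);
 *	dlm_lowcomms_commit_buffer(mh);
 *
 * lowcomms_send_message further down follows exactly this pattern.
 */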
void *dlm_lowcomms_get_buffer(int nodeid, int len,
			      gfp_t allocation, char **ppc)
{
	struct connection *con;
	struct writequeue_entry *e;
	int offset = 0;
	int users = 0;

	if (!atomic_read(&accepting))
		return NULL;

	con = nodeid2con(nodeid, allocation);
	if (!con)
		return NULL;

	spin_lock(&con->writequeue_lock);
	e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
	if (((struct list_head *)e == &con->writequeue) ||
	    (PAGE_CACHE_SIZE - e->end < len)) {
		e = NULL;
	} else {
		offset = e->end;
		e->end += len;
		users = e->users++;
	}
	spin_unlock(&con->writequeue_lock);

	if (e) {
	got_one:
		if (users == 0)
			kmap(e->page);
		*ppc = page_address(e->page) + offset;
		return e;
	}

	e = new_writequeue_entry(con, allocation);
	if (e) {
		spin_lock(&con->writequeue_lock);
		offset = e->end;
		e->end += len;
		users = e->users++;
		list_add_tail(&e->list, &con->writequeue);
		spin_unlock(&con->writequeue_lock);
		goto got_one;
	}
	return NULL;
}
void dlm_lowcomms_commit_buffer(void *mh)
{
	struct writequeue_entry *e = (struct writequeue_entry *)mh;
	struct connection *con = e->con;
	int users;

	if (!atomic_read(&accepting))
		return;

	spin_lock(&con->writequeue_lock);
	users = --e->users;
	if (users)
		goto out;
	e->len = e->end - e->offset;
	kunmap(e->page);
	spin_unlock(&con->writequeue_lock);

	if (test_and_set_bit(CF_WRITE_PENDING, &con->flags) == 0) {
		spin_lock_bh(&write_sockets_lock);
		list_add_tail(&con->write_list, &write_sockets);
		spin_unlock_bh(&write_sockets_lock);

		wake_up_interruptible(&lowcomms_send_waitq);
	}
	return;

out:
	spin_unlock(&con->writequeue_lock);
	return;
}

static void free_entry(struct writequeue_entry *e)
{
	__free_page(e->page);
	kfree(e);
}
/* Send a message */
static int send_to_sock(struct connection *con)
{
	int ret = 0;
	ssize_t (*sendpage)(struct socket *, struct page *, int, size_t, int);
	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	struct writequeue_entry *e;
	int len, offset;

	down_read(&con->sock_sem);
	if (con->sock == NULL)
		goto out_connect;

	sendpage = con->sock->ops->sendpage;

	spin_lock(&con->writequeue_lock);
	for (;;) {
		e = list_entry(con->writequeue.next, struct writequeue_entry,
			       list);
		if ((struct list_head *)e == &con->writequeue)
			break;

		len = e->len;
		offset = e->offset;
		BUG_ON(len == 0 && e->users == 0);
		spin_unlock(&con->writequeue_lock);

		ret = 0;
		if (len) {
			ret = sendpage(con->sock, e->page, offset, len,
				       msg_flags);
			if (ret == -EAGAIN || ret == 0)
				goto out;
			if (ret <= 0)
				goto send_error;
		}
		else {
			/* Don't starve people filling buffers */
			schedule();
		}

		spin_lock(&con->writequeue_lock);
		e->offset += ret;
		e->len -= ret;

		if (e->len == 0 && e->users == 0) {
			list_del(&e->list);
			free_entry(e);
			continue;
		}
	}
	spin_unlock(&con->writequeue_lock);

out:
	up_read(&con->sock_sem);
	return ret;

send_error:
	up_read(&con->sock_sem);
	close_connection(con, FALSE);
	lowcomms_connect_sock(con);
	return ret;

out_connect:
	up_read(&con->sock_sem);
	lowcomms_connect_sock(con);
	return 0;
}
static void clean_one_writequeue(struct connection *con)
{
	struct list_head *list;
	struct list_head *temp;

	spin_lock(&con->writequeue_lock);
	list_for_each_safe(list, temp, &con->writequeue) {
		struct writequeue_entry *e =
			list_entry(list, struct writequeue_entry, list);
		list_del(&e->list);
		free_entry(e);
	}
	spin_unlock(&con->writequeue_lock);
}

/* Called from recovery when it knows that a node has
   left the cluster */
int dlm_lowcomms_close(int nodeid)
{
	struct connection *con;

	if (!connections)
		goto out;

	log_print("closing connection to node %d", nodeid);
	con = nodeid2con(nodeid, 0);
	if (con) {
		clean_one_writequeue(con);
		close_connection(con, TRUE);
		atomic_set(&con->waiting_requests, 0);
	}
	return 0;

out:
	return -1;
}

/* API send message call, may queue the request */
/* N.B. This is the old interface - use the new one for new calls */
int lowcomms_send_message(int nodeid, char *buf, int len, gfp_t allocation)
{
	struct writequeue_entry *e;
	char *b;

	e = dlm_lowcomms_get_buffer(nodeid, len, allocation, &b);
	if (e) {
		memcpy(b, buf, len);
		dlm_lowcomms_commit_buffer(e);
		return 0;
	}
	return -ENOBUFS;
}
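
/*
 * Editorial note (not from the original source): the three queue
 * processors below share one pattern.  Each takes the queue spinlock,
 * detaches a connection and clears its pending flag, then drops the
 * lock before doing the socket work, since send/receive/connect may
 * sleep and the _bh locks are also taken from the socket callbacks.
 * Clearing the flag before the work runs lets the callbacks requeue
 * the connection while it is being serviced.
 */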
/* Look for activity on active sockets */
static void process_sockets(void)
{
	struct list_head *list;
	struct list_head *temp;
	int count = 0;

	spin_lock_bh(&read_sockets_lock);
	list_for_each_safe(list, temp, &read_sockets) {
		struct connection *con =
			list_entry(list, struct connection, read_list);
		list_del(&con->read_list);
		clear_bit(CF_READ_PENDING, &con->flags);

		spin_unlock_bh(&read_sockets_lock);

		/* This can reach zero if we are processing requests
		 * as they come in.
		 */
		if (atomic_read(&con->waiting_requests) == 0) {
			spin_lock_bh(&read_sockets_lock);
			continue;
		}

		do {
			con->rx_action(con);

			/* Don't starve out everyone else */
			if (++count >= MAX_RX_MSG_COUNT) {
				schedule();
				count = 0;
			}
		} while (!atomic_dec_and_test(&con->waiting_requests) &&
			 !kthread_should_stop());

		spin_lock_bh(&read_sockets_lock);
	}
	spin_unlock_bh(&read_sockets_lock);
}
/* Try to send any messages that are pending
 */
static void process_output_queue(void)
{
	struct list_head *list;
	struct list_head *temp;

	spin_lock_bh(&write_sockets_lock);
	list_for_each_safe(list, temp, &write_sockets) {
		struct connection *con =
			list_entry(list, struct connection, write_list);
		clear_bit(CF_WRITE_PENDING, &con->flags);
		list_del(&con->write_list);

		spin_unlock_bh(&write_sockets_lock);

		/* Errors are deliberately ignored here: on failure,
		   send_to_sock closes the socket and reschedules a
		   reconnect itself */
		send_to_sock(con);

		spin_lock_bh(&write_sockets_lock);
	}
	spin_unlock_bh(&write_sockets_lock);
}

static void process_state_queue(void)
{
	struct list_head *list;
	struct list_head *temp;

	spin_lock_bh(&state_sockets_lock);
	list_for_each_safe(list, temp, &state_sockets) {
		struct connection *con =
			list_entry(list, struct connection, state_list);
		list_del(&con->state_list);
		clear_bit(CF_CONNECT_PENDING, &con->flags);

		spin_unlock_bh(&state_sockets_lock);

		/* Errors are deliberately ignored here: connect_to_sock
		   retries or gives up on its own */
		connect_to_sock(con);

		spin_lock_bh(&state_sockets_lock);
	}
	spin_unlock_bh(&state_sockets_lock);
}
/* Discard all entries on the write queues */
static void clean_writequeues(void)
{
	int nodeid;

	for (nodeid = 1; nodeid < conn_array_size; nodeid++) {
		struct connection *con = nodeid2con(nodeid, 0);

		if (con)
			clean_one_writequeue(con);
	}
}

static int read_list_empty(void)
{
	int status;

	spin_lock_bh(&read_sockets_lock);
	status = list_empty(&read_sockets);
	spin_unlock_bh(&read_sockets_lock);

	return status;
}

/* DLM Transport comms receive daemon */
static int dlm_recvd(void *data)
{
	init_waitqueue_head(&lowcomms_recv_waitq);
	init_waitqueue_entry(&lowcomms_recv_waitq_head, current);
	add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head);

	while (!kthread_should_stop()) {
		set_current_state(TASK_INTERRUPTIBLE);
		if (read_list_empty())
			schedule();
		set_current_state(TASK_RUNNING);

		process_sockets();
	}

	return 0;
}
static int write_and_state_lists_empty(void)
{
	int status;

	spin_lock_bh(&write_sockets_lock);
	status = list_empty(&write_sockets);
	spin_unlock_bh(&write_sockets_lock);

	spin_lock_bh(&state_sockets_lock);
	if (list_empty(&state_sockets) == 0)
		status = 0;
	spin_unlock_bh(&state_sockets_lock);

	return status;
}

/* DLM Transport send daemon */
static int dlm_sendd(void *data)
{
	init_waitqueue_head(&lowcomms_send_waitq);
	init_waitqueue_entry(&lowcomms_send_waitq_head, current);
	add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head);

	while (!kthread_should_stop()) {
		set_current_state(TASK_INTERRUPTIBLE);
		if (write_and_state_lists_empty())
			schedule();
		set_current_state(TASK_RUNNING);

		process_state_queue();
		process_output_queue();
	}

	return 0;
}

static void daemons_stop(void)
{
	kthread_stop(recv_task);
	kthread_stop(send_task);
}
static int daemons_start(void)
{
	struct task_struct *p;
	int error;

	p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
	if (IS_ERR(p)) {
		error = PTR_ERR(p);
		log_print("can't start dlm_recvd %d", error);
		return error;
	}
	recv_task = p;

	p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
	if (IS_ERR(p)) {
		error = PTR_ERR(p);
		log_print("can't start dlm_sendd %d", error);
		kthread_stop(recv_task);
		return error;
	}
	send_task = p;

	return 0;
}
/*
 * Return the largest buffer size we can cope with.
 */
int lowcomms_max_buffer_size(void)
{
	return PAGE_CACHE_SIZE;
}

void dlm_lowcomms_stop(void)
{
	int i;

	atomic_set(&accepting, 0);

	/* Set all the activity flags to prevent any
	   socket activity. (The CF_*_PENDING values are bit numbers,
	   so build the mask from them rather than using a literal.)
	*/
	for (i = 0; i < conn_array_size; i++) {
		if (connections[i])
			connections[i]->flags |= (1 << CF_READ_PENDING) |
						 (1 << CF_WRITE_PENDING) |
						 (1 << CF_CONNECT_PENDING);
	}
	daemons_stop();
	clean_writequeues();

	for (i = 0; i < conn_array_size; i++) {
		if (connections[i]) {
			close_connection(connections[i], TRUE);
			if (connections[i]->othercon)
				kmem_cache_free(con_cache,
						connections[i]->othercon);
			kmem_cache_free(con_cache, connections[i]);
		}
	}

	kfree(connections);
	connections = NULL;

	kmem_cache_destroy(con_cache);
}
/* This is quite likely to sleep... */
int dlm_lowcomms_start(void)
{
	int error = 0;

	error = -ENOTCONN;

	/*
	 * Temporarily initialise the waitq head so that lowcomms_send_message
	 * doesn't crash if it gets called before the thread is fully
	 * initialised
	 */
	init_waitqueue_head(&lowcomms_send_waitq);

	error = -ENOMEM;
	connections = kmalloc(sizeof(struct connection *) *
			      NODE_INCREMENT, GFP_KERNEL);
	if (!connections)
		goto out;

	memset(connections, 0,
	       sizeof(struct connection *) * NODE_INCREMENT);

	conn_array_size = NODE_INCREMENT;

	if (dlm_our_addr(&dlm_local_addr, 0)) {
		log_print("no local IP address has been set");
		goto fail_free_conn;
	}
	if (!dlm_our_addr(&dlm_local_addr, 1)) {
		log_print("This dlm comms module does not support multi-homed clustering");
		goto fail_free_conn;
	}

	con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
				      __alignof__(struct connection), 0,
				      NULL, NULL);
	if (!con_cache)
		goto fail_free_conn;

	/* Start listening */
	error = listen_for_all();
	if (error)
		goto fail_unlisten;

	error = daemons_start();
	if (error)
		goto fail_unlisten;

	atomic_set(&accepting, 1);

	return 0;

fail_unlisten:
	close_connection(connections[0], FALSE);
	kmem_cache_free(con_cache, connections[0]);
	kmem_cache_destroy(con_cache);

fail_free_conn:
	kfree(connections);

out:
	return error;
}
int dlm_lowcomms_init(void)
{
	INIT_LIST_HEAD(&read_sockets);
	INIT_LIST_HEAD(&write_sockets);
	INIT_LIST_HEAD(&state_sockets);

	spin_lock_init(&read_sockets_lock);
	spin_lock_init(&write_sockets_lock);
	spin_lock_init(&state_sockets_lock);
	init_MUTEX(&connections_lock);

	return 0;
}

void dlm_lowcomms_exit(void)
{
}
/*
 * Overrides for Emacs so that we follow Linus's tabbing style.
 * Emacs will notice this stuff at the end of the file and automatically
 * adjust the settings for this buffer only. This must remain at the end
 * of the file.
 * ---------------------------------------------------------------------------
 * Local variables:
 * c-file-style: "linux"
 * End:
 */