/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
**  Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32-bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP addresses or
 * whatever it needs for inter-node communication.
 *
 * The comms level is two kernel threads that deal mainly with
 * the receiving of messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send thread which does all the setting up of connections
 * to remote nodes and the sending of data. Threads are not allowed
 * to send their own data because it may cause them to wait in times
 * of high load. Also, this way, the sending thread can collect together
 * messages bound for one node and send them in one block.
 *
 * lowcomms will choose to use either TCP or SCTP as its transport layer
 * depending on the configuration variable 'protocol'. This should be set
 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
 * cluster-wide mechanism as it must be the same on all nodes of the cluster
 * for the DLM to function.
 *
 */
#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>
#include <linux/file.h>
#include <linux/mutex.h>
#include <linux/sctp.h>
#include <linux/slab.h>
#include <net/sctp/sctp.h>
#include <net/ipv6.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "config.h"
#define NEEDED_RMEM (4*1024*1024)
#define CONN_HASH_SIZE 32

/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25

struct cbuf {
	unsigned int base;
	unsigned int len;
	unsigned int mask;
};

static void cbuf_add(struct cbuf *cb, int n)
{
	cb->len += n;
}

static int cbuf_data(struct cbuf *cb)
{
	return ((cb->base + cb->len) & cb->mask);
}

static void cbuf_init(struct cbuf *cb, int size)
{
	cb->base = cb->len = 0;
	cb->mask = size-1;
}

static void cbuf_eat(struct cbuf *cb, int n)
{
	cb->len -= n;
	cb->base += n;
	cb->base &= cb->mask;
}

static bool cbuf_empty(struct cbuf *cb)
{
	return cb->len == 0;
}
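
/*
 * The cbuf helpers above implement a simple ring buffer over the single
 * receive page: 'base' is the read position, 'len' the amount of unread
 * data, and 'mask' (size - 1) wraps indices, which is why cbuf_init()
 * must be given a power-of-two size. For example, with a 4096-byte
 * buffer (mask 0xfff), base = 4000 and len = 200 means cbuf_data()
 * returns (4000 + 200) & 0xfff = 104, i.e. the write position has
 * wrapped back to the front of the page.
 */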
struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	struct mutex sock_mutex;
	unsigned long flags;
#define CF_READ_PENDING 1
#define CF_WRITE_PENDING 2
#define CF_CONNECT_PENDING 3
#define CF_INIT_PENDING 4
#define CF_IS_OTHERCON 5
#define CF_CLOSE 6
#define CF_APP_LIMITED 7
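/* The CF_* values are bit numbers used with the set_bit()/test_bit()
   family on 'flags', not bit masks. */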
	struct list_head writequeue;  /* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;
	int (*rx_action) (struct connection *);	/* What to do when active */
	void (*connect_action) (struct connection *);	/* What to do to connect */
	struct page *rx_page;
	struct cbuf cb;
	int retries;
#define MAX_CONNECT_RETRIES 3
	int sctp_assoc;
	struct hlist_node list;
	struct connection *othercon;
	struct work_struct rwork; /* Receive workqueue */
	struct work_struct swork; /* Send workqueue */
	bool try_new_addr;
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)

/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;
	struct page *page;
	int offset;
	int len;
	int end;
	int users;
	struct connection *con;
};

struct dlm_node_addr {
	struct list_head list;
	int nodeid;
	int addr_count;
	int curr_addr_index;
	struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
};

static LIST_HEAD(dlm_node_addrs);
static DEFINE_SPINLOCK(dlm_node_addrs_spin);

static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
static int dlm_allow_conn;

/* Work queues */
static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue;

static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_MUTEX(connections_lock);
static struct kmem_cache *con_cache;

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);

/* This is deliberately very simple because most clusters have simple
   sequential nodeids, so we should be able to go straight to a connection
   struct in the array */
static inline int nodeid_hash(int nodeid)
{
	return nodeid & (CONN_HASH_SIZE-1);
}

static struct connection *__find_con(int nodeid)
{
	int r;
	struct connection *con;

	r = nodeid_hash(nodeid);

	hlist_for_each_entry(con, &connection_hash[r], list) {
		if (con->nodeid == nodeid)
			return con;
	}
	return NULL;
}

/*
 * If 'allocation' is zero then we don't attempt to create a new
 * connection structure for this node.
 */
static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
{
	struct connection *con = NULL;
	int r;

	con = __find_con(nodeid);
	if (con || !alloc)
		return con;

	con = kmem_cache_zalloc(con_cache, alloc);
	if (!con)
		return NULL;

	r = nodeid_hash(nodeid);
	hlist_add_head(&con->list, &connection_hash[r]);

	con->nodeid = nodeid;
	mutex_init(&con->sock_mutex);
	INIT_LIST_HEAD(&con->writequeue);
	spin_lock_init(&con->writequeue_lock);
	INIT_WORK(&con->swork, process_send_sockets);
	INIT_WORK(&con->rwork, process_recv_sockets);

	/* Setup action pointers for child sockets */
	if (con->nodeid) {
		struct connection *zerocon = __find_con(0);

		con->connect_action = zerocon->connect_action;
		if (!con->rx_action)
			con->rx_action = zerocon->rx_action;
	}

	return con;
}

/* Loop round all connections */
static void foreach_conn(void (*conn_func)(struct connection *c))
{
	int i;
	struct hlist_node *n;
	struct connection *con;

	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_safe(con, n, &connection_hash[i], list)
			conn_func(con);
	}
}

static struct connection *nodeid2con(int nodeid, gfp_t allocation)
{
	struct connection *con;

	mutex_lock(&connections_lock);
	con = __nodeid2con(nodeid, allocation);
	mutex_unlock(&connections_lock);

	return con;
}

/* This is a bit drastic, but only called when things go wrong */
static struct connection *assoc2con(int assoc_id)
{
	int i;
	struct connection *con;

	mutex_lock(&connections_lock);

	for (i = 0 ; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry(con, &connection_hash[i], list) {
			if (con->sctp_assoc == assoc_id) {
				mutex_unlock(&connections_lock);
				return con;
			}
		}
	}
	mutex_unlock(&connections_lock);
	return NULL;
}

static struct dlm_node_addr *find_node_addr(int nodeid)
{
	struct dlm_node_addr *na;

	list_for_each_entry(na, &dlm_node_addrs, list) {
		if (na->nodeid == nodeid)
			return na;
	}
	return NULL;
}
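
/* Returns 1 when the two sockaddrs match on family, address and port,
   and 0 otherwise (including for unknown address families). */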
static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
{
	switch (x->ss_family) {
	case AF_INET: {
		struct sockaddr_in *sinx = (struct sockaddr_in *)x;
		struct sockaddr_in *siny = (struct sockaddr_in *)y;
		if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
			return 0;
		if (sinx->sin_port != siny->sin_port)
			return 0;
		break;
	}
	case AF_INET6: {
		struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
		struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
		if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
			return 0;
		if (sinx->sin6_port != siny->sin6_port)
			return 0;
		break;
	}
	default:
		return 0;
	}
	return 1;
}
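
/*
 * Look up the current address for a node. When 'try_new_addr' is set, the
 * per-node address index is advanced round-robin first, so repeated
 * connection attempts rotate through all of the node's configured
 * addresses (the SCTP failover path requests this via con->try_new_addr).
 */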
static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
			  struct sockaddr *sa_out, bool try_new_addr)
{
	struct sockaddr_storage sas;
	struct dlm_node_addr *na;

	if (!dlm_local_count)
		return -1;

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (na && na->addr_count) {
		if (try_new_addr) {
			na->curr_addr_index++;
			if (na->curr_addr_index == na->addr_count)
				na->curr_addr_index = 0;
		}
		memcpy(&sas, na->addr[na->curr_addr_index],
		       sizeof(struct sockaddr_storage));
	}
	spin_unlock(&dlm_node_addrs_spin);

	if (!na)
		return -EEXIST;

	if (!na->addr_count)
		return -ENOENT;

	if (sas_out)
		memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));

	if (!sa_out)
		return 0;

	if (dlm_local_addr[0]->ss_family == AF_INET) {
		struct sockaddr_in *in4 = (struct sockaddr_in *) &sas;
		struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
	} else {
		struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas;
		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
		ret6->sin6_addr = in6->sin6_addr;
	}

	return 0;
}

static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
{
	struct dlm_node_addr *na;
	int rv = -EEXIST;
	int addr_i;

	spin_lock(&dlm_node_addrs_spin);
	list_for_each_entry(na, &dlm_node_addrs, list) {
		if (!na->addr_count)
			continue;

		for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
			if (addr_compare(na->addr[addr_i], addr)) {
				*nodeid = na->nodeid;
				rv = 0;
				goto unlock;
			}
		}
	}
unlock:
	spin_unlock(&dlm_node_addrs_spin);
	return rv;
}
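
/* Register an address for a node (typically driven by dlm's configfs
   interface). A node may be given up to DLM_MAX_ADDR_COUNT addresses;
   with SCTP the extra addresses provide multihomed failover paths. */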
int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
{
	struct sockaddr_storage *new_addr;
	struct dlm_node_addr *new_node, *na;

	new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
	if (!new_node)
		return -ENOMEM;

	new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
	if (!new_addr) {
		kfree(new_node);
		return -ENOMEM;
	}

	memcpy(new_addr, addr, len);

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (!na) {
		new_node->nodeid = nodeid;
		new_node->addr[0] = new_addr;
		new_node->addr_count = 1;
		list_add(&new_node->list, &dlm_node_addrs);
		spin_unlock(&dlm_node_addrs_spin);
		return 0;
	}

	if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
		spin_unlock(&dlm_node_addrs_spin);
		kfree(new_addr);
		kfree(new_node);
		return -ENOSPC;
	}

	na->addr[na->addr_count++] = new_addr;
	spin_unlock(&dlm_node_addrs_spin);
	kfree(new_node);
	return 0;
}

/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk, int count_unused)
{
	struct connection *con = sock2con(sk);
	if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
}

static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	if (!con)
		return;

	clear_bit(SOCK_NOSPACE, &con->sock->flags);

	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
		con->sock->sk->sk_write_pending--;
		clear_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags);
	}

	if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
		queue_work(send_workqueue, &con->swork);
}

static inline void lowcomms_connect_sock(struct connection *con)
{
	if (test_bit(CF_CLOSE, &con->flags))
		return;
	if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
		queue_work(send_workqueue, &con->swork);
}

static void lowcomms_state_change(struct sock *sk)
{
	if (sk->sk_state == TCP_ESTABLISHED)
		lowcomms_write_space(sk);
}

int dlm_lowcomms_connect_node(int nodeid)
{
	struct connection *con;

	/* with sctp there's no connecting without sending */
	if (dlm_config.ci_protocol != 0)
		return 0;

	if (nodeid == dlm_our_nodeid())
		return 0;

	con = nodeid2con(nodeid, GFP_NOFS);
	if (!con)
		return -ENOMEM;
	lowcomms_connect_sock(con);
	return 0;
}

/* Make a socket active */
static void add_sock(struct socket *sock, struct connection *con)
{
	con->sock = sock;

	/* Install a data_ready callback */
	con->sock->sk->sk_data_ready = lowcomms_data_ready;
	con->sock->sk->sk_write_space = lowcomms_write_space;
	con->sock->sk->sk_state_change = lowcomms_state_change;
	con->sock->sk->sk_user_data = con;
	con->sock->sk->sk_allocation = GFP_NOFS;
}

/* Add the port number to an IPv6 or 4 sockaddr and return the address
   length */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr[0]->ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
}
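
/*
 * For an AF_INET local family, for example, make_sockaddr() leaves
 * sin_port in network byte order, sets *addr_len to
 * sizeof(struct sockaddr_in) (16 bytes), and zeroes the remainder of the
 * sockaddr_storage, so callers always see deterministic contents
 * whichever family is in use.
 */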
/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other)
{
	mutex_lock(&con->sock_mutex);

	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	}
	if (con->othercon && and_other) {
		/* Will only re-enter once. */
		close_connection(con->othercon, false);
	}
	if (con->rx_page) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}

	con->retries = 0;
	mutex_unlock(&con->sock_mutex);
}

/* We only send shutdown messages to nodes that are not part of the cluster */
static void sctp_send_shutdown(sctp_assoc_t associd)
{
	static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
	struct msghdr outmessage;
	struct cmsghdr *cmsg;
	struct sctp_sndrcvinfo *sinfo;
	int ret;
	struct connection *con;

	con = nodeid2con(0, 0);
	BUG_ON(con == NULL);

	outmessage.msg_name = NULL;
	outmessage.msg_namelen = 0;
	outmessage.msg_control = outcmsg;
	outmessage.msg_controllen = sizeof(outcmsg);
	outmessage.msg_flags = MSG_EOR;

	cmsg = CMSG_FIRSTHDR(&outmessage);
	cmsg->cmsg_level = IPPROTO_SCTP;
	cmsg->cmsg_type = SCTP_SNDRCV;
	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
	outmessage.msg_controllen = cmsg->cmsg_len;
	sinfo = CMSG_DATA(cmsg);
	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));

	sinfo->sinfo_flags |= MSG_EOF;
	sinfo->sinfo_assoc_id = associd;

	ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0);

	if (ret != 0)
		log_print("send EOF to node failed: %d", ret);
}

static void sctp_init_failed_foreach(struct connection *con)
{
	/*
	 * Don't try to recover the base con, and handle the race where the
	 * other node's assoc init creates an assoc and we get that
	 * notification, and then we get a notification that our own attempt
	 * failed. This happens when we are still trying the primary
	 * address, but the other node has already tried secondary addrs
	 * and found one that worked.
	 */
	if (!con->nodeid || con->sctp_assoc)
		return;

	log_print("Retrying SCTP association init for node %d\n", con->nodeid);

	con->try_new_addr = true;
	con->sctp_assoc = 0;
	if (test_and_clear_bit(CF_INIT_PENDING, &con->flags)) {
		if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
			queue_work(send_workqueue, &con->swork);
	}
}

/* INIT failed but we don't know which node...
   restart INIT on all pending nodes */
static void sctp_init_failed(void)
{
	mutex_lock(&connections_lock);

	foreach_conn(sctp_init_failed_foreach);

	mutex_unlock(&connections_lock);
}

static void retry_failed_sctp_send(struct connection *recv_con,
				   struct sctp_send_failed *sn_send_failed,
				   char *buf)
{
	int len = sn_send_failed->ssf_length - sizeof(struct sctp_send_failed);
	struct dlm_mhandle *mh;
	struct connection *con;
	char *retry_buf;
	int nodeid = sn_send_failed->ssf_info.sinfo_ppid;

	log_print("Retry sending %d bytes to node id %d", len, nodeid);

	con = nodeid2con(nodeid, 0);
	if (!con) {
		log_print("Could not look up con for nodeid %d\n",
			  nodeid);
		return;
	}

	mh = dlm_lowcomms_get_buffer(nodeid, len, GFP_NOFS, &retry_buf);
	if (!mh) {
		log_print("Could not allocate buf for retry.");
		return;
	}
	memcpy(retry_buf, buf + sizeof(struct sctp_send_failed), len);
	dlm_lowcomms_commit_buffer(mh);

	/*
	 * If we got an assoc changed event before the send failed event then
	 * we only need to retry the send.
	 */
	if (con->sctp_assoc) {
		if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
			queue_work(send_workqueue, &con->swork);
	} else
		sctp_init_failed_foreach(con);
}
/* Something happened to an association */
static void process_sctp_notification(struct connection *con,
				      struct msghdr *msg, char *buf)
{
	union sctp_notification *sn = (union sctp_notification *)buf;

	switch (sn->sn_header.sn_type) {
	case SCTP_SEND_FAILED:
		retry_failed_sctp_send(con, &sn->sn_send_failed, buf);
		break;
	case SCTP_ASSOC_CHANGE:
		switch (sn->sn_assoc_change.sac_state) {
		case SCTP_COMM_UP:
		case SCTP_RESTART:
		{
			/* Check that the new node is in the lockspace */
			struct sctp_prim prim;
			int nodeid;
			int prim_len, ret;
			int addr_len;
			struct connection *new_con;

			/*
			 * We get this before any data for an association.
			 * We verify that the node is in the cluster and
			 * then peel off a socket for it.
			 */
			if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
				log_print("COMM_UP for invalid assoc ID %d",
					  (int)sn->sn_assoc_change.sac_assoc_id);
				sctp_init_failed();
				return;
			}
			memset(&prim, 0, sizeof(struct sctp_prim));
			prim_len = sizeof(struct sctp_prim);
			prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;

			ret = kernel_getsockopt(con->sock,
						IPPROTO_SCTP,
						SCTP_PRIMARY_ADDR,
						(char *)&prim,
						&prim_len);
			if (ret < 0) {
				log_print("getsockopt/sctp_primary_addr on "
					  "new assoc %d failed : %d",
					  (int)sn->sn_assoc_change.sac_assoc_id,
					  ret);

				/* Retry INIT later */
				new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
				if (new_con)
					clear_bit(CF_CONNECT_PENDING, &con->flags);
				return;
			}
			make_sockaddr(&prim.ssp_addr, 0, &addr_len);
			if (addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
				unsigned char *b = (unsigned char *)&prim.ssp_addr;
				log_print("reject connect from unknown addr");
				print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
						     b, sizeof(struct sockaddr_storage));
				sctp_send_shutdown(prim.ssp_assoc_id);
				return;
			}

			new_con = nodeid2con(nodeid, GFP_NOFS);
			if (!new_con)
				return;

			/* Peel off a new sock */
			sctp_lock_sock(con->sock->sk);
			ret = sctp_do_peeloff(con->sock->sk,
					      sn->sn_assoc_change.sac_assoc_id,
					      &new_con->sock);
			sctp_release_sock(con->sock->sk);
			if (ret < 0) {
				log_print("Can't peel off a socket for "
					  "connection %d to node %d: err=%d",
					  (int)sn->sn_assoc_change.sac_assoc_id,
					  nodeid, ret);
				return;
			}
			add_sock(new_con->sock, new_con);

			log_print("connecting to %d sctp association %d",
				  nodeid, (int)sn->sn_assoc_change.sac_assoc_id);

			new_con->sctp_assoc = sn->sn_assoc_change.sac_assoc_id;
			new_con->try_new_addr = false;
			/* Send any pending writes */
			clear_bit(CF_CONNECT_PENDING, &new_con->flags);
			clear_bit(CF_INIT_PENDING, &new_con->flags);
			if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) {
				queue_work(send_workqueue, &new_con->swork);
			}
			if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags))
				queue_work(recv_workqueue, &new_con->rwork);
		}
		break;

		case SCTP_COMM_LOST:
		case SCTP_SHUTDOWN_COMP:
		{
			con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
			if (con) {
				con->sctp_assoc = 0;
			}
		}
		break;

		case SCTP_CANT_STR_ASSOC:
		{
			/* Will retry init when we get the send failed notification */
			log_print("Can't start SCTP association - retrying");
		}
		break;

		default:
			log_print("unexpected SCTP assoc change id=%d state=%d",
				  (int)sn->sn_assoc_change.sac_assoc_id,
				  sn->sn_assoc_change.sac_state);
		}

	default:
		; /* fall through */
	}
}
/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
	int ret = 0;
	struct msghdr msg = {};
	struct kvec iov[2];
	unsigned len;
	int r;
	int call_again_soon = 0;
	int nvec;
	char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];

	mutex_lock(&con->sock_mutex);

	if (con->sock == NULL) {
		ret = -EAGAIN;
		goto out_close;
	}

	if (con->rx_page == NULL) {
		/*
		 * This doesn't need to be atomic, but I think it should
		 * improve performance if it is.
		 */
		con->rx_page = alloc_page(GFP_ATOMIC);
		if (con->rx_page == NULL)
			goto out_resched;
		cbuf_init(&con->cb, PAGE_CACHE_SIZE);
	}

	/* Only SCTP needs these really */
	memset(&incmsg, 0, sizeof(incmsg));
	msg.msg_control = incmsg;
	msg.msg_controllen = sizeof(incmsg);

	/*
	 * iov[0] is the bit of the circular buffer between the current end
	 * point (cb.base + cb.len) and the end of the buffer.
	 */
	iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
	iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
	iov[1].iov_len = 0;
	nvec = 1;

	/*
	 * iov[1] is the bit of the circular buffer between the start of the
	 * buffer and the start of the currently used section (cb.base)
	 */
	if (cbuf_data(&con->cb) >= con->cb.base) {
		iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
		iov[1].iov_len = con->cb.base;
		iov[1].iov_base = page_address(con->rx_page);
		nvec = 2;
	}
	len = iov[0].iov_len + iov[1].iov_len;

	r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len,
				 MSG_DONTWAIT | MSG_NOSIGNAL);
	if (ret <= 0)
		goto out_close;

	/* Process SCTP notifications */
	if (msg.msg_flags & MSG_NOTIFICATION) {
		msg.msg_control = incmsg;
		msg.msg_controllen = sizeof(incmsg);

		process_sctp_notification(con, &msg,
				page_address(con->rx_page) + con->cb.base);
		mutex_unlock(&con->sock_mutex);
		return 0;
	}
	BUG_ON(con->nodeid == 0);

	if (ret == len)
		call_again_soon = 1;
	cbuf_add(&con->cb, ret);
	ret = dlm_process_incoming_buffer(con->nodeid,
					  page_address(con->rx_page),
					  con->cb.base, con->cb.len,
					  PAGE_CACHE_SIZE);
	if (ret == -EBADMSG) {
		log_print("lowcomms: addr=%p, base=%u, len=%u, "
			  "iov_len=%u, iov_base[0]=%p, read=%d",
			  page_address(con->rx_page), con->cb.base, con->cb.len,
			  len, iov[0].iov_base, r);
	}
	if (ret < 0)
		goto out_close;
	cbuf_eat(&con->cb, ret);

	if (cbuf_empty(&con->cb) && !call_again_soon) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}

	if (call_again_soon)
		goto out_resched;
	mutex_unlock(&con->sock_mutex);
	return 0;

out_resched:
	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
	mutex_unlock(&con->sock_mutex);
	return -EAGAIN;

out_close:
	mutex_unlock(&con->sock_mutex);
	if (ret != -EAGAIN) {
		close_connection(con, false);
		/* Reconnect when there is something to send */
	}
	/* Don't return success if we really got EOF */
	if (ret == 0)
		ret = -EAGAIN;

	return ret;
}
/* Listening socket is busy, accept a connection */
static int tcp_accept_from_sock(struct connection *con)
{
	int result;
	struct sockaddr_storage peeraddr;
	struct socket *newsock;
	int len;
	int nodeid;
	struct connection *newcon;
	struct connection *addcon;

	mutex_lock(&connections_lock);
	if (!dlm_allow_conn) {
		mutex_unlock(&connections_lock);
		return -1;
	}
	mutex_unlock(&connections_lock);

	memset(&peeraddr, 0, sizeof(peeraddr));
	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &newsock);
	if (result < 0)
		return -ENOMEM;

	mutex_lock_nested(&con->sock_mutex, 0);

	result = -ENOTCONN;
	if (con->sock == NULL)
		goto accept_err;

	newsock->type = con->sock->type;
	newsock->ops = con->sock->ops;

	result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
	if (result < 0)
		goto accept_err;

	/* Get the connected socket's peer */
	memset(&peeraddr, 0, sizeof(peeraddr));
	if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
				  &len, 2)) {
		result = -ECONNABORTED;
		goto accept_err;
	}

	/* Get the new node's NODEID */
	make_sockaddr(&peeraddr, 0, &len);
	if (addr_to_nodeid(&peeraddr, &nodeid)) {
		unsigned char *b = (unsigned char *)&peeraddr;
		log_print("connect from non cluster node");
		print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
				     b, sizeof(struct sockaddr_storage));
		sock_release(newsock);
		mutex_unlock(&con->sock_mutex);
		return -1;
	}

	log_print("got connection from %d", nodeid);

	/* Check to see if we already have a connection to this node. This
	 * could happen if the two nodes initiate a connection at roughly
	 * the same time and the connections cross on the wire.
	 * In this case we store the incoming one in "othercon"
	 */
	newcon = nodeid2con(nodeid, GFP_NOFS);
	if (!newcon) {
		result = -ENOMEM;
		goto accept_err;
	}
	mutex_lock_nested(&newcon->sock_mutex, 1);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
			if (!othercon) {
				log_print("failed to allocate incoming socket");
				mutex_unlock(&newcon->sock_mutex);
				result = -ENOMEM;
				goto accept_err;
			}
			othercon->nodeid = nodeid;
			othercon->rx_action = receive_from_sock;
			mutex_init(&othercon->sock_mutex);
			INIT_WORK(&othercon->swork, process_send_sockets);
			INIT_WORK(&othercon->rwork, process_recv_sockets);
			set_bit(CF_IS_OTHERCON, &othercon->flags);
		}
		if (!othercon->sock) {
			newcon->othercon = othercon;
			othercon->sock = newsock;
			newsock->sk->sk_user_data = othercon;
			add_sock(newsock, othercon);
			addcon = othercon;
		}
		else {
			printk("Extra connection from node %d attempted\n", nodeid);
			result = -EAGAIN;
			mutex_unlock(&newcon->sock_mutex);
			goto accept_err;
		}
	}
	else {
		newsock->sk->sk_user_data = newcon;
		newcon->rx_action = receive_from_sock;
		add_sock(newsock, newcon);
		addcon = newcon;
	}

	mutex_unlock(&newcon->sock_mutex);

	/*
	 * Add it to the active queue in case we got data
	 * between processing the accept and adding the socket
	 * to the read_sockets list
	 */
	if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
		queue_work(recv_workqueue, &addcon->rwork);
	mutex_unlock(&con->sock_mutex);

	return 0;

accept_err:
	mutex_unlock(&con->sock_mutex);
	sock_release(newsock);

	if (result != -EAGAIN)
		log_print("error accepting connection from node: %d", result);
	return result;
}
static void free_entry(struct writequeue_entry *e)
{
	__free_page(e->page);
	kfree(e);
}

/*
 * writequeue_entry_complete - try to delete and free write queue entry
 * @e: write queue entry to try to delete
 * @completed: bytes completed
 *
 * writequeue_lock must be held.
 */
static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
{
	e->offset += completed;
	e->len -= completed;

	if (e->len == 0 && e->users == 0) {
		list_del(&e->list);
		free_entry(e);
	}
}
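
/*
 * Note that 'users' is raised by dlm_lowcomms_get_buffer() and dropped by
 * dlm_lowcomms_commit_buffer() below, so an entry is only freed once every
 * outstanding reservation has been committed and all of its bytes have
 * been handed to the socket.
 */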
/* Initiate an SCTP association.
   This is a special case of send_to_sock() in that we don't yet have a
   peeled-off socket for this association, so we use the listening socket
   and add the primary IP address of the remote node.
 */
static void sctp_init_assoc(struct connection *con)
{
	struct sockaddr_storage rem_addr;
	char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
	struct msghdr outmessage;
	struct cmsghdr *cmsg;
	struct sctp_sndrcvinfo *sinfo;
	struct connection *base_con;
	struct writequeue_entry *e;
	int len, offset;
	int ret;
	int addrlen;
	struct kvec iov[1];

	mutex_lock(&con->sock_mutex);
	if (test_and_set_bit(CF_INIT_PENDING, &con->flags))
		goto unlock;

	if (nodeid_to_addr(con->nodeid, NULL, (struct sockaddr *)&rem_addr,
			   con->try_new_addr)) {
		log_print("no address for nodeid %d", con->nodeid);
		goto unlock;
	}
	base_con = nodeid2con(0, 0);
	BUG_ON(base_con == NULL);

	make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);

	outmessage.msg_name = &rem_addr;
	outmessage.msg_namelen = addrlen;
	outmessage.msg_control = outcmsg;
	outmessage.msg_controllen = sizeof(outcmsg);
	outmessage.msg_flags = MSG_EOR;

	spin_lock(&con->writequeue_lock);

	if (list_empty(&con->writequeue)) {
		spin_unlock(&con->writequeue_lock);
		log_print("writequeue empty for nodeid %d", con->nodeid);
		goto unlock;
	}

	e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
	len = e->len;
	offset = e->offset;

	/* Send the first block off the write queue */
	iov[0].iov_base = page_address(e->page)+offset;
	iov[0].iov_len = len;
	spin_unlock(&con->writequeue_lock);

	if (rem_addr.ss_family == AF_INET) {
		struct sockaddr_in *sin = (struct sockaddr_in *)&rem_addr;
		log_print("Trying to connect to %pI4", &sin->sin_addr.s_addr);
	} else {
		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&rem_addr;
		log_print("Trying to connect to %pI6", &sin6->sin6_addr);
	}

	cmsg = CMSG_FIRSTHDR(&outmessage);
	cmsg->cmsg_level = IPPROTO_SCTP;
	cmsg->cmsg_type = SCTP_SNDRCV;
	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
	sinfo = CMSG_DATA(cmsg);
	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
	sinfo->sinfo_ppid = cpu_to_le32(con->nodeid);
	outmessage.msg_controllen = cmsg->cmsg_len;
	sinfo->sinfo_flags |= SCTP_ADDR_OVER;

	ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len);
	if (ret < 0) {
		log_print("Send first packet to node %d failed: %d",
			  con->nodeid, ret);

		/* Try again later */
		clear_bit(CF_CONNECT_PENDING, &con->flags);
		clear_bit(CF_INIT_PENDING, &con->flags);
	}
	else {
		spin_lock(&con->writequeue_lock);
		writequeue_entry_complete(e, ret);
		spin_unlock(&con->writequeue_lock);
	}

unlock:
	mutex_unlock(&con->sock_mutex);
}
/* Connect a new socket to its peer */
static void tcp_connect_to_sock(struct connection *con)
{
	struct sockaddr_storage saddr, src_addr;
	int addr_len;
	struct socket *sock = NULL;
	int one = 1;
	int result;

	if (con->nodeid == 0) {
		log_print("attempt to connect sock 0 foiled");
		return;
	}

	mutex_lock(&con->sock_mutex);
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	/* Some odd races can cause double-connects, ignore them */
	if (con->sock)
		goto out;

	/* Create a socket to communicate with */
	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &sock);
	if (result < 0)
		goto out_err;

	memset(&saddr, 0, sizeof(saddr));
	result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		goto out_err;
	}

	sock->sk->sk_user_data = con;
	con->rx_action = receive_from_sock;
	con->connect_action = tcp_connect_to_sock;
	add_sock(sock, con);

	/* Bind to our cluster-known address when connecting to avoid
	   routing problems */
	memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
				 addr_len);
	if (result < 0) {
		log_print("could not bind for connect: %d", result);
		/* This *may* not indicate a critical error */
	}

	make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);

	log_print("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm */
	kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
			  sizeof(one));

	result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
				    O_NONBLOCK);
	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0)
		goto out;

out_err:
	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	} else if (sock) {
		sock_release(sock);
	}
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		lowcomms_connect_sock(con);
		return;
	}
out:
	mutex_unlock(&con->sock_mutex);
	return;
}
static struct socket *tcp_create_listen_sock(struct connection *con,
					     struct sockaddr_storage *saddr)
{
	struct socket *sock = NULL;
	int result = 0;
	int one = 1;
	int addr_len;

	if (dlm_local_addr[0]->ss_family == AF_INET)
		addr_len = sizeof(struct sockaddr_in);
	else
		addr_len = sizeof(struct sockaddr_in6);

	/* Create a socket to communicate with */
	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &sock);
	if (result < 0) {
		log_print("Can't create listening comms socket");
		goto create_out;
	}

	/* Turn off Nagle's algorithm */
	kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
			  sizeof(one));

	result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
				   (char *)&one, sizeof(one));
	if (result < 0) {
		log_print("Failed to set SO_REUSEADDR on socket: %d", result);
	}
	con->rx_action = tcp_accept_from_sock;
	con->connect_action = tcp_connect_to_sock;

	/* Bind to our port */
	make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
	if (result < 0) {
		log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
		sock_release(sock);
		sock = NULL;
		con->sock = NULL;
		goto create_out;
	}
	result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
				   (char *)&one, sizeof(one));
	if (result < 0) {
		log_print("Set keepalive failed: %d", result);
	}

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
		sock_release(sock);
		sock = NULL;
		goto create_out;
	}

create_out:
	return sock;
}

/* Get local addresses */
static void init_local(void)
{
	struct sockaddr_storage sas, *addr;
	int i;

	dlm_local_count = 0;
	for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
		if (dlm_our_addr(&sas, i))
			break;

		addr = kmalloc(sizeof(*addr), GFP_NOFS);
		if (!addr)
			break;
		memcpy(addr, &sas, sizeof(*addr));
		dlm_local_addr[dlm_local_count++] = addr;
	}
}
/* Bind to an IP address. SCTP allows multiple addresses so it can do
   multi-homing */
static int add_sctp_bind_addr(struct connection *sctp_con,
			      struct sockaddr_storage *addr,
			      int addr_len, int num)
{
	int result = 0;

	if (num == 1)
		result = kernel_bind(sctp_con->sock,
				     (struct sockaddr *) addr,
				     addr_len);
	else
		result = kernel_setsockopt(sctp_con->sock, SOL_SCTP,
					   SCTP_SOCKOPT_BINDX_ADD,
					   (char *)addr, addr_len);

	if (result < 0)
		log_print("Can't bind to port %d addr number %d",
			  dlm_config.ci_tcp_port, num);

	return result;
}
/* Initialise SCTP socket and bind to all interfaces */
static int sctp_listen_for_all(void)
{
	struct socket *sock = NULL;
	struct sockaddr_storage localaddr;
	struct sctp_event_subscribe subscribe;
	int result = -EINVAL, num = 1, i, addr_len;
	struct connection *con = nodeid2con(0, GFP_NOFS);
	int bufsize = NEEDED_RMEM;
	int one = 1;

	if (!con)
		return -ENOMEM;

	log_print("Using SCTP for communications");

	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
				  IPPROTO_SCTP, &sock);
	if (result < 0) {
		log_print("Can't create comms socket, check SCTP is loaded");
		goto out;
	}

	/* Listen for events */
	memset(&subscribe, 0, sizeof(subscribe));
	subscribe.sctp_data_io_event = 1;
	subscribe.sctp_association_event = 1;
	subscribe.sctp_send_failure_event = 1;
	subscribe.sctp_shutdown_event = 1;
	subscribe.sctp_partial_delivery_event = 1;

	result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,
				   (char *)&bufsize, sizeof(bufsize));
	if (result)
		log_print("Error increasing buffer space on socket %d", result);

	result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
				   (char *)&subscribe, sizeof(subscribe));
	if (result < 0) {
		log_print("Failed to set SCTP_EVENTS on socket: result=%d",
			  result);
		goto create_delsock;
	}

	result = kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one,
				   sizeof(one));
	if (result < 0)
		log_print("Could not set SCTP NODELAY error %d\n", result);

	/* Init con struct */
	sock->sk->sk_user_data = con;
	con->sock = sock;
	con->sock->sk->sk_data_ready = lowcomms_data_ready;
	con->rx_action = receive_from_sock;
	con->connect_action = sctp_init_assoc;

	/* Bind to all interfaces. */
	for (i = 0; i < dlm_local_count; i++) {
		memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
		make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);

		result = add_sctp_bind_addr(con, &localaddr, addr_len, num);
		if (result)
			goto create_delsock;
		++num;
	}

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		log_print("Can't set socket listening");
		goto create_delsock;
	}

	return 0;

create_delsock:
	sock_release(sock);
	con->sock = NULL;
out:
	return result;
}

static int tcp_listen_for_all(void)
{
	struct socket *sock = NULL;
	struct connection *con = nodeid2con(0, GFP_NOFS);
	int result = -EINVAL;

	if (!con)
		return -ENOMEM;

	/* We don't support multi-homed hosts */
	if (dlm_local_addr[1] != NULL) {
		log_print("TCP protocol can't handle multi-homed hosts, "
			  "try SCTP");
		return -EINVAL;
	}

	log_print("Using TCP for communications");

	sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
	if (sock) {
		add_sock(sock, con);
		result = 0;
	}
	else {
		result = -EADDRINUSE;
	}

	return result;
}
static struct writequeue_entry *new_writequeue_entry(struct connection *con,
						     gfp_t allocation)
{
	struct writequeue_entry *entry;

	entry = kmalloc(sizeof(struct writequeue_entry), allocation);
	if (!entry)
		return NULL;

	entry->page = alloc_page(allocation);
	if (!entry->page) {
		kfree(entry);
		return NULL;
	}

	entry->offset = 0;
	entry->len = 0;
	entry->end = 0;
	entry->users = 0;
	entry->con = con;

	return entry;
}

void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
{
	struct connection *con;
	struct writequeue_entry *e;
	int offset = 0;

	con = nodeid2con(nodeid, allocation);
	if (!con)
		return NULL;

	spin_lock(&con->writequeue_lock);
	e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
	if ((&e->list == &con->writequeue) ||
	    (PAGE_CACHE_SIZE - e->end < len)) {
		e = NULL;
	} else {
		offset = e->end;
		e->end += len;
		e->users++;
	}
	spin_unlock(&con->writequeue_lock);

	if (e) {
	got_one:
		*ppc = page_address(e->page) + offset;
		return e;
	}

	e = new_writequeue_entry(con, allocation);
	if (e) {
		spin_lock(&con->writequeue_lock);
		offset = e->end;
		e->end += len;
		e->users++;
		list_add_tail(&e->list, &con->writequeue);
		spin_unlock(&con->writequeue_lock);
		goto got_one;
	}
	return NULL;
}

void dlm_lowcomms_commit_buffer(void *mh)
{
	struct writequeue_entry *e = (struct writequeue_entry *)mh;
	struct connection *con = e->con;
	int users;

	spin_lock(&con->writequeue_lock);
	users = --e->users;
	if (users)
		goto out;
	e->len = e->end - e->offset;
	spin_unlock(&con->writequeue_lock);

	if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
		queue_work(send_workqueue, &con->swork);
	}
	return;

out:
	spin_unlock(&con->writequeue_lock);
	return;
}
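
/*
 * A minimal caller sketch (hypothetical; in-tree, the midcomms layer plays
 * this role): reserve space on the node's write queue, build the message
 * in place, then commit to hand it to the send worker.
 *
 *	char *p;
 *	void *mh = dlm_lowcomms_get_buffer(nodeid, msglen, GFP_NOFS, &p);
 *	if (!mh)
 *		return -ENOMEM;
 *	memcpy(p, msg, msglen);		// fill the reserved page region
 *	dlm_lowcomms_commit_buffer(mh);	// queue it for send_to_sock()
 */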
/* Send a message */
static void send_to_sock(struct connection *con)
{
	int ret = 0;
	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	struct writequeue_entry *e;
	int len, offset;
	int count = 0;

	mutex_lock(&con->sock_mutex);
	if (con->sock == NULL)
		goto out_connect;

	spin_lock(&con->writequeue_lock);
	for (;;) {
		e = list_entry(con->writequeue.next, struct writequeue_entry,
			       list);
		if ((struct list_head *) e == &con->writequeue)
			break;

		len = e->len;
		offset = e->offset;
		BUG_ON(len == 0 && e->users == 0);
		spin_unlock(&con->writequeue_lock);

		ret = 0;
		if (len) {
			ret = kernel_sendpage(con->sock, e->page, offset, len,
					      msg_flags);
			if (ret == -EAGAIN || ret == 0) {
				if (ret == -EAGAIN &&
				    test_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags) &&
				    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
					/* Notify TCP that we're limited by the
					 * application window size.
					 */
					set_bit(SOCK_NOSPACE, &con->sock->flags);
					con->sock->sk->sk_write_pending++;
				}
				cond_resched();
				goto out;
			} else if (ret < 0)
				goto send_error;
		}

		/* Don't starve people filling buffers */
		if (++count >= MAX_SEND_MSG_COUNT) {
			cond_resched();
			count = 0;
		}

		spin_lock(&con->writequeue_lock);
		writequeue_entry_complete(e, ret);
	}
	spin_unlock(&con->writequeue_lock);
out:
	mutex_unlock(&con->sock_mutex);
	return;

send_error:
	mutex_unlock(&con->sock_mutex);
	close_connection(con, false);
	lowcomms_connect_sock(con);
	return;

out_connect:
	mutex_unlock(&con->sock_mutex);
	if (!test_bit(CF_INIT_PENDING, &con->flags))
		lowcomms_connect_sock(con);
}
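
/*
 * When kernel_sendpage() above returns -EAGAIN with the socket buffer
 * full, CF_APP_LIMITED and SOCK_NOSPACE are set so that
 * lowcomms_write_space() re-queues the send work as soon as TCP frees
 * space; the entry stays on the writequeue with its remaining bytes, so
 * nothing is lost by bailing out early.
 */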
static void clean_one_writequeue(struct connection *con)
{
	struct writequeue_entry *e, *safe;

	spin_lock(&con->writequeue_lock);
	list_for_each_entry_safe(e, safe, &con->writequeue, list) {
		list_del(&e->list);
		free_entry(e);
	}
	spin_unlock(&con->writequeue_lock);
}

/* Called from recovery when it knows that a node has
   left the cluster */
int dlm_lowcomms_close(int nodeid)
{
	struct connection *con;
	struct dlm_node_addr *na;

	log_print("closing connection to node %d", nodeid);
	con = nodeid2con(nodeid, 0);
	if (con) {
		clear_bit(CF_CONNECT_PENDING, &con->flags);
		clear_bit(CF_WRITE_PENDING, &con->flags);
		set_bit(CF_CLOSE, &con->flags);
		if (cancel_work_sync(&con->swork))
			log_print("canceled swork for node %d", nodeid);
		if (cancel_work_sync(&con->rwork))
			log_print("canceled rwork for node %d", nodeid);
		clean_one_writequeue(con);
		close_connection(con, true);
	}

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (na) {
		list_del(&na->list);
		while (na->addr_count--)
			kfree(na->addr[na->addr_count]);
		kfree(na);
	}
	spin_unlock(&dlm_node_addrs_spin);

	return 0;
}

/* Receive workqueue function */
static void process_recv_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, rwork);
	int err;

	clear_bit(CF_READ_PENDING, &con->flags);
	do {
		err = con->rx_action(con);
	} while (!err);
}

/* Send workqueue function */
static void process_send_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, swork);

	if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
		con->connect_action(con);
		set_bit(CF_WRITE_PENDING, &con->flags);
	}
	if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags))
		send_to_sock(con);
}
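
/*
 * The send worker runs connect_action() before flushing: a pending
 * connect always sets CF_WRITE_PENDING afterwards, so any data queued
 * while the connection was being established is sent in the same work
 * item.
 */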
/* Discard all entries on the write queues */
static void clean_writequeues(void)
{
	foreach_conn(clean_one_writequeue);
}

static void work_stop(void)
{
	destroy_workqueue(recv_workqueue);
	destroy_workqueue(send_workqueue);
}

static int work_start(void)
{
	recv_workqueue = alloc_workqueue("dlm_recv",
					 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
	if (!recv_workqueue) {
		log_print("can't start dlm_recv");
		return -ENOMEM;
	}

	send_workqueue = alloc_workqueue("dlm_send",
					 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
	if (!send_workqueue) {
		log_print("can't start dlm_send");
		destroy_workqueue(recv_workqueue);
		return -ENOMEM;
	}

	return 0;
}

static void stop_conn(struct connection *con)
{
	con->flags |= 0x0F;
	if (con->sock && con->sock->sk)
		con->sock->sk->sk_user_data = NULL;
}

static void free_conn(struct connection *con)
{
	close_connection(con, true);
	if (con->othercon)
		kmem_cache_free(con_cache, con->othercon);
	hlist_del(&con->list);
	kmem_cache_free(con_cache, con);
}

void dlm_lowcomms_stop(void)
{
	/* Set all the flags to prevent any
	   socket activity.
	*/
	mutex_lock(&connections_lock);
	dlm_allow_conn = 0;
	foreach_conn(stop_conn);
	mutex_unlock(&connections_lock);

	work_stop();

	mutex_lock(&connections_lock);
	clean_writequeues();

	foreach_conn(free_conn);

	mutex_unlock(&connections_lock);
	kmem_cache_destroy(con_cache);
}

int dlm_lowcomms_start(void)
{
	int error = -EINVAL;
	struct connection *con;
	int i;

	for (i = 0; i < CONN_HASH_SIZE; i++)
		INIT_HLIST_HEAD(&connection_hash[i]);

	init_local();
	if (!dlm_local_count) {
		error = -ENOTCONN;
		log_print("no local IP address has been set");
		goto fail;
	}

	error = -ENOMEM;
	con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
				      __alignof__(struct connection), 0,
				      NULL);
	if (!con_cache)
		goto fail;

	error = work_start();
	if (error)
		goto fail_destroy;

	dlm_allow_conn = 1;

	/* Start listening */
	if (dlm_config.ci_protocol == 0)
		error = tcp_listen_for_all();
	else
		error = sctp_listen_for_all();
	if (error)
		goto fail_unlisten;

	return 0;

fail_unlisten:
	dlm_allow_conn = 0;
	con = nodeid2con(0, 0);
	if (con) {
		close_connection(con, false);
		kmem_cache_free(con_cache, con);
	}
fail_destroy:
	kmem_cache_destroy(con_cache);
fail:
	return error;
}
void dlm_lowcomms_exit(void)
{
	struct dlm_node_addr *na, *safe;

	spin_lock(&dlm_node_addrs_spin);
	list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
		list_del(&na->list);
		while (na->addr_count--)
			kfree(na->addr[na->addr_count]);
		kfree(na);
	}
	spin_unlock(&dlm_node_addrs_spin);
}