12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238 |
- /******************************************************************************
- *******************************************************************************
- **
- ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
- ** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
- **
- ** This copyrighted material is made available to anyone wishing to use,
- ** modify, copy, or redistribute it subject to the terms and conditions
- ** of the GNU General Public License v.2.
- **
- *******************************************************************************
- ******************************************************************************/
- /*
- * lowcomms.c
- *
- * This is the "low-level" comms layer.
- *
- * It is responsible for sending/receiving messages
- * from other nodes in the cluster.
- *
- * Cluster nodes are referred to by their nodeids. nodeids are
- * simply 32 bit numbers to the locking module - if they need to
- * be expanded for the cluster infrastructure then that is its
- * responsibility. It is this layer's
- * responsibility to resolve these into IP address or
- * whatever it needs for inter-node communication.
- *
- * The comms level is two kernel threads that deal mainly with
- * the receiving of messages from other nodes and passing them
- * up to the mid-level comms layer (which understands the
- * message format) for execution by the locking core, and
- * a send thread which does all the setting up of connections
- * to remote nodes and the sending of data. Threads are not allowed
- * to send their own data because it may cause them to wait in times
- * of high load. Also, this way, the sending thread can collect together
- * messages bound for one node and send them in one block.
- *
- * I don't see any problem with the recv thread executing the locking
- * code on behalf of remote processes as the locking code is
- * short, efficient and never (well, hardly ever) waits.
- *
- */
- #include <asm/ioctls.h>
- #include <net/sock.h>
- #include <net/tcp.h>
- #include <net/sctp/user.h>
- #include <linux/pagemap.h>
- #include <linux/socket.h>
- #include <linux/idr.h>
- #include "dlm_internal.h"
- #include "lowcomms.h"
- #include "config.h"
- #include "midcomms.h"
- static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
- static int dlm_local_count;
- static int dlm_local_nodeid;
- /* One of these per connected node */
- #define NI_INIT_PENDING 1
- #define NI_WRITE_PENDING 2
- struct nodeinfo {
- spinlock_t lock;
- sctp_assoc_t assoc_id;
- unsigned long flags;
- struct list_head write_list; /* nodes with pending writes */
- struct list_head writequeue; /* outgoing writequeue_entries */
- spinlock_t writequeue_lock;
- int nodeid;
- };
- static DEFINE_IDR(nodeinfo_idr);
- static struct rw_semaphore nodeinfo_lock;
- static int max_nodeid;
/*
 * Bookkeeping for a circular buffer whose size is a power of two
 * (mask is size - 1, see CBUF_INIT).
 */
struct cbuf {
	unsigned int base;	/* offset of the first used byte */
	unsigned int len;	/* number of used bytes */
	unsigned int mask;	/* size - 1, used to wrap offsets */
};
- /* Just the one of these, now. But this struct keeps
- the connection-specific variables together */
- #define CF_READ_PENDING 1
- struct connection {
- struct socket *sock;
- unsigned long flags;
- struct page *rx_page;
- atomic_t waiting_requests;
- struct cbuf cb;
- int eagain_flag;
- };
- /* An entry waiting to be sent */
- struct writequeue_entry {
- struct list_head list;
- struct page *page;
- int offset;
- int len;
- int end;
- int users;
- struct nodeinfo *ni;
- };
/*
 * Circular buffer helpers.  "cb" is a pointer to a struct with unsigned
 * base, len and mask members; the buffer size must be a power of two.
 */

/* Reset the buffer to empty and record its size */
#define CBUF_INIT(cb, size)			\
do {						\
	(cb)->base = (cb)->len = 0;		\
	(cb)->mask = ((size)-1);		\
} while (0)

/* True when no bytes are stored */
#define CBUF_EMPTY(cb) ((cb)->len == 0)

/* True when n more bytes still leave the buffer short of completely full */
#define CBUF_MAY_ADD(cb, n) (((cb)->len + (n)) < ((cb)->mask + 1))

/* Offset at which the next incoming byte will be stored */
#define CBUF_DATA(cb) (((cb)->base + (cb)->len) & (cb)->mask)

/* Account for n bytes appended at CBUF_DATA() */
#define CBUF_ADD(cb, n)				\
do {						\
	(cb)->len += (n);			\
} while (0)

/* Account for n bytes consumed from the front */
#define CBUF_EAT(cb, n)				\
do {						\
	(cb)->len  -= (n);			\
	(cb)->base += (n);			\
	(cb)->base &= (cb)->mask;		\
} while (0)
- /* List of nodes which have writes pending */
- static struct list_head write_nodes;
- static spinlock_t write_nodes_lock;
- /* Maximum number of incoming messages to process before
- * doing a schedule()
- */
- #define MAX_RX_MSG_COUNT 25
- /* Manage daemons */
- static struct task_struct *recv_task;
- static struct task_struct *send_task;
- static wait_queue_head_t lowcomms_recv_wait;
- static atomic_t accepting;
- /* The SCTP connection */
- static struct connection sctp_con;
- static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
- {
- struct sockaddr_storage addr;
- int error;
- if (!dlm_local_count)
- return -1;
- error = dlm_nodeid_to_addr(nodeid, &addr);
- if (error)
- return error;
- if (dlm_local_addr[0]->ss_family == AF_INET) {
- struct sockaddr_in *in4 = (struct sockaddr_in *) &addr;
- struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr;
- ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
- } else {
- struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr;
- struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;
- memcpy(&ret6->sin6_addr, &in6->sin6_addr,
- sizeof(in6->sin6_addr));
- }
- return 0;
- }
- static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
- {
- struct nodeinfo *ni;
- int r;
- int n;
- down_read(&nodeinfo_lock);
- ni = idr_find(&nodeinfo_idr, nodeid);
- up_read(&nodeinfo_lock);
- if (!ni && alloc) {
- down_write(&nodeinfo_lock);
- ni = idr_find(&nodeinfo_idr, nodeid);
- if (ni)
- goto out_up;
- r = idr_pre_get(&nodeinfo_idr, alloc);
- if (!r)
- goto out_up;
- ni = kmalloc(sizeof(struct nodeinfo), alloc);
- if (!ni)
- goto out_up;
- r = idr_get_new_above(&nodeinfo_idr, ni, nodeid, &n);
- if (r) {
- kfree(ni);
- ni = NULL;
- goto out_up;
- }
- if (n != nodeid) {
- idr_remove(&nodeinfo_idr, n);
- kfree(ni);
- ni = NULL;
- goto out_up;
- }
- memset(ni, 0, sizeof(struct nodeinfo));
- spin_lock_init(&ni->lock);
- INIT_LIST_HEAD(&ni->writequeue);
- spin_lock_init(&ni->writequeue_lock);
- ni->nodeid = nodeid;
- if (nodeid > max_nodeid)
- max_nodeid = nodeid;
- out_up:
- up_write(&nodeinfo_lock);
- }
- return ni;
- }
- /* Don't call this too often... */
- static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc)
- {
- int i;
- struct nodeinfo *ni;
- for (i=1; i<=max_nodeid; i++) {
- ni = nodeid2nodeinfo(i, 0);
- if (ni && ni->assoc_id == assoc)
- return ni;
- }
- return NULL;
- }
- /* Data or notification available on socket */
- static void lowcomms_data_ready(struct sock *sk, int count_unused)
- {
- atomic_inc(&sctp_con.waiting_requests);
- if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags))
- return;
- wake_up_interruptible(&lowcomms_recv_wait);
- }
- /* Add the port number to an IP6 or 4 sockaddr and return the address length.
- Also padd out the struct with zeros to make comparisons meaningful */
- static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
- int *addr_len)
- {
- struct sockaddr_in *local4_addr;
- struct sockaddr_in6 *local6_addr;
- if (!dlm_local_count)
- return;
- if (!port) {
- if (dlm_local_addr[0]->ss_family == AF_INET) {
- local4_addr = (struct sockaddr_in *)dlm_local_addr[0];
- port = be16_to_cpu(local4_addr->sin_port);
- } else {
- local6_addr = (struct sockaddr_in6 *)dlm_local_addr[0];
- port = be16_to_cpu(local6_addr->sin6_port);
- }
- }
- saddr->ss_family = dlm_local_addr[0]->ss_family;
- if (dlm_local_addr[0]->ss_family == AF_INET) {
- struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
- in4_addr->sin_port = cpu_to_be16(port);
- memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
- memset(in4_addr+1, 0, sizeof(struct sockaddr_storage) -
- sizeof(struct sockaddr_in));
- *addr_len = sizeof(struct sockaddr_in);
- } else {
- struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
- in6_addr->sin6_port = cpu_to_be16(port);
- memset(in6_addr+1, 0, sizeof(struct sockaddr_storage) -
- sizeof(struct sockaddr_in6));
- *addr_len = sizeof(struct sockaddr_in6);
- }
- }
- /* Close the connection and tidy up */
- static void close_connection(void)
- {
- if (sctp_con.sock) {
- sock_release(sctp_con.sock);
- sctp_con.sock = NULL;
- }
- if (sctp_con.rx_page) {
- __free_page(sctp_con.rx_page);
- sctp_con.rx_page = NULL;
- }
- }
- /* We only send shutdown messages to nodes that are not part of the cluster */
- static void send_shutdown(sctp_assoc_t associd)
- {
- static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
- struct msghdr outmessage;
- struct cmsghdr *cmsg;
- struct sctp_sndrcvinfo *sinfo;
- int ret;
- outmessage.msg_name = NULL;
- outmessage.msg_namelen = 0;
- outmessage.msg_control = outcmsg;
- outmessage.msg_controllen = sizeof(outcmsg);
- outmessage.msg_flags = MSG_EOR;
- cmsg = CMSG_FIRSTHDR(&outmessage);
- cmsg->cmsg_level = IPPROTO_SCTP;
- cmsg->cmsg_type = SCTP_SNDRCV;
- cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
- outmessage.msg_controllen = cmsg->cmsg_len;
- sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
- memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
- sinfo->sinfo_flags |= MSG_EOF;
- sinfo->sinfo_assoc_id = associd;
- ret = kernel_sendmsg(sctp_con.sock, &outmessage, NULL, 0, 0);
- if (ret != 0)
- log_print("send EOF to node failed: %d", ret);
- }
- /* INIT failed but we don't know which node...
- restart INIT on all pending nodes */
- static void init_failed(void)
- {
- int i;
- struct nodeinfo *ni;
- for (i=1; i<=max_nodeid; i++) {
- ni = nodeid2nodeinfo(i, 0);
- if (!ni)
- continue;
- if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {
- ni->assoc_id = 0;
- if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
- spin_lock_bh(&write_nodes_lock);
- list_add_tail(&ni->write_list, &write_nodes);
- spin_unlock_bh(&write_nodes_lock);
- }
- }
- }
- wake_up_process(send_task);
- }
- /* Something happened to an association */
- static void process_sctp_notification(struct msghdr *msg, char *buf)
- {
- union sctp_notification *sn = (union sctp_notification *)buf;
- if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
- switch (sn->sn_assoc_change.sac_state) {
- case SCTP_COMM_UP:
- case SCTP_RESTART:
- {
- /* Check that the new node is in the lockspace */
- struct sctp_prim prim;
- mm_segment_t fs;
- int nodeid;
- int prim_len, ret;
- int addr_len;
- struct nodeinfo *ni;
- /* This seems to happen when we received a connection
- * too early... or something... anyway, it happens but
- * we always seem to get a real message too, see
- * receive_from_sock */
- if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
- log_print("COMM_UP for invalid assoc ID %d",
- (int)sn->sn_assoc_change.sac_assoc_id);
- init_failed();
- return;
- }
- memset(&prim, 0, sizeof(struct sctp_prim));
- prim_len = sizeof(struct sctp_prim);
- prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;
- fs = get_fs();
- set_fs(get_ds());
- ret = sctp_con.sock->ops->getsockopt(sctp_con.sock,
- IPPROTO_SCTP, SCTP_PRIMARY_ADDR,
- (char*)&prim, &prim_len);
- set_fs(fs);
- if (ret < 0) {
- struct nodeinfo *ni;
- log_print("getsockopt/sctp_primary_addr on "
- "new assoc %d failed : %d",
- (int)sn->sn_assoc_change.sac_assoc_id, ret);
- /* Retry INIT later */
- ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
- if (ni)
- clear_bit(NI_INIT_PENDING, &ni->flags);
- return;
- }
- make_sockaddr(&prim.ssp_addr, 0, &addr_len);
- if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
- log_print("reject connect from unknown addr");
- send_shutdown(prim.ssp_assoc_id);
- return;
- }
- ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
- if (!ni)
- return;
- /* Save the assoc ID */
- spin_lock(&ni->lock);
- ni->assoc_id = sn->sn_assoc_change.sac_assoc_id;
- spin_unlock(&ni->lock);
- log_print("got new/restarted association %d nodeid %d",
- (int)sn->sn_assoc_change.sac_assoc_id, nodeid);
- /* Send any pending writes */
- clear_bit(NI_INIT_PENDING, &ni->flags);
- if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
- spin_lock_bh(&write_nodes_lock);
- list_add_tail(&ni->write_list, &write_nodes);
- spin_unlock_bh(&write_nodes_lock);
- }
- wake_up_process(send_task);
- }
- break;
- case SCTP_COMM_LOST:
- case SCTP_SHUTDOWN_COMP:
- {
- struct nodeinfo *ni;
- ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
- if (ni) {
- spin_lock(&ni->lock);
- ni->assoc_id = 0;
- spin_unlock(&ni->lock);
- }
- }
- break;
- /* We don't know which INIT failed, so clear the PENDING flags
- * on them all. if assoc_id is zero then it will then try
- * again */
- case SCTP_CANT_STR_ASSOC:
- {
- log_print("Can't start SCTP association - retrying");
- init_failed();
- }
- break;
- default:
- log_print("unexpected SCTP assoc change id=%d state=%d",
- (int)sn->sn_assoc_change.sac_assoc_id,
- sn->sn_assoc_change.sac_state);
- }
- }
- }
- /* Data received from remote end */
- static int receive_from_sock(void)
- {
- int ret = 0;
- struct msghdr msg;
- struct kvec iov[2];
- unsigned len;
- int r;
- struct sctp_sndrcvinfo *sinfo;
- struct cmsghdr *cmsg;
- struct nodeinfo *ni;
- /* These two are marginally too big for stack allocation, but this
- * function is (currently) only called by dlm_recvd so static should be
- * OK.
- */
- static struct sockaddr_storage msgname;
- static char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
- if (sctp_con.sock == NULL)
- goto out;
- if (sctp_con.rx_page == NULL) {
- /*
- * This doesn't need to be atomic, but I think it should
- * improve performance if it is.
- */
- sctp_con.rx_page = alloc_page(GFP_ATOMIC);
- if (sctp_con.rx_page == NULL)
- goto out_resched;
- CBUF_INIT(&sctp_con.cb, PAGE_CACHE_SIZE);
- }
- memset(&incmsg, 0, sizeof(incmsg));
- memset(&msgname, 0, sizeof(msgname));
- memset(incmsg, 0, sizeof(incmsg));
- msg.msg_name = &msgname;
- msg.msg_namelen = sizeof(msgname);
- msg.msg_flags = 0;
- msg.msg_control = incmsg;
- msg.msg_controllen = sizeof(incmsg);
- /* I don't see why this circular buffer stuff is necessary for SCTP
- * which is a packet-based protocol, but the whole thing breaks under
- * load without it! The overhead is minimal (and is in the TCP lowcomms
- * anyway, of course) so I'll leave it in until I can figure out what's
- * really happening.
- */
- /*
- * iov[0] is the bit of the circular buffer between the current end
- * point (cb.base + cb.len) and the end of the buffer.
- */
- iov[0].iov_len = sctp_con.cb.base - CBUF_DATA(&sctp_con.cb);
- iov[0].iov_base = page_address(sctp_con.rx_page) +
- CBUF_DATA(&sctp_con.cb);
- iov[1].iov_len = 0;
- /*
- * iov[1] is the bit of the circular buffer between the start of the
- * buffer and the start of the currently used section (cb.base)
- */
- if (CBUF_DATA(&sctp_con.cb) >= sctp_con.cb.base) {
- iov[0].iov_len = PAGE_CACHE_SIZE - CBUF_DATA(&sctp_con.cb);
- iov[1].iov_len = sctp_con.cb.base;
- iov[1].iov_base = page_address(sctp_con.rx_page);
- msg.msg_iovlen = 2;
- }
- len = iov[0].iov_len + iov[1].iov_len;
- r = ret = kernel_recvmsg(sctp_con.sock, &msg, iov, msg.msg_iovlen, len,
- MSG_NOSIGNAL | MSG_DONTWAIT);
- if (ret <= 0)
- goto out_close;
- msg.msg_control = incmsg;
- msg.msg_controllen = sizeof(incmsg);
- cmsg = CMSG_FIRSTHDR(&msg);
- sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
- if (msg.msg_flags & MSG_NOTIFICATION) {
- process_sctp_notification(&msg, page_address(sctp_con.rx_page));
- return 0;
- }
- /* Is this a new association ? */
- ni = nodeid2nodeinfo(le32_to_cpu(sinfo->sinfo_ppid), GFP_KERNEL);
- if (ni) {
- ni->assoc_id = sinfo->sinfo_assoc_id;
- if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {
- if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
- spin_lock_bh(&write_nodes_lock);
- list_add_tail(&ni->write_list, &write_nodes);
- spin_unlock_bh(&write_nodes_lock);
- }
- wake_up_process(send_task);
- }
- }
- /* INIT sends a message with length of 1 - ignore it */
- if (r == 1)
- return 0;
- CBUF_ADD(&sctp_con.cb, ret);
- ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid),
- page_address(sctp_con.rx_page),
- sctp_con.cb.base, sctp_con.cb.len,
- PAGE_CACHE_SIZE);
- if (ret < 0)
- goto out_close;
- CBUF_EAT(&sctp_con.cb, ret);
- out:
- ret = 0;
- goto out_ret;
- out_resched:
- lowcomms_data_ready(sctp_con.sock->sk, 0);
- ret = 0;
- schedule();
- goto out_ret;
- out_close:
- if (ret != -EAGAIN)
- log_print("error reading from sctp socket: %d", ret);
- out_ret:
- return ret;
- }
- /* Bind to an IP address. SCTP allows multiple address so it can do multi-homing */
- static int add_bind_addr(struct sockaddr_storage *addr, int addr_len, int num)
- {
- mm_segment_t fs;
- int result = 0;
- fs = get_fs();
- set_fs(get_ds());
- if (num == 1)
- result = sctp_con.sock->ops->bind(sctp_con.sock,
- (struct sockaddr *) addr, addr_len);
- else
- result = sctp_con.sock->ops->setsockopt(sctp_con.sock, SOL_SCTP,
- SCTP_SOCKOPT_BINDX_ADD, (char *)addr, addr_len);
- set_fs(fs);
- if (result < 0)
- log_print("Can't bind to port %d addr number %d",
- dlm_config.tcp_port, num);
- return result;
- }
- static void init_local(void)
- {
- struct sockaddr_storage sas, *addr;
- int i;
- dlm_local_nodeid = dlm_our_nodeid();
- for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) {
- if (dlm_our_addr(&sas, i))
- break;
- addr = kmalloc(sizeof(*addr), GFP_KERNEL);
- if (!addr)
- break;
- memcpy(addr, &sas, sizeof(*addr));
- dlm_local_addr[dlm_local_count++] = addr;
- }
- }
- /* Initialise SCTP socket and bind to all interfaces */
- static int init_sock(void)
- {
- mm_segment_t fs;
- struct socket *sock = NULL;
- struct sockaddr_storage localaddr;
- struct sctp_event_subscribe subscribe;
- int result = -EINVAL, num = 1, i, addr_len;
- if (!dlm_local_count) {
- init_local();
- if (!dlm_local_count) {
- log_print("no local IP address has been set");
- goto out;
- }
- }
- result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
- IPPROTO_SCTP, &sock);
- if (result < 0) {
- log_print("Can't create comms socket, check SCTP is loaded");
- goto out;
- }
- /* Listen for events */
- memset(&subscribe, 0, sizeof(subscribe));
- subscribe.sctp_data_io_event = 1;
- subscribe.sctp_association_event = 1;
- subscribe.sctp_send_failure_event = 1;
- subscribe.sctp_shutdown_event = 1;
- subscribe.sctp_partial_delivery_event = 1;
- fs = get_fs();
- set_fs(get_ds());
- result = sock->ops->setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
- (char *)&subscribe, sizeof(subscribe));
- set_fs(fs);
- if (result < 0) {
- log_print("Failed to set SCTP_EVENTS on socket: result=%d",
- result);
- goto create_delsock;
- }
- /* Init con struct */
- sock->sk->sk_user_data = &sctp_con;
- sctp_con.sock = sock;
- sctp_con.sock->sk->sk_data_ready = lowcomms_data_ready;
- /* Bind to all interfaces. */
- for (i = 0; i < dlm_local_count; i++) {
- memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
- make_sockaddr(&localaddr, dlm_config.tcp_port, &addr_len);
- result = add_bind_addr(&localaddr, addr_len, num);
- if (result)
- goto create_delsock;
- ++num;
- }
- result = sock->ops->listen(sock, 5);
- if (result < 0) {
- log_print("Can't set socket listening");
- goto create_delsock;
- }
- return 0;
- create_delsock:
- sock_release(sock);
- sctp_con.sock = NULL;
- out:
- return result;
- }
- static struct writequeue_entry *new_writequeue_entry(gfp_t allocation)
- {
- struct writequeue_entry *entry;
- entry = kmalloc(sizeof(struct writequeue_entry), allocation);
- if (!entry)
- return NULL;
- entry->page = alloc_page(allocation);
- if (!entry->page) {
- kfree(entry);
- return NULL;
- }
- entry->offset = 0;
- entry->len = 0;
- entry->end = 0;
- entry->users = 0;
- return entry;
- }
- void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
- {
- struct writequeue_entry *e;
- int offset = 0;
- int users = 0;
- struct nodeinfo *ni;
- if (!atomic_read(&accepting))
- return NULL;
- ni = nodeid2nodeinfo(nodeid, allocation);
- if (!ni)
- return NULL;
- spin_lock(&ni->writequeue_lock);
- e = list_entry(ni->writequeue.prev, struct writequeue_entry, list);
- if (((struct list_head *) e == &ni->writequeue) ||
- (PAGE_CACHE_SIZE - e->end < len)) {
- e = NULL;
- } else {
- offset = e->end;
- e->end += len;
- users = e->users++;
- }
- spin_unlock(&ni->writequeue_lock);
- if (e) {
- got_one:
- if (users == 0)
- kmap(e->page);
- *ppc = page_address(e->page) + offset;
- return e;
- }
- e = new_writequeue_entry(allocation);
- if (e) {
- spin_lock(&ni->writequeue_lock);
- offset = e->end;
- e->end += len;
- e->ni = ni;
- users = e->users++;
- list_add_tail(&e->list, &ni->writequeue);
- spin_unlock(&ni->writequeue_lock);
- goto got_one;
- }
- return NULL;
- }
- void dlm_lowcomms_commit_buffer(void *arg)
- {
- struct writequeue_entry *e = (struct writequeue_entry *) arg;
- int users;
- struct nodeinfo *ni = e->ni;
- if (!atomic_read(&accepting))
- return;
- spin_lock(&ni->writequeue_lock);
- users = --e->users;
- if (users)
- goto out;
- e->len = e->end - e->offset;
- kunmap(e->page);
- spin_unlock(&ni->writequeue_lock);
- if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
- spin_lock_bh(&write_nodes_lock);
- list_add_tail(&ni->write_list, &write_nodes);
- spin_unlock_bh(&write_nodes_lock);
- wake_up_process(send_task);
- }
- return;
- out:
- spin_unlock(&ni->writequeue_lock);
- return;
- }
- static void free_entry(struct writequeue_entry *e)
- {
- __free_page(e->page);
- kfree(e);
- }
- /* Initiate an SCTP association. In theory we could just use sendmsg() on
- the first IP address and it should work, but this allows us to set up the
- association before sending any valuable data that we can't afford to lose.
- It also keeps the send path clean as it can now always use the association ID */
- static void initiate_association(int nodeid)
- {
- struct sockaddr_storage rem_addr;
- static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
- struct msghdr outmessage;
- struct cmsghdr *cmsg;
- struct sctp_sndrcvinfo *sinfo;
- int ret;
- int addrlen;
- char buf[1];
- struct kvec iov[1];
- struct nodeinfo *ni;
- log_print("Initiating association with node %d", nodeid);
- ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
- if (!ni)
- return;
- if (nodeid_to_addr(nodeid, (struct sockaddr *)&rem_addr)) {
- log_print("no address for nodeid %d", nodeid);
- return;
- }
- make_sockaddr(&rem_addr, dlm_config.tcp_port, &addrlen);
- outmessage.msg_name = &rem_addr;
- outmessage.msg_namelen = addrlen;
- outmessage.msg_control = outcmsg;
- outmessage.msg_controllen = sizeof(outcmsg);
- outmessage.msg_flags = MSG_EOR;
- iov[0].iov_base = buf;
- iov[0].iov_len = 1;
- /* Real INIT messages seem to cause trouble. Just send a 1 byte message
- we can afford to lose */
- cmsg = CMSG_FIRSTHDR(&outmessage);
- cmsg->cmsg_level = IPPROTO_SCTP;
- cmsg->cmsg_type = SCTP_SNDRCV;
- cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
- sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
- memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
- sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
- outmessage.msg_controllen = cmsg->cmsg_len;
- ret = kernel_sendmsg(sctp_con.sock, &outmessage, iov, 1, 1);
- if (ret < 0) {
- log_print("send INIT to node failed: %d", ret);
- /* Try again later */
- clear_bit(NI_INIT_PENDING, &ni->flags);
- }
- }
- /* Send a message */
- static int send_to_sock(struct nodeinfo *ni)
- {
- int ret = 0;
- struct writequeue_entry *e;
- int len, offset;
- struct msghdr outmsg;
- static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
- struct cmsghdr *cmsg;
- struct sctp_sndrcvinfo *sinfo;
- struct kvec iov;
- /* See if we need to init an association before we start
- sending precious messages */
- spin_lock(&ni->lock);
- if (!ni->assoc_id && !test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
- spin_unlock(&ni->lock);
- initiate_association(ni->nodeid);
- return 0;
- }
- spin_unlock(&ni->lock);
- outmsg.msg_name = NULL; /* We use assoc_id */
- outmsg.msg_namelen = 0;
- outmsg.msg_control = outcmsg;
- outmsg.msg_controllen = sizeof(outcmsg);
- outmsg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL | MSG_EOR;
- cmsg = CMSG_FIRSTHDR(&outmsg);
- cmsg->cmsg_level = IPPROTO_SCTP;
- cmsg->cmsg_type = SCTP_SNDRCV;
- cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
- sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
- memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
- sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
- sinfo->sinfo_assoc_id = ni->assoc_id;
- outmsg.msg_controllen = cmsg->cmsg_len;
- spin_lock(&ni->writequeue_lock);
- for (;;) {
- if (list_empty(&ni->writequeue))
- break;
- e = list_entry(ni->writequeue.next, struct writequeue_entry,
- list);
- len = e->len;
- offset = e->offset;
- BUG_ON(len == 0 && e->users == 0);
- spin_unlock(&ni->writequeue_lock);
- kmap(e->page);
- ret = 0;
- if (len) {
- iov.iov_base = page_address(e->page)+offset;
- iov.iov_len = len;
- ret = kernel_sendmsg(sctp_con.sock, &outmsg, &iov, 1,
- len);
- if (ret == -EAGAIN) {
- sctp_con.eagain_flag = 1;
- goto out;
- } else if (ret < 0)
- goto send_error;
- } else {
- /* Don't starve people filling buffers */
- schedule();
- }
- spin_lock(&ni->writequeue_lock);
- e->offset += ret;
- e->len -= ret;
- if (e->len == 0 && e->users == 0) {
- list_del(&e->list);
- free_entry(e);
- continue;
- }
- }
- spin_unlock(&ni->writequeue_lock);
- out:
- return ret;
- send_error:
- log_print("Error sending to node %d %d", ni->nodeid, ret);
- spin_lock(&ni->lock);
- if (!test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
- ni->assoc_id = 0;
- spin_unlock(&ni->lock);
- initiate_association(ni->nodeid);
- } else
- spin_unlock(&ni->lock);
- return ret;
- }
- /* Try to send any messages that are pending */
- static void process_output_queue(void)
- {
- struct list_head *list;
- struct list_head *temp;
- spin_lock_bh(&write_nodes_lock);
- list_for_each_safe(list, temp, &write_nodes) {
- struct nodeinfo *ni =
- list_entry(list, struct nodeinfo, write_list);
- clear_bit(NI_WRITE_PENDING, &ni->flags);
- list_del(&ni->write_list);
- spin_unlock_bh(&write_nodes_lock);
- send_to_sock(ni);
- spin_lock_bh(&write_nodes_lock);
- }
- spin_unlock_bh(&write_nodes_lock);
- }
- /* Called after we've had -EAGAIN and been woken up */
- static void refill_write_queue(void)
- {
- int i;
- for (i=1; i<=max_nodeid; i++) {
- struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
- if (ni) {
- if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
- spin_lock_bh(&write_nodes_lock);
- list_add_tail(&ni->write_list, &write_nodes);
- spin_unlock_bh(&write_nodes_lock);
- }
- }
- }
- }
- static void clean_one_writequeue(struct nodeinfo *ni)
- {
- struct list_head *list;
- struct list_head *temp;
- spin_lock(&ni->writequeue_lock);
- list_for_each_safe(list, temp, &ni->writequeue) {
- struct writequeue_entry *e =
- list_entry(list, struct writequeue_entry, list);
- list_del(&e->list);
- free_entry(e);
- }
- spin_unlock(&ni->writequeue_lock);
- }
- static void clean_writequeues(void)
- {
- int i;
- for (i=1; i<=max_nodeid; i++) {
- struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
- if (ni)
- clean_one_writequeue(ni);
- }
- }
- static void dealloc_nodeinfo(void)
- {
- int i;
- for (i=1; i<=max_nodeid; i++) {
- struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
- if (ni) {
- idr_remove(&nodeinfo_idr, i);
- kfree(ni);
- }
- }
- }
- int dlm_lowcomms_close(int nodeid)
- {
- struct nodeinfo *ni;
- ni = nodeid2nodeinfo(nodeid, 0);
- if (!ni)
- return -1;
- spin_lock(&ni->lock);
- if (ni->assoc_id) {
- ni->assoc_id = 0;
- /* Don't send shutdown here, sctp will just queue it
- till the node comes back up! */
- }
- spin_unlock(&ni->lock);
- clean_one_writequeue(ni);
- clear_bit(NI_INIT_PENDING, &ni->flags);
- return 0;
- }
- static int write_list_empty(void)
- {
- int status;
- spin_lock_bh(&write_nodes_lock);
- status = list_empty(&write_nodes);
- spin_unlock_bh(&write_nodes_lock);
- return status;
- }
- static int dlm_recvd(void *data)
- {
- DECLARE_WAITQUEUE(wait, current);
- while (!kthread_should_stop()) {
- int count = 0;
- set_current_state(TASK_INTERRUPTIBLE);
- add_wait_queue(&lowcomms_recv_wait, &wait);
- if (!test_bit(CF_READ_PENDING, &sctp_con.flags))
- schedule();
- remove_wait_queue(&lowcomms_recv_wait, &wait);
- set_current_state(TASK_RUNNING);
- if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
- int ret;
- do {
- ret = receive_from_sock();
- /* Don't starve out everyone else */
- if (++count >= MAX_RX_MSG_COUNT) {
- schedule();
- count = 0;
- }
- } while (!kthread_should_stop() && ret >=0);
- }
- schedule();
- }
- return 0;
- }
- static int dlm_sendd(void *data)
- {
- DECLARE_WAITQUEUE(wait, current);
- add_wait_queue(sctp_con.sock->sk->sk_sleep, &wait);
- while (!kthread_should_stop()) {
- set_current_state(TASK_INTERRUPTIBLE);
- if (write_list_empty())
- schedule();
- set_current_state(TASK_RUNNING);
- if (sctp_con.eagain_flag) {
- sctp_con.eagain_flag = 0;
- refill_write_queue();
- }
- process_output_queue();
- }
- remove_wait_queue(sctp_con.sock->sk->sk_sleep, &wait);
- return 0;
- }
- static void daemons_stop(void)
- {
- kthread_stop(recv_task);
- kthread_stop(send_task);
- }
- static int daemons_start(void)
- {
- struct task_struct *p;
- int error;
- p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
- error = IS_ERR(p);
- if (error) {
- log_print("can't start dlm_recvd %d", error);
- return error;
- }
- recv_task = p;
- p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
- error = IS_ERR(p);
- if (error) {
- log_print("can't start dlm_sendd %d", error);
- kthread_stop(recv_task);
- return error;
- }
- send_task = p;
- return 0;
- }
- /*
- * This is quite likely to sleep...
- */
- int dlm_lowcomms_start(void)
- {
- int error;
- error = init_sock();
- if (error)
- goto fail_sock;
- error = daemons_start();
- if (error)
- goto fail_sock;
- atomic_set(&accepting, 1);
- return 0;
- fail_sock:
- close_connection();
- return error;
- }
- /* Set all the activity flags to prevent any socket activity. */
- void dlm_lowcomms_stop(void)
- {
- atomic_set(&accepting, 0);
- sctp_con.flags = 0x7;
- daemons_stop();
- clean_writequeues();
- close_connection();
- dealloc_nodeinfo();
- max_nodeid = 0;
- }
- int dlm_lowcomms_init(void)
- {
- init_waitqueue_head(&lowcomms_recv_wait);
- spin_lock_init(&write_nodes_lock);
- INIT_LIST_HEAD(&write_nodes);
- init_rwsem(&nodeinfo_lock);
- return 0;
- }
- void dlm_lowcomms_exit(void)
- {
- int i;
- for (i = 0; i < dlm_local_count; i++)
- kfree(dlm_local_addr[i]);
- dlm_local_count = 0;
- dlm_local_nodeid = 0;
- }
|