@@ -2,7 +2,7 @@
 *******************************************************************************
 **
 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
-**  Copyright (C) 2004-2006 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -96,10 +96,7 @@ static bool cbuf_empty(struct cbuf *cb)
 struct connection {
 	struct socket *sock;	/* NULL if not connected */
 	uint32_t nodeid;	/* So we know who we are in the list */
-	struct rw_semaphore sock_sem;	/* Stop connect races */
-	struct list_head read_list;	/* On this list when ready for reading */
-	struct list_head write_list;	/* On this list when ready for writing */
-	struct list_head state_list;	/* On this list when ready to connect */
+	struct mutex sock_mutex;
 	unsigned long flags;	/* bit 1,2 = We are on the read/write lists */
 #define CF_READ_PENDING 1
 #define CF_WRITE_PENDING 2
@@ -112,9 +109,10 @@ struct connection {
 	struct page *rx_page;
 	struct cbuf cb;
 	int retries;
-	atomic_t waiting_requests;
 #define MAX_CONNECT_RETRIES 3
 	struct connection *othercon;
+	struct work_struct rwork; /* Receive workqueue */
+	struct work_struct swork; /* Send workqueue */
 };
 #define sock2con(x) ((struct connection *)(x)->sk_user_data)
 
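
The core of the change is already visible in the struct: the three list_head members that once threaded a connection onto global pending lists are replaced by two work items embedded in the connection itself, so the work handler can recover its connection with container_of. A minimal sketch of that embedded-work pattern, using hypothetical names (demo_conn, demo_recv_handler are not part of this patch):

#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/workqueue.h>

/* Hypothetical per-object work embedding, mirroring rwork/swork above. */
struct demo_conn {
	int nodeid;
	struct work_struct rwork;	/* receive work item */
};

static void demo_recv_handler(struct work_struct *work)
{
	/* Recover the enclosing object from the embedded work item. */
	struct demo_conn *dc = container_of(work, struct demo_conn, rwork);

	pr_info("servicing node %d\n", dc->nodeid);
}

static struct demo_conn *demo_conn_new(int nodeid)
{
	struct demo_conn *dc = kzalloc(sizeof(*dc), GFP_KERNEL);

	if (dc) {
		dc->nodeid = nodeid;
		INIT_WORK(&dc->rwork, demo_recv_handler);
	}
	return dc;
}
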
@@ -131,14 +129,9 @@ struct writequeue_entry {
 
 static struct sockaddr_storage dlm_local_addr;
 
-/* Manage daemons */
-static struct task_struct *recv_task;
-static struct task_struct *send_task;
-
-static wait_queue_t lowcomms_send_waitq_head;
-static DECLARE_WAIT_QUEUE_HEAD(lowcomms_send_waitq);
-static wait_queue_t lowcomms_recv_waitq_head;
-static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_waitq);
+/* Work queues */
+static struct workqueue_struct *recv_workqueue;
+static struct workqueue_struct *send_workqueue;
 
 /* An array of pointers to connections, indexed by NODEID */
 static struct connection **connections;
@@ -146,17 +139,8 @@ static DECLARE_MUTEX(connections_lock);
 static struct kmem_cache *con_cache;
 static int conn_array_size;
 
-/* List of sockets that have reads pending */
-static LIST_HEAD(read_sockets);
-static DEFINE_SPINLOCK(read_sockets_lock);
-
-/* List of sockets which have writes pending */
-static LIST_HEAD(write_sockets);
-static DEFINE_SPINLOCK(write_sockets_lock);
-
-/* List of sockets which have connects pending */
-static LIST_HEAD(state_sockets);
-static DEFINE_SPINLOCK(state_sockets_lock);
+static void process_recv_sockets(struct work_struct *work);
+static void process_send_sockets(struct work_struct *work);
 
 static struct connection *nodeid2con(int nodeid, gfp_t allocation)
 {
@@ -186,9 +170,11 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation)
 		goto finish;
 
 	con->nodeid = nodeid;
-	init_rwsem(&con->sock_sem);
+	mutex_init(&con->sock_mutex);
 	INIT_LIST_HEAD(&con->writequeue);
 	spin_lock_init(&con->writequeue_lock);
+	INIT_WORK(&con->swork, process_send_sockets);
+	INIT_WORK(&con->rwork, process_recv_sockets);
 
 	connections[nodeid] = con;
 }
@@ -203,41 +189,22 @@ static void lowcomms_data_ready(struct sock *sk, int count_unused)
 {
 	struct connection *con = sock2con(sk);
 
-	atomic_inc(&con->waiting_requests);
-	if (test_and_set_bit(CF_READ_PENDING, &con->flags))
-		return;
-
-	spin_lock_bh(&read_sockets_lock);
-	list_add_tail(&con->read_list, &read_sockets);
-	spin_unlock_bh(&read_sockets_lock);
-
-	wake_up_interruptible(&lowcomms_recv_waitq);
+	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
+		queue_work(recv_workqueue, &con->rwork);
 }
 
 static void lowcomms_write_space(struct sock *sk)
 {
 	struct connection *con = sock2con(sk);
 
-	if (test_and_set_bit(CF_WRITE_PENDING, &con->flags))
-		return;
-
-	spin_lock_bh(&write_sockets_lock);
-	list_add_tail(&con->write_list, &write_sockets);
-	spin_unlock_bh(&write_sockets_lock);
-
-	wake_up_interruptible(&lowcomms_send_waitq);
+	if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
+		queue_work(send_workqueue, &con->swork);
 }
 
 static inline void lowcomms_connect_sock(struct connection *con)
 {
-	if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
-		return;
-
-	spin_lock_bh(&state_sockets_lock);
-	list_add_tail(&con->state_list, &state_sockets);
-	spin_unlock_bh(&state_sockets_lock);
-
-	wake_up_interruptible(&lowcomms_send_waitq);
+	if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
+		queue_work(send_workqueue, &con->swork);
 }
 
 static void lowcomms_state_change(struct sock *sk)
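
These socket callbacks run in softirq context, so each reduces to a single atomic gate: the first poke between handler runs queues the work item, later pokes are absorbed by the already-set bit, and the handler clears the bit before servicing so nothing is lost. A hedged sketch of the idiom, all names invented:

#include <linux/bitops.h>
#include <linux/workqueue.h>

#define DEMO_PENDING 0		/* hypothetical flag bit */

static unsigned long demo_flags;
static struct workqueue_struct *demo_wq;	/* created at init elsewhere */
static struct work_struct demo_work;		/* bound to demo_handler elsewhere */

/* Caller side: only the first poke between handler runs queues work. */
static void demo_poke(void)
{
	if (!test_and_set_bit(DEMO_PENDING, &demo_flags))
		queue_work(demo_wq, &demo_work);
}

/* Handler side: clear the gate first, so a poke arriving while we run
 * queues a fresh pass instead of being swallowed. */
static void demo_handler(struct work_struct *work)
{
	clear_bit(DEMO_PENDING, &demo_flags);
	/* ... service the event ... */
}
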
@@ -279,7 +246,7 @@ static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
 /* Close a remote connection and tidy up */
 static void close_connection(struct connection *con, bool and_other)
 {
-	down_write(&con->sock_sem);
+	mutex_lock(&con->sock_mutex);
 
 	if (con->sock) {
 		sock_release(con->sock);
@@ -294,7 +261,7 @@ static void close_connection(struct connection *con, bool and_other)
 		con->rx_page = NULL;
 	}
 	con->retries = 0;
-	up_write(&con->sock_sem);
+	mutex_unlock(&con->sock_mutex);
 }
 
 /* Data received from remote end */
@@ -308,10 +275,13 @@ static int receive_from_sock(struct connection *con)
 	int r;
 	int call_again_soon = 0;
 
-	down_read(&con->sock_sem);
+	mutex_lock(&con->sock_mutex);
+
+	if (con->sock == NULL) {
+		ret = -EAGAIN;
+		goto out_close;
+	}
 
-	if (con->sock == NULL)
-		goto out;
 	if (con->rx_page == NULL) {
 		/*
 		 * This doesn't need to be atomic, but I think it should
@@ -359,6 +329,9 @@ static int receive_from_sock(struct connection *con)
 
 	if (ret <= 0)
 		goto out_close;
+	if (ret == -EAGAIN)
+		goto out_resched;
+
 	if (ret == len)
 		call_again_soon = 1;
 	cbuf_add(&con->cb, ret);
@@ -381,24 +354,26 @@ static int receive_from_sock(struct connection *con)
 		con->rx_page = NULL;
 	}
 
-out:
 	if (call_again_soon)
 		goto out_resched;
-	up_read(&con->sock_sem);
+	mutex_unlock(&con->sock_mutex);
 	return 0;
 
 out_resched:
-	lowcomms_data_ready(con->sock->sk, 0);
-	up_read(&con->sock_sem);
-	cond_resched();
-	return 0;
+	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
+		queue_work(recv_workqueue, &con->rwork);
+	mutex_unlock(&con->sock_mutex);
+	return -EAGAIN;
 
 out_close:
-	up_read(&con->sock_sem);
+	mutex_unlock(&con->sock_mutex);
 	if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) {
 		close_connection(con, false);
 		/* Reconnect when there is something to send */
 	}
+	/* Don't return success if we really got EOF */
+	if (ret == 0)
+		ret = -EAGAIN;
 
 	return ret;
 }
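
receive_from_sock() now reports its state through its return value: 0 means it made progress and may be called again, -EAGAIN means the socket is drained (or the pass has been requeued), and any other negative value is a hard error. The receive work handler further down simply loops until the action returns non-zero. A self-contained sketch of that contract, with made-up names:

#include <linux/errno.h>

/* Hypothetical per-pass receive step following the same convention:
 * 0 while it makes progress, -EAGAIN once drained, else a hard error. */
static int demo_rx_step(void)
{
	/* stand-in body: pretend the source is already drained */
	return -EAGAIN;
}

/* Consumer loop in the shape of the receive work handler below:
 * keep calling the step until it reports non-zero. */
static void demo_drain(void)
{
	int err;

	do {
		err = demo_rx_step();
	} while (!err);
}
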
@@ -412,6 +387,7 @@ static int accept_from_sock(struct connection *con)
 	int len;
 	int nodeid;
 	struct connection *newcon;
+	struct connection *addcon;
 
 	memset(&peeraddr, 0, sizeof(peeraddr));
 	result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
@@ -419,7 +395,7 @@ static int accept_from_sock(struct connection *con)
 	if (result < 0)
 		return -ENOMEM;
 
-	down_read(&con->sock_sem);
+	mutex_lock_nested(&con->sock_mutex, 0);
 
 	result = -ENOTCONN;
 	if (con->sock == NULL)
@@ -445,7 +421,7 @@ static int accept_from_sock(struct connection *con)
 	if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
 		printk("dlm: connect from non cluster node\n");
 		sock_release(newsock);
-		up_read(&con->sock_sem);
+		mutex_unlock(&con->sock_mutex);
 		return -1;
 	}
 
@@ -462,7 +438,7 @@ static int accept_from_sock(struct connection *con)
 			result = -ENOMEM;
 			goto accept_err;
 		}
-		down_write(&newcon->sock_sem);
+		mutex_lock_nested(&newcon->sock_mutex, 1);
 		if (newcon->sock) {
 			struct connection *othercon = newcon->othercon;
 
@@ -470,41 +446,45 @@ static int accept_from_sock(struct connection *con)
 			othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL);
 			if (!othercon) {
 				printk("dlm: failed to allocate incoming socket\n");
-				up_write(&newcon->sock_sem);
+				mutex_unlock(&newcon->sock_mutex);
 				result = -ENOMEM;
 				goto accept_err;
 			}
 			othercon->nodeid = nodeid;
 			othercon->rx_action = receive_from_sock;
-			init_rwsem(&othercon->sock_sem);
+			mutex_init(&othercon->sock_mutex);
+			INIT_WORK(&othercon->swork, process_send_sockets);
+			INIT_WORK(&othercon->rwork, process_recv_sockets);
 			set_bit(CF_IS_OTHERCON, &othercon->flags);
 			newcon->othercon = othercon;
 		}
 		othercon->sock = newsock;
 		newsock->sk->sk_user_data = othercon;
 		add_sock(newsock, othercon);
+		addcon = othercon;
 	}
 	else {
 		newsock->sk->sk_user_data = newcon;
 		newcon->rx_action = receive_from_sock;
 		add_sock(newsock, newcon);
-
+		addcon = newcon;
 	}
 
-	up_write(&newcon->sock_sem);
+	mutex_unlock(&newcon->sock_mutex);
 
 	/*
 	 * Add it to the active queue in case we got data
 	 * beween processing the accept adding the socket
 	 * to the read_sockets list
 	 */
-	lowcomms_data_ready(newsock->sk, 0);
-	up_read(&con->sock_sem);
+	if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
+		queue_work(recv_workqueue, &addcon->rwork);
+	mutex_unlock(&con->sock_mutex);
 
 	return 0;
 
 accept_err:
-	up_read(&con->sock_sem);
+	mutex_unlock(&con->sock_mutex);
 	sock_release(newsock);
 
 	if (result != -EAGAIN)
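
The two mutex_lock_nested() calls are lockdep annotations: accept holds the listening connection's mutex (subclass 0) while taking the new connection's mutex (subclass 1), and without distinct subclasses the validator would flag this as recursive locking of the same lock class. A minimal sketch of the annotation, assuming two already-initialized objects of the same type:

#include <linux/mutex.h>

struct demo_node {
	struct mutex lock;	/* assumed initialized with mutex_init() */
};

/* Lock a parent and a child of the same lock class. Distinct lockdep
 * subclasses tell the validator this nesting is intentional. */
static void demo_lock_pair(struct demo_node *parent, struct demo_node *child)
{
	mutex_lock_nested(&parent->lock, 0);
	mutex_lock_nested(&child->lock, 1);

	/* ... work under both locks ... */

	mutex_unlock(&child->lock);
	mutex_unlock(&parent->lock);
}
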
@@ -525,7 +505,7 @@ static void connect_to_sock(struct connection *con)
 		return;
 	}
 
-	down_write(&con->sock_sem);
+	mutex_lock(&con->sock_mutex);
 	if (con->retries++ > MAX_CONNECT_RETRIES)
 		goto out;
 
@@ -548,7 +528,7 @@ static void connect_to_sock(struct connection *con)
 	sock->sk->sk_user_data = con;
 	con->rx_action = receive_from_sock;
 
-	make_sockaddr(&saddr, dlm_config.tcp_port, &addr_len);
+	make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
 
 	add_sock(sock, con);
 
@@ -577,7 +557,7 @@ out_err:
 		result = 0;
 	}
 out:
-	up_write(&con->sock_sem);
+	mutex_unlock(&con->sock_mutex);
 	return;
 }
 
@@ -616,10 +596,10 @@ static struct socket *create_listen_sock(struct connection *con,
 	con->sock = sock;
 
 	/* Bind to our port */
-	make_sockaddr(saddr, dlm_config.tcp_port, &addr_len);
+	make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
 	result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
 	if (result < 0) {
-		printk("dlm: Can't bind to port %d\n", dlm_config.tcp_port);
+		printk("dlm: Can't bind to port %d\n", dlm_config.ci_tcp_port);
 		sock_release(sock);
 		sock = NULL;
 		con->sock = NULL;
@@ -638,7 +618,7 @@ static struct socket *create_listen_sock(struct connection *con,
 
 	result = sock->ops->listen(sock, 5);
 	if (result < 0) {
-		printk("dlm: Can't listen on port %d\n", dlm_config.tcp_port);
+		printk("dlm: Can't listen on port %d\n", dlm_config.ci_tcp_port);
 		sock_release(sock);
 		sock = NULL;
 		goto create_out;
@@ -709,6 +689,7 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len,
 	if (!con)
 		return NULL;
 
+	spin_lock(&con->writequeue_lock);
 	e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
 	if ((&e->list == &con->writequeue) ||
 	    (PAGE_CACHE_SIZE - e->end < len)) {
@@ -747,6 +728,7 @@ void dlm_lowcomms_commit_buffer(void *mh)
 	struct connection *con = e->con;
 	int users;
 
+	spin_lock(&con->writequeue_lock);
 	users = --e->users;
 	if (users)
 		goto out;
@@ -754,12 +736,8 @@ void dlm_lowcomms_commit_buffer(void *mh)
 	kunmap(e->page);
 	spin_unlock(&con->writequeue_lock);
 
-	if (test_and_set_bit(CF_WRITE_PENDING, &con->flags) == 0) {
-		spin_lock_bh(&write_sockets_lock);
-		list_add_tail(&con->write_list, &write_sockets);
-		spin_unlock_bh(&write_sockets_lock);
-
-		wake_up_interruptible(&lowcomms_send_waitq);
+	if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
+		queue_work(send_workqueue, &con->swork);
 	}
 	return;
 
@@ -783,7 +761,7 @@ static void send_to_sock(struct connection *con)
 	struct writequeue_entry *e;
 	int len, offset;
 
-	down_read(&con->sock_sem);
+	mutex_lock(&con->sock_mutex);
 	if (con->sock == NULL)
 		goto out_connect;
 
@@ -800,6 +778,7 @@ static void send_to_sock(struct connection *con)
 		offset = e->offset;
 		BUG_ON(len == 0 && e->users == 0);
 		spin_unlock(&con->writequeue_lock);
+		kmap(e->page);
 
 		ret = 0;
 		if (len) {
@@ -828,18 +807,18 @@ static void send_to_sock(struct connection *con)
 	}
 	spin_unlock(&con->writequeue_lock);
 out:
-	up_read(&con->sock_sem);
+	mutex_unlock(&con->sock_mutex);
 	return;
 
 send_error:
-	up_read(&con->sock_sem);
+	mutex_unlock(&con->sock_mutex);
 	close_connection(con, false);
 	lowcomms_connect_sock(con);
 	return;
 
 out_connect:
-	up_read(&con->sock_sem);
-	lowcomms_connect_sock(con);
+	mutex_unlock(&con->sock_mutex);
+	connect_to_sock(con);
 	return;
 }
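
Two of the hunks above are straight locking fixes rather than workqueue conversion: dlm_lowcomms_get_buffer() and dlm_lowcomms_commit_buffer() previously only released writequeue_lock, so the list walk and the refcount update ran unprotected. The shape of the fixed pattern, sketched with placeholder names:

#include <linux/list.h>
#include <linux/spinlock.h>

struct demo_entry {
	struct list_head list;
	int users;
};

static LIST_HEAD(demo_queue);
static DEFINE_SPINLOCK(demo_queue_lock);

/* Reserve the newest entry; both the list walk and the refcount
 * bump happen under the lock, matching the fix above. */
static struct demo_entry *demo_reserve(void)
{
	struct demo_entry *e = NULL;

	spin_lock(&demo_queue_lock);
	if (!list_empty(&demo_queue)) {
		e = list_entry(demo_queue.prev, struct demo_entry, list);
		e->users++;
	}
	spin_unlock(&demo_queue_lock);
	return e;
}
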
@@ -872,7 +851,6 @@ int dlm_lowcomms_close(int nodeid)
 	if (con) {
 		clean_one_writequeue(con);
 		close_connection(con, true);
-		atomic_set(&con->waiting_requests, 0);
 	}
 	return 0;
 
@@ -880,102 +858,29 @@ out:
 	return -1;
 }
 
-/* API send message call, may queue the request */
-/* N.B. This is the old interface - use the new one for new calls */
-int lowcomms_send_message(int nodeid, char *buf, int len, gfp_t allocation)
-{
-	struct writequeue_entry *e;
-	char *b;
-
-	e = dlm_lowcomms_get_buffer(nodeid, len, allocation, &b);
-	if (e) {
-		memcpy(b, buf, len);
-		dlm_lowcomms_commit_buffer(e);
-		return 0;
-	}
-	return -ENOBUFS;
-}
-
 /* Look for activity on active sockets */
-static void process_sockets(void)
+static void process_recv_sockets(struct work_struct *work)
 {
-	struct list_head *list;
-	struct list_head *temp;
-	int count = 0;
-
-	spin_lock_bh(&read_sockets_lock);
-	list_for_each_safe(list, temp, &read_sockets) {
+	struct connection *con = container_of(work, struct connection, rwork);
+	int err;
 
-		struct connection *con =
-			list_entry(list, struct connection, read_list);
-		list_del(&con->read_list);
-		clear_bit(CF_READ_PENDING, &con->flags);
-
-		spin_unlock_bh(&read_sockets_lock);
-
-		/* This can reach zero if we are processing requests
-		 * as they come in.
-		 */
-		if (atomic_read(&con->waiting_requests) == 0) {
-			spin_lock_bh(&read_sockets_lock);
-			continue;
-		}
-
-		do {
-			con->rx_action(con);
-
-			/* Don't starve out everyone else */
-			if (++count >= MAX_RX_MSG_COUNT) {
-				cond_resched();
-				count = 0;
-			}
-
-		} while (!atomic_dec_and_test(&con->waiting_requests) &&
-			 !kthread_should_stop());
-
-		spin_lock_bh(&read_sockets_lock);
-	}
-	spin_unlock_bh(&read_sockets_lock);
+	clear_bit(CF_READ_PENDING, &con->flags);
+	do {
+		err = con->rx_action(con);
+	} while (!err);
 }
 
-/* Try to send any messages that are pending
- */
-static void process_output_queue(void)
-{
-	struct list_head *list;
-	struct list_head *temp;
-
-	spin_lock_bh(&write_sockets_lock);
-	list_for_each_safe(list, temp, &write_sockets) {
-		struct connection *con =
-			list_entry(list, struct connection, write_list);
-		clear_bit(CF_WRITE_PENDING, &con->flags);
-		list_del(&con->write_list);
-
-		spin_unlock_bh(&write_sockets_lock);
-		send_to_sock(con);
-		spin_lock_bh(&write_sockets_lock);
-	}
-	spin_unlock_bh(&write_sockets_lock);
-}
-
-static void process_state_queue(void)
+static void process_send_sockets(struct work_struct *work)
 {
-	struct list_head *list;
-	struct list_head *temp;
-
-	spin_lock_bh(&state_sockets_lock);
-	list_for_each_safe(list, temp, &state_sockets) {
-		struct connection *con =
-			list_entry(list, struct connection, state_list);
-		list_del(&con->state_list);
-		clear_bit(CF_CONNECT_PENDING, &con->flags);
-		spin_unlock_bh(&state_sockets_lock);
+	struct connection *con = container_of(work, struct connection, swork);
 
+	if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
 		connect_to_sock(con);
-		spin_lock_bh(&state_sockets_lock);
 	}
-	spin_unlock_bh(&state_sockets_lock);
+
+	clear_bit(CF_WRITE_PENDING, &con->flags);
+	send_to_sock(con);
 }
 
 
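
process_send_sockets() folds what used to be two kthread loops (the state queue and the output queue) into one handler: a pending connect is performed first, then any queued writes are pushed. A condensed sketch of a flag-dispatching handler in the same spirit, names invented:

#include <linux/bitops.h>
#include <linux/workqueue.h>

#define DEMO_CONNECT_PENDING	0
#define DEMO_WRITE_PENDING	1

struct demo_conn {
	unsigned long flags;
	struct work_struct swork;
};

static void demo_connect(struct demo_conn *dc) { /* ... */ }
static void demo_flush_writes(struct demo_conn *dc) { /* ... */ }

/* One handler serves both events; the flag bits record which work
 * is actually outstanding by the time the item finally runs. */
static void demo_send_handler(struct work_struct *work)
{
	struct demo_conn *dc = container_of(work, struct demo_conn, swork);

	if (test_and_clear_bit(DEMO_CONNECT_PENDING, &dc->flags))
		demo_connect(dc);

	clear_bit(DEMO_WRITE_PENDING, &dc->flags);
	demo_flush_writes(dc);
}
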
@@ -992,109 +897,33 @@ static void clean_writequeues(void)
 	}
 }
 
-static int read_list_empty(void)
+static void work_stop(void)
 {
-	int status;
-
-	spin_lock_bh(&read_sockets_lock);
-	status = list_empty(&read_sockets);
-	spin_unlock_bh(&read_sockets_lock);
-
-	return status;
-}
-
-/* DLM Transport comms receive daemon */
-static int dlm_recvd(void *data)
-{
-	init_waitqueue_entry(&lowcomms_recv_waitq_head, current);
-	add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head);
-
-	while (!kthread_should_stop()) {
-		set_current_state(TASK_INTERRUPTIBLE);
-		if (read_list_empty())
-			cond_resched();
-		set_current_state(TASK_RUNNING);
-
-		process_sockets();
-	}
-
-	return 0;
+	destroy_workqueue(recv_workqueue);
+	destroy_workqueue(send_workqueue);
 }
 
-static int write_and_state_lists_empty(void)
+static int work_start(void)
 {
-	int status;
-
-	spin_lock_bh(&write_sockets_lock);
-	status = list_empty(&write_sockets);
-	spin_unlock_bh(&write_sockets_lock);
-
-	spin_lock_bh(&state_sockets_lock);
-	if (list_empty(&state_sockets) == 0)
-		status = 0;
-	spin_unlock_bh(&state_sockets_lock);
-
-	return status;
-}
-
-/* DLM Transport send daemon */
-static int dlm_sendd(void *data)
-{
-	init_waitqueue_entry(&lowcomms_send_waitq_head, current);
-	add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head);
-
-	while (!kthread_should_stop()) {
-		set_current_state(TASK_INTERRUPTIBLE);
-		if (write_and_state_lists_empty())
-			cond_resched();
-		set_current_state(TASK_RUNNING);
-
-		process_state_queue();
-		process_output_queue();
-	}
-
-	return 0;
-}
-
-static void daemons_stop(void)
-{
-	kthread_stop(recv_task);
-	kthread_stop(send_task);
-}
-
-static int daemons_start(void)
-{
-	struct task_struct *p;
 	int error;
-
-	p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
-	error = IS_ERR(p);
+	recv_workqueue = create_workqueue("dlm_recv");
+	error = IS_ERR(recv_workqueue);
 	if (error) {
-		log_print("can't start dlm_recvd %d", error);
+		log_print("can't start dlm_recv %d", error);
 		return error;
 	}
-	recv_task = p;
 
-	p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
-	error = IS_ERR(p);
+	send_workqueue = create_singlethread_workqueue("dlm_send");
+	error = IS_ERR(send_workqueue);
 	if (error) {
-		log_print("can't start dlm_sendd %d", error);
-		kthread_stop(recv_task);
+		log_print("can't start dlm_send %d", error);
+		destroy_workqueue(recv_workqueue);
 		return error;
 	}
-	send_task = p;
 
 	return 0;
 }
 
-/*
- * Return the largest buffer size we can cope with.
- */
-int lowcomms_max_buffer_size(void)
-{
-	return PAGE_CACHE_SIZE;
-}
-
 void dlm_lowcomms_stop(void)
 {
 	int i;
@@ -1107,7 +936,7 @@ void dlm_lowcomms_stop(void)
 		connections[i]->flags |= 0xFF;
 	}
 
-	daemons_stop();
+	work_stop();
 	clean_writequeues();
 
 	for (i = 0; i < conn_array_size; i++) {
@@ -1159,7 +988,7 @@ int dlm_lowcomms_start(void)
 	if (error)
 		goto fail_unlisten;
 
-	error = daemons_start();
+	error = work_start();
 	if (error)
 		goto fail_unlisten;
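
work_start()/work_stop() replace the kthread pair with a multithreaded receive queue and a single-threaded send queue, whose one worker keeps connect-then-send ordering implicit. One caveat worth noting: create_workqueue() reports failure by returning NULL rather than an ERR_PTR, so a sketch of the same lifecycle would typically test for NULL (hypothetical names throughout):

#include <linux/errno.h>
#include <linux/workqueue.h>

static struct workqueue_struct *demo_recv_wq;
static struct workqueue_struct *demo_send_wq;

/* Minimal start/stop pair. create_workqueue() returns NULL on
 * failure, hence the NULL tests rather than IS_ERR(). */
static int demo_work_start(void)
{
	demo_recv_wq = create_workqueue("demo_recv");
	if (!demo_recv_wq)
		return -ENOMEM;

	/* Single-threaded: send/connect work runs strictly in order. */
	demo_send_wq = create_singlethread_workqueue("demo_send");
	if (!demo_send_wq) {
		destroy_workqueue(demo_recv_wq);
		return -ENOMEM;
	}
	return 0;
}

static void demo_work_stop(void)
{
	destroy_workqueue(demo_send_wq);
	destroy_workqueue(demo_recv_wq);
}
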