|
@@ -63,6 +63,9 @@
|
|
|
#define NEEDED_RMEM (4*1024*1024)
|
|
|
#define CONN_HASH_SIZE 32
|
|
|
|
|
|
+/* Number of messages to send before rescheduling */
|
|
|
+#define MAX_SEND_MSG_COUNT 25
|
|
|
+
|
|
|
struct cbuf {
|
|
|
unsigned int base;
|
|
|
unsigned int len;
|
|
@@ -108,6 +111,7 @@ struct connection {
|
|
|
#define CF_INIT_PENDING 4
|
|
|
#define CF_IS_OTHERCON 5
|
|
|
#define CF_CLOSE 6
|
|
|
+#define CF_APP_LIMITED 7
|
|
|
struct list_head writequeue; /* List of outgoing writequeue_entries */
|
|
|
spinlock_t writequeue_lock;
|
|
|
int (*rx_action) (struct connection *); /* What to do when active */
|
|
@@ -295,7 +299,17 @@ static void lowcomms_write_space(struct sock *sk)
|
|
|
{
|
|
|
struct connection *con = sock2con(sk);
|
|
|
|
|
|
- if (con && !test_and_set_bit(CF_WRITE_PENDING, &con->flags))
|
|
|
+ if (!con)
|
|
|
+ return;
|
|
|
+
|
|
|
+ clear_bit(SOCK_NOSPACE, &con->sock->flags);
|
|
|
+
|
|
|
+ if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
|
|
|
+ con->sock->sk->sk_write_pending--;
|
|
|
+ clear_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
|
|
|
queue_work(send_workqueue, &con->swork);
|
|
|
}
|
|
|
|
|
@@ -915,6 +929,7 @@ static void tcp_connect_to_sock(struct connection *con)
|
|
|
struct sockaddr_storage saddr, src_addr;
|
|
|
int addr_len;
|
|
|
struct socket *sock = NULL;
|
|
|
+ int one = 1;
|
|
|
|
|
|
if (con->nodeid == 0) {
|
|
|
log_print("attempt to connect sock 0 foiled");
|
|
@@ -960,6 +975,11 @@ static void tcp_connect_to_sock(struct connection *con)
|
|
|
make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
|
|
|
|
|
|
log_print("connecting to %d", con->nodeid);
|
|
|
+
|
|
|
+ /* Turn off Nagle's algorithm */
|
|
|
+ kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
|
|
|
+ sizeof(one));
|
|
|
+
|
|
|
result =
|
|
|
sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
|
|
|
O_NONBLOCK);
|
|
@@ -1011,6 +1031,10 @@ static struct socket *tcp_create_listen_sock(struct connection *con,
|
|
|
goto create_out;
|
|
|
}
|
|
|
|
|
|
+ /* Turn off Nagle's algorithm */
|
|
|
+ kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
|
|
|
+ sizeof(one));
|
|
|
+
|
|
|
result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
|
|
|
(char *)&one, sizeof(one));
|
|
|
|
|
@@ -1297,6 +1321,7 @@ static void send_to_sock(struct connection *con)
|
|
|
const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
|
|
|
struct writequeue_entry *e;
|
|
|
int len, offset;
|
|
|
+ int count = 0;
|
|
|
|
|
|
mutex_lock(&con->sock_mutex);
|
|
|
if (con->sock == NULL)
|
|
@@ -1319,14 +1344,27 @@ static void send_to_sock(struct connection *con)
|
|
|
ret = kernel_sendpage(con->sock, e->page, offset, len,
|
|
|
msg_flags);
|
|
|
if (ret == -EAGAIN || ret == 0) {
|
|
|
+ if (ret == -EAGAIN &&
|
|
|
+ test_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags) &&
|
|
|
+ !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
|
|
|
+ /* Notify TCP that we're limited by the
|
|
|
+ * application window size.
|
|
|
+ */
|
|
|
+ set_bit(SOCK_NOSPACE, &con->sock->flags);
|
|
|
+ con->sock->sk->sk_write_pending++;
|
|
|
+ }
|
|
|
cond_resched();
|
|
|
goto out;
|
|
|
}
|
|
|
if (ret <= 0)
|
|
|
goto send_error;
|
|
|
}
|
|
|
- /* Don't starve people filling buffers */
|
|
|
+
|
|
|
+ /* Don't starve people filling buffers */
|
|
|
+ if (++count >= MAX_SEND_MSG_COUNT) {
|
|
|
cond_resched();
|
|
|
+ count = 0;
|
|
|
+ }
|
|
|
|
|
|
spin_lock(&con->writequeue_lock);
|
|
|
e->offset += ret;
|
|
@@ -1430,20 +1468,19 @@ static void work_stop(void)
|
|
|
|
|
|
static int work_start(void)
|
|
|
{
|
|
|
- int error;
|
|
|
- recv_workqueue = create_workqueue("dlm_recv");
|
|
|
- error = IS_ERR(recv_workqueue);
|
|
|
- if (error) {
|
|
|
- log_print("can't start dlm_recv %d", error);
|
|
|
- return error;
|
|
|
+ recv_workqueue = alloc_workqueue("dlm_recv", WQ_MEM_RECLAIM |
|
|
|
+ WQ_HIGHPRI | WQ_FREEZEABLE, 0);
|
|
|
+ if (!recv_workqueue) {
|
|
|
+ log_print("can't start dlm_recv");
|
|
|
+ return -ENOMEM;
|
|
|
}
|
|
|
|
|
|
- send_workqueue = create_singlethread_workqueue("dlm_send");
|
|
|
- error = IS_ERR(send_workqueue);
|
|
|
- if (error) {
|
|
|
- log_print("can't start dlm_send %d", error);
|
|
|
+ send_workqueue = alloc_workqueue("dlm_send", WQ_MEM_RECLAIM |
|
|
|
+ WQ_HIGHPRI | WQ_FREEZEABLE, 0);
|
|
|
+ if (!send_workqueue) {
|
|
|
+ log_print("can't start dlm_send");
|
|
|
destroy_workqueue(recv_workqueue);
|
|
|
- return error;
|
|
|
+ return -ENOMEM;
|
|
|
}
|
|
|
|
|
|
return 0;
|