|
@@ -62,6 +62,8 @@ struct tipc_sock {
|
|
|
static int backlog_rcv(struct sock *sk, struct sk_buff *skb);
|
|
|
static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf);
|
|
|
static void wakeupdispatch(struct tipc_port *tport);
|
|
|
+static void tipc_data_ready(struct sock *sk, int len);
|
|
|
+static void tipc_write_space(struct sock *sk);
|
|
|
|
|
|
static const struct proto_ops packet_ops;
|
|
|
static const struct proto_ops stream_ops;
|
|
@@ -221,6 +223,8 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol,
|
|
|
sock_init_data(sock, sk);
|
|
|
sk->sk_backlog_rcv = backlog_rcv;
|
|
|
sk->sk_rcvbuf = TIPC_FLOW_CONTROL_WIN * 2 * TIPC_MAX_USER_MSG_SIZE * 2;
|
|
|
+ sk->sk_data_ready = tipc_data_ready;
|
|
|
+ sk->sk_write_space = tipc_write_space;
|
|
|
tipc_sk(sk)->p = tp_ptr;
|
|
|
tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT;
|
|
|
|
|
@@ -435,7 +439,7 @@ static unsigned int poll(struct file *file, struct socket *sock,
|
|
|
struct sock *sk = sock->sk;
|
|
|
u32 mask = 0;
|
|
|
|
|
|
- poll_wait(file, sk_sleep(sk), wait);
|
|
|
+ sock_poll_wait(file, sk_sleep(sk), wait);
|
|
|
|
|
|
switch ((int)sock->state) {
|
|
|
case SS_READY:
|
|
@@ -1125,6 +1129,39 @@ exit:
|
|
|
return sz_copied ? sz_copied : res;
|
|
|
}
|
|
|
|
|
|
+/**
|
|
|
+ * tipc_write_space - wake up thread if port congestion is released
|
|
|
+ * @sk: socket
|
|
|
+ */
|
|
|
+static void tipc_write_space(struct sock *sk)
|
|
|
+{
|
|
|
+ struct socket_wq *wq;
|
|
|
+
|
|
|
+ rcu_read_lock();
|
|
|
+ wq = rcu_dereference(sk->sk_wq);
|
|
|
+ if (wq_has_sleeper(wq))
|
|
|
+ wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
|
|
|
+ POLLWRNORM | POLLWRBAND);
|
|
|
+ rcu_read_unlock();
|
|
|
+}
|
|
|
+
|
|
|
+/**
|
|
|
+ * tipc_data_ready - wake up threads to indicate messages have been received
|
|
|
+ * @sk: socket
|
|
|
+ * @len: the length of messages
|
|
|
+ */
|
|
|
+static void tipc_data_ready(struct sock *sk, int len)
|
|
|
+{
|
|
|
+ struct socket_wq *wq;
|
|
|
+
|
|
|
+ rcu_read_lock();
|
|
|
+ wq = rcu_dereference(sk->sk_wq);
|
|
|
+ if (wq_has_sleeper(wq))
|
|
|
+ wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
|
|
|
+ POLLRDNORM | POLLRDBAND);
|
|
|
+ rcu_read_unlock();
|
|
|
+}
|
|
|
+
|
|
|
/**
|
|
|
* rx_queue_full - determine if receive queue can accept another message
|
|
|
* @msg: message to be added to queue
|
|
@@ -1222,8 +1259,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
|
|
|
tipc_disconnect_port(tipc_sk_port(sk));
|
|
|
}
|
|
|
|
|
|
- if (waitqueue_active(sk_sleep(sk)))
|
|
|
- wake_up_interruptible(sk_sleep(sk));
|
|
|
+ sk->sk_data_ready(sk, 0);
|
|
|
return TIPC_OK;
|
|
|
}
|
|
|
|
|
@@ -1290,8 +1326,7 @@ static void wakeupdispatch(struct tipc_port *tport)
|
|
|
{
|
|
|
struct sock *sk = (struct sock *)tport->usr_handle;
|
|
|
|
|
|
- if (waitqueue_active(sk_sleep(sk)))
|
|
|
- wake_up_interruptible(sk_sleep(sk));
|
|
|
+ sk->sk_write_space(sk);
|
|
|
}
|
|
|
|
|
|
/**
|