|
@@ -147,6 +147,28 @@ static void o2net_listen_data_ready(struct sock *sk, int bytes);
|
|
|
static void o2net_sc_send_keep_req(struct work_struct *work);
|
|
|
static void o2net_idle_timer(unsigned long data);
|
|
|
static void o2net_sc_postpone_idle(struct o2net_sock_container *sc);
|
|
|
+static void o2net_sc_reset_idle_timer(struct o2net_sock_container *sc);
|
|
|
+
|
|
|
+/*
|
|
|
+ * FIXME: These should use to_o2nm_cluster_from_node(), but we end up
|
|
|
+ * losing our parent link to the cluster during shutdown. This can be
|
|
|
+ * solved by adding a pre-removal callback to configfs, or passing
|
|
|
+ * around the cluster with the node. -jeffm
|
|
|
+ */
|
|
|
+static inline int o2net_reconnect_delay(struct o2nm_node *node)
|
|
|
+{ /* Reconnect delay in ms; 'node' is currently not read, the value comes from the global o2nm_single_cluster. */
|
|
|
+ return o2nm_single_cluster->cl_reconnect_delay_ms;
|
|
|
+}
|
|
|
+
|
|
|
+static inline int o2net_keepalive_delay(struct o2nm_node *node)
|
|
|
+{ /* Keepalive delay in ms; 'node' is currently not read, the value comes from the global o2nm_single_cluster. */
|
|
|
+ return o2nm_single_cluster->cl_keepalive_delay_ms;
|
|
|
+}
|
|
|
+
|
|
|
+static inline int o2net_idle_timeout(struct o2nm_node *node)
|
|
|
+{ /* Idle timeout in ms; 'node' is currently not read, the value comes from the global o2nm_single_cluster. */
|
|
|
+ return o2nm_single_cluster->cl_idle_timeout_ms;
|
|
|
+}
|
|
|
|
|
|
static inline int o2net_sys_err_to_errno(enum o2net_system_error err)
|
|
|
{
|
|
@@ -271,6 +293,8 @@ static void sc_kref_release(struct kref *kref)
|
|
|
{
|
|
|
struct o2net_sock_container *sc = container_of(kref,
|
|
|
struct o2net_sock_container, sc_kref);
|
|
|
+ BUG_ON(timer_pending(&sc->sc_idle_timeout));
|
|
|
+
|
|
|
sclog(sc, "releasing\n");
|
|
|
|
|
|
if (sc->sc_sock) {
|
|
@@ -356,6 +380,13 @@ static void o2net_sc_cancel_delayed_work(struct o2net_sock_container *sc,
|
|
|
sc_put(sc);
|
|
|
}
|
|
|
|
|
|
+static atomic_t o2net_connected_peers = ATOMIC_INIT(0);
|
|
|
+
|
|
|
+int o2net_num_connected_peers(void)
|
|
|
+{ /* Returns the current o2net_connected_peers count; o2net_set_nn_state() bumps it when a node's sc goes from NULL to non-NULL and drops it on the reverse. */
|
|
|
+ return atomic_read(&o2net_connected_peers);
|
|
|
+}
|
|
|
+
|
|
|
static void o2net_set_nn_state(struct o2net_node *nn,
|
|
|
struct o2net_sock_container *sc,
|
|
|
unsigned valid, int err)
|
|
@@ -366,6 +397,11 @@ static void o2net_set_nn_state(struct o2net_node *nn,
|
|
|
|
|
|
assert_spin_locked(&nn->nn_lock);
|
|
|
|
|
|
+ if (old_sc && !sc)
|
|
|
+ atomic_dec(&o2net_connected_peers);
|
|
|
+ else if (!old_sc && sc)
|
|
|
+ atomic_inc(&o2net_connected_peers);
|
|
|
+
|
|
|
/* the node num comparison and single connect/accept path should stop
|
|
|
* an non-null sc from being overwritten with another */
|
|
|
BUG_ON(sc && nn->nn_sc && nn->nn_sc != sc);
|
|
@@ -424,9 +460,9 @@ static void o2net_set_nn_state(struct o2net_node *nn,
|
|
|
/* delay if we're withing a RECONNECT_DELAY of the
|
|
|
* last attempt */
|
|
|
delay = (nn->nn_last_connect_attempt +
|
|
|
- msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS))
|
|
|
+ msecs_to_jiffies(o2net_reconnect_delay(NULL))) /* sc may be NULL here, so never dereference it; the helper ignores its node argument */
|
|
|
- jiffies;
|
|
|
- if (delay > msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS))
|
|
|
+ if (delay > msecs_to_jiffies(o2net_reconnect_delay(NULL)))
|
|
|
delay = 0;
|
|
|
mlog(ML_CONN, "queueing conn attempt in %lu jiffies\n", delay);
|
|
|
queue_delayed_work(o2net_wq, &nn->nn_connect_work, delay);
|
|
@@ -1099,13 +1135,51 @@ static int o2net_check_handshake(struct o2net_sock_container *sc)
|
|
|
return -1;
|
|
|
}
|
|
|
|
|
|
+ /*
|
|
|
+ * Ensure timeouts are consistent with other nodes, otherwise
|
|
|
+ * we can end up with one node thinking that the other must be down,
|
|
|
+ * when it actually isn't. This can ultimately cause corruption.
|
|
|
+ */
|
|
|
+ if (be32_to_cpu(hand->o2net_idle_timeout_ms) !=
|
|
|
+ o2net_idle_timeout(sc->sc_node)) {
|
|
|
+ mlog(ML_NOTICE, SC_NODEF_FMT " uses a network idle timeout of "
|
|
|
+ "%u ms, but we use %u ms locally. disconnecting\n",
|
|
|
+ SC_NODEF_ARGS(sc),
|
|
|
+ be32_to_cpu(hand->o2net_idle_timeout_ms),
|
|
|
+ o2net_idle_timeout(sc->sc_node));
|
|
|
+ o2net_ensure_shutdown(nn, sc, -ENOTCONN);
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (be32_to_cpu(hand->o2net_keepalive_delay_ms) !=
|
|
|
+ o2net_keepalive_delay(sc->sc_node)) {
|
|
|
+ mlog(ML_NOTICE, SC_NODEF_FMT " uses a keepalive delay of "
|
|
|
+ "%u ms, but we use %u ms locally. disconnecting\n",
|
|
|
+ SC_NODEF_ARGS(sc),
|
|
|
+ be32_to_cpu(hand->o2net_keepalive_delay_ms),
|
|
|
+ o2net_keepalive_delay(sc->sc_node));
|
|
|
+ o2net_ensure_shutdown(nn, sc, -ENOTCONN);
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (be32_to_cpu(hand->o2hb_heartbeat_timeout_ms) !=
|
|
|
+ O2HB_MAX_WRITE_TIMEOUT_MS) {
|
|
|
+ mlog(ML_NOTICE, SC_NODEF_FMT " uses a heartbeat timeout of "
|
|
|
+ "%u ms, but we use %u ms locally. disconnecting\n",
|
|
|
+ SC_NODEF_ARGS(sc),
|
|
|
+ be32_to_cpu(hand->o2hb_heartbeat_timeout_ms),
|
|
|
+ O2HB_MAX_WRITE_TIMEOUT_MS);
|
|
|
+ o2net_ensure_shutdown(nn, sc, -ENOTCONN);
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+
|
|
|
sc->sc_handshake_ok = 1;
|
|
|
|
|
|
spin_lock(&nn->nn_lock);
|
|
|
/* set valid and queue the idle timers only if it hasn't been
|
|
|
* shut down already */
|
|
|
if (nn->nn_sc == sc) {
|
|
|
- o2net_sc_postpone_idle(sc);
|
|
|
+ o2net_sc_reset_idle_timer(sc);
|
|
|
o2net_set_nn_state(nn, sc, 1, 0);
|
|
|
}
|
|
|
spin_unlock(&nn->nn_lock);
|
|
@@ -1131,6 +1205,23 @@ static int o2net_advance_rx(struct o2net_sock_container *sc)
|
|
|
sclog(sc, "receiving\n");
|
|
|
do_gettimeofday(&sc->sc_tv_advance_start);
|
|
|
|
|
|
+ if (unlikely(sc->sc_handshake_ok == 0)) {
|
|
|
+ if(sc->sc_page_off < sizeof(struct o2net_handshake)) {
|
|
|
+ data = page_address(sc->sc_page) + sc->sc_page_off;
|
|
|
+ datalen = sizeof(struct o2net_handshake) - sc->sc_page_off;
|
|
|
+ ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
|
|
|
+ if (ret > 0)
|
|
|
+ sc->sc_page_off += ret;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (sc->sc_page_off == sizeof(struct o2net_handshake)) {
|
|
|
+ o2net_check_handshake(sc);
|
|
|
+ if (unlikely(sc->sc_handshake_ok == 0))
|
|
|
+ ret = -EPROTO;
|
|
|
+ }
|
|
|
+ goto out;
|
|
|
+ }
|
|
|
+
|
|
|
/* do we need more header? */
|
|
|
if (sc->sc_page_off < sizeof(struct o2net_msg)) {
|
|
|
data = page_address(sc->sc_page) + sc->sc_page_off;
|
|
@@ -1138,15 +1229,6 @@ static int o2net_advance_rx(struct o2net_sock_container *sc)
|
|
|
ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
|
|
|
if (ret > 0) {
|
|
|
sc->sc_page_off += ret;
|
|
|
-
|
|
|
- /* this working relies on the handshake being
|
|
|
- * smaller than the normal message header */
|
|
|
- if (sc->sc_page_off >= sizeof(struct o2net_handshake)&&
|
|
|
- !sc->sc_handshake_ok && o2net_check_handshake(sc)) {
|
|
|
- ret = -EPROTO;
|
|
|
- goto out;
|
|
|
- }
|
|
|
-
|
|
|
/* only swab incoming here.. we can
|
|
|
* only get here once as we cross from
|
|
|
* being under to over */
|
|
@@ -1248,6 +1330,18 @@ static int o2net_set_nodelay(struct socket *sock)
|
|
|
return ret;
|
|
|
}
|
|
|
|
|
|
+static void o2net_initialize_handshake(void)
|
|
|
+{ /* Fill the timeout fields of the shared o2net_hand with the local settings, converted with cpu_to_be32(). */
|
|
|
+ o2net_hand->o2hb_heartbeat_timeout_ms = cpu_to_be32(
|
|
|
+ O2HB_MAX_WRITE_TIMEOUT_MS);
|
|
|
+ o2net_hand->o2net_idle_timeout_ms = cpu_to_be32(
|
|
|
+ o2net_idle_timeout(NULL)); /* NULL is acceptable: the timeout helpers do not read their node argument */
|
|
|
+ o2net_hand->o2net_keepalive_delay_ms = cpu_to_be32(
|
|
|
+ o2net_keepalive_delay(NULL));
|
|
|
+ o2net_hand->o2net_reconnect_delay_ms = cpu_to_be32(
|
|
|
+ o2net_reconnect_delay(NULL));
|
|
|
+}
|
|
|
+
|
|
|
/* ------------------------------------------------------------ */
|
|
|
|
|
|
/* called when a connect completes and after a sock is accepted. the
|
|
@@ -1262,6 +1356,7 @@ static void o2net_sc_connect_completed(struct work_struct *work)
|
|
|
(unsigned long long)O2NET_PROTOCOL_VERSION,
|
|
|
(unsigned long long)be64_to_cpu(o2net_hand->connector_id));
|
|
|
|
|
|
+ o2net_initialize_handshake();
|
|
|
o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));
|
|
|
sc_put(sc);
|
|
|
}
|
|
@@ -1287,8 +1382,10 @@ static void o2net_idle_timer(unsigned long data)
|
|
|
|
|
|
do_gettimeofday(&now);
|
|
|
|
|
|
- printk(KERN_INFO "o2net: connection to " SC_NODEF_FMT " has been idle for 10 "
|
|
|
- "seconds, shutting it down.\n", SC_NODEF_ARGS(sc));
|
|
|
+ printk(KERN_INFO "o2net: connection to " SC_NODEF_FMT " has been idle for %u.%03u "
|
|
|
+ "seconds, shutting it down.\n", SC_NODEF_ARGS(sc),
|
|
|
+ o2net_idle_timeout(sc->sc_node) / 1000, /* whole seconds */
|
|
|
+ o2net_idle_timeout(sc->sc_node) % 1000); /* ms remainder; %03u zero-pads it so 10050 ms prints as 10.050, not 10.50 */
|
|
|
mlog(ML_NOTICE, "here are some times that might help debug the "
|
|
|
"situation: (tmr %ld.%ld now %ld.%ld dr %ld.%ld adv "
|
|
|
"%ld.%ld:%ld.%ld func (%08x:%u) %ld.%ld:%ld.%ld)\n",
|
|
@@ -1306,14 +1403,21 @@ static void o2net_idle_timer(unsigned long data)
|
|
|
o2net_sc_queue_work(sc, &sc->sc_shutdown_work);
|
|
|
}
|
|
|
|
|
|
-static void o2net_sc_postpone_idle(struct o2net_sock_container *sc)
|
|
|
+static void o2net_sc_reset_idle_timer(struct o2net_sock_container *sc)
|
|
|
{
|
|
|
o2net_sc_cancel_delayed_work(sc, &sc->sc_keepalive_work);
|
|
|
o2net_sc_queue_delayed_work(sc, &sc->sc_keepalive_work,
|
|
|
- O2NET_KEEPALIVE_DELAY_SECS * HZ);
|
|
|
+ msecs_to_jiffies(o2net_keepalive_delay(sc->sc_node)));
|
|
|
do_gettimeofday(&sc->sc_tv_timer);
|
|
|
mod_timer(&sc->sc_idle_timeout,
|
|
|
- jiffies + (O2NET_IDLE_TIMEOUT_SECS * HZ));
|
|
|
+ jiffies + msecs_to_jiffies(o2net_idle_timeout(sc->sc_node)));
|
|
|
+}
|
|
|
+
|
|
|
+static void o2net_sc_postpone_idle(struct o2net_sock_container *sc)
|
|
|
+{ /* Calls o2net_sc_reset_idle_timer() only while sc_idle_timeout is pending, so a timer that is not running is never started from here. */
|
|
|
+ /* Only push out an existing timer */
|
|
|
+ if (timer_pending(&sc->sc_idle_timeout))
|
|
|
+ o2net_sc_reset_idle_timer(sc);
|
|
|
}
|
|
|
|
|
|
/* this work func is kicked whenever a path sets the nn state which doesn't
|
|
@@ -1435,9 +1539,12 @@ static void o2net_connect_expired(struct work_struct *work)
|
|
|
|
|
|
spin_lock(&nn->nn_lock);
|
|
|
if (!nn->nn_sc_valid) {
|
|
|
+ struct o2nm_node *node = nn->nn_sc ? nn->nn_sc->sc_node : NULL; /* nn_sc can be NULL while the connection is not valid; the timeout helper ignores node */
|
|
|
mlog(ML_ERROR, "no connection established with node %u after "
|
|
|
- "%u seconds, giving up and returning errors.\n",
|
|
|
- o2net_num_from_nn(nn), O2NET_IDLE_TIMEOUT_SECS);
|
|
|
+ "%u.%03u seconds, giving up and returning errors.\n",
|
|
|
+ o2net_num_from_nn(nn),
|
|
|
+ o2net_idle_timeout(node) / 1000, /* whole seconds */
|
|
|
+ o2net_idle_timeout(node) % 1000); /* ms remainder, zero-padded by %03u */
|
|
|
|
|
|
o2net_set_nn_state(nn, NULL, 0, -ENOTCONN);
|
|
|
}
|
|
@@ -1478,6 +1585,8 @@ static void o2net_hb_node_down_cb(struct o2nm_node *node, int node_num,
|
|
|
|
|
|
if (node_num != o2nm_this_node())
|
|
|
o2net_disconnect_node(node);
|
|
|
+
|
|
|
+ BUG_ON(atomic_read(&o2net_connected_peers) < 0);
|
|
|
}
|
|
|
|
|
|
static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,
|
|
@@ -1489,14 +1598,14 @@ static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,
|
|
|
|
|
|
/* ensure an immediate connect attempt */
|
|
|
nn->nn_last_connect_attempt = jiffies -
|
|
|
- (msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS) + 1);
|
|
|
+ (msecs_to_jiffies(o2net_reconnect_delay(node)) + 1);
|
|
|
|
|
|
if (node_num != o2nm_this_node()) {
|
|
|
/* heartbeat doesn't work unless a local node number is
|
|
|
* configured and doing so brings up the o2net_wq, so we can
|
|
|
* use it.. */
|
|
|
queue_delayed_work(o2net_wq, &nn->nn_connect_expired,
|
|
|
- O2NET_IDLE_TIMEOUT_SECS * HZ);
|
|
|
+ msecs_to_jiffies(o2net_idle_timeout(node)));
|
|
|
|
|
|
/* believe it or not, accept and node hearbeating testing
|
|
|
* can succeed for this node before we got here.. so
|
|
@@ -1641,6 +1750,7 @@ static int o2net_accept_one(struct socket *sock)
|
|
|
o2net_register_callbacks(sc->sc_sock->sk, sc);
|
|
|
o2net_sc_queue_work(sc, &sc->sc_rx_work);
|
|
|
|
|
|
+ o2net_initialize_handshake();
|
|
|
o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));
|
|
|
|
|
|
out:
|