|
@@ -77,6 +77,17 @@
|
|
|
#define CON_SOCK_STATE_CONNECTED 3 /* -> CLOSING or -> CLOSED */
|
|
|
#define CON_SOCK_STATE_CLOSING 4 /* -> CLOSED */
|
|
|
|
|
|
+/*
|
|
|
+ * connection states
|
|
|
+ */
|
|
|
+#define CON_STATE_CLOSED 1 /* -> PREOPEN */
|
|
|
+#define CON_STATE_PREOPEN 2 /* -> CONNECTING, CLOSED */
|
|
|
+#define CON_STATE_CONNECTING 3 /* -> NEGOTIATING, CLOSED */
|
|
|
+#define CON_STATE_NEGOTIATING 4 /* -> OPEN, CLOSED */
|
|
|
+#define CON_STATE_OPEN 5 /* -> STANDBY, CLOSED */
|
|
|
+#define CON_STATE_STANDBY 6 /* -> PREOPEN, CLOSED */
|
|
|
+
|
|
|
+
|
|
|
/* static tag bytes (protocol control messages) */
|
|
|
static char tag_msg = CEPH_MSGR_TAG_MSG;
|
|
|
static char tag_ack = CEPH_MSGR_TAG_ACK;
|
|
@@ -503,11 +514,7 @@ void ceph_con_close(struct ceph_connection *con)
|
|
|
mutex_lock(&con->mutex);
|
|
|
dout("con_close %p peer %s\n", con,
|
|
|
ceph_pr_addr(&con->peer_addr.in_addr));
|
|
|
- clear_bit(NEGOTIATING, &con->state);
|
|
|
- clear_bit(CONNECTING, &con->state);
|
|
|
- clear_bit(CONNECTED, &con->state);
|
|
|
- clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */
|
|
|
- set_bit(CLOSED, &con->state);
|
|
|
+ con->state = CON_STATE_CLOSED;
|
|
|
|
|
|
clear_bit(LOSSYTX, &con->flags); /* so we retry next connect */
|
|
|
clear_bit(KEEPALIVE_PENDING, &con->flags);
|
|
@@ -530,8 +537,9 @@ void ceph_con_open(struct ceph_connection *con,
|
|
|
{
|
|
|
mutex_lock(&con->mutex);
|
|
|
dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr));
|
|
|
- set_bit(OPENING, &con->state);
|
|
|
- WARN_ON(!test_and_clear_bit(CLOSED, &con->state));
|
|
|
+
|
|
|
+ BUG_ON(con->state != CON_STATE_CLOSED);
|
|
|
+ con->state = CON_STATE_PREOPEN;
|
|
|
|
|
|
con->peer_name.type = (__u8) entity_type;
|
|
|
con->peer_name.num = cpu_to_le64(entity_num);
|
|
@@ -571,7 +579,7 @@ void ceph_con_init(struct ceph_connection *con, void *private,
|
|
|
INIT_LIST_HEAD(&con->out_sent);
|
|
|
INIT_DELAYED_WORK(&con->work, con_work);
|
|
|
|
|
|
- set_bit(CLOSED, &con->state);
|
|
|
+ con->state = CON_STATE_CLOSED;
|
|
|
}
|
|
|
EXPORT_SYMBOL(ceph_con_init);
|
|
|
|
|
@@ -809,27 +817,21 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection
|
|
|
if (!con->ops->get_authorizer) {
|
|
|
con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
|
|
|
con->out_connect.authorizer_len = 0;
|
|
|
-
|
|
|
return NULL;
|
|
|
}
|
|
|
|
|
|
/* Can't hold the mutex while getting authorizer */
|
|
|
-
|
|
|
mutex_unlock(&con->mutex);
|
|
|
-
|
|
|
auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry);
|
|
|
-
|
|
|
mutex_lock(&con->mutex);
|
|
|
|
|
|
if (IS_ERR(auth))
|
|
|
return auth;
|
|
|
- if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->flags))
|
|
|
+ if (con->state != CON_STATE_NEGOTIATING)
|
|
|
return ERR_PTR(-EAGAIN);
|
|
|
|
|
|
con->auth_reply_buf = auth->authorizer_reply_buf;
|
|
|
con->auth_reply_buf_len = auth->authorizer_reply_buf_len;
|
|
|
-
|
|
|
-
|
|
|
return auth;
|
|
|
}
|
|
|
|
|
@@ -1484,7 +1486,8 @@ static int process_banner(struct ceph_connection *con)
|
|
|
static void fail_protocol(struct ceph_connection *con)
|
|
|
{
|
|
|
reset_connection(con);
|
|
|
- set_bit(CLOSED, &con->state); /* in case there's queued work */
|
|
|
+ BUG_ON(con->state != CON_STATE_NEGOTIATING);
|
|
|
+ con->state = CON_STATE_CLOSED;
|
|
|
}
|
|
|
|
|
|
static int process_connect(struct ceph_connection *con)
|
|
@@ -1558,8 +1561,7 @@ static int process_connect(struct ceph_connection *con)
|
|
|
if (con->ops->peer_reset)
|
|
|
con->ops->peer_reset(con);
|
|
|
mutex_lock(&con->mutex);
|
|
|
- if (test_bit(CLOSED, &con->state) ||
|
|
|
- test_bit(OPENING, &con->state))
|
|
|
+ if (con->state != CON_STATE_NEGOTIATING)
|
|
|
return -EAGAIN;
|
|
|
break;
|
|
|
|
|
@@ -1605,8 +1607,10 @@ static int process_connect(struct ceph_connection *con)
|
|
|
fail_protocol(con);
|
|
|
return -1;
|
|
|
}
|
|
|
- clear_bit(NEGOTIATING, &con->state);
|
|
|
- set_bit(CONNECTED, &con->state);
|
|
|
+
|
|
|
+ BUG_ON(con->state != CON_STATE_NEGOTIATING);
|
|
|
+ con->state = CON_STATE_OPEN;
|
|
|
+
|
|
|
con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
|
|
|
con->connect_seq++;
|
|
|
con->peer_features = server_feat;
|
|
@@ -1994,8 +1998,9 @@ more:
|
|
|
dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
|
|
|
|
|
|
/* open the socket first? */
|
|
|
- if (con->sock == NULL) {
|
|
|
- set_bit(CONNECTING, &con->state);
|
|
|
+ if (con->state == CON_STATE_PREOPEN) {
|
|
|
+ BUG_ON(con->sock);
|
|
|
+ con->state = CON_STATE_CONNECTING;
|
|
|
|
|
|
con_out_kvec_reset(con);
|
|
|
prepare_write_banner(con);
|
|
@@ -2046,8 +2051,7 @@ more_kvec:
|
|
|
}
|
|
|
|
|
|
do_next:
|
|
|
- if (!test_bit(CONNECTING, &con->state) &&
|
|
|
- !test_bit(NEGOTIATING, &con->state)) {
|
|
|
+ if (con->state == CON_STATE_OPEN) {
|
|
|
/* is anything else pending? */
|
|
|
if (!list_empty(&con->out_queue)) {
|
|
|
prepare_write_message(con);
|
|
@@ -2081,29 +2085,19 @@ static int try_read(struct ceph_connection *con)
|
|
|
{
|
|
|
int ret = -1;
|
|
|
|
|
|
- if (!con->sock)
|
|
|
- return 0;
|
|
|
-
|
|
|
- if (test_bit(STANDBY, &con->state))
|
|
|
+more:
|
|
|
+ dout("try_read start on %p state %lu\n", con, con->state);
|
|
|
+ if (con->state != CON_STATE_CONNECTING &&
|
|
|
+ con->state != CON_STATE_NEGOTIATING &&
|
|
|
+ con->state != CON_STATE_OPEN)
|
|
|
return 0;
|
|
|
|
|
|
- dout("try_read start on %p\n", con);
|
|
|
+ BUG_ON(!con->sock);
|
|
|
|
|
|
-more:
|
|
|
dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
|
|
|
con->in_base_pos);
|
|
|
|
|
|
- /*
|
|
|
- * process_connect and process_message drop and re-take
|
|
|
- * con->mutex. make sure we handle a racing close or reopen.
|
|
|
- */
|
|
|
- if (test_bit(CLOSED, &con->state) ||
|
|
|
- test_bit(OPENING, &con->state)) {
|
|
|
- ret = -EAGAIN;
|
|
|
- goto out;
|
|
|
- }
|
|
|
-
|
|
|
- if (test_bit(CONNECTING, &con->state)) {
|
|
|
+ if (con->state == CON_STATE_CONNECTING) {
|
|
|
dout("try_read connecting\n");
|
|
|
ret = read_partial_banner(con);
|
|
|
if (ret <= 0)
|
|
@@ -2112,8 +2106,8 @@ more:
|
|
|
if (ret < 0)
|
|
|
goto out;
|
|
|
|
|
|
- clear_bit(CONNECTING, &con->state);
|
|
|
- set_bit(NEGOTIATING, &con->state);
|
|
|
+ BUG_ON(con->state != CON_STATE_CONNECTING);
|
|
|
+ con->state = CON_STATE_NEGOTIATING;
|
|
|
|
|
|
/* Banner is good, exchange connection info */
|
|
|
ret = prepare_write_connect(con);
|
|
@@ -2125,7 +2119,7 @@ more:
|
|
|
goto out;
|
|
|
}
|
|
|
|
|
|
- if (test_bit(NEGOTIATING, &con->state)) {
|
|
|
+ if (con->state == CON_STATE_NEGOTIATING) {
|
|
|
dout("try_read negotiating\n");
|
|
|
ret = read_partial_connect(con);
|
|
|
if (ret <= 0)
|
|
@@ -2136,6 +2130,8 @@ more:
|
|
|
goto more;
|
|
|
}
|
|
|
|
|
|
+ BUG_ON(con->state != CON_STATE_OPEN);
|
|
|
+
|
|
|
if (con->in_base_pos < 0) {
|
|
|
/*
|
|
|
* skipping + discarding content.
|
|
@@ -2169,8 +2165,8 @@ more:
|
|
|
prepare_read_ack(con);
|
|
|
break;
|
|
|
case CEPH_MSGR_TAG_CLOSE:
|
|
|
- clear_bit(CONNECTED, &con->state);
|
|
|
- set_bit(CLOSED, &con->state); /* fixme */
|
|
|
+ con_close_socket(con);
|
|
|
+ con->state = CON_STATE_CLOSED;
|
|
|
goto out;
|
|
|
default:
|
|
|
goto bad_tag;
|
|
@@ -2246,14 +2242,21 @@ static void con_work(struct work_struct *work)
|
|
|
mutex_lock(&con->mutex);
|
|
|
restart:
|
|
|
if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) {
|
|
|
- if (test_and_clear_bit(CONNECTED, &con->state))
|
|
|
- con->error_msg = "socket closed";
|
|
|
- else if (test_and_clear_bit(NEGOTIATING, &con->state))
|
|
|
- con->error_msg = "negotiation failed";
|
|
|
- else if (test_and_clear_bit(CONNECTING, &con->state))
|
|
|
+ switch (con->state) {
|
|
|
+ case CON_STATE_CONNECTING:
|
|
|
con->error_msg = "connection failed";
|
|
|
- else
|
|
|
+ break;
|
|
|
+ case CON_STATE_NEGOTIATING:
|
|
|
+ con->error_msg = "negotiation failed";
|
|
|
+ break;
|
|
|
+ case CON_STATE_OPEN:
|
|
|
+ con->error_msg = "socket closed";
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ dout("unrecognized con state %d\n", (int)con->state);
|
|
|
con->error_msg = "unrecognized con state";
|
|
|
+ BUG();
|
|
|
+ }
|
|
|
goto fault;
|
|
|
}
|
|
|
|
|
@@ -2271,17 +2274,16 @@ restart:
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if (test_bit(STANDBY, &con->state)) {
|
|
|
+ if (con->state == CON_STATE_STANDBY) {
|
|
|
dout("con_work %p STANDBY\n", con);
|
|
|
goto done;
|
|
|
}
|
|
|
- if (test_bit(CLOSED, &con->state)) {
|
|
|
+ if (con->state == CON_STATE_CLOSED) {
|
|
|
dout("con_work %p CLOSED\n", con);
|
|
|
BUG_ON(con->sock);
|
|
|
goto done;
|
|
|
}
|
|
|
- if (test_and_clear_bit(OPENING, &con->state)) {
|
|
|
- /* reopen w/ new peer */
|
|
|
+ if (con->state == CON_STATE_PREOPEN) {
|
|
|
dout("con_work OPENING\n");
|
|
|
BUG_ON(con->sock);
|
|
|
}
|
|
@@ -2328,13 +2330,15 @@ static void ceph_fault(struct ceph_connection *con)
|
|
|
dout("fault %p state %lu to peer %s\n",
|
|
|
con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
|
|
|
|
|
|
- if (test_bit(CLOSED, &con->state))
|
|
|
- goto out_unlock;
|
|
|
+ BUG_ON(con->state != CON_STATE_CONNECTING &&
|
|
|
+ con->state != CON_STATE_NEGOTIATING &&
|
|
|
+ con->state != CON_STATE_OPEN);
|
|
|
|
|
|
con_close_socket(con);
|
|
|
|
|
|
if (test_bit(LOSSYTX, &con->flags)) {
|
|
|
- dout("fault on LOSSYTX channel\n");
|
|
|
+ dout("fault on LOSSYTX channel, marking CLOSED\n");
|
|
|
+ con->state = CON_STATE_CLOSED;
|
|
|
goto out_unlock;
|
|
|
}
|
|
|
|
|
@@ -2355,9 +2359,10 @@ static void ceph_fault(struct ceph_connection *con)
|
|
|
!test_bit(KEEPALIVE_PENDING, &con->flags)) {
|
|
|
dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
|
|
|
clear_bit(WRITE_PENDING, &con->flags);
|
|
|
- set_bit(STANDBY, &con->state);
|
|
|
+ con->state = CON_STATE_STANDBY;
|
|
|
} else {
|
|
|
/* retry after a delay. */
|
|
|
+ con->state = CON_STATE_PREOPEN;
|
|
|
if (con->delay == 0)
|
|
|
con->delay = BASE_DELAY_INTERVAL;
|
|
|
else if (con->delay < MAX_DELAY_INTERVAL)
|
|
@@ -2431,8 +2436,9 @@ EXPORT_SYMBOL(ceph_messenger_init);
|
|
|
static void clear_standby(struct ceph_connection *con)
|
|
|
{
|
|
|
/* come back from STANDBY? */
|
|
|
- if (test_and_clear_bit(STANDBY, &con->state)) {
|
|
|
+ if (con->state == CON_STATE_STANDBY) {
|
|
|
dout("clear_standby %p and ++connect_seq\n", con);
|
|
|
+ con->state = CON_STATE_PREOPEN;
|
|
|
con->connect_seq++;
|
|
|
WARN_ON(test_bit(WRITE_PENDING, &con->flags));
|
|
|
WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags));
|
|
@@ -2451,7 +2457,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
|
|
|
|
|
|
mutex_lock(&con->mutex);
|
|
|
|
|
|
- if (test_bit(CLOSED, &con->state)) {
|
|
|
+ if (con->state == CON_STATE_CLOSED) {
|
|
|
dout("con_send %p closed, dropping %p\n", con, msg);
|
|
|
ceph_msg_put(msg);
|
|
|
mutex_unlock(&con->mutex);
|