|
@@ -38,48 +38,54 @@ static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
|
|
|
static struct lock_class_key socket_class;
|
|
|
#endif
|
|
|
|
|
|
+/*
|
|
|
+ * When skipping (ignoring) a block of input we read it into a "skip
|
|
|
+ * buffer," which is this many bytes in size.
|
|
|
+ */
|
|
|
+#define SKIP_BUF_SIZE 1024
|
|
|
|
|
|
static void queue_con(struct ceph_connection *con);
|
|
|
static void con_work(struct work_struct *);
|
|
|
static void ceph_fault(struct ceph_connection *con);
|
|
|
|
|
|
/*
|
|
|
- * nicely render a sockaddr as a string.
|
|
|
+ * Nicely render a sockaddr as a string. An array of formatted
|
|
|
+ * strings is used, to approximate reentrancy.
|
|
|
*/
|
|
|
-#define MAX_ADDR_STR 20
|
|
|
-#define MAX_ADDR_STR_LEN 60
|
|
|
-static char addr_str[MAX_ADDR_STR][MAX_ADDR_STR_LEN];
|
|
|
-static DEFINE_SPINLOCK(addr_str_lock);
|
|
|
-static int last_addr_str;
|
|
|
+#define ADDR_STR_COUNT_LOG 5 /* log2(# address strings in array) */
|
|
|
+#define ADDR_STR_COUNT (1 << ADDR_STR_COUNT_LOG)
|
|
|
+#define ADDR_STR_COUNT_MASK (ADDR_STR_COUNT - 1)
|
|
|
+#define MAX_ADDR_STR_LEN 64 /* 54 is enough */
|
|
|
+
|
|
|
+static char addr_str[ADDR_STR_COUNT][MAX_ADDR_STR_LEN];
|
|
|
+static atomic_t addr_str_seq = ATOMIC_INIT(0);
|
|
|
+
|
|
|
+static struct page *zero_page; /* used in certain error cases */
|
|
|
|
|
|
const char *ceph_pr_addr(const struct sockaddr_storage *ss)
|
|
|
{
|
|
|
int i;
|
|
|
char *s;
|
|
|
- struct sockaddr_in *in4 = (void *)ss;
|
|
|
- struct sockaddr_in6 *in6 = (void *)ss;
|
|
|
-
|
|
|
- spin_lock(&addr_str_lock);
|
|
|
- i = last_addr_str++;
|
|
|
- if (last_addr_str == MAX_ADDR_STR)
|
|
|
- last_addr_str = 0;
|
|
|
- spin_unlock(&addr_str_lock);
|
|
|
+ struct sockaddr_in *in4 = (struct sockaddr_in *) ss;
|
|
|
+ struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) ss;
|
|
|
+
|
|
|
+ i = atomic_inc_return(&addr_str_seq) & ADDR_STR_COUNT_MASK;
|
|
|
s = addr_str[i];
|
|
|
|
|
|
switch (ss->ss_family) {
|
|
|
case AF_INET:
|
|
|
- snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%u", &in4->sin_addr,
|
|
|
- (unsigned int)ntohs(in4->sin_port));
|
|
|
+ snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%hu", &in4->sin_addr,
|
|
|
+ ntohs(in4->sin_port));
|
|
|
break;
|
|
|
|
|
|
case AF_INET6:
|
|
|
- snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%u", &in6->sin6_addr,
|
|
|
- (unsigned int)ntohs(in6->sin6_port));
|
|
|
+ snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%hu", &in6->sin6_addr,
|
|
|
+ ntohs(in6->sin6_port));
|
|
|
break;
|
|
|
|
|
|
default:
|
|
|
- snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %d)",
|
|
|
- (int)ss->ss_family);
|
|
|
+ snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %hu)",
|
|
|
+ ss->ss_family);
|
|
|
}
|
|
|
|
|
|
return s;
|
|
@@ -95,22 +101,43 @@ static void encode_my_addr(struct ceph_messenger *msgr)
|
|
|
/*
|
|
|
* work queue for all reading and writing to/from the socket.
|
|
|
*/
|
|
|
-struct workqueue_struct *ceph_msgr_wq;
|
|
|
+static struct workqueue_struct *ceph_msgr_wq;
|
|
|
+
|
|
|
+void _ceph_msgr_exit(void)
|
|
|
+{
|
|
|
+ if (ceph_msgr_wq) {
|
|
|
+ destroy_workqueue(ceph_msgr_wq);
|
|
|
+ ceph_msgr_wq = NULL;
|
|
|
+ }
|
|
|
+
|
|
|
+ BUG_ON(zero_page == NULL);
|
|
|
+ kunmap(zero_page);
|
|
|
+ page_cache_release(zero_page);
|
|
|
+ zero_page = NULL;
|
|
|
+}
|
|
|
|
|
|
int ceph_msgr_init(void)
|
|
|
{
|
|
|
+ BUG_ON(zero_page != NULL);
|
|
|
+ zero_page = ZERO_PAGE(0);
|
|
|
+ page_cache_get(zero_page);
|
|
|
+
|
|
|
ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
|
|
|
- if (!ceph_msgr_wq) {
|
|
|
- pr_err("msgr_init failed to create workqueue\n");
|
|
|
- return -ENOMEM;
|
|
|
- }
|
|
|
- return 0;
|
|
|
+ if (ceph_msgr_wq)
|
|
|
+ return 0;
|
|
|
+
|
|
|
+ pr_err("msgr_init failed to create workqueue\n");
|
|
|
+ _ceph_msgr_exit();
|
|
|
+
|
|
|
+ return -ENOMEM;
|
|
|
}
|
|
|
EXPORT_SYMBOL(ceph_msgr_init);
|
|
|
|
|
|
void ceph_msgr_exit(void)
|
|
|
{
|
|
|
- destroy_workqueue(ceph_msgr_wq);
|
|
|
+ BUG_ON(ceph_msgr_wq == NULL);
|
|
|
+
|
|
|
+ _ceph_msgr_exit();
|
|
|
}
|
|
|
EXPORT_SYMBOL(ceph_msgr_exit);
|
|
|
|
|
@@ -128,8 +155,8 @@ EXPORT_SYMBOL(ceph_msgr_flush);
|
|
|
/* data available on socket, or listen socket received a connect */
|
|
|
static void ceph_data_ready(struct sock *sk, int count_unused)
|
|
|
{
|
|
|
- struct ceph_connection *con =
|
|
|
- (struct ceph_connection *)sk->sk_user_data;
|
|
|
+ struct ceph_connection *con = sk->sk_user_data;
|
|
|
+
|
|
|
if (sk->sk_state != TCP_CLOSE_WAIT) {
|
|
|
dout("ceph_data_ready on %p state = %lu, queueing work\n",
|
|
|
con, con->state);
|
|
@@ -140,26 +167,30 @@ static void ceph_data_ready(struct sock *sk, int count_unused)
|
|
|
/* socket has buffer space for writing */
|
|
|
static void ceph_write_space(struct sock *sk)
|
|
|
{
|
|
|
- struct ceph_connection *con =
|
|
|
- (struct ceph_connection *)sk->sk_user_data;
|
|
|
+ struct ceph_connection *con = sk->sk_user_data;
|
|
|
|
|
|
- /* only queue to workqueue if there is data we want to write. */
|
|
|
+ /* only queue to workqueue if there is data we want to write,
|
|
|
+ * and there is sufficient space in the socket buffer to accept
|
|
|
+ * more data. clear SOCK_NOSPACE so that ceph_write_space()
|
|
|
+ * doesn't get called again until try_write() fills the socket
|
|
|
+ * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
|
|
|
+ * and net/core/stream.c:sk_stream_write_space().
|
|
|
+ */
|
|
|
if (test_bit(WRITE_PENDING, &con->state)) {
|
|
|
- dout("ceph_write_space %p queueing write work\n", con);
|
|
|
- queue_con(con);
|
|
|
+ if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
|
|
|
+ dout("ceph_write_space %p queueing write work\n", con);
|
|
|
+ clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
|
|
|
+ queue_con(con);
|
|
|
+ }
|
|
|
} else {
|
|
|
dout("ceph_write_space %p nothing to write\n", con);
|
|
|
}
|
|
|
-
|
|
|
- /* since we have our own write_space, clear the SOCK_NOSPACE flag */
|
|
|
- clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
|
|
|
}
|
|
|
|
|
|
/* socket's state has changed */
|
|
|
static void ceph_state_change(struct sock *sk)
|
|
|
{
|
|
|
- struct ceph_connection *con =
|
|
|
- (struct ceph_connection *)sk->sk_user_data;
|
|
|
+ struct ceph_connection *con = sk->sk_user_data;
|
|
|
|
|
|
dout("ceph_state_change %p state = %lu sk_state = %u\n",
|
|
|
con, con->state, sk->sk_state);
|
|
@@ -184,6 +215,8 @@ static void ceph_state_change(struct sock *sk)
|
|
|
dout("ceph_state_change TCP_ESTABLISHED\n");
|
|
|
queue_con(con);
|
|
|
break;
|
|
|
+ default: /* Everything else is uninteresting */
|
|
|
+ break;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -194,7 +227,7 @@ static void set_sock_callbacks(struct socket *sock,
|
|
|
struct ceph_connection *con)
|
|
|
{
|
|
|
struct sock *sk = sock->sk;
|
|
|
- sk->sk_user_data = (void *)con;
|
|
|
+ sk->sk_user_data = con;
|
|
|
sk->sk_data_ready = ceph_data_ready;
|
|
|
sk->sk_write_space = ceph_write_space;
|
|
|
sk->sk_state_change = ceph_state_change;
|
|
@@ -208,7 +241,7 @@ static void set_sock_callbacks(struct socket *sock,
|
|
|
/*
|
|
|
* initiate connection to a remote socket.
|
|
|
*/
|
|
|
-static struct socket *ceph_tcp_connect(struct ceph_connection *con)
|
|
|
+static int ceph_tcp_connect(struct ceph_connection *con)
|
|
|
{
|
|
|
struct sockaddr_storage *paddr = &con->peer_addr.in_addr;
|
|
|
struct socket *sock;
|
|
@@ -218,8 +251,7 @@ static struct socket *ceph_tcp_connect(struct ceph_connection *con)
|
|
|
ret = sock_create_kern(con->peer_addr.in_addr.ss_family, SOCK_STREAM,
|
|
|
IPPROTO_TCP, &sock);
|
|
|
if (ret)
|
|
|
- return ERR_PTR(ret);
|
|
|
- con->sock = sock;
|
|
|
+ return ret;
|
|
|
sock->sk->sk_allocation = GFP_NOFS;
|
|
|
|
|
|
#ifdef CONFIG_LOCKDEP
|
|
@@ -236,19 +268,17 @@ static struct socket *ceph_tcp_connect(struct ceph_connection *con)
|
|
|
dout("connect %s EINPROGRESS sk_state = %u\n",
|
|
|
ceph_pr_addr(&con->peer_addr.in_addr),
|
|
|
sock->sk->sk_state);
|
|
|
- ret = 0;
|
|
|
- }
|
|
|
- if (ret < 0) {
|
|
|
+ } else if (ret < 0) {
|
|
|
pr_err("connect %s error %d\n",
|
|
|
ceph_pr_addr(&con->peer_addr.in_addr), ret);
|
|
|
sock_release(sock);
|
|
|
- con->sock = NULL;
|
|
|
con->error_msg = "connect error";
|
|
|
+
|
|
|
+ return ret;
|
|
|
}
|
|
|
+ con->sock = sock;
|
|
|
|
|
|
- if (ret < 0)
|
|
|
- return ERR_PTR(ret);
|
|
|
- return sock;
|
|
|
+ return 0;
|
|
|
}
|
|
|
|
|
|
static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
|
|
@@ -284,6 +314,19 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
|
|
|
return r;
|
|
|
}
|
|
|
|
|
|
+static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
|
|
|
+ int offset, size_t size, int more)
|
|
|
+{
|
|
|
+ int flags = MSG_DONTWAIT | MSG_NOSIGNAL | (more ? MSG_MORE : MSG_EOR);
|
|
|
+ int ret;
|
|
|
+
|
|
|
+ ret = kernel_sendpage(sock, page, offset, size, flags);
|
|
|
+ if (ret == -EAGAIN)
|
|
|
+ ret = 0;
|
|
|
+
|
|
|
+ return ret;
|
|
|
+}
|
|
|
+
|
|
|
|
|
|
/*
|
|
|
* Shutdown/close the socket for the given connection.
|
|
@@ -391,22 +434,23 @@ bool ceph_con_opened(struct ceph_connection *con)
|
|
|
*/
|
|
|
struct ceph_connection *ceph_con_get(struct ceph_connection *con)
|
|
|
{
|
|
|
- dout("con_get %p nref = %d -> %d\n", con,
|
|
|
- atomic_read(&con->nref), atomic_read(&con->nref) + 1);
|
|
|
- if (atomic_inc_not_zero(&con->nref))
|
|
|
- return con;
|
|
|
- return NULL;
|
|
|
+ int nref = __atomic_add_unless(&con->nref, 1, 0);
|
|
|
+
|
|
|
+ dout("con_get %p nref = %d -> %d\n", con, nref, nref + 1);
|
|
|
+
|
|
|
+ return nref ? con : NULL;
|
|
|
}
|
|
|
|
|
|
void ceph_con_put(struct ceph_connection *con)
|
|
|
{
|
|
|
- dout("con_put %p nref = %d -> %d\n", con,
|
|
|
- atomic_read(&con->nref), atomic_read(&con->nref) - 1);
|
|
|
- BUG_ON(atomic_read(&con->nref) == 0);
|
|
|
- if (atomic_dec_and_test(&con->nref)) {
|
|
|
+ int nref = atomic_dec_return(&con->nref);
|
|
|
+
|
|
|
+ BUG_ON(nref < 0);
|
|
|
+ if (nref == 0) {
|
|
|
BUG_ON(con->sock);
|
|
|
kfree(con);
|
|
|
}
|
|
|
+ dout("con_put %p nref = %d -> %d\n", con, nref + 1, nref);
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -442,14 +486,35 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
|
|
|
return ret;
|
|
|
}
|
|
|
|
|
|
+static void ceph_con_out_kvec_reset(struct ceph_connection *con)
|
|
|
+{
|
|
|
+ con->out_kvec_left = 0;
|
|
|
+ con->out_kvec_bytes = 0;
|
|
|
+ con->out_kvec_cur = &con->out_kvec[0];
|
|
|
+}
|
|
|
+
|
|
|
+static void ceph_con_out_kvec_add(struct ceph_connection *con,
|
|
|
+ size_t size, void *data)
|
|
|
+{
|
|
|
+ int index;
|
|
|
+
|
|
|
+ index = con->out_kvec_left;
|
|
|
+ BUG_ON(index >= ARRAY_SIZE(con->out_kvec));
|
|
|
+
|
|
|
+ con->out_kvec[index].iov_len = size;
|
|
|
+ con->out_kvec[index].iov_base = data;
|
|
|
+ con->out_kvec_left++;
|
|
|
+ con->out_kvec_bytes += size;
|
|
|
+}
|
|
|
|
|
|
/*
|
|
|
* Prepare footer for currently outgoing message, and finish things
|
|
|
* off. Assumes out_kvec* are already valid.. we just add on to the end.
|
|
|
*/
|
|
|
-static void prepare_write_message_footer(struct ceph_connection *con, int v)
|
|
|
+static void prepare_write_message_footer(struct ceph_connection *con)
|
|
|
{
|
|
|
struct ceph_msg *m = con->out_msg;
|
|
|
+ int v = con->out_kvec_left;
|
|
|
|
|
|
dout("prepare_write_message_footer %p\n", con);
|
|
|
con->out_kvec_is_msg = true;
|
|
@@ -467,9 +532,9 @@ static void prepare_write_message_footer(struct ceph_connection *con, int v)
|
|
|
static void prepare_write_message(struct ceph_connection *con)
|
|
|
{
|
|
|
struct ceph_msg *m;
|
|
|
- int v = 0;
|
|
|
+ u32 crc;
|
|
|
|
|
|
- con->out_kvec_bytes = 0;
|
|
|
+ ceph_con_out_kvec_reset(con);
|
|
|
con->out_kvec_is_msg = true;
|
|
|
con->out_msg_done = false;
|
|
|
|
|
@@ -477,16 +542,13 @@ static void prepare_write_message(struct ceph_connection *con)
|
|
|
* TCP packet that's a good thing. */
|
|
|
if (con->in_seq > con->in_seq_acked) {
|
|
|
con->in_seq_acked = con->in_seq;
|
|
|
- con->out_kvec[v].iov_base = &tag_ack;
|
|
|
- con->out_kvec[v++].iov_len = 1;
|
|
|
+ ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
|
|
|
con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
|
|
|
- con->out_kvec[v].iov_base = &con->out_temp_ack;
|
|
|
- con->out_kvec[v++].iov_len = sizeof(con->out_temp_ack);
|
|
|
- con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
|
|
|
+ ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack),
|
|
|
+ &con->out_temp_ack);
|
|
|
}
|
|
|
|
|
|
- m = list_first_entry(&con->out_queue,
|
|
|
- struct ceph_msg, list_head);
|
|
|
+ m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
|
|
|
con->out_msg = m;
|
|
|
|
|
|
/* put message on sent list */
|
|
@@ -510,30 +572,26 @@ static void prepare_write_message(struct ceph_connection *con)
|
|
|
BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
|
|
|
|
|
|
/* tag + hdr + front + middle */
|
|
|
- con->out_kvec[v].iov_base = &tag_msg;
|
|
|
- con->out_kvec[v++].iov_len = 1;
|
|
|
- con->out_kvec[v].iov_base = &m->hdr;
|
|
|
- con->out_kvec[v++].iov_len = sizeof(m->hdr);
|
|
|
- con->out_kvec[v++] = m->front;
|
|
|
+ ceph_con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
|
|
|
+ ceph_con_out_kvec_add(con, sizeof (m->hdr), &m->hdr);
|
|
|
+ ceph_con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
|
|
|
+
|
|
|
if (m->middle)
|
|
|
- con->out_kvec[v++] = m->middle->vec;
|
|
|
- con->out_kvec_left = v;
|
|
|
- con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len +
|
|
|
- (m->middle ? m->middle->vec.iov_len : 0);
|
|
|
- con->out_kvec_cur = con->out_kvec;
|
|
|
+ ceph_con_out_kvec_add(con, m->middle->vec.iov_len,
|
|
|
+ m->middle->vec.iov_base);
|
|
|
|
|
|
/* fill in crc (except data pages), footer */
|
|
|
- con->out_msg->hdr.crc =
|
|
|
- cpu_to_le32(crc32c(0, (void *)&m->hdr,
|
|
|
- sizeof(m->hdr) - sizeof(m->hdr.crc)));
|
|
|
+ crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
|
|
|
+ con->out_msg->hdr.crc = cpu_to_le32(crc);
|
|
|
con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE;
|
|
|
- con->out_msg->footer.front_crc =
|
|
|
- cpu_to_le32(crc32c(0, m->front.iov_base, m->front.iov_len));
|
|
|
- if (m->middle)
|
|
|
- con->out_msg->footer.middle_crc =
|
|
|
- cpu_to_le32(crc32c(0, m->middle->vec.iov_base,
|
|
|
- m->middle->vec.iov_len));
|
|
|
- else
|
|
|
+
|
|
|
+ crc = crc32c(0, m->front.iov_base, m->front.iov_len);
|
|
|
+ con->out_msg->footer.front_crc = cpu_to_le32(crc);
|
|
|
+ if (m->middle) {
|
|
|
+ crc = crc32c(0, m->middle->vec.iov_base,
|
|
|
+ m->middle->vec.iov_len);
|
|
|
+ con->out_msg->footer.middle_crc = cpu_to_le32(crc);
|
|
|
+ } else
|
|
|
con->out_msg->footer.middle_crc = 0;
|
|
|
con->out_msg->footer.data_crc = 0;
|
|
|
dout("prepare_write_message front_crc %u data_crc %u\n",
|
|
@@ -549,11 +607,11 @@ static void prepare_write_message(struct ceph_connection *con)
|
|
|
else
|
|
|
con->out_msg_pos.page_pos = 0;
|
|
|
con->out_msg_pos.data_pos = 0;
|
|
|
- con->out_msg_pos.did_page_crc = 0;
|
|
|
+ con->out_msg_pos.did_page_crc = false;
|
|
|
con->out_more = 1; /* data + footer will follow */
|
|
|
} else {
|
|
|
/* no, queue up footer too and be done */
|
|
|
- prepare_write_message_footer(con, v);
|
|
|
+ prepare_write_message_footer(con);
|
|
|
}
|
|
|
|
|
|
set_bit(WRITE_PENDING, &con->state);
|
|
@@ -568,14 +626,14 @@ static void prepare_write_ack(struct ceph_connection *con)
|
|
|
con->in_seq_acked, con->in_seq);
|
|
|
con->in_seq_acked = con->in_seq;
|
|
|
|
|
|
- con->out_kvec[0].iov_base = &tag_ack;
|
|
|
- con->out_kvec[0].iov_len = 1;
|
|
|
+ ceph_con_out_kvec_reset(con);
|
|
|
+
|
|
|
+ ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
|
|
|
+
|
|
|
con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
|
|
|
- con->out_kvec[1].iov_base = &con->out_temp_ack;
|
|
|
- con->out_kvec[1].iov_len = sizeof(con->out_temp_ack);
|
|
|
- con->out_kvec_left = 2;
|
|
|
- con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
|
|
|
- con->out_kvec_cur = con->out_kvec;
|
|
|
+ ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack),
|
|
|
+ &con->out_temp_ack);
|
|
|
+
|
|
|
con->out_more = 1; /* more will follow.. eventually.. */
|
|
|
set_bit(WRITE_PENDING, &con->state);
|
|
|
}
|
|
@@ -586,11 +644,8 @@ static void prepare_write_ack(struct ceph_connection *con)
|
|
|
static void prepare_write_keepalive(struct ceph_connection *con)
|
|
|
{
|
|
|
dout("prepare_write_keepalive %p\n", con);
|
|
|
- con->out_kvec[0].iov_base = &tag_keepalive;
|
|
|
- con->out_kvec[0].iov_len = 1;
|
|
|
- con->out_kvec_left = 1;
|
|
|
- con->out_kvec_bytes = 1;
|
|
|
- con->out_kvec_cur = con->out_kvec;
|
|
|
+ ceph_con_out_kvec_reset(con);
|
|
|
+ ceph_con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
|
|
|
set_bit(WRITE_PENDING, &con->state);
|
|
|
}
|
|
|
|
|
@@ -619,12 +674,9 @@ static int prepare_connect_authorizer(struct ceph_connection *con)
|
|
|
con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
|
|
|
con->out_connect.authorizer_len = cpu_to_le32(auth_len);
|
|
|
|
|
|
- if (auth_len) {
|
|
|
- con->out_kvec[con->out_kvec_left].iov_base = auth_buf;
|
|
|
- con->out_kvec[con->out_kvec_left].iov_len = auth_len;
|
|
|
- con->out_kvec_left++;
|
|
|
- con->out_kvec_bytes += auth_len;
|
|
|
- }
|
|
|
+ if (auth_len)
|
|
|
+ ceph_con_out_kvec_add(con, auth_len, auth_buf);
|
|
|
+
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
@@ -634,22 +686,18 @@ static int prepare_connect_authorizer(struct ceph_connection *con)
|
|
|
static void prepare_write_banner(struct ceph_messenger *msgr,
|
|
|
struct ceph_connection *con)
|
|
|
{
|
|
|
- int len = strlen(CEPH_BANNER);
|
|
|
+ ceph_con_out_kvec_reset(con);
|
|
|
+ ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
|
|
|
+ ceph_con_out_kvec_add(con, sizeof (msgr->my_enc_addr),
|
|
|
+ &msgr->my_enc_addr);
|
|
|
|
|
|
- con->out_kvec[0].iov_base = CEPH_BANNER;
|
|
|
- con->out_kvec[0].iov_len = len;
|
|
|
- con->out_kvec[1].iov_base = &msgr->my_enc_addr;
|
|
|
- con->out_kvec[1].iov_len = sizeof(msgr->my_enc_addr);
|
|
|
- con->out_kvec_left = 2;
|
|
|
- con->out_kvec_bytes = len + sizeof(msgr->my_enc_addr);
|
|
|
- con->out_kvec_cur = con->out_kvec;
|
|
|
con->out_more = 0;
|
|
|
set_bit(WRITE_PENDING, &con->state);
|
|
|
}
|
|
|
|
|
|
static int prepare_write_connect(struct ceph_messenger *msgr,
|
|
|
struct ceph_connection *con,
|
|
|
- int after_banner)
|
|
|
+ int include_banner)
|
|
|
{
|
|
|
unsigned global_seq = get_global_seq(con->msgr, 0);
|
|
|
int proto;
|
|
@@ -678,22 +726,18 @@ static int prepare_write_connect(struct ceph_messenger *msgr,
|
|
|
con->out_connect.protocol_version = cpu_to_le32(proto);
|
|
|
con->out_connect.flags = 0;
|
|
|
|
|
|
- if (!after_banner) {
|
|
|
- con->out_kvec_left = 0;
|
|
|
- con->out_kvec_bytes = 0;
|
|
|
- }
|
|
|
- con->out_kvec[con->out_kvec_left].iov_base = &con->out_connect;
|
|
|
- con->out_kvec[con->out_kvec_left].iov_len = sizeof(con->out_connect);
|
|
|
- con->out_kvec_left++;
|
|
|
- con->out_kvec_bytes += sizeof(con->out_connect);
|
|
|
- con->out_kvec_cur = con->out_kvec;
|
|
|
+ if (include_banner)
|
|
|
+ prepare_write_banner(msgr, con);
|
|
|
+ else
|
|
|
+ ceph_con_out_kvec_reset(con);
|
|
|
+ ceph_con_out_kvec_add(con, sizeof (con->out_connect), &con->out_connect);
|
|
|
+
|
|
|
con->out_more = 0;
|
|
|
set_bit(WRITE_PENDING, &con->state);
|
|
|
|
|
|
return prepare_connect_authorizer(con);
|
|
|
}
|
|
|
|
|
|
-
|
|
|
/*
|
|
|
* write as much of pending kvecs to the socket as we can.
|
|
|
* 1 -> done
|
|
@@ -714,17 +758,18 @@ static int write_partial_kvec(struct ceph_connection *con)
|
|
|
con->out_kvec_bytes -= ret;
|
|
|
if (con->out_kvec_bytes == 0)
|
|
|
break; /* done */
|
|
|
- while (ret > 0) {
|
|
|
- if (ret >= con->out_kvec_cur->iov_len) {
|
|
|
- ret -= con->out_kvec_cur->iov_len;
|
|
|
- con->out_kvec_cur++;
|
|
|
- con->out_kvec_left--;
|
|
|
- } else {
|
|
|
- con->out_kvec_cur->iov_len -= ret;
|
|
|
- con->out_kvec_cur->iov_base += ret;
|
|
|
- ret = 0;
|
|
|
- break;
|
|
|
- }
|
|
|
+
|
|
|
+ /* account for full iov entries consumed */
|
|
|
+ while (ret >= con->out_kvec_cur->iov_len) {
|
|
|
+ BUG_ON(!con->out_kvec_left);
|
|
|
+ ret -= con->out_kvec_cur->iov_len;
|
|
|
+ con->out_kvec_cur++;
|
|
|
+ con->out_kvec_left--;
|
|
|
+ }
|
|
|
+ /* and for a partially-consumed entry */
|
|
|
+ if (ret) {
|
|
|
+ con->out_kvec_cur->iov_len -= ret;
|
|
|
+ con->out_kvec_cur->iov_base += ret;
|
|
|
}
|
|
|
}
|
|
|
con->out_kvec_left = 0;
|
|
@@ -773,7 +818,7 @@ static int write_partial_msg_pages(struct ceph_connection *con)
|
|
|
struct ceph_msg *msg = con->out_msg;
|
|
|
unsigned data_len = le32_to_cpu(msg->hdr.data_len);
|
|
|
size_t len;
|
|
|
- int crc = con->msgr->nocrc;
|
|
|
+ bool do_datacrc = !con->msgr->nocrc;
|
|
|
int ret;
|
|
|
int total_max_write;
|
|
|
int in_trail = 0;
|
|
@@ -790,9 +835,8 @@ static int write_partial_msg_pages(struct ceph_connection *con)
|
|
|
|
|
|
while (data_len > con->out_msg_pos.data_pos) {
|
|
|
struct page *page = NULL;
|
|
|
- void *kaddr = NULL;
|
|
|
int max_write = PAGE_SIZE;
|
|
|
- int page_shift = 0;
|
|
|
+ int bio_offset = 0;
|
|
|
|
|
|
total_max_write = data_len - trail_len -
|
|
|
con->out_msg_pos.data_pos;
|
|
@@ -811,58 +855,47 @@ static int write_partial_msg_pages(struct ceph_connection *con)
|
|
|
|
|
|
page = list_first_entry(&msg->trail->head,
|
|
|
struct page, lru);
|
|
|
- if (crc)
|
|
|
- kaddr = kmap(page);
|
|
|
max_write = PAGE_SIZE;
|
|
|
} else if (msg->pages) {
|
|
|
page = msg->pages[con->out_msg_pos.page];
|
|
|
- if (crc)
|
|
|
- kaddr = kmap(page);
|
|
|
} else if (msg->pagelist) {
|
|
|
page = list_first_entry(&msg->pagelist->head,
|
|
|
struct page, lru);
|
|
|
- if (crc)
|
|
|
- kaddr = kmap(page);
|
|
|
#ifdef CONFIG_BLOCK
|
|
|
} else if (msg->bio) {
|
|
|
struct bio_vec *bv;
|
|
|
|
|
|
bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
|
|
|
page = bv->bv_page;
|
|
|
- page_shift = bv->bv_offset;
|
|
|
- if (crc)
|
|
|
- kaddr = kmap(page) + page_shift;
|
|
|
+ bio_offset = bv->bv_offset;
|
|
|
max_write = bv->bv_len;
|
|
|
#endif
|
|
|
} else {
|
|
|
- page = con->msgr->zero_page;
|
|
|
- if (crc)
|
|
|
- kaddr = page_address(con->msgr->zero_page);
|
|
|
+ page = zero_page;
|
|
|
}
|
|
|
len = min_t(int, max_write - con->out_msg_pos.page_pos,
|
|
|
total_max_write);
|
|
|
|
|
|
- if (crc && !con->out_msg_pos.did_page_crc) {
|
|
|
- void *base = kaddr + con->out_msg_pos.page_pos;
|
|
|
+ if (do_datacrc && !con->out_msg_pos.did_page_crc) {
|
|
|
+ void *base;
|
|
|
+ u32 crc;
|
|
|
u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
|
|
|
+ char *kaddr;
|
|
|
|
|
|
+ kaddr = kmap(page);
|
|
|
BUG_ON(kaddr == NULL);
|
|
|
- con->out_msg->footer.data_crc =
|
|
|
- cpu_to_le32(crc32c(tmpcrc, base, len));
|
|
|
- con->out_msg_pos.did_page_crc = 1;
|
|
|
+ base = kaddr + con->out_msg_pos.page_pos + bio_offset;
|
|
|
+ crc = crc32c(tmpcrc, base, len);
|
|
|
+ con->out_msg->footer.data_crc = cpu_to_le32(crc);
|
|
|
+ con->out_msg_pos.did_page_crc = true;
|
|
|
}
|
|
|
- ret = kernel_sendpage(con->sock, page,
|
|
|
- con->out_msg_pos.page_pos + page_shift,
|
|
|
- len,
|
|
|
- MSG_DONTWAIT | MSG_NOSIGNAL |
|
|
|
- MSG_MORE);
|
|
|
-
|
|
|
- if (crc &&
|
|
|
- (msg->pages || msg->pagelist || msg->bio || in_trail))
|
|
|
+ ret = ceph_tcp_sendpage(con->sock, page,
|
|
|
+ con->out_msg_pos.page_pos + bio_offset,
|
|
|
+ len, 1);
|
|
|
+
|
|
|
+ if (do_datacrc)
|
|
|
kunmap(page);
|
|
|
|
|
|
- if (ret == -EAGAIN)
|
|
|
- ret = 0;
|
|
|
if (ret <= 0)
|
|
|
goto out;
|
|
|
|
|
@@ -871,7 +904,7 @@ static int write_partial_msg_pages(struct ceph_connection *con)
|
|
|
if (ret == len) {
|
|
|
con->out_msg_pos.page_pos = 0;
|
|
|
con->out_msg_pos.page++;
|
|
|
- con->out_msg_pos.did_page_crc = 0;
|
|
|
+ con->out_msg_pos.did_page_crc = false;
|
|
|
if (in_trail)
|
|
|
list_move_tail(&page->lru,
|
|
|
&msg->trail->head);
|
|
@@ -888,12 +921,10 @@ static int write_partial_msg_pages(struct ceph_connection *con)
|
|
|
dout("write_partial_msg_pages %p msg %p done\n", con, msg);
|
|
|
|
|
|
/* prepare and queue up footer, too */
|
|
|
- if (!crc)
|
|
|
+ if (!do_datacrc)
|
|
|
con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
|
|
|
- con->out_kvec_bytes = 0;
|
|
|
- con->out_kvec_left = 0;
|
|
|
- con->out_kvec_cur = con->out_kvec;
|
|
|
- prepare_write_message_footer(con, 0);
|
|
|
+ ceph_con_out_kvec_reset(con);
|
|
|
+ prepare_write_message_footer(con);
|
|
|
ret = 1;
|
|
|
out:
|
|
|
return ret;
|
|
@@ -907,12 +938,9 @@ static int write_partial_skip(struct ceph_connection *con)
|
|
|
int ret;
|
|
|
|
|
|
while (con->out_skip > 0) {
|
|
|
- struct kvec iov = {
|
|
|
- .iov_base = page_address(con->msgr->zero_page),
|
|
|
- .iov_len = min(con->out_skip, (int)PAGE_CACHE_SIZE)
|
|
|
- };
|
|
|
+ size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE);
|
|
|
|
|
|
- ret = ceph_tcp_sendmsg(con->sock, &iov, 1, iov.iov_len, 1);
|
|
|
+ ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, 1);
|
|
|
if (ret <= 0)
|
|
|
goto out;
|
|
|
con->out_skip -= ret;
|
|
@@ -1085,8 +1113,8 @@ static void addr_set_port(struct sockaddr_storage *ss, int p)
|
|
|
static int ceph_pton(const char *str, size_t len, struct sockaddr_storage *ss,
|
|
|
char delim, const char **ipend)
|
|
|
{
|
|
|
- struct sockaddr_in *in4 = (void *)ss;
|
|
|
- struct sockaddr_in6 *in6 = (void *)ss;
|
|
|
+ struct sockaddr_in *in4 = (struct sockaddr_in *) ss;
|
|
|
+ struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) ss;
|
|
|
|
|
|
memset(ss, 0, sizeof(*ss));
|
|
|
|
|
@@ -1512,10 +1540,9 @@ static int read_partial_message_section(struct ceph_connection *con,
|
|
|
if (ret <= 0)
|
|
|
return ret;
|
|
|
section->iov_len += ret;
|
|
|
- if (section->iov_len == sec_len)
|
|
|
- *crc = crc32c(0, section->iov_base,
|
|
|
- section->iov_len);
|
|
|
}
|
|
|
+ if (section->iov_len == sec_len)
|
|
|
+ *crc = crc32c(0, section->iov_base, section->iov_len);
|
|
|
|
|
|
return 1;
|
|
|
}
|
|
@@ -1527,7 +1554,7 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
|
|
|
|
|
|
static int read_partial_message_pages(struct ceph_connection *con,
|
|
|
struct page **pages,
|
|
|
- unsigned data_len, int datacrc)
|
|
|
+ unsigned data_len, bool do_datacrc)
|
|
|
{
|
|
|
void *p;
|
|
|
int ret;
|
|
@@ -1540,7 +1567,7 @@ static int read_partial_message_pages(struct ceph_connection *con,
|
|
|
p = kmap(pages[con->in_msg_pos.page]);
|
|
|
ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
|
|
|
left);
|
|
|
- if (ret > 0 && datacrc)
|
|
|
+ if (ret > 0 && do_datacrc)
|
|
|
con->in_data_crc =
|
|
|
crc32c(con->in_data_crc,
|
|
|
p + con->in_msg_pos.page_pos, ret);
|
|
@@ -1560,7 +1587,7 @@ static int read_partial_message_pages(struct ceph_connection *con,
|
|
|
#ifdef CONFIG_BLOCK
|
|
|
static int read_partial_message_bio(struct ceph_connection *con,
|
|
|
struct bio **bio_iter, int *bio_seg,
|
|
|
- unsigned data_len, int datacrc)
|
|
|
+ unsigned data_len, bool do_datacrc)
|
|
|
{
|
|
|
struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
|
|
|
void *p;
|
|
@@ -1576,7 +1603,7 @@ static int read_partial_message_bio(struct ceph_connection *con,
|
|
|
|
|
|
ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
|
|
|
left);
|
|
|
- if (ret > 0 && datacrc)
|
|
|
+ if (ret > 0 && do_datacrc)
|
|
|
con->in_data_crc =
|
|
|
crc32c(con->in_data_crc,
|
|
|
p + con->in_msg_pos.page_pos, ret);
|
|
@@ -1603,9 +1630,10 @@ static int read_partial_message(struct ceph_connection *con)
|
|
|
int ret;
|
|
|
int to, left;
|
|
|
unsigned front_len, middle_len, data_len;
|
|
|
- int datacrc = con->msgr->nocrc;
|
|
|
+ bool do_datacrc = !con->msgr->nocrc;
|
|
|
int skip;
|
|
|
u64 seq;
|
|
|
+ u32 crc;
|
|
|
|
|
|
dout("read_partial_message con %p msg %p\n", con, m);
|
|
|
|
|
@@ -1618,17 +1646,16 @@ static int read_partial_message(struct ceph_connection *con)
|
|
|
if (ret <= 0)
|
|
|
return ret;
|
|
|
con->in_base_pos += ret;
|
|
|
- if (con->in_base_pos == sizeof(con->in_hdr)) {
|
|
|
- u32 crc = crc32c(0, (void *)&con->in_hdr,
|
|
|
- sizeof(con->in_hdr) - sizeof(con->in_hdr.crc));
|
|
|
- if (crc != le32_to_cpu(con->in_hdr.crc)) {
|
|
|
- pr_err("read_partial_message bad hdr "
|
|
|
- " crc %u != expected %u\n",
|
|
|
- crc, con->in_hdr.crc);
|
|
|
- return -EBADMSG;
|
|
|
- }
|
|
|
- }
|
|
|
}
|
|
|
+
|
|
|
+ crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc));
|
|
|
+ if (cpu_to_le32(crc) != con->in_hdr.crc) {
|
|
|
+ pr_err("read_partial_message bad hdr "
|
|
|
+ " crc %u != expected %u\n",
|
|
|
+ crc, con->in_hdr.crc);
|
|
|
+ return -EBADMSG;
|
|
|
+ }
|
|
|
+
|
|
|
front_len = le32_to_cpu(con->in_hdr.front_len);
|
|
|
if (front_len > CEPH_MSG_MAX_FRONT_LEN)
|
|
|
return -EIO;
|
|
@@ -1714,7 +1741,7 @@ static int read_partial_message(struct ceph_connection *con)
|
|
|
while (con->in_msg_pos.data_pos < data_len) {
|
|
|
if (m->pages) {
|
|
|
ret = read_partial_message_pages(con, m->pages,
|
|
|
- data_len, datacrc);
|
|
|
+ data_len, do_datacrc);
|
|
|
if (ret <= 0)
|
|
|
return ret;
|
|
|
#ifdef CONFIG_BLOCK
|
|
@@ -1722,7 +1749,7 @@ static int read_partial_message(struct ceph_connection *con)
|
|
|
|
|
|
ret = read_partial_message_bio(con,
|
|
|
&m->bio_iter, &m->bio_seg,
|
|
|
- data_len, datacrc);
|
|
|
+ data_len, do_datacrc);
|
|
|
if (ret <= 0)
|
|
|
return ret;
|
|
|
#endif
|
|
@@ -1757,7 +1784,7 @@ static int read_partial_message(struct ceph_connection *con)
|
|
|
m, con->in_middle_crc, m->footer.middle_crc);
|
|
|
return -EBADMSG;
|
|
|
}
|
|
|
- if (datacrc &&
|
|
|
+ if (do_datacrc &&
|
|
|
(m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
|
|
|
con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
|
|
|
pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
|
|
@@ -1819,7 +1846,6 @@ more:
|
|
|
|
|
|
/* open the socket first? */
|
|
|
if (con->sock == NULL) {
|
|
|
- prepare_write_banner(msgr, con);
|
|
|
prepare_write_connect(msgr, con, 1);
|
|
|
prepare_read_banner(con);
|
|
|
set_bit(CONNECTING, &con->state);
|
|
@@ -1829,11 +1855,9 @@ more:
|
|
|
con->in_tag = CEPH_MSGR_TAG_READY;
|
|
|
dout("try_write initiating connect on %p new state %lu\n",
|
|
|
con, con->state);
|
|
|
- con->sock = ceph_tcp_connect(con);
|
|
|
- if (IS_ERR(con->sock)) {
|
|
|
- con->sock = NULL;
|
|
|
+ ret = ceph_tcp_connect(con);
|
|
|
+ if (ret < 0) {
|
|
|
con->error_msg = "connect error";
|
|
|
- ret = -1;
|
|
|
goto out;
|
|
|
}
|
|
|
}
|
|
@@ -1953,8 +1977,9 @@ more:
|
|
|
*
|
|
|
* FIXME: there must be a better way to do this!
|
|
|
*/
|
|
|
- static char buf[1024];
|
|
|
- int skip = min(1024, -con->in_base_pos);
|
|
|
+ static char buf[SKIP_BUF_SIZE];
|
|
|
+ int skip = min((int) sizeof (buf), -con->in_base_pos);
|
|
|
+
|
|
|
dout("skipping %d / %d bytes\n", skip, -con->in_base_pos);
|
|
|
ret = ceph_tcp_recvmsg(con->sock, buf, skip);
|
|
|
if (ret <= 0)
|
|
@@ -2216,15 +2241,6 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr,
|
|
|
|
|
|
spin_lock_init(&msgr->global_seq_lock);
|
|
|
|
|
|
- /* the zero page is needed if a request is "canceled" while the message
|
|
|
- * is being written over the socket */
|
|
|
- msgr->zero_page = __page_cache_alloc(GFP_KERNEL | __GFP_ZERO);
|
|
|
- if (!msgr->zero_page) {
|
|
|
- kfree(msgr);
|
|
|
- return ERR_PTR(-ENOMEM);
|
|
|
- }
|
|
|
- kmap(msgr->zero_page);
|
|
|
-
|
|
|
if (myaddr)
|
|
|
msgr->inst.addr = *myaddr;
|
|
|
|
|
@@ -2241,8 +2257,6 @@ EXPORT_SYMBOL(ceph_messenger_create);
|
|
|
void ceph_messenger_destroy(struct ceph_messenger *msgr)
|
|
|
{
|
|
|
dout("destroy %p\n", msgr);
|
|
|
- kunmap(msgr->zero_page);
|
|
|
- __free_page(msgr->zero_page);
|
|
|
kfree(msgr);
|
|
|
dout("destroyed messenger %p\n", msgr);
|
|
|
}
|