Browse Source

ceph: explicitly specify page alignment in network messages

The alignment used for reading data into or out of pages used to be taken
from the data_off field in the message header.  This only worked as long
as the page alignment matched the object offset, which broke direct I/O to
non-page-aligned offsets.

Instead, explicitly specify the page alignment next to the page vector
in the ceph_msg struct, and use that rather than the message header (which
probably shouldn't be trusted).  The alloc_msg callback is responsible for
filling in this field properly when it sets up the page vector.

Signed-off-by: Sage Weil <sage@newdream.net>
Sage Weil 14 years ago
parent
commit
c5c6b19d4b
3 changed files with 9 additions and 5 deletions
  1. 1 0
      include/linux/ceph/messenger.h
  2. 5 5
      net/ceph/messenger.c
  3. 3 0
      net/ceph/osd_client.c

+ 1 - 0
include/linux/ceph/messenger.h

@@ -82,6 +82,7 @@ struct ceph_msg {
 	struct ceph_buffer *middle;
 	struct ceph_buffer *middle;
 	struct page **pages;            /* data payload.  NOT OWNER. */
 	struct page **pages;            /* data payload.  NOT OWNER. */
 	unsigned nr_pages;              /* size of page array */
 	unsigned nr_pages;              /* size of page array */
+	unsigned page_alignment;        /* io offset in first page */
 	struct ceph_pagelist *pagelist; /* instead of pages */
 	struct ceph_pagelist *pagelist; /* instead of pages */
 	struct list_head list_head;
 	struct list_head list_head;
 	struct kref kref;
 	struct kref kref;

+ 5 - 5
net/ceph/messenger.c

@@ -540,8 +540,7 @@ static void prepare_write_message(struct ceph_connection *con)
 		/* initialize page iterator */
 		/* initialize page iterator */
 		con->out_msg_pos.page = 0;
 		con->out_msg_pos.page = 0;
 		if (m->pages)
 		if (m->pages)
-			con->out_msg_pos.page_pos =
-				le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
+			con->out_msg_pos.page_pos = m->page_alignment;
 		else
 		else
 			con->out_msg_pos.page_pos = 0;
 			con->out_msg_pos.page_pos = 0;
 		con->out_msg_pos.data_pos = 0;
 		con->out_msg_pos.data_pos = 0;
@@ -1491,7 +1490,7 @@ static int read_partial_message(struct ceph_connection *con)
 	struct ceph_msg *m = con->in_msg;
 	struct ceph_msg *m = con->in_msg;
 	int ret;
 	int ret;
 	int to, left;
 	int to, left;
-	unsigned front_len, middle_len, data_len, data_off;
+	unsigned front_len, middle_len, data_len;
 	int datacrc = con->msgr->nocrc;
 	int datacrc = con->msgr->nocrc;
 	int skip;
 	int skip;
 	u64 seq;
 	u64 seq;
@@ -1527,7 +1526,6 @@ static int read_partial_message(struct ceph_connection *con)
 	data_len = le32_to_cpu(con->in_hdr.data_len);
 	data_len = le32_to_cpu(con->in_hdr.data_len);
 	if (data_len > CEPH_MSG_MAX_DATA_LEN)
 	if (data_len > CEPH_MSG_MAX_DATA_LEN)
 		return -EIO;
 		return -EIO;
-	data_off = le16_to_cpu(con->in_hdr.data_off);
 
 
 	/* verify seq# */
 	/* verify seq# */
 	seq = le64_to_cpu(con->in_hdr.seq);
 	seq = le64_to_cpu(con->in_hdr.seq);
@@ -1575,7 +1573,7 @@ static int read_partial_message(struct ceph_connection *con)
 
 
 		con->in_msg_pos.page = 0;
 		con->in_msg_pos.page = 0;
 		if (m->pages)
 		if (m->pages)
-			con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
+			con->in_msg_pos.page_pos = m->page_alignment;
 		else
 		else
 			con->in_msg_pos.page_pos = 0;
 			con->in_msg_pos.page_pos = 0;
 		con->in_msg_pos.data_pos = 0;
 		con->in_msg_pos.data_pos = 0;
@@ -2300,6 +2298,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
 
 
 	/* data */
 	/* data */
 	m->nr_pages = 0;
 	m->nr_pages = 0;
+	m->page_alignment = 0;
 	m->pages = NULL;
 	m->pages = NULL;
 	m->pagelist = NULL;
 	m->pagelist = NULL;
 	m->bio = NULL;
 	m->bio = NULL;
@@ -2369,6 +2368,7 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
 			       type, front_len);
 			       type, front_len);
 			return NULL;
 			return NULL;
 		}
 		}
+		msg->page_alignment = le16_to_cpu(hdr->data_off);
 	}
 	}
 	memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
 	memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
 
 

+ 3 - 0
net/ceph/osd_client.c

@@ -391,6 +391,8 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
 		req->r_request->hdr.data_len = cpu_to_le32(data_len);
 		req->r_request->hdr.data_len = cpu_to_le32(data_len);
 	}
 	}
 
 
+	req->r_request->page_alignment = req->r_page_alignment;
+
 	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
 	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
 	msg_size = p - msg->front.iov_base;
 	msg_size = p - msg->front.iov_base;
 	msg->front.iov_len = msg_size;
 	msg->front.iov_len = msg_size;
@@ -1657,6 +1659,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
 		}
 		}
 		m->pages = req->r_pages;
 		m->pages = req->r_pages;
 		m->nr_pages = req->r_num_pages;
 		m->nr_pages = req->r_num_pages;
+		m->page_alignment = req->r_page_alignment;
 #ifdef CONFIG_BLOCK
 #ifdef CONFIG_BLOCK
 		m->bio = req->r_bio;
 		m->bio = req->r_bio;
 #endif
 #endif