|
@@ -87,6 +87,13 @@ void ceph_osdc_release_request(struct kref *kref)
|
|
|
ceph_msg_put(req->r_request);
|
|
|
if (req->r_reply)
|
|
|
ceph_msg_put(req->r_reply);
|
|
|
+ if (req->r_con_filling_pages) {
|
|
|
+ dout("release_request revoking pages %p from con %p\n",
|
|
|
+ req->r_pages, req->r_con_filling_pages);
|
|
|
+ ceph_con_revoke_pages(req->r_con_filling_pages,
|
|
|
+ req->r_pages);
|
|
|
+ ceph_con_put(req->r_con_filling_pages);
|
|
|
+ }
|
|
|
if (req->r_own_pages)
|
|
|
ceph_release_page_vector(req->r_pages,
|
|
|
req->r_num_pages);
|
|
@@ -687,7 +694,8 @@ static void handle_timeout(struct work_struct *work)
|
|
|
* handle osd op reply. either call the callback if it is specified,
|
|
|
* or do the completion to wake up the waiting thread.
|
|
|
*/
|
|
|
-static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
|
|
|
+static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
|
|
|
+ struct ceph_connection *con)
|
|
|
{
|
|
|
struct ceph_osd_reply_head *rhead = msg->front.iov_base;
|
|
|
struct ceph_osd_request *req;
|
|
@@ -715,6 +723,16 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
|
|
|
ceph_osdc_get_request(req);
|
|
|
flags = le32_to_cpu(rhead->flags);
|
|
|
|
|
|
+ /*
|
|
|
+ * if this connection filled our pages, drop our reference now, to
|
|
|
+ * avoid a (safe but slower) revoke later.
|
|
|
+ */
|
|
|
+ if (req->r_con_filling_pages == con && req->r_pages == msg->pages) {
|
|
|
+ dout(" got pages, dropping con_filling_pages ref %p\n", con);
|
|
|
+ req->r_con_filling_pages = NULL;
|
|
|
+ ceph_con_put(con);
|
|
|
+ }
|
|
|
+
|
|
|
if (req->r_reply) {
|
|
|
/*
|
|
|
* once we see the message has been received, we don't
|
|
@@ -1007,14 +1025,20 @@ static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m,
|
|
|
}
|
|
|
dout("prepare_pages tid %llu has %d pages, want %d\n",
|
|
|
tid, req->r_num_pages, want);
|
|
|
- if (likely(req->r_num_pages >= want && !req->r_prepared_pages)) {
|
|
|
- m->pages = req->r_pages;
|
|
|
- m->nr_pages = req->r_num_pages;
|
|
|
- req->r_reply = m; /* only for duration of read over socket */
|
|
|
- ceph_msg_get(m);
|
|
|
- req->r_prepared_pages = 1;
|
|
|
- ret = 0; /* success */
|
|
|
+ if (unlikely(req->r_num_pages < want))
|
|
|
+ goto out;
|
|
|
+
|
|
|
+ if (req->r_con_filling_pages) {
|
|
|
+ dout("revoking pages %p from old con %p\n", req->r_pages,
|
|
|
+ req->r_con_filling_pages);
|
|
|
+ ceph_con_revoke_pages(req->r_con_filling_pages, req->r_pages);
|
|
|
+ ceph_con_put(req->r_con_filling_pages);
|
|
|
}
|
|
|
+ req->r_con_filling_pages = ceph_con_get(con);
|
|
|
+ req->r_reply = ceph_msg_get(m); /* for duration of read over socket */
|
|
|
+ m->pages = req->r_pages;
|
|
|
+ m->nr_pages = req->r_num_pages;
|
|
|
+ ret = 0; /* success */
|
|
|
out:
|
|
|
mutex_unlock(&osdc->request_mutex);
|
|
|
return ret;
|
|
@@ -1269,7 +1293,7 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
|
|
|
ceph_osdc_handle_map(osdc, msg);
|
|
|
break;
|
|
|
case CEPH_MSG_OSD_OPREPLY:
|
|
|
- handle_reply(osdc, msg);
|
|
|
+ handle_reply(osdc, msg, con);
|
|
|
break;
|
|
|
|
|
|
default:
|