瀏覽代碼

ceph: properly handle aborted mds requests

Previously, if the MDS request was interrupted, we would unregister the
request and ignore any reply.  This could cause the caps or other cache
state to become out of sync.  (For instance, aborting dbench and doing
rm -r on clients would complain about a non-empty directory because the
client didn't realize it's aborted file create request completed.)

Even we don't unregister, we still can't process the reply normally because
we are no longer holding the caller's locks (like the dir i_mutex).

So, mark aborted operations with r_aborted, and in the reply handler, be
sure to process all the caps.  Do not process the namespace changes,
though, since we no longer will hold the dir i_mutex.  The dentry lease
state can also be ignored as it's more forgiving.

Signed-off-by: Sage Weil <sage@newdream.net>
Sage Weil 15 年之前
父節點
當前提交
5b1daecd59
共有 3 個文件被更改,包括 34 次插入11 次删除
  1. 10 6
      fs/ceph/inode.c
  2. 23 5
      fs/ceph/mds_client.c
  3. 1 0
      fs/ceph/mds_client.h

+ 10 - 6
fs/ceph/inode.c

@@ -915,6 +915,16 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
 	}
 
 	if (rinfo->head->is_dentry) {
+		struct inode *dir = req->r_locked_dir;
+
+		err = fill_inode(dir, &rinfo->diri, rinfo->dirfrag,
+				 session, req->r_request_started, -1,
+				 &req->r_caps_reservation);
+		if (err < 0)
+			return err;
+	}
+
+	if (rinfo->head->is_dentry && !req->r_aborted) {
 		/*
 		 * lookup link rename   : null -> possibly existing inode
 		 * mknod symlink mkdir  : null -> new inode
@@ -932,12 +942,6 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
 		BUG_ON(ceph_snap(dir) !=
 		       le64_to_cpu(rinfo->diri.in->snapid));
 
-		err = fill_inode(dir, &rinfo->diri, rinfo->dirfrag,
-				 session, req->r_request_started, -1,
-				 &req->r_caps_reservation);
-		if (err < 0)
-			return err;
-
 		/* do we have a lease on the whole dir? */
 		have_dir_cap =
 			(le32_to_cpu(rinfo->diri.in->cap.caps) &

+ 23 - 5
fs/ceph/mds_client.c

@@ -1624,11 +1624,29 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
 		err = PTR_ERR(req->r_reply);
 		req->r_reply = NULL;
 
-		/* clean up */
-		__unregister_request(mdsc, req);
-		if (!list_empty(&req->r_unsafe_item))
-			list_del_init(&req->r_unsafe_item);
-		complete(&req->r_safe_completion);
+		if (err == -ERESTARTSYS) {
+			/* aborted */
+			req->r_aborted = true;
+
+			if (req->r_locked_dir &&
+			    (req->r_op & CEPH_MDS_OP_WRITE)) {
+				struct ceph_inode_info *ci =
+					ceph_inode(req->r_locked_dir);
+
+				dout("aborted, clearing I_COMPLETE on %p\n", 
+				     req->r_locked_dir);
+				spin_lock(&req->r_locked_dir->i_lock);
+				ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
+				ci->i_release_count++;
+				spin_unlock(&req->r_locked_dir->i_lock);
+			}
+		} else {
+			/* clean up this request */
+			__unregister_request(mdsc, req);
+			if (!list_empty(&req->r_unsafe_item))
+				list_del_init(&req->r_unsafe_item);
+			complete(&req->r_safe_completion);
+		}
 	} else if (req->r_err) {
 		err = req->r_err;
 	} else {

+ 1 - 0
fs/ceph/mds_client.h

@@ -188,6 +188,7 @@ struct ceph_mds_request {
 	struct ceph_msg  *r_reply;
 	struct ceph_mds_reply_info_parsed r_reply_info;
 	int r_err;
+	bool r_aborted;
 
 	unsigned long r_timeout;  /* optional.  jiffies */
 	unsigned long r_started;  /* start time to measure timeout against */