|
@@ -40,7 +40,7 @@
|
|
|
static void __wake_requests(struct ceph_mds_client *mdsc,
|
|
|
struct list_head *head);
|
|
|
|
|
|
-const static struct ceph_connection_operations mds_con_ops;
|
|
|
+static const struct ceph_connection_operations mds_con_ops;
|
|
|
|
|
|
|
|
|
/*
|
|
@@ -665,10 +665,10 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq)
|
|
|
struct ceph_msg *msg;
|
|
|
struct ceph_mds_session_head *h;
|
|
|
|
|
|
- msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), 0, 0, NULL);
|
|
|
- if (IS_ERR(msg)) {
|
|
|
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS);
|
|
|
+ if (!msg) {
|
|
|
pr_err("create_session_msg ENOMEM creating msg\n");
|
|
|
- return ERR_PTR(PTR_ERR(msg));
|
|
|
+ return NULL;
|
|
|
}
|
|
|
h = msg->front.iov_base;
|
|
|
h->op = cpu_to_le32(op);
|
|
@@ -687,7 +687,6 @@ static int __open_session(struct ceph_mds_client *mdsc,
|
|
|
struct ceph_msg *msg;
|
|
|
int mstate;
|
|
|
int mds = session->s_mds;
|
|
|
- int err = 0;
|
|
|
|
|
|
/* wait for mds to go active? */
|
|
|
mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
|
|
@@ -698,13 +697,9 @@ static int __open_session(struct ceph_mds_client *mdsc,
|
|
|
|
|
|
/* send connect message */
|
|
|
msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
|
|
|
- if (IS_ERR(msg)) {
|
|
|
- err = PTR_ERR(msg);
|
|
|
- goto out;
|
|
|
- }
|
|
|
+ if (!msg)
|
|
|
+ return -ENOMEM;
|
|
|
ceph_con_send(&session->s_con, msg);
|
|
|
-
|
|
|
-out:
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
@@ -804,12 +799,49 @@ out:
|
|
|
}
|
|
|
|
|
|
static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
|
|
|
- void *arg)
|
|
|
+ void *arg)
|
|
|
{
|
|
|
struct ceph_inode_info *ci = ceph_inode(inode);
|
|
|
+ int drop = 0;
|
|
|
+
|
|
|
dout("removing cap %p, ci is %p, inode is %p\n",
|
|
|
cap, ci, &ci->vfs_inode);
|
|
|
- ceph_remove_cap(cap);
|
|
|
+ spin_lock(&inode->i_lock);
|
|
|
+ __ceph_remove_cap(cap);
|
|
|
+ if (!__ceph_is_any_real_caps(ci)) {
|
|
|
+ struct ceph_mds_client *mdsc =
|
|
|
+ &ceph_sb_to_client(inode->i_sb)->mdsc;
|
|
|
+
|
|
|
+ spin_lock(&mdsc->cap_dirty_lock);
|
|
|
+ if (!list_empty(&ci->i_dirty_item)) {
|
|
|
+ pr_info(" dropping dirty %s state for %p %lld\n",
|
|
|
+ ceph_cap_string(ci->i_dirty_caps),
|
|
|
+ inode, ceph_ino(inode));
|
|
|
+ ci->i_dirty_caps = 0;
|
|
|
+ list_del_init(&ci->i_dirty_item);
|
|
|
+ drop = 1;
|
|
|
+ }
|
|
|
+ if (!list_empty(&ci->i_flushing_item)) {
|
|
|
+ pr_info(" dropping dirty+flushing %s state for %p %lld\n",
|
|
|
+ ceph_cap_string(ci->i_flushing_caps),
|
|
|
+ inode, ceph_ino(inode));
|
|
|
+ ci->i_flushing_caps = 0;
|
|
|
+ list_del_init(&ci->i_flushing_item);
|
|
|
+ mdsc->num_cap_flushing--;
|
|
|
+ drop = 1;
|
|
|
+ }
|
|
|
+ if (drop && ci->i_wrbuffer_ref) {
|
|
|
+ pr_info(" dropping dirty data for %p %lld\n",
|
|
|
+ inode, ceph_ino(inode));
|
|
|
+ ci->i_wrbuffer_ref = 0;
|
|
|
+ ci->i_wrbuffer_ref_head = 0;
|
|
|
+ drop++;
|
|
|
+ }
|
|
|
+ spin_unlock(&mdsc->cap_dirty_lock);
|
|
|
+ }
|
|
|
+ spin_unlock(&inode->i_lock);
|
|
|
+ while (drop--)
|
|
|
+ iput(inode);
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
@@ -821,6 +853,7 @@ static void remove_session_caps(struct ceph_mds_session *session)
|
|
|
dout("remove_session_caps on %p\n", session);
|
|
|
iterate_session_caps(session, remove_session_caps_cb, NULL);
|
|
|
BUG_ON(session->s_nr_caps > 0);
|
|
|
+ BUG_ON(!list_empty(&session->s_cap_flushing));
|
|
|
cleanup_cap_releases(session);
|
|
|
}
|
|
|
|
|
@@ -883,8 +916,8 @@ static int send_renew_caps(struct ceph_mds_client *mdsc,
|
|
|
ceph_mds_state_name(state));
|
|
|
msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
|
|
|
++session->s_renew_seq);
|
|
|
- if (IS_ERR(msg))
|
|
|
- return PTR_ERR(msg);
|
|
|
+ if (!msg)
|
|
|
+ return -ENOMEM;
|
|
|
ceph_con_send(&session->s_con, msg);
|
|
|
return 0;
|
|
|
}
|
|
@@ -931,17 +964,15 @@ static int request_close_session(struct ceph_mds_client *mdsc,
|
|
|
struct ceph_mds_session *session)
|
|
|
{
|
|
|
struct ceph_msg *msg;
|
|
|
- int err = 0;
|
|
|
|
|
|
dout("request_close_session mds%d state %s seq %lld\n",
|
|
|
session->s_mds, session_state_name(session->s_state),
|
|
|
session->s_seq);
|
|
|
msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
|
|
|
- if (IS_ERR(msg))
|
|
|
- err = PTR_ERR(msg);
|
|
|
- else
|
|
|
- ceph_con_send(&session->s_con, msg);
|
|
|
- return err;
|
|
|
+ if (!msg)
|
|
|
+ return -ENOMEM;
|
|
|
+ ceph_con_send(&session->s_con, msg);
|
|
|
+ return 0;
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -1059,7 +1090,7 @@ static int add_cap_releases(struct ceph_mds_client *mdsc,
|
|
|
while (session->s_num_cap_releases < session->s_nr_caps + extra) {
|
|
|
spin_unlock(&session->s_cap_lock);
|
|
|
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
|
|
|
- 0, 0, NULL);
|
|
|
+ GFP_NOFS);
|
|
|
if (!msg)
|
|
|
goto out_unlocked;
|
|
|
dout("add_cap_releases %p msg %p now %d\n", session, msg,
|
|
@@ -1151,10 +1182,8 @@ static void send_cap_releases(struct ceph_mds_client *mdsc,
|
|
|
struct ceph_msg *msg;
|
|
|
|
|
|
dout("send_cap_releases mds%d\n", session->s_mds);
|
|
|
- while (1) {
|
|
|
- spin_lock(&session->s_cap_lock);
|
|
|
- if (list_empty(&session->s_cap_releases_done))
|
|
|
- break;
|
|
|
+ spin_lock(&session->s_cap_lock);
|
|
|
+ while (!list_empty(&session->s_cap_releases_done)) {
|
|
|
msg = list_first_entry(&session->s_cap_releases_done,
|
|
|
struct ceph_msg, list_head);
|
|
|
list_del_init(&msg->list_head);
|
|
@@ -1162,10 +1191,49 @@ static void send_cap_releases(struct ceph_mds_client *mdsc,
|
|
|
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
|
|
|
dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
|
|
|
ceph_con_send(&session->s_con, msg);
|
|
|
+ spin_lock(&session->s_cap_lock);
|
|
|
}
|
|
|
spin_unlock(&session->s_cap_lock);
|
|
|
}
|
|
|
|
|
|
+/*
+ * Throw away the cap release records queued on @session instead of
+ * sending them.
+ *
+ * The in-progress message at the head of s_cap_releases (if any) has its
+ * record count zeroed in place.  Every message on s_cap_releases_done is
+ * emptied the same way and moved back onto s_cap_releases.  The record
+ * counts removed are added back to s_num_cap_releases.
+ *
+ * @mdsc is unused here; it is kept so the signature matches
+ * send_cap_releases().  Takes and drops session->s_cap_lock.
+ */
+static void discard_cap_releases(struct ceph_mds_client *mdsc,
+				 struct ceph_mds_session *session)
+{
+	struct ceph_msg *msg;
+	struct ceph_mds_cap_release *head;
+	unsigned num;
+
+	dout("discard_cap_releases mds%d\n", session->s_mds);
+	spin_lock(&session->s_cap_lock);
+
+	/*
+	 * zero out the in-progress message, if there is one.  On an empty
+	 * list list_first_entry() would return a pointer computed from the
+	 * list head itself, not a real ceph_msg, so check first.
+	 */
+	if (!list_empty(&session->s_cap_releases)) {
+		msg = list_first_entry(&session->s_cap_releases,
+				       struct ceph_msg, list_head);
+		head = msg->front.iov_base;
+		num = le32_to_cpu(head->num);
+		dout("discard_cap_releases mds%d %p %u\n", session->s_mds,
+		     msg, num);
+		head->num = cpu_to_le32(0);
+		/*
+		 * NOTE(review): reset the front length as the requeue loop
+		 * below does whenever it zeroes the count -- confirm that
+		 * the code appending records relies on iov_len.
+		 */
+		msg->front.iov_len = sizeof(*head);
+		session->s_num_cap_releases += num;
+	}
+
+	/* requeue completed messages */
+	while (!list_empty(&session->s_cap_releases_done)) {
+		msg = list_first_entry(&session->s_cap_releases_done,
+				       struct ceph_msg, list_head);
+		list_del_init(&msg->list_head);
+
+		head = msg->front.iov_base;
+		num = le32_to_cpu(head->num);
+		dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
+		     num);
+		session->s_num_cap_releases += num;
+		head->num = cpu_to_le32(0);
+		msg->front.iov_len = sizeof(*head);
+		list_add(&msg->list_head, &session->s_cap_releases);
+	}
+
+	spin_unlock(&session->s_cap_lock);
+}
|
|
|
+
|
|
|
/*
|
|
|
* requests
|
|
|
*/
|
|
@@ -1181,6 +1249,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
|
|
|
if (!req)
|
|
|
return ERR_PTR(-ENOMEM);
|
|
|
|
|
|
+ mutex_init(&req->r_fill_mutex);
|
|
|
req->r_started = jiffies;
|
|
|
req->r_resend_mds = -1;
|
|
|
INIT_LIST_HEAD(&req->r_unsafe_dir_item);
|
|
@@ -1251,7 +1320,7 @@ retry:
|
|
|
len += 1 + temp->d_name.len;
|
|
|
temp = temp->d_parent;
|
|
|
if (temp == NULL) {
|
|
|
- pr_err("build_path_dentry corrupt dentry %p\n", dentry);
|
|
|
+ pr_err("build_path corrupt dentry %p\n", dentry);
|
|
|
return ERR_PTR(-EINVAL);
|
|
|
}
|
|
|
}
|
|
@@ -1267,7 +1336,7 @@ retry:
|
|
|
struct inode *inode = temp->d_inode;
|
|
|
|
|
|
if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
|
|
|
- dout("build_path_dentry path+%d: %p SNAPDIR\n",
|
|
|
+ dout("build_path path+%d: %p SNAPDIR\n",
|
|
|
pos, temp);
|
|
|
} else if (stop_on_nosnap && inode &&
|
|
|
ceph_snap(inode) == CEPH_NOSNAP) {
|
|
@@ -1278,20 +1347,18 @@ retry:
|
|
|
break;
|
|
|
strncpy(path + pos, temp->d_name.name,
|
|
|
temp->d_name.len);
|
|
|
- dout("build_path_dentry path+%d: %p '%.*s'\n",
|
|
|
- pos, temp, temp->d_name.len, path + pos);
|
|
|
}
|
|
|
if (pos)
|
|
|
path[--pos] = '/';
|
|
|
temp = temp->d_parent;
|
|
|
if (temp == NULL) {
|
|
|
- pr_err("build_path_dentry corrupt dentry\n");
|
|
|
+ pr_err("build_path corrupt dentry\n");
|
|
|
kfree(path);
|
|
|
return ERR_PTR(-EINVAL);
|
|
|
}
|
|
|
}
|
|
|
if (pos != 0) {
|
|
|
- pr_err("build_path_dentry did not end path lookup where "
|
|
|
+ pr_err("build_path did not end path lookup where "
|
|
|
"expected, namelen is %d, pos is %d\n", len, pos);
|
|
|
/* presumably this is only possible if racing with a
|
|
|
rename of one of the parent directories (we can not
|
|
@@ -1303,7 +1370,7 @@ retry:
|
|
|
|
|
|
*base = ceph_ino(temp->d_inode);
|
|
|
*plen = len;
|
|
|
- dout("build_path_dentry on %p %d built %llx '%.*s'\n",
|
|
|
+ dout("build_path on %p %d built %llx '%.*s'\n",
|
|
|
dentry, atomic_read(&dentry->d_count), *base, len, path);
|
|
|
return path;
|
|
|
}
|
|
@@ -1426,9 +1493,11 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
|
|
|
if (req->r_old_dentry_drop)
|
|
|
len += req->r_old_dentry->d_name.len;
|
|
|
|
|
|
- msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, 0, 0, NULL);
|
|
|
- if (IS_ERR(msg))
|
|
|
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS);
|
|
|
+ if (!msg) {
|
|
|
+ msg = ERR_PTR(-ENOMEM);
|
|
|
goto out_free2;
|
|
|
+ }
|
|
|
|
|
|
msg->hdr.tid = cpu_to_le64(req->r_tid);
|
|
|
|
|
@@ -1517,9 +1586,9 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
|
|
|
}
|
|
|
msg = create_request_message(mdsc, req, mds);
|
|
|
if (IS_ERR(msg)) {
|
|
|
- req->r_reply = ERR_PTR(PTR_ERR(msg));
|
|
|
+ req->r_err = PTR_ERR(msg);
|
|
|
complete_request(mdsc, req);
|
|
|
- return -PTR_ERR(msg);
|
|
|
+ return PTR_ERR(msg);
|
|
|
}
|
|
|
req->r_request = msg;
|
|
|
|
|
@@ -1552,7 +1621,7 @@ static int __do_request(struct ceph_mds_client *mdsc,
|
|
|
int mds = -1;
|
|
|
int err = -EAGAIN;
|
|
|
|
|
|
- if (req->r_reply)
|
|
|
+ if (req->r_err || req->r_got_result)
|
|
|
goto out;
|
|
|
|
|
|
if (req->r_timeout &&
|
|
@@ -1609,7 +1678,7 @@ out:
|
|
|
return err;
|
|
|
|
|
|
finish:
|
|
|
- req->r_reply = ERR_PTR(err);
|
|
|
+ req->r_err = err;
|
|
|
complete_request(mdsc, req);
|
|
|
goto out;
|
|
|
}
|
|
@@ -1630,10 +1699,9 @@ static void __wake_requests(struct ceph_mds_client *mdsc,
|
|
|
|
|
|
/*
|
|
|
* Wake up threads with requests pending for @mds, so that they can
|
|
|
- * resubmit their requests to a possibly different mds. If @all is set,
|
|
|
- * wake up if their requests has been forwarded to @mds, too.
|
|
|
+ * resubmit their requests to a possibly different mds.
|
|
|
*/
|
|
|
-static void kick_requests(struct ceph_mds_client *mdsc, int mds, int all)
|
|
|
+static void kick_requests(struct ceph_mds_client *mdsc, int mds)
|
|
|
{
|
|
|
struct ceph_mds_request *req;
|
|
|
struct rb_node *p;
|
|
@@ -1689,63 +1757,77 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
|
|
|
__register_request(mdsc, req, dir);
|
|
|
__do_request(mdsc, req);
|
|
|
|
|
|
- /* wait */
|
|
|
- if (!req->r_reply) {
|
|
|
- mutex_unlock(&mdsc->mutex);
|
|
|
- if (req->r_timeout) {
|
|
|
- err = (long)wait_for_completion_interruptible_timeout(
|
|
|
- &req->r_completion, req->r_timeout);
|
|
|
- if (err == 0)
|
|
|
- req->r_reply = ERR_PTR(-EIO);
|
|
|
- else if (err < 0)
|
|
|
- req->r_reply = ERR_PTR(err);
|
|
|
- } else {
|
|
|
- err = wait_for_completion_interruptible(
|
|
|
- &req->r_completion);
|
|
|
- if (err)
|
|
|
- req->r_reply = ERR_PTR(err);
|
|
|
- }
|
|
|
- mutex_lock(&mdsc->mutex);
|
|
|
+ if (req->r_err) {
|
|
|
+ err = req->r_err;
|
|
|
+ __unregister_request(mdsc, req);
|
|
|
+ dout("do_request early error %d\n", err);
|
|
|
+ goto out;
|
|
|
}
|
|
|
|
|
|
- if (IS_ERR(req->r_reply)) {
|
|
|
- err = PTR_ERR(req->r_reply);
|
|
|
- req->r_reply = NULL;
|
|
|
+ /* wait */
|
|
|
+ mutex_unlock(&mdsc->mutex);
|
|
|
+ dout("do_request waiting\n");
|
|
|
+ if (req->r_timeout) {
|
|
|
+ err = (long)wait_for_completion_interruptible_timeout(
|
|
|
+ &req->r_completion, req->r_timeout);
|
|
|
+ if (err == 0)
|
|
|
+ err = -EIO;
|
|
|
+ } else {
|
|
|
+ err = wait_for_completion_interruptible(&req->r_completion);
|
|
|
+ }
|
|
|
+ dout("do_request waited, got %d\n", err);
|
|
|
+ mutex_lock(&mdsc->mutex);
|
|
|
|
|
|
- if (err == -ERESTARTSYS) {
|
|
|
- /* aborted */
|
|
|
- req->r_aborted = true;
|
|
|
+ /* only abort if we didn't race with a real reply */
|
|
|
+ if (req->r_got_result) {
|
|
|
+ err = le32_to_cpu(req->r_reply_info.head->result);
|
|
|
+ } else if (err < 0) {
|
|
|
+ dout("aborted request %lld with %d\n", req->r_tid, err);
|
|
|
|
|
|
- if (req->r_locked_dir &&
|
|
|
- (req->r_op & CEPH_MDS_OP_WRITE)) {
|
|
|
- struct ceph_inode_info *ci =
|
|
|
- ceph_inode(req->r_locked_dir);
|
|
|
+ /*
|
|
|
+ * ensure we aren't running concurrently with
|
|
|
+ * ceph_fill_trace or ceph_readdir_prepopulate, which
|
|
|
+ * rely on locks (dir mutex) held by our caller.
|
|
|
+ */
|
|
|
+ mutex_lock(&req->r_fill_mutex);
|
|
|
+ req->r_err = err;
|
|
|
+ req->r_aborted = true;
|
|
|
+ mutex_unlock(&req->r_fill_mutex);
|
|
|
|
|
|
- dout("aborted, clearing I_COMPLETE on %p\n",
|
|
|
- req->r_locked_dir);
|
|
|
- spin_lock(&req->r_locked_dir->i_lock);
|
|
|
- ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
|
|
|
- ci->i_release_count++;
|
|
|
- spin_unlock(&req->r_locked_dir->i_lock);
|
|
|
- }
|
|
|
- } else {
|
|
|
- /* clean up this request */
|
|
|
- __unregister_request(mdsc, req);
|
|
|
- if (!list_empty(&req->r_unsafe_item))
|
|
|
- list_del_init(&req->r_unsafe_item);
|
|
|
- complete(&req->r_safe_completion);
|
|
|
- }
|
|
|
- } else if (req->r_err) {
|
|
|
- err = req->r_err;
|
|
|
+ if (req->r_locked_dir &&
|
|
|
+ (req->r_op & CEPH_MDS_OP_WRITE))
|
|
|
+ ceph_invalidate_dir_request(req);
|
|
|
} else {
|
|
|
- err = le32_to_cpu(req->r_reply_info.head->result);
|
|
|
+ err = req->r_err;
|
|
|
}
|
|
|
- mutex_unlock(&mdsc->mutex);
|
|
|
|
|
|
+out:
|
|
|
+ mutex_unlock(&mdsc->mutex);
|
|
|
dout("do_request %p done, result %d\n", req, err);
|
|
|
return err;
|
|
|
}
|
|
|
|
|
|
+/*
+ * An MDS namespace request was aborted: clear I_COMPLETE on the
+ * request's locked directory, bump that directory's release count, and
+ * invalidate the lease on each dentry the request references.
+ *
+ * Dereferences req->r_locked_dir unconditionally, so the caller must
+ * only pass requests that have one.
+ */
+void ceph_invalidate_dir_request(struct ceph_mds_request *req)
+{
+	struct inode *dir = req->r_locked_dir;
+	struct ceph_inode_info *dir_ci = ceph_inode(dir);
+	struct dentry *dn = req->r_dentry;
+	struct dentry *old_dn = req->r_old_dentry;
+
+	dout("invalidate_dir_request %p (I_COMPLETE, lease(s))\n", dir);
+
+	/* both fields are updated together under the inode's i_lock */
+	spin_lock(&dir->i_lock);
+	dir_ci->i_release_count++;
+	dir_ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
+	spin_unlock(&dir->i_lock);
+
+	/* either dentry may be absent; invalidate whichever is set */
+	if (dn)
+		ceph_invalidate_dentry_lease(dn);
+	if (old_dn)
+		ceph_invalidate_dentry_lease(old_dn);
+}
|
|
|
+
|
|
|
/*
|
|
|
* Handle mds reply.
|
|
|
*
|
|
@@ -1797,6 +1879,12 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
|
|
|
mutex_unlock(&mdsc->mutex);
|
|
|
goto out;
|
|
|
}
|
|
|
+ if (req->r_got_safe && !head->safe) {
|
|
|
+ pr_warning("got unsafe after safe on %llu from mds%d\n",
|
|
|
+ tid, mds);
|
|
|
+ mutex_unlock(&mdsc->mutex);
|
|
|
+ goto out;
|
|
|
+ }
|
|
|
|
|
|
result = le32_to_cpu(head->result);
|
|
|
|
|
@@ -1838,11 +1926,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
|
|
|
mutex_unlock(&mdsc->mutex);
|
|
|
goto out;
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- BUG_ON(req->r_reply);
|
|
|
-
|
|
|
- if (!head->safe) {
|
|
|
+ } else {
|
|
|
req->r_got_unsafe = true;
|
|
|
list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
|
|
|
}
|
|
@@ -1871,21 +1955,30 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
|
|
|
}
|
|
|
|
|
|
/* insert trace into our cache */
|
|
|
+ mutex_lock(&req->r_fill_mutex);
|
|
|
err = ceph_fill_trace(mdsc->client->sb, req, req->r_session);
|
|
|
if (err == 0) {
|
|
|
if (result == 0 && rinfo->dir_nr)
|
|
|
ceph_readdir_prepopulate(req, req->r_session);
|
|
|
ceph_unreserve_caps(&req->r_caps_reservation);
|
|
|
}
|
|
|
+ mutex_unlock(&req->r_fill_mutex);
|
|
|
|
|
|
up_read(&mdsc->snap_rwsem);
|
|
|
out_err:
|
|
|
- if (err) {
|
|
|
- req->r_err = err;
|
|
|
+ mutex_lock(&mdsc->mutex);
|
|
|
+ if (!req->r_aborted) {
|
|
|
+ if (err) {
|
|
|
+ req->r_err = err;
|
|
|
+ } else {
|
|
|
+ req->r_reply = msg;
|
|
|
+ ceph_msg_get(msg);
|
|
|
+ req->r_got_result = true;
|
|
|
+ }
|
|
|
} else {
|
|
|
- req->r_reply = msg;
|
|
|
- ceph_msg_get(msg);
|
|
|
+ dout("reply arrived after request %lld was aborted\n", tid);
|
|
|
}
|
|
|
+ mutex_unlock(&mdsc->mutex);
|
|
|
|
|
|
add_cap_releases(mdsc, req->r_session, -1);
|
|
|
mutex_unlock(&session->s_mutex);
|
|
@@ -1984,6 +2077,8 @@ static void handle_session(struct ceph_mds_session *session,
|
|
|
|
|
|
switch (op) {
|
|
|
case CEPH_SESSION_OPEN:
|
|
|
+ if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
|
|
|
+ pr_info("mds%d reconnect success\n", session->s_mds);
|
|
|
session->s_state = CEPH_MDS_SESSION_OPEN;
|
|
|
renewed_caps(mdsc, session, 0);
|
|
|
wake = 1;
|
|
@@ -1997,10 +2092,12 @@ static void handle_session(struct ceph_mds_session *session,
|
|
|
break;
|
|
|
|
|
|
case CEPH_SESSION_CLOSE:
|
|
|
+ if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
|
|
|
+ pr_info("mds%d reconnect denied\n", session->s_mds);
|
|
|
remove_session_caps(session);
|
|
|
wake = 1; /* for good measure */
|
|
|
complete(&mdsc->session_close_waiters);
|
|
|
- kick_requests(mdsc, mds, 0); /* cur only */
|
|
|
+ kick_requests(mdsc, mds);
|
|
|
break;
|
|
|
|
|
|
case CEPH_SESSION_STALE:
|
|
@@ -2132,54 +2229,44 @@ out:
|
|
|
*
|
|
|
* called without mdsc->mutex held; it is taken here to wake waiting requests.
|
|
|
*/
|
|
|
-static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
|
|
|
+static void send_mds_reconnect(struct ceph_mds_client *mdsc,
|
|
|
+ struct ceph_mds_session *session)
|
|
|
{
|
|
|
- struct ceph_mds_session *session = NULL;
|
|
|
struct ceph_msg *reply;
|
|
|
struct rb_node *p;
|
|
|
+ int mds = session->s_mds;
|
|
|
int err = -ENOMEM;
|
|
|
struct ceph_pagelist *pagelist;
|
|
|
|
|
|
- pr_info("reconnect to recovering mds%d\n", mds);
|
|
|
+ pr_info("mds%d reconnect start\n", mds);
|
|
|
|
|
|
pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
|
|
|
if (!pagelist)
|
|
|
goto fail_nopagelist;
|
|
|
ceph_pagelist_init(pagelist);
|
|
|
|
|
|
- reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, 0, 0, NULL);
|
|
|
- if (IS_ERR(reply)) {
|
|
|
- err = PTR_ERR(reply);
|
|
|
+ reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS);
|
|
|
+ if (!reply)
|
|
|
goto fail_nomsg;
|
|
|
- }
|
|
|
-
|
|
|
- /* find session */
|
|
|
- session = __ceph_lookup_mds_session(mdsc, mds);
|
|
|
- mutex_unlock(&mdsc->mutex); /* drop lock for duration */
|
|
|
|
|
|
- if (session) {
|
|
|
- mutex_lock(&session->s_mutex);
|
|
|
+ mutex_lock(&session->s_mutex);
|
|
|
+ session->s_state = CEPH_MDS_SESSION_RECONNECTING;
|
|
|
+ session->s_seq = 0;
|
|
|
|
|
|
- session->s_state = CEPH_MDS_SESSION_RECONNECTING;
|
|
|
- session->s_seq = 0;
|
|
|
+ ceph_con_open(&session->s_con,
|
|
|
+ ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
|
|
|
|
|
|
- ceph_con_open(&session->s_con,
|
|
|
- ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
|
|
|
-
|
|
|
- /* replay unsafe requests */
|
|
|
- replay_unsafe_requests(mdsc, session);
|
|
|
- } else {
|
|
|
- dout("no session for mds%d, will send short reconnect\n",
|
|
|
- mds);
|
|
|
- }
|
|
|
+ /* replay unsafe requests */
|
|
|
+ replay_unsafe_requests(mdsc, session);
|
|
|
|
|
|
down_read(&mdsc->snap_rwsem);
|
|
|
|
|
|
- if (!session)
|
|
|
- goto send;
|
|
|
dout("session %p state %s\n", session,
|
|
|
session_state_name(session->s_state));
|
|
|
|
|
|
+ /* drop old cap expires; we're about to reestablish that state */
|
|
|
+ discard_cap_releases(mdsc, session);
|
|
|
+
|
|
|
/* traverse this session's caps */
|
|
|
err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
|
|
|
if (err)
|
|
@@ -2208,36 +2295,29 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
|
|
|
goto fail;
|
|
|
}
|
|
|
|
|
|
-send:
|
|
|
reply->pagelist = pagelist;
|
|
|
reply->hdr.data_len = cpu_to_le32(pagelist->length);
|
|
|
reply->nr_pages = calc_pages_for(0, pagelist->length);
|
|
|
ceph_con_send(&session->s_con, reply);
|
|
|
|
|
|
- session->s_state = CEPH_MDS_SESSION_OPEN;
|
|
|
mutex_unlock(&session->s_mutex);
|
|
|
|
|
|
mutex_lock(&mdsc->mutex);
|
|
|
__wake_requests(mdsc, &session->s_waiting);
|
|
|
mutex_unlock(&mdsc->mutex);
|
|
|
|
|
|
- ceph_put_mds_session(session);
|
|
|
-
|
|
|
up_read(&mdsc->snap_rwsem);
|
|
|
- mutex_lock(&mdsc->mutex);
|
|
|
return;
|
|
|
|
|
|
fail:
|
|
|
ceph_msg_put(reply);
|
|
|
up_read(&mdsc->snap_rwsem);
|
|
|
mutex_unlock(&session->s_mutex);
|
|
|
- ceph_put_mds_session(session);
|
|
|
fail_nomsg:
|
|
|
ceph_pagelist_release(pagelist);
|
|
|
kfree(pagelist);
|
|
|
fail_nopagelist:
|
|
|
pr_err("error %d preparing reconnect for mds%d\n", err, mds);
|
|
|
- mutex_lock(&mdsc->mutex);
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -2290,7 +2370,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
|
|
|
}
|
|
|
|
|
|
/* kick any requests waiting on the recovering mds */
|
|
|
- kick_requests(mdsc, i, 1);
|
|
|
+ kick_requests(mdsc, i);
|
|
|
} else if (oldstate == newstate) {
|
|
|
continue; /* nothing new with this mds */
|
|
|
}
|
|
@@ -2299,22 +2379,21 @@ static void check_new_map(struct ceph_mds_client *mdsc,
|
|
|
* send reconnect?
|
|
|
*/
|
|
|
if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
|
|
|
- newstate >= CEPH_MDS_STATE_RECONNECT)
|
|
|
- send_mds_reconnect(mdsc, i);
|
|
|
+ newstate >= CEPH_MDS_STATE_RECONNECT) {
|
|
|
+ mutex_unlock(&mdsc->mutex);
|
|
|
+ send_mds_reconnect(mdsc, s);
|
|
|
+ mutex_lock(&mdsc->mutex);
|
|
|
+ }
|
|
|
|
|
|
/*
|
|
|
- * kick requests on any mds that has gone active.
|
|
|
- *
|
|
|
- * kick requests on cur or forwarder: we may have sent
|
|
|
- * the request to mds1, mds1 told us it forwarded it
|
|
|
- * to mds2, but then we learn mds1 failed and can't be
|
|
|
- * sure it successfully forwarded our request before
|
|
|
- * it died.
|
|
|
+ * kick requests on any mds that has gone active.
|
|
|
*/
|
|
|
if (oldstate < CEPH_MDS_STATE_ACTIVE &&
|
|
|
newstate >= CEPH_MDS_STATE_ACTIVE) {
|
|
|
- pr_info("mds%d reconnect completed\n", s->s_mds);
|
|
|
- kick_requests(mdsc, i, 1);
|
|
|
+ if (oldstate != CEPH_MDS_STATE_CREATING &&
|
|
|
+ oldstate != CEPH_MDS_STATE_STARTING)
|
|
|
+ pr_info("mds%d recovery completed\n", s->s_mds);
|
|
|
+ kick_requests(mdsc, i);
|
|
|
ceph_kick_flushing_caps(mdsc, s);
|
|
|
wake_up_session_caps(s, 1);
|
|
|
}
|
|
@@ -2457,8 +2536,8 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
|
|
|
dnamelen = dentry->d_name.len;
|
|
|
len += dnamelen;
|
|
|
|
|
|
- msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, 0, 0, NULL);
|
|
|
- if (IS_ERR(msg))
|
|
|
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS);
|
|
|
+ if (!msg)
|
|
|
return;
|
|
|
lease = msg->front.iov_base;
|
|
|
lease->action = action;
|
|
@@ -2603,7 +2682,9 @@ static void delayed_work(struct work_struct *work)
|
|
|
else
|
|
|
ceph_con_keepalive(&s->s_con);
|
|
|
add_cap_releases(mdsc, s, -1);
|
|
|
- send_cap_releases(mdsc, s);
|
|
|
+ if (s->s_state == CEPH_MDS_SESSION_OPEN ||
|
|
|
+ s->s_state == CEPH_MDS_SESSION_HUNG)
|
|
|
+ send_cap_releases(mdsc, s);
|
|
|
mutex_unlock(&s->s_mutex);
|
|
|
ceph_put_mds_session(s);
|
|
|
|
|
@@ -2620,6 +2701,9 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
|
|
|
mdsc->client = client;
|
|
|
mutex_init(&mdsc->mutex);
|
|
|
mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
|
|
|
+ if (mdsc->mdsmap == NULL)
|
|
|
+ return -ENOMEM;
|
|
|
+
|
|
|
init_completion(&mdsc->safe_umount_waiters);
|
|
|
init_completion(&mdsc->session_close_waiters);
|
|
|
INIT_LIST_HEAD(&mdsc->waiting_for_map);
|
|
@@ -2645,6 +2729,7 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
|
|
|
init_waitqueue_head(&mdsc->cap_flushing_wq);
|
|
|
spin_lock_init(&mdsc->dentry_lru_lock);
|
|
|
INIT_LIST_HEAD(&mdsc->dentry_lru);
|
|
|
+
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
@@ -2740,6 +2825,9 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
|
|
|
{
|
|
|
u64 want_tid, want_flush;
|
|
|
|
|
|
+ if (mdsc->client->mount_state == CEPH_MOUNT_SHUTDOWN)
|
|
|
+ return;
|
|
|
+
|
|
|
dout("sync\n");
|
|
|
mutex_lock(&mdsc->mutex);
|
|
|
want_tid = mdsc->last_tid;
|
|
@@ -2922,9 +3010,10 @@ static void con_put(struct ceph_connection *con)
|
|
|
static void peer_reset(struct ceph_connection *con)
|
|
|
{
|
|
|
struct ceph_mds_session *s = con->private;
|
|
|
+ struct ceph_mds_client *mdsc = s->s_mdsc;
|
|
|
|
|
|
- pr_err("mds%d gave us the boot. IMPLEMENT RECONNECT.\n",
|
|
|
- s->s_mds);
|
|
|
+ pr_warning("mds%d closed our session\n", s->s_mds);
|
|
|
+ send_mds_reconnect(mdsc, s);
|
|
|
}
|
|
|
|
|
|
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
|
|
@@ -3031,7 +3120,7 @@ static int invalidate_authorizer(struct ceph_connection *con)
|
|
|
return ceph_monc_validate_auth(&mdsc->client->monc);
|
|
|
}
|
|
|
|
|
|
-const static struct ceph_connection_operations mds_con_ops = {
|
|
|
+static const struct ceph_connection_operations mds_con_ops = {
|
|
|
.get = con_get,
|
|
|
.put = con_put,
|
|
|
.dispatch = dispatch,
|