Browse Source

Merge branch 'testing' of github.com:ceph/ceph-client into into linux-3.8-ceph

Alex Elder 12 years ago
parent
commit
4c7a08c83a

File diff suppressed because it is too large
+ 741 - 389
drivers/block/rbd.c


+ 3 - 4
fs/ceph/addr.c

@@ -315,7 +315,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
 				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
 				    NULL, 0,
 				    ci->i_truncate_seq, ci->i_truncate_size,
-				    NULL, false, 1, 0);
+				    NULL, false, 0);
 	if (IS_ERR(req))
 		return PTR_ERR(req);
 
@@ -492,8 +492,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 				   &ci->i_layout, snapc,
 				   page_off, len,
 				   ci->i_truncate_seq, ci->i_truncate_size,
-				   &inode->i_mtime,
-				   &page, 1, 0, 0, true);
+				   &inode->i_mtime, &page, 1);
 	if (err < 0) {
 		dout("writepage setting page/mapping error %d %p\n", err, page);
 		SetPageError(page);
@@ -838,7 +837,7 @@ get_more_pages:
 					    snapc, do_sync,
 					    ci->i_truncate_seq,
 					    ci->i_truncate_size,
-					    &inode->i_mtime, true, 1, 0);
+					    &inode->i_mtime, true, 0);
 
 				if (IS_ERR(req)) {
 					rc = PTR_ERR(req);

+ 25 - 7
fs/ceph/caps.c

@@ -611,8 +611,16 @@ retry:
 
 	if (flags & CEPH_CAP_FLAG_AUTH)
 		ci->i_auth_cap = cap;
-	else if (ci->i_auth_cap == cap)
+	else if (ci->i_auth_cap == cap) {
 		ci->i_auth_cap = NULL;
+		spin_lock(&mdsc->cap_dirty_lock);
+		if (!list_empty(&ci->i_dirty_item)) {
+			dout(" moving %p to cap_dirty_migrating\n", inode);
+			list_move(&ci->i_dirty_item,
+				  &mdsc->cap_dirty_migrating);
+		}
+		spin_unlock(&mdsc->cap_dirty_lock);
+	}
 
 	dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n",
 	     inode, ceph_vinop(inode), cap, ceph_cap_string(issued),
@@ -1460,7 +1468,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
 	struct ceph_mds_client *mdsc = fsc->mdsc;
 	struct inode *inode = &ci->vfs_inode;
 	struct ceph_cap *cap;
-	int file_wanted, used;
+	int file_wanted, used, cap_used;
 	int took_snap_rwsem = 0;             /* true if mdsc->snap_rwsem held */
 	int issued, implemented, want, retain, revoking, flushing = 0;
 	int mds = -1;   /* keep track of how far we've gone through i_caps list
@@ -1563,9 +1571,14 @@ retry_locked:
 
 		/* NOTE: no side-effects allowed, until we take s_mutex */
 
+		cap_used = used;
+		if (ci->i_auth_cap && cap != ci->i_auth_cap)
+			cap_used &= ~ci->i_auth_cap->issued;
+
 		revoking = cap->implemented & ~cap->issued;
-		dout(" mds%d cap %p issued %s implemented %s revoking %s\n",
+		dout(" mds%d cap %p used %s issued %s implemented %s revoking %s\n",
 		     cap->mds, cap, ceph_cap_string(cap->issued),
+		     ceph_cap_string(cap_used),
 		     ceph_cap_string(cap->implemented),
 		     ceph_cap_string(revoking));
 
@@ -1593,7 +1606,7 @@ retry_locked:
 		}
 
 		/* completed revocation? going down and there are no caps? */
-		if (revoking && (revoking & used) == 0) {
+		if (revoking && (revoking & cap_used) == 0) {
 			dout("completed revocation of %s\n",
 			     ceph_cap_string(cap->implemented & ~cap->issued));
 			goto ack;
@@ -1670,8 +1683,8 @@ ack:
 		sent++;
 
 		/* __send_cap drops i_ceph_lock */
-		delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, used, want,
-				      retain, flushing, NULL);
+		delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used,
+				      want, retain, flushing, NULL);
 		goto retry; /* retake i_ceph_lock and restart our cap scan. */
 	}
 
@@ -2416,7 +2429,9 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
 		dout("mds wanted %s -> %s\n",
 		     ceph_cap_string(le32_to_cpu(grant->wanted)),
 		     ceph_cap_string(wanted));
-		grant->wanted = cpu_to_le32(wanted);
+		/* imported cap may not have correct mds_wanted */
+		if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT)
+			check_caps = 1;
 	}
 
 	cap->seq = seq;
@@ -2820,6 +2835,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 	dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
 	     (unsigned)seq);
 
+	if (op == CEPH_CAP_OP_IMPORT)
+		ceph_add_cap_releases(mdsc, session);
+
 	/* lookup ino */
 	inode = ceph_find_inode(sb, vino);
 	ci = ceph_inode(inode);

+ 7 - 1
fs/ceph/file.c

@@ -243,6 +243,9 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
 	err = ceph_mdsc_do_request(mdsc,
 				   (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
 				   req);
+	if (err)
+		goto out_err;
+
 	err = ceph_handle_snapdir(req, dentry, err);
 	if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
 		err = ceph_handle_notrace_create(dir, dentry);
@@ -263,6 +266,9 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
 		err = finish_no_open(file, dn);
 	} else {
 		dout("atomic_open finish_open on dn %p\n", dn);
+		if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) {
+			*opened |= FILE_CREATED;
+		}
 		err = finish_open(file, dentry, ceph_open, opened);
 	}
 
@@ -535,7 +541,7 @@ more:
 				    ci->i_snap_realm->cached_context,
 				    do_sync,
 				    ci->i_truncate_seq, ci->i_truncate_size,
-				    &mtime, false, 2, page_align);
+				    &mtime, false, page_align);
 	if (IS_ERR(req))
 		return PTR_ERR(req);
 

+ 1 - 1
fs/ceph/ioctl.c

@@ -194,7 +194,7 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
 		return -EFAULT;
 
 	down_read(&osdc->map_sem);
-	r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, &len,
+	r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, len,
 					  &dl.object_no, &dl.object_offset,
 					  &olen);
 	if (r < 0)

+ 31 - 2
fs/ceph/mds_client.c

@@ -232,6 +232,30 @@ bad:
 	return -EIO;
 }
 
+/*
+ * parse create results
+ */
+static int parse_reply_info_create(void **p, void *end,
+				  struct ceph_mds_reply_info_parsed *info,
+				  int features)
+{
+	if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
+		if (*p == end) {
+			info->has_create_ino = false;
+		} else {
+			info->has_create_ino = true;
+			info->ino = ceph_decode_64(p);
+		}
+	}
+
+	if (unlikely(*p != end))
+		goto bad;
+	return 0;
+
+bad:
+	return -EIO;
+}
+
 /*
  * parse extra results
  */
@@ -241,8 +265,12 @@ static int parse_reply_info_extra(void **p, void *end,
 {
 	if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
 		return parse_reply_info_filelock(p, end, info, features);
-	else
+	else if (info->head->op == CEPH_MDS_OP_READDIR)
 		return parse_reply_info_dir(p, end, info, features);
+	else if (info->head->op == CEPH_MDS_OP_CREATE)
+		return parse_reply_info_create(p, end, info, features);
+	else
+		return -EIO;
 }
 
 /*
@@ -2170,7 +2198,8 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 	mutex_lock(&req->r_fill_mutex);
 	err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
 	if (err == 0) {
-		if (result == 0 && req->r_op != CEPH_MDS_OP_GETFILELOCK &&
+		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
+				    req->r_op == CEPH_MDS_OP_LSSNAP) &&
 		    rinfo->dir_nr)
 			ceph_readdir_prepopulate(req, req->r_session);
 		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);

+ 6 - 0
fs/ceph/mds_client.h

@@ -74,6 +74,12 @@ struct ceph_mds_reply_info_parsed {
 			struct ceph_mds_reply_info_in *dir_in;
 			u8                            dir_complete, dir_end;
 		};
+
+		/* for create results */
+		struct {
+			bool has_create_ino;
+			u64 ino;
+		};
 	};
 
 	/* encoded blob describing snapshot contexts for certain

+ 4 - 0
fs/ceph/strings.c

@@ -15,6 +15,7 @@ const char *ceph_mds_state_name(int s)
 	case CEPH_MDS_STATE_BOOT:       return "up:boot";
 	case CEPH_MDS_STATE_STANDBY:    return "up:standby";
 	case CEPH_MDS_STATE_STANDBY_REPLAY:    return "up:standby-replay";
+	case CEPH_MDS_STATE_REPLAYONCE: return "up:oneshot-replay";
 	case CEPH_MDS_STATE_CREATING:   return "up:creating";
 	case CEPH_MDS_STATE_STARTING:   return "up:starting";
 		/* up and in */
@@ -50,10 +51,13 @@ const char *ceph_mds_op_name(int op)
 	case CEPH_MDS_OP_LOOKUP:  return "lookup";
 	case CEPH_MDS_OP_LOOKUPHASH:  return "lookuphash";
 	case CEPH_MDS_OP_LOOKUPPARENT:  return "lookupparent";
+	case CEPH_MDS_OP_LOOKUPINO:  return "lookupino";
 	case CEPH_MDS_OP_GETATTR:  return "getattr";
 	case CEPH_MDS_OP_SETXATTR: return "setxattr";
 	case CEPH_MDS_OP_SETATTR: return "setattr";
 	case CEPH_MDS_OP_RMXATTR: return "rmxattr";
+	case CEPH_MDS_OP_SETLAYOUT: return "setlayou";
+	case CEPH_MDS_OP_SETDIRLAYOUT: return "setdirlayout";
 	case CEPH_MDS_OP_READDIR: return "readdir";
 	case CEPH_MDS_OP_MKNOD: return "mknod";
 	case CEPH_MDS_OP_LINK: return "link";

+ 1 - 7
fs/ceph/super.h

@@ -798,13 +798,7 @@ extern int ceph_mmap(struct file *file, struct vm_area_struct *vma);
 /* file.c */
 extern const struct file_operations ceph_file_fops;
 extern const struct address_space_operations ceph_aops;
-extern int ceph_copy_to_page_vector(struct page **pages,
-				    const char *data,
-				    loff_t off, size_t len);
-extern int ceph_copy_from_page_vector(struct page **pages,
-				    char *data,
-				    loff_t off, size_t len);
-extern struct page **ceph_alloc_page_vector(int num_pages, gfp_t flags);
+
 extern int ceph_open(struct inode *inode, struct file *file);
 extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
 			    struct file *file, unsigned flags, umode_t mode,

+ 163 - 47
fs/ceph/xattr.c

@@ -29,9 +29,94 @@ struct ceph_vxattr {
 	size_t name_size;	/* strlen(name) + 1 (for '\0') */
 	size_t (*getxattr_cb)(struct ceph_inode_info *ci, char *val,
 			      size_t size);
-	bool readonly;
+	bool readonly, hidden;
+	bool (*exists_cb)(struct ceph_inode_info *ci);
 };
 
+/* layouts */
+
+static bool ceph_vxattrcb_layout_exists(struct ceph_inode_info *ci)
+{
+	size_t s;
+	char *p = (char *)&ci->i_layout;
+
+	for (s = 0; s < sizeof(ci->i_layout); s++, p++)
+		if (*p)
+			return true;
+	return false;
+}
+
+static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
+					size_t size)
+{
+	int ret;
+	struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
+	struct ceph_osd_client *osdc = &fsc->client->osdc;
+	s64 pool = ceph_file_layout_pg_pool(ci->i_layout);
+	const char *pool_name;
+
+	dout("ceph_vxattrcb_layout %p\n", &ci->vfs_inode);
+	down_read(&osdc->map_sem);
+	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool);
+	if (pool_name)
+		ret = snprintf(val, size,
+		"stripe_unit=%lld stripe_count=%lld object_size=%lld pool=%s",
+		(unsigned long long)ceph_file_layout_su(ci->i_layout),
+		(unsigned long long)ceph_file_layout_stripe_count(ci->i_layout),
+	        (unsigned long long)ceph_file_layout_object_size(ci->i_layout),
+		pool_name);
+	else
+		ret = snprintf(val, size,
+		"stripe_unit=%lld stripe_count=%lld object_size=%lld pool=%lld",
+		(unsigned long long)ceph_file_layout_su(ci->i_layout),
+		(unsigned long long)ceph_file_layout_stripe_count(ci->i_layout),
+	        (unsigned long long)ceph_file_layout_object_size(ci->i_layout),
+		(unsigned long long)pool);
+
+	up_read(&osdc->map_sem);
+	return ret;
+}
+
+static size_t ceph_vxattrcb_layout_stripe_unit(struct ceph_inode_info *ci,
+					       char *val, size_t size)
+{
+	return snprintf(val, size, "%lld",
+			(unsigned long long)ceph_file_layout_su(ci->i_layout));
+}
+
+static size_t ceph_vxattrcb_layout_stripe_count(struct ceph_inode_info *ci,
+						char *val, size_t size)
+{
+	return snprintf(val, size, "%lld",
+	       (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout));
+}
+
+static size_t ceph_vxattrcb_layout_object_size(struct ceph_inode_info *ci,
+					       char *val, size_t size)
+{
+	return snprintf(val, size, "%lld",
+	       (unsigned long long)ceph_file_layout_object_size(ci->i_layout));
+}
+
+static size_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci,
+					char *val, size_t size)
+{
+	int ret;
+	struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
+	struct ceph_osd_client *osdc = &fsc->client->osdc;
+	s64 pool = ceph_file_layout_pg_pool(ci->i_layout);
+	const char *pool_name;
+
+	down_read(&osdc->map_sem);
+	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool);
+	if (pool_name)
+		ret = snprintf(val, size, "%s", pool_name);
+	else
+		ret = snprintf(val, size, "%lld", (unsigned long long)pool);
+	up_read(&osdc->map_sem);
+	return ret;
+}
+
 /* directories */
 
 static size_t ceph_vxattrcb_dir_entries(struct ceph_inode_info *ci, char *val,
@@ -83,17 +168,43 @@ static size_t ceph_vxattrcb_dir_rctime(struct ceph_inode_info *ci, char *val,
 			(long)ci->i_rctime.tv_nsec);
 }
 
-#define CEPH_XATTR_NAME(_type, _name)	XATTR_CEPH_PREFIX #_type "." #_name
 
-#define XATTR_NAME_CEPH(_type, _name) \
-		{ \
-			.name = CEPH_XATTR_NAME(_type, _name), \
-			.name_size = sizeof (CEPH_XATTR_NAME(_type, _name)), \
-			.getxattr_cb = ceph_vxattrcb_ ## _type ## _ ## _name, \
-			.readonly = true, \
-		}
+#define CEPH_XATTR_NAME(_type, _name)	XATTR_CEPH_PREFIX #_type "." #_name
+#define CEPH_XATTR_NAME2(_type, _name, _name2)	\
+	XATTR_CEPH_PREFIX #_type "." #_name "." #_name2
+
+#define XATTR_NAME_CEPH(_type, _name)					\
+	{								\
+		.name = CEPH_XATTR_NAME(_type, _name),			\
+		.name_size = sizeof (CEPH_XATTR_NAME(_type, _name)), \
+		.getxattr_cb = ceph_vxattrcb_ ## _type ## _ ## _name, \
+		.readonly = true,				\
+		.hidden = false,				\
+		.exists_cb = NULL,			\
+	}
+#define XATTR_LAYOUT_FIELD(_type, _name, _field)			\
+	{								\
+		.name = CEPH_XATTR_NAME2(_type, _name, _field),	\
+		.name_size = sizeof (CEPH_XATTR_NAME2(_type, _name, _field)), \
+		.getxattr_cb = ceph_vxattrcb_ ## _name ## _ ## _field, \
+		.readonly = false,				\
+		.hidden = true,			\
+		.exists_cb = ceph_vxattrcb_layout_exists,	\
+	}
 
 static struct ceph_vxattr ceph_dir_vxattrs[] = {
+	{
+		.name = "ceph.dir.layout",
+		.name_size = sizeof("ceph.dir.layout"),
+		.getxattr_cb = ceph_vxattrcb_layout,
+		.readonly = false,
+		.hidden = false,
+		.exists_cb = ceph_vxattrcb_layout_exists,
+	},
+	XATTR_LAYOUT_FIELD(dir, layout, stripe_unit),
+	XATTR_LAYOUT_FIELD(dir, layout, stripe_count),
+	XATTR_LAYOUT_FIELD(dir, layout, object_size),
+	XATTR_LAYOUT_FIELD(dir, layout, pool),
 	XATTR_NAME_CEPH(dir, entries),
 	XATTR_NAME_CEPH(dir, files),
 	XATTR_NAME_CEPH(dir, subdirs),
@@ -108,28 +219,19 @@ static size_t ceph_dir_vxattrs_name_size;	/* total size of all names */
 
 /* files */
 
-static size_t ceph_vxattrcb_file_layout(struct ceph_inode_info *ci, char *val,
-				   size_t size)
-{
-	int ret;
-
-	ret = snprintf(val, size,
-		"chunk_bytes=%lld\nstripe_count=%lld\nobject_size=%lld\n",
-		(unsigned long long)ceph_file_layout_su(ci->i_layout),
-		(unsigned long long)ceph_file_layout_stripe_count(ci->i_layout),
-		(unsigned long long)ceph_file_layout_object_size(ci->i_layout));
-	return ret;
-}
-
 static struct ceph_vxattr ceph_file_vxattrs[] = {
-	XATTR_NAME_CEPH(file, layout),
-	/* The following extended attribute name is deprecated */
 	{
-		.name = XATTR_CEPH_PREFIX "layout",
-		.name_size = sizeof (XATTR_CEPH_PREFIX "layout"),
-		.getxattr_cb = ceph_vxattrcb_file_layout,
-		.readonly = true,
+		.name = "ceph.file.layout",
+		.name_size = sizeof("ceph.file.layout"),
+		.getxattr_cb = ceph_vxattrcb_layout,
+		.readonly = false,
+		.hidden = false,
+		.exists_cb = ceph_vxattrcb_layout_exists,
 	},
+	XATTR_LAYOUT_FIELD(file, layout, stripe_unit),
+	XATTR_LAYOUT_FIELD(file, layout, stripe_count),
+	XATTR_LAYOUT_FIELD(file, layout, object_size),
+	XATTR_LAYOUT_FIELD(file, layout, pool),
 	{ 0 }	/* Required table terminator */
 };
 static size_t ceph_file_vxattrs_name_size;	/* total size of all names */
@@ -164,7 +266,8 @@ static size_t __init vxattrs_name_size(struct ceph_vxattr *vxattrs)
 	size_t size = 0;
 
 	for (vxattr = vxattrs; vxattr->name; vxattr++)
-		size += vxattr->name_size;
+		if (!vxattr->hidden)
+			size += vxattr->name_size;
 
 	return size;
 }
@@ -572,13 +675,17 @@ ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value,
 	if (!ceph_is_valid_xattr(name))
 		return -ENODATA;
 
-	/* let's see if a virtual xattr was requested */
-	vxattr = ceph_match_vxattr(inode, name);
-
 	spin_lock(&ci->i_ceph_lock);
 	dout("getxattr %p ver=%lld index_ver=%lld\n", inode,
 	     ci->i_xattrs.version, ci->i_xattrs.index_version);
 
+	/* let's see if a virtual xattr was requested */
+	vxattr = ceph_match_vxattr(inode, name);
+	if (vxattr && !(vxattr->exists_cb && !vxattr->exists_cb(ci))) {
+		err = vxattr->getxattr_cb(ci, value, size);
+		goto out;
+	}
+
 	if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1) &&
 	    (ci->i_xattrs.index_version >= ci->i_xattrs.version)) {
 		goto get_xattr;
@@ -592,11 +699,6 @@ ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value,
 
 	spin_lock(&ci->i_ceph_lock);
 
-	if (vxattr && vxattr->readonly) {
-		err = vxattr->getxattr_cb(ci, value, size);
-		goto out;
-	}
-
 	err = __build_xattrs(inode);
 	if (err < 0)
 		goto out;
@@ -604,11 +706,8 @@ ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value,
 get_xattr:
 	err = -ENODATA;  /* == ENOATTR */
 	xattr = __get_xattr(ci, name);
-	if (!xattr) {
-		if (vxattr)
-			err = vxattr->getxattr_cb(ci, value, size);
+	if (!xattr)
 		goto out;
-	}
 
 	err = -ERANGE;
 	if (size && size < xattr->val_len)
@@ -664,23 +763,30 @@ list_xattr:
 	vir_namelen = ceph_vxattrs_name_size(vxattrs);
 
 	/* adding 1 byte per each variable due to the null termination */
-	namelen = vir_namelen + ci->i_xattrs.names_size + ci->i_xattrs.count;
+	namelen = ci->i_xattrs.names_size + ci->i_xattrs.count;
 	err = -ERANGE;
-	if (size && namelen > size)
+	if (size && vir_namelen + namelen > size)
 		goto out;
 
-	err = namelen;
+	err = namelen + vir_namelen;
 	if (size == 0)
 		goto out;
 
 	names = __copy_xattr_names(ci, names);
 
 	/* virtual xattr names, too */
-	if (vxattrs)
+	err = namelen;
+	if (vxattrs) {
 		for (i = 0; vxattrs[i].name; i++) {
-			len = sprintf(names, "%s", vxattrs[i].name);
-			names += len + 1;
+			if (!vxattrs[i].hidden &&
+			    !(vxattrs[i].exists_cb &&
+			      !vxattrs[i].exists_cb(ci))) {
+				len = sprintf(names, "%s", vxattrs[i].name);
+				names += len + 1;
+				err += len + 1;
+			}
 		}
+	}
 
 out:
 	spin_unlock(&ci->i_ceph_lock);
@@ -782,6 +888,10 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
 	if (vxattr && vxattr->readonly)
 		return -EOPNOTSUPP;
 
+	/* pass any unhandled ceph.* xattrs through to the MDS */
+	if (!strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN))
+		goto do_sync_unlocked;
+
 	/* preallocate memory for xattr name, value, index node */
 	err = -ENOMEM;
 	newname = kmemdup(name, name_len + 1, GFP_NOFS);
@@ -838,6 +948,7 @@ retry:
 
 do_sync:
 	spin_unlock(&ci->i_ceph_lock);
+do_sync_unlocked:
 	err = ceph_sync_setxattr(dentry, name, value, size, flags);
 out:
 	kfree(newname);
@@ -892,6 +1003,10 @@ int ceph_removexattr(struct dentry *dentry, const char *name)
 	if (vxattr && vxattr->readonly)
 		return -EOPNOTSUPP;
 
+	/* pass any unhandled ceph.* xattrs through to the MDS */
+	if (!strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN))
+		goto do_sync_unlocked;
+
 	err = -ENOMEM;
 	spin_lock(&ci->i_ceph_lock);
 retry:
@@ -931,6 +1046,7 @@ retry:
 	return err;
 do_sync:
 	spin_unlock(&ci->i_ceph_lock);
+do_sync_unlocked:
 	err = ceph_send_removexattr(dentry, name);
 out:
 	return err;

+ 7 - 1
include/linux/ceph/ceph_features.h

@@ -14,13 +14,19 @@
 #define CEPH_FEATURE_DIRLAYOUTHASH  (1<<7)
 /* bits 8-17 defined by user-space; not supported yet here */
 #define CEPH_FEATURE_CRUSH_TUNABLES (1<<18)
+/* bits 19-24 defined by user-space; not supported yet here */
+#define CEPH_FEATURE_CRUSH_TUNABLES2 (1<<25)
+/* bit 26 defined by user-space; not supported yet here */
+#define CEPH_FEATURE_REPLY_CREATE_INODE (1<<27)
 
 /*
  * Features supported.
  */
 #define CEPH_FEATURES_SUPPORTED_DEFAULT  \
 	(CEPH_FEATURE_NOSRCADDR |	 \
-	 CEPH_FEATURE_CRUSH_TUNABLES)
+	 CEPH_FEATURE_CRUSH_TUNABLES |	  \
+	 CEPH_FEATURE_CRUSH_TUNABLES2 |   \
+	 CEPH_FEATURE_REPLY_CREATE_INODE)
 
 #define CEPH_FEATURES_REQUIRED_DEFAULT   \
 	(CEPH_FEATURE_NOSRCADDR)

+ 24 - 8
include/linux/ceph/ceph_fs.h

@@ -21,16 +21,14 @@
  * internal cluster protocols separately from the public,
  * client-facing protocol.
  */
-#define CEPH_OSD_PROTOCOL     8 /* cluster internal */
-#define CEPH_MDS_PROTOCOL    12 /* cluster internal */
-#define CEPH_MON_PROTOCOL     5 /* cluster internal */
 #define CEPH_OSDC_PROTOCOL   24 /* server/client */
 #define CEPH_MDSC_PROTOCOL   32 /* server/client */
 #define CEPH_MONC_PROTOCOL   15 /* server/client */
 
 
-#define CEPH_INO_ROOT  1
-#define CEPH_INO_CEPH  2        /* hidden .ceph dir */
+#define CEPH_INO_ROOT   1
+#define CEPH_INO_CEPH   2       /* hidden .ceph dir */
+#define CEPH_INO_DOTDOT 3	/* used by ceph fuse for parent (..) */
 
 /* arbitrary limit on max # of monitors (cluster of 3 is typical) */
 #define CEPH_MAX_MON   31
@@ -51,7 +49,7 @@ struct ceph_file_layout {
 	__le32 fl_object_stripe_unit;  /* UNUSED.  for per-object parity, if any */
 
 	/* object -> pg layout */
-	__le32 fl_unused;       /* unused; used to be preferred primary (-1) */
+	__le32 fl_unused;       /* unused; used to be preferred primary for pg (-1 for none) */
 	__le32 fl_pg_pool;      /* namespace, crush ruleset, rep level */
 } __attribute__ ((packed));
 
@@ -101,6 +99,8 @@ struct ceph_dir_layout {
 #define CEPH_MSG_MON_SUBSCRIBE_ACK      16
 #define CEPH_MSG_AUTH			17
 #define CEPH_MSG_AUTH_REPLY		18
+#define CEPH_MSG_MON_GET_VERSION        19
+#define CEPH_MSG_MON_GET_VERSION_REPLY  20
 
 /* client <-> mds */
 #define CEPH_MSG_MDS_MAP                21
@@ -220,6 +220,11 @@ struct ceph_mon_subscribe_ack {
 	struct ceph_fsid fsid;
 } __attribute__ ((packed));
 
+/*
+ * mdsmap flags
+ */
+#define CEPH_MDSMAP_DOWN    (1<<0)  /* cluster deliberately down */
+
 /*
  * mds states
  *   > 0 -> in
@@ -233,6 +238,7 @@ struct ceph_mon_subscribe_ack {
 #define CEPH_MDS_STATE_CREATING    -6  /* up, creating MDS instance. */
 #define CEPH_MDS_STATE_STARTING    -7  /* up, starting previously stopped mds */
 #define CEPH_MDS_STATE_STANDBY_REPLAY -8 /* up, tailing active node's journal */
+#define CEPH_MDS_STATE_REPLAYONCE   -9 /* up, replaying an active node's journal */
 
 #define CEPH_MDS_STATE_REPLAY       8  /* up, replaying journal. */
 #define CEPH_MDS_STATE_RESOLVE      9  /* up, disambiguating distributed
@@ -264,6 +270,7 @@ extern const char *ceph_mds_state_name(int s);
 #define CEPH_LOCK_IXATTR      2048
 #define CEPH_LOCK_IFLOCK      4096  /* advisory file locks */
 #define CEPH_LOCK_INO         8192  /* immutable inode bits; not a lock */
+#define CEPH_LOCK_IPOLICY     16384 /* policy lock on dirs. MDS internal */
 
 /* client_session ops */
 enum {
@@ -338,6 +345,12 @@ extern const char *ceph_mds_op_name(int op);
 #define CEPH_SETATTR_SIZE  32
 #define CEPH_SETATTR_CTIME 64
 
+/*
+ * Ceph setxattr request flags.
+ */
+#define CEPH_XATTR_CREATE  1
+#define CEPH_XATTR_REPLACE 2
+
 union ceph_mds_request_args {
 	struct {
 		__le32 mask;                 /* CEPH_CAP_* */
@@ -522,14 +535,17 @@ int ceph_flags_to_mode(int flags);
 #define CEPH_CAP_GWREXTEND  64  /* (file) client can extend EOF */
 #define CEPH_CAP_GLAZYIO   128  /* (file) client can perform lazy io */
 
+#define CEPH_CAP_SIMPLE_BITS  2
+#define CEPH_CAP_FILE_BITS    8
+
 /* per-lock shift */
 #define CEPH_CAP_SAUTH      2
 #define CEPH_CAP_SLINK      4
 #define CEPH_CAP_SXATTR     6
 #define CEPH_CAP_SFILE      8
-#define CEPH_CAP_SFLOCK    20 
+#define CEPH_CAP_SFLOCK    20
 
-#define CEPH_CAP_BITS       22
+#define CEPH_CAP_BITS      22
 
 /* composed values */
 #define CEPH_CAP_AUTH_SHARED  (CEPH_CAP_GSHARED  << CEPH_CAP_SAUTH)

+ 17 - 12
include/linux/ceph/decode.h

@@ -52,10 +52,10 @@ static inline int ceph_has_room(void **p, void *end, size_t n)
 	return end >= *p && n <= end - *p;
 }
 
-#define ceph_decode_need(p, end, n, bad)		\
-	do {						\
-		if (!likely(ceph_has_room(p, end, n)))	\
-			goto bad;			\
+#define ceph_decode_need(p, end, n, bad)			\
+	do {							\
+		if (!likely(ceph_has_room(p, end, n)))		\
+			goto bad;				\
 	} while (0)
 
 #define ceph_decode_64_safe(p, end, v, bad)			\
@@ -99,8 +99,8 @@ static inline int ceph_has_room(void **p, void *end, size_t n)
  *
  * There are two possible failures:
  *   - converting the string would require accessing memory at or
- *     beyond the "end" pointer provided (-E
- *   - memory could not be allocated for the result
+ *     beyond the "end" pointer provided (-ERANGE)
+ *   - memory could not be allocated for the result (-ENOMEM)
  */
 static inline char *ceph_extract_encoded_string(void **p, void *end,
 						size_t *lenp, gfp_t gfp)
@@ -217,10 +217,10 @@ static inline void ceph_encode_string(void **p, void *end,
 	*p += len;
 }
 
-#define ceph_encode_need(p, end, n, bad)		\
-	do {						\
-		if (!likely(ceph_has_room(p, end, n)))	\
-			goto bad;			\
+#define ceph_encode_need(p, end, n, bad)			\
+	do {							\
+		if (!likely(ceph_has_room(p, end, n)))		\
+			goto bad;				\
 	} while (0)
 
 #define ceph_encode_64_safe(p, end, v, bad)			\
@@ -231,12 +231,17 @@ static inline void ceph_encode_string(void **p, void *end,
 #define ceph_encode_32_safe(p, end, v, bad)			\
 	do {							\
 		ceph_encode_need(p, end, sizeof(u32), bad);	\
-		ceph_encode_32(p, v);			\
+		ceph_encode_32(p, v);				\
 	} while (0)
 #define ceph_encode_16_safe(p, end, v, bad)			\
 	do {							\
 		ceph_encode_need(p, end, sizeof(u16), bad);	\
-		ceph_encode_16(p, v);			\
+		ceph_encode_16(p, v);				\
+	} while (0)
+#define ceph_encode_8_safe(p, end, v, bad)			\
+	do {							\
+		ceph_encode_need(p, end, sizeof(u8), bad);	\
+		ceph_encode_8(p, v);				\
 	} while (0)
 
 #define ceph_encode_copy_safe(p, end, pv, n, bad)		\

+ 9 - 7
include/linux/ceph/libceph.h

@@ -193,6 +193,8 @@ static inline int calc_pages_for(u64 off, u64 len)
 }
 
 /* ceph_common.c */
+extern bool libceph_compatible(void *data);
+
 extern const char *ceph_msg_type_name(int type);
 extern int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid);
 extern struct kmem_cache *ceph_inode_cachep;
@@ -220,7 +222,7 @@ extern int ceph_open_session(struct ceph_client *client);
 /* pagevec.c */
 extern void ceph_release_page_vector(struct page **pages, int num_pages);
 
-extern struct page **ceph_get_direct_page_vector(const char __user *data,
+extern struct page **ceph_get_direct_page_vector(const void __user *data,
 						 int num_pages,
 						 bool write_page);
 extern void ceph_put_page_vector(struct page **pages, int num_pages,
@@ -228,15 +230,15 @@ extern void ceph_put_page_vector(struct page **pages, int num_pages,
 extern void ceph_release_page_vector(struct page **pages, int num_pages);
 extern struct page **ceph_alloc_page_vector(int num_pages, gfp_t flags);
 extern int ceph_copy_user_to_page_vector(struct page **pages,
-					 const char __user *data,
+					 const void __user *data,
 					 loff_t off, size_t len);
-extern int ceph_copy_to_page_vector(struct page **pages,
-				    const char *data,
+extern void ceph_copy_to_page_vector(struct page **pages,
+				    const void *data,
 				    loff_t off, size_t len);
-extern int ceph_copy_from_page_vector(struct page **pages,
-				    char *data,
+extern void ceph_copy_from_page_vector(struct page **pages,
+				    void *data,
 				    loff_t off, size_t len);
-extern int ceph_copy_page_vector_to_user(struct page **pages, char __user *data,
+extern int ceph_copy_page_vector_to_user(struct page **pages, void __user *data,
 				    loff_t off, size_t len);
 extern void ceph_zero_page_vector_range(int off, int len, struct page **pages);
 

+ 2 - 0
include/linux/ceph/messenger.h

@@ -83,9 +83,11 @@ struct ceph_msg {
 	struct list_head list_head;
 
 	struct kref kref;
+#ifdef CONFIG_BLOCK
 	struct bio  *bio;		/* instead of pages/pagelist */
 	struct bio  *bio_iter;		/* bio iterator */
 	int bio_seg;			/* current bio segment */
+#endif /* CONFIG_BLOCK */
 	struct ceph_pagelist *trail;	/* the trailing part of the data */
 	bool front_is_vmalloc;
 	bool more_to_follow;

+ 19 - 35
include/linux/ceph/osd_client.h

@@ -10,6 +10,7 @@
 #include <linux/ceph/osdmap.h>
 #include <linux/ceph/messenger.h>
 #include <linux/ceph/auth.h>
+#include <linux/ceph/pagelist.h>
 
 /* 
  * Maximum object name size 
@@ -22,7 +23,6 @@ struct ceph_snap_context;
 struct ceph_osd_request;
 struct ceph_osd_client;
 struct ceph_authorizer;
-struct ceph_pagelist;
 
 /*
  * completion callback for async writepages
@@ -95,7 +95,7 @@ struct ceph_osd_request {
 	struct bio       *r_bio;	      /* instead of pages */
 #endif
 
-	struct ceph_pagelist *r_trail;	      /* trailing part of the data */
+	struct ceph_pagelist r_trail;	      /* trailing part of the data */
 };
 
 struct ceph_osd_event {
@@ -107,7 +107,6 @@ struct ceph_osd_event {
 	struct rb_node node;
 	struct list_head osd_node;
 	struct kref kref;
-	struct completion completion;
 };
 
 struct ceph_osd_event_work {
@@ -157,7 +156,7 @@ struct ceph_osd_client {
 
 struct ceph_osd_req_op {
 	u16 op;           /* CEPH_OSD_OP_* */
-	u32 flags;        /* CEPH_OSD_FLAG_* */
+	u32 payload_len;
 	union {
 		struct {
 			u64 offset, length;
@@ -166,23 +165,24 @@ struct ceph_osd_req_op {
 		} extent;
 		struct {
 			const char *name;
-			u32 name_len;
 			const char  *val;
+			u32 name_len;
 			u32 value_len;
 			__u8 cmp_op;       /* CEPH_OSD_CMPXATTR_OP_* */
 			__u8 cmp_mode;     /* CEPH_OSD_CMPXATTR_MODE_* */
 		} xattr;
 		struct {
 			const char *class_name;
-			__u8 class_len;
 			const char *method_name;
-			__u8 method_len;
-			__u8 argc;
 			const char *indata;
 			u32 indata_len;
+			__u8 class_len;
+			__u8 method_len;
+			__u8 argc;
 		} cls;
 		struct {
-			u64 cookie, count;
+			u64 cookie;
+			u64 count;
 		} pgls;
 	        struct {
 		        u64 snapid;
@@ -190,12 +190,11 @@ struct ceph_osd_req_op {
 		struct {
 			u64 cookie;
 			u64 ver;
-			__u8 flag;
 			u32 prot_ver;
 			u32 timeout;
+			__u8 flag;
 		} watch;
 	};
-	u32 payload_len;
 };
 
 extern int ceph_osdc_init(struct ceph_osd_client *osdc,
@@ -207,29 +206,19 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc,
 extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
 				 struct ceph_msg *msg);
 
-extern int ceph_calc_raw_layout(struct ceph_osd_client *osdc,
-			struct ceph_file_layout *layout,
-			u64 snapid,
-			u64 off, u64 *plen, u64 *bno,
-			struct ceph_osd_request *req,
-			struct ceph_osd_req_op *op);
-
 extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
-					       int flags,
 					       struct ceph_snap_context *snapc,
-					       struct ceph_osd_req_op *ops,
+					       unsigned int num_op,
 					       bool use_mempool,
-					       gfp_t gfp_flags,
-					       struct page **pages,
-					       struct bio *bio);
+					       gfp_t gfp_flags);
 
 extern void ceph_osdc_build_request(struct ceph_osd_request *req,
-				    u64 off, u64 *plen,
+				    u64 off, u64 len,
+				    unsigned int num_op,
 				    struct ceph_osd_req_op *src_ops,
 				    struct ceph_snap_context *snapc,
-				    struct timespec *mtime,
-				    const char *oid,
-				    int oid_len);
+				    u64 snap_id,
+				    struct timespec *mtime);
 
 extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
 				      struct ceph_file_layout *layout,
@@ -239,8 +228,7 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
 				      int do_sync, u32 truncate_seq,
 				      u64 truncate_size,
 				      struct timespec *mtime,
-				      bool use_mempool, int num_reply,
-				      int page_align);
+				      bool use_mempool, int page_align);
 
 extern void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
 					 struct ceph_osd_request *req);
@@ -279,17 +267,13 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
 				u64 off, u64 len,
 				u32 truncate_seq, u64 truncate_size,
 				struct timespec *mtime,
-				struct page **pages, int nr_pages,
-				int flags, int do_sync, bool nofail);
+				struct page **pages, int nr_pages);
 
 /* watch/notify events */
 extern int ceph_osdc_create_event(struct ceph_osd_client *osdc,
 				  void (*event_cb)(u64, u64, u8, void *),
-				  int one_shot, void *data,
-				  struct ceph_osd_event **pevent);
+				  void *data, struct ceph_osd_event **pevent);
 extern void ceph_osdc_cancel_event(struct ceph_osd_event *event);
-extern int ceph_osdc_wait_event(struct ceph_osd_event *event,
-				unsigned long timeout);
 extern void ceph_osdc_put_event(struct ceph_osd_event *event);
 #endif
 

+ 1 - 1
include/linux/ceph/osdmap.h

@@ -110,7 +110,7 @@ extern void ceph_osdmap_destroy(struct ceph_osdmap *map);
 
 /* calculate mapping of a file extent to an object */
 extern int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
-					 u64 off, u64 *plen,
+					 u64 off, u64 len,
 					 u64 *bno, u64 *oxoff, u64 *oxlen);
 
 /* calculate mapping of object to a placement group */

+ 73 - 20
include/linux/ceph/rados.h

@@ -145,8 +145,12 @@ struct ceph_eversion {
  */
 
 /* status bits */
-#define CEPH_OSD_EXISTS 1
-#define CEPH_OSD_UP     2
+#define CEPH_OSD_EXISTS  (1<<0)
+#define CEPH_OSD_UP      (1<<1)
+#define CEPH_OSD_AUTOOUT (1<<2)  /* osd was automatically marked out */
+#define CEPH_OSD_NEW     (1<<3)  /* osd is new, never marked in */
+
+extern const char *ceph_osd_state_name(int s);
 
 /* osd weights.  fixed point value: 0x10000 == 1.0 ("in"), 0 == "out" */
 #define CEPH_OSD_IN  0x10000
@@ -161,9 +165,25 @@ struct ceph_eversion {
 #define CEPH_OSDMAP_PAUSERD  (1<<2)  /* pause all reads */
 #define CEPH_OSDMAP_PAUSEWR  (1<<3)  /* pause all writes */
 #define CEPH_OSDMAP_PAUSEREC (1<<4)  /* pause recovery */
+#define CEPH_OSDMAP_NOUP     (1<<5)  /* block osd boot */
+#define CEPH_OSDMAP_NODOWN   (1<<6)  /* block osd mark-down/failure */
+#define CEPH_OSDMAP_NOOUT    (1<<7)  /* block osd auto mark-out */
+#define CEPH_OSDMAP_NOIN     (1<<8)  /* block osd auto mark-in */
+#define CEPH_OSDMAP_NOBACKFILL (1<<9) /* block osd backfill */
+#define CEPH_OSDMAP_NORECOVER (1<<10) /* block osd recovery and backfill */
+
+/*
+ * The error code to return when an OSD can't handle a write
+ * because it is too large.
+ */
+#define OSD_WRITETOOBIG EMSGSIZE
 
 /*
  * osd ops
+ *
+ * WARNING: do not use these op codes directly.  Use the helpers
+ * defined below instead.  In certain cases, op code behavior was
+ * redefined, resulting in special-cases in the helpers.
  */
 #define CEPH_OSD_OP_MODE       0xf000
 #define CEPH_OSD_OP_MODE_RD    0x1000
@@ -177,6 +197,7 @@ struct ceph_eversion {
 #define CEPH_OSD_OP_TYPE_ATTR  0x0300
 #define CEPH_OSD_OP_TYPE_EXEC  0x0400
 #define CEPH_OSD_OP_TYPE_PG    0x0500
+#define CEPH_OSD_OP_TYPE_MULTI 0x0600 /* multiobject */
 
 enum {
 	/** data **/
@@ -217,6 +238,23 @@ enum {
 
 	CEPH_OSD_OP_WATCH   = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 15,
 
+	/* omap */
+	CEPH_OSD_OP_OMAPGETKEYS   = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 17,
+	CEPH_OSD_OP_OMAPGETVALS   = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 18,
+	CEPH_OSD_OP_OMAPGETHEADER = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 19,
+	CEPH_OSD_OP_OMAPGETVALSBYKEYS  =
+	  CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 20,
+	CEPH_OSD_OP_OMAPSETVALS   = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 21,
+	CEPH_OSD_OP_OMAPSETHEADER = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 22,
+	CEPH_OSD_OP_OMAPCLEAR     = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 23,
+	CEPH_OSD_OP_OMAPRMKEYS    = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 24,
+	CEPH_OSD_OP_OMAP_CMP      = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 25,
+
+	/** multi **/
+	CEPH_OSD_OP_CLONERANGE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_MULTI | 1,
+	CEPH_OSD_OP_ASSERT_SRC_VERSION = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_MULTI | 2,
+	CEPH_OSD_OP_SRC_CMPXATTR = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_MULTI | 3,
+
 	/** attrs **/
 	/* read */
 	CEPH_OSD_OP_GETXATTR  = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 1,
@@ -238,6 +276,7 @@ enum {
 	CEPH_OSD_OP_SCRUB_RESERVE   = CEPH_OSD_OP_MODE_SUB | 6,
 	CEPH_OSD_OP_SCRUB_UNRESERVE = CEPH_OSD_OP_MODE_SUB | 7,
 	CEPH_OSD_OP_SCRUB_STOP      = CEPH_OSD_OP_MODE_SUB | 8,
+	CEPH_OSD_OP_SCRUB_MAP     = CEPH_OSD_OP_MODE_SUB | 9,
 
 	/** lock **/
 	CEPH_OSD_OP_WRLOCK    = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 1,
@@ -248,10 +287,12 @@ enum {
 	CEPH_OSD_OP_DNLOCK    = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 6,
 
 	/** exec **/
+	/* note: the RD bit here is wrong; see special-case below in helper */
 	CEPH_OSD_OP_CALL    = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_EXEC | 1,
 
 	/** pg **/
 	CEPH_OSD_OP_PGLS      = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_PG | 1,
+	CEPH_OSD_OP_PGLS_FILTER = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_PG | 2,
 };
 
 static inline int ceph_osd_op_type_lock(int op)
@@ -274,6 +315,10 @@ static inline int ceph_osd_op_type_pg(int op)
 {
 	return (op & CEPH_OSD_OP_TYPE) == CEPH_OSD_OP_TYPE_PG;
 }
+static inline int ceph_osd_op_type_multi(int op)
+{
+	return (op & CEPH_OSD_OP_TYPE) == CEPH_OSD_OP_TYPE_MULTI;
+}
 
 static inline int ceph_osd_op_mode_subop(int op)
 {
@@ -281,11 +326,12 @@ static inline int ceph_osd_op_mode_subop(int op)
 }
 static inline int ceph_osd_op_mode_read(int op)
 {
-	return (op & CEPH_OSD_OP_MODE) == CEPH_OSD_OP_MODE_RD;
+	return (op & CEPH_OSD_OP_MODE_RD) &&
+		op != CEPH_OSD_OP_CALL;
 }
 static inline int ceph_osd_op_mode_modify(int op)
 {
-	return (op & CEPH_OSD_OP_MODE) == CEPH_OSD_OP_MODE_WR;
+	return op & CEPH_OSD_OP_MODE_WR;
 }
 
 /*
@@ -294,34 +340,38 @@ static inline int ceph_osd_op_mode_modify(int op)
  */
 #define CEPH_OSD_TMAP_HDR 'h'
 #define CEPH_OSD_TMAP_SET 's'
+#define CEPH_OSD_TMAP_CREATE 'c' /* create key */
 #define CEPH_OSD_TMAP_RM  'r'
+#define CEPH_OSD_TMAP_RMSLOPPY 'R'
 
 extern const char *ceph_osd_op_name(int op);
 
-
 /*
  * osd op flags
  *
  * An op may be READ, WRITE, or READ|WRITE.
  */
 enum {
-	CEPH_OSD_FLAG_ACK = 1,          /* want (or is) "ack" ack */
-	CEPH_OSD_FLAG_ONNVRAM = 2,      /* want (or is) "onnvram" ack */
-	CEPH_OSD_FLAG_ONDISK = 4,       /* want (or is) "ondisk" ack */
-	CEPH_OSD_FLAG_RETRY = 8,        /* resend attempt */
-	CEPH_OSD_FLAG_READ = 16,        /* op may read */
-	CEPH_OSD_FLAG_WRITE = 32,       /* op may write */
-	CEPH_OSD_FLAG_ORDERSNAP = 64,   /* EOLDSNAP if snapc is out of order */
-	CEPH_OSD_FLAG_PEERSTAT = 128,   /* msg includes osd_peer_stat */
-	CEPH_OSD_FLAG_BALANCE_READS = 256,
-	CEPH_OSD_FLAG_PARALLELEXEC = 512, /* execute op in parallel */
-	CEPH_OSD_FLAG_PGOP = 1024,      /* pg op, no object */
-	CEPH_OSD_FLAG_EXEC = 2048,      /* op may exec */
-	CEPH_OSD_FLAG_EXEC_PUBLIC = 4096, /* op may exec (public) */
+	CEPH_OSD_FLAG_ACK =            0x0001,  /* want (or is) "ack" ack */
+	CEPH_OSD_FLAG_ONNVRAM =        0x0002,  /* want (or is) "onnvram" ack */
+	CEPH_OSD_FLAG_ONDISK =         0x0004,  /* want (or is) "ondisk" ack */
+	CEPH_OSD_FLAG_RETRY =          0x0008,  /* resend attempt */
+	CEPH_OSD_FLAG_READ =           0x0010,  /* op may read */
+	CEPH_OSD_FLAG_WRITE =          0x0020,  /* op may write */
+	CEPH_OSD_FLAG_ORDERSNAP =      0x0040,  /* EOLDSNAP if snapc is out of order */
+	CEPH_OSD_FLAG_PEERSTAT_OLD =   0x0080,  /* DEPRECATED msg includes osd_peer_stat */
+	CEPH_OSD_FLAG_BALANCE_READS =  0x0100,
+	CEPH_OSD_FLAG_PARALLELEXEC =   0x0200,  /* execute op in parallel */
+	CEPH_OSD_FLAG_PGOP =           0x0400,  /* pg op, no object */
+	CEPH_OSD_FLAG_EXEC =           0x0800,  /* op may exec */
+	CEPH_OSD_FLAG_EXEC_PUBLIC =    0x1000,  /* DEPRECATED op may exec (public) */
+	CEPH_OSD_FLAG_LOCALIZE_READS = 0x2000,  /* read from nearby replica, if any */
+	CEPH_OSD_FLAG_RWORDERED =      0x4000,  /* order wrt concurrent reads */
 };
 
 enum {
 	CEPH_OSD_OP_FLAG_EXCL = 1,      /* EXCL object create */
+	CEPH_OSD_OP_FLAG_FAILOK = 2,    /* continue despite failure */
 };
 
 #define EOLDSNAPC    ERESTART  /* ORDERSNAP flag set; writer has old snapc*/
@@ -381,7 +431,11 @@ struct ceph_osd_op {
 			__le64 ver;
 			__u8 flag;	/* 0 = unwatch, 1 = watch */
 		} __attribute__ ((packed)) watch;
-};
+		struct {
+			__le64 offset, length;
+			__le64 src_offset;
+		} __attribute__ ((packed)) clonerange;
+	};
 	__le32 payload_len;
 } __attribute__ ((packed));
 
@@ -424,5 +478,4 @@ struct ceph_osd_reply_head {
 } __attribute__ ((packed));
 
 
-
 #endif

+ 2 - 0
include/linux/crush/crush.h

@@ -162,6 +162,8 @@ struct crush_map {
 	__u32 choose_local_fallback_tries;
 	/* choose attempts before giving up */ 
 	__u32 choose_total_tries;
+	/* attempt chooseleaf inner descent once; on failure retry outer descent */
+	__u32 chooseleaf_descend_once;
 };
 
 

+ 16 - 0
net/ceph/ceph_common.c

@@ -26,6 +26,22 @@
 #include "crypto.h"
 
 
+/*
+ * Module compatibility interface.  For now it doesn't do anything,
+ * but its existence signals a certain level of functionality.
+ *
+ * The data buffer is used to pass information both to and from
+ * libceph.  The return value indicates whether libceph determines
+ * it is compatible with the caller (from another kernel module),
+ * given the provided data.
+ *
+ * The data pointer can be null.
+ */
+bool libceph_compatible(void *data)
+{
+	return true;
+}
+EXPORT_SYMBOL(libceph_compatible);
 
 /*
  * find filename portion of a path (/foo/bar/baz -> baz)

+ 39 - 0
net/ceph/ceph_strings.c

@@ -21,9 +21,15 @@ const char *ceph_osd_op_name(int op)
 	switch (op) {
 	case CEPH_OSD_OP_READ: return "read";
 	case CEPH_OSD_OP_STAT: return "stat";
+	case CEPH_OSD_OP_MAPEXT: return "mapext";
+	case CEPH_OSD_OP_SPARSE_READ: return "sparse-read";
+	case CEPH_OSD_OP_NOTIFY: return "notify";
+	case CEPH_OSD_OP_NOTIFY_ACK: return "notify-ack";
+	case CEPH_OSD_OP_ASSERT_VER: return "assert-version";
 
 	case CEPH_OSD_OP_MASKTRUNC: return "masktrunc";
 
+	case CEPH_OSD_OP_CREATE: return "create";
 	case CEPH_OSD_OP_WRITE: return "write";
 	case CEPH_OSD_OP_DELETE: return "delete";
 	case CEPH_OSD_OP_TRUNCATE: return "truncate";
@@ -39,6 +45,11 @@ const char *ceph_osd_op_name(int op)
 	case CEPH_OSD_OP_TMAPUP: return "tmapup";
 	case CEPH_OSD_OP_TMAPGET: return "tmapget";
 	case CEPH_OSD_OP_TMAPPUT: return "tmapput";
+	case CEPH_OSD_OP_WATCH: return "watch";
+
+	case CEPH_OSD_OP_CLONERANGE: return "clonerange";
+	case CEPH_OSD_OP_ASSERT_SRC_VERSION: return "assert-src-version";
+	case CEPH_OSD_OP_SRC_CMPXATTR: return "src-cmpxattr";
 
 	case CEPH_OSD_OP_GETXATTR: return "getxattr";
 	case CEPH_OSD_OP_GETXATTRS: return "getxattrs";
@@ -53,6 +64,10 @@ const char *ceph_osd_op_name(int op)
 	case CEPH_OSD_OP_BALANCEREADS: return "balance-reads";
 	case CEPH_OSD_OP_UNBALANCEREADS: return "unbalance-reads";
 	case CEPH_OSD_OP_SCRUB: return "scrub";
+	case CEPH_OSD_OP_SCRUB_RESERVE: return "scrub-reserve";
+	case CEPH_OSD_OP_SCRUB_UNRESERVE: return "scrub-unreserve";
+	case CEPH_OSD_OP_SCRUB_STOP: return "scrub-stop";
+	case CEPH_OSD_OP_SCRUB_MAP: return "scrub-map";
 
 	case CEPH_OSD_OP_WRLOCK: return "wrlock";
 	case CEPH_OSD_OP_WRUNLOCK: return "wrunlock";
@@ -64,10 +79,34 @@ const char *ceph_osd_op_name(int op)
 	case CEPH_OSD_OP_CALL: return "call";
 
 	case CEPH_OSD_OP_PGLS: return "pgls";
+	case CEPH_OSD_OP_PGLS_FILTER: return "pgls-filter";
+	case CEPH_OSD_OP_OMAPGETKEYS: return "omap-get-keys";
+	case CEPH_OSD_OP_OMAPGETVALS: return "omap-get-vals";
+	case CEPH_OSD_OP_OMAPGETHEADER: return "omap-get-header";
+	case CEPH_OSD_OP_OMAPGETVALSBYKEYS: return "omap-get-vals-by-keys";
+	case CEPH_OSD_OP_OMAPSETVALS: return "omap-set-vals";
+	case CEPH_OSD_OP_OMAPSETHEADER: return "omap-set-header";
+	case CEPH_OSD_OP_OMAPCLEAR: return "omap-clear";
+	case CEPH_OSD_OP_OMAPRMKEYS: return "omap-rm-keys";
 	}
 	return "???";
 }
 
+const char *ceph_osd_state_name(int s)
+{
+	switch (s) {
+	case CEPH_OSD_EXISTS:
+		return "exists";
+	case CEPH_OSD_UP:
+		return "up";
+	case CEPH_OSD_AUTOOUT:
+		return "autoout";
+	case CEPH_OSD_NEW:
+		return "new";
+	default:
+		return "???";
+	}
+}
 
 const char *ceph_pool_op_name(int op)
 {

+ 11 - 4
net/ceph/crush/mapper.c

@@ -287,6 +287,7 @@ static int is_out(const struct crush_map *map, const __u32 *weight, int item, in
  * @outpos: our position in that vector
  * @firstn: true if choosing "first n" items, false if choosing "indep"
  * @recurse_to_leaf: true if we want one device under each item of given type
+ * @descend_once: true if we should only try one descent before giving up
  * @out2: second output vector for leaf items (if @recurse_to_leaf)
  */
 static int crush_choose(const struct crush_map *map,
@@ -295,7 +296,7 @@ static int crush_choose(const struct crush_map *map,
 			int x, int numrep, int type,
 			int *out, int outpos,
 			int firstn, int recurse_to_leaf,
-			int *out2)
+			int descend_once, int *out2)
 {
 	int rep;
 	unsigned int ftotal, flocal;
@@ -391,7 +392,7 @@ static int crush_choose(const struct crush_map *map,
 				}
 
 				reject = 0;
-				if (recurse_to_leaf) {
+				if (!collide && recurse_to_leaf) {
 					if (item < 0) {
 						if (crush_choose(map,
 							 map->buckets[-1-item],
@@ -399,6 +400,7 @@ static int crush_choose(const struct crush_map *map,
 							 x, outpos+1, 0,
 							 out2, outpos,
 							 firstn, 0,
+							 map->chooseleaf_descend_once,
 							 NULL) <= outpos)
 							/* didn't get leaf */
 							reject = 1;
@@ -422,7 +424,10 @@ reject:
 					ftotal++;
 					flocal++;
 
-					if (collide && flocal <= map->choose_local_tries)
+					if (reject && descend_once)
+						/* let outer call try again */
+						skip_rep = 1;
+					else if (collide && flocal <= map->choose_local_tries)
 						/* retry locally a few times */
 						retry_bucket = 1;
 					else if (map->choose_local_fallback_tries > 0 &&
@@ -485,6 +490,7 @@ int crush_do_rule(const struct crush_map *map,
 	int i, j;
 	int numrep;
 	int firstn;
+	const int descend_once = 0;
 
 	if ((__u32)ruleno >= map->max_rules) {
 		dprintk(" bad ruleno %d\n", ruleno);
@@ -544,7 +550,8 @@ int crush_do_rule(const struct crush_map *map,
 						      curstep->arg2,
 						      o+osize, j,
 						      firstn,
-						      recurse_to_leaf, c+osize);
+						      recurse_to_leaf,
+						      descend_once, c+osize);
 			}
 
 			if (recurse_to_leaf)

+ 4 - 1
net/ceph/messenger.c

@@ -9,8 +9,9 @@
 #include <linux/slab.h>
 #include <linux/socket.h>
 #include <linux/string.h>
+#ifdef	CONFIG_BLOCK
 #include <linux/bio.h>
-#include <linux/blkdev.h>
+#endif	/* CONFIG_BLOCK */
 #include <linux/dns_resolver.h>
 #include <net/tcp.h>
 
@@ -2651,9 +2652,11 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 	m->page_alignment = 0;
 	m->pages = NULL;
 	m->pagelist = NULL;
+#ifdef	CONFIG_BLOCK
 	m->bio = NULL;
 	m->bio_iter = NULL;
 	m->bio_seg = 0;
+#endif	/* CONFIG_BLOCK */
 	m->trail = NULL;
 
 	/* front */

+ 173 - 245
net/ceph/osd_client.c

@@ -23,7 +23,7 @@
 
 static const struct ceph_connection_operations osd_con_ops;
 
-static void send_queued(struct ceph_osd_client *osdc);
+static void __send_queued(struct ceph_osd_client *osdc);
 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
 static void __register_request(struct ceph_osd_client *osdc,
 			       struct ceph_osd_request *req);
@@ -32,64 +32,12 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
 static void __send_request(struct ceph_osd_client *osdc,
 			   struct ceph_osd_request *req);
 
-static int op_needs_trail(int op)
-{
-	switch (op) {
-	case CEPH_OSD_OP_GETXATTR:
-	case CEPH_OSD_OP_SETXATTR:
-	case CEPH_OSD_OP_CMPXATTR:
-	case CEPH_OSD_OP_CALL:
-	case CEPH_OSD_OP_NOTIFY:
-		return 1;
-	default:
-		return 0;
-	}
-}
-
 static int op_has_extent(int op)
 {
 	return (op == CEPH_OSD_OP_READ ||
 		op == CEPH_OSD_OP_WRITE);
 }
 
-int ceph_calc_raw_layout(struct ceph_osd_client *osdc,
-			struct ceph_file_layout *layout,
-			u64 snapid,
-			u64 off, u64 *plen, u64 *bno,
-			struct ceph_osd_request *req,
-			struct ceph_osd_req_op *op)
-{
-	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
-	u64 orig_len = *plen;
-	u64 objoff, objlen;    /* extent in object */
-	int r;
-
-	reqhead->snapid = cpu_to_le64(snapid);
-
-	/* object extent? */
-	r = ceph_calc_file_object_mapping(layout, off, plen, bno,
-					  &objoff, &objlen);
-	if (r < 0)
-		return r;
-	if (*plen < orig_len)
-		dout(" skipping last %llu, final file extent %llu~%llu\n",
-		     orig_len - *plen, off, *plen);
-
-	if (op_has_extent(op->op)) {
-		op->extent.offset = objoff;
-		op->extent.length = objlen;
-	}
-	req->r_num_pages = calc_pages_for(off, *plen);
-	req->r_page_alignment = off & ~PAGE_MASK;
-	if (op->op == CEPH_OSD_OP_WRITE)
-		op->payload_len = *plen;
-
-	dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
-	     *bno, objoff, objlen, req->r_num_pages);
-	return 0;
-}
-EXPORT_SYMBOL(ceph_calc_raw_layout);
-
 /*
  * Implement client access to distributed object storage cluster.
  *
@@ -115,20 +63,48 @@ EXPORT_SYMBOL(ceph_calc_raw_layout);
  *
  * fill osd op in request message.
  */
-static int calc_layout(struct ceph_osd_client *osdc,
-		       struct ceph_vino vino,
+static int calc_layout(struct ceph_vino vino,
 		       struct ceph_file_layout *layout,
 		       u64 off, u64 *plen,
 		       struct ceph_osd_request *req,
 		       struct ceph_osd_req_op *op)
 {
-	u64 bno;
+	u64 orig_len = *plen;
+	u64 bno = 0;
+	u64 objoff = 0;
+	u64 objlen = 0;
 	int r;
 
-	r = ceph_calc_raw_layout(osdc, layout, vino.snap, off,
-				 plen, &bno, req, op);
+	/* object extent? */
+	r = ceph_calc_file_object_mapping(layout, off, orig_len, &bno,
+					  &objoff, &objlen);
 	if (r < 0)
 		return r;
+	if (objlen < orig_len) {
+		*plen = objlen;
+		dout(" skipping last %llu, final file extent %llu~%llu\n",
+		     orig_len - *plen, off, *plen);
+	}
+
+	if (op_has_extent(op->op)) {
+		u32 osize = le32_to_cpu(layout->fl_object_size);
+		op->extent.offset = objoff;
+		op->extent.length = objlen;
+		if (op->extent.truncate_size <= off - objoff) {
+			op->extent.truncate_size = 0;
+		} else {
+			op->extent.truncate_size -= off - objoff;
+			if (op->extent.truncate_size > osize)
+				op->extent.truncate_size = osize;
+		}
+	}
+	req->r_num_pages = calc_pages_for(off, *plen);
+	req->r_page_alignment = off & ~PAGE_MASK;
+	if (op->op == CEPH_OSD_OP_WRITE)
+		op->payload_len = *plen;
+
+	dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
+	     bno, objoff, objlen, req->r_num_pages);
 
 	snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
 	req->r_oid_len = strlen(req->r_oid);
@@ -148,25 +124,19 @@ void ceph_osdc_release_request(struct kref *kref)
 	if (req->r_request)
 		ceph_msg_put(req->r_request);
 	if (req->r_con_filling_msg) {
-		dout("%s revoking pages %p from con %p\n", __func__,
-		     req->r_pages, req->r_con_filling_msg);
+		dout("%s revoking msg %p from con %p\n", __func__,
+		     req->r_reply, req->r_con_filling_msg);
 		ceph_msg_revoke_incoming(req->r_reply);
 		req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
+		req->r_con_filling_msg = NULL;
 	}
 	if (req->r_reply)
 		ceph_msg_put(req->r_reply);
 	if (req->r_own_pages)
 		ceph_release_page_vector(req->r_pages,
 					 req->r_num_pages);
-#ifdef CONFIG_BLOCK
-	if (req->r_bio)
-		bio_put(req->r_bio);
-#endif
 	ceph_put_snap_context(req->r_snapc);
-	if (req->r_trail) {
-		ceph_pagelist_release(req->r_trail);
-		kfree(req->r_trail);
-	}
+	ceph_pagelist_release(&req->r_trail);
 	if (req->r_mempool)
 		mempool_free(req, req->r_osdc->req_mempool);
 	else
@@ -174,34 +144,14 @@ void ceph_osdc_release_request(struct kref *kref)
 }
 EXPORT_SYMBOL(ceph_osdc_release_request);
 
-static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
-{
-	int i = 0;
-
-	if (needs_trail)
-		*needs_trail = 0;
-	while (ops[i].op) {
-		if (needs_trail && op_needs_trail(ops[i].op))
-			*needs_trail = 1;
-		i++;
-	}
-
-	return i;
-}
-
 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
-					       int flags,
 					       struct ceph_snap_context *snapc,
-					       struct ceph_osd_req_op *ops,
+					       unsigned int num_op,
 					       bool use_mempool,
-					       gfp_t gfp_flags,
-					       struct page **pages,
-					       struct bio *bio)
+					       gfp_t gfp_flags)
 {
 	struct ceph_osd_request *req;
 	struct ceph_msg *msg;
-	int needs_trail;
-	int num_op = get_num_ops(ops, &needs_trail);
 	size_t msg_size = sizeof(struct ceph_osd_request_head);
 
 	msg_size += num_op*sizeof(struct ceph_osd_op);
@@ -228,10 +178,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 	INIT_LIST_HEAD(&req->r_req_lru_item);
 	INIT_LIST_HEAD(&req->r_osd_item);
 
-	req->r_flags = flags;
-
-	WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
-
 	/* create reply message */
 	if (use_mempool)
 		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
@@ -244,15 +190,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 	}
 	req->r_reply = msg;
 
-	/* allocate space for the trailing data */
-	if (needs_trail) {
-		req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
-		if (!req->r_trail) {
-			ceph_osdc_put_request(req);
-			return NULL;
-		}
-		ceph_pagelist_init(req->r_trail);
-	}
+	ceph_pagelist_init(&req->r_trail);
 
 	/* create request message; allow space for oid */
 	msg_size += MAX_OBJ_NAME_SIZE;
@@ -270,13 +208,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 	memset(msg->front.iov_base, 0, msg->front.iov_len);
 
 	req->r_request = msg;
-	req->r_pages = pages;
-#ifdef CONFIG_BLOCK
-	if (bio) {
-		req->r_bio = bio;
-		bio_get(req->r_bio);
-	}
-#endif
 
 	return req;
 }
@@ -289,6 +220,8 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
 	dst->op = cpu_to_le16(src->op);
 
 	switch (src->op) {
+	case CEPH_OSD_OP_STAT:
+		break;
 	case CEPH_OSD_OP_READ:
 	case CEPH_OSD_OP_WRITE:
 		dst->extent.offset =
@@ -300,52 +233,20 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
 		dst->extent.truncate_seq =
 			cpu_to_le32(src->extent.truncate_seq);
 		break;
-
-	case CEPH_OSD_OP_GETXATTR:
-	case CEPH_OSD_OP_SETXATTR:
-	case CEPH_OSD_OP_CMPXATTR:
-		BUG_ON(!req->r_trail);
-
-		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
-		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
-		dst->xattr.cmp_op = src->xattr.cmp_op;
-		dst->xattr.cmp_mode = src->xattr.cmp_mode;
-		ceph_pagelist_append(req->r_trail, src->xattr.name,
-				     src->xattr.name_len);
-		ceph_pagelist_append(req->r_trail, src->xattr.val,
-				     src->xattr.value_len);
-		break;
 	case CEPH_OSD_OP_CALL:
-		BUG_ON(!req->r_trail);
-
 		dst->cls.class_len = src->cls.class_len;
 		dst->cls.method_len = src->cls.method_len;
 		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
 
-		ceph_pagelist_append(req->r_trail, src->cls.class_name,
+		ceph_pagelist_append(&req->r_trail, src->cls.class_name,
 				     src->cls.class_len);
-		ceph_pagelist_append(req->r_trail, src->cls.method_name,
+		ceph_pagelist_append(&req->r_trail, src->cls.method_name,
 				     src->cls.method_len);
-		ceph_pagelist_append(req->r_trail, src->cls.indata,
+		ceph_pagelist_append(&req->r_trail, src->cls.indata,
 				     src->cls.indata_len);
 		break;
-	case CEPH_OSD_OP_ROLLBACK:
-		dst->snap.snapid = cpu_to_le64(src->snap.snapid);
-		break;
 	case CEPH_OSD_OP_STARTSYNC:
 		break;
-	case CEPH_OSD_OP_NOTIFY:
-		{
-			__le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
-			__le32 timeout = cpu_to_le32(src->watch.timeout);
-
-			BUG_ON(!req->r_trail);
-
-			ceph_pagelist_append(req->r_trail,
-						&prot_ver, sizeof(prot_ver));
-			ceph_pagelist_append(req->r_trail,
-						&timeout, sizeof(timeout));
-		}
 	case CEPH_OSD_OP_NOTIFY_ACK:
 	case CEPH_OSD_OP_WATCH:
 		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
@@ -356,6 +257,64 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
 		pr_err("unrecognized osd opcode %d\n", dst->op);
 		WARN_ON(1);
 		break;
+	case CEPH_OSD_OP_MAPEXT:
+	case CEPH_OSD_OP_MASKTRUNC:
+	case CEPH_OSD_OP_SPARSE_READ:
+	case CEPH_OSD_OP_NOTIFY:
+	case CEPH_OSD_OP_ASSERT_VER:
+	case CEPH_OSD_OP_WRITEFULL:
+	case CEPH_OSD_OP_TRUNCATE:
+	case CEPH_OSD_OP_ZERO:
+	case CEPH_OSD_OP_DELETE:
+	case CEPH_OSD_OP_APPEND:
+	case CEPH_OSD_OP_SETTRUNC:
+	case CEPH_OSD_OP_TRIMTRUNC:
+	case CEPH_OSD_OP_TMAPUP:
+	case CEPH_OSD_OP_TMAPPUT:
+	case CEPH_OSD_OP_TMAPGET:
+	case CEPH_OSD_OP_CREATE:
+	case CEPH_OSD_OP_ROLLBACK:
+	case CEPH_OSD_OP_OMAPGETKEYS:
+	case CEPH_OSD_OP_OMAPGETVALS:
+	case CEPH_OSD_OP_OMAPGETHEADER:
+	case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
+	case CEPH_OSD_OP_MODE_RD:
+	case CEPH_OSD_OP_OMAPSETVALS:
+	case CEPH_OSD_OP_OMAPSETHEADER:
+	case CEPH_OSD_OP_OMAPCLEAR:
+	case CEPH_OSD_OP_OMAPRMKEYS:
+	case CEPH_OSD_OP_OMAP_CMP:
+	case CEPH_OSD_OP_CLONERANGE:
+	case CEPH_OSD_OP_ASSERT_SRC_VERSION:
+	case CEPH_OSD_OP_SRC_CMPXATTR:
+	case CEPH_OSD_OP_GETXATTR:
+	case CEPH_OSD_OP_GETXATTRS:
+	case CEPH_OSD_OP_CMPXATTR:
+	case CEPH_OSD_OP_SETXATTR:
+	case CEPH_OSD_OP_SETXATTRS:
+	case CEPH_OSD_OP_RESETXATTRS:
+	case CEPH_OSD_OP_RMXATTR:
+	case CEPH_OSD_OP_PULL:
+	case CEPH_OSD_OP_PUSH:
+	case CEPH_OSD_OP_BALANCEREADS:
+	case CEPH_OSD_OP_UNBALANCEREADS:
+	case CEPH_OSD_OP_SCRUB:
+	case CEPH_OSD_OP_SCRUB_RESERVE:
+	case CEPH_OSD_OP_SCRUB_UNRESERVE:
+	case CEPH_OSD_OP_SCRUB_STOP:
+	case CEPH_OSD_OP_SCRUB_MAP:
+	case CEPH_OSD_OP_WRLOCK:
+	case CEPH_OSD_OP_WRUNLOCK:
+	case CEPH_OSD_OP_RDLOCK:
+	case CEPH_OSD_OP_RDUNLOCK:
+	case CEPH_OSD_OP_UPLOCK:
+	case CEPH_OSD_OP_DNLOCK:
+	case CEPH_OSD_OP_PGLS:
+	case CEPH_OSD_OP_PGLS_FILTER:
+		pr_err("unsupported osd opcode %s\n",
+			ceph_osd_op_name(dst->op));
+		WARN_ON(1);
+		break;
 	}
 	dst->payload_len = cpu_to_le32(src->payload_len);
 }
@@ -365,25 +324,25 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
  *
  */
 void ceph_osdc_build_request(struct ceph_osd_request *req,
-			     u64 off, u64 *plen,
+			     u64 off, u64 len, unsigned int num_op,
 			     struct ceph_osd_req_op *src_ops,
-			     struct ceph_snap_context *snapc,
-			     struct timespec *mtime,
-			     const char *oid,
-			     int oid_len)
+			     struct ceph_snap_context *snapc, u64 snap_id,
+			     struct timespec *mtime)
 {
 	struct ceph_msg *msg = req->r_request;
 	struct ceph_osd_request_head *head;
 	struct ceph_osd_req_op *src_op;
 	struct ceph_osd_op *op;
 	void *p;
-	int num_op = get_num_ops(src_ops, NULL);
 	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
 	int flags = req->r_flags;
-	u64 data_len = 0;
+	u64 data_len;
 	int i;
 
+	WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
+
 	head = msg->front.iov_base;
+	head->snapid = cpu_to_le64(snap_id);
 	op = (void *)(head + 1);
 	p = (void *)(op + num_op);
 
@@ -393,23 +352,17 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
 	head->flags = cpu_to_le32(flags);
 	if (flags & CEPH_OSD_FLAG_WRITE)
 		ceph_encode_timespec(&head->mtime, mtime);
+	BUG_ON(num_op > (unsigned int) ((u16) -1));
 	head->num_ops = cpu_to_le16(num_op);
 
-
 	/* fill in oid */
-	head->object_len = cpu_to_le32(oid_len);
-	memcpy(p, oid, oid_len);
-	p += oid_len;
+	head->object_len = cpu_to_le32(req->r_oid_len);
+	memcpy(p, req->r_oid, req->r_oid_len);
+	p += req->r_oid_len;
 
 	src_op = src_ops;
-	while (src_op->op) {
-		osd_req_encode_op(req, op, src_op);
-		src_op++;
-		op++;
-	}
-
-	if (req->r_trail)
-		data_len += req->r_trail->length;
+	while (num_op--)
+		osd_req_encode_op(req, op++, src_op++);
 
 	if (snapc) {
 		head->snap_seq = cpu_to_le64(snapc->seq);
@@ -420,14 +373,12 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
 		}
 	}
 
+	data_len = req->r_trail.length;
 	if (flags & CEPH_OSD_FLAG_WRITE) {
 		req->r_request->hdr.data_off = cpu_to_le16(off);
-		req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
-	} else if (data_len) {
-		req->r_request->hdr.data_off = 0;
-		req->r_request->hdr.data_len = cpu_to_le32(data_len);
+		data_len += len;
 	}
-
+	req->r_request->hdr.data_len = cpu_to_le32(data_len);
 	req->r_request->page_alignment = req->r_page_alignment;
 
 	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
@@ -459,34 +410,33 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 					       u32 truncate_seq,
 					       u64 truncate_size,
 					       struct timespec *mtime,
-					       bool use_mempool, int num_reply,
+					       bool use_mempool,
 					       int page_align)
 {
-	struct ceph_osd_req_op ops[3];
+	struct ceph_osd_req_op ops[2];
 	struct ceph_osd_request *req;
+	unsigned int num_op = 1;
 	int r;
 
+	memset(&ops, 0, sizeof ops);
+
 	ops[0].op = opcode;
 	ops[0].extent.truncate_seq = truncate_seq;
 	ops[0].extent.truncate_size = truncate_size;
-	ops[0].payload_len = 0;
 
 	if (do_sync) {
 		ops[1].op = CEPH_OSD_OP_STARTSYNC;
-		ops[1].payload_len = 0;
-		ops[2].op = 0;
-	} else
-		ops[1].op = 0;
-
-	req = ceph_osdc_alloc_request(osdc, flags,
-					 snapc, ops,
-					 use_mempool,
-					 GFP_NOFS, NULL, NULL);
+		num_op++;
+	}
+
+	req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool,
+					GFP_NOFS);
 	if (!req)
 		return ERR_PTR(-ENOMEM);
+	req->r_flags = flags;
 
 	/* calculate max write size */
-	r = calc_layout(osdc, vino, layout, off, plen, req, ops);
+	r = calc_layout(vino, layout, off, plen, req, ops);
 	if (r < 0)
 		return ERR_PTR(r);
 	req->r_file_layout = *layout;  /* keep a copy */
@@ -496,10 +446,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 	req->r_num_pages = calc_pages_for(page_align, *plen);
 	req->r_page_alignment = page_align;
 
-	ceph_osdc_build_request(req, off, plen, ops,
-				snapc,
-				mtime,
-				req->r_oid, req->r_oid_len);
+	ceph_osdc_build_request(req, off, *plen, num_op, ops,
+				snapc, vino.snap, mtime);
 
 	return req;
 }
@@ -623,8 +571,8 @@ static void osd_reset(struct ceph_connection *con)
 	down_read(&osdc->map_sem);
 	mutex_lock(&osdc->request_mutex);
 	__kick_osd_requests(osdc, osd);
+	__send_queued(osdc);
 	mutex_unlock(&osdc->request_mutex);
-	send_queued(osdc);
 	up_read(&osdc->map_sem);
 }
 
@@ -739,31 +687,35 @@ static void remove_old_osds(struct ceph_osd_client *osdc)
  */
 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 {
-	struct ceph_osd_request *req;
-	int ret = 0;
+	struct ceph_entity_addr *peer_addr;
 
 	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
 	if (list_empty(&osd->o_requests) &&
 	    list_empty(&osd->o_linger_requests)) {
 		__remove_osd(osdc, osd);
-		ret = -ENODEV;
-	} else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
-			  &osd->o_con.peer_addr,
-			  sizeof(osd->o_con.peer_addr)) == 0 &&
-		   !ceph_con_opened(&osd->o_con)) {
+
+		return -ENODEV;
+	}
+
+	peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
+	if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
+			!ceph_con_opened(&osd->o_con)) {
+		struct ceph_osd_request *req;
+
 		dout(" osd addr hasn't changed and connection never opened,"
 		     " letting msgr retry");
 		/* touch each r_stamp for handle_timeout()'s benfit */
 		list_for_each_entry(req, &osd->o_requests, r_osd_item)
 			req->r_stamp = jiffies;
-		ret = -EAGAIN;
-	} else {
-		ceph_con_close(&osd->o_con);
-		ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
-			      &osdc->osdmap->osd_addr[osd->o_osd]);
-		osd->o_incarnation++;
+
+		return -EAGAIN;
 	}
-	return ret;
+
+	ceph_con_close(&osd->o_con);
+	ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
+	osd->o_incarnation++;
+
+	return 0;
 }
 
 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
@@ -1062,16 +1014,13 @@ static void __send_request(struct ceph_osd_client *osdc,
 /*
  * Send any requests in the queue (req_unsent).
  */
-static void send_queued(struct ceph_osd_client *osdc)
+static void __send_queued(struct ceph_osd_client *osdc)
 {
 	struct ceph_osd_request *req, *tmp;
 
-	dout("send_queued\n");
-	mutex_lock(&osdc->request_mutex);
-	list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
+	dout("__send_queued\n");
+	list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item)
 		__send_request(osdc, req);
-	}
-	mutex_unlock(&osdc->request_mutex);
 }
 
 /*
@@ -1123,8 +1072,8 @@ static void handle_timeout(struct work_struct *work)
 	}
 
 	__schedule_osd_timeout(osdc);
+	__send_queued(osdc);
 	mutex_unlock(&osdc->request_mutex);
-	send_queued(osdc);
 	up_read(&osdc->map_sem);
 }
 
@@ -1462,7 +1411,9 @@ done:
 	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
 		ceph_monc_request_next_osdmap(&osdc->client->monc);
 
-	send_queued(osdc);
+	mutex_lock(&osdc->request_mutex);
+	__send_queued(osdc);
+	mutex_unlock(&osdc->request_mutex);
 	up_read(&osdc->map_sem);
 	wake_up_all(&osdc->client->auth_wq);
 	return;
@@ -1556,8 +1507,7 @@ static void __remove_event(struct ceph_osd_event *event)
 
 int ceph_osdc_create_event(struct ceph_osd_client *osdc,
 			   void (*event_cb)(u64, u64, u8, void *),
-			   int one_shot, void *data,
-			   struct ceph_osd_event **pevent)
+			   void *data, struct ceph_osd_event **pevent)
 {
 	struct ceph_osd_event *event;
 
@@ -1567,14 +1517,13 @@ int ceph_osdc_create_event(struct ceph_osd_client *osdc,
 
 	dout("create_event %p\n", event);
 	event->cb = event_cb;
-	event->one_shot = one_shot;
+	event->one_shot = 0;
 	event->data = data;
 	event->osdc = osdc;
 	INIT_LIST_HEAD(&event->osd_node);
 	RB_CLEAR_NODE(&event->node);
 	kref_init(&event->kref);   /* one ref for us */
 	kref_get(&event->kref);    /* one ref for the caller */
-	init_completion(&event->completion);
 
 	spin_lock(&osdc->event_lock);
 	event->cookie = ++osdc->event_count;
@@ -1610,7 +1559,6 @@ static void do_event_work(struct work_struct *work)
 
 	dout("do_event_work completing %p\n", event);
 	event->cb(ver, notify_id, opcode, event->data);
-	complete(&event->completion);
 	dout("do_event_work completed %p\n", event);
 	ceph_osdc_put_event(event);
 	kfree(event_work);
@@ -1620,7 +1568,8 @@ static void do_event_work(struct work_struct *work)
 /*
  * Process osd watch notifications
  */
-void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+static void handle_watch_notify(struct ceph_osd_client *osdc,
+				struct ceph_msg *msg)
 {
 	void *p, *end;
 	u8 proto_ver;
@@ -1641,9 +1590,8 @@ void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 	spin_lock(&osdc->event_lock);
 	event = __find_event(osdc, cookie);
 	if (event) {
+		BUG_ON(event->one_shot);
 		get_event(event);
-		if (event->one_shot)
-			__remove_event(event);
 	}
 	spin_unlock(&osdc->event_lock);
 	dout("handle_watch_notify cookie %lld ver %lld event %p\n",
@@ -1668,7 +1616,6 @@ void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 	return;
 
 done_err:
-	complete(&event->completion);
 	ceph_osdc_put_event(event);
 	return;
 
@@ -1677,21 +1624,6 @@ bad:
 	return;
 }
 
-int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
-{
-	int err;
-
-	dout("wait_event %p\n", event);
-	err = wait_for_completion_interruptible_timeout(&event->completion,
-							timeout * HZ);
-	ceph_osdc_put_event(event);
-	if (err > 0)
-		err = 0;
-	dout("wait_event %p returns %d\n", event, err);
-	return err;
-}
-EXPORT_SYMBOL(ceph_osdc_wait_event);
-
 /*
  * Register request, send initial attempt.
  */
@@ -1706,7 +1638,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
 #ifdef CONFIG_BLOCK
 	req->r_request->bio = req->r_bio;
 #endif
-	req->r_request->trail = req->r_trail;
+	req->r_request->trail = &req->r_trail;
 
 	register_request(osdc, req);
 
@@ -1865,7 +1797,6 @@ out_mempool:
 out:
 	return err;
 }
-EXPORT_SYMBOL(ceph_osdc_init);
 
 void ceph_osdc_stop(struct ceph_osd_client *osdc)
 {
@@ -1882,7 +1813,6 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
 	ceph_msgpool_destroy(&osdc->msgpool_op);
 	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
 }
-EXPORT_SYMBOL(ceph_osdc_stop);
 
 /*
  * Read some contiguous pages.  If we cross a stripe boundary, shorten
@@ -1902,7 +1832,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
 	req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
 				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
 				    NULL, 0, truncate_seq, truncate_size, NULL,
-				    false, 1, page_align);
+				    false, page_align);
 	if (IS_ERR(req))
 		return PTR_ERR(req);
 
@@ -1931,8 +1861,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
 			 u64 off, u64 len,
 			 u32 truncate_seq, u64 truncate_size,
 			 struct timespec *mtime,
-			 struct page **pages, int num_pages,
-			 int flags, int do_sync, bool nofail)
+			 struct page **pages, int num_pages)
 {
 	struct ceph_osd_request *req;
 	int rc = 0;
@@ -1941,11 +1870,10 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
 	BUG_ON(vino.snap != CEPH_NOSNAP);
 	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
 				    CEPH_OSD_OP_WRITE,
-				    flags | CEPH_OSD_FLAG_ONDISK |
-					    CEPH_OSD_FLAG_WRITE,
-				    snapc, do_sync,
+				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
+				    snapc, 0,
 				    truncate_seq, truncate_size, mtime,
-				    nofail, 1, page_align);
+				    true, page_align);
 	if (IS_ERR(req))
 		return PTR_ERR(req);
 
@@ -1954,7 +1882,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
 	dout("writepages %llu~%llu (%d pages)\n", off, len,
 	     req->r_num_pages);
 
-	rc = ceph_osdc_start_request(osdc, req, nofail);
+	rc = ceph_osdc_start_request(osdc, req, true);
 	if (!rc)
 		rc = ceph_osdc_wait_request(osdc, req);
 
@@ -2047,7 +1975,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
 	if (data_len > 0) {
 		int want = calc_pages_for(req->r_page_alignment, data_len);
 
-		if (unlikely(req->r_num_pages < want)) {
+		if (req->r_pages && unlikely(req->r_num_pages < want)) {
 			pr_warning("tid %lld reply has %d bytes %d pages, we"
 				   " had only %d pages ready\n", tid, data_len,
 				   want, req->r_num_pages);

+ 20 - 23
net/ceph/osdmap.c

@@ -13,26 +13,18 @@
 
 char *ceph_osdmap_state_str(char *str, int len, int state)
 {
-	int flag = 0;
-
 	if (!len)
-		goto done;
-
-	*str = '\0';
-	if (state) {
-		if (state & CEPH_OSD_EXISTS) {
-			snprintf(str, len, "exists");
-			flag = 1;
-		}
-		if (state & CEPH_OSD_UP) {
-			snprintf(str, len, "%s%s%s", str, (flag ? ", " : ""),
-				 "up");
-			flag = 1;
-		}
-	} else {
+		return str;
+
+	if ((state & CEPH_OSD_EXISTS) && (state & CEPH_OSD_UP))
+		snprintf(str, len, "exists, up");
+	else if (state & CEPH_OSD_EXISTS)
+		snprintf(str, len, "exists");
+	else if (state & CEPH_OSD_UP)
+		snprintf(str, len, "up");
+	else
 		snprintf(str, len, "doesn't exist");
-	}
-done:
+
 	return str;
 }
 
@@ -170,6 +162,7 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
         c->choose_local_tries = 2;
         c->choose_local_fallback_tries = 5;
         c->choose_total_tries = 19;
+	c->chooseleaf_descend_once = 0;
 
 	ceph_decode_need(p, end, 4*sizeof(u32), bad);
 	magic = ceph_decode_32(p);
@@ -336,6 +329,11 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
         dout("crush decode tunable choose_total_tries = %d",
              c->choose_total_tries);
 
+	ceph_decode_need(p, end, sizeof(u32), done);
+	c->chooseleaf_descend_once = ceph_decode_32(p);
+	dout("crush decode tunable chooseleaf_descend_once = %d",
+	     c->chooseleaf_descend_once);
+
 done:
 	dout("crush_decode success\n");
 	return c;
@@ -1010,7 +1008,7 @@ bad:
  * pass a stride back to the caller.
  */
 int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
-				   u64 off, u64 *plen,
+				   u64 off, u64 len,
 				   u64 *ono,
 				   u64 *oxoff, u64 *oxlen)
 {
@@ -1021,7 +1019,7 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
 	u32 su_per_object;
 	u64 t, su_offset;
 
-	dout("mapping %llu~%llu  osize %u fl_su %u\n", off, *plen,
+	dout("mapping %llu~%llu  osize %u fl_su %u\n", off, len,
 	     osize, su);
 	if (su == 0 || sc == 0)
 		goto invalid;
@@ -1054,11 +1052,10 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
 
 	/*
 	 * Calculate the length of the extent being written to the selected
-	 * object. This is the minimum of the full length requested (plen) or
+	 * object. This is the minimum of the full length requested (len) or
 	 * the remainder of the current stripe being written to.
 	 */
-	*oxlen = min_t(u64, *plen, su - su_offset);
-	*plen = *oxlen;
+	*oxlen = min_t(u64, len, su - su_offset);
 
 	dout(" obj extent %llu~%llu\n", *oxoff, *oxlen);
 	return 0;

+ 11 - 13
net/ceph/pagevec.c

@@ -12,7 +12,7 @@
 /*
  * build a vector of user pages
  */
-struct page **ceph_get_direct_page_vector(const char __user *data,
+struct page **ceph_get_direct_page_vector(const void __user *data,
 					  int num_pages, bool write_page)
 {
 	struct page **pages;
@@ -93,7 +93,7 @@ EXPORT_SYMBOL(ceph_alloc_page_vector);
  * copy user data into a page vector
  */
 int ceph_copy_user_to_page_vector(struct page **pages,
-					 const char __user *data,
+					 const void __user *data,
 					 loff_t off, size_t len)
 {
 	int i = 0;
@@ -118,17 +118,17 @@ int ceph_copy_user_to_page_vector(struct page **pages,
 }
 EXPORT_SYMBOL(ceph_copy_user_to_page_vector);
 
-int ceph_copy_to_page_vector(struct page **pages,
-				    const char *data,
+void ceph_copy_to_page_vector(struct page **pages,
+				    const void *data,
 				    loff_t off, size_t len)
 {
 	int i = 0;
 	size_t po = off & ~PAGE_CACHE_MASK;
 	size_t left = len;
-	size_t l;
 
 	while (left > 0) {
-		l = min_t(size_t, PAGE_CACHE_SIZE-po, left);
+		size_t l = min_t(size_t, PAGE_CACHE_SIZE-po, left);
+
 		memcpy(page_address(pages[i]) + po, data, l);
 		data += l;
 		left -= l;
@@ -138,21 +138,20 @@ int ceph_copy_to_page_vector(struct page **pages,
 			i++;
 		}
 	}
-	return len;
 }
 EXPORT_SYMBOL(ceph_copy_to_page_vector);
 
-int ceph_copy_from_page_vector(struct page **pages,
-				    char *data,
+void ceph_copy_from_page_vector(struct page **pages,
+				    void *data,
 				    loff_t off, size_t len)
 {
 	int i = 0;
 	size_t po = off & ~PAGE_CACHE_MASK;
 	size_t left = len;
-	size_t l;
 
 	while (left > 0) {
-		l = min_t(size_t, PAGE_CACHE_SIZE-po, left);
+		size_t l = min_t(size_t, PAGE_CACHE_SIZE-po, left);
+
 		memcpy(data, page_address(pages[i]) + po, l);
 		data += l;
 		left -= l;
@@ -162,7 +161,6 @@ int ceph_copy_from_page_vector(struct page **pages,
 			i++;
 		}
 	}
-	return len;
 }
 EXPORT_SYMBOL(ceph_copy_from_page_vector);
 
@@ -170,7 +168,7 @@ EXPORT_SYMBOL(ceph_copy_from_page_vector);
  * copy user data from a page vector into a user pointer
  */
 int ceph_copy_page_vector_to_user(struct page **pages,
-					 char __user *data,
+					 void __user *data,
 					 loff_t off, size_t len)
 {
 	int i = 0;

Some files were not shown because too many files changed in this diff