Browse Source

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull Ceph changes from Alex Elder:
 "This is a big pull.

  Most of it is culmination of Alex's work to implement RBD image
  layering, which is now complete (yay!).

  There is also some work from Yan to fix i_mutex behavior surrounding
  writes in cephfs, a sync write fix, a fix for RBD images that get
  resized while they are mapped, and a few patches from me that resolve
  annoying auth warnings and fix several bugs in the ceph auth code."

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (254 commits)
  rbd: fix image request leak on parent read
  libceph: use slab cache for osd client requests
  libceph: allocate ceph message data with a slab allocator
  libceph: allocate ceph messages with a slab allocator
  rbd: allocate image object names with a slab allocator
  rbd: allocate object requests with a slab allocator
  rbd: allocate name separate from obj_request
  rbd: allocate image requests with a slab allocator
  rbd: use binary search for snapshot lookup
  rbd: clear EXISTS flag if mapped snapshot disappears
  rbd: kill off the snapshot list
  rbd: define rbd_snap_size() and rbd_snap_features()
  rbd: use snap_id not index to look up snap info
  rbd: look up snapshot name in names buffer
  rbd: drop obj_request->version
  rbd: drop rbd_obj_method_sync() version parameter
  rbd: more version parameter removal
  rbd: get rid of some version parameters
  rbd: stop tracking header object version
  rbd: snap names are pointer to constant data
  ...
Linus Torvalds 12 years ago
parent
commit
91f8575685

+ 0 - 20
Documentation/ABI/testing/sysfs-bus-rbd

@@ -66,27 +66,7 @@ current_snap
 
 	The current snapshot for which the device is mapped.
 
-snap_*
-
-	A directory per each snapshot
-
 parent
 
 	Information identifying the pool, image, and snapshot id for
 	the parent image in a layered rbd image (format 2 only).
-
-Entries under /sys/bus/rbd/devices/<dev-id>/snap_<snap-name>
--------------------------------------------------------------
-
-snap_id
-
-	The rados internal snapshot id assigned for this snapshot
-
-snap_size
-
-	The size of the image when this snapshot was taken.
-
-snap_features
-
-	A hexadecimal encoding of the feature bits for this snapshot.
-

File diff suppressed because it is too large
+ 481 - 208
drivers/block/rbd.c


+ 108 - 114
fs/ceph/addr.c

@@ -236,15 +236,21 @@ static int ceph_readpage(struct file *filp, struct page *page)
 static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
 {
 	struct inode *inode = req->r_inode;
+	struct ceph_osd_data *osd_data;
 	int rc = req->r_result;
 	int bytes = le32_to_cpu(msg->hdr.data_len);
+	int num_pages;
 	int i;
 
 	dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
 
 	/* unlock all pages, zeroing any data we didn't read */
-	for (i = 0; i < req->r_num_pages; i++, bytes -= PAGE_CACHE_SIZE) {
-		struct page *page = req->r_pages[i];
+	osd_data = osd_req_op_extent_osd_data(req, 0);
+	BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
+	num_pages = calc_pages_for((u64)osd_data->alignment,
+					(u64)osd_data->length);
+	for (i = 0; i < num_pages; i++) {
+		struct page *page = osd_data->pages[i];
 
 		if (bytes < (int)PAGE_CACHE_SIZE) {
 			/* zero (remainder of) page */
@@ -257,8 +263,9 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
 		SetPageUptodate(page);
 		unlock_page(page);
 		page_cache_release(page);
+		bytes -= PAGE_CACHE_SIZE;
 	}
-	kfree(req->r_pages);
+	kfree(osd_data->pages);
 }
 
 static void ceph_unlock_page_vector(struct page **pages, int num_pages)
@@ -279,6 +286,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
 		&ceph_inode_to_client(inode)->client->osdc;
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct page *page = list_entry(page_list->prev, struct page, lru);
+	struct ceph_vino vino;
 	struct ceph_osd_request *req;
 	u64 off;
 	u64 len;
@@ -303,18 +311,17 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
 	len = nr_pages << PAGE_CACHE_SHIFT;
 	dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages,
 	     off, len);
-
-	req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode),
-				    off, &len,
-				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
-				    NULL, 0,
+	vino = ceph_vino(inode);
+	req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len,
+				    1, CEPH_OSD_OP_READ,
+				    CEPH_OSD_FLAG_READ, NULL,
 				    ci->i_truncate_seq, ci->i_truncate_size,
-				    NULL, false, 0);
+				    false);
 	if (IS_ERR(req))
 		return PTR_ERR(req);
 
 	/* build page vector */
-	nr_pages = len >> PAGE_CACHE_SHIFT;
+	nr_pages = calc_pages_for(0, len);
 	pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS);
 	ret = -ENOMEM;
 	if (!pages)
@@ -336,11 +343,12 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
 		}
 		pages[i] = page;
 	}
-	req->r_pages = pages;
-	req->r_num_pages = nr_pages;
+	osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false);
 	req->r_callback = finish_read;
 	req->r_inode = inode;
 
+	ceph_osdc_build_request(req, off, NULL, vino.snap, NULL);
+
 	dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
 	ret = ceph_osdc_start_request(osdc, req, false);
 	if (ret < 0)
@@ -373,7 +381,8 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
 		max = (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1)
 			>> PAGE_SHIFT;
 
-	dout("readpages %p file %p nr_pages %d max %d\n", inode, file, nr_pages,
+	dout("readpages %p file %p nr_pages %d max %d\n", inode,
+		file, nr_pages,
 	     max);
 	while (!list_empty(page_list)) {
 		rc = start_read(inode, page_list, max);
@@ -548,17 +557,23 @@ static void writepages_finish(struct ceph_osd_request *req,
 {
 	struct inode *inode = req->r_inode;
 	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_osd_data *osd_data;
 	unsigned wrote;
 	struct page *page;
+	int num_pages;
 	int i;
 	struct ceph_snap_context *snapc = req->r_snapc;
 	struct address_space *mapping = inode->i_mapping;
 	int rc = req->r_result;
-	u64 bytes = le64_to_cpu(req->r_request_ops[0].extent.length);
+	u64 bytes = req->r_ops[0].extent.length;
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 	long writeback_stat;
 	unsigned issued = ceph_caps_issued(ci);
 
+	osd_data = osd_req_op_extent_osd_data(req, 0);
+	BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
+	num_pages = calc_pages_for((u64)osd_data->alignment,
+					(u64)osd_data->length);
 	if (rc >= 0) {
 		/*
 		 * Assume we wrote the pages we originally sent.  The
@@ -566,7 +581,7 @@ static void writepages_finish(struct ceph_osd_request *req,
 		 * raced with a truncation and was adjusted at the osd,
 		 * so don't believe the reply.
 		 */
-		wrote = req->r_num_pages;
+		wrote = num_pages;
 	} else {
 		wrote = 0;
 		mapping_set_error(mapping, rc);
@@ -575,8 +590,8 @@ static void writepages_finish(struct ceph_osd_request *req,
 	     inode, rc, bytes, wrote);
 
 	/* clean all pages */
-	for (i = 0; i < req->r_num_pages; i++) {
-		page = req->r_pages[i];
+	for (i = 0; i < num_pages; i++) {
+		page = osd_data->pages[i];
 		BUG_ON(!page);
 		WARN_ON(!PageUptodate(page));
 
@@ -605,32 +620,34 @@ static void writepages_finish(struct ceph_osd_request *req,
 		unlock_page(page);
 	}
 	dout("%p wrote+cleaned %d pages\n", inode, wrote);
-	ceph_put_wrbuffer_cap_refs(ci, req->r_num_pages, snapc);
+	ceph_put_wrbuffer_cap_refs(ci, num_pages, snapc);
 
-	ceph_release_pages(req->r_pages, req->r_num_pages);
-	if (req->r_pages_from_pool)
-		mempool_free(req->r_pages,
+	ceph_release_pages(osd_data->pages, num_pages);
+	if (osd_data->pages_from_pool)
+		mempool_free(osd_data->pages,
 			     ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
 	else
-		kfree(req->r_pages);
+		kfree(osd_data->pages);
 	ceph_osdc_put_request(req);
 }
 
-/*
- * allocate a page vec, either directly, or if necessary, via a the
- * mempool.  we avoid the mempool if we can because req->r_num_pages
- * may be less than the maximum write size.
- */
-static void alloc_page_vec(struct ceph_fs_client *fsc,
-			   struct ceph_osd_request *req)
+static struct ceph_osd_request *
+ceph_writepages_osd_request(struct inode *inode, u64 offset, u64 *len,
+				struct ceph_snap_context *snapc, int num_ops)
 {
-	req->r_pages = kmalloc(sizeof(struct page *) * req->r_num_pages,
-			       GFP_NOFS);
-	if (!req->r_pages) {
-		req->r_pages = mempool_alloc(fsc->wb_pagevec_pool, GFP_NOFS);
-		req->r_pages_from_pool = 1;
-		WARN_ON(!req->r_pages);
-	}
+	struct ceph_fs_client *fsc;
+	struct ceph_inode_info *ci;
+	struct ceph_vino vino;
+
+	fsc = ceph_inode_to_client(inode);
+	ci = ceph_inode(inode);
+	vino = ceph_vino(inode);
+	/* BUG_ON(vino.snap != CEPH_NOSNAP); */
+
+	return ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
+			vino, offset, len, num_ops, CEPH_OSD_OP_WRITE,
+			CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK,
+			snapc, ci->i_truncate_seq, ci->i_truncate_size, true);
 }
 
 /*
@@ -653,7 +670,7 @@ static int ceph_writepages_start(struct address_space *mapping,
 	unsigned wsize = 1 << inode->i_blkbits;
 	struct ceph_osd_request *req = NULL;
 	int do_sync;
-	u64 snap_size = 0;
+	u64 snap_size;
 
 	/*
 	 * Include a 'sync' in the OSD request if this is a data
@@ -699,6 +716,7 @@ static int ceph_writepages_start(struct address_space *mapping,
 retry:
 	/* find oldest snap context with dirty data */
 	ceph_put_snap_context(snapc);
+	snap_size = 0;
 	snapc = get_oldest_context(inode, &snap_size);
 	if (!snapc) {
 		/* hmm, why does writepages get called when there
@@ -706,6 +724,8 @@ retry:
 		dout(" no snap context with dirty data?\n");
 		goto out;
 	}
+	if (snap_size == 0)
+		snap_size = i_size_read(inode);
 	dout(" oldest snapc is %p seq %lld (%d snaps)\n",
 	     snapc, snapc->seq, snapc->num_snaps);
 	if (last_snapc && snapc != last_snapc) {
@@ -718,10 +738,14 @@ retry:
 	last_snapc = snapc;
 
 	while (!done && index <= end) {
+		int num_ops = do_sync ? 2 : 1;
+		struct ceph_vino vino;
 		unsigned i;
 		int first;
 		pgoff_t next;
 		int pvec_pages, locked_pages;
+		struct page **pages = NULL;
+		mempool_t *pool = NULL;	/* Becomes non-null if mempool used */
 		struct page *page;
 		int want;
 		u64 offset, len;
@@ -773,11 +797,8 @@ get_more_pages:
 				dout("waiting on writeback %p\n", page);
 				wait_on_page_writeback(page);
 			}
-			if ((snap_size && page_offset(page) > snap_size) ||
-			    (!snap_size &&
-			     page_offset(page) > i_size_read(inode))) {
-				dout("%p page eof %llu\n", page, snap_size ?
-				     snap_size : i_size_read(inode));
+			if (page_offset(page) >= snap_size) {
+				dout("%p page eof %llu\n", page, snap_size);
 				done = 1;
 				unlock_page(page);
 				break;
@@ -805,22 +826,23 @@ get_more_pages:
 				break;
 			}
 
-			/* ok */
+			/*
+			 * We have something to write.  If this is
+			 * the first locked page this time through,
+			 * allocate an osd request and a page array
+			 * that it will use.
+			 */
 			if (locked_pages == 0) {
+				size_t size;
+
+				BUG_ON(pages);
+
 				/* prepare async write request */
-				offset = (u64) page_offset(page);
+				offset = (u64)page_offset(page);
 				len = wsize;
-				req = ceph_osdc_new_request(&fsc->client->osdc,
-					    &ci->i_layout,
-					    ceph_vino(inode),
-					    offset, &len,
-					    CEPH_OSD_OP_WRITE,
-					    CEPH_OSD_FLAG_WRITE |
-						    CEPH_OSD_FLAG_ONDISK,
-					    snapc, do_sync,
-					    ci->i_truncate_seq,
-					    ci->i_truncate_size,
-					    &inode->i_mtime, true, 0);
+				req = ceph_writepages_osd_request(inode,
+							offset, &len, snapc,
+							num_ops);
 
 				if (IS_ERR(req)) {
 					rc = PTR_ERR(req);
@@ -828,11 +850,17 @@ get_more_pages:
 					break;
 				}
 
-				max_pages = req->r_num_pages;
-
-				alloc_page_vec(fsc, req);
 				req->r_callback = writepages_finish;
 				req->r_inode = inode;
+
+				max_pages = calc_pages_for(0, (u64)len);
+				size = max_pages * sizeof (*pages);
+				pages = kmalloc(size, GFP_NOFS);
+				if (!pages) {
+					pool = fsc->wb_pagevec_pool;
+					pages = mempool_alloc(pool, GFP_NOFS);
+					BUG_ON(!pages);
+				}
 			}
 
 			/* note position of first page in pvec */
@@ -850,7 +878,7 @@ get_more_pages:
 			}
 
 			set_page_writeback(page);
-			req->r_pages[locked_pages] = page;
+			pages[locked_pages] = page;
 			locked_pages++;
 			next = page->index + 1;
 		}
@@ -879,18 +907,27 @@ get_more_pages:
 			pvec.nr -= i-first;
 		}
 
-		/* submit the write */
-		offset = req->r_pages[0]->index << PAGE_CACHE_SHIFT;
-		len = min((snap_size ? snap_size : i_size_read(inode)) - offset,
+		/* Format the osd request message and submit the write */
+
+		offset = page_offset(pages[0]);
+		len = min(snap_size - offset,
 			  (u64)locked_pages << PAGE_CACHE_SHIFT);
 		dout("writepages got %d pages at %llu~%llu\n",
 		     locked_pages, offset, len);
 
-		/* revise final length, page count */
-		req->r_num_pages = locked_pages;
-		req->r_request_ops[0].extent.length = cpu_to_le64(len);
-		req->r_request_ops[0].payload_len = cpu_to_le32(len);
-		req->r_request->hdr.data_len = cpu_to_le32(len);
+		osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
+							!!pool, false);
+
+		pages = NULL;	/* request message now owns the pages array */
+		pool = NULL;
+
+		/* Update the write op length in case we changed it */
+
+		osd_req_op_extent_update(req, 0, len);
+
+		vino = ceph_vino(inode);
+		ceph_osdc_build_request(req, offset, snapc, vino.snap,
+					&inode->i_mtime);
 
 		rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
 		BUG_ON(rc);
@@ -1067,51 +1104,23 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
 			    struct page **pagep, void **fsdata)
 {
 	struct inode *inode = file_inode(file);
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_file_info *fi = file->private_data;
 	struct page *page;
 	pgoff_t index = pos >> PAGE_CACHE_SHIFT;
-	int r, want, got = 0;
-
-	if (fi->fmode & CEPH_FILE_MODE_LAZY)
-		want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
-	else
-		want = CEPH_CAP_FILE_BUFFER;
-
-	dout("write_begin %p %llx.%llx %llu~%u getting caps. i_size %llu\n",
-	     inode, ceph_vinop(inode), pos, len, inode->i_size);
-	r = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, pos+len);
-	if (r < 0)
-		return r;
-	dout("write_begin %p %llx.%llx %llu~%u  got cap refs on %s\n",
-	     inode, ceph_vinop(inode), pos, len, ceph_cap_string(got));
-	if (!(got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO))) {
-		ceph_put_cap_refs(ci, got);
-		return -EAGAIN;
-	}
+	int r;
 
 	do {
 		/* get a page */
 		page = grab_cache_page_write_begin(mapping, index, 0);
-		if (!page) {
-			r = -ENOMEM;
-			break;
-		}
+		if (!page)
+			return -ENOMEM;
+		*pagep = page;
 
 		dout("write_begin file %p inode %p page %p %d~%d\n", file,
 		     inode, page, (int)pos, (int)len);
 
 		r = ceph_update_writeable_page(file, pos, len, page);
-		if (r)
-			page_cache_release(page);
 	} while (r == -EAGAIN);
 
-	if (r) {
-		ceph_put_cap_refs(ci, got);
-	} else {
-		*pagep = page;
-		*(int *)fsdata = got;
-	}
 	return r;
 }
 
@@ -1125,12 +1134,10 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
 			  struct page *page, void *fsdata)
 {
 	struct inode *inode = file_inode(file);
-	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 	struct ceph_mds_client *mdsc = fsc->mdsc;
 	unsigned from = pos & (PAGE_CACHE_SIZE - 1);
 	int check_cap = 0;
-	int got = (unsigned long)fsdata;
 
 	dout("write_end file %p inode %p page %p %d~%d (%d)\n", file,
 	     inode, page, (int)pos, (int)copied, (int)len);
@@ -1153,19 +1160,6 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
 	up_read(&mdsc->snap_rwsem);
 	page_cache_release(page);
 
-	if (copied > 0) {
-		int dirty;
-		spin_lock(&ci->i_ceph_lock);
-		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
-		spin_unlock(&ci->i_ceph_lock);
-		if (dirty)
-			__mark_inode_dirty(inode, dirty);
-	}
-
-	dout("write_end %p %llx.%llx %llu~%u  dropping cap refs on %s\n",
-	     inode, ceph_vinop(inode), pos, len, ceph_cap_string(got));
-	ceph_put_cap_refs(ci, got);
-
 	if (check_cap)
 		ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL);
 

+ 20 - 13
fs/ceph/caps.c

@@ -490,15 +490,17 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
 		ci->i_rdcache_gen++;
 
 	/*
-	 * if we are newly issued FILE_SHARED, clear D_COMPLETE; we
+	 * if we are newly issued FILE_SHARED, mark dir not complete; we
 	 * don't know what happened to this directory while we didn't
 	 * have the cap.
 	 */
 	if ((issued & CEPH_CAP_FILE_SHARED) &&
 	    (had & CEPH_CAP_FILE_SHARED) == 0) {
 		ci->i_shared_gen++;
-		if (S_ISDIR(ci->vfs_inode.i_mode))
-			ceph_dir_clear_complete(&ci->vfs_inode);
+		if (S_ISDIR(ci->vfs_inode.i_mode)) {
+			dout(" marking %p NOT complete\n", &ci->vfs_inode);
+			__ceph_dir_clear_complete(ci);
+		}
 	}
 }
 
@@ -553,6 +555,7 @@ retry:
 		cap->implemented = 0;
 		cap->mds = mds;
 		cap->mds_wanted = 0;
+		cap->mseq = 0;
 
 		cap->ci = ci;
 		__insert_cap_node(ci, cap);
@@ -628,7 +631,10 @@ retry:
 	cap->cap_id = cap_id;
 	cap->issued = issued;
 	cap->implemented |= issued;
-	cap->mds_wanted |= wanted;
+	if (mseq > cap->mseq)
+		cap->mds_wanted = wanted;
+	else
+		cap->mds_wanted |= wanted;
 	cap->seq = seq;
 	cap->issue_seq = seq;
 	cap->mseq = mseq;
@@ -997,9 +1003,9 @@ static int send_cap_msg(struct ceph_mds_session *session,
 	return 0;
 }
 
-static void __queue_cap_release(struct ceph_mds_session *session,
-				u64 ino, u64 cap_id, u32 migrate_seq,
-				u32 issue_seq)
+void __queue_cap_release(struct ceph_mds_session *session,
+			 u64 ino, u64 cap_id, u32 migrate_seq,
+			 u32 issue_seq)
 {
 	struct ceph_msg *msg;
 	struct ceph_mds_cap_release *head;
@@ -2046,6 +2052,13 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
 		goto out;
 	}
 
+	/* finish pending truncate */
+	while (ci->i_truncate_pending) {
+		spin_unlock(&ci->i_ceph_lock);
+		__ceph_do_pending_vmtruncate(inode, !(need & CEPH_CAP_FILE_WR));
+		spin_lock(&ci->i_ceph_lock);
+	}
+
 	if (need & CEPH_CAP_FILE_WR) {
 		if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) {
 			dout("get_cap_refs %p endoff %llu > maxsize %llu\n",
@@ -2067,12 +2080,6 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
 	}
 	have = __ceph_caps_issued(ci, &implemented);
 
-	/*
-	 * disallow writes while a truncate is pending
-	 */
-	if (ci->i_truncate_pending)
-		have &= ~CEPH_CAP_FILE_WR;
-
 	if ((have & need) == need) {
 		/*
 		 * Look at (implemented & ~have & not) so that we keep waiting

+ 13 - 52
fs/ceph/dir.c

@@ -107,7 +107,7 @@ static unsigned fpos_off(loff_t p)
  * falling back to a "normal" sync readdir if any dentries in the dir
  * are dropped.
  *
- * D_COMPLETE tells indicates we have all dentries in the dir.  It is
+ * Complete dir indicates that we have all dentries in the dir.  It is
  * defined IFF we hold CEPH_CAP_FILE_SHARED (which will be revoked by
  * the MDS if/when the directory is modified).
  */
@@ -198,8 +198,8 @@ more:
 	filp->f_pos++;
 
 	/* make sure a dentry wasn't dropped while we didn't have parent lock */
-	if (!ceph_dir_test_complete(dir)) {
-		dout(" lost D_COMPLETE on %p; falling back to mds\n", dir);
+	if (!ceph_dir_is_complete(dir)) {
+		dout(" lost dir complete on %p; falling back to mds\n", dir);
 		err = -EAGAIN;
 		goto out;
 	}
@@ -258,7 +258,7 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir)
 	if (filp->f_pos == 0) {
 		/* note dir version at start of readdir so we can tell
 		 * if any dentries get dropped */
-		fi->dir_release_count = ci->i_release_count;
+		fi->dir_release_count = atomic_read(&ci->i_release_count);
 
 		dout("readdir off 0 -> '.'\n");
 		if (filldir(dirent, ".", 1, ceph_make_fpos(0, 0),
@@ -284,7 +284,7 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir)
 	if ((filp->f_pos == 2 || fi->dentry) &&
 	    !ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
 	    ceph_snap(inode) != CEPH_SNAPDIR &&
-	    ceph_dir_test_complete(inode) &&
+	    __ceph_dir_is_complete(ci) &&
 	    __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
 		spin_unlock(&ci->i_ceph_lock);
 		err = __dcache_readdir(filp, dirent, filldir);
@@ -350,7 +350,8 @@ more:
 
 		if (!req->r_did_prepopulate) {
 			dout("readdir !did_prepopulate");
-			fi->dir_release_count--;    /* preclude D_COMPLETE */
+			/* preclude from marking dir complete */
+			fi->dir_release_count--;
 		}
 
 		/* note next offset and last dentry name */
@@ -428,8 +429,9 @@ more:
 	 * the complete dir contents in our cache.
 	 */
 	spin_lock(&ci->i_ceph_lock);
-	if (ci->i_release_count == fi->dir_release_count) {
-		ceph_dir_set_complete(inode);
+	if (atomic_read(&ci->i_release_count) == fi->dir_release_count) {
+		dout(" marking %p complete\n", inode);
+		__ceph_dir_set_complete(ci, fi->dir_release_count);
 		ci->i_max_offset = filp->f_pos;
 	}
 	spin_unlock(&ci->i_ceph_lock);
@@ -604,7 +606,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
 			    fsc->mount_options->snapdir_name,
 			    dentry->d_name.len) &&
 		    !is_root_ceph_dentry(dir, dentry) &&
-		    ceph_dir_test_complete(dir) &&
+		    __ceph_dir_is_complete(ci) &&
 		    (__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1))) {
 			spin_unlock(&ci->i_ceph_lock);
 			dout(" dir %p complete, -ENOENT\n", dir);
@@ -1064,44 +1066,6 @@ static int ceph_snapdir_d_revalidate(struct dentry *dentry,
 	return 1;
 }
 
-/*
- * Set/clear/test dir complete flag on the dir's dentry.
- */
-void ceph_dir_set_complete(struct inode *inode)
-{
-	struct dentry *dentry = d_find_any_alias(inode);
-	
-	if (dentry && ceph_dentry(dentry) &&
-	    ceph_test_mount_opt(ceph_sb_to_client(dentry->d_sb), DCACHE)) {
-		dout(" marking %p (%p) complete\n", inode, dentry);
-		set_bit(CEPH_D_COMPLETE, &ceph_dentry(dentry)->flags);
-	}
-	dput(dentry);
-}
-
-void ceph_dir_clear_complete(struct inode *inode)
-{
-	struct dentry *dentry = d_find_any_alias(inode);
-
-	if (dentry && ceph_dentry(dentry)) {
-		dout(" marking %p (%p) complete\n", inode, dentry);
-		set_bit(CEPH_D_COMPLETE, &ceph_dentry(dentry)->flags);
-	}
-	dput(dentry);
-}
-
-bool ceph_dir_test_complete(struct inode *inode)
-{
-	struct dentry *dentry = d_find_any_alias(inode);
-
-	if (dentry && ceph_dentry(dentry)) {
-		dout(" marking %p (%p) NOT complete\n", inode, dentry);
-		clear_bit(CEPH_D_COMPLETE, &ceph_dentry(dentry)->flags);
-	}
-	dput(dentry);
-	return false;
-}
-
 /*
  * When the VFS prunes a dentry from the cache, we need to clear the
  * complete flag on the parent directory.
@@ -1110,15 +1074,13 @@ bool ceph_dir_test_complete(struct inode *inode)
  */
 static void ceph_d_prune(struct dentry *dentry)
 {
-	struct ceph_dentry_info *di;
-
 	dout("ceph_d_prune %p\n", dentry);
 
 	/* do we have a valid parent? */
 	if (IS_ROOT(dentry))
 		return;
 
-	/* if we are not hashed, we don't affect D_COMPLETE */
+	/* if we are not hashed, we don't affect dir's completeness */
 	if (d_unhashed(dentry))
 		return;
 
@@ -1126,8 +1088,7 @@ static void ceph_d_prune(struct dentry *dentry)
 	 * we hold d_lock, so d_parent is stable, and d_fsdata is never
 	 * cleared until d_release
 	 */
-	di = ceph_dentry(dentry->d_parent);
-	clear_bit(CEPH_D_COMPLETE, &di->flags);
+	ceph_dir_clear_complete(dentry->d_parent->d_inode);
 }
 
 /*

+ 134 - 107
fs/ceph/file.c

@@ -446,19 +446,35 @@ done:
 }
 
 /*
- * Write commit callback, called if we requested both an ACK and
- * ONDISK commit reply from the OSD.
+ * Write commit request unsafe callback, called to tell us when a
+ * request is unsafe (that is, in flight--has been handed to the
+ * messenger to send to its target osd).  It is called again when
+ * we've received a response message indicating the request is
+ * "safe" (its CEPH_OSD_FLAG_ONDISK flag is set), or when a request
+ * is completed early (and unsuccessfully) due to a timeout or
+ * interrupt.
+ *
+ * This is used if we requested both an ACK and ONDISK commit reply
+ * from the OSD.
  */
-static void sync_write_commit(struct ceph_osd_request *req,
-			      struct ceph_msg *msg)
+static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
 {
 	struct ceph_inode_info *ci = ceph_inode(req->r_inode);
 
-	dout("sync_write_commit %p tid %llu\n", req, req->r_tid);
-	spin_lock(&ci->i_unsafe_lock);
-	list_del_init(&req->r_unsafe_item);
-	spin_unlock(&ci->i_unsafe_lock);
-	ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR);
+	dout("%s %p tid %llu %ssafe\n", __func__, req, req->r_tid,
+		unsafe ? "un" : "");
+	if (unsafe) {
+		ceph_get_cap_refs(ci, CEPH_CAP_FILE_WR);
+		spin_lock(&ci->i_unsafe_lock);
+		list_add_tail(&req->r_unsafe_item,
+			      &ci->i_unsafe_writes);
+		spin_unlock(&ci->i_unsafe_lock);
+	} else {
+		spin_lock(&ci->i_unsafe_lock);
+		list_del_init(&req->r_unsafe_item);
+		spin_unlock(&ci->i_unsafe_lock);
+		ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR);
+	}
 }
 
 /*
@@ -470,36 +486,33 @@ static void sync_write_commit(struct ceph_osd_request *req,
  * objects, rollback on failure, etc.)
  */
 static ssize_t ceph_sync_write(struct file *file, const char __user *data,
-			       size_t left, loff_t *offset)
+			       size_t left, loff_t pos, loff_t *ppos)
 {
 	struct inode *inode = file_inode(file);
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+	struct ceph_snap_context *snapc;
+	struct ceph_vino vino;
 	struct ceph_osd_request *req;
+	int num_ops = 1;
 	struct page **pages;
 	int num_pages;
-	long long unsigned pos;
 	u64 len;
 	int written = 0;
 	int flags;
-	int do_sync = 0;
 	int check_caps = 0;
 	int page_align, io_align;
 	unsigned long buf_align;
 	int ret;
 	struct timespec mtime = CURRENT_TIME;
+	bool own_pages = false;
 
 	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
 		return -EROFS;
 
-	dout("sync_write on file %p %lld~%u %s\n", file, *offset,
+	dout("sync_write on file %p %lld~%u %s\n", file, pos,
 	     (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
 
-	if (file->f_flags & O_APPEND)
-		pos = i_size_read(inode);
-	else
-		pos = *offset;
-
 	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left);
 	if (ret < 0)
 		return ret;
@@ -516,7 +529,7 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
 	if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
 		flags |= CEPH_OSD_FLAG_ACK;
 	else
-		do_sync = 1;
+		num_ops++;	/* Also include a 'startsync' command. */
 
 	/*
 	 * we may need to do multiple writes here if we span an object
@@ -526,25 +539,20 @@ more:
 	io_align = pos & ~PAGE_MASK;
 	buf_align = (unsigned long)data & ~PAGE_MASK;
 	len = left;
-	if (file->f_flags & O_DIRECT) {
-		/* write from beginning of first page, regardless of
-		   io alignment */
-		page_align = (pos - io_align + buf_align) & ~PAGE_MASK;
-		num_pages = calc_pages_for((unsigned long)data, len);
-	} else {
-		page_align = pos & ~PAGE_MASK;
-		num_pages = calc_pages_for(pos, len);
-	}
+
+	snapc = ci->i_snap_realm->cached_context;
+	vino = ceph_vino(inode);
 	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
-				    ceph_vino(inode), pos, &len,
-				    CEPH_OSD_OP_WRITE, flags,
-				    ci->i_snap_realm->cached_context,
-				    do_sync,
+				    vino, pos, &len, num_ops,
+				    CEPH_OSD_OP_WRITE, flags, snapc,
 				    ci->i_truncate_seq, ci->i_truncate_size,
-				    &mtime, false, page_align);
+				    false);
 	if (IS_ERR(req))
 		return PTR_ERR(req);
 
+	/* write from beginning of first page, regardless of io alignment */
+	page_align = file->f_flags & O_DIRECT ? buf_align : io_align;
+	num_pages = calc_pages_for(page_align, len);
 	if (file->f_flags & O_DIRECT) {
 		pages = ceph_get_direct_page_vector(data, num_pages, false);
 		if (IS_ERR(pages)) {
@@ -572,36 +580,20 @@ more:
 
 		if ((file->f_flags & O_SYNC) == 0) {
 			/* get a second commit callback */
-			req->r_safe_callback = sync_write_commit;
-			req->r_own_pages = 1;
+			req->r_unsafe_callback = ceph_sync_write_unsafe;
+			req->r_inode = inode;
+			own_pages = true;
 		}
 	}
-	req->r_pages = pages;
-	req->r_num_pages = num_pages;
-	req->r_inode = inode;
+	osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
+					false, own_pages);
+
+	/* BUG_ON(vino.snap != CEPH_NOSNAP); */
+	ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
 
 	ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
-	if (!ret) {
-		if (req->r_safe_callback) {
-			/*
-			 * Add to inode unsafe list only after we
-			 * start_request so that a tid has been assigned.
-			 */
-			spin_lock(&ci->i_unsafe_lock);
-			list_add_tail(&req->r_unsafe_item,
-				      &ci->i_unsafe_writes);
-			spin_unlock(&ci->i_unsafe_lock);
-			ceph_get_cap_refs(ci, CEPH_CAP_FILE_WR);
-		}
-		
+	if (!ret)
 		ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
-		if (ret < 0 && req->r_safe_callback) {
-			spin_lock(&ci->i_unsafe_lock);
-			list_del_init(&req->r_unsafe_item);
-			spin_unlock(&ci->i_unsafe_lock);
-			ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR);
-		}
-	}
 
 	if (file->f_flags & O_DIRECT)
 		ceph_put_page_vector(pages, num_pages, false);
@@ -614,12 +606,12 @@ out:
 		pos += len;
 		written += len;
 		left -= len;
-		data += written;
+		data += len;
 		if (left)
 			goto more;
 
 		ret = written;
-		*offset = pos;
+		*ppos = pos;
 		if (pos > i_size_read(inode))
 			check_caps = ceph_inode_set_size(inode, pos);
 		if (check_caps)
@@ -653,7 +645,6 @@ static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov,
 	dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
 	     inode, ceph_vinop(inode), pos, (unsigned)len, inode);
 again:
-	__ceph_do_pending_vmtruncate(inode);
 	if (fi->fmode & CEPH_FILE_MODE_LAZY)
 		want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
 	else
@@ -717,55 +708,75 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_osd_client *osdc =
 		&ceph_sb_to_client(inode->i_sb)->client->osdc;
-	loff_t endoff = pos + iov->iov_len;
-	int got = 0;
-	int ret, err, written;
+	ssize_t count, written = 0;
+	int err, want, got;
+	bool hold_mutex;
 
 	if (ceph_snap(inode) != CEPH_NOSNAP)
 		return -EROFS;
 
-retry_snap:
-	written = 0;
-	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
-		return -ENOSPC;
-	__ceph_do_pending_vmtruncate(inode);
+	sb_start_write(inode->i_sb);
+	mutex_lock(&inode->i_mutex);
+	hold_mutex = true;
 
-	/*
-	 * try to do a buffered write.  if we don't have sufficient
-	 * caps, we'll get -EAGAIN from generic_file_aio_write, or a
-	 * short write if we only get caps for some pages.
-	 */
-	if (!(iocb->ki_filp->f_flags & O_DIRECT) &&
-	    !(inode->i_sb->s_flags & MS_SYNCHRONOUS) &&
-	    !(fi->flags & CEPH_F_SYNC)) {
-		ret = generic_file_aio_write(iocb, iov, nr_segs, pos);
-		if (ret >= 0)
-			written = ret;
-
-		if ((ret >= 0 || ret == -EIOCBQUEUED) &&
-		    ((file->f_flags & O_SYNC) || IS_SYNC(file->f_mapping->host)
-		     || ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))) {
-			err = vfs_fsync_range(file, pos, pos + written - 1, 1);
-			if (err < 0)
-				ret = err;
-		}
-		if ((ret < 0 && ret != -EAGAIN) || pos + written >= endoff)
-			goto out;
+	err = generic_segment_checks(iov, &nr_segs, &count, VERIFY_READ);
+	if (err)
+		goto out;
+
+	/* We can write back this queue in page reclaim */
+	current->backing_dev_info = file->f_mapping->backing_dev_info;
+
+	err = generic_write_checks(file, &pos, &count, S_ISBLK(inode->i_mode));
+	if (err)
+		goto out;
+
+	if (count == 0)
+		goto out;
+
+	err = file_remove_suid(file);
+	if (err)
+		goto out;
+
+	err = file_update_time(file);
+	if (err)
+		goto out;
+
+retry_snap:
+	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) {
+		err = -ENOSPC;
+		goto out;
 	}
 
-	dout("aio_write %p %llx.%llx %llu~%u getting caps. i_size %llu\n",
-	     inode, ceph_vinop(inode), pos + written,
-	     (unsigned)iov->iov_len - written, inode->i_size);
-	ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, 0, &got, endoff);
-	if (ret < 0)
+	dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n",
+	     inode, ceph_vinop(inode), pos, count, inode->i_size);
+	if (fi->fmode & CEPH_FILE_MODE_LAZY)
+		want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
+	else
+		want = CEPH_CAP_FILE_BUFFER;
+	got = 0;
+	err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, pos + count);
+	if (err < 0)
 		goto out;
 
-	dout("aio_write %p %llx.%llx %llu~%u  got cap refs on %s\n",
-	     inode, ceph_vinop(inode), pos + written,
-	     (unsigned)iov->iov_len - written, ceph_cap_string(got));
-	ret = ceph_sync_write(file, iov->iov_base + written,
-			      iov->iov_len - written, &iocb->ki_pos);
-	if (ret >= 0) {
+	dout("aio_write %p %llx.%llx %llu~%zd got cap refs on %s\n",
+	     inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
+
+	if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
+	    (iocb->ki_filp->f_flags & O_DIRECT) ||
+	    (inode->i_sb->s_flags & MS_SYNCHRONOUS) ||
+	    (fi->flags & CEPH_F_SYNC)) {
+		mutex_unlock(&inode->i_mutex);
+		written = ceph_sync_write(file, iov->iov_base, count,
+					  pos, &iocb->ki_pos);
+	} else {
+		written = generic_file_buffered_write(iocb, iov, nr_segs,
+						      pos, &iocb->ki_pos,
+						      count, 0);
+		mutex_unlock(&inode->i_mutex);
+	}
+	hold_mutex = false;
+
+	if (written >= 0) {
 		int dirty;
 		spin_lock(&ci->i_ceph_lock);
 		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
@@ -773,18 +784,34 @@ retry_snap:
 		if (dirty)
 			__mark_inode_dirty(inode, dirty);
 	}
+
 	dout("aio_write %p %llx.%llx %llu~%u  dropping cap refs on %s\n",
-	     inode, ceph_vinop(inode), pos + written,
-	     (unsigned)iov->iov_len - written, ceph_cap_string(got));
+	     inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
+	     ceph_cap_string(got));
 	ceph_put_cap_refs(ci, got);
-out:
-	if (ret == -EOLDSNAPC) {
+
+	if (written >= 0 &&
+	    ((file->f_flags & O_SYNC) || IS_SYNC(file->f_mapping->host) ||
+	     ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))) {
+		err = vfs_fsync_range(file, pos, pos + written - 1, 1);
+		if (err < 0)
+			written = err;
+	}
+
+	if (written == -EOLDSNAPC) {
 		dout("aio_write %p %llx.%llx %llu~%u got EOLDSNAPC, retrying\n",
 		     inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len);
+		mutex_lock(&inode->i_mutex);
+		hold_mutex = true;
 		goto retry_snap;
 	}
+out:
+	if (hold_mutex)
+		mutex_unlock(&inode->i_mutex);
+	sb_end_write(inode->i_sb);
+	current->backing_dev_info = NULL;
 
-	return ret;
+	return written ? written : err;
 }
 
 /*
@@ -796,7 +823,7 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
 	int ret;
 
 	mutex_lock(&inode->i_mutex);
-	__ceph_do_pending_vmtruncate(inode);
+	__ceph_do_pending_vmtruncate(inode, false);
 
 	if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) {
 		ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE);

+ 31 - 28
fs/ceph/inode.c

@@ -302,7 +302,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
 	ci->i_version = 0;
 	ci->i_time_warp_seq = 0;
 	ci->i_ceph_flags = 0;
-	ci->i_release_count = 0;
+	atomic_set(&ci->i_release_count, 1);
+	atomic_set(&ci->i_complete_count, 0);
 	ci->i_symlink = NULL;
 
 	memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout));
@@ -561,7 +562,6 @@ static int fill_inode(struct inode *inode,
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	int i;
 	int issued = 0, implemented;
-	int updating_inode = 0;
 	struct timespec mtime, atime, ctime;
 	u32 nsplits;
 	struct ceph_buffer *xattr_blob = NULL;
@@ -601,7 +601,6 @@ static int fill_inode(struct inode *inode,
 	    (ci->i_version & ~1) >= le64_to_cpu(info->version))
 		goto no_change;
 	
-	updating_inode = 1;
 	issued = __ceph_caps_issued(ci, &implemented);
 	issued |= implemented | __ceph_caps_dirty(ci);
 
@@ -717,6 +716,17 @@ static int fill_inode(struct inode *inode,
 		       ceph_vinop(inode), inode->i_mode);
 	}
 
+	/* set dir completion flag? */
+	if (S_ISDIR(inode->i_mode) &&
+	    ci->i_files == 0 && ci->i_subdirs == 0 &&
+	    ceph_snap(inode) == CEPH_NOSNAP &&
+	    (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) &&
+	    (issued & CEPH_CAP_FILE_EXCL) == 0 &&
+	    !__ceph_dir_is_complete(ci)) {
+		dout(" marking %p complete (empty)\n", inode);
+		__ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count));
+		ci->i_max_offset = 2;
+	}
 no_change:
 	spin_unlock(&ci->i_ceph_lock);
 
@@ -767,19 +777,6 @@ no_change:
 		__ceph_get_fmode(ci, cap_fmode);
 	}
 
-	/* set dir completion flag? */
-	if (S_ISDIR(inode->i_mode) &&
-	    updating_inode &&                 /* didn't jump to no_change */
-	    ci->i_files == 0 && ci->i_subdirs == 0 &&
-	    ceph_snap(inode) == CEPH_NOSNAP &&
-	    (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) &&
-	    (issued & CEPH_CAP_FILE_EXCL) == 0 &&
-	    !ceph_dir_test_complete(inode)) {
-		dout(" marking %p complete (empty)\n", inode);
-		ceph_dir_set_complete(inode);
-		ci->i_max_offset = 2;
-	}
-
 	/* update delegation info? */
 	if (dirinfo)
 		ceph_fill_dirfrag(inode, dirinfo);
@@ -861,7 +858,7 @@ static void ceph_set_dentry_offset(struct dentry *dn)
 	di = ceph_dentry(dn);
 
 	spin_lock(&ci->i_ceph_lock);
-	if (!ceph_dir_test_complete(inode)) {
+	if (!__ceph_dir_is_complete(ci)) {
 		spin_unlock(&ci->i_ceph_lock);
 		return;
 	}
@@ -1065,8 +1062,8 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
 			/*
 			 * d_move() puts the renamed dentry at the end of
 			 * d_subdirs.  We need to assign it an appropriate
-			 * directory offset so we can behave when holding
-			 * D_COMPLETE.
+			 * directory offset so we can behave when dir is
+			 * complete.
 			 */
 			ceph_set_dentry_offset(req->r_old_dentry);
 			dout("dn %p gets new offset %lld\n", req->r_old_dentry, 
@@ -1457,7 +1454,7 @@ out:
 
 
 /*
- * called by trunc_wq; take i_mutex ourselves
+ * called by trunc_wq;
  *
  * We also truncate in a separate thread as well.
  */
@@ -1468,9 +1465,7 @@ static void ceph_vmtruncate_work(struct work_struct *work)
 	struct inode *inode = &ci->vfs_inode;
 
 	dout("vmtruncate_work %p\n", inode);
-	mutex_lock(&inode->i_mutex);
-	__ceph_do_pending_vmtruncate(inode);
-	mutex_unlock(&inode->i_mutex);
+	__ceph_do_pending_vmtruncate(inode, true);
 	iput(inode);
 }
 
@@ -1494,12 +1489,10 @@ void ceph_queue_vmtruncate(struct inode *inode)
 }
 
 /*
- * called with i_mutex held.
- *
  * Make sure any pending truncation is applied before doing anything
  * that may depend on it.
  */
-void __ceph_do_pending_vmtruncate(struct inode *inode)
+void __ceph_do_pending_vmtruncate(struct inode *inode, bool needlock)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	u64 to;
@@ -1532,7 +1525,11 @@ retry:
 	     ci->i_truncate_pending, to);
 	spin_unlock(&ci->i_ceph_lock);
 
+	if (needlock)
+		mutex_lock(&inode->i_mutex);
 	truncate_inode_pages(inode->i_mapping, to);
+	if (needlock)
+		mutex_unlock(&inode->i_mutex);
 
 	spin_lock(&ci->i_ceph_lock);
 	if (to == ci->i_truncate_size) {
@@ -1563,6 +1560,12 @@ static void *ceph_sym_follow_link(struct dentry *dentry, struct nameidata *nd)
 static const struct inode_operations ceph_symlink_iops = {
 	.readlink = generic_readlink,
 	.follow_link = ceph_sym_follow_link,
+	.setattr = ceph_setattr,
+	.getattr = ceph_getattr,
+	.setxattr = ceph_setxattr,
+	.getxattr = ceph_getxattr,
+	.listxattr = ceph_listxattr,
+	.removexattr = ceph_removexattr,
 };
 
 /*
@@ -1585,7 +1588,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
 	if (ceph_snap(inode) != CEPH_NOSNAP)
 		return -EROFS;
 
-	__ceph_do_pending_vmtruncate(inode);
+	__ceph_do_pending_vmtruncate(inode, false);
 
 	err = inode_change_ok(inode, attr);
 	if (err != 0)
@@ -1767,7 +1770,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
 	     ceph_cap_string(dirtied), mask);
 
 	ceph_mdsc_put_request(req);
-	__ceph_do_pending_vmtruncate(inode);
+	__ceph_do_pending_vmtruncate(inode, false);
 	return err;
 out:
 	spin_unlock(&ci->i_ceph_lock);

+ 3 - 2
fs/ceph/ioctl.c

@@ -208,8 +208,9 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
 
 	snprintf(dl.object_name, sizeof(dl.object_name), "%llx.%08llx",
 		 ceph_ino(inode), dl.object_no);
-	ceph_calc_object_layout(&pgid, dl.object_name, &ci->i_layout,
-				osdc->osdmap);
+
+	ceph_calc_ceph_pg(&pgid, dl.object_name, osdc->osdmap,
+		ceph_file_layout_pg_pool(ci->i_layout));
 
 	dl.osd = ceph_calc_pg_primary(osdc->osdmap, pgid);
 	if (dl.osd >= 0) {

+ 55 - 24
fs/ceph/mds_client.c

@@ -265,7 +265,8 @@ static int parse_reply_info_extra(void **p, void *end,
 {
 	if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
 		return parse_reply_info_filelock(p, end, info, features);
-	else if (info->head->op == CEPH_MDS_OP_READDIR)
+	else if (info->head->op == CEPH_MDS_OP_READDIR ||
+		 info->head->op == CEPH_MDS_OP_LSSNAP)
 		return parse_reply_info_dir(p, end, info, features);
 	else if (info->head->op == CEPH_MDS_OP_CREATE)
 		return parse_reply_info_create(p, end, info, features);
@@ -364,9 +365,9 @@ void ceph_put_mds_session(struct ceph_mds_session *s)
 	     atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
 	if (atomic_dec_and_test(&s->s_ref)) {
 		if (s->s_auth.authorizer)
-		     s->s_mdsc->fsc->client->monc.auth->ops->destroy_authorizer(
-			     s->s_mdsc->fsc->client->monc.auth,
-			     s->s_auth.authorizer);
+			ceph_auth_destroy_authorizer(
+				s->s_mdsc->fsc->client->monc.auth,
+				s->s_auth.authorizer);
 		kfree(s);
 	}
 }
@@ -1196,6 +1197,8 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
 	session->s_trim_caps--;
 	if (oissued) {
 		/* we aren't the only cap.. just remove us */
+		__queue_cap_release(session, ceph_ino(inode), cap->cap_id,
+				    cap->mseq, cap->issue_seq);
 		__ceph_remove_cap(cap);
 	} else {
 		/* try to drop referring dentries */
@@ -1718,8 +1721,12 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
 	msg->front.iov_len = p - msg->front.iov_base;
 	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
 
-	msg->pages = req->r_pages;
-	msg->nr_pages = req->r_num_pages;
+	if (req->r_data_len) {
+		/* outbound data set only by ceph_sync_setxattr() */
+		BUG_ON(!req->r_pages);
+		ceph_msg_data_add_pages(msg, req->r_pages, req->r_data_len, 0);
+	}
+
 	msg->hdr.data_len = cpu_to_le32(req->r_data_len);
 	msg->hdr.data_off = cpu_to_le16(0);
 
@@ -1913,6 +1920,7 @@ static void __wake_requests(struct ceph_mds_client *mdsc,
 		req = list_entry(tmp_list.next,
 				 struct ceph_mds_request, r_wait);
 		list_del_init(&req->r_wait);
+		dout(" wake request %p tid %llu\n", req, req->r_tid);
 		__do_request(mdsc, req);
 	}
 }
@@ -2026,20 +2034,16 @@ out:
 }
 
 /*
- * Invalidate dir D_COMPLETE, dentry lease state on an aborted MDS
+ * Invalidate dir's completeness, dentry lease state on an aborted MDS
  * namespace request.
  */
 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
 {
 	struct inode *inode = req->r_locked_dir;
-	struct ceph_inode_info *ci = ceph_inode(inode);
 
-	dout("invalidate_dir_request %p (D_COMPLETE, lease(s))\n", inode);
-	spin_lock(&ci->i_ceph_lock);
-	ceph_dir_clear_complete(inode);
-	ci->i_release_count++;
-	spin_unlock(&ci->i_ceph_lock);
+	dout("invalidate_dir_request %p (complete, lease(s))\n", inode);
 
+	ceph_dir_clear_complete(inode);
 	if (req->r_dentry)
 		ceph_invalidate_dentry_lease(req->r_dentry);
 	if (req->r_old_dentry)
@@ -2599,11 +2603,13 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
 			goto fail;
 	}
 
-	reply->pagelist = pagelist;
 	if (recon_state.flock)
 		reply->hdr.version = cpu_to_le16(2);
-	reply->hdr.data_len = cpu_to_le32(pagelist->length);
-	reply->nr_pages = calc_pages_for(0, pagelist->length);
+	if (pagelist->length) {
+		/* set up outbound data if we have any */
+		reply->hdr.data_len = cpu_to_le32(pagelist->length);
+		ceph_msg_data_add_pagelist(reply, pagelist);
+	}
 	ceph_con_send(&session->s_con, reply);
 
 	mutex_unlock(&session->s_mutex);
@@ -3433,13 +3439,17 @@ static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
 	struct ceph_auth_handshake *auth = &s->s_auth;
 
 	if (force_new && auth->authorizer) {
-		if (ac->ops && ac->ops->destroy_authorizer)
-			ac->ops->destroy_authorizer(ac, auth->authorizer);
+		ceph_auth_destroy_authorizer(ac, auth->authorizer);
 		auth->authorizer = NULL;
 	}
-	if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) {
-		int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
-							auth);
+	if (!auth->authorizer) {
+		int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
+						      auth);
+		if (ret)
+			return ERR_PTR(ret);
+	} else {
+		int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
+						      auth);
 		if (ret)
 			return ERR_PTR(ret);
 	}
@@ -3455,7 +3465,7 @@ static int verify_authorizer_reply(struct ceph_connection *con, int len)
 	struct ceph_mds_client *mdsc = s->s_mdsc;
 	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
 
-	return ac->ops->verify_authorizer_reply(ac, s->s_auth.authorizer, len);
+	return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer, len);
 }
 
 static int invalidate_authorizer(struct ceph_connection *con)
@@ -3464,12 +3474,32 @@ static int invalidate_authorizer(struct ceph_connection *con)
 	struct ceph_mds_client *mdsc = s->s_mdsc;
 	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
 
-	if (ac->ops->invalidate_authorizer)
-		ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
+	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
 
 	return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
 }
 
+static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
+				struct ceph_msg_header *hdr, int *skip)
+{
+	struct ceph_msg *msg;
+	int type = (int) le16_to_cpu(hdr->type);
+	int front_len = (int) le32_to_cpu(hdr->front_len);
+
+	if (con->in_msg)
+		return con->in_msg;
+
+	*skip = 0;
+	msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
+	if (!msg) {
+		pr_err("unable to allocate msg type %d len %d\n",
+		       type, front_len);
+		return NULL;
+	}
+
+	return msg;
+}
+
 static const struct ceph_connection_operations mds_con_ops = {
 	.get = con_get,
 	.put = con_put,
@@ -3478,6 +3508,7 @@ static const struct ceph_connection_operations mds_con_ops = {
 	.verify_authorizer_reply = verify_authorizer_reply,
 	.invalidate_authorizer = invalidate_authorizer,
 	.peer_reset = peer_reset,
+	.alloc_msg = mds_alloc_msg,
 };
 
 /* eof */

+ 5 - 3
fs/ceph/mdsmap.c

@@ -20,7 +20,10 @@ int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m)
 {
 	int n = 0;
 	int i;
-	char r;
+
+	/* special case for one mds */
+	if (1 == m->m_max_mds && m->m_info[0].state > 0)
+		return 0;
 
 	/* count */
 	for (i = 0; i < m->m_max_mds; i++)
@@ -30,8 +33,7 @@ int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m)
 		return -1;
 
 	/* pick */
-	get_random_bytes(&r, 1);
-	n = r % n;
+	n = prandom_u32() % n;
 	i = 0;
 	for (i = 0; n > 0; i++, n--)
 		while (m->m_info[i].state <= 0)

+ 1 - 2
fs/ceph/snap.c

@@ -332,10 +332,9 @@ static int build_snap_context(struct ceph_snap_realm *realm)
 	err = -ENOMEM;
 	if (num > (SIZE_MAX - sizeof(*snapc)) / sizeof(u64))
 		goto fail;
-	snapc = kzalloc(sizeof(*snapc) + num*sizeof(u64), GFP_NOFS);
+	snapc = ceph_create_snap_context(num, GFP_NOFS);
 	if (!snapc)
 		goto fail;
-	atomic_set(&snapc->nref, 1);
 
 	/* build (reverse sorted) snap vector */
 	num = 0;

+ 5 - 2
fs/ceph/super.c

@@ -479,6 +479,8 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
 		CEPH_FEATURE_FLOCK |
 		CEPH_FEATURE_DIRLAYOUTHASH;
 	const unsigned required_features = 0;
+	int page_count;
+	size_t size;
 	int err = -ENOMEM;
 
 	fsc = kzalloc(sizeof(*fsc), GFP_KERNEL);
@@ -522,8 +524,9 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
 
 	/* set up mempools */
 	err = -ENOMEM;
-	fsc->wb_pagevec_pool = mempool_create_kmalloc_pool(10,
-			      fsc->mount_options->wsize >> PAGE_CACHE_SHIFT);
+	page_count = fsc->mount_options->wsize >> PAGE_CACHE_SHIFT;
+	size = sizeof (struct page *) * (page_count ? page_count : 1);
+	fsc->wb_pagevec_pool = mempool_create_kmalloc_pool(10, size);
 	if (!fsc->wb_pagevec_pool)
 		goto fail_trunc_wq;
 

+ 23 - 42
fs/ceph/super.h

@@ -204,7 +204,6 @@ struct ceph_inode_xattr {
  * Ceph dentry state
  */
 struct ceph_dentry_info {
-	unsigned long flags;
 	struct ceph_mds_session *lease_session;
 	u32 lease_gen, lease_shared_gen;
 	u32 lease_seq;
@@ -215,18 +214,6 @@ struct ceph_dentry_info {
 	u64 offset;
 };
 
-/*
- * dentry flags
- *
- * The locking for D_COMPLETE is a bit odd:
- *  - we can clear it at almost any time (see ceph_d_prune)
- *  - it is only meaningful if:
- *    - we hold dir inode i_ceph_lock
- *    - we hold dir FILE_SHARED caps
- *    - the dentry D_COMPLETE is set
- */
-#define CEPH_D_COMPLETE 1  /* if set, d_u.d_subdirs is complete directory */
-
 struct ceph_inode_xattrs_info {
 	/*
 	 * (still encoded) xattr blob. we avoid the overhead of parsing
@@ -257,7 +244,8 @@ struct ceph_inode_info {
 	u32 i_time_warp_seq;
 
 	unsigned i_ceph_flags;
-	unsigned long i_release_count;
+	atomic_t i_release_count;
+	atomic_t i_complete_count;
 
 	struct ceph_dir_layout i_dir_layout;
 	struct ceph_file_layout i_layout;
@@ -267,7 +255,7 @@ struct ceph_inode_info {
 	struct timespec i_rctime;
 	u64 i_rbytes, i_rfiles, i_rsubdirs;
 	u64 i_files, i_subdirs;
-	u64 i_max_offset;  /* largest readdir offset, set with D_COMPLETE */
+	u64 i_max_offset;  /* largest readdir offset, set with complete dir */
 
 	struct rb_root i_fragtree;
 	struct mutex i_fragtree_mutex;
@@ -436,33 +424,31 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
 #define CEPH_I_FLUSH     8  /* do not delay flush of dirty metadata */
 #define CEPH_I_NOFLUSH  16  /* do not flush dirty caps */
 
-static inline void ceph_i_clear(struct inode *inode, unsigned mask)
+static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
+					   int release_count)
 {
-	struct ceph_inode_info *ci = ceph_inode(inode);
-
-	spin_lock(&ci->i_ceph_lock);
-	ci->i_ceph_flags &= ~mask;
-	spin_unlock(&ci->i_ceph_lock);
+	atomic_set(&ci->i_complete_count, release_count);
 }
 
-static inline void ceph_i_set(struct inode *inode, unsigned mask)
+static inline void __ceph_dir_clear_complete(struct ceph_inode_info *ci)
 {
-	struct ceph_inode_info *ci = ceph_inode(inode);
+	atomic_inc(&ci->i_release_count);
+}
 
-	spin_lock(&ci->i_ceph_lock);
-	ci->i_ceph_flags |= mask;
-	spin_unlock(&ci->i_ceph_lock);
+static inline bool __ceph_dir_is_complete(struct ceph_inode_info *ci)
+{
+	return atomic_read(&ci->i_complete_count) ==
+		atomic_read(&ci->i_release_count);
 }
 
-static inline bool ceph_i_test(struct inode *inode, unsigned mask)
+static inline void ceph_dir_clear_complete(struct inode *inode)
 {
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	bool r;
+	__ceph_dir_clear_complete(ceph_inode(inode));
+}
 
-	spin_lock(&ci->i_ceph_lock);
-	r = (ci->i_ceph_flags & mask) == mask;
-	spin_unlock(&ci->i_ceph_lock);
-	return r;
+static inline bool ceph_dir_is_complete(struct inode *inode)
+{
+	return __ceph_dir_is_complete(ceph_inode(inode));
 }
 
 
@@ -488,13 +474,6 @@ static inline loff_t ceph_make_fpos(unsigned frag, unsigned off)
 	return ((loff_t)frag << 32) | (loff_t)off;
 }
 
-/*
- * set/clear directory D_COMPLETE flag
- */
-void ceph_dir_set_complete(struct inode *inode);
-void ceph_dir_clear_complete(struct inode *inode);
-bool ceph_dir_test_complete(struct inode *inode);
-
 /*
  * caps helpers
  */
@@ -584,7 +563,7 @@ struct ceph_file_info {
 	u64 next_offset;       /* offset of next chunk (last_name's + 1) */
 	char *last_name;       /* last entry in previous chunk */
 	struct dentry *dentry; /* next dentry (for dcache readdir) */
-	unsigned long dir_release_count;
+	int dir_release_count;
 
 	/* used for -o dirstat read() on directory thing */
 	char *dir_info;
@@ -713,7 +692,7 @@ extern int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 extern int ceph_inode_holds_cap(struct inode *inode, int mask);
 
 extern int ceph_inode_set_size(struct inode *inode, loff_t size);
-extern void __ceph_do_pending_vmtruncate(struct inode *inode);
+extern void __ceph_do_pending_vmtruncate(struct inode *inode, bool needlock);
 extern void ceph_queue_vmtruncate(struct inode *inode);
 
 extern void ceph_queue_invalidate(struct inode *inode);
@@ -755,6 +734,8 @@ static inline void ceph_remove_cap(struct ceph_cap *cap)
 extern void ceph_put_cap(struct ceph_mds_client *mdsc,
 			 struct ceph_cap *cap);
 
+extern void __queue_cap_release(struct ceph_mds_session *session, u64 ino,
+				u64 cap_id, u32 migrate_seq, u32 issue_seq);
 extern void ceph_queue_caps_release(struct inode *inode);
 extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc);
 extern int ceph_fsync(struct file *file, loff_t start, loff_t end,

+ 18 - 0
include/linux/ceph/auth.h

@@ -52,6 +52,9 @@ struct ceph_auth_client_ops {
 	 */
 	int (*create_authorizer)(struct ceph_auth_client *ac, int peer_type,
 				 struct ceph_auth_handshake *auth);
+	/* ensure that an existing authorizer is up to date */
+	int (*update_authorizer)(struct ceph_auth_client *ac, int peer_type,
+				 struct ceph_auth_handshake *auth);
 	int (*verify_authorizer_reply)(struct ceph_auth_client *ac,
 				       struct ceph_authorizer *a, size_t len);
 	void (*destroy_authorizer)(struct ceph_auth_client *ac,
@@ -75,6 +78,8 @@ struct ceph_auth_client {
 	u64 global_id;          /* our unique id in system */
 	const struct ceph_crypto_key *key;     /* our secret key */
 	unsigned want_keys;     /* which services we want */
+
+	struct mutex mutex;
 };
 
 extern struct ceph_auth_client *ceph_auth_init(const char *name,
@@ -94,5 +99,18 @@ extern int ceph_build_auth(struct ceph_auth_client *ac,
 		    void *msg_buf, size_t msg_len);
 
 extern int ceph_auth_is_authenticated(struct ceph_auth_client *ac);
+extern int ceph_auth_create_authorizer(struct ceph_auth_client *ac,
+				       int peer_type,
+				       struct ceph_auth_handshake *auth);
+extern void ceph_auth_destroy_authorizer(struct ceph_auth_client *ac,
+					 struct ceph_authorizer *a);
+extern int ceph_auth_update_authorizer(struct ceph_auth_client *ac,
+				       int peer_type,
+				       struct ceph_auth_handshake *a);
+extern int ceph_auth_verify_authorizer_reply(struct ceph_auth_client *ac,
+					     struct ceph_authorizer *a,
+					     size_t len);
+extern void ceph_auth_invalidate_authorizer(struct ceph_auth_client *ac,
+					    int peer_type);
 
 #endif

+ 2 - 0
include/linux/ceph/ceph_features.h

@@ -41,6 +41,7 @@
  */
 #define CEPH_FEATURES_SUPPORTED_DEFAULT  \
 	(CEPH_FEATURE_NOSRCADDR |		\
+	 CEPH_FEATURE_RECONNECT_SEQ |		\
 	 CEPH_FEATURE_PGID64 |			\
 	 CEPH_FEATURE_PGPOOL3 |			\
 	 CEPH_FEATURE_OSDENC |			\
@@ -51,6 +52,7 @@
 
 #define CEPH_FEATURES_REQUIRED_DEFAULT   \
 	(CEPH_FEATURE_NOSRCADDR |	 \
+	 CEPH_FEATURE_RECONNECT_SEQ |	 \
 	 CEPH_FEATURE_PGID64 |		 \
 	 CEPH_FEATURE_PGPOOL3 |		 \
 	 CEPH_FEATURE_OSDENC)

+ 26 - 4
include/linux/ceph/decode.h

@@ -8,6 +8,23 @@
 
 #include <linux/ceph/types.h>
 
+/* This seemed to be the easiest place to define these */
+
+#define	U8_MAX	((u8)(~0U))
+#define	U16_MAX	((u16)(~0U))
+#define	U32_MAX	((u32)(~0U))
+#define	U64_MAX	((u64)(~0ULL))
+
+#define	S8_MAX	((s8)(U8_MAX >> 1))
+#define	S16_MAX	((s16)(U16_MAX >> 1))
+#define	S32_MAX	((s32)(U32_MAX >> 1))
+#define	S64_MAX	((s64)(U64_MAX >> 1LL))
+
+#define	S8_MIN	((s8)(-S8_MAX - 1))
+#define	S16_MIN	((s16)(-S16_MAX - 1))
+#define	S32_MIN	((s32)(-S32_MAX - 1))
+#define	S64_MIN	((s64)(-S64_MAX - 1LL))
+
 /*
  * in all cases,
  *   void **p     pointer to position pointer
@@ -137,14 +154,19 @@ bad:
 static inline void ceph_decode_timespec(struct timespec *ts,
 					const struct ceph_timespec *tv)
 {
-	ts->tv_sec = le32_to_cpu(tv->tv_sec);
-	ts->tv_nsec = le32_to_cpu(tv->tv_nsec);
+	ts->tv_sec = (__kernel_time_t)le32_to_cpu(tv->tv_sec);
+	ts->tv_nsec = (long)le32_to_cpu(tv->tv_nsec);
 }
 static inline void ceph_encode_timespec(struct ceph_timespec *tv,
 					const struct timespec *ts)
 {
-	tv->tv_sec = cpu_to_le32(ts->tv_sec);
-	tv->tv_nsec = cpu_to_le32(ts->tv_nsec);
+	BUG_ON(ts->tv_sec < 0);
+	BUG_ON(ts->tv_sec > (__kernel_time_t)U32_MAX);
+	BUG_ON(ts->tv_nsec < 0);
+	BUG_ON(ts->tv_nsec > (long)U32_MAX);
+
+	tv->tv_sec = cpu_to_le32((u32)ts->tv_sec);
+	tv->tv_nsec = cpu_to_le32((u32)ts->tv_nsec);
 }
 
 /*

+ 6 - 25
include/linux/ceph/libceph.h

@@ -66,6 +66,7 @@ struct ceph_options {
 #define CEPH_OSD_IDLE_TTL_DEFAULT    60
 
 #define CEPH_MSG_MAX_FRONT_LEN	(16*1024*1024)
+#define CEPH_MSG_MAX_MIDDLE_LEN	(16*1024*1024)
 #define CEPH_MSG_MAX_DATA_LEN	(16*1024*1024)
 
 #define CEPH_AUTH_NAME_DEFAULT   "guest"
@@ -156,31 +157,11 @@ struct ceph_snap_context {
 	u64 snaps[];
 };
 
-static inline struct ceph_snap_context *
-ceph_get_snap_context(struct ceph_snap_context *sc)
-{
-	/*
-	printk("get_snap_context %p %d -> %d\n", sc, atomic_read(&sc->nref),
-	       atomic_read(&sc->nref)+1);
-	*/
-	if (sc)
-		atomic_inc(&sc->nref);
-	return sc;
-}
-
-static inline void ceph_put_snap_context(struct ceph_snap_context *sc)
-{
-	if (!sc)
-		return;
-	/*
-	printk("put_snap_context %p %d -> %d\n", sc, atomic_read(&sc->nref),
-	       atomic_read(&sc->nref)-1);
-	*/
-	if (atomic_dec_and_test(&sc->nref)) {
-		/*printk(" deleting snap_context %p\n", sc);*/
-		kfree(sc);
-	}
-}
+extern struct ceph_snap_context *ceph_create_snap_context(u32 snap_count,
+					gfp_t gfp_flags);
+extern struct ceph_snap_context *ceph_get_snap_context(
+					struct ceph_snap_context *sc);
+extern void ceph_put_snap_context(struct ceph_snap_context *sc);
 
 /*
  * calculate the number of pages a given length and offset map onto,

+ 85 - 19
include/linux/ceph/messenger.h

@@ -64,6 +64,77 @@ struct ceph_messenger {
 	u32 required_features;
 };
 
+enum ceph_msg_data_type {
+	CEPH_MSG_DATA_NONE,	/* message contains no data payload */
+	CEPH_MSG_DATA_PAGES,	/* data source/destination is a page array */
+	CEPH_MSG_DATA_PAGELIST,	/* data source/destination is a pagelist */
+#ifdef CONFIG_BLOCK
+	CEPH_MSG_DATA_BIO,	/* data source/destination is a bio list */
+#endif /* CONFIG_BLOCK */
+};
+
+static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
+{
+	switch (type) {
+	case CEPH_MSG_DATA_NONE:
+	case CEPH_MSG_DATA_PAGES:
+	case CEPH_MSG_DATA_PAGELIST:
+#ifdef CONFIG_BLOCK
+	case CEPH_MSG_DATA_BIO:
+#endif /* CONFIG_BLOCK */
+		return true;
+	default:
+		return false;
+	}
+}
+
+struct ceph_msg_data {
+	struct list_head		links;	/* ceph_msg->data */
+	enum ceph_msg_data_type		type;
+	union {
+#ifdef CONFIG_BLOCK
+		struct {
+			struct bio	*bio;
+			size_t		bio_length;
+		};
+#endif /* CONFIG_BLOCK */
+		struct {
+			struct page	**pages;	/* NOT OWNER. */
+			size_t		length;		/* total # bytes */
+			unsigned int	alignment;	/* first page */
+		};
+		struct ceph_pagelist	*pagelist;
+	};
+};
+
+struct ceph_msg_data_cursor {
+	size_t			total_resid;	/* across all data items */
+	struct list_head	*data_head;	/* = &ceph_msg->data */
+
+	struct ceph_msg_data	*data;		/* current data item */
+	size_t			resid;		/* bytes not yet consumed */
+	bool			last_piece;	/* current is last piece */
+	bool			need_crc;	/* crc update needed */
+	union {
+#ifdef CONFIG_BLOCK
+		struct {				/* bio */
+			struct bio	*bio;		/* bio from list */
+			unsigned int	vector_index;	/* vector from bio */
+			unsigned int	vector_offset;	/* bytes from vector */
+		};
+#endif /* CONFIG_BLOCK */
+		struct {				/* pages */
+			unsigned int	page_offset;	/* offset in page */
+			unsigned short	page_index;	/* index in array */
+			unsigned short	page_count;	/* pages in array */
+		};
+		struct {				/* pagelist */
+			struct page	*page;		/* page from list */
+			size_t		offset;		/* bytes from list */
+		};
+	};
+};
+
 /*
  * a single message.  it contains a header (src, dest, message type, etc.),
  * footer (crc values, mainly), a "front" message body, and possibly a
@@ -74,21 +145,15 @@ struct ceph_msg {
 	struct ceph_msg_footer footer;	/* footer */
 	struct kvec front;              /* unaligned blobs of message */
 	struct ceph_buffer *middle;
-	struct page **pages;            /* data payload.  NOT OWNER. */
-	unsigned nr_pages;              /* size of page array */
-	unsigned page_alignment;        /* io offset in first page */
-	struct ceph_pagelist *pagelist; /* instead of pages */
+
+	size_t				data_length;
+	struct list_head		data;
+	struct ceph_msg_data_cursor	cursor;
 
 	struct ceph_connection *con;
-	struct list_head list_head;
+	struct list_head list_head;	/* links for connection lists */
 
 	struct kref kref;
-#ifdef CONFIG_BLOCK
-	struct bio  *bio;		/* instead of pages/pagelist */
-	struct bio  *bio_iter;		/* bio iterator */
-	int bio_seg;			/* current bio segment */
-#endif /* CONFIG_BLOCK */
-	struct ceph_pagelist *trail;	/* the trailing part of the data */
 	bool front_is_vmalloc;
 	bool more_to_follow;
 	bool needs_out_seq;
@@ -98,12 +163,6 @@ struct ceph_msg {
 	struct ceph_msgpool *pool;
 };
 
-struct ceph_msg_pos {
-	int page, page_pos;  /* which page; offset in page */
-	int data_pos;        /* offset in data payload */
-	bool did_page_crc;   /* true if we've calculated crc for current page */
-};
-
 /* ceph connection fault delay defaults, for exponential backoff */
 #define BASE_DELAY_INTERVAL	(HZ/2)
 #define MAX_DELAY_INTERVAL	(5 * 60 * HZ)
@@ -161,7 +220,6 @@ struct ceph_connection {
 	struct ceph_msg *out_msg;        /* sending message (== tail of
 					    out_sent) */
 	bool out_msg_done;
-	struct ceph_msg_pos out_msg_pos;
 
 	struct kvec out_kvec[8],         /* sending header/footer data */
 		*out_kvec_cur;
@@ -175,7 +233,6 @@ struct ceph_connection {
 	/* message in temps */
 	struct ceph_msg_header in_hdr;
 	struct ceph_msg *in_msg;
-	struct ceph_msg_pos in_msg_pos;
 	u32 in_front_crc, in_middle_crc, in_data_crc;  /* calculated crc */
 
 	char in_tag;         /* protocol control byte */
@@ -218,6 +275,15 @@ extern void ceph_msg_revoke_incoming(struct ceph_msg *msg);
 
 extern void ceph_con_keepalive(struct ceph_connection *con);
 
+extern void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
+				size_t length, size_t alignment);
+extern void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
+				struct ceph_pagelist *pagelist);
+#ifdef CONFIG_BLOCK
+extern void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
+				size_t length);
+#endif /* CONFIG_BLOCK */
+
 extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 				     bool can_fail);
 extern void ceph_msg_kfree(struct ceph_msg *m);

+ 1 - 0
include/linux/ceph/msgr.h

@@ -87,6 +87,7 @@ struct ceph_entity_inst {
 #define CEPH_MSGR_TAG_BADPROTOVER  10  /* bad protocol version */
 #define CEPH_MSGR_TAG_BADAUTHORIZER 11 /* bad authorizer */
 #define CEPH_MSGR_TAG_FEATURES      12 /* insufficient features */
+#define CEPH_MSGR_TAG_SEQ           13 /* 64-bit int follows with seen seq number */
 
 
 /*

+ 135 - 69
include/linux/ceph/osd_client.h

@@ -29,6 +29,7 @@ struct ceph_authorizer;
  */
 typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *,
 				     struct ceph_msg *);
+typedef void (*ceph_osdc_unsafe_callback_t)(struct ceph_osd_request *, bool);
 
 /* a given osd we're communicating with */
 struct ceph_osd {
@@ -48,7 +49,67 @@ struct ceph_osd {
 };
 
 
-#define CEPH_OSD_MAX_OP 10
+#define CEPH_OSD_MAX_OP	2
+
+enum ceph_osd_data_type {
+	CEPH_OSD_DATA_TYPE_NONE = 0,
+	CEPH_OSD_DATA_TYPE_PAGES,
+	CEPH_OSD_DATA_TYPE_PAGELIST,
+#ifdef CONFIG_BLOCK
+	CEPH_OSD_DATA_TYPE_BIO,
+#endif /* CONFIG_BLOCK */
+};
+
+struct ceph_osd_data {
+	enum ceph_osd_data_type	type;
+	union {
+		struct {
+			struct page	**pages;
+			u64		length;
+			u32		alignment;
+			bool		pages_from_pool;
+			bool		own_pages;
+		};
+		struct ceph_pagelist	*pagelist;
+#ifdef CONFIG_BLOCK
+		struct {
+			struct bio	*bio;		/* list of bios */
+			size_t		bio_length;	/* total in list */
+		};
+#endif /* CONFIG_BLOCK */
+	};
+};
+
+struct ceph_osd_req_op {
+	u16 op;           /* CEPH_OSD_OP_* */
+	u32 payload_len;
+	union {
+		struct ceph_osd_data raw_data_in;
+		struct {
+			u64 offset, length;
+			u64 truncate_size;
+			u32 truncate_seq;
+			struct ceph_osd_data osd_data;
+		} extent;
+		struct {
+			const char *class_name;
+			const char *method_name;
+			struct ceph_osd_data request_info;
+			struct ceph_osd_data request_data;
+			struct ceph_osd_data response_data;
+			__u8 class_len;
+			__u8 method_len;
+			__u8 argc;
+		} cls;
+		struct {
+			u64 cookie;
+			u64 ver;
+			u32 prot_ver;
+			u32 timeout;
+			__u8 flag;
+		} watch;
+	};
+};
 
 /* an in-flight request */
 struct ceph_osd_request {
@@ -63,15 +124,14 @@ struct ceph_osd_request {
 	int              r_pg_osds[CEPH_PG_MAX_SIZE];
 	int              r_num_pg_osds;
 
-	struct ceph_connection *r_con_filling_msg;
-
 	struct ceph_msg  *r_request, *r_reply;
 	int               r_flags;     /* any additional flags for the osd */
 	u32               r_sent;      /* >0 if r_request is sending/sent */
-	int               r_num_ops;
 
-	/* encoded message content */
-	struct ceph_osd_op *r_request_ops;
+	/* request osd ops array  */
+	unsigned int		r_num_ops;
+	struct ceph_osd_req_op	r_ops[CEPH_OSD_MAX_OP];
+
 	/* these are updated on each send */
 	__le32           *r_request_osdmap_epoch;
 	__le32           *r_request_flags;
@@ -85,12 +145,14 @@ struct ceph_osd_request {
 	s32               r_reply_op_result[CEPH_OSD_MAX_OP];
 	int               r_got_reply;
 	int		  r_linger;
+	int		  r_completed;
 
 	struct ceph_osd_client *r_osdc;
 	struct kref       r_kref;
 	bool              r_mempool;
 	struct completion r_completion, r_safe_completion;
-	ceph_osdc_callback_t r_callback, r_safe_callback;
+	ceph_osdc_callback_t r_callback;
+	ceph_osdc_unsafe_callback_t r_unsafe_callback;
 	struct ceph_eversion r_reassert_version;
 	struct list_head  r_unsafe_item;
 
@@ -104,16 +166,6 @@ struct ceph_osd_request {
 
 	struct ceph_file_layout r_file_layout;
 	struct ceph_snap_context *r_snapc;    /* snap context for writes */
-	unsigned          r_num_pages;        /* size of page array (follows) */
-	unsigned          r_page_alignment;   /* io offset in first page */
-	struct page     **r_pages;            /* pages for data payload */
-	int               r_pages_from_pool;
-	int               r_own_pages;        /* if true, i own page list */
-#ifdef CONFIG_BLOCK
-	struct bio       *r_bio;	      /* instead of pages */
-#endif
-
-	struct ceph_pagelist r_trail;	      /* trailing part of the data */
 };
 
 struct ceph_osd_event {
@@ -172,48 +224,8 @@ struct ceph_osd_client {
 	struct workqueue_struct	*notify_wq;
 };
 
-struct ceph_osd_req_op {
-	u16 op;           /* CEPH_OSD_OP_* */
-	u32 payload_len;
-	union {
-		struct {
-			u64 offset, length;
-			u64 truncate_size;
-			u32 truncate_seq;
-		} extent;
-		struct {
-			const char *name;
-			const char  *val;
-			u32 name_len;
-			u32 value_len;
-			__u8 cmp_op;       /* CEPH_OSD_CMPXATTR_OP_* */
-			__u8 cmp_mode;     /* CEPH_OSD_CMPXATTR_MODE_* */
-		} xattr;
-		struct {
-			const char *class_name;
-			const char *method_name;
-			const char *indata;
-			u32 indata_len;
-			__u8 class_len;
-			__u8 method_len;
-			__u8 argc;
-		} cls;
-		struct {
-			u64 cookie;
-			u64 count;
-		} pgls;
-	        struct {
-		        u64 snapid;
-	        } snap;
-		struct {
-			u64 cookie;
-			u64 ver;
-			u32 prot_ver;
-			u32 timeout;
-			__u8 flag;
-		} watch;
-	};
-};
+extern int ceph_osdc_setup(void);
+extern void ceph_osdc_cleanup(void);
 
 extern int ceph_osdc_init(struct ceph_osd_client *osdc,
 			  struct ceph_client *client);
@@ -224,16 +236,71 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc,
 extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
 				 struct ceph_msg *msg);
 
+extern void osd_req_op_init(struct ceph_osd_request *osd_req,
+					unsigned int which, u16 opcode);
+
+extern void osd_req_op_raw_data_in_pages(struct ceph_osd_request *,
+					unsigned int which,
+					struct page **pages, u64 length,
+					u32 alignment, bool pages_from_pool,
+					bool own_pages);
+
+extern void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
+					unsigned int which, u16 opcode,
+					u64 offset, u64 length,
+					u64 truncate_size, u32 truncate_seq);
+extern void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
+					unsigned int which, u64 length);
+
+extern struct ceph_osd_data *osd_req_op_extent_osd_data(
+					struct ceph_osd_request *osd_req,
+					unsigned int which);
+extern struct ceph_osd_data *osd_req_op_cls_response_data(
+					struct ceph_osd_request *osd_req,
+					unsigned int which);
+
+extern void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *,
+					unsigned int which,
+					struct page **pages, u64 length,
+					u32 alignment, bool pages_from_pool,
+					bool own_pages);
+extern void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *,
+					unsigned int which,
+					struct ceph_pagelist *pagelist);
+#ifdef CONFIG_BLOCK
+extern void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *,
+					unsigned int which,
+					struct bio *bio, size_t bio_length);
+#endif /* CONFIG_BLOCK */
+
+extern void osd_req_op_cls_request_data_pagelist(struct ceph_osd_request *,
+					unsigned int which,
+					struct ceph_pagelist *pagelist);
+extern void osd_req_op_cls_request_data_pages(struct ceph_osd_request *,
+					unsigned int which,
+					struct page **pages, u64 length,
+					u32 alignment, bool pages_from_pool,
+					bool own_pages);
+extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
+					unsigned int which,
+					struct page **pages, u64 length,
+					u32 alignment, bool pages_from_pool,
+					bool own_pages);
+
+extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req,
+					unsigned int which, u16 opcode,
+					const char *class, const char *method);
+extern void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
+					unsigned int which, u16 opcode,
+					u64 cookie, u64 version, int flag);
+
 extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 					       struct ceph_snap_context *snapc,
-					       unsigned int num_op,
+					       unsigned int num_ops,
 					       bool use_mempool,
 					       gfp_t gfp_flags);
 
-extern void ceph_osdc_build_request(struct ceph_osd_request *req,
-				    u64 off, u64 len,
-				    unsigned int num_op,
-				    struct ceph_osd_req_op *src_ops,
+extern void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
 				    struct ceph_snap_context *snapc,
 				    u64 snap_id,
 				    struct timespec *mtime);
@@ -241,12 +308,11 @@ extern void ceph_osdc_build_request(struct ceph_osd_request *req,
 extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
 				      struct ceph_file_layout *layout,
 				      struct ceph_vino vino,
-				      u64 offset, u64 *len, int op, int flags,
+				      u64 offset, u64 *len,
+				      int num_ops, int opcode, int flags,
 				      struct ceph_snap_context *snapc,
-				      int do_sync, u32 truncate_seq,
-				      u64 truncate_size,
-				      struct timespec *mtime,
-				      bool use_mempool, int page_align);
+				      u32 truncate_seq, u64 truncate_size,
+				      bool use_mempool);
 
 extern void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
 					 struct ceph_osd_request *req);

+ 26 - 4
include/linux/ceph/osdmap.h

@@ -3,6 +3,7 @@
 
 #include <linux/rbtree.h>
 #include <linux/ceph/types.h>
+#include <linux/ceph/decode.h>
 #include <linux/ceph/ceph_fs.h>
 #include <linux/crush/crush.h>
 
@@ -119,6 +120,29 @@ static inline struct ceph_entity_addr *ceph_osd_addr(struct ceph_osdmap *map,
 	return &map->osd_addr[osd];
 }
 
+static inline int ceph_decode_pgid(void **p, void *end, struct ceph_pg *pgid)
+{
+	__u8 version;
+
+	if (!ceph_has_room(p, end, 1 + 8 + 4 + 4)) {
+		pr_warning("incomplete pg encoding");
+
+		return -EINVAL;
+	}
+	version = ceph_decode_8(p);
+	if (version > 1) {
+		pr_warning("do not understand pg encoding %d > 1",
+			(int)version);
+		return -EINVAL;
+	}
+
+	pgid->pool = ceph_decode_64(p);
+	pgid->seed = ceph_decode_32(p);
+	*p += 4;	/* skip deprecated preferred value */
+
+	return 0;
+}
+
 extern struct ceph_osdmap *osdmap_decode(void **p, void *end);
 extern struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
 					    struct ceph_osdmap *map,
@@ -131,10 +155,8 @@ extern int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
 					 u64 *bno, u64 *oxoff, u64 *oxlen);
 
 /* calculate mapping of object to a placement group */
-extern int ceph_calc_object_layout(struct ceph_pg *pg,
-				   const char *oid,
-				   struct ceph_file_layout *fl,
-				   struct ceph_osdmap *osdmap);
+extern int ceph_calc_ceph_pg(struct ceph_pg *pg, const char *oid,
+			  struct ceph_osdmap *osdmap, uint64_t pool);
 extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap,
 			       struct ceph_pg pgid,
 			       int *acting);

+ 1 - 1
net/ceph/Makefile

@@ -11,5 +11,5 @@ libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
 	crypto.o armor.o \
 	auth_x.o \
 	ceph_fs.o ceph_strings.o ceph_hash.o \
-	pagevec.o
+	pagevec.o snapshot.o
 

+ 99 - 18
net/ceph/auth.c

@@ -47,6 +47,7 @@ struct ceph_auth_client *ceph_auth_init(const char *name, const struct ceph_cryp
 	if (!ac)
 		goto out;
 
+	mutex_init(&ac->mutex);
 	ac->negotiating = true;
 	if (name)
 		ac->name = name;
@@ -73,10 +74,12 @@ void ceph_auth_destroy(struct ceph_auth_client *ac)
  */
 void ceph_auth_reset(struct ceph_auth_client *ac)
 {
+	mutex_lock(&ac->mutex);
 	dout("auth_reset %p\n", ac);
 	if (ac->ops && !ac->negotiating)
 		ac->ops->reset(ac);
 	ac->negotiating = true;
+	mutex_unlock(&ac->mutex);
 }
 
 int ceph_entity_name_encode(const char *name, void **p, void *end)
@@ -102,6 +105,7 @@ int ceph_auth_build_hello(struct ceph_auth_client *ac, void *buf, size_t len)
 	int i, num;
 	int ret;
 
+	mutex_lock(&ac->mutex);
 	dout("auth_build_hello\n");
 	monhdr->have_version = 0;
 	monhdr->session_mon = cpu_to_le16(-1);
@@ -122,15 +126,19 @@ int ceph_auth_build_hello(struct ceph_auth_client *ac, void *buf, size_t len)
 
 	ret = ceph_entity_name_encode(ac->name, &p, end);
 	if (ret < 0)
-		return ret;
+		goto out;
 	ceph_decode_need(&p, end, sizeof(u64), bad);
 	ceph_encode_64(&p, ac->global_id);
 
 	ceph_encode_32(&lenp, p - lenp - sizeof(u32));
-	return p - buf;
+	ret = p - buf;
+out:
+	mutex_unlock(&ac->mutex);
+	return ret;
 
 bad:
-	return -ERANGE;
+	ret = -ERANGE;
+	goto out;
 }
 
 static int ceph_build_auth_request(struct ceph_auth_client *ac,
@@ -151,11 +159,13 @@ static int ceph_build_auth_request(struct ceph_auth_client *ac,
 	if (ret < 0) {
 		pr_err("error %d building auth method %s request\n", ret,
 		       ac->ops->name);
-		return ret;
+		goto out;
 	}
 	dout(" built request %d bytes\n", ret);
 	ceph_encode_32(&p, ret);
-	return p + ret - msg_buf;
+	ret = p + ret - msg_buf;
+out:
+	return ret;
 }
 
 /*
@@ -176,6 +186,7 @@ int ceph_handle_auth_reply(struct ceph_auth_client *ac,
 	int result_msg_len;
 	int ret = -EINVAL;
 
+	mutex_lock(&ac->mutex);
 	dout("handle_auth_reply %p %p\n", p, end);
 	ceph_decode_need(&p, end, sizeof(u32) * 3 + sizeof(u64), bad);
 	protocol = ceph_decode_32(&p);
@@ -227,33 +238,103 @@ int ceph_handle_auth_reply(struct ceph_auth_client *ac,
 
 	ret = ac->ops->handle_reply(ac, result, payload, payload_end);
 	if (ret == -EAGAIN) {
-		return ceph_build_auth_request(ac, reply_buf, reply_len);
+		ret = ceph_build_auth_request(ac, reply_buf, reply_len);
 	} else if (ret) {
 		pr_err("auth method '%s' error %d\n", ac->ops->name, ret);
-		return ret;
 	}
-	return 0;
 
-bad:
-	pr_err("failed to decode auth msg\n");
 out:
+	mutex_unlock(&ac->mutex);
 	return ret;
+
+bad:
+	pr_err("failed to decode auth msg\n");
+	ret = -EINVAL;
+	goto out;
 }
 
 int ceph_build_auth(struct ceph_auth_client *ac,
 		    void *msg_buf, size_t msg_len)
 {
+	int ret = 0;
+
+	mutex_lock(&ac->mutex);
 	if (!ac->protocol)
-		return ceph_auth_build_hello(ac, msg_buf, msg_len);
-	BUG_ON(!ac->ops);
-	if (ac->ops->should_authenticate(ac))
-		return ceph_build_auth_request(ac, msg_buf, msg_len);
-	return 0;
+		ret = ceph_auth_build_hello(ac, msg_buf, msg_len);
+	else if (ac->ops->should_authenticate(ac))
+		ret = ceph_build_auth_request(ac, msg_buf, msg_len);
+	mutex_unlock(&ac->mutex);
+	return ret;
 }
 
 int ceph_auth_is_authenticated(struct ceph_auth_client *ac)
 {
-	if (!ac->ops)
-		return 0;
-	return ac->ops->is_authenticated(ac);
+	int ret = 0;
+
+	mutex_lock(&ac->mutex);
+	if (ac->ops)
+		ret = ac->ops->is_authenticated(ac);
+	mutex_unlock(&ac->mutex);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_auth_is_authenticated);
+
+int ceph_auth_create_authorizer(struct ceph_auth_client *ac,
+				int peer_type,
+				struct ceph_auth_handshake *auth)
+{
+	int ret = 0;
+
+	mutex_lock(&ac->mutex);
+	if (ac->ops && ac->ops->create_authorizer)
+		ret = ac->ops->create_authorizer(ac, peer_type, auth);
+	mutex_unlock(&ac->mutex);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_auth_create_authorizer);
+
+void ceph_auth_destroy_authorizer(struct ceph_auth_client *ac,
+				  struct ceph_authorizer *a)
+{
+	mutex_lock(&ac->mutex);
+	if (ac->ops && ac->ops->destroy_authorizer)
+		ac->ops->destroy_authorizer(ac, a);
+	mutex_unlock(&ac->mutex);
+}
+EXPORT_SYMBOL(ceph_auth_destroy_authorizer);
+
+int ceph_auth_update_authorizer(struct ceph_auth_client *ac,
+				int peer_type,
+				struct ceph_auth_handshake *a)
+{
+	int ret = 0;
+
+	mutex_lock(&ac->mutex);
+	if (ac->ops && ac->ops->update_authorizer)
+		ret = ac->ops->update_authorizer(ac, peer_type, a);
+	mutex_unlock(&ac->mutex);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_auth_update_authorizer);
+
+int ceph_auth_verify_authorizer_reply(struct ceph_auth_client *ac,
+				      struct ceph_authorizer *a, size_t len)
+{
+	int ret = 0;
+
+	mutex_lock(&ac->mutex);
+	if (ac->ops && ac->ops->verify_authorizer_reply)
+		ret = ac->ops->verify_authorizer_reply(ac, a, len);
+	mutex_unlock(&ac->mutex);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_auth_verify_authorizer_reply);
+
+void ceph_auth_invalidate_authorizer(struct ceph_auth_client *ac, int peer_type)
+{
+	mutex_lock(&ac->mutex);
+	if (ac->ops && ac->ops->invalidate_authorizer)
+		ac->ops->invalidate_authorizer(ac, peer_type);
+	mutex_unlock(&ac->mutex);
 }
+EXPORT_SYMBOL(ceph_auth_invalidate_authorizer);

+ 23 - 1
net/ceph/auth_x.c

@@ -298,6 +298,7 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
 			return -ENOMEM;
 	}
 	au->service = th->service;
+	au->secret_id = th->secret_id;
 
 	msg_a = au->buf->vec.iov_base;
 	msg_a->struct_v = 1;
@@ -555,6 +556,26 @@ static int ceph_x_create_authorizer(
 	return 0;
 }
 
+static int ceph_x_update_authorizer(
+	struct ceph_auth_client *ac, int peer_type,
+	struct ceph_auth_handshake *auth)
+{
+	struct ceph_x_authorizer *au;
+	struct ceph_x_ticket_handler *th;
+
+	th = get_ticket_handler(ac, peer_type);
+	if (IS_ERR(th))
+		return PTR_ERR(th);
+
+	au = (struct ceph_x_authorizer *)auth->authorizer;
+	if (au->secret_id < th->secret_id) {
+		dout("ceph_x_update_authorizer service %u secret %llu < %llu\n",
+		     au->service, au->secret_id, th->secret_id);
+		return ceph_x_build_authorizer(ac, th, au);
+	}
+	return 0;
+}
+
 static int ceph_x_verify_authorizer_reply(struct ceph_auth_client *ac,
 					  struct ceph_authorizer *a, size_t len)
 {
@@ -630,7 +651,7 @@ static void ceph_x_invalidate_authorizer(struct ceph_auth_client *ac,
 
 	th = get_ticket_handler(ac, peer_type);
 	if (!IS_ERR(th))
-		remove_ticket_handler(ac, th);
+		memset(&th->validity, 0, sizeof(th->validity));
 }
 
 
@@ -641,6 +662,7 @@ static const struct ceph_auth_client_ops ceph_x_ops = {
 	.build_request = ceph_x_build_request,
 	.handle_reply = ceph_x_handle_reply,
 	.create_authorizer = ceph_x_create_authorizer,
+	.update_authorizer = ceph_x_update_authorizer,
 	.verify_authorizer_reply = ceph_x_verify_authorizer_reply,
 	.destroy_authorizer = ceph_x_destroy_authorizer,
 	.invalidate_authorizer = ceph_x_invalidate_authorizer,

+ 1 - 0
net/ceph/auth_x.h

@@ -29,6 +29,7 @@ struct ceph_x_authorizer {
 	struct ceph_buffer *buf;
 	unsigned int service;
 	u64 nonce;
+	u64 secret_id;
 	char reply_buf[128];  /* big enough for encrypted blob */
 };
 

+ 7 - 0
net/ceph/ceph_common.c

@@ -606,11 +606,17 @@ static int __init init_ceph_lib(void)
 	if (ret < 0)
 		goto out_crypto;
 
+	ret = ceph_osdc_setup();
+	if (ret < 0)
+		goto out_msgr;
+
 	pr_info("loaded (mon/osd proto %d/%d)\n",
 		CEPH_MONC_PROTOCOL, CEPH_OSDC_PROTOCOL);
 
 	return 0;
 
+out_msgr:
+	ceph_msgr_exit();
 out_crypto:
 	ceph_crypto_shutdown();
 out_debugfs:
@@ -622,6 +628,7 @@ out:
 static void __exit exit_ceph_lib(void)
 {
 	dout("exit_ceph_lib\n");
+	ceph_osdc_cleanup();
 	ceph_msgr_exit();
 	ceph_crypto_shutdown();
 	ceph_debugfs_cleanup();

+ 2 - 2
net/ceph/debugfs.c

@@ -123,8 +123,8 @@ static int osdc_show(struct seq_file *s, void *pp)
 	mutex_lock(&osdc->request_mutex);
 	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
 		struct ceph_osd_request *req;
+		unsigned int i;
 		int opcode;
-		int i;
 
 		req = rb_entry(p, struct ceph_osd_request, r_node);
 
@@ -142,7 +142,7 @@ static int osdc_show(struct seq_file *s, void *pp)
 			seq_printf(s, "\t");
 
 		for (i = 0; i < req->r_num_ops; i++) {
-			opcode = le16_to_cpu(req->r_request_ops[i].op);
+			opcode = req->r_ops[i].op;
 			seq_printf(s, "\t%s", ceph_osd_op_name(opcode));
 		}
 

File diff suppressed because it is too large
+ 570 - 212
net/ceph/messenger.c


+ 3 - 4
net/ceph/mon_client.c

@@ -737,7 +737,7 @@ static void delayed_work(struct work_struct *work)
 
 		__validate_auth(monc);
 
-		if (monc->auth->ops->is_authenticated(monc->auth))
+		if (ceph_auth_is_authenticated(monc->auth))
 			__send_subscribe(monc);
 	}
 	__schedule_delayed(monc);
@@ -892,8 +892,7 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
 
 	mutex_lock(&monc->mutex);
 	had_debugfs_info = have_debugfs_info(monc);
-	if (monc->auth->ops)
-		was_auth = monc->auth->ops->is_authenticated(monc->auth);
+	was_auth = ceph_auth_is_authenticated(monc->auth);
 	monc->pending_auth = 0;
 	ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
 				     msg->front.iov_len,
@@ -904,7 +903,7 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
 		wake_up_all(&monc->client->auth_wq);
 	} else if (ret > 0) {
 		__send_prepared_auth_request(monc, ret);
-	} else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) {
+	} else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
 		dout("authenticated, starting session\n");
 
 		monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;

File diff suppressed because it is too large
+ 531 - 253
net/ceph/osd_client.c


+ 11 - 34
net/ceph/osdmap.c

@@ -654,24 +654,6 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
 	return 0;
 }
 
-static int __decode_pgid(void **p, void *end, struct ceph_pg *pg)
-{
-	u8 v;
-
-	ceph_decode_need(p, end, 1+8+4+4, bad);
-	v = ceph_decode_8(p);
-	if (v != 1)
-		goto bad;
-	pg->pool = ceph_decode_64(p);
-	pg->seed = ceph_decode_32(p);
-	*p += 4; /* skip preferred */
-	return 0;
-
-bad:
-	dout("error decoding pgid\n");
-	return -EINVAL;
-}
-
 /*
  * decode a full map.
  */
@@ -765,7 +747,7 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
 		struct ceph_pg pgid;
 		struct ceph_pg_mapping *pg;
 
-		err = __decode_pgid(p, end, &pgid);
+		err = ceph_decode_pgid(p, end, &pgid);
 		if (err)
 			goto bad;
 		ceph_decode_need(p, end, sizeof(u32), bad);
@@ -983,7 +965,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
 		struct ceph_pg pgid;
 		u32 pglen;
 
-		err = __decode_pgid(p, end, &pgid);
+		err = ceph_decode_pgid(p, end, &pgid);
 		if (err)
 			goto bad;
 		ceph_decode_need(p, end, sizeof(u32), bad);
@@ -1111,27 +1093,22 @@ EXPORT_SYMBOL(ceph_calc_file_object_mapping);
  * calculate an object layout (i.e. pgid) from an oid,
  * file_layout, and osdmap
  */
-int ceph_calc_object_layout(struct ceph_pg *pg,
-			    const char *oid,
-			    struct ceph_file_layout *fl,
-			    struct ceph_osdmap *osdmap)
+int ceph_calc_ceph_pg(struct ceph_pg *pg, const char *oid,
+			struct ceph_osdmap *osdmap, uint64_t pool)
 {
-	unsigned int num, num_mask;
-	struct ceph_pg_pool_info *pool;
+	struct ceph_pg_pool_info *pool_info;
 
 	BUG_ON(!osdmap);
-	pg->pool = le32_to_cpu(fl->fl_pg_pool);
-	pool = __lookup_pg_pool(&osdmap->pg_pools, pg->pool);
-	if (!pool)
+	pool_info = __lookup_pg_pool(&osdmap->pg_pools, pool);
+	if (!pool_info)
 		return -EIO;
-	pg->seed = ceph_str_hash(pool->object_hash, oid, strlen(oid));
-	num = pool->pg_num;
-	num_mask = pool->pg_num_mask;
+	pg->pool = pool;
+	pg->seed = ceph_str_hash(pool_info->object_hash, oid, strlen(oid));
 
-	dout("calc_object_layout '%s' pgid %lld.%x\n", oid, pg->pool, pg->seed);
+	dout("%s '%s' pgid %lld.%x\n", __func__, oid, pg->pool, pg->seed);
 	return 0;
 }
-EXPORT_SYMBOL(ceph_calc_object_layout);
+EXPORT_SYMBOL(ceph_calc_ceph_pg);
 
 /*
  * Calculate raw osd vector for the given pgid.  Return pointer to osd

+ 78 - 0
net/ceph/snapshot.c

@@ -0,0 +1,78 @@
+/*
+ * snapshot.c    Ceph snapshot context utility routines (part of libceph)
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * version 2 as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+#include <stddef.h>
+
+#include <linux/types.h>
+#include <linux/export.h>
+#include <linux/ceph/libceph.h>
+
+/*
+ * Ceph snapshot contexts are reference counted objects, and the
+ * returned structure holds a single reference.  Acquire additional
+ * references with ceph_get_snap_context(), and release them with
+ * ceph_put_snap_context().  When the reference count reaches zero
+ * the entire structure is freed.
+ */
+
+/*
+ * Create a new ceph snapshot context large enough to hold the
+ * indicated number of snapshot ids (which can be 0).  Caller has
+ * to fill in snapc->seq and snapc->snaps[0..snap_count-1].
+ *
+ * Returns a null pointer if an error occurs.
+ */
+struct ceph_snap_context *ceph_create_snap_context(u32 snap_count,
+						gfp_t gfp_flags)
+{
+	struct ceph_snap_context *snapc;
+	size_t size;
+
+	size = sizeof (struct ceph_snap_context);
+	size += snap_count * sizeof (snapc->snaps[0]);
+	snapc = kzalloc(size, gfp_flags);
+	if (!snapc)
+		return NULL;
+
+	atomic_set(&snapc->nref, 1);
+	snapc->num_snaps = snap_count;
+
+	return snapc;
+}
+EXPORT_SYMBOL(ceph_create_snap_context);
+
+struct ceph_snap_context *ceph_get_snap_context(struct ceph_snap_context *sc)
+{
+	if (sc)
+		atomic_inc(&sc->nref);
+	return sc;
+}
+EXPORT_SYMBOL(ceph_get_snap_context);
+
+void ceph_put_snap_context(struct ceph_snap_context *sc)
+{
+	if (!sc)
+		return;
+	if (atomic_dec_and_test(&sc->nref)) {
+		/*printk(" deleting snap_context %p\n", sc);*/
+		kfree(sc);
+	}
+}
+EXPORT_SYMBOL(ceph_put_snap_context);

Some files were not shown because too many files changed in this diff