Pārlūkot izejas kodu

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull Ceph changes from Sage Weil:
 "Lots of stuff this time around:

   - lots of cleanup and refactoring in the libceph messenger code, and
     many hard to hit races and bugs closed as a result.
   - lots of cleanup and refactoring in the rbd code from Alex Elder,
     mostly in preparation for the layering functionality that will be
     coming in 3.7.
   - some misc rbd cleanups from Josh Durgin that are finally going
     upstream
   - support for CRUSH tunables (used by newer clusters to improve the
     data placement)
   - some cleanup in our use of d_parent that Al brought up a while back
   - a random collection of fixes across the tree

  There is another patch coming that fixes up our ->atomic_open()
  behavior, but I'm going to hammer on it a bit more before sending it."

Fix up conflicts due to commits that were already committed earlier in
drivers/block/rbd.c, net/ceph/{messenger.c, osd_client.c}

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (132 commits)
  rbd: create rbd_refresh_helper()
  rbd: return obj version in __rbd_refresh_header()
  rbd: fixes in rbd_header_from_disk()
  rbd: always pass ops array to rbd_req_sync_op()
  rbd: pass null version pointer in add_snap()
  rbd: make rbd_create_rw_ops() return a pointer
  rbd: have __rbd_add_snap_dev() return a pointer
  libceph: recheck con state after allocating incoming message
  libceph: change ceph_con_in_msg_alloc convention to be less weird
  libceph: avoid dropping con mutex before fault
  libceph: verify state after retaking con lock after dispatch
  libceph: revoke mon_client messages on session restart
  libceph: fix handling of immediate socket connect failure
  ceph: update MAINTAINERS file
  libceph: be less chatty about stray replies
  libceph: clear all flags on con_close
  libceph: clean up con flags
  libceph: replace connection state bits with states
  libceph: drop unnecessary CLOSED check in socket state change callback
  libceph: close socket directly from ceph_con_close()
  ...
Linus Torvalds 13 gadi atpakaļ
vecāks
revīzija
cc8362b1f6

+ 8 - 2
Documentation/ABI/testing/sysfs-bus-rbd

@@ -35,8 +35,14 @@ name
 
 pool
 
-	The pool where this rbd image resides. The pool-name pair is unique
-	per rados system.
+	The name of the storage pool where this rbd image resides.
+	An rbd image name is unique within its pool.
+
+pool_id
+
+	The unique identifier for the rbd image's pool.  This is
+	a permanent attribute of the pool.  A pool's id will never
+	change.
 
 size
 

+ 8 - 5
MAINTAINERS

@@ -1789,15 +1789,16 @@ F:	arch/powerpc/oprofile/*cell*
 F:	arch/powerpc/platforms/cell/
 
 CEPH DISTRIBUTED FILE SYSTEM CLIENT
-M:	Sage Weil <sage@newdream.net>
+M:	Sage Weil <sage@inktank.com>
 L:	ceph-devel@vger.kernel.org
-W:	http://ceph.newdream.net/
+W:	http://ceph.com/
 T:	git git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client.git
 S:	Supported
 F:	Documentation/filesystems/ceph.txt
 F:	fs/ceph
 F:	net/ceph
 F:	include/linux/ceph
+F:	include/linux/crush
 
 CERTIFIED WIRELESS USB (WUSB) SUBSYSTEM:
 L:	linux-usb@vger.kernel.org
@@ -5639,10 +5640,12 @@ S:	Supported
 F:	arch/hexagon/
 
 RADOS BLOCK DEVICE (RBD)
-F:	include/linux/qnxtypes.h
-M:	Yehuda Sadeh <yehuda@hq.newdream.net>
-M:	Sage Weil <sage@newdream.net>
+M:	Yehuda Sadeh <yehuda@inktank.com>
+M:	Sage Weil <sage@inktank.com>
+M:	Alex Elder <elder@inktank.com>
 M:	ceph-devel@vger.kernel.org
+W:	http://ceph.com/
+T:	git git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client.git
 S:	Supported
 F:	drivers/block/rbd.c
 F:	drivers/block/rbd_types.h

Failā izmaiņas netiks attēlotas, jo tās ir par lielu
+ 232 - 227
drivers/block/rbd.c


+ 0 - 1
drivers/block/rbd_types.h

@@ -31,7 +31,6 @@
 #define RBD_MIN_OBJ_ORDER       16
 #define RBD_MAX_OBJ_ORDER       30
 
-#define RBD_MAX_OBJ_NAME_LEN	96
 #define RBD_MAX_SEG_NAME_LEN	128
 
 #define RBD_COMP_NONE		0

+ 3 - 4
fs/ceph/dir.c

@@ -51,8 +51,7 @@ int ceph_init_dentry(struct dentry *dentry)
 		goto out_unlock;
 	}
 
-	if (dentry->d_parent == NULL ||   /* nfs fh_to_dentry */
-	    ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP)
+	if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP)
 		d_set_d_op(dentry, &ceph_dentry_ops);
 	else if (ceph_snap(dentry->d_parent->d_inode) == CEPH_SNAPDIR)
 		d_set_d_op(dentry, &ceph_snapdir_dentry_ops);
@@ -79,7 +78,7 @@ struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry)
 		return NULL;
 
 	spin_lock(&dentry->d_lock);
-	if (dentry->d_parent) {
+	if (!IS_ROOT(dentry)) {
 		inode = dentry->d_parent->d_inode;
 		ihold(inode);
 	}
@@ -1154,7 +1153,7 @@ static void ceph_d_prune(struct dentry *dentry)
 	dout("ceph_d_prune %p\n", dentry);
 
 	/* do we have a valid parent? */
-	if (!dentry->d_parent || IS_ROOT(dentry))
+	if (IS_ROOT(dentry))
 		return;
 
 	/* if we are not hashed, we don't affect D_COMPLETE */

+ 6 - 17
fs/ceph/mds_client.c

@@ -10,6 +10,7 @@
 #include "super.h"
 #include "mds_client.h"
 
+#include <linux/ceph/ceph_features.h>
 #include <linux/ceph/messenger.h>
 #include <linux/ceph/decode.h>
 #include <linux/ceph/pagelist.h>
@@ -394,11 +395,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
 	s->s_seq = 0;
 	mutex_init(&s->s_mutex);
 
-	ceph_con_init(mdsc->fsc->client->msgr, &s->s_con);
-	s->s_con.private = s;
-	s->s_con.ops = &mds_con_ops;
-	s->s_con.peer_name.type = CEPH_ENTITY_TYPE_MDS;
-	s->s_con.peer_name.num = cpu_to_le64(mds);
+	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
 
 	spin_lock_init(&s->s_gen_ttl_lock);
 	s->s_cap_gen = 0;
@@ -440,7 +437,8 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
 	mdsc->sessions[mds] = s;
 	atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
 
-	ceph_con_open(&s->s_con, ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
+	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
+		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
 
 	return s;
 
@@ -1472,11 +1470,6 @@ retry:
 		else
 			len += 1 + temp->d_name.len;
 		temp = temp->d_parent;
-		if (temp == NULL) {
-			rcu_read_unlock();
-			pr_err("build_path corrupt dentry %p\n", dentry);
-			return ERR_PTR(-EINVAL);
-		}
 	}
 	rcu_read_unlock();
 	if (len)
@@ -1513,12 +1506,6 @@ retry:
 		if (pos)
 			path[--pos] = '/';
 		temp = temp->d_parent;
-		if (temp == NULL) {
-			rcu_read_unlock();
-			pr_err("build_path corrupt dentry\n");
-			kfree(path);
-			return ERR_PTR(-EINVAL);
-		}
 	}
 	rcu_read_unlock();
 	if (pos != 0 || read_seqretry(&rename_lock, seq)) {
@@ -2531,7 +2518,9 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
 	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
 	session->s_seq = 0;
 
+	ceph_con_close(&session->s_con);
 	ceph_con_open(&session->s_con,
+		      CEPH_ENTITY_TYPE_MDS, mds,
 		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
 
 	/* replay unsafe requests */

+ 10 - 8
fs/ceph/snap.c

@@ -296,8 +296,7 @@ static int build_snap_context(struct ceph_snap_realm *realm)
 	struct ceph_snap_realm *parent = realm->parent;
 	struct ceph_snap_context *snapc;
 	int err = 0;
-	int i;
-	int num = realm->num_prior_parent_snaps + realm->num_snaps;
+	u32 num = realm->num_prior_parent_snaps + realm->num_snaps;
 
 	/*
 	 * build parent context, if it hasn't been built.
@@ -321,11 +320,11 @@ static int build_snap_context(struct ceph_snap_realm *realm)
 	    realm->cached_context->seq == realm->seq &&
 	    (!parent ||
 	     realm->cached_context->seq >= parent->cached_context->seq)) {
-		dout("build_snap_context %llx %p: %p seq %lld (%d snaps)"
+		dout("build_snap_context %llx %p: %p seq %lld (%u snaps)"
 		     " (unchanged)\n",
 		     realm->ino, realm, realm->cached_context,
 		     realm->cached_context->seq,
-		     realm->cached_context->num_snaps);
+		     (unsigned int) realm->cached_context->num_snaps);
 		return 0;
 	}
 
@@ -342,6 +341,8 @@ static int build_snap_context(struct ceph_snap_realm *realm)
 	num = 0;
 	snapc->seq = realm->seq;
 	if (parent) {
+		u32 i;
+
 		/* include any of parent's snaps occurring _after_ my
 		   parent became my parent */
 		for (i = 0; i < parent->cached_context->num_snaps; i++)
@@ -361,8 +362,9 @@ static int build_snap_context(struct ceph_snap_realm *realm)
 
 	sort(snapc->snaps, num, sizeof(u64), cmpu64_rev, NULL);
 	snapc->num_snaps = num;
-	dout("build_snap_context %llx %p: %p seq %lld (%d snaps)\n",
-	     realm->ino, realm, snapc, snapc->seq, snapc->num_snaps);
+	dout("build_snap_context %llx %p: %p seq %lld (%u snaps)\n",
+	     realm->ino, realm, snapc, snapc->seq,
+	     (unsigned int) snapc->num_snaps);
 
 	if (realm->cached_context)
 		ceph_put_snap_context(realm->cached_context);
@@ -402,9 +404,9 @@ static void rebuild_snap_realms(struct ceph_snap_realm *realm)
  * helper to allocate and decode an array of snapids.  free prior
  * instance, if any.
  */
-static int dup_array(u64 **dst, __le64 *src, int num)
+static int dup_array(u64 **dst, __le64 *src, u32 num)
 {
-	int i;
+	u32 i;
 
 	kfree(*dst);
 	if (num) {

+ 1 - 0
fs/ceph/super.c

@@ -18,6 +18,7 @@
 #include "super.h"
 #include "mds_client.h"
 
+#include <linux/ceph/ceph_features.h>
 #include <linux/ceph/decode.h>
 #include <linux/ceph/mon_client.h>
 #include <linux/ceph/auth.h>

+ 2 - 2
fs/ceph/super.h

@@ -612,9 +612,9 @@ struct ceph_snap_realm {
 	u64 parent_since;   /* snapid when our current parent became so */
 
 	u64 *prior_parent_snaps;      /* snaps inherited from any parents we */
-	int num_prior_parent_snaps;   /*  had prior to parent_since */
+	u32 num_prior_parent_snaps;   /*  had prior to parent_since */
 	u64 *snaps;                   /* snaps specific to this realm */
-	int num_snaps;
+	u32 num_snaps;
 
 	struct ceph_snap_realm *parent;
 	struct list_head children;       /* list of child realms */

+ 1 - 0
fs/ceph/xattr.c

@@ -457,6 +457,7 @@ start:
 			for (i = 0; i < numattr; i++)
 				kfree(xattrs[i]);
 			kfree(xattrs);
+			xattrs = NULL;
 			goto start;
 		}
 		err = -EIO;

+ 27 - 0
include/linux/ceph/ceph_features.h

@@ -0,0 +1,27 @@
+#ifndef __CEPH_FEATURES
+#define __CEPH_FEATURES
+
+/*
+ * feature bits
+ */
+#define CEPH_FEATURE_UID            (1<<0)
+#define CEPH_FEATURE_NOSRCADDR      (1<<1)
+#define CEPH_FEATURE_MONCLOCKCHECK  (1<<2)
+#define CEPH_FEATURE_FLOCK          (1<<3)
+#define CEPH_FEATURE_SUBSCRIBE2     (1<<4)
+#define CEPH_FEATURE_MONNAMES       (1<<5)
+#define CEPH_FEATURE_RECONNECT_SEQ  (1<<6)
+#define CEPH_FEATURE_DIRLAYOUTHASH  (1<<7)
+/* bits 8-17 defined by user-space; not supported yet here */
+#define CEPH_FEATURE_CRUSH_TUNABLES (1<<18)
+
+/*
+ * Features supported.
+ */
+#define CEPH_FEATURES_SUPPORTED_DEFAULT  \
+	(CEPH_FEATURE_NOSRCADDR |	 \
+	 CEPH_FEATURE_CRUSH_TUNABLES)
+
+#define CEPH_FEATURES_REQUIRED_DEFAULT   \
+	(CEPH_FEATURE_NOSRCADDR)
+#endif

+ 0 - 14
include/linux/ceph/ceph_fs.h

@@ -35,20 +35,6 @@
 /* arbitrary limit on max # of monitors (cluster of 3 is typical) */
 #define CEPH_MAX_MON   31
 
-
-/*
- * feature bits
- */
-#define CEPH_FEATURE_UID            (1<<0)
-#define CEPH_FEATURE_NOSRCADDR      (1<<1)
-#define CEPH_FEATURE_MONCLOCKCHECK  (1<<2)
-#define CEPH_FEATURE_FLOCK          (1<<3)
-#define CEPH_FEATURE_SUBSCRIBE2     (1<<4)
-#define CEPH_FEATURE_MONNAMES       (1<<5)
-#define CEPH_FEATURE_RECONNECT_SEQ  (1<<6)
-#define CEPH_FEATURE_DIRLAYOUTHASH  (1<<7)
-
-
 /*
  * ceph_file_layout - describe data layout for a file/inode
  */

+ 48 - 1
include/linux/ceph/decode.h

@@ -1,6 +1,7 @@
 #ifndef __CEPH_DECODE_H
 #define __CEPH_DECODE_H
 
+#include <linux/err.h>
 #include <linux/bug.h>
 #include <linux/time.h>
 #include <asm/unaligned.h>
@@ -84,6 +85,52 @@ static inline int ceph_has_room(void **p, void *end, size_t n)
 		ceph_decode_copy(p, pv, n);			\
 	} while (0)
 
+/*
+ * Allocate a buffer big enough to hold the wire-encoded string, and
+ * decode the string into it.  The resulting string will always be
+ * terminated with '\0'.  If successful, *p will be advanced
+ * past the decoded data.  Also, if lenp is not a null pointer, the
+ * length (not including the terminating '\0') will be recorded in
+ * *lenp.  Note that a zero-length string is a valid return value.
+ *
+ * Returns a pointer to the newly-allocated string buffer, or a
+ * pointer-coded errno if an error occurs.  Neither *p nor *lenp
+ * will have been updated if an error is returned.
+ *
+ * There are two possible failures:
+ *   - converting the string would require accessing memory at or
+ *     beyond the "end" pointer provided (-E
+ *   - memory could not be allocated for the result
+ */
+static inline char *ceph_extract_encoded_string(void **p, void *end,
+						size_t *lenp, gfp_t gfp)
+{
+	u32 len;
+	void *sp = *p;
+	char *buf;
+
+	ceph_decode_32_safe(&sp, end, len, bad);
+	if (!ceph_has_room(&sp, end, len))
+		goto bad;
+
+	buf = kmalloc(len + 1, gfp);
+	if (!buf)
+		return ERR_PTR(-ENOMEM);
+
+	if (len)
+		memcpy(buf, sp, len);
+	buf[len] = '\0';
+
+	*p = (char *) *p + sizeof (u32) + len;
+	if (lenp)
+		*lenp = (size_t) len;
+
+	return buf;
+
+bad:
+	return ERR_PTR(-ERANGE);
+}
+
 /*
  * struct ceph_timespec <-> struct timespec
  */
@@ -151,7 +198,7 @@ static inline void ceph_encode_filepath(void **p, void *end,
 					u64 ino, const char *path)
 {
 	u32 len = path ? strlen(path) : 0;
-	BUG_ON(*p + sizeof(ino) + sizeof(len) + len > end);
+	BUG_ON(*p + 1 + sizeof(ino) + sizeof(len) + len > end);
 	ceph_encode_8(p, 1);
 	ceph_encode_64(p, ino);
 	ceph_encode_32(p, len);

+ 2 - 8
include/linux/ceph/libceph.h

@@ -22,12 +22,6 @@
 #include "osd_client.h"
 #include "ceph_fs.h"
 
-/*
- * Supported features
- */
-#define CEPH_FEATURE_SUPPORTED_DEFAULT CEPH_FEATURE_NOSRCADDR
-#define CEPH_FEATURE_REQUIRED_DEFAULT  CEPH_FEATURE_NOSRCADDR
-
 /*
  * mount options
  */
@@ -132,7 +126,7 @@ struct ceph_client {
 	u32 supported_features;
 	u32 required_features;
 
-	struct ceph_messenger *msgr;   /* messenger instance */
+	struct ceph_messenger msgr;   /* messenger instance */
 	struct ceph_mon_client monc;
 	struct ceph_osd_client osdc;
 
@@ -160,7 +154,7 @@ struct ceph_client {
 struct ceph_snap_context {
 	atomic_t nref;
 	u64 seq;
-	int num_snaps;
+	u32 num_snaps;
 	u64 snaps[];
 };
 

+ 25 - 35
include/linux/ceph/messenger.h

@@ -31,9 +31,6 @@ struct ceph_connection_operations {
 	int (*verify_authorizer_reply) (struct ceph_connection *con, int len);
 	int (*invalidate_authorizer)(struct ceph_connection *con);
 
-	/* protocol version mismatch */
-	void (*bad_proto) (struct ceph_connection *con);
-
 	/* there was some error on the socket (disconnect, whatever) */
 	void (*fault) (struct ceph_connection *con);
 
@@ -53,6 +50,7 @@ struct ceph_messenger {
 	struct ceph_entity_inst inst;    /* my name+address */
 	struct ceph_entity_addr my_enc_addr;
 
+	atomic_t stopping;
 	bool nocrc;
 
 	/*
@@ -80,7 +78,10 @@ struct ceph_msg {
 	unsigned nr_pages;              /* size of page array */
 	unsigned page_alignment;        /* io offset in first page */
 	struct ceph_pagelist *pagelist; /* instead of pages */
+
+	struct ceph_connection *con;
 	struct list_head list_head;
+
 	struct kref kref;
 	struct bio  *bio;		/* instead of pages/pagelist */
 	struct bio  *bio_iter;		/* bio iterator */
@@ -105,23 +106,6 @@ struct ceph_msg_pos {
 #define BASE_DELAY_INTERVAL	(HZ/2)
 #define MAX_DELAY_INTERVAL	(5 * 60 * HZ)
 
-/*
- * ceph_connection state bit flags
- */
-#define LOSSYTX         0  /* we can close channel or drop messages on errors */
-#define CONNECTING	1
-#define NEGOTIATING	2
-#define KEEPALIVE_PENDING      3
-#define WRITE_PENDING	4  /* we have data ready to send */
-#define STANDBY		8  /* no outgoing messages, socket closed.  we keep
-			    * the ceph_connection around to maintain shared
-			    * state with the peer. */
-#define CLOSED		10 /* we've closed the connection */
-#define SOCK_CLOSED	11 /* socket state changed to closed */
-#define OPENING         13 /* open connection w/ (possibly new) peer */
-#define DEAD            14 /* dead, about to kfree */
-#define BACKOFF         15
-
 /*
  * A single connection with another host.
  *
@@ -131,18 +115,22 @@ struct ceph_msg_pos {
  */
 struct ceph_connection {
 	void *private;
-	atomic_t nref;
 
 	const struct ceph_connection_operations *ops;
 
 	struct ceph_messenger *msgr;
+
+	atomic_t sock_state;
 	struct socket *sock;
-	unsigned long state;	/* connection state (see flags above) */
+	struct ceph_entity_addr peer_addr; /* peer address */
+	struct ceph_entity_addr peer_addr_for_me;
+
+	unsigned long flags;
+	unsigned long state;
 	const char *error_msg;  /* error message, if any */
 
-	struct ceph_entity_addr peer_addr; /* peer address */
 	struct ceph_entity_name peer_name; /* peer name */
-	struct ceph_entity_addr peer_addr_for_me;
+
 	unsigned peer_features;
 	u32 connect_seq;      /* identify the most recent connection
 				 attempt for this connection, client */
@@ -207,24 +195,26 @@ extern int ceph_msgr_init(void);
 extern void ceph_msgr_exit(void);
 extern void ceph_msgr_flush(void);
 
-extern struct ceph_messenger *ceph_messenger_create(
-	struct ceph_entity_addr *myaddr,
-	u32 features, u32 required);
-extern void ceph_messenger_destroy(struct ceph_messenger *);
+extern void ceph_messenger_init(struct ceph_messenger *msgr,
+			struct ceph_entity_addr *myaddr,
+			u32 supported_features,
+			u32 required_features,
+			bool nocrc);
 
-extern void ceph_con_init(struct ceph_messenger *msgr,
-			  struct ceph_connection *con);
+extern void ceph_con_init(struct ceph_connection *con, void *private,
+			const struct ceph_connection_operations *ops,
+			struct ceph_messenger *msgr);
 extern void ceph_con_open(struct ceph_connection *con,
+			  __u8 entity_type, __u64 entity_num,
 			  struct ceph_entity_addr *addr);
 extern bool ceph_con_opened(struct ceph_connection *con);
 extern void ceph_con_close(struct ceph_connection *con);
 extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
-extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg);
-extern void ceph_con_revoke_message(struct ceph_connection *con,
-				  struct ceph_msg *msg);
+
+extern void ceph_msg_revoke(struct ceph_msg *msg);
+extern void ceph_msg_revoke_incoming(struct ceph_msg *msg);
+
 extern void ceph_con_keepalive(struct ceph_connection *con);
-extern struct ceph_connection *ceph_con_get(struct ceph_connection *con);
-extern void ceph_con_put(struct ceph_connection *con);
 
 extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 				     bool can_fail);

+ 1 - 1
include/linux/ceph/mon_client.h

@@ -70,7 +70,7 @@ struct ceph_mon_client {
 	bool hunting;
 	int cur_mon;                       /* last monitor i contacted */
 	unsigned long sub_sent, sub_renew_after;
-	struct ceph_connection *con;
+	struct ceph_connection con;
 	bool have_fsid;
 
 	/* pending generic requests */

+ 2 - 1
include/linux/ceph/msgpool.h

@@ -11,10 +11,11 @@
 struct ceph_msgpool {
 	const char *name;
 	mempool_t *pool;
+	int type;               /* preallocated message type */
 	int front_len;          /* preallocated payload size */
 };
 
-extern int ceph_msgpool_init(struct ceph_msgpool *pool,
+extern int ceph_msgpool_init(struct ceph_msgpool *pool, int type,
 			     int front_len, int size, bool blocking,
 			     const char *name);
 extern void ceph_msgpool_destroy(struct ceph_msgpool *pool);

+ 8 - 0
include/linux/crush/crush.h

@@ -154,6 +154,14 @@ struct crush_map {
 	__s32 max_buckets;
 	__u32 max_rules;
 	__s32 max_devices;
+
+	/* choose local retries before re-descent */
+	__u32 choose_local_tries;
+	/* choose local attempts using a fallback permutation before
+	 * re-descent */
+	__u32 choose_local_fallback_tries;
+	/* choose attempts before giving up */ 
+	__u32 choose_total_tries;
 };
 
 

+ 10 - 15
net/ceph/ceph_common.c

@@ -17,6 +17,7 @@
 #include <linux/string.h>
 
 
+#include <linux/ceph/ceph_features.h>
 #include <linux/ceph/libceph.h>
 #include <linux/ceph/debugfs.h>
 #include <linux/ceph/decode.h>
@@ -460,27 +461,23 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
 	client->auth_err = 0;
 
 	client->extra_mon_dispatch = NULL;
-	client->supported_features = CEPH_FEATURE_SUPPORTED_DEFAULT |
+	client->supported_features = CEPH_FEATURES_SUPPORTED_DEFAULT |
 		supported_features;
-	client->required_features = CEPH_FEATURE_REQUIRED_DEFAULT |
+	client->required_features = CEPH_FEATURES_REQUIRED_DEFAULT |
 		required_features;
 
 	/* msgr */
 	if (ceph_test_opt(client, MYIP))
 		myaddr = &client->options->my_addr;
-	client->msgr = ceph_messenger_create(myaddr,
-					     client->supported_features,
-					     client->required_features);
-	if (IS_ERR(client->msgr)) {
-		err = PTR_ERR(client->msgr);
-		goto fail;
-	}
-	client->msgr->nocrc = ceph_test_opt(client, NOCRC);
+	ceph_messenger_init(&client->msgr, myaddr,
+		client->supported_features,
+		client->required_features,
+		ceph_test_opt(client, NOCRC));
 
 	/* subsystems */
 	err = ceph_monc_init(&client->monc, client);
 	if (err < 0)
-		goto fail_msgr;
+		goto fail;
 	err = ceph_osdc_init(&client->osdc, client);
 	if (err < 0)
 		goto fail_monc;
@@ -489,8 +486,6 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
 
 fail_monc:
 	ceph_monc_stop(&client->monc);
-fail_msgr:
-	ceph_messenger_destroy(client->msgr);
 fail:
 	kfree(client);
 	return ERR_PTR(err);
@@ -501,6 +496,8 @@ void ceph_destroy_client(struct ceph_client *client)
 {
 	dout("destroy_client %p\n", client);
 
+	atomic_set(&client->msgr.stopping, 1);
+
 	/* unmount */
 	ceph_osdc_stop(&client->osdc);
 
@@ -508,8 +505,6 @@ void ceph_destroy_client(struct ceph_client *client)
 
 	ceph_debugfs_client_cleanup(client);
 
-	ceph_messenger_destroy(client->msgr);
-
 	ceph_destroy_options(client->options);
 
 	kfree(client);

+ 7 - 6
net/ceph/crush/mapper.c

@@ -306,7 +306,6 @@ static int crush_choose(const struct crush_map *map,
 	int item = 0;
 	int itemtype;
 	int collide, reject;
-	const unsigned int orig_tries = 5; /* attempts before we fall back to search */
 
 	dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "",
 		bucket->id, x, outpos, numrep);
@@ -351,8 +350,9 @@ static int crush_choose(const struct crush_map *map,
 					reject = 1;
 					goto reject;
 				}
-				if (flocal >= (in->size>>1) &&
-				    flocal > orig_tries)
+				if (map->choose_local_fallback_tries > 0 &&
+				    flocal >= (in->size>>1) &&
+				    flocal > map->choose_local_fallback_tries)
 					item = bucket_perm_choose(in, x, r);
 				else
 					item = crush_bucket_choose(in, x, r);
@@ -422,13 +422,14 @@ reject:
 					ftotal++;
 					flocal++;
 
-					if (collide && flocal < 3)
+					if (collide && flocal <= map->choose_local_tries)
 						/* retry locally a few times */
 						retry_bucket = 1;
-					else if (flocal <= in->size + orig_tries)
+					else if (map->choose_local_fallback_tries > 0 &&
+						 flocal <= in->size + map->choose_local_fallback_tries)
 						/* exhaustive bucket search */
 						retry_bucket = 1;
-					else if (ftotal < 20)
+					else if (ftotal <= map->choose_total_tries)
 						/* then retry descent */
 						retry_descent = 1;
 					else

Failā izmaiņas netiks attēlotas, jo tās ir par lielu
+ 353 - 209
net/ceph/messenger.c


+ 43 - 33
net/ceph/mon_client.c

@@ -106,9 +106,9 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
 	monc->pending_auth = 1;
 	monc->m_auth->front.iov_len = len;
 	monc->m_auth->hdr.front_len = cpu_to_le32(len);
-	ceph_con_revoke(monc->con, monc->m_auth);
+	ceph_msg_revoke(monc->m_auth);
 	ceph_msg_get(monc->m_auth);  /* keep our ref */
-	ceph_con_send(monc->con, monc->m_auth);
+	ceph_con_send(&monc->con, monc->m_auth);
 }
 
 /*
@@ -117,8 +117,11 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
 static void __close_session(struct ceph_mon_client *monc)
 {
 	dout("__close_session closing mon%d\n", monc->cur_mon);
-	ceph_con_revoke(monc->con, monc->m_auth);
-	ceph_con_close(monc->con);
+	ceph_msg_revoke(monc->m_auth);
+	ceph_msg_revoke_incoming(monc->m_auth_reply);
+	ceph_msg_revoke(monc->m_subscribe);
+	ceph_msg_revoke_incoming(monc->m_subscribe_ack);
+	ceph_con_close(&monc->con);
 	monc->cur_mon = -1;
 	monc->pending_auth = 0;
 	ceph_auth_reset(monc->auth);
@@ -142,9 +145,8 @@ static int __open_session(struct ceph_mon_client *monc)
 		monc->want_next_osdmap = !!monc->want_next_osdmap;
 
 		dout("open_session mon%d opening\n", monc->cur_mon);
-		monc->con->peer_name.type = CEPH_ENTITY_TYPE_MON;
-		monc->con->peer_name.num = cpu_to_le64(monc->cur_mon);
-		ceph_con_open(monc->con,
+		ceph_con_open(&monc->con,
+			      CEPH_ENTITY_TYPE_MON, monc->cur_mon,
 			      &monc->monmap->mon_inst[monc->cur_mon].addr);
 
 		/* initiatiate authentication handshake */
@@ -226,8 +228,8 @@ static void __send_subscribe(struct ceph_mon_client *monc)
 
 		msg->front.iov_len = p - msg->front.iov_base;
 		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
-		ceph_con_revoke(monc->con, msg);
-		ceph_con_send(monc->con, ceph_msg_get(msg));
+		ceph_msg_revoke(msg);
+		ceph_con_send(&monc->con, ceph_msg_get(msg));
 
 		monc->sub_sent = jiffies | 1;  /* never 0 */
 	}
@@ -247,7 +249,7 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc,
 	if (monc->hunting) {
 		pr_info("mon%d %s session established\n",
 			monc->cur_mon,
-			ceph_pr_addr(&monc->con->peer_addr.in_addr));
+			ceph_pr_addr(&monc->con.peer_addr.in_addr));
 		monc->hunting = false;
 	}
 	dout("handle_subscribe_ack after %d seconds\n", seconds);
@@ -439,6 +441,7 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
 		m = NULL;
 	} else {
 		dout("get_generic_reply %lld got %p\n", tid, req->reply);
+		*skip = 0;
 		m = ceph_msg_get(req->reply);
 		/*
 		 * we don't need to track the connection reading into
@@ -461,7 +464,7 @@ static int do_generic_request(struct ceph_mon_client *monc,
 	req->request->hdr.tid = cpu_to_le64(req->tid);
 	__insert_generic_request(monc, req);
 	monc->num_generic_requests++;
-	ceph_con_send(monc->con, ceph_msg_get(req->request));
+	ceph_con_send(&monc->con, ceph_msg_get(req->request));
 	mutex_unlock(&monc->mutex);
 
 	err = wait_for_completion_interruptible(&req->completion);
@@ -684,8 +687,9 @@ static void __resend_generic_request(struct ceph_mon_client *monc)
 
 	for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
 		req = rb_entry(p, struct ceph_mon_generic_request, node);
-		ceph_con_revoke(monc->con, req->request);
-		ceph_con_send(monc->con, ceph_msg_get(req->request));
+		ceph_msg_revoke(req->request);
+		ceph_msg_revoke_incoming(req->reply);
+		ceph_con_send(&monc->con, ceph_msg_get(req->request));
 	}
 }
 
@@ -705,7 +709,7 @@ static void delayed_work(struct work_struct *work)
 		__close_session(monc);
 		__open_session(monc);  /* continue hunting */
 	} else {
-		ceph_con_keepalive(monc->con);
+		ceph_con_keepalive(&monc->con);
 
 		__validate_auth(monc);
 
@@ -760,19 +764,12 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
 		goto out;
 
 	/* connection */
-	monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL);
-	if (!monc->con)
-		goto out_monmap;
-	ceph_con_init(monc->client->msgr, monc->con);
-	monc->con->private = monc;
-	monc->con->ops = &mon_con_ops;
-
 	/* authentication */
 	monc->auth = ceph_auth_init(cl->options->name,
 				    cl->options->key);
 	if (IS_ERR(monc->auth)) {
 		err = PTR_ERR(monc->auth);
-		goto out_con;
+		goto out_monmap;
 	}
 	monc->auth->want_keys =
 		CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
@@ -801,6 +798,9 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
 	if (!monc->m_auth)
 		goto out_auth_reply;
 
+	ceph_con_init(&monc->con, monc, &mon_con_ops,
+		      &monc->client->msgr);
+
 	monc->cur_mon = -1;
 	monc->hunting = true;
 	monc->sub_renew_after = jiffies;
@@ -824,8 +824,6 @@ out_subscribe_ack:
 	ceph_msg_put(monc->m_subscribe_ack);
 out_auth:
 	ceph_auth_destroy(monc->auth);
-out_con:
-	monc->con->ops->put(monc->con);
 out_monmap:
 	kfree(monc->monmap);
 out:
@@ -841,10 +839,6 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
 	mutex_lock(&monc->mutex);
 	__close_session(monc);
 
-	monc->con->private = NULL;
-	monc->con->ops->put(monc->con);
-	monc->con = NULL;
-
 	mutex_unlock(&monc->mutex);
 
 	/*
@@ -888,8 +882,8 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
 	} else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) {
 		dout("authenticated, starting session\n");
 
-		monc->client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
-		monc->client->msgr->inst.name.num =
+		monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
+		monc->client->msgr.inst.name.num =
 					cpu_to_le64(monc->auth->global_id);
 
 		__send_subscribe(monc);
@@ -1000,6 +994,8 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
 	case CEPH_MSG_MDS_MAP:
 	case CEPH_MSG_OSD_MAP:
 		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
+		if (!m)
+			return NULL;	/* ENOMEM--return skip == 0 */
 		break;
 	}
 
@@ -1029,7 +1025,7 @@ static void mon_fault(struct ceph_connection *con)
 	if (!monc->hunting)
 		pr_info("mon%d %s session lost, "
 			"hunting for new mon\n", monc->cur_mon,
-			ceph_pr_addr(&monc->con->peer_addr.in_addr));
+			ceph_pr_addr(&monc->con.peer_addr.in_addr));
 
 	__close_session(monc);
 	if (!monc->hunting) {
@@ -1044,9 +1040,23 @@ out:
 	mutex_unlock(&monc->mutex);
 }
 
+/*
+ * We can ignore refcounting on the connection struct, as all references
+ * will come from the messenger workqueue, which is drained prior to
+ * mon_client destruction.
+ */
+static struct ceph_connection *con_get(struct ceph_connection *con)
+{
+	return con;
+}
+
+static void con_put(struct ceph_connection *con)
+{
+}
+
 static const struct ceph_connection_operations mon_con_ops = {
-	.get = ceph_con_get,
-	.put = ceph_con_put,
+	.get = con_get,
+	.put = con_put,
 	.dispatch = dispatch,
 	.fault = mon_fault,
 	.alloc_msg = mon_alloc_msg,

+ 4 - 3
net/ceph/msgpool.c

@@ -12,7 +12,7 @@ static void *msgpool_alloc(gfp_t gfp_mask, void *arg)
 	struct ceph_msgpool *pool = arg;
 	struct ceph_msg *msg;
 
-	msg = ceph_msg_new(0, pool->front_len, gfp_mask, true);
+	msg = ceph_msg_new(pool->type, pool->front_len, gfp_mask, true);
 	if (!msg) {
 		dout("msgpool_alloc %s failed\n", pool->name);
 	} else {
@@ -32,10 +32,11 @@ static void msgpool_free(void *element, void *arg)
 	ceph_msg_put(msg);
 }
 
-int ceph_msgpool_init(struct ceph_msgpool *pool,
+int ceph_msgpool_init(struct ceph_msgpool *pool, int type,
 		      int front_len, int size, bool blocking, const char *name)
 {
 	dout("msgpool %s init\n", name);
+	pool->type = type;
 	pool->front_len = front_len;
 	pool->pool = mempool_create(size, msgpool_alloc, msgpool_free, pool);
 	if (!pool->pool)
@@ -61,7 +62,7 @@ struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool,
 		WARN_ON(1);
 
 		/* try to alloc a fresh message */
-		return ceph_msg_new(0, front_len, GFP_NOFS, false);
+		return ceph_msg_new(pool->type, front_len, GFP_NOFS, false);
 	}
 
 	msg = mempool_alloc(pool->pool, GFP_NOFS);

+ 48 - 29
net/ceph/osd_client.c

@@ -140,10 +140,9 @@ void ceph_osdc_release_request(struct kref *kref)
 	if (req->r_request)
 		ceph_msg_put(req->r_request);
 	if (req->r_con_filling_msg) {
-		dout("release_request revoking pages %p from con %p\n",
+		dout("%s revoking pages %p from con %p\n", __func__,
 		     req->r_pages, req->r_con_filling_msg);
-		ceph_con_revoke_message(req->r_con_filling_msg,
-				      req->r_reply);
+		ceph_msg_revoke_incoming(req->r_reply);
 		req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
 	}
 	if (req->r_reply)
@@ -214,10 +213,13 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 	kref_init(&req->r_kref);
 	init_completion(&req->r_completion);
 	init_completion(&req->r_safe_completion);
+	rb_init_node(&req->r_node);
 	INIT_LIST_HEAD(&req->r_unsafe_item);
 	INIT_LIST_HEAD(&req->r_linger_item);
 	INIT_LIST_HEAD(&req->r_linger_osd);
 	INIT_LIST_HEAD(&req->r_req_lru_item);
+	INIT_LIST_HEAD(&req->r_osd_item);
+
 	req->r_flags = flags;
 
 	WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
@@ -243,6 +245,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 		}
 		ceph_pagelist_init(req->r_trail);
 	}
+
 	/* create request message; allow space for oid */
 	msg_size += MAX_OBJ_NAME_SIZE;
 	if (snapc)
@@ -256,7 +259,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 		return NULL;
 	}
 
-	msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
 	memset(msg->front.iov_base, 0, msg->front.iov_len);
 
 	req->r_request = msg;
@@ -624,7 +626,7 @@ static void osd_reset(struct ceph_connection *con)
 /*
  * Track open sessions with osds.
  */
-static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
+static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
 {
 	struct ceph_osd *osd;
 
@@ -634,15 +636,13 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
 
 	atomic_set(&osd->o_ref, 1);
 	osd->o_osdc = osdc;
+	osd->o_osd = onum;
 	INIT_LIST_HEAD(&osd->o_requests);
 	INIT_LIST_HEAD(&osd->o_linger_requests);
 	INIT_LIST_HEAD(&osd->o_osd_lru);
 	osd->o_incarnation = 1;
 
-	ceph_con_init(osdc->client->msgr, &osd->o_con);
-	osd->o_con.private = osd;
-	osd->o_con.ops = &osd_con_ops;
-	osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
+	ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
 
 	INIT_LIST_HEAD(&osd->o_keepalive_item);
 	return osd;
@@ -688,7 +688,7 @@ static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 
 static void remove_all_osds(struct ceph_osd_client *osdc)
 {
-	dout("__remove_old_osds %p\n", osdc);
+	dout("%s %p\n", __func__, osdc);
 	mutex_lock(&osdc->request_mutex);
 	while (!RB_EMPTY_ROOT(&osdc->osds)) {
 		struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
@@ -752,7 +752,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 		ret = -EAGAIN;
 	} else {
 		ceph_con_close(&osd->o_con);
-		ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
+		ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
+			      &osdc->osdmap->osd_addr[osd->o_osd]);
 		osd->o_incarnation++;
 	}
 	return ret;
@@ -853,7 +854,7 @@ static void __unregister_request(struct ceph_osd_client *osdc,
 
 	if (req->r_osd) {
 		/* make sure the original request isn't in flight. */
-		ceph_con_revoke(&req->r_osd->o_con, req->r_request);
+		ceph_msg_revoke(req->r_request);
 
 		list_del_init(&req->r_osd_item);
 		if (list_empty(&req->r_osd->o_requests) &&
@@ -880,7 +881,7 @@ static void __unregister_request(struct ceph_osd_client *osdc,
 static void __cancel_request(struct ceph_osd_request *req)
 {
 	if (req->r_sent && req->r_osd) {
-		ceph_con_revoke(&req->r_osd->o_con, req->r_request);
+		ceph_msg_revoke(req->r_request);
 		req->r_sent = 0;
 	}
 }
@@ -890,7 +891,9 @@ static void __register_linger_request(struct ceph_osd_client *osdc,
 {
 	dout("__register_linger_request %p\n", req);
 	list_add_tail(&req->r_linger_item, &osdc->req_linger);
-	list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests);
+	if (req->r_osd)
+		list_add_tail(&req->r_linger_osd,
+			      &req->r_osd->o_linger_requests);
 }
 
 static void __unregister_linger_request(struct ceph_osd_client *osdc,
@@ -998,18 +1001,18 @@ static int __map_request(struct ceph_osd_client *osdc,
 	req->r_osd = __lookup_osd(osdc, o);
 	if (!req->r_osd && o >= 0) {
 		err = -ENOMEM;
-		req->r_osd = create_osd(osdc);
+		req->r_osd = create_osd(osdc, o);
 		if (!req->r_osd) {
 			list_move(&req->r_req_lru_item, &osdc->req_notarget);
 			goto out;
 		}
 
 		dout("map_request osd %p is osd%d\n", req->r_osd, o);
-		req->r_osd->o_osd = o;
-		req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
 		__insert_osd(osdc, req->r_osd);
 
-		ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
+		ceph_con_open(&req->r_osd->o_con,
+			      CEPH_ENTITY_TYPE_OSD, o,
+			      &osdc->osdmap->osd_addr[o]);
 	}
 
 	if (req->r_osd) {
@@ -1304,8 +1307,9 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
 
 	dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
 	mutex_lock(&osdc->request_mutex);
-	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
+	for (p = rb_first(&osdc->requests); p; ) {
 		req = rb_entry(p, struct ceph_osd_request, r_node);
+		p = rb_next(p);
 		err = __map_request(osdc, req, force_resend);
 		if (err < 0)
 			continue;  /* error */
@@ -1313,10 +1317,23 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
 			dout("%p tid %llu maps to no osd\n", req, req->r_tid);
 			needmap++;  /* request a newer map */
 		} else if (err > 0) {
-			dout("%p tid %llu requeued on osd%d\n", req, req->r_tid,
-			     req->r_osd ? req->r_osd->o_osd : -1);
-			if (!req->r_linger)
+			if (!req->r_linger) {
+				dout("%p tid %llu requeued on osd%d\n", req,
+				     req->r_tid,
+				     req->r_osd ? req->r_osd->o_osd : -1);
 				req->r_flags |= CEPH_OSD_FLAG_RETRY;
+			}
+		}
+		if (req->r_linger && list_empty(&req->r_linger_item)) {
+			/*
+			 * register as a linger so that we will
+			 * re-submit below and get a new tid
+			 */
+			dout("%p tid %llu restart on osd%d\n",
+			     req, req->r_tid,
+			     req->r_osd ? req->r_osd->o_osd : -1);
+			__register_linger_request(osdc, req);
+			__unregister_request(osdc, req);
 		}
 	}
 
@@ -1391,7 +1408,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 			     epoch, maplen);
 			newmap = osdmap_apply_incremental(&p, next,
 							  osdc->osdmap,
-							  osdc->client->msgr);
+							  &osdc->client->msgr);
 			if (IS_ERR(newmap)) {
 				err = PTR_ERR(newmap);
 				goto bad;
@@ -1839,11 +1856,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
 	if (!osdc->req_mempool)
 		goto out;
 
-	err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true,
+	err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
+				OSD_OP_FRONT_LEN, 10, true,
 				"osd_op");
 	if (err < 0)
 		goto out_mempool;
-	err = ceph_msgpool_init(&osdc->msgpool_op_reply,
+	err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
 				OSD_OPREPLY_FRONT_LEN, 10, true,
 				"osd_op_reply");
 	if (err < 0)
@@ -2019,15 +2037,15 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
 	if (!req) {
 		*skip = 1;
 		m = NULL;
-		pr_info("get_reply unknown tid %llu from osd%d\n", tid,
-			osd->o_osd);
+		dout("get_reply unknown tid %llu from osd%d\n", tid,
+		     osd->o_osd);
 		goto out;
 	}
 
 	if (req->r_con_filling_msg) {
-		dout("get_reply revoking msg %p from old con %p\n",
+		dout("%s revoking msg %p from old con %p\n", __func__,
 		     req->r_reply, req->r_con_filling_msg);
-		ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
+		ceph_msg_revoke_incoming(req->r_reply);
 		req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
 		req->r_con_filling_msg = NULL;
 	}
@@ -2080,6 +2098,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
 	int type = le16_to_cpu(hdr->type);
 	int front = le32_to_cpu(hdr->front_len);
 
+	*skip = 0;
 	switch (type) {
 	case CEPH_MSG_OSD_MAP:
 	case CEPH_MSG_WATCH_NOTIFY:

+ 53 - 6
net/ceph/osdmap.c

@@ -135,6 +135,21 @@ bad:
 	return -EINVAL;
 }
 
+static int skip_name_map(void **p, void *end)
+{
+        int len;
+        ceph_decode_32_safe(p, end, len ,bad);
+        while (len--) {
+                int strlen;
+                *p += sizeof(u32);
+                ceph_decode_32_safe(p, end, strlen, bad);
+                *p += strlen;
+}
+        return 0;
+bad:
+        return -EINVAL;
+}
+
 static struct crush_map *crush_decode(void *pbyval, void *end)
 {
 	struct crush_map *c;
@@ -143,6 +158,7 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
 	void **p = &pbyval;
 	void *start = pbyval;
 	u32 magic;
+	u32 num_name_maps;
 
 	dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p));
 
@@ -150,6 +166,11 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
 	if (c == NULL)
 		return ERR_PTR(-ENOMEM);
 
+        /* set tunables to default values */
+        c->choose_local_tries = 2;
+        c->choose_local_fallback_tries = 5;
+        c->choose_total_tries = 19;
+
 	ceph_decode_need(p, end, 4*sizeof(u32), bad);
 	magic = ceph_decode_32(p);
 	if (magic != CRUSH_MAGIC) {
@@ -297,7 +318,25 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
 	}
 
 	/* ignore trailing name maps. */
+        for (num_name_maps = 0; num_name_maps < 3; num_name_maps++) {
+                err = skip_name_map(p, end);
+                if (err < 0)
+                        goto done;
+        }
+
+        /* tunables */
+        ceph_decode_need(p, end, 3*sizeof(u32), done);
+        c->choose_local_tries = ceph_decode_32(p);
+        c->choose_local_fallback_tries =  ceph_decode_32(p);
+        c->choose_total_tries = ceph_decode_32(p);
+        dout("crush decode tunable choose_local_tries = %d",
+             c->choose_local_tries);
+        dout("crush decode tunable choose_local_fallback_tries = %d",
+             c->choose_local_fallback_tries);
+        dout("crush decode tunable choose_total_tries = %d",
+             c->choose_total_tries);
 
+done:
 	dout("crush_decode success\n");
 	return c;
 
@@ -488,15 +527,16 @@ static int __decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
 		ceph_decode_32_safe(p, end, pool, bad);
 		ceph_decode_32_safe(p, end, len, bad);
 		dout("  pool %d len %d\n", pool, len);
+		ceph_decode_need(p, end, len, bad);
 		pi = __lookup_pg_pool(&map->pg_pools, pool);
 		if (pi) {
+			char *name = kstrndup(*p, len, GFP_NOFS);
+
+			if (!name)
+				return -ENOMEM;
 			kfree(pi->name);
-			pi->name = kmalloc(len + 1, GFP_NOFS);
-			if (pi->name) {
-				memcpy(pi->name, *p, len);
-				pi->name[len] = '\0';
-				dout("  name is %s\n", pi->name);
-			}
+			pi->name = name;
+			dout("  name is %s\n", pi->name);
 		}
 		*p += len;
 	}
@@ -666,6 +706,9 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
 		ceph_decode_need(p, end, sizeof(u32) + sizeof(u64), bad);
 		ceph_decode_copy(p, &pgid, sizeof(pgid));
 		n = ceph_decode_32(p);
+		err = -EINVAL;
+		if (n > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
+			goto bad;
 		ceph_decode_need(p, end, n * sizeof(u32), bad);
 		err = -ENOMEM;
 		pg = kmalloc(sizeof(*pg) + n*sizeof(u32), GFP_NOFS);
@@ -889,6 +932,10 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
 			(void) __remove_pg_mapping(&map->pg_temp, pgid);
 
 			/* insert */
+			if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) {
+				err = -EINVAL;
+				goto bad;
+			}
 			pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS);
 			if (!pg) {
 				err = -ENOMEM;

Daži faili netika attēloti, jo izmaiņu fails ir pārāk liels