Эх сурвалжийг харах

ceph: put unused osd connections on lru

Instead of removing an osd connection immediately when its
requests list becomes empty, put the osd connection on an lru.
The osd is removed only if it has not been used for more than
a specified time (the osd_idle_ttl mount option).

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
Yehuda Sadeh 15 жил өмнө
parent
commit
f5a2041bd9

+ 67 - 9
fs/ceph/osd_client.c

@@ -389,6 +389,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
 	atomic_set(&osd->o_ref, 1);
 	atomic_set(&osd->o_ref, 1);
 	osd->o_osdc = osdc;
 	osd->o_osdc = osdc;
 	INIT_LIST_HEAD(&osd->o_requests);
 	INIT_LIST_HEAD(&osd->o_requests);
+	INIT_LIST_HEAD(&osd->o_osd_lru);
 	osd->o_incarnation = 1;
 	osd->o_incarnation = 1;
 
 
 	ceph_con_init(osdc->client->msgr, &osd->o_con);
 	ceph_con_init(osdc->client->msgr, &osd->o_con);
@@ -422,25 +423,56 @@ static void put_osd(struct ceph_osd *osd)
 /*
 /*
  * remove an osd from our map
  * remove an osd from our map
  */
  */
-static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
+static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)	/* erase osd from osdc->osds, unlink it from the lru, close its connection, drop a ref */
 {
 {
-	dout("remove_osd %p\n", osd);
+	dout("__remove_osd %p\n", osd);	/* NOTE(review): takes no lock itself; presumably the caller holds osdc->request_mutex - confirm */
 	BUG_ON(!list_empty(&osd->o_requests));
 	BUG_ON(!list_empty(&osd->o_requests));
 	rb_erase(&osd->o_node, &osdc->osds);
 	rb_erase(&osd->o_node, &osdc->osds);
+	list_del_init(&osd->o_osd_lru);	/* also unlink from the idle lru; the node is left re-initialized */
 	ceph_con_close(&osd->o_con);
 	ceph_con_close(&osd->o_con);
 	put_osd(osd);
 	put_osd(osd);
 }
 }
 
 
+static void __move_osd_to_lru(struct ceph_osd_client *osdc,
+			      struct ceph_osd *osd)
+{	/* Queue osd at the tail of osdc->osd_lru and stamp the time after which it may be removed. */
+	dout("__move_osd_to_lru %p\n", osd);	/* NOTE(review): takes no lock itself; presumably the caller holds osdc->request_mutex - confirm */
+	BUG_ON(!list_empty(&osd->o_osd_lru));	/* the osd must not already be queued on the lru */
+	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);	/* newest entries go to the tail, so the head is the longest-idle osd */
+	osd->lru_ttl = jiffies + osdc->client->mount_args->osd_idle_ttl * HZ;	/* expiry in jiffies: now + osd_idle_ttl seconds */
+}
+
+static void __remove_osd_from_lru(struct ceph_osd *osd)
+{	/* Take osd off the idle lru if it is currently queued there; otherwise do nothing. */
+	dout("__remove_osd_from_lru %p\n", osd);
+	if (!list_empty(&osd->o_osd_lru))	/* an empty (self-linked) node means the osd is not on the lru */
+		list_del_init(&osd->o_osd_lru);	/* re-initialize the node so later list_empty() checks on it remain valid */
+}
+
+static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all)
+{	/* Remove osds on osdc->osd_lru whose lru_ttl has passed; if remove_all is nonzero, remove every osd on the lru. */
+	struct ceph_osd *osd, *nosd;
+
+	dout("__remove_old_osds %p\n", osdc);	/* NOTE(review): the message carries a "__" prefix that the function name lacks */
+	mutex_lock(&osdc->request_mutex);	/* taken here, unlike the "__" helpers, which do no locking of their own */
+	list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {	/* _safe variant: __remove_osd() unlinks the current entry */
+		if (!remove_all && time_before(jiffies, osd->lru_ttl))
+			break;	/* entries are appended at the tail, so the remaining ones should not have expired yet */
+		__remove_osd(osdc, osd);
+	}
+	mutex_unlock(&osdc->request_mutex);
+}
+
 /*
 /*
  * reset osd connect
  * reset osd connect
  */
  */
-static int reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
+static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 {
 {
 	int ret = 0;
 	int ret = 0;
 
 
-	dout("reset_osd %p osd%d\n", osd, osd->o_osd);
+	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
 	if (list_empty(&osd->o_requests)) {
 	if (list_empty(&osd->o_requests)) {
-		remove_osd(osdc, osd);
+		__remove_osd(osdc, osd);
 	} else {
 	} else {
 		ceph_con_close(&osd->o_con);
 		ceph_con_close(&osd->o_con);
 		ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
 		ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
@@ -533,7 +565,7 @@ static void __unregister_request(struct ceph_osd_client *osdc,
 
 
 		list_del_init(&req->r_osd_item);
 		list_del_init(&req->r_osd_item);
 		if (list_empty(&req->r_osd->o_requests))
 		if (list_empty(&req->r_osd->o_requests))
-			remove_osd(osdc, req->r_osd);
+			__move_osd_to_lru(osdc, req->r_osd);
 		req->r_osd = NULL;
 		req->r_osd = NULL;
 	}
 	}
 
 
@@ -611,7 +643,7 @@ static int __map_osds(struct ceph_osd_client *osdc,
 		if (list_empty(&req->r_osd->o_requests)) {
 		if (list_empty(&req->r_osd->o_requests)) {
 			/* try to re-use r_osd if possible */
 			/* try to re-use r_osd if possible */
 			newosd = get_osd(req->r_osd);
 			newosd = get_osd(req->r_osd);
-			remove_osd(osdc, newosd);
+			__remove_osd(osdc, newosd);
 		}
 		}
 		req->r_osd = NULL;
 		req->r_osd = NULL;
 	}
 	}
@@ -636,8 +668,10 @@ static int __map_osds(struct ceph_osd_client *osdc,
 		ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
 		ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
 	}
 	}
 
 
-	if (req->r_osd)
+	if (req->r_osd) {
+		__remove_osd_from_lru(req->r_osd);
 		list_add(&req->r_osd_item, &req->r_osd->o_requests);
 		list_add(&req->r_osd_item, &req->r_osd->o_requests);
+	}
 	err = 1;   /* osd changed */
 	err = 1;   /* osd changed */
 
 
 out:
 out:
@@ -744,6 +778,23 @@ static void handle_timeout(struct work_struct *work)
 	up_read(&osdc->map_sem);
 	up_read(&osdc->map_sem);
 }
 }
 
 
+static void handle_osds_timeout(struct work_struct *work)
+{	/* Delayed-work handler: remove expired osds from the idle lru, then reschedule itself. */
+	struct ceph_osd_client *osdc =
+		container_of(work, struct ceph_osd_client,
+			     osds_timeout_work.work);	/* recover the owning client from the embedded delayed_work */
+	unsigned long delay =	/* NOTE(review): osd_idle_ttl is a plain int; a zero or negative value is not checked here */
+		osdc->client->mount_args->osd_idle_ttl * HZ >> 2;	/* a quarter of the idle ttl in jiffies ('*' binds tighter than '>>') */
+
+	dout("osds timeout\n");
+	down_read(&osdc->map_sem);
+	remove_old_osds(osdc, 0);	/* remove_all == 0: only osds whose ttl has passed are removed */
+	up_read(&osdc->map_sem);
+
+	schedule_delayed_work(&osdc->osds_timeout_work,	/* re-arm for the next pass */
+			      round_jiffies_relative(delay));
+}
+
 /*
 /*
  * handle osd op reply.  either call the callback if it is specified,
  * handle osd op reply.  either call the callback if it is specified,
  * or do the completion to wake up the waiting thread.
  * or do the completion to wake up the waiting thread.
@@ -881,7 +932,7 @@ static void kick_requests(struct ceph_osd_client *osdc,
 				   ceph_osd_addr(osdc->osdmap,
 				   ceph_osd_addr(osdc->osdmap,
 						 osd->o_osd),
 						 osd->o_osd),
 				   sizeof(struct ceph_entity_addr)) != 0)
 				   sizeof(struct ceph_entity_addr)) != 0)
-				reset_osd(osdc, osd);
+				__reset_osd(osdc, osd);
 		}
 		}
 	}
 	}
 
 
@@ -1195,9 +1246,14 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
 	osdc->timeout_tid = 0;
 	osdc->timeout_tid = 0;
 	osdc->last_tid = 0;
 	osdc->last_tid = 0;
 	osdc->osds = RB_ROOT;
 	osdc->osds = RB_ROOT;
+	INIT_LIST_HEAD(&osdc->osd_lru);
 	osdc->requests = RB_ROOT;
 	osdc->requests = RB_ROOT;
 	osdc->num_requests = 0;
 	osdc->num_requests = 0;
 	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
 	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
+	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
+
+	schedule_delayed_work(&osdc->osds_timeout_work,
+	   round_jiffies_relative(osdc->client->mount_args->osd_idle_ttl * HZ));
 
 
 	err = -ENOMEM;
 	err = -ENOMEM;
 	osdc->req_mempool = mempool_create_kmalloc_pool(10,
 	osdc->req_mempool = mempool_create_kmalloc_pool(10,
@@ -1219,10 +1275,12 @@ out:
 void ceph_osdc_stop(struct ceph_osd_client *osdc)
 void ceph_osdc_stop(struct ceph_osd_client *osdc)
 {
 {
 	cancel_delayed_work_sync(&osdc->timeout_work);
 	cancel_delayed_work_sync(&osdc->timeout_work);
+	cancel_delayed_work_sync(&osdc->osds_timeout_work);	/* stop the self-rearming idle-osd work before tearing anything down */
 	if (osdc->osdmap) {
 	if (osdc->osdmap) {
 		ceph_osdmap_destroy(osdc->osdmap);
 		ceph_osdmap_destroy(osdc->osdmap);
 		osdc->osdmap = NULL;
 		osdc->osdmap = NULL;
 	}
 	}
+	remove_old_osds(osdc, 1);	/* remove_all == 1: remove every osd still on the idle lru, regardless of its ttl */
 	mempool_destroy(osdc->req_mempool);
 	mempool_destroy(osdc->req_mempool);
 	ceph_msgpool_destroy(&osdc->msgpool_op);
 	ceph_msgpool_destroy(&osdc->msgpool_op);
 }
 }

+ 4 - 0
fs/ceph/osd_client.h

@@ -31,9 +31,11 @@ struct ceph_osd {
 	struct rb_node o_node;
 	struct rb_node o_node;
 	struct ceph_connection o_con;
 	struct ceph_connection o_con;
 	struct list_head o_requests;
 	struct list_head o_requests;
+	struct list_head o_osd_lru;
 	struct ceph_authorizer *o_authorizer;
 	struct ceph_authorizer *o_authorizer;
 	void *o_authorizer_buf, *o_authorizer_reply_buf;
 	void *o_authorizer_buf, *o_authorizer_reply_buf;
 	size_t o_authorizer_buf_len, o_authorizer_reply_buf_len;
 	size_t o_authorizer_buf_len, o_authorizer_reply_buf_len;
+	unsigned long lru_ttl;
 };
 };
 
 
 /* an in-flight request */
 /* an in-flight request */
@@ -90,11 +92,13 @@ struct ceph_osd_client {
 
 
 	struct mutex           request_mutex;
 	struct mutex           request_mutex;
 	struct rb_root         osds;          /* osds */
 	struct rb_root         osds;          /* osds */
+	struct list_head       osd_lru;       /* idle osds */
 	u64                    timeout_tid;   /* tid of timeout triggering rq */
 	u64                    timeout_tid;   /* tid of timeout triggering rq */
 	u64                    last_tid;      /* tid of last request */
 	u64                    last_tid;      /* tid of last request */
 	struct rb_root         requests;      /* pending requests */
 	struct rb_root         requests;      /* pending requests */
 	int                    num_requests;
 	int                    num_requests;
 	struct delayed_work    timeout_work;
 	struct delayed_work    timeout_work;
+	struct delayed_work    osds_timeout_work;
 #ifdef CONFIG_DEBUG_FS
 #ifdef CONFIG_DEBUG_FS
 	struct dentry 	       *debugfs_file;
 	struct dentry 	       *debugfs_file;
 #endif
 #endif

+ 3 - 0
fs/ceph/super.c

@@ -293,6 +293,7 @@ enum {
 	Opt_rsize,
 	Opt_rsize,
 	Opt_osdtimeout,
 	Opt_osdtimeout,
 	Opt_mount_timeout,
 	Opt_mount_timeout,
+	Opt_osd_idle_ttl,
 	Opt_caps_wanted_delay_min,
 	Opt_caps_wanted_delay_min,
 	Opt_caps_wanted_delay_max,
 	Opt_caps_wanted_delay_max,
 	Opt_readdir_max_entries,
 	Opt_readdir_max_entries,
@@ -322,6 +323,7 @@ static match_table_t arg_tokens = {
 	{Opt_rsize, "rsize=%d"},
 	{Opt_rsize, "rsize=%d"},
 	{Opt_osdtimeout, "osdtimeout=%d"},
 	{Opt_osdtimeout, "osdtimeout=%d"},
 	{Opt_mount_timeout, "mount_timeout=%d"},
 	{Opt_mount_timeout, "mount_timeout=%d"},
+	{Opt_osd_idle_ttl, "osd_idle_ttl=%d"},
 	{Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"},
 	{Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"},
 	{Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"},
 	{Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"},
 	{Opt_readdir_max_entries, "readdir_max_entries=%d"},
 	{Opt_readdir_max_entries, "readdir_max_entries=%d"},
@@ -367,6 +369,7 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
 	args->flags = CEPH_OPT_DEFAULT;
 	args->flags = CEPH_OPT_DEFAULT;
 	args->osd_timeout = 5;    /* seconds */
 	args->osd_timeout = 5;    /* seconds */
 	args->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */
 	args->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */
+	args->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT;   /* seconds */
 	args->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT;
 	args->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT;
 	args->caps_wanted_delay_max = CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT;
 	args->caps_wanted_delay_max = CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT;
 	args->rsize = CEPH_MOUNT_RSIZE_DEFAULT;
 	args->rsize = CEPH_MOUNT_RSIZE_DEFAULT;

+ 2 - 0
fs/ceph/super.h

@@ -53,6 +53,7 @@ struct ceph_mount_args {
 	struct ceph_entity_addr *mon_addr;
 	struct ceph_entity_addr *mon_addr;
 	int flags;
 	int flags;
 	int mount_timeout;
 	int mount_timeout;
+	int osd_idle_ttl;
 	int caps_wanted_delay_min, caps_wanted_delay_max;
 	int caps_wanted_delay_min, caps_wanted_delay_max;
 	struct ceph_fsid fsid;
 	struct ceph_fsid fsid;
 	struct ceph_entity_addr my_addr;
 	struct ceph_entity_addr my_addr;
@@ -71,6 +72,7 @@ struct ceph_mount_args {
  * defaults
  * defaults
  */
  */
 #define CEPH_MOUNT_TIMEOUT_DEFAULT  60
 #define CEPH_MOUNT_TIMEOUT_DEFAULT  60
+#define CEPH_OSD_IDLE_TTL_DEFAULT    60
 #define CEPH_MOUNT_RSIZE_DEFAULT    (512*1024) /* readahead */
 #define CEPH_MOUNT_RSIZE_DEFAULT    (512*1024) /* readahead */
 
 
 #define CEPH_MSG_MAX_FRONT_LEN	(16*1024*1024)
 #define CEPH_MSG_MAX_FRONT_LEN	(16*1024*1024)