Merge branch 'upstream-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/mfasheh/ocfs2

* 'upstream-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/mfasheh/ocfs2: (56 commits)
  [PATCH] fs/ocfs2/dlm/: cleanups
  ocfs2: fix compiler warnings in dlm_convert_lock_handler()
  ocfs2: dlm_print_one_mle() needs to be defined
  ocfs2: remove whitespace in dlmunlock.c
  ocfs2: move dlm work to a private work queue
  ocfs2: fix incorrect error returns
  ocfs2: tune down some noisy messages during dlm recovery
  ocfs2: display message before waiting for recovery to complete
  ocfs2: mlog in dlm_convert_lock_handler() should be ML_ERROR
  ocfs2: retry operations when a lock is marked in recovery
  ocfs2: use cond_resched() in dlm_thread()
  ocfs2: use GFP_NOFS in some dlm operations
  ocfs2: wait for recovery when starting lock mastery
  ocfs2: continue recovery when a dead node is encountered
  ocfs2: remove unneccesary spin_unlock() in dlm_remaster_locks()
  ocfs2: dlm_remaster_locks() should never exit without completing
  ocfs2: special case recovery lock in dlmlock_remote()
  ocfs2: pending mastery asserts and migrations should block each other
  ocfs2: temporarily disable automatic lock migration
  ocfs2: do not unconditionally purge the lockres in dlmlock_remote()
  ...
Linus Torvalds 19 years ago
parent
commit
eb99adde31

+ 7 - 5
fs/ocfs2/dlm/dlmast.c

@@ -197,12 +197,14 @@ static void dlm_update_lvb(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
 				  lock->ml.node == dlm->node_num ? "master" :
 				  lock->ml.node == dlm->node_num ? "master" :
 				  "remote");
 				  "remote");
 			memcpy(lksb->lvb, res->lvb, DLM_LVB_LEN);
 			memcpy(lksb->lvb, res->lvb, DLM_LVB_LEN);
-		} else if (lksb->flags & DLM_LKSB_PUT_LVB) {
-			mlog(0, "setting lvb from lockres for %s node\n",
-				  lock->ml.node == dlm->node_num ? "master" :
-				  "remote");
-			memcpy(res->lvb, lksb->lvb, DLM_LVB_LEN);
 		}
 		}
+		/* Do nothing for lvb put requests - they should be done in
+ 		 * place when the lock is downconverted - otherwise we risk
+ 		 * racing gets and puts which could result in old lvb data
+ 		 * being propagated. We leave the put flag set and clear it
+ 		 * here. In the future we might want to clear it at the time
+ 		 * the put is actually done.
+		 */
 		spin_unlock(&res->spinlock);
 		spin_unlock(&res->spinlock);
 	}
 	}
 
 

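Note on the dlmast.c hunk above: the AST path now only copies the LVB from the lockres into the lksb (a "get"); the reverse "put" copy was removed here and instead happens in place when a convert is granted (see the dlmconvert.c hunk below). A hedged sketch of the resulting ordering, reusing the names from this diff — illustration only, not kernel source:

	/* On grant of a downconvert, the caller's LVB is published to the
	 * lockres exactly once, under res->spinlock, instead of racing
	 * with later LVB gets in the AST path. */
	lock->ml.type = type;				/* in-place convert */
	if (lock->lksb->flags & DLM_LKSB_PUT_LVB)	/* put, not get */
		memcpy(res->lvb, lock->lksb->lvb, DLM_LVB_LEN);
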
+ 53 - 10
fs/ocfs2/dlm/dlmcommon.h

@@ -37,7 +37,17 @@
 #define DLM_THREAD_SHUFFLE_INTERVAL    5     // flush everything every 5 passes
 #define DLM_THREAD_MS                  200   // flush at least every 200 ms
 
-#define DLM_HASH_BUCKETS     (PAGE_SIZE / sizeof(struct hlist_head))
+#define DLM_HASH_SIZE_DEFAULT	(1 << 14)
+#if DLM_HASH_SIZE_DEFAULT < PAGE_SIZE
+# define DLM_HASH_PAGES		1
+#else
+# define DLM_HASH_PAGES		(DLM_HASH_SIZE_DEFAULT / PAGE_SIZE)
+#endif
+#define DLM_BUCKETS_PER_PAGE	(PAGE_SIZE / sizeof(struct hlist_head))
+#define DLM_HASH_BUCKETS	(DLM_HASH_PAGES * DLM_BUCKETS_PER_PAGE)
+
+/* Intended to make it easier for us to switch out hash functions */
+#define dlm_lockid_hash(_n, _l) full_name_hash(_n, _l)
 
 enum dlm_ast_type {
 	DLM_AST = 0,
@@ -61,7 +71,8 @@ static inline int dlm_is_recovery_lock(const char *lock_name, int name_len)
 	return 0;
 }
 
-#define DLM_RECO_STATE_ACTIVE  0x0001
+#define DLM_RECO_STATE_ACTIVE    0x0001
+#define DLM_RECO_STATE_FINALIZE  0x0002
 
 struct dlm_recovery_ctxt
 {
@@ -85,7 +96,7 @@ enum dlm_ctxt_state {
 struct dlm_ctxt
 {
 	struct list_head list;
-	struct hlist_head *lockres_hash;
+	struct hlist_head **lockres_hash;
 	struct list_head dirty_list;
 	struct list_head purge_list;
 	struct list_head pending_asts;
@@ -120,6 +131,7 @@ struct dlm_ctxt
 	struct o2hb_callback_func dlm_hb_down;
 	struct task_struct *dlm_thread_task;
 	struct task_struct *dlm_reco_thread_task;
+	struct workqueue_struct *dlm_worker;
 	wait_queue_head_t dlm_thread_wq;
 	wait_queue_head_t dlm_reco_thread_wq;
 	wait_queue_head_t ast_wq;
@@ -132,6 +144,11 @@ struct dlm_ctxt
 	struct list_head	dlm_eviction_callbacks;
 };
 
+static inline struct hlist_head *dlm_lockres_hash(struct dlm_ctxt *dlm, unsigned i)
+{
+	return dlm->lockres_hash[(i / DLM_BUCKETS_PER_PAGE) % DLM_HASH_PAGES] + (i % DLM_BUCKETS_PER_PAGE);
+}
+
 /* these keventd work queue items are for less-frequently
  * called functions that cannot be directly called from the
  * net message handlers for some reason, usually because
@@ -216,20 +233,29 @@ struct dlm_lock_resource
 	/* WARNING: Please see the comment in dlm_init_lockres before
 	 * adding fields here. */
 	struct hlist_node hash_node;
+	struct qstr lockname;
 	struct kref      refs;
 
-	/* please keep these next 3 in this order
-	 * some funcs want to iterate over all lists */
+	/*
+	 * Please keep granted, converting, and blocked in this order,
+	 * as some funcs want to iterate over all lists.
+	 *
+	 * All four lists are protected by the hash's reference.
+	 */
 	struct list_head granted;
 	struct list_head converting;
 	struct list_head blocked;
+	struct list_head purge;
 
+	/*
+	 * These two lists require you to hold an additional reference
+	 * while they are on the list.
+	 */
 	struct list_head dirty;
 	struct list_head recovering; // dlm_recovery_ctxt.resources list
 
 	/* unused lock resources have their last_used stamped and are
 	 * put on a list for the dlm thread to run. */
-	struct list_head purge;
 	unsigned long    last_used;
 
 	unsigned migration_pending:1;
@@ -238,7 +264,6 @@ struct dlm_lock_resource
 	wait_queue_head_t wq;
 	u8  owner;              //node which owns the lock resource, or unknown
 	u16 state;
-	struct qstr lockname;
 	char lvb[DLM_LVB_LEN];
 };
 
@@ -300,6 +325,15 @@ enum dlm_lockres_list {
 	DLM_BLOCKED_LIST
 };
 
+static inline int dlm_lvb_is_empty(char *lvb)
+{
+	int i;
+	for (i=0; i<DLM_LVB_LEN; i++)
+		if (lvb[i])
+			return 0;
+	return 1;
+}
+
 static inline struct list_head *
 dlm_list_idx_to_ptr(struct dlm_lock_resource *res, enum dlm_lockres_list idx)
 {
@@ -609,7 +643,8 @@ struct dlm_finalize_reco
 {
 	u8 node_idx;
 	u8 dead_node;
-	__be16 pad1;
+	u8 flags;
+	u8 pad1;
 	__be32 pad2;
 };
 
@@ -676,6 +711,7 @@ void dlm_wait_for_recovery(struct dlm_ctxt *dlm);
 void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
 int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node);
 int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout);
+int dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout);
 
 void dlm_put(struct dlm_ctxt *dlm);
 struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm);
@@ -687,14 +723,20 @@ void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
 			    struct dlm_lock_resource *res);
 void dlm_purge_lockres(struct dlm_ctxt *dlm,
 		       struct dlm_lock_resource *lockres);
-void dlm_lockres_get(struct dlm_lock_resource *res);
+static inline void dlm_lockres_get(struct dlm_lock_resource *res)
+{
+	/* This is called on every lookup, so it might be worth
+	 * inlining. */
+	kref_get(&res->refs);
+}
 void dlm_lockres_put(struct dlm_lock_resource *res);
 void __dlm_unhash_lockres(struct dlm_lock_resource *res);
 void __dlm_insert_lockres(struct dlm_ctxt *dlm,
 			  struct dlm_lock_resource *res);
 struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
 						const char *name,
-						unsigned int len);
+						unsigned int len,
+						unsigned int hash);
 struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
 					      const char *name,
 					      unsigned int len);
@@ -819,6 +861,7 @@ void dlm_clean_master_list(struct dlm_ctxt *dlm,
 			   u8 dead_node);
 int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock);
 
+int __dlm_lockres_unused(struct dlm_lock_resource *res);
 
 static inline const char * dlm_lock_mode_name(int mode)
 {

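For scale, the new hash sizing above works out as follows on a common x86_64 configuration (4096-byte pages, 8-byte struct hlist_head — both assumptions; they vary by arch and config): DLM_HASH_PAGES = (1 << 14) / 4096 = 4 and DLM_BUCKETS_PER_PAGE = 4096 / 8 = 512, so DLM_HASH_BUCKETS = 2048 instead of the old single page's 512. A hedged user-space model of dlm_lockres_hash()'s page/slot arithmetic:

	#include <stdio.h>

	/* Mirrors the macros above under the stated assumptions;
	 * not kernel code. */
	enum { HASH_PAGES = 4, BUCKETS_PER_PAGE = 512 };

	int main(void)
	{
		unsigned i = 1337;	/* any bucket index */
		unsigned page = (i / BUCKETS_PER_PAGE) % HASH_PAGES;
		unsigned slot = i % BUCKETS_PER_PAGE;
		printf("bucket %u -> page %u, slot %u\n", i, page, slot);
		return 0;	/* prints: bucket 1337 -> page 2, slot 313 */
	}
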
+ 24 - 0
fs/ocfs2/dlm/dlmconvert.c

@@ -214,6 +214,9 @@ grant:
 	if (lock->ml.node == dlm->node_num)
 		mlog(0, "doing in-place convert for nonlocal lock\n");
 	lock->ml.type = type;
+	if (lock->lksb->flags & DLM_LKSB_PUT_LVB)
+		memcpy(res->lvb, lock->lksb->lvb, DLM_LVB_LEN);
+
 	status = DLM_NORMAL;
 	*call_ast = 1;
 	goto unlock_exit;
@@ -461,6 +464,12 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
 	}
 
 	spin_lock(&res->spinlock);
+	status = __dlm_lockres_state_to_status(res);
+	if (status != DLM_NORMAL) {
+		spin_unlock(&res->spinlock);
+		dlm_error(status);
+		goto leave;
+	}
 	list_for_each(iter, &res->granted) {
 		lock = list_entry(iter, struct dlm_lock, list);
 		if (lock->ml.cookie == cnv->cookie &&
@@ -470,6 +479,21 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
 		}
 		lock = NULL;
 	}
+	if (!lock) {
+		__dlm_print_one_lock_resource(res);
+		list_for_each(iter, &res->granted) {
+			lock = list_entry(iter, struct dlm_lock, list);
+			if (lock->ml.node == cnv->node_idx) {
+				mlog(ML_ERROR, "There is something here "
+				     "for node %u, lock->ml.cookie=%llu, "
+				     "cnv->cookie=%llu\n", cnv->node_idx,
+				     (unsigned long long)lock->ml.cookie,
+				     (unsigned long long)cnv->cookie);
+				break;
+			}
+		}
+		lock = NULL;
+	}
 	spin_unlock(&res->spinlock);
 	if (!lock) {
 		status = DLM_IVLOCKID;

+ 3 - 3
fs/ocfs2/dlm/dlmdebug.c

@@ -37,10 +37,8 @@
 
 #include "dlmapi.h"
 #include "dlmcommon.h"
-#include "dlmdebug.h"
 
 #include "dlmdomain.h"
-#include "dlmdebug.h"
 
 #define MLOG_MASK_PREFIX ML_DLM
 #include "cluster/masklog.h"
@@ -120,6 +118,7 @@ void dlm_print_one_lock(struct dlm_lock *lockid)
 }
 EXPORT_SYMBOL_GPL(dlm_print_one_lock);
 
+#if 0
 void dlm_dump_lock_resources(struct dlm_ctxt *dlm)
 {
 	struct dlm_lock_resource *res;
@@ -136,12 +135,13 @@ void dlm_dump_lock_resources(struct dlm_ctxt *dlm)
 
 	spin_lock(&dlm->spinlock);
 	for (i=0; i<DLM_HASH_BUCKETS; i++) {
-		bucket = &(dlm->lockres_hash[i]);
+		bucket = dlm_lockres_hash(dlm, i);
 		hlist_for_each_entry(res, iter, bucket, hash_node)
 			dlm_print_one_lock_resource(res);
 	}
 	spin_unlock(&dlm->spinlock);
 }
+#endif  /*  0  */
 
 static const char *dlm_errnames[] = {
 	[DLM_NORMAL] =			"DLM_NORMAL",

+ 0 - 30
fs/ocfs2/dlm/dlmdebug.h

@@ -1,30 +0,0 @@
-/* -*- mode: c; c-basic-offset: 8; -*-
- * vim: noexpandtab sw=8 ts=8 sts=0:
- *
- * dlmdebug.h
- *
- * Copyright (C) 2004 Oracle.  All rights reserved.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * General Public License for more details.
- *
- * You should have received a copy of the GNU General Public
- * License along with this program; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 021110-1307, USA.
- *
- */
-
-#ifndef DLMDEBUG_H
-#define DLMDEBUG_H
-
-void dlm_dump_lock_resources(struct dlm_ctxt *dlm);
-
-#endif

+ 72 - 29
fs/ocfs2/dlm/dlmdomain.c

@@ -41,7 +41,6 @@
 #include "dlmapi.h"
 #include "dlmapi.h"
 #include "dlmcommon.h"
 #include "dlmcommon.h"
 
 
-#include "dlmdebug.h"
 #include "dlmdomain.h"
 #include "dlmdomain.h"
 
 
 #include "dlmver.h"
 #include "dlmver.h"
@@ -49,6 +48,33 @@
 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
 #include "cluster/masklog.h"
 #include "cluster/masklog.h"
 
 
+static void dlm_free_pagevec(void **vec, int pages)
+{
+	while (pages--)
+		free_page((unsigned long)vec[pages]);
+	kfree(vec);
+}
+
+static void **dlm_alloc_pagevec(int pages)
+{
+	void **vec = kmalloc(pages * sizeof(void *), GFP_KERNEL);
+	int i;
+
+	if (!vec)
+		return NULL;
+
+	for (i = 0; i < pages; i++)
+		if (!(vec[i] = (void *)__get_free_page(GFP_KERNEL)))
+			goto out_free;
+
+	mlog(0, "Allocated DLM hash pagevec; %d pages (%lu expected), %lu buckets per page\n",
+	     pages, DLM_HASH_PAGES, (unsigned long)DLM_BUCKETS_PER_PAGE);
+	return vec;
+out_free:
+	dlm_free_pagevec(vec, i);
+	return NULL;
+}
+
 /*
 /*
  *
  *
  * spinlock lock ordering: if multiple locks are needed, obey this ordering:
  * spinlock lock ordering: if multiple locks are needed, obey this ordering:
@@ -90,8 +116,7 @@ void __dlm_insert_lockres(struct dlm_ctxt *dlm,
 	assert_spin_locked(&dlm->spinlock);
 	assert_spin_locked(&dlm->spinlock);
 
 
 	q = &res->lockname;
 	q = &res->lockname;
-	q->hash = full_name_hash(q->name, q->len);
-	bucket = &(dlm->lockres_hash[q->hash % DLM_HASH_BUCKETS]);
+	bucket = dlm_lockres_hash(dlm, q->hash);
 
 
 	/* get a reference for our hashtable */
 	/* get a reference for our hashtable */
 	dlm_lockres_get(res);
 	dlm_lockres_get(res);
@@ -100,34 +125,32 @@ void __dlm_insert_lockres(struct dlm_ctxt *dlm,
 }
 }
 
 
 struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
 struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
-					 const char *name,
-					 unsigned int len)
+						const char *name,
+						unsigned int len,
+						unsigned int hash)
 {
 {
-	unsigned int hash;
-	struct hlist_node *iter;
-	struct dlm_lock_resource *tmpres=NULL;
 	struct hlist_head *bucket;
 	struct hlist_head *bucket;
+	struct hlist_node *list;
 
 
 	mlog_entry("%.*s\n", len, name);
 	mlog_entry("%.*s\n", len, name);
 
 
 	assert_spin_locked(&dlm->spinlock);
 	assert_spin_locked(&dlm->spinlock);
 
 
-	hash = full_name_hash(name, len);
-
-	bucket = &(dlm->lockres_hash[hash % DLM_HASH_BUCKETS]);
-
-	/* check for pre-existing lock */
-	hlist_for_each(iter, bucket) {
-		tmpres = hlist_entry(iter, struct dlm_lock_resource, hash_node);
-		if (tmpres->lockname.len == len &&
-		    memcmp(tmpres->lockname.name, name, len) == 0) {
-			dlm_lockres_get(tmpres);
-			break;
-		}
+	bucket = dlm_lockres_hash(dlm, hash);
 
 
-		tmpres = NULL;
+	hlist_for_each(list, bucket) {
+		struct dlm_lock_resource *res = hlist_entry(list,
+			struct dlm_lock_resource, hash_node);
+		if (res->lockname.name[0] != name[0])
+			continue;
+		if (unlikely(res->lockname.len != len))
+			continue;
+		if (memcmp(res->lockname.name + 1, name + 1, len - 1))
+			continue;
+		dlm_lockres_get(res);
+		return res;
 	}
 	}
-	return tmpres;
+	return NULL;
 }
 }
 
 
 struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
 struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
@@ -135,9 +158,10 @@ struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
 				    unsigned int len)
 				    unsigned int len)
 {
 {
 	struct dlm_lock_resource *res;
 	struct dlm_lock_resource *res;
+	unsigned int hash = dlm_lockid_hash(name, len);
 
 
 	spin_lock(&dlm->spinlock);
 	spin_lock(&dlm->spinlock);
-	res = __dlm_lookup_lockres(dlm, name, len);
+	res = __dlm_lookup_lockres(dlm, name, len, hash);
 	spin_unlock(&dlm->spinlock);
 	spin_unlock(&dlm->spinlock);
 	return res;
 	return res;
 }
 }
@@ -194,7 +218,7 @@ static int dlm_wait_on_domain_helper(const char *domain)
 static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm)
 static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm)
 {
 {
 	if (dlm->lockres_hash)
 	if (dlm->lockres_hash)
-		free_page((unsigned long) dlm->lockres_hash);
+		dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
 
 
 	if (dlm->name)
 	if (dlm->name)
 		kfree(dlm->name);
 		kfree(dlm->name);
@@ -278,11 +302,21 @@ int dlm_domain_fully_joined(struct dlm_ctxt *dlm)
 	return ret;
 	return ret;
 }
 }
 
 
+static void dlm_destroy_dlm_worker(struct dlm_ctxt *dlm)
+{
+	if (dlm->dlm_worker) {
+		flush_workqueue(dlm->dlm_worker);
+		destroy_workqueue(dlm->dlm_worker);
+		dlm->dlm_worker = NULL;
+	}
+}
+
 static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
 static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
 {
 {
 	dlm_unregister_domain_handlers(dlm);
 	dlm_unregister_domain_handlers(dlm);
 	dlm_complete_thread(dlm);
 	dlm_complete_thread(dlm);
 	dlm_complete_recovery_thread(dlm);
 	dlm_complete_recovery_thread(dlm);
+	dlm_destroy_dlm_worker(dlm);
 
 
 	/* We've left the domain. Now we can take ourselves out of the
 	/* We've left the domain. Now we can take ourselves out of the
 	 * list and allow the kref stuff to help us free the
 	 * list and allow the kref stuff to help us free the
@@ -304,8 +338,8 @@ static void dlm_migrate_all_locks(struct dlm_ctxt *dlm)
 restart:
 restart:
 	spin_lock(&dlm->spinlock);
 	spin_lock(&dlm->spinlock);
 	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
 	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
-		while (!hlist_empty(&dlm->lockres_hash[i])) {
-			res = hlist_entry(dlm->lockres_hash[i].first,
+		while (!hlist_empty(dlm_lockres_hash(dlm, i))) {
+			res = hlist_entry(dlm_lockres_hash(dlm, i)->first,
 					  struct dlm_lock_resource, hash_node);
 					  struct dlm_lock_resource, hash_node);
 			/* need reference when manually grabbing lockres */
 			/* need reference when manually grabbing lockres */
 			dlm_lockres_get(res);
 			dlm_lockres_get(res);
@@ -1126,6 +1160,13 @@ static int dlm_join_domain(struct dlm_ctxt *dlm)
 		goto bail;
 		goto bail;
 	}
 	}
 
 
+	dlm->dlm_worker = create_singlethread_workqueue("dlm_wq");
+	if (!dlm->dlm_worker) {
+		status = -ENOMEM;
+		mlog_errno(status);
+		goto bail;
+	}
+
 	do {
 	do {
 		unsigned int backoff;
 		unsigned int backoff;
 		status = dlm_try_to_join_domain(dlm);
 		status = dlm_try_to_join_domain(dlm);
@@ -1166,6 +1207,7 @@ bail:
 		dlm_unregister_domain_handlers(dlm);
 		dlm_unregister_domain_handlers(dlm);
 		dlm_complete_thread(dlm);
 		dlm_complete_thread(dlm);
 		dlm_complete_recovery_thread(dlm);
 		dlm_complete_recovery_thread(dlm);
+		dlm_destroy_dlm_worker(dlm);
 	}
 	}
 
 
 	return status;
 	return status;
@@ -1191,7 +1233,7 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
 		goto leave;
 		goto leave;
 	}
 	}
 
 
-	dlm->lockres_hash = (struct hlist_head *) __get_free_page(GFP_KERNEL);
+	dlm->lockres_hash = (struct hlist_head **)dlm_alloc_pagevec(DLM_HASH_PAGES);
 	if (!dlm->lockres_hash) {
 	if (!dlm->lockres_hash) {
 		mlog_errno(-ENOMEM);
 		mlog_errno(-ENOMEM);
 		kfree(dlm->name);
 		kfree(dlm->name);
@@ -1200,8 +1242,8 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
 		goto leave;
 		goto leave;
 	}
 	}
 
 
-	for (i=0; i<DLM_HASH_BUCKETS; i++)
-		INIT_HLIST_HEAD(&dlm->lockres_hash[i]);
+	for (i = 0; i < DLM_HASH_BUCKETS; i++)
+		INIT_HLIST_HEAD(dlm_lockres_hash(dlm, i));
 
 
 	strcpy(dlm->name, domain);
 	strcpy(dlm->name, domain);
 	dlm->key = key;
 	dlm->key = key;
@@ -1231,6 +1273,7 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
 
 
 	dlm->dlm_thread_task = NULL;
 	dlm->dlm_thread_task = NULL;
 	dlm->dlm_reco_thread_task = NULL;
 	dlm->dlm_reco_thread_task = NULL;
+	dlm->dlm_worker = NULL;
 	init_waitqueue_head(&dlm->dlm_thread_wq);
 	init_waitqueue_head(&dlm->dlm_thread_wq);
 	init_waitqueue_head(&dlm->dlm_reco_thread_wq);
 	init_waitqueue_head(&dlm->dlm_reco_thread_wq);
 	init_waitqueue_head(&dlm->reco.event);
 	init_waitqueue_head(&dlm->reco.event);

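On the rewritten __dlm_lookup_lockres() above: the compare order is cheapest-first — one byte, then the length, then a memcmp that skips the byte already checked. The single-byte test is a useful early reject presumably because ocfs2 lock names lead with a type character, though that rationale is an inference here, not stated in the diff. A hedged sketch of the same predicate, mirroring the loop body above (not kernel source):

	static int name_matches(const struct qstr *q, const char *name,
				unsigned int len)
	{
		if (q->name[0] != name[0])	/* one-byte fast reject */
			return 0;
		if (q->len != len)
			return 0;
		/* byte 0 already matched; compare only the remainder */
		return memcmp(q->name + 1, name + 1, len - 1) == 0;
	}
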
+ 3 - 3
fs/ocfs2/dlm/dlmfs.c

@@ -116,7 +116,7 @@ static int dlmfs_file_open(struct inode *inode,
 	 * doesn't make sense for LVB writes. */
 	file->f_flags &= ~O_APPEND;
 
-	fp = kmalloc(sizeof(*fp), GFP_KERNEL);
+	fp = kmalloc(sizeof(*fp), GFP_NOFS);
 	if (!fp) {
 		status = -ENOMEM;
 		goto bail;
@@ -196,7 +196,7 @@ static ssize_t dlmfs_file_read(struct file *filp,
 	else
 		readlen = count - *ppos;
 
-	lvb_buf = kmalloc(readlen, GFP_KERNEL);
+	lvb_buf = kmalloc(readlen, GFP_NOFS);
 	if (!lvb_buf)
 		return -ENOMEM;
 
@@ -240,7 +240,7 @@ static ssize_t dlmfs_file_write(struct file *filp,
 	else
 		writelen = count - *ppos;
 
-	lvb_buf = kmalloc(writelen, GFP_KERNEL);
+	lvb_buf = kmalloc(writelen, GFP_NOFS);
 	if (!lvb_buf)
 		return -ENOMEM;
 

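The dlmfs.c hunks above (and the matching GFP_NOFS changes elsewhere in this merge) all make the same trade: GFP_NOFS still allows the allocator to sleep, but forbids it from recursing into filesystem writeback to reclaim memory. Since reclaim through the fs can end up waiting on the very cluster locks these paths service, GFP_KERNEL here risks a reclaim deadlock. Hedged rule-of-thumb illustration, not kernel source:

	/* GFP_KERNEL - may sleep AND may re-enter fs code via reclaim
	 * GFP_NOFS   - may sleep, but never re-enters the fs, so a
	 *              dlm/dlmfs allocation cannot wait on itself */
	lvb_buf = kmalloc(writelen, GFP_NOFS);
	if (!lvb_buf)
		return -ENOMEM;
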
+ 52 - 16
fs/ocfs2/dlm/dlmlock.c

@@ -201,6 +201,7 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
 				      struct dlm_lock *lock, int flags)
 {
 	enum dlm_status status = DLM_DENIED;
+	int lockres_changed = 1;
 
 	mlog_entry("type=%d\n", lock->ml.type);
 	mlog(0, "lockres %.*s, flags = 0x%x\n", res->lockname.len,
@@ -226,8 +227,25 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
 	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
 	lock->lock_pending = 0;
 	if (status != DLM_NORMAL) {
-		if (status != DLM_NOTQUEUED)
+		if (status == DLM_RECOVERING &&
+		    dlm_is_recovery_lock(res->lockname.name,
+					 res->lockname.len)) {
+			/* recovery lock was mastered by dead node.
+			 * we need to have calc_usage shoot down this
+			 * lockres and completely remaster it. */
+			mlog(0, "%s: recovery lock was owned by "
+			     "dead node %u, remaster it now.\n",
+			     dlm->name, res->owner);
+		} else if (status != DLM_NOTQUEUED) {
+			/*
+			 * DO NOT call calc_usage, as this would unhash
+			 * the remote lockres before we ever get to use
+			 * it.  treat as if we never made any change to
+			 * the lockres.
+			 */
+			lockres_changed = 0;
 			dlm_error(status);
+		}
 		dlm_revert_pending_lock(res, lock);
 		dlm_lock_put(lock);
 	} else if (dlm_is_recovery_lock(res->lockname.name, 
@@ -243,7 +261,8 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
 	}
 	spin_unlock(&res->spinlock);
 
-	dlm_lockres_calc_usage(dlm, res);
+	if (lockres_changed)
+		dlm_lockres_calc_usage(dlm, res);
 
 	wake_up(&res->wq);
 	return status;
@@ -280,6 +299,14 @@ static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
 	if (tmpret >= 0) {
 		// successfully sent and received
 		ret = status;  // this is already a dlm_status
+		if (ret == DLM_REJECTED) {
+			mlog(ML_ERROR, "%s:%.*s: BUG.  this is a stale lockres "
+			     "no longer owned by %u.  that node is coming back "
+			     "up currently.\n", dlm->name, create.namelen,
+			     create.name, res->owner);
+			dlm_print_one_lock_resource(res);
+			BUG();
+		}
 	} else {
 		mlog_errno(tmpret);
 		if (dlm_is_host_down(tmpret)) {
@@ -381,13 +408,13 @@ struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie,
 	struct dlm_lock *lock;
 	int kernel_allocated = 0;
 
-	lock = kcalloc(1, sizeof(*lock), GFP_KERNEL);
+	lock = kcalloc(1, sizeof(*lock), GFP_NOFS);
 	if (!lock)
 		return NULL;
 
 	if (!lksb) {
 		/* zero memory only if kernel-allocated */
-		lksb = kcalloc(1, sizeof(*lksb), GFP_KERNEL);
+		lksb = kcalloc(1, sizeof(*lksb), GFP_NOFS);
 		if (!lksb) {
 			kfree(lock);
 			return NULL;
@@ -428,11 +455,16 @@ int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data)
 	if (!dlm_grab(dlm))
 		return DLM_REJECTED;
 
-	mlog_bug_on_msg(!dlm_domain_fully_joined(dlm),
-			"Domain %s not fully joined!\n", dlm->name);
-
 	name = create->name;
 	namelen = create->namelen;
+	status = DLM_REJECTED;
+	if (!dlm_domain_fully_joined(dlm)) {
+		mlog(ML_ERROR, "Domain %s not fully joined, but node %u is "
+		     "sending a create_lock message for lock %.*s!\n",
+		     dlm->name, create->node_idx, namelen, name);
+		dlm_error(status);
+		goto leave;
+	}
 
 	status = DLM_IVBUFLEN;
 	if (namelen > DLM_LOCKID_NAME_MAX) {
@@ -668,18 +700,22 @@ retry_lock:
 			msleep(100);
 			/* no waiting for dlm_reco_thread */
 			if (recovery) {
-				if (status == DLM_RECOVERING) {
-					mlog(0, "%s: got RECOVERING "
-					     "for $REOCVERY lock, master "
-					     "was %u\n", dlm->name, 
-					     res->owner);
-					dlm_wait_for_node_death(dlm, res->owner, 
-							DLM_NODE_DEATH_WAIT_MAX);
-				}
+				if (status != DLM_RECOVERING)
+					goto retry_lock;
+
+				mlog(0, "%s: got RECOVERING "
+				     "for $RECOVERY lock, master "
+				     "was %u\n", dlm->name,
+				     res->owner);
+				/* wait to see the node go down, then
+				 * drop down and allow the lockres to
+				 * get cleaned up.  need to remaster. */
+				dlm_wait_for_node_death(dlm, res->owner,
+						DLM_NODE_DEATH_WAIT_MAX);
 			} else {
 				dlm_wait_for_recovery(dlm);
+				goto retry_lock;
 			}
-			goto retry_lock;
 		}
 
 		if (status != DLM_NORMAL) {

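The retry_lock hunk above separates the two wait cases: a $RECOVERY lock that sees DLM_RECOVERING must not simply retry — its master died, so the code waits out the death and then falls through, letting calc_usage shoot down the stale lockres so it gets remastered; every other transient status just loops. Hedged outline of that flow (the enclosing condition is elided in the hunk and assumed here):

	msleep(100);
	if (recovery) {
		if (status != DLM_RECOVERING)
			goto retry_lock;	/* transient; try again */
		/* $RECOVERY lock, master dead: wait, then fall through
		 * so the stale lockres is purged and remastered */
		dlm_wait_for_node_death(dlm, res->owner,
					DLM_NODE_DEATH_WAIT_MAX);
	} else {
		dlm_wait_for_recovery(dlm);
		goto retry_lock;
	}
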
+ 309 - 139
fs/ocfs2/dlm/dlmmaster.c

@@ -47,7 +47,6 @@
 
 
 #include "dlmapi.h"
 #include "dlmapi.h"
 #include "dlmcommon.h"
 #include "dlmcommon.h"
-#include "dlmdebug.h"
 #include "dlmdomain.h"
 #include "dlmdomain.h"
 
 
 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
@@ -74,6 +73,7 @@ struct dlm_master_list_entry
 	wait_queue_head_t wq;
 	wait_queue_head_t wq;
 	atomic_t woken;
 	atomic_t woken;
 	struct kref mle_refs;
 	struct kref mle_refs;
+	int inuse;
 	unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
 	unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
 	unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
 	unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
 	unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
 	unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
@@ -127,18 +127,30 @@ static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
 	return 1;
 	return 1;
 }
 }
 
 
-#if 0
-/* Code here is included but defined out as it aids debugging */
+#define dlm_print_nodemap(m)  _dlm_print_nodemap(m,#m)
+static void _dlm_print_nodemap(unsigned long *map, const char *mapname)
+{
+	int i;
+	printk("%s=[ ", mapname);
+	for (i=0; i<O2NM_MAX_NODES; i++)
+		if (test_bit(i, map))
+			printk("%d ", i);
+	printk("]");
+}
 
 
-void dlm_print_one_mle(struct dlm_master_list_entry *mle)
+static void dlm_print_one_mle(struct dlm_master_list_entry *mle)
 {
 {
-	int i = 0, refs;
+	int refs;
 	char *type;
 	char *type;
 	char attached;
 	char attached;
 	u8 master;
 	u8 master;
 	unsigned int namelen;
 	unsigned int namelen;
 	const char *name;
 	const char *name;
 	struct kref *k;
 	struct kref *k;
+	unsigned long *maybe = mle->maybe_map,
+		      *vote = mle->vote_map,
+		      *resp = mle->response_map,
+		      *node = mle->node_map;
 
 
 	k = &mle->mle_refs;
 	k = &mle->mle_refs;
 	if (mle->type == DLM_MLE_BLOCK)
 	if (mle->type == DLM_MLE_BLOCK)
@@ -159,18 +171,29 @@ void dlm_print_one_mle(struct dlm_master_list_entry *mle)
 		name = mle->u.res->lockname.name;
 		name = mle->u.res->lockname.name;
 	}
 	}
 
 
-	mlog(ML_NOTICE, "  #%3d: %3s  %3d  %3u   %3u %c    (%d)%.*s\n",
-		  i, type, refs, master, mle->new_master, attached,
-		  namelen, namelen, name);
+	mlog(ML_NOTICE, "%.*s: %3s refs=%3d mas=%3u new=%3u evt=%c inuse=%d ",
+		  namelen, name, type, refs, master, mle->new_master, attached,
+		  mle->inuse);
+	dlm_print_nodemap(maybe);
+	printk(", ");
+	dlm_print_nodemap(vote);
+	printk(", ");
+	dlm_print_nodemap(resp);
+	printk(", ");
+	dlm_print_nodemap(node);
+	printk(", ");
+	printk("\n");
 }
 }
 
 
+#if 0
+/* Code here is included but defined out as it aids debugging */
+
 static void dlm_dump_mles(struct dlm_ctxt *dlm)
 static void dlm_dump_mles(struct dlm_ctxt *dlm)
 {
 {
 	struct dlm_master_list_entry *mle;
 	struct dlm_master_list_entry *mle;
 	struct list_head *iter;
 	struct list_head *iter;
 	
 	
 	mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name);
 	mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name);
-	mlog(ML_NOTICE, "  ####: type refs owner new events? lockname nodemap votemap respmap maybemap\n");
 	spin_lock(&dlm->master_lock);
 	spin_lock(&dlm->master_lock);
 	list_for_each(iter, &dlm->master_list) {
 	list_for_each(iter, &dlm->master_list) {
 		mle = list_entry(iter, struct dlm_master_list_entry, list);
 		mle = list_entry(iter, struct dlm_master_list_entry, list);
@@ -314,6 +337,31 @@ static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
 	spin_unlock(&dlm->spinlock);
 	spin_unlock(&dlm->spinlock);
 }
 }
 
 
+static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
+{
+	struct dlm_ctxt *dlm;
+	dlm = mle->dlm;
+
+	assert_spin_locked(&dlm->spinlock);
+	assert_spin_locked(&dlm->master_lock);
+	mle->inuse++;
+	kref_get(&mle->mle_refs);
+}
+
+static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
+{
+	struct dlm_ctxt *dlm;
+	dlm = mle->dlm;
+
+	spin_lock(&dlm->spinlock);
+	spin_lock(&dlm->master_lock);
+	mle->inuse--;
+	__dlm_put_mle(mle);
+	spin_unlock(&dlm->master_lock);
+	spin_unlock(&dlm->spinlock);
+
+}
+
 /* remove from list and free */
 /* remove from list and free */
 static void __dlm_put_mle(struct dlm_master_list_entry *mle)
 static void __dlm_put_mle(struct dlm_master_list_entry *mle)
 {
 {
@@ -322,9 +370,14 @@ static void __dlm_put_mle(struct dlm_master_list_entry *mle)
 
 
 	assert_spin_locked(&dlm->spinlock);
 	assert_spin_locked(&dlm->spinlock);
 	assert_spin_locked(&dlm->master_lock);
 	assert_spin_locked(&dlm->master_lock);
-	BUG_ON(!atomic_read(&mle->mle_refs.refcount));
-
-	kref_put(&mle->mle_refs, dlm_mle_release);
+	if (!atomic_read(&mle->mle_refs.refcount)) {
+		/* this may or may not crash, but who cares.
+		 * it's a BUG. */
+		mlog(ML_ERROR, "bad mle: %p\n", mle);
+		dlm_print_one_mle(mle);
+		BUG();
+	} else
+		kref_put(&mle->mle_refs, dlm_mle_release);
 }
 }
 
 
 
 
@@ -367,6 +420,7 @@ static void dlm_init_mle(struct dlm_master_list_entry *mle,
 	memset(mle->response_map, 0, sizeof(mle->response_map));
 	memset(mle->response_map, 0, sizeof(mle->response_map));
 	mle->master = O2NM_MAX_NODES;
 	mle->master = O2NM_MAX_NODES;
 	mle->new_master = O2NM_MAX_NODES;
 	mle->new_master = O2NM_MAX_NODES;
+	mle->inuse = 0;
 
 
 	if (mle->type == DLM_MLE_MASTER) {
 	if (mle->type == DLM_MLE_MASTER) {
 		BUG_ON(!res);
 		BUG_ON(!res);
@@ -564,6 +618,28 @@ static void dlm_lockres_release(struct kref *kref)
 	mlog(0, "destroying lockres %.*s\n", res->lockname.len,
 	mlog(0, "destroying lockres %.*s\n", res->lockname.len,
 	     res->lockname.name);
 	     res->lockname.name);
 
 
+	if (!hlist_unhashed(&res->hash_node) ||
+	    !list_empty(&res->granted) ||
+	    !list_empty(&res->converting) ||
+	    !list_empty(&res->blocked) ||
+	    !list_empty(&res->dirty) ||
+	    !list_empty(&res->recovering) ||
+	    !list_empty(&res->purge)) {
+		mlog(ML_ERROR,
+		     "Going to BUG for resource %.*s."
+		     "  We're on a list! [%c%c%c%c%c%c%c]\n",
+		     res->lockname.len, res->lockname.name,
+		     !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
+		     !list_empty(&res->granted) ? 'G' : ' ',
+		     !list_empty(&res->converting) ? 'C' : ' ',
+		     !list_empty(&res->blocked) ? 'B' : ' ',
+		     !list_empty(&res->dirty) ? 'D' : ' ',
+		     !list_empty(&res->recovering) ? 'R' : ' ',
+		     !list_empty(&res->purge) ? 'P' : ' ');
+
+		dlm_print_one_lock_resource(res);
+	}
+
 	/* By the time we're ready to blow this guy away, we shouldn't
 	/* By the time we're ready to blow this guy away, we shouldn't
 	 * be on any lists. */
 	 * be on any lists. */
 	BUG_ON(!hlist_unhashed(&res->hash_node));
 	BUG_ON(!hlist_unhashed(&res->hash_node));
@@ -579,11 +655,6 @@ static void dlm_lockres_release(struct kref *kref)
 	kfree(res);
 	kfree(res);
 }
 }
 
 
-void dlm_lockres_get(struct dlm_lock_resource *res)
-{
-	kref_get(&res->refs);
-}
-
 void dlm_lockres_put(struct dlm_lock_resource *res)
 void dlm_lockres_put(struct dlm_lock_resource *res)
 {
 {
 	kref_put(&res->refs, dlm_lockres_release);
 	kref_put(&res->refs, dlm_lockres_release);
@@ -603,7 +674,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
 	memcpy(qname, name, namelen);
 	memcpy(qname, name, namelen);
 
 
 	res->lockname.len = namelen;
 	res->lockname.len = namelen;
-	res->lockname.hash = full_name_hash(name, namelen);
+	res->lockname.hash = dlm_lockid_hash(name, namelen);
 
 
 	init_waitqueue_head(&res->wq);
 	init_waitqueue_head(&res->wq);
 	spin_lock_init(&res->spinlock);
 	spin_lock_init(&res->spinlock);
@@ -637,11 +708,11 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
 {
 {
 	struct dlm_lock_resource *res;
 	struct dlm_lock_resource *res;
 
 
-	res = kmalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
+	res = kmalloc(sizeof(struct dlm_lock_resource), GFP_NOFS);
 	if (!res)
 	if (!res)
 		return NULL;
 		return NULL;
 
 
-	res->lockname.name = kmalloc(namelen, GFP_KERNEL);
+	res->lockname.name = kmalloc(namelen, GFP_NOFS);
 	if (!res->lockname.name) {
 	if (!res->lockname.name) {
 		kfree(res);
 		kfree(res);
 		return NULL;
 		return NULL;
@@ -677,19 +748,20 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
 	int blocked = 0;
 	int blocked = 0;
 	int ret, nodenum;
 	int ret, nodenum;
 	struct dlm_node_iter iter;
 	struct dlm_node_iter iter;
-	unsigned int namelen;
+	unsigned int namelen, hash;
 	int tries = 0;
 	int tries = 0;
 	int bit, wait_on_recovery = 0;
 	int bit, wait_on_recovery = 0;
 
 
 	BUG_ON(!lockid);
 	BUG_ON(!lockid);
 
 
 	namelen = strlen(lockid);
 	namelen = strlen(lockid);
+	hash = dlm_lockid_hash(lockid, namelen);
 
 
 	mlog(0, "get lockres %s (len %d)\n", lockid, namelen);
 	mlog(0, "get lockres %s (len %d)\n", lockid, namelen);
 
 
 lookup:
 lookup:
 	spin_lock(&dlm->spinlock);
 	spin_lock(&dlm->spinlock);
-	tmpres = __dlm_lookup_lockres(dlm, lockid, namelen);
+	tmpres = __dlm_lookup_lockres(dlm, lockid, namelen, hash);
 	if (tmpres) {
 	if (tmpres) {
 		spin_unlock(&dlm->spinlock);
 		spin_unlock(&dlm->spinlock);
 		mlog(0, "found in hash!\n");
 		mlog(0, "found in hash!\n");
@@ -704,7 +776,7 @@ lookup:
 		mlog(0, "allocating a new resource\n");
 		mlog(0, "allocating a new resource\n");
 		/* nothing found and we need to allocate one. */
 		/* nothing found and we need to allocate one. */
 		alloc_mle = (struct dlm_master_list_entry *)
 		alloc_mle = (struct dlm_master_list_entry *)
-			kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
+			kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
 		if (!alloc_mle)
 		if (!alloc_mle)
 			goto leave;
 			goto leave;
 		res = dlm_new_lockres(dlm, lockid, namelen);
 		res = dlm_new_lockres(dlm, lockid, namelen);
@@ -790,10 +862,11 @@ lookup:
 	 * if so, the creator of the BLOCK may try to put the last
 	 * if so, the creator of the BLOCK may try to put the last
 	 * ref at this time in the assert master handler, so we
 	 * ref at this time in the assert master handler, so we
 	 * need an extra one to keep from a bad ptr deref. */
 	 * need an extra one to keep from a bad ptr deref. */
-	dlm_get_mle(mle);
+	dlm_get_mle_inuse(mle);
 	spin_unlock(&dlm->master_lock);
 	spin_unlock(&dlm->master_lock);
 	spin_unlock(&dlm->spinlock);
 	spin_unlock(&dlm->spinlock);
 
 
+redo_request:
 	while (wait_on_recovery) {
 	while (wait_on_recovery) {
 		/* any cluster changes that occurred after dropping the
 		/* any cluster changes that occurred after dropping the
 		 * dlm spinlock would be detectable be a change on the mle,
 		 * dlm spinlock would be detectable be a change on the mle,
@@ -812,7 +885,7 @@ lookup:
 		} 
 		} 
 
 
 		dlm_kick_recovery_thread(dlm);
 		dlm_kick_recovery_thread(dlm);
-		msleep(100);
+		msleep(1000);
 		dlm_wait_for_recovery(dlm);
 		dlm_wait_for_recovery(dlm);
 
 
 		spin_lock(&dlm->spinlock);
 		spin_lock(&dlm->spinlock);
@@ -825,13 +898,15 @@ lookup:
 		} else
 		} else
 			wait_on_recovery = 0;
 			wait_on_recovery = 0;
 		spin_unlock(&dlm->spinlock);
 		spin_unlock(&dlm->spinlock);
+
+		if (wait_on_recovery)
+			dlm_wait_for_node_recovery(dlm, bit, 10000);
 	}
 	}
 
 
 	/* must wait for lock to be mastered elsewhere */
 	/* must wait for lock to be mastered elsewhere */
 	if (blocked)
 	if (blocked)
 		goto wait;
 		goto wait;
 
 
-redo_request:
 	ret = -EINVAL;
 	ret = -EINVAL;
 	dlm_node_iter_init(mle->vote_map, &iter);
 	dlm_node_iter_init(mle->vote_map, &iter);
 	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
 	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
@@ -856,6 +931,7 @@ wait:
 	/* keep going until the response map includes all nodes */
 	/* keep going until the response map includes all nodes */
 	ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
 	ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
 	if (ret < 0) {
 	if (ret < 0) {
+		wait_on_recovery = 1;
 		mlog(0, "%s:%.*s: node map changed, redo the "
 		mlog(0, "%s:%.*s: node map changed, redo the "
 		     "master request now, blocked=%d\n",
 		     "master request now, blocked=%d\n",
 		     dlm->name, res->lockname.len,
 		     dlm->name, res->lockname.len,
@@ -866,7 +942,7 @@ wait:
 			     dlm->name, res->lockname.len, 
 			     dlm->name, res->lockname.len, 
 			     res->lockname.name, blocked);
 			     res->lockname.name, blocked);
 			dlm_print_one_lock_resource(res);
 			dlm_print_one_lock_resource(res);
-			/* dlm_print_one_mle(mle); */
+			dlm_print_one_mle(mle);
 			tries = 0;
 			tries = 0;
 		}
 		}
 		goto redo_request;
 		goto redo_request;
@@ -880,7 +956,7 @@ wait:
 	dlm_mle_detach_hb_events(dlm, mle);
 	dlm_mle_detach_hb_events(dlm, mle);
 	dlm_put_mle(mle);
 	dlm_put_mle(mle);
 	/* put the extra ref */
 	/* put the extra ref */
-	dlm_put_mle(mle);
+	dlm_put_mle_inuse(mle);
 
 
 wake_waiters:
 wake_waiters:
 	spin_lock(&res->spinlock);
 	spin_lock(&res->spinlock);
@@ -921,12 +997,14 @@ recheck:
 		spin_unlock(&res->spinlock);
 		spin_unlock(&res->spinlock);
 		/* this will cause the master to re-assert across
 		/* this will cause the master to re-assert across
 		 * the whole cluster, freeing up mles */
 		 * the whole cluster, freeing up mles */
-		ret = dlm_do_master_request(mle, res->owner);
-		if (ret < 0) {
-			/* give recovery a chance to run */
-			mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
-			msleep(500);
-			goto recheck;
+		if (res->owner != dlm->node_num) {
+			ret = dlm_do_master_request(mle, res->owner);
+			if (ret < 0) {
+				/* give recovery a chance to run */
+				mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
+				msleep(500);
+				goto recheck;
+			}
 		}
 		}
 		ret = 0;
 		ret = 0;
 		goto leave;
 		goto leave;
@@ -962,6 +1040,12 @@ recheck:
 		     "rechecking now\n", dlm->name, res->lockname.len,
 		     "rechecking now\n", dlm->name, res->lockname.len,
 		     res->lockname.name);
 		     res->lockname.name);
 		goto recheck;
 		goto recheck;
+	} else {
+		if (!voting_done) {
+			mlog(0, "map not changed and voting not done "
+			     "for %s:%.*s\n", dlm->name, res->lockname.len,
+			     res->lockname.name);
+		}
 	}
 	}
 
 
 	if (m != O2NM_MAX_NODES) {
 	if (m != O2NM_MAX_NODES) {
@@ -1129,18 +1213,6 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
 			set_bit(node, mle->vote_map);
 			set_bit(node, mle->vote_map);
 		} else {
 		} else {
 			mlog(ML_ERROR, "node down! %d\n", node);
 			mlog(ML_ERROR, "node down! %d\n", node);
-
-			/* if the node wasn't involved in mastery skip it,
-			 * but clear it out from the maps so that it will
-			 * not affect mastery of this lockres */
-			clear_bit(node, mle->response_map);
-			clear_bit(node, mle->vote_map);
-			if (!test_bit(node, mle->maybe_map))
-				goto next;
-
-			/* if we're already blocked on lock mastery, and the
-			 * dead node wasn't the expected master, or there is
-			 * another node in the maybe_map, keep waiting */
 			if (blocked) {
 			if (blocked) {
 				int lowest = find_next_bit(mle->maybe_map,
 				int lowest = find_next_bit(mle->maybe_map,
 						       O2NM_MAX_NODES, 0);
 						       O2NM_MAX_NODES, 0);
@@ -1148,54 +1220,53 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
 				/* act like it was never there */
 				/* act like it was never there */
 				clear_bit(node, mle->maybe_map);
 				clear_bit(node, mle->maybe_map);
 
 
-			       	if (node != lowest)
-					goto next;
-
-				mlog(ML_ERROR, "expected master %u died while "
-				     "this node was blocked waiting on it!\n",
-				     node);
-				lowest = find_next_bit(mle->maybe_map,
-						       O2NM_MAX_NODES,
-						       lowest+1);
-				if (lowest < O2NM_MAX_NODES) {
-					mlog(0, "still blocked. waiting "
-					     "on %u now\n", lowest);
-					goto next;
+			       	if (node == lowest) {
+					mlog(0, "expected master %u died"
+					    " while this node was blocked "
+					    "waiting on it!\n", node);
+					lowest = find_next_bit(mle->maybe_map,
+						       	O2NM_MAX_NODES,
+						       	lowest+1);
+					if (lowest < O2NM_MAX_NODES) {
+						mlog(0, "%s:%.*s:still "
+						     "blocked. waiting on %u "
+						     "now\n", dlm->name,
+						     res->lockname.len,
+						     res->lockname.name,
+						     lowest);
+					} else {
+						/* mle is an MLE_BLOCK, but
+						 * there is now nothing left to
+						 * block on.  we need to return
+						 * all the way back out and try
+						 * again with an MLE_MASTER.
+						 * dlm_do_local_recovery_cleanup
+						 * has already run, so the mle
+						 * refcount is ok */
+						mlog(0, "%s:%.*s: no "
+						     "longer blocking. try to "
+						     "master this here\n",
+						     dlm->name,
+						     res->lockname.len,
+						     res->lockname.name);
+						mle->type = DLM_MLE_MASTER;
+						mle->u.res = res;
+					}
 				}
 				}
-
-				/* mle is an MLE_BLOCK, but there is now
-				 * nothing left to block on.  we need to return
-				 * all the way back out and try again with
-				 * an MLE_MASTER. dlm_do_local_recovery_cleanup
-				 * has already run, so the mle refcount is ok */
-				mlog(0, "no longer blocking. we can "
-				     "try to master this here\n");
-				mle->type = DLM_MLE_MASTER;
-				memset(mle->maybe_map, 0,
-				       sizeof(mle->maybe_map));
-				memset(mle->response_map, 0,
-				       sizeof(mle->maybe_map));
-				memcpy(mle->vote_map, mle->node_map,
-				       sizeof(mle->node_map));
-				mle->u.res = res;
-				set_bit(dlm->node_num, mle->maybe_map);
-
-				ret = -EAGAIN;
-				goto next;
 			}
 			}
 
 
-			clear_bit(node, mle->maybe_map);
-			if (node > dlm->node_num)
-				goto next;
-
-			mlog(0, "dead node in map!\n");
-			/* yuck. go back and re-contact all nodes
-			 * in the vote_map, removing this node. */
-			memset(mle->response_map, 0,
-			       sizeof(mle->response_map));
+			/* now blank out everything, as if we had never
+			 * contacted anyone */
+			memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
+			memset(mle->response_map, 0, sizeof(mle->response_map));
+			/* reset the vote_map to the current node_map */
+			memcpy(mle->vote_map, mle->node_map,
+			       sizeof(mle->node_map));
+			/* put myself into the maybe map */
+			if (mle->type != DLM_MLE_BLOCK)
+				set_bit(dlm->node_num, mle->maybe_map);
 		}
 		}
 		ret = -EAGAIN;
 		ret = -EAGAIN;
-next:
 		node = dlm_bitmap_diff_iter_next(&bdi, &sc);
 		node = dlm_bitmap_diff_iter_next(&bdi, &sc);
 	}
 	}
 	return ret;
 	return ret;
@@ -1316,7 +1387,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
 	struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
 	struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
 	struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
 	struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
 	char *name;
 	char *name;
-	unsigned int namelen;
+	unsigned int namelen, hash;
 	int found, ret;
 	int found, ret;
 	int set_maybe;
 	int set_maybe;
 	int dispatch_assert = 0;
 	int dispatch_assert = 0;
@@ -1331,6 +1402,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
 
 
 	name = request->name;
 	name = request->name;
 	namelen = request->namelen;
 	namelen = request->namelen;
+	hash = dlm_lockid_hash(name, namelen);
 
 
 	if (namelen > DLM_LOCKID_NAME_MAX) {
 	if (namelen > DLM_LOCKID_NAME_MAX) {
 		response = DLM_IVBUFLEN;
 		response = DLM_IVBUFLEN;
@@ -1339,7 +1411,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
 
 
 way_up_top:
 way_up_top:
 	spin_lock(&dlm->spinlock);
 	spin_lock(&dlm->spinlock);
-	res = __dlm_lookup_lockres(dlm, name, namelen);
+	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
 	if (res) {
 	if (res) {
 		spin_unlock(&dlm->spinlock);
 		spin_unlock(&dlm->spinlock);
 
 
@@ -1459,21 +1531,18 @@ way_up_top:
 			spin_unlock(&dlm->spinlock);
 			spin_unlock(&dlm->spinlock);
 
 
 			mle = (struct dlm_master_list_entry *)
 			mle = (struct dlm_master_list_entry *)
-				kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
+				kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
 			if (!mle) {
 			if (!mle) {
 				response = DLM_MASTER_RESP_ERROR;
 				response = DLM_MASTER_RESP_ERROR;
 				mlog_errno(-ENOMEM);
 				mlog_errno(-ENOMEM);
 				goto send_response;
 				goto send_response;
 			}
 			}
-			spin_lock(&dlm->spinlock);
-			dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL,
-					 name, namelen);
-			spin_unlock(&dlm->spinlock);
 			goto way_up_top;
 			goto way_up_top;
 		}
 		}
 
 
 		// mlog(0, "this is second time thru, already allocated, "
 		// mlog(0, "this is second time thru, already allocated, "
 		// "add the block.\n");
 		// "add the block.\n");
+		dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
 		set_bit(request->node_idx, mle->maybe_map);
 		set_bit(request->node_idx, mle->maybe_map);
 		list_add(&mle->list, &dlm->master_list);
 		list_add(&mle->list, &dlm->master_list);
 		response = DLM_MASTER_RESP_NO;
 		response = DLM_MASTER_RESP_NO;
@@ -1556,6 +1625,8 @@ again:
 	dlm_node_iter_init(nodemap, &iter);
 	dlm_node_iter_init(nodemap, &iter);
 	while ((to = dlm_node_iter_next(&iter)) >= 0) {
 	while ((to = dlm_node_iter_next(&iter)) >= 0) {
 		int r = 0;
 		int r = 0;
+		struct dlm_master_list_entry *mle = NULL;
+
 		mlog(0, "sending assert master to %d (%.*s)\n", to,
 		mlog(0, "sending assert master to %d (%.*s)\n", to,
 		     namelen, lockname);
 		     namelen, lockname);
 		memset(&assert, 0, sizeof(assert));
 		memset(&assert, 0, sizeof(assert));
@@ -1567,20 +1638,28 @@ again:
 		tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
 		tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
 					    &assert, sizeof(assert), to, &r);
 					    &assert, sizeof(assert), to, &r);
 		if (tmpret < 0) {
 		if (tmpret < 0) {
-			mlog(ML_ERROR, "assert_master returned %d!\n", tmpret);
+			mlog(0, "assert_master returned %d!\n", tmpret);
 			if (!dlm_is_host_down(tmpret)) {
 			if (!dlm_is_host_down(tmpret)) {
-				mlog(ML_ERROR, "unhandled error!\n");
+				mlog(ML_ERROR, "unhandled error=%d!\n", tmpret);
 				BUG();
 				BUG();
 			}
 			}
 			/* a node died.  finish out the rest of the nodes. */
 			/* a node died.  finish out the rest of the nodes. */
-			mlog(ML_ERROR, "link to %d went down!\n", to);
+			mlog(0, "link to %d went down!\n", to);
 			/* any nonzero status return will do */
 			/* any nonzero status return will do */
 			ret = tmpret;
 			ret = tmpret;
 		} else if (r < 0) {
 		} else if (r < 0) {
 			/* ok, something horribly messed.  kill thyself. */
 			/* ok, something horribly messed.  kill thyself. */
 			mlog(ML_ERROR,"during assert master of %.*s to %u, "
 			mlog(ML_ERROR,"during assert master of %.*s to %u, "
 			     "got %d.\n", namelen, lockname, to, r);
 			     "got %d.\n", namelen, lockname, to, r);
-			dlm_dump_lock_resources(dlm);
+			spin_lock(&dlm->spinlock);
+			spin_lock(&dlm->master_lock);
+			if (dlm_find_mle(dlm, &mle, (char *)lockname,
+					 namelen)) {
+				dlm_print_one_mle(mle);
+				__dlm_put_mle(mle);
+			}
+			spin_unlock(&dlm->master_lock);
+			spin_unlock(&dlm->spinlock);
 			BUG();
 			BUG();
 		} else if (r == EAGAIN) {
 		} else if (r == EAGAIN) {
 			mlog(0, "%.*s: node %u create mles on other "
 			mlog(0, "%.*s: node %u create mles on other "
@@ -1612,7 +1691,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
 	struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
 	struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
 	struct dlm_lock_resource *res = NULL;
 	struct dlm_lock_resource *res = NULL;
 	char *name;
 	char *name;
-	unsigned int namelen;
+	unsigned int namelen, hash;
 	u32 flags;
 	u32 flags;
 	int master_request = 0;
 	int master_request = 0;
 	int ret = 0;
 	int ret = 0;
@@ -1622,6 +1701,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
 
 
 	name = assert->name;
 	name = assert->name;
 	namelen = assert->namelen;
 	namelen = assert->namelen;
+	hash = dlm_lockid_hash(name, namelen);
 	flags = be32_to_cpu(assert->flags);
 	flags = be32_to_cpu(assert->flags);
 
 
 	if (namelen > DLM_LOCKID_NAME_MAX) {
 	if (namelen > DLM_LOCKID_NAME_MAX) {
@@ -1646,7 +1726,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
 		if (bit >= O2NM_MAX_NODES) {
 		if (bit >= O2NM_MAX_NODES) {
 			/* not necessarily an error, though less likely.
 			/* not necessarily an error, though less likely.
 			 * could be master just re-asserting. */
 			 * could be master just re-asserting. */
-			mlog(ML_ERROR, "no bits set in the maybe_map, but %u "
+			mlog(0, "no bits set in the maybe_map, but %u "
 			     "is asserting! (%.*s)\n", assert->node_idx,
 			     "is asserting! (%.*s)\n", assert->node_idx,
 			     namelen, name);
 			     namelen, name);
 		} else if (bit != assert->node_idx) {
 		} else if (bit != assert->node_idx) {
@@ -1658,19 +1738,36 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
 				 * number winning the mastery will respond
 				 * number winning the mastery will respond
 				 * YES to mastery requests, but this node
 				 * YES to mastery requests, but this node
 				 * had no way of knowing.  let it pass. */
 				 * had no way of knowing.  let it pass. */
-				mlog(ML_ERROR, "%u is the lowest node, "
+				mlog(0, "%u is the lowest node, "
 				     "%u is asserting. (%.*s)  %u must "
 				     "%u is asserting. (%.*s)  %u must "
 				     "have begun after %u won.\n", bit,
 				     "have begun after %u won.\n", bit,
 				     assert->node_idx, namelen, name, bit,
 				     assert->node_idx, namelen, name, bit,
 				     assert->node_idx);
 				     assert->node_idx);
 			}
 			}
 		}
 		}
+		if (mle->type == DLM_MLE_MIGRATION) {
+			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
+				mlog(0, "%s:%.*s: got cleanup assert"
+				     " from %u for migration\n",
+				     dlm->name, namelen, name,
+				     assert->node_idx);
+			} else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
+				mlog(0, "%s:%.*s: got unrelated assert"
+				     " from %u for migration, ignoring\n",
+				     dlm->name, namelen, name,
+				     assert->node_idx);
+				__dlm_put_mle(mle);
+				spin_unlock(&dlm->master_lock);
+				spin_unlock(&dlm->spinlock);
+				goto done;
+			}	
+		}
 	}
 	}
 	spin_unlock(&dlm->master_lock);
 	spin_unlock(&dlm->master_lock);
 
 
 	/* ok everything checks out with the MLE
 	/* ok everything checks out with the MLE
 	 * now check to see if there is a lockres */
 	 * now check to see if there is a lockres */
-	res = __dlm_lookup_lockres(dlm, name, namelen);
+	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
 	if (res) {
 	if (res) {
 		spin_lock(&res->spinlock);
 		spin_lock(&res->spinlock);
 		if (res->state & DLM_LOCK_RES_RECOVERING)  {
 		if (res->state & DLM_LOCK_RES_RECOVERING)  {
@@ -1679,7 +1776,8 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
 			goto kill;
 			goto kill;
 		}
 		}
 		if (!mle) {
 		if (!mle) {
-			if (res->owner != assert->node_idx) {
+			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
+			    res->owner != assert->node_idx) {
 				mlog(ML_ERROR, "assert_master from "
 				mlog(ML_ERROR, "assert_master from "
 					  "%u, but current owner is "
 					  "%u, but current owner is "
 					  "%u! (%.*s)\n",
 					  "%u! (%.*s)\n",
@@ -1732,6 +1830,7 @@ ok:
 	if (mle) {
 	if (mle) {
 		int extra_ref = 0;
 		int extra_ref = 0;
 		int nn = -1;
 		int nn = -1;
+		int rr, err = 0;
 		
 		
 		spin_lock(&mle->spinlock);
 		spin_lock(&mle->spinlock);
 		if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
 		if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
@@ -1751,27 +1850,64 @@ ok:
 		wake_up(&mle->wq);
 		spin_unlock(&mle->spinlock);
 
-		if (mle->type == DLM_MLE_MIGRATION && res) {
-			mlog(0, "finishing off migration of lockres %.*s, "
-			     "from %u to %u\n",
-			       res->lockname.len, res->lockname.name,
-			       dlm->node_num, mle->new_master);
+		if (res) {
 			spin_lock(&res->spinlock);
-			res->state &= ~DLM_LOCK_RES_MIGRATING;
-			dlm_change_lockres_owner(dlm, res, mle->new_master);
-			BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
+			if (mle->type == DLM_MLE_MIGRATION) {
+				mlog(0, "finishing off migration of lockres %.*s, "
+				     "from %u to %u\n",
+				     res->lockname.len, res->lockname.name,
+				     dlm->node_num, mle->new_master);
+				res->state &= ~DLM_LOCK_RES_MIGRATING;
+				dlm_change_lockres_owner(dlm, res, mle->new_master);
+				BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
+			} else {
+				dlm_change_lockres_owner(dlm, res, mle->master);
+			}
 			spin_unlock(&res->spinlock);
 		}
-		/* master is known, detach if not already detached */
-		dlm_mle_detach_hb_events(dlm, mle);
-		dlm_put_mle(mle);
-
+
+		/* master is known, detach if not already detached.
+		 * ensures that only one assert_master call will happen
+		 * on this mle. */
+		spin_lock(&dlm->spinlock);
+		spin_lock(&dlm->master_lock);
+
+		rr = atomic_read(&mle->mle_refs.refcount);
+		if (mle->inuse > 0) {
+			if (extra_ref && rr < 3)
+				err = 1;
+			else if (!extra_ref && rr < 2)
+				err = 1;
+		} else {
+			if (extra_ref && rr < 2)
+				err = 1;
+			else if (!extra_ref && rr < 1)
+				err = 1;
+		}
+		if (err) {
+			mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
+			     "that will mess up this node, refs=%d, extra=%d, "
+			     "inuse=%d\n", dlm->name, namelen, name,
+			     assert->node_idx, rr, extra_ref, mle->inuse);
+			dlm_print_one_mle(mle);
+		}
+		list_del_init(&mle->list);
+		__dlm_mle_detach_hb_events(dlm, mle);
+		__dlm_put_mle(mle);
 		if (extra_ref) {
 			/* the assert master message now balances the extra
 		 	 * ref given by the master / migration request message.
 		 	 * if this is the last put, it will be removed
 		 	 * from the list. */
-			dlm_put_mle(mle);
+			__dlm_put_mle(mle);
+		}
+		spin_unlock(&dlm->master_lock);
+		spin_unlock(&dlm->spinlock);
+	} else if (res) {
+		if (res->owner != assert->node_idx) {
+			mlog(0, "assert_master from %u, but current "
+			     "owner is %u (%.*s), no mle\n", assert->node_idx,
+			     res->owner, namelen, name);
 		}
 	}
 
@@ -1788,12 +1924,12 @@ done:
 
 kill:
 	/* kill the caller! */
+	mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
+	     "and killing the other node now!  This node is OK and can continue.\n");
+	__dlm_print_one_lock_resource(res);
 	spin_unlock(&res->spinlock);
 	spin_unlock(&dlm->spinlock);
 	dlm_lockres_put(res);
-	mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
-	     "and killing the other node now!  This node is OK and can continue.\n");
-	dlm_dump_lock_resources(dlm);
 	dlm_put(dlm);
 	return -EINVAL;
 }
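
The refcount sanity check added to the `ok:` path above is dense; as arithmetic it reduces to a minimum expected count. A minimal sketch of the same rule, under the assumption that the master list holds one reference, the request message an optional extra one, and an in-use marker one more (the helper name is hypothetical; the handler open-codes this):

	/* Minimum refs the mle must still hold when the assert arrives:
	 * one for living on the master list, one more if the caller
	 * passed in an extra lookup ref (extra_ref), and one more if
	 * somebody has the mle marked in use.  Fewer than that means a
	 * put raced us and the node's mle bookkeeping is corrupt. */
	static int dlm_mle_refs_sane(int refcount, int extra_ref, int inuse)
	{
		int min_refs = 1;		/* master list reference */

		if (extra_ref)
			min_refs++;		/* master/migration request ref */
		if (inuse > 0)
			min_refs++;		/* active user reference */

		return refcount >= min_refs;
	}
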
@@ -1803,7 +1939,7 @@ int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
 			       int ignore_higher, u8 request_from, u32 flags)
 {
 	struct dlm_work_item *item;
-	item = kcalloc(1, sizeof(*item), GFP_KERNEL);
+	item = kcalloc(1, sizeof(*item), GFP_NOFS);
 	if (!item)
 		return -ENOMEM;
 
@@ -1825,7 +1961,7 @@ int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
 	list_add_tail(&item->list, &dlm->work_list);
 	spin_unlock(&dlm->work_lock);
 
-	schedule_work(&dlm->dispatched_work);
+	queue_work(dlm->dlm_worker, &dlm->dispatched_work);
 	return 0;
 }
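
Queueing to dlm->dlm_worker instead of the shared keventd queue (schedule_work) assumes each domain owns a private workqueue. A sketch of the setup this hunk relies on; the field and queue name follow the merge's naming, but treat the bodies as illustrative:

	#include <linux/workqueue.h>

	static int dlm_launch_worker(struct dlm_ctxt *dlm)
	{
		/* one kernel thread dedicated to this domain's dlm work,
		 * so a stalled recovery item cannot block unrelated
		 * keventd work, and vice versa */
		dlm->dlm_worker = create_singlethread_workqueue("dlm_wq");
		if (!dlm->dlm_worker)
			return -ENOMEM;
		return 0;
	}

	static void dlm_stop_worker(struct dlm_ctxt *dlm)
	{
		if (dlm->dlm_worker) {
			flush_workqueue(dlm->dlm_worker);	/* drain pending items */
			destroy_workqueue(dlm->dlm_worker);
			dlm->dlm_worker = NULL;
		}
	}
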
 
@@ -1866,6 +2002,23 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
 		}
 	}
 
+	/*
+	 * If we're migrating this lock to someone else, we are no
+	 * longer allowed to assert our own mastery.  OTOH, we need to
+	 * prevent migration from starting while we're still asserting
+	 * our dominance.  The reserved ast delays migration.
+	 */
+	spin_lock(&res->spinlock);
+	if (res->state & DLM_LOCK_RES_MIGRATING) {
+		mlog(0, "Someone asked us to assert mastery, but we're "
+		     "in the middle of migration.  Skipping assert, "
+		     "the new master will handle that.\n");
+		spin_unlock(&res->spinlock);
+		goto put;
+	} else
+		__dlm_lockres_reserve_ast(res);
+	spin_unlock(&res->spinlock);
+
 	/* this call now finishes out the nodemap
 	 * even if one or more nodes die */
 	mlog(0, "worker about to master %.*s here, this=%u\n",
@@ -1875,9 +2028,14 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
 				   nodemap, flags);
 	if (ret < 0) {
 		/* no need to restart, we are done */
-		mlog_errno(ret);
+		if (!dlm_is_host_down(ret))
+			mlog_errno(ret);
 	}
 
+	/* Ok, we've asserted ourselves.  Let's let migration start. */
+	dlm_lockres_release_ast(dlm, res);
+
+put:
 	dlm_lockres_put(res);
 
 	mlog(0, "finished with dlm_assert_master_worker\n");
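
The reserve/release pair above brackets the slow network assert so migration cannot start mid-assert. A sketch of the mechanism being assumed here: a simple reservation counter that the migration path checks before setting DLM_LOCK_RES_MIGRATING. The asts_reserved field and wakeup are assumptions about the implementation, not the committed code:

	static inline void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
	{
		assert_spin_locked(&res->spinlock);
		/* while this count is raised, migration may not begin */
		atomic_inc(&res->asts_reserved);
	}

	static void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
					    struct dlm_lock_resource *res)
	{
		/* the last release lets a parked migration proceed */
		if (atomic_dec_and_test(&res->asts_reserved))
			wake_up(&res->wq);
	}

Note that both exits of the worker must release: the success path releases explicitly, and the migration path skips the reservation entirely via `goto put`, so the counter can never leak.
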
@@ -1916,6 +2074,7 @@ static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
 				BUG();
 			/* host is down, so answer for that node would be
 			 * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
+			ret = 0;
 		}
 
 		if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
@@ -2016,14 +2175,14 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
 	 */
 
 	ret = -ENOMEM;
-	mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_KERNEL);
+	mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_NOFS);
 	if (!mres) {
 		mlog_errno(ret);
 		goto leave;
 	}
 
 	mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
-								GFP_KERNEL);
+								GFP_NOFS);
 	if (!mle) {
 		mlog_errno(ret);
 		goto leave;
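
Both allocations above, like every GFP_KERNEL to GFP_NOFS change in this merge, follow one rule: these paths can run while ocfs2 is writing back dirty data, and a GFP_KERNEL allocation may recurse into filesystem reclaim and deadlock on the very DLM locks being serviced. A minimal illustrative sketch of the rule (the helper is hypothetical):

	#include <linux/slab.h>

	/* Illustrative only: allocate scratch space in a path that can
	 * be reached from fs writeback.  GFP_NOFS keeps memory reclaim
	 * from re-entering any filesystem; GFP_ATOMIC would also forbid
	 * sleeping and is stricter than needed here. */
	static void *dlm_alloc_scratch(size_t size)
	{
		return kmalloc(size, GFP_NOFS);
	}
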
@@ -2117,7 +2276,7 @@ fail:
 	 * take both dlm->spinlock and dlm->master_lock */
 	spin_lock(&dlm->spinlock);
 	spin_lock(&dlm->master_lock);
-	dlm_get_mle(mle);
+	dlm_get_mle_inuse(mle);
 	spin_unlock(&dlm->master_lock);
 	spin_unlock(&dlm->spinlock);
 
@@ -2134,7 +2293,10 @@ fail:
 		/* migration failed, detach and clean up mle */
 		dlm_mle_detach_hb_events(dlm, mle);
 		dlm_put_mle(mle);
-		dlm_put_mle(mle);
+		dlm_put_mle_inuse(mle);
+		spin_lock(&res->spinlock);
+		res->state &= ~DLM_LOCK_RES_MIGRATING;
+		spin_unlock(&res->spinlock);
 		goto leave;
 	}
 
@@ -2164,8 +2326,8 @@ fail:
 			/* avoid hang during shutdown when migrating lockres
 			 * to a node which also goes down */
 			if (dlm_is_node_dead(dlm, target)) {
-				mlog(0, "%s:%.*s: expected migration target %u "
-				     "is no longer up.  restarting.\n",
+				mlog(0, "%s:%.*s: expected migration "
+				     "target %u is no longer up, restarting\n",
 				     dlm->name, res->lockname.len,
 				     res->lockname.name, target);
 				ret = -ERESTARTSYS;
@@ -2175,7 +2337,10 @@ fail:
 			/* migration failed, detach and clean up mle */
 			dlm_mle_detach_hb_events(dlm, mle);
 			dlm_put_mle(mle);
-			dlm_put_mle(mle);
+			dlm_put_mle_inuse(mle);
+			spin_lock(&res->spinlock);
+			res->state &= ~DLM_LOCK_RES_MIGRATING;
+			spin_unlock(&res->spinlock);
 			goto leave;
 		}
 		/* TODO: if node died: stop, clean up, return error */
@@ -2191,7 +2356,7 @@ fail:
 
 	/* master is known, detach if not already detached */
 	dlm_mle_detach_hb_events(dlm, mle);
-	dlm_put_mle(mle);
+	dlm_put_mle_inuse(mle);
 	ret = 0;
 
 	dlm_lockres_calc_usage(dlm, res);
@@ -2462,7 +2627,7 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
 	struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
 	struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
 	const char *name;
-	unsigned int namelen;
+	unsigned int namelen, hash;
 	int ret = 0;
 
 	if (!dlm_grab(dlm))
@@ -2470,10 +2635,11 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
 
 	name = migrate->name;
 	namelen = migrate->namelen;
+	hash = dlm_lockid_hash(name, namelen);
 
 	/* preallocate.. if this fails, abort */
 	mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
-							 GFP_KERNEL);
+							 GFP_NOFS);
 
 	if (!mle) {
 		ret = -ENOMEM;
@@ -2482,7 +2648,7 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
 
 	/* check for pre-existing lock */
 	spin_lock(&dlm->spinlock);
-	res = __dlm_lookup_lockres(dlm, name, namelen);
+	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
 	spin_lock(&dlm->master_lock);
 
 	if (res) {
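
Threading a precomputed hash through __dlm_lookup_lockres() means the name is hashed once, outside any spinlock, instead of once per lookup taken under dlm->spinlock. A minimal sketch of the hashed lookup, assuming a flat bucket array; the real table may be split across pages, and names other than dlm_lockid_hash() and DLM_HASH_BUCKETS are illustrative:

	#include <linux/list.h>
	#include <linux/string.h>

	/* caller computes hash = dlm_lockid_hash(name, namelen) up front,
	 * then passes it into every lookup of the same name */
	static struct dlm_lock_resource *lookup_sketch(struct hlist_head *table,
						       const char *name,
						       unsigned int len,
						       unsigned int hash)
	{
		struct hlist_head *bucket = &table[hash % DLM_HASH_BUCKETS];
		struct hlist_node *node;
		struct dlm_lock_resource *res;

		hlist_for_each_entry(res, node, bucket, hash_node) {
			if (res->lockname.len == len &&
			    memcmp(res->lockname.name, name, len) == 0)
				return res;	/* caller still needs a ref */
		}
		return NULL;
	}
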
@@ -2580,6 +2746,7 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
 			/* remove it from the list so that only one
 			 * mle will be found */
 			list_del_init(&tmp->list);
+			__dlm_mle_detach_hb_events(dlm, mle);
 		}
 		spin_unlock(&tmp->spinlock);
 	}
@@ -2601,6 +2768,7 @@ void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
 	struct list_head *iter, *iter2;
 	struct dlm_master_list_entry *mle;
 	struct dlm_lock_resource *res;
+	unsigned int hash;
 
 	mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node);
 top:
@@ -2640,7 +2808,7 @@ top:
 				 * may result in the mle being unlinked and
 				 * freed, but there may still be a process
 				 * waiting in the dlmlock path which is fine. */
-				mlog(ML_ERROR, "node %u was expected master\n",
+				mlog(0, "node %u was expected master\n",
 				     dead_node);
 				atomic_set(&mle->woken, 1);
 				spin_unlock(&mle->spinlock);
@@ -2673,19 +2841,21 @@ top:
 
 		/* remove from the list early.  NOTE: unlinking
 		 * list_head while in list_for_each_safe */
+		__dlm_mle_detach_hb_events(dlm, mle);
 		spin_lock(&mle->spinlock);
 		list_del_init(&mle->list);
 		atomic_set(&mle->woken, 1);
 		spin_unlock(&mle->spinlock);
 		wake_up(&mle->wq);
 
-		mlog(0, "node %u died during migration from "
-		     "%u to %u!\n", dead_node,
+		mlog(0, "%s: node %u died during migration from "
+		     "%u to %u!\n", dlm->name, dead_node,
 		     mle->master, mle->new_master);
 		/* if there is a lockres associated with this
 	 	 * mle, find it and set its owner to UNKNOWN */
+		hash = dlm_lockid_hash(mle->u.name.name, mle->u.name.len);
 		res = __dlm_lookup_lockres(dlm, mle->u.name.name,
-					mle->u.name.len);
+					   mle->u.name.len, hash);
 		if (res) {
 			/* unfortunately if we hit this rare case, our
 		 	 * lock ordering is messed.  we need to drop

File diff suppressed because it is too large
+ 416 - 126
fs/ocfs2/dlm/dlmrecovery.c


+ 62 - 6
fs/ocfs2/dlm/dlmthread.c

@@ -39,6 +39,7 @@
 #include <linux/inet.h>
 #include <linux/timer.h>
 #include <linux/kthread.h>
+#include <linux/delay.h>
 
 
 #include "cluster/heartbeat.h"
@@ -53,6 +54,8 @@
 #include "cluster/masklog.h"
 
 static int dlm_thread(void *data);
+static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
+				  struct dlm_lock_resource *lockres);
 
 static void dlm_flush_asts(struct dlm_ctxt *dlm);
 
@@ -80,7 +83,7 @@ repeat:
 }
 
 
-static int __dlm_lockres_unused(struct dlm_lock_resource *res)
+int __dlm_lockres_unused(struct dlm_lock_resource *res)
 {
 	if (list_empty(&res->granted) &&
 	    list_empty(&res->converting) &&
@@ -103,6 +106,20 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
 	assert_spin_locked(&res->spinlock);
 
 	if (__dlm_lockres_unused(res)){
+		/* For now, just keep any resource we master */
+		if (res->owner == dlm->node_num)
+		{
+			if (!list_empty(&res->purge)) {
+				mlog(0, "we master %s:%.*s, but it is on "
+				     "the purge list.  Removing\n",
+				     dlm->name, res->lockname.len,
+				     res->lockname.name);
+				list_del_init(&res->purge);
+				dlm->purge_count--;
+			}
+			return;
+		}
+
 		if (list_empty(&res->purge)) {
 			mlog(0, "putting lockres %.*s from purge list\n",
 			     res->lockname.len, res->lockname.name);
@@ -110,10 +127,23 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
 			res->last_used = jiffies;
 			list_add_tail(&res->purge, &dlm->purge_list);
 			dlm->purge_count++;
+
+			/* if this node is not the owner, there is
+			 * no way to keep track of who the owner could be.
+			 * unhash it to avoid serious problems. */
+			if (res->owner != dlm->node_num) {
+				mlog(0, "%s:%.*s: doing immediate "
+				     "purge of lockres owned by %u\n",
+				     dlm->name, res->lockname.len,
+				     res->lockname.name, res->owner);
+
+				dlm_purge_lockres_now(dlm, res);
+			}
 		}
 	} else if (!list_empty(&res->purge)) {
-		mlog(0, "removing lockres %.*s from purge list\n",
-		     res->lockname.len, res->lockname.name);
+		mlog(0, "removing lockres %.*s from purge list, "
+		     "owner=%u\n", res->lockname.len, res->lockname.name,
+		     res->owner);
 
 		list_del_init(&res->purge);
 		dlm->purge_count--;
@@ -165,6 +195,7 @@ again:
 	} else if (ret < 0) {
 		mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n",
 		     lockres->lockname.len, lockres->lockname.name);
+		msleep(100);
 		goto again;
 	}
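
The new msleep(100), enabled by the <linux/delay.h> include above, keeps this retry from being a busy loop: a failed migration usually needs another node or recovery to make progress first. The shape of the loop, roughly (illustrative, not the committed control flow):

	/* back off briefly between migration attempts so the target
	 * node and recovery can make the progress we are waiting on */
	for (;;) {
		ret = dlm_migrate_lockres(dlm, lockres, target);
		if (ret >= 0)
			break;
		msleep(100);
	}
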
 
@@ -178,6 +209,24 @@ finish:
 	__dlm_unhash_lockres(lockres);
 }
 
+/* make an unused lockres go away immediately.
+ * as soon as the dlm spinlock is dropped, this lockres
+ * will not be found. kfree still happens on last put. */
+static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
+				  struct dlm_lock_resource *lockres)
+{
+	assert_spin_locked(&dlm->spinlock);
+	assert_spin_locked(&lockres->spinlock);
+
+	BUG_ON(!__dlm_lockres_unused(lockres));
+
+	if (!list_empty(&lockres->purge)) {
+		list_del_init(&lockres->purge);
+		dlm->purge_count--;
+	}
+	__dlm_unhash_lockres(lockres);
+}
+
 static void dlm_run_purge_list(struct dlm_ctxt *dlm,
 			       int purge_now)
 {
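
dlm_purge_lockres_now() has a strict calling contract: both spinlocks held and the resource provably unused, which the assert_spin_locked() and BUG_ON() lines enforce. A sketch of a valid call site under those assumptions (the surrounding context is illustrative):

	spin_lock(&dlm->spinlock);
	spin_lock(&res->spinlock);
	if (__dlm_lockres_unused(res) && res->owner != dlm->node_num) {
		/* nobody can look this lockres up once it is unhashed */
		dlm_purge_lockres_now(dlm, res);
	}
	spin_unlock(&res->spinlock);
	spin_unlock(&dlm->spinlock);
	dlm_lockres_put(res);	/* kfree still happens on the last put */
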
@@ -420,6 +469,8 @@ void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
 	/* don't shuffle secondary queues */
 	if ((res->owner == dlm->node_num) &&
 	    !(res->state & DLM_LOCK_RES_DIRTY)) {
+		/* ref for dirty_list */
+		dlm_lockres_get(res);
 		list_add_tail(&res->dirty, &dlm->dirty_list);
 		res->state |= DLM_LOCK_RES_DIRTY;
 	}
@@ -604,6 +655,8 @@ static int dlm_thread(void *data)
 			list_del_init(&res->dirty);
 			spin_unlock(&res->spinlock);
 			spin_unlock(&dlm->spinlock);
+			/* Drop dirty_list ref */
+			dlm_lockres_put(res);
 
 		 	/* lockres can be re-dirtied/re-added to the
 			 * dirty_list in this gap, but that is ok */
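
Both dirty-list hunks enforce one invariant: the list owns a reference. Whoever links the lockres takes a ref, and whoever unlinks it drops that ref only after releasing the locks, so the dlm thread can never be left holding a pointer to freed memory. The pattern reduced to its core (a sketch; field and helper names mirror the hunks above):

	/* add: the list takes its own reference under res->spinlock */
	dlm_lockres_get(res);			/* ref owned by dirty_list */
	list_add_tail(&res->dirty, &dlm->dirty_list);
	res->state |= DLM_LOCK_RES_DIRTY;

	/* remove: unlink under the locks, put only after dropping them,
	 * since the put may be the final one and may sleep in teardown */
	list_del_init(&res->dirty);
	spin_unlock(&res->spinlock);
	spin_unlock(&dlm->spinlock);
	dlm_lockres_put(res);
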
@@ -640,8 +693,9 @@ static int dlm_thread(void *data)
 			 * spinlock and do NOT have the dlm lock.
 			 * safe to reserve/queue asts and run the lists. */
 
-			mlog(0, "calling dlm_shuffle_lists with dlm=%p, "
-			     "res=%p\n", dlm, res);
+			mlog(0, "calling dlm_shuffle_lists with dlm=%s, "
+			     "res=%.*s\n", dlm->name,
+			     res->lockname.len, res->lockname.name);
 
 			/* called while holding lockres lock */
 			dlm_shuffle_lists(dlm, res);
@@ -655,6 +709,8 @@ in_progress:
 			/* if the lock was in-progress, stick
 			 * it on the back of the list */
 			if (delay) {
+				/* ref for dirty_list */
+				dlm_lockres_get(res);
 				spin_lock(&res->spinlock);
 				list_add_tail(&res->dirty, &dlm->dirty_list);
 				res->state |= DLM_LOCK_RES_DIRTY;
@@ -675,7 +731,7 @@ in_progress:
 
 		/* yield and continue right away if there is more work to do */
 		if (!n) {
-			yield();
+			cond_resched();
 			continue;
 		}
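
yield() in a loop like this gives up the rest of the timeslice even when no other task wants the CPU, which can turn a busy dlm_thread into a livelock under load; cond_resched() reschedules only if a higher-priority task is actually waiting. The loop shape being fixed, as a sketch (the work helpers are hypothetical):

	#include <linux/sched.h>
	#include <linux/kthread.h>

	/* illustrative kthread main loop */
	while (!kthread_should_stop()) {
		int n = dlm_do_one_pass(dlm);	/* hypothetical work batch */

		if (!n) {
			cond_resched();	/* cheap: no-op unless someone waits */
			continue;
		}
		/* otherwise sleep until new work arrives ... */
	}
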
 

+ 10 - 0
fs/ocfs2/dlm/dlmunlock.c

@@ -318,6 +318,16 @@ static enum dlm_status dlm_send_remote_unlock_request(struct dlm_ctxt *dlm,
 
 	mlog_entry("%.*s\n", res->lockname.len, res->lockname.name);
 
+	if (owner == dlm->node_num) {
+		/* ended up trying to contact ourself.  this means
+		 * that the lockres had been remote but became local
+		 * via a migration.  just retry it, now as local */
+		mlog(0, "%s:%.*s: this node became the master due to a "
+		     "migration, re-evaluate now\n", dlm->name,
+		     res->lockname.len, res->lockname.name);
+		return DLM_FORWARD;
+	}
+
 	memset(&unlock, 0, sizeof(unlock));
 	unlock.node_idx = dlm->node_num;
 	unlock.flags = cpu_to_be32(flags);
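
Returning DLM_FORWARD here relies on the caller's retry contract: on DLM_FORWARD, re-resolve the owner and try again, which after a migration now takes the local path instead of a network round-trip to ourselves. Roughly, as a sketch (the accessor and local-unlock helper names are hypothetical):

	enum dlm_status status;

	do {
		u8 owner = dlm_lockres_owner(res);	/* hypothetical accessor */

		if (owner == dlm->node_num)
			status = dlm_do_local_unlock(dlm, res, lock);	/* hypothetical */
		else
			status = dlm_send_remote_unlock_request(dlm, res, lock,
								lksb, flags,
								owner);
	} while (status == DLM_FORWARD);
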

+ 1 - 1
fs/ocfs2/dlm/userdlm.c

@@ -672,7 +672,7 @@ struct dlm_ctxt *user_dlm_register_context(struct qstr *name)
 	u32 dlm_key;
 	char *domain;
 
-	domain = kmalloc(name->len + 1, GFP_KERNEL);
+	domain = kmalloc(name->len + 1, GFP_NOFS);
 	if (!domain) {
 		mlog_errno(-ENOMEM);
 		return ERR_PTR(-ENOMEM);

Some files were not shown because too many files changed in this diff