Bladeren bron

Merge branch 'upstream-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/jlbec/ocfs2

* 'upstream-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/jlbec/ocfs2: (48 commits)
  ocfs2: Avoid to evaluate xattr block flags again.
  ocfs2/cluster: Release debugfs file elapsed_time_in_ms
  ocfs2: Add a mount option "coherency=*" to handle cluster coherency for O_DIRECT writes.
  Initialize max_slots early
  When I tried to compile I got the following warning: fs/ocfs2/slot_map.c: In function ‘ocfs2_init_slot_info’: fs/ocfs2/slot_map.c:360: warning: ‘bytes’ may be used uninitialized in this function fs/ocfs2/slot_map.c:360: note: ‘bytes’ was declared here Compiler: gcc version 4.4.3 (GCC) on Mandriva I'm not sure why this warning occurs, I think compiler don't know that variable "bytes" is initialized when it is sent by reference to ocfs2_slot_map_physical_size and it throws that ugly warning. However, a simple initialization of "bytes" variable with 0 will fix it.
  ocfs2: validate bg_free_bits_count after update
  ocfs2/cluster: Bump up dlm protocol to version 1.1
  ocfs2/cluster: Show per region heartbeat elapsed time
  ocfs2/cluster: Add mlogs for heartbeat up/down events
  ocfs2/cluster: Create debugfs dir/files for each region
  ocfs2/cluster: Create debugfs files for live, quorum and failed region bitmaps
  ocfs2/cluster: Maintain bitmap of failed regions
  ocfs2/cluster: Maintain bitmap of quorum regions
  ocfs2/cluster: Track bitmap of live heartbeat regions
  ocfs2/cluster: Track number of global heartbeat regions
  ocfs2/cluster: Maintain live node bitmap per heartbeat region
  ocfs2/cluster: Reorganize o2hb debugfs init
  ocfs2/cluster: Check slots for unconfigured live nodes
  ocfs2/cluster: Print messages when adding/removing nodes
  ocfs2/cluster: Print messages when adding/removing heartbeat regions
  ...
Linus Torvalds 14 jaren geleden
bovenliggende
commit
f3270b16e0

+ 7 - 0
Documentation/filesystems/ocfs2.txt

@@ -87,3 +87,10 @@ dir_resv_level=	(*)	By default, directory reservations will scale with file
 			reservations - users should rarely need to change this
 			value. If allocation reservations are turned off, this
 			option will have no effect.
+coherency=full  (*)	Disallow concurrent O_DIRECT writes. The cluster inode
+			lock is taken to force other nodes to drop their
+			caches, so full cluster coherency is guaranteed even
+			for O_DIRECT writes.
+coherency=buffered	Allow concurrent O_DIRECT writes without an EX lock
+			among nodes, which gains performance at the risk of
+			reading stale data on other nodes.

+ 2 - 2
fs/ext3/super.c

@@ -1849,8 +1849,8 @@ static int ext3_fill_super (struct super_block *sb, void *data, int silent)
 		goto failed_mount;
 	}
 
-	if (le32_to_cpu(es->s_blocks_count) >
-		    (sector_t)(~0ULL) >> (sb->s_blocksize_bits - 9)) {
+	if (generic_check_addressable(sb->s_blocksize_bits,
+				      le32_to_cpu(es->s_blocks_count))) {
 		ext3_msg(sb, KERN_ERR,
 			"error: filesystem is too large to mount safely");
 		if (sizeof(sector_t) < 8)

+ 3 - 5
fs/ext4/super.c

@@ -2831,15 +2831,13 @@ static int ext4_fill_super(struct super_block *sb, void *data, int silent)
 	 * Test whether we have more sectors than will fit in sector_t,
 	 * and whether the max offset is addressable by the page cache.
 	 */
-	if ((ext4_blocks_count(es) >
-	     (sector_t)(~0ULL) >> (sb->s_blocksize_bits - 9)) ||
-	    (ext4_blocks_count(es) >
-	     (pgoff_t)(~0ULL) >> (PAGE_CACHE_SHIFT - sb->s_blocksize_bits))) {
+	ret = generic_check_addressable(sb->s_blocksize_bits,
+					ext4_blocks_count(es));
+	if (ret) {
 		ext4_msg(sb, KERN_ERR, "filesystem"
 			 " too large to mount safely on this system");
 		if (sizeof(sector_t) < 8)
 			ext4_msg(sb, KERN_WARNING, "CONFIG_LBDAF not enabled");
-		ret = -EFBIG;
 		goto failed_mount;
 	}
 

+ 4 - 0
fs/jbd2/journal.c

@@ -1371,6 +1371,10 @@ int jbd2_journal_check_used_features (journal_t *journal, unsigned long compat,
 
 	if (!compat && !ro && !incompat)
 		return 1;
+	/* Load journal superblock if it is not loaded yet. */
+	if (journal->j_format_version == 0 &&
+	    journal_get_superblock(journal) != 0)
+		return 0;
 	if (journal->j_format_version == 1)
 		return 0;
 

+ 29 - 0
fs/libfs.c

@@ -913,6 +913,35 @@ int generic_file_fsync(struct file *file, int datasync)
 }
 EXPORT_SYMBOL(generic_file_fsync);
 
+/**
+ * generic_check_addressable - Check addressability of file system
+ * @blocksize_bits:	log of file system block size
+ * @num_blocks:		number of blocks in file system
+ *
+ * Determine whether a file system with @num_blocks blocks (and a
+ * block size of 2**@blocksize_bits) is addressable by the sector_t
+ * and page cache of the system.  Return 0 if so, -EFBIG if not, or
+ * -EINVAL if @blocksize_bits is outside the range 9..PAGE_CACHE_SHIFT.
+ */
+int generic_check_addressable(unsigned blocksize_bits, u64 num_blocks)
+{
+	u64 last_fs_block, last_fs_page;
+
+	if (unlikely(num_blocks == 0))
+		return 0;
+
+	if ((blocksize_bits < 9) || (blocksize_bits > PAGE_CACHE_SHIFT))
+		return -EINVAL;
+
+	/* blocksize_bits is validated, so both shift counts are in range. */
+	last_fs_block = num_blocks - 1;
+	last_fs_page = last_fs_block >> (PAGE_CACHE_SHIFT - blocksize_bits);
+
+	if ((last_fs_block > (sector_t)(~0ULL) >> (blocksize_bits - 9)) ||
+	    (last_fs_page > (pgoff_t)(~0ULL)))
+		return -EFBIG;
+	return 0;
+}
+EXPORT_SYMBOL(generic_check_addressable);
+
 /*
  * No-op implementation of ->fsync for in-memory filesystems.
  */

+ 5 - 4
fs/ocfs2/aops.c

@@ -883,8 +883,8 @@ struct ocfs2_write_ctxt {
 	 * out in so that future reads from that region will get
 	 * zero's.
 	 */
-	struct page			*w_pages[OCFS2_MAX_CTXT_PAGES];
 	unsigned int			w_num_pages;
+	struct page			*w_pages[OCFS2_MAX_CTXT_PAGES];
 	struct page			*w_target_page;
 
 	/*
@@ -1642,7 +1642,8 @@ static int ocfs2_zero_tail(struct inode *inode, struct buffer_head *di_bh,
 	return ret;
 }
 
-int ocfs2_write_begin_nolock(struct address_space *mapping,
+int ocfs2_write_begin_nolock(struct file *filp,
+			     struct address_space *mapping,
 			     loff_t pos, unsigned len, unsigned flags,
 			     struct page **pagep, void **fsdata,
 			     struct buffer_head *di_bh, struct page *mmap_page)
@@ -1692,7 +1693,7 @@ int ocfs2_write_begin_nolock(struct address_space *mapping,
 		mlog_errno(ret);
 		goto out;
 	} else if (ret == 1) {
-		ret = ocfs2_refcount_cow(inode, di_bh,
+		ret = ocfs2_refcount_cow(inode, filp, di_bh,
 					 wc->w_cpos, wc->w_clen, UINT_MAX);
 		if (ret) {
 			mlog_errno(ret);
@@ -1854,7 +1855,7 @@ static int ocfs2_write_begin(struct file *file, struct address_space *mapping,
 	 */
 	down_write(&OCFS2_I(inode)->ip_alloc_sem);
 
-	ret = ocfs2_write_begin_nolock(mapping, pos, len, flags, pagep,
+	ret = ocfs2_write_begin_nolock(file, mapping, pos, len, flags, pagep,
 				       fsdata, di_bh, NULL);
 	if (ret) {
 		mlog_errno(ret);

+ 2 - 1
fs/ocfs2/aops.h

@@ -48,7 +48,8 @@ int ocfs2_write_end_nolock(struct address_space *mapping,
 			   loff_t pos, unsigned len, unsigned copied,
 			   struct page *page, void *fsdata);
 
-int ocfs2_write_begin_nolock(struct address_space *mapping,
+int ocfs2_write_begin_nolock(struct file *filp,
+			     struct address_space *mapping,
 			     loff_t pos, unsigned len, unsigned flags,
 			     struct page **pagep, void **fsdata,
 			     struct buffer_head *di_bh, struct page *mmap_page);

+ 501 - 31
fs/ocfs2/cluster/heartbeat.c

@@ -62,10 +62,51 @@ static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
 static LIST_HEAD(o2hb_node_events);
 static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);
 
+/*
+ * In global heartbeat, we maintain a series of region bitmaps.
+ * 	- o2hb_region_bitmap allows us to limit the region number to max region.
+ * 	- o2hb_live_region_bitmap tracks live regions (seen steady iterations).
+ * 	- o2hb_quorum_region_bitmap tracks live regions that have seen all nodes
+ * 		heartbeat on it.
+ * 	- o2hb_failed_region_bitmap tracks the regions that have seen io timeouts.
+ */
+static unsigned long o2hb_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
+static unsigned long o2hb_live_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
+static unsigned long o2hb_quorum_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
+static unsigned long o2hb_failed_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
+
+#define O2HB_DB_TYPE_LIVENODES		0
+#define O2HB_DB_TYPE_LIVEREGIONS	1
+#define O2HB_DB_TYPE_QUORUMREGIONS	2
+#define O2HB_DB_TYPE_FAILEDREGIONS	3
+#define O2HB_DB_TYPE_REGION_LIVENODES	4
+#define O2HB_DB_TYPE_REGION_NUMBER	5
+#define O2HB_DB_TYPE_REGION_ELAPSED_TIME	6
+struct o2hb_debug_buf {
+	int db_type;		/* one of the O2HB_DB_TYPE_* selectors */
+	int db_size;		/* bytes copied out of the bitmap on open */
+	int db_len;		/* number of bits scanned when printing */
+	void *db_data;		/* a bitmap, or the o2hb_region for REGION_* types */
+};
+
+static struct o2hb_debug_buf *o2hb_db_livenodes;
+static struct o2hb_debug_buf *o2hb_db_liveregions;
+static struct o2hb_debug_buf *o2hb_db_quorumregions;
+static struct o2hb_debug_buf *o2hb_db_failedregions;
+
 #define O2HB_DEBUG_DIR			"o2hb"
 #define O2HB_DEBUG_LIVENODES		"livenodes"
+#define O2HB_DEBUG_LIVEREGIONS		"live_regions"
+#define O2HB_DEBUG_QUORUMREGIONS	"quorum_regions"
+#define O2HB_DEBUG_FAILEDREGIONS	"failed_regions"
+#define O2HB_DEBUG_REGION_NUMBER	"num"
+#define O2HB_DEBUG_REGION_ELAPSED_TIME	"elapsed_time_in_ms"
+
 static struct dentry *o2hb_debug_dir;
 static struct dentry *o2hb_debug_livenodes;
+static struct dentry *o2hb_debug_liveregions;
+static struct dentry *o2hb_debug_quorumregions;
+static struct dentry *o2hb_debug_failedregions;
 
 static LIST_HEAD(o2hb_all_regions);
 
@@ -77,7 +118,19 @@ static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);
 
 #define O2HB_DEFAULT_BLOCK_BITS       9
 
+enum o2hb_heartbeat_modes {
+	O2HB_HEARTBEAT_LOCAL		= 0,
+	O2HB_HEARTBEAT_GLOBAL,
+	O2HB_HEARTBEAT_NUM_MODES,
+};
+
+char *o2hb_heartbeat_mode_desc[O2HB_HEARTBEAT_NUM_MODES] = {
+		"local",	/* O2HB_HEARTBEAT_LOCAL */
+		"global",	/* O2HB_HEARTBEAT_GLOBAL */
+};
+
 unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;
+unsigned int o2hb_heartbeat_mode = O2HB_HEARTBEAT_LOCAL;
 
 /* Only sets a new threshold if there are no active regions.
  *
@@ -94,6 +147,22 @@ static void o2hb_dead_threshold_set(unsigned int threshold)
 	}
 }
 
+static int o2hb_global_hearbeat_mode_set(unsigned int hb_mode)
+{	/*
+	 * Switch the heartbeat mode to @hb_mode.  Returns 0 on success, or
+	 * -1 if @hb_mode is not a valid mode or o2hb_all_regions is not
+	 * empty.  The check and the update are done under o2hb_live_lock.
+	 */
+	int ret = -1;
+
+	if (hb_mode < O2HB_HEARTBEAT_NUM_MODES) {
+		spin_lock(&o2hb_live_lock);
+		if (list_empty(&o2hb_all_regions)) {	/* only while no region exists */
+			o2hb_heartbeat_mode = hb_mode;
+			ret = 0;
+		}
+		spin_unlock(&o2hb_live_lock);
+	}
+
+	return ret;
+}
+
 struct o2hb_node_event {
 	struct list_head        hn_item;
 	enum o2hb_callback_type hn_event_type;
@@ -135,6 +204,18 @@ struct o2hb_region {
 	struct block_device	*hr_bdev;
 	struct o2hb_disk_slot	*hr_slots;
 
+	/* live node map of this region */
+	unsigned long		hr_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
+	unsigned int		hr_region_num;
+
+	struct dentry		*hr_debug_dir;
+	struct dentry		*hr_debug_livenodes;
+	struct dentry		*hr_debug_regnum;
+	struct dentry		*hr_debug_elapsed_time;
+	struct o2hb_debug_buf	*hr_db_livenodes;
+	struct o2hb_debug_buf	*hr_db_regnum;
+	struct o2hb_debug_buf	*hr_db_elapsed_time;
+
 	/* let the person setting up hb wait for it to return until it
 	 * has reached a 'steady' state.  This will be fixed when we have
 	 * a more complete api that doesn't lead to this sort of fragility. */
@@ -163,8 +244,19 @@ struct o2hb_bio_wait_ctxt {
 	int               wc_error;
 };
 
+static int o2hb_pop_count(void *map, int count)
+{	/* Return how many of the first @count bits of bitmap @map are set. */
+	int i = -1, pop = 0;	/* i starts at -1 so the first search begins at bit 0 */
+
+	while ((i = find_next_bit(map, count, i + 1)) < count)
+		pop++;
+	return pop;
+}
+
 static void o2hb_write_timeout(struct work_struct *work)
 {
+	int failed, quorum;
+	unsigned long flags;
 	struct o2hb_region *reg =
 		container_of(work, struct o2hb_region,
 			     hr_write_timeout_work.work);
@@ -172,6 +264,28 @@ static void o2hb_write_timeout(struct work_struct *work)
 	mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
 	     "milliseconds\n", reg->hr_dev_name,
 	     jiffies_to_msecs(jiffies - reg->hr_last_timeout_start));
+
+	if (o2hb_global_heartbeat_active()) {
+		spin_lock_irqsave(&o2hb_live_lock, flags);
+		if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
+			set_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
+		failed = o2hb_pop_count(&o2hb_failed_region_bitmap,
+					O2NM_MAX_REGIONS);
+		quorum = o2hb_pop_count(&o2hb_quorum_region_bitmap,
+					O2NM_MAX_REGIONS);
+		spin_unlock_irqrestore(&o2hb_live_lock, flags);
+
+		mlog(ML_HEARTBEAT, "Number of regions %d, failed regions %d\n",
+		     quorum, failed);
+
+		/*
+		 * Fence if the number of failed regions >= half the number
+		 * of  quorum regions
+		 */
+		if ((failed << 1) < quorum)
+			return;
+	}
+
 	o2quo_disk_timeout();
 }
 
@@ -180,6 +294,11 @@ static void o2hb_arm_write_timeout(struct o2hb_region *reg)
 	mlog(ML_HEARTBEAT, "Queue write timeout for %u ms\n",
 	     O2HB_MAX_WRITE_TIMEOUT_MS);
 
+	if (o2hb_global_heartbeat_active()) {
+		spin_lock(&o2hb_live_lock);
+		clear_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
+		spin_unlock(&o2hb_live_lock);
+	}
 	cancel_delayed_work(&reg->hr_write_timeout_work);
 	reg->hr_last_timeout_start = jiffies;
 	schedule_delayed_work(&reg->hr_write_timeout_work,
@@ -513,6 +632,8 @@ static void o2hb_queue_node_event(struct o2hb_node_event *event,
 {
 	assert_spin_locked(&o2hb_live_lock);
 
+	BUG_ON((!node) && (type != O2HB_NODE_DOWN_CB));
+
 	event->hn_event_type = type;
 	event->hn_node = node;
 	event->hn_node_num = node_num;
@@ -554,6 +675,35 @@ static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
 	o2nm_node_put(node);
 }
 
+static void o2hb_set_quorum_device(struct o2hb_region *reg,
+				   struct o2hb_disk_slot *slot)
+{	/*
+	 * Promote @reg to a quorum device: set its bit in
+	 * o2hb_quorum_region_bitmap and log a notice.  This happens only
+	 * when global heartbeat is active, the bit is not already set, the
+	 * region's live node bitmap equals the global one, and @slot has
+	 * seen at least O2HB_LIVE_THRESHOLD changed samples.
+	 * The caller must hold o2hb_live_lock (asserted below).
+	 */
+	assert_spin_locked(&o2hb_live_lock);
+
+	if (!o2hb_global_heartbeat_active())
+		return;
+
+	if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))	/* already a quorum device */
+		return;
+
+	/*
+	 * A region can be added to the quorum only when it sees all
+	 * live nodes heartbeat on it. In other words, the region has been
+	 * added to all nodes.
+	 */
+	if (memcmp(reg->hr_live_node_bitmap, o2hb_live_node_bitmap,
+		   sizeof(o2hb_live_node_bitmap)))
+		return;
+
+	if (slot->ds_changed_samples < O2HB_LIVE_THRESHOLD)	/* slot not yet seen as live */
+		return;
+
+	printk(KERN_NOTICE "o2hb: Region %s is now a quorum device\n",
+	       config_item_name(&reg->hr_item));
+
+	set_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
+}
+
 static int o2hb_check_slot(struct o2hb_region *reg,
 			   struct o2hb_disk_slot *slot)
 {
@@ -565,14 +715,22 @@ static int o2hb_check_slot(struct o2hb_region *reg,
 	u64 cputime;
 	unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
 	unsigned int slot_dead_ms;
+	int tmp;
 
 	memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);
 
-	/* Is this correct? Do we assume that the node doesn't exist
-	 * if we're not configured for him? */
+	/*
+	 * If a node is no longer configured but is still in the livemap, we
+	 * may need to clear that bit from the livemap.
+	 */
 	node = o2nm_get_node_by_num(slot->ds_node_num);
-	if (!node)
-		return 0;
+	if (!node) {
+		spin_lock(&o2hb_live_lock);
+		tmp = test_bit(slot->ds_node_num, o2hb_live_node_bitmap);
+		spin_unlock(&o2hb_live_lock);
+		if (!tmp)
+			return 0;
+	}
 
 	if (!o2hb_verify_crc(reg, hb_block)) {
 		/* all paths from here will drop o2hb_live_lock for
@@ -639,8 +797,12 @@ fire_callbacks:
 		mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n",
 		     slot->ds_node_num, (long long)slot->ds_last_generation);
 
+		set_bit(slot->ds_node_num, reg->hr_live_node_bitmap);
+
 		/* first on the list generates a callback */
 		if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
+			mlog(ML_HEARTBEAT, "o2hb: Add node %d to live nodes "
+			     "bitmap\n", slot->ds_node_num);
 			set_bit(slot->ds_node_num, o2hb_live_node_bitmap);
 
 			o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
@@ -684,13 +846,18 @@ fire_callbacks:
 		mlog(ML_HEARTBEAT, "Node %d left my region\n",
 		     slot->ds_node_num);
 
+		clear_bit(slot->ds_node_num, reg->hr_live_node_bitmap);
+
 		/* last off the live_slot generates a callback */
 		list_del_init(&slot->ds_live_item);
 		if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
+			mlog(ML_HEARTBEAT, "o2hb: Remove node %d from live "
+			     "nodes bitmap\n", slot->ds_node_num);
 			clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
 
-			o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
-					      slot->ds_node_num);
+			/* node can be null */
+			o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB,
+					      node, slot->ds_node_num);
 
 			changed = 1;
 		}
@@ -706,11 +873,14 @@ fire_callbacks:
 		slot->ds_equal_samples = 0;
 	}
 out:
+	o2hb_set_quorum_device(reg, slot);
+
 	spin_unlock(&o2hb_live_lock);
 
 	o2hb_run_event_list(&event);
 
-	o2nm_node_put(node);
+	if (node)
+		o2nm_node_put(node);
 	return changed;
 }
 
@@ -737,6 +907,7 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
 {
 	int i, ret, highest_node, change = 0;
 	unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
+	unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
 	struct o2hb_bio_wait_ctxt write_wc;
 
 	ret = o2nm_configured_node_map(configured_nodes,
@@ -746,6 +917,17 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
 		return ret;
 	}
 
+	/*
+	 * If a node is not configured but is in the livemap, we still need
+	 * to read the slot so as to be able to remove it from the livemap.
+	 */
+	o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap));
+	i = -1;
+	while ((i = find_next_bit(live_node_bitmap,
+				  O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
+		set_bit(i, configured_nodes);
+	}
+
 	highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
 	if (highest_node >= O2NM_MAX_NODES) {
 		mlog(ML_NOTICE, "ocfs2_heartbeat: no configured nodes found!\n");
@@ -917,21 +1099,59 @@ static int o2hb_thread(void *data)
 #ifdef CONFIG_DEBUG_FS
 static int o2hb_debug_open(struct inode *inode, struct file *file)
 {
+	struct o2hb_debug_buf *db = inode->i_private;
+	struct o2hb_region *reg;
 	unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)];
 	char *buf = NULL;
 	int i = -1;
 	int out = 0;
 
+	/* max_nodes should be the largest bitmap we pass here */
+	BUG_ON(sizeof(map) < db->db_size);
+
 	buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
 	if (!buf)
 		goto bail;
 
-	o2hb_fill_node_map(map, sizeof(map));
+	switch (db->db_type) {
+	case O2HB_DB_TYPE_LIVENODES:
+	case O2HB_DB_TYPE_LIVEREGIONS:
+	case O2HB_DB_TYPE_QUORUMREGIONS:
+	case O2HB_DB_TYPE_FAILEDREGIONS:
+		spin_lock(&o2hb_live_lock);
+		memcpy(map, db->db_data, db->db_size);
+		spin_unlock(&o2hb_live_lock);
+		break;
+
+	case O2HB_DB_TYPE_REGION_LIVENODES:
+		spin_lock(&o2hb_live_lock);
+		reg = (struct o2hb_region *)db->db_data;
+		memcpy(map, reg->hr_live_node_bitmap, db->db_size);
+		spin_unlock(&o2hb_live_lock);
+		break;
+
+	case O2HB_DB_TYPE_REGION_NUMBER:
+		reg = (struct o2hb_region *)db->db_data;
+		out += snprintf(buf + out, PAGE_SIZE - out, "%d\n",
+				reg->hr_region_num);
+		goto done;
+
+	case O2HB_DB_TYPE_REGION_ELAPSED_TIME:
+		reg = (struct o2hb_region *)db->db_data;
+		out += snprintf(buf + out, PAGE_SIZE - out, "%u\n",
+				jiffies_to_msecs(jiffies -
+						 reg->hr_last_timeout_start));
+		goto done;
+
+	default:
+		goto done;
+	}
 
-	while ((i = find_next_bit(map, O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES)
+	while ((i = find_next_bit(map, db->db_len, i + 1)) < db->db_len)
 		out += snprintf(buf + out, PAGE_SIZE - out, "%d ", i);
 	out += snprintf(buf + out, PAGE_SIZE - out, "\n");
 
+done:
 	i_size_write(inode, out);
 
 	file->private_data = buf;
@@ -978,10 +1198,104 @@ static const struct file_operations o2hb_debug_fops = {
 
 void o2hb_exit(void)
 {
-	if (o2hb_debug_livenodes)
-		debugfs_remove(o2hb_debug_livenodes);
-	if (o2hb_debug_dir)
-		debugfs_remove(o2hb_debug_dir);
+	kfree(o2hb_db_livenodes);
+	kfree(o2hb_db_liveregions);
+	kfree(o2hb_db_quorumregions);
+	kfree(o2hb_db_failedregions);
+	debugfs_remove(o2hb_debug_failedregions);
+	debugfs_remove(o2hb_debug_quorumregions);
+	debugfs_remove(o2hb_debug_liveregions);
+	debugfs_remove(o2hb_debug_livenodes);
+	debugfs_remove(o2hb_debug_dir);
+}
+
+/*
+ * Allocate a @db_len byte o2hb_debug_buf, fill it in with @type, @size,
+ * @len and @data, and create the debugfs file @name under @dir with that
+ * buffer as its private data.  The buffer is handed back through *@db.
+ *
+ * Returns the dentry from debugfs_create_file().  If the allocation fails
+ * or that dentry is NULL, nothing stays allocated and *@db is NULL.
+ */
+static struct dentry *o2hb_debug_create(const char *name, struct dentry *dir,
+					struct o2hb_debug_buf **db, int db_len,
+					int type, int size, int len, void *data)
+{
+	struct dentry *dentry;
+
+	*db = kmalloc(db_len, GFP_KERNEL);
+	if (!*db)
+		return NULL;
+
+	(*db)->db_type = type;
+	(*db)->db_size = size;
+	(*db)->db_len = len;
+	(*db)->db_data = data;
+
+	dentry = debugfs_create_file(name, S_IFREG|S_IRUSR, dir, *db,
+				     &o2hb_debug_fops);
+	if (!dentry) {
+		/* Do not hand an orphaned buffer back to the caller. */
+		kfree(*db);
+		*db = NULL;
+	}
+
+	return dentry;
+}
+
+static int o2hb_debug_init(void)
+{	/*
+	 * Create the o2hb debugfs directory and the four global bitmap files
+	 * (live nodes, live regions, quorum regions, failed regions).
+	 * Returns 0 on success.  On any failure o2hb_exit() is called to undo
+	 * whatever was created and -ENOMEM is returned.
+	 */
+	int ret = -ENOMEM;
+
+	o2hb_debug_dir = debugfs_create_dir(O2HB_DEBUG_DIR, NULL);
+	if (!o2hb_debug_dir) {
+		mlog_errno(ret);
+		goto bail;
+	}
+
+	o2hb_debug_livenodes = o2hb_debug_create(O2HB_DEBUG_LIVENODES,
+						 o2hb_debug_dir,
+						 &o2hb_db_livenodes,
+						 sizeof(*o2hb_db_livenodes),
+						 O2HB_DB_TYPE_LIVENODES,
+						 sizeof(o2hb_live_node_bitmap),
+						 O2NM_MAX_NODES,
+						 o2hb_live_node_bitmap);
+	if (!o2hb_debug_livenodes) {
+		mlog_errno(ret);
+		goto bail;
+	}
+
+	o2hb_debug_liveregions = o2hb_debug_create(O2HB_DEBUG_LIVEREGIONS,
+						   o2hb_debug_dir,
+						   &o2hb_db_liveregions,
+						   sizeof(*o2hb_db_liveregions),
+						   O2HB_DB_TYPE_LIVEREGIONS,
+						   sizeof(o2hb_live_region_bitmap),
+						   O2NM_MAX_REGIONS,
+						   o2hb_live_region_bitmap);
+	if (!o2hb_debug_liveregions) {
+		mlog_errno(ret);
+		goto bail;
+	}
+
+	o2hb_debug_quorumregions =
+			o2hb_debug_create(O2HB_DEBUG_QUORUMREGIONS,
+					  o2hb_debug_dir,
+					  &o2hb_db_quorumregions,
+					  sizeof(*o2hb_db_quorumregions),
+					  O2HB_DB_TYPE_QUORUMREGIONS,
+					  sizeof(o2hb_quorum_region_bitmap),
+					  O2NM_MAX_REGIONS,
+					  o2hb_quorum_region_bitmap);
+	if (!o2hb_debug_quorumregions) {
+		mlog_errno(ret);
+		goto bail;
+	}
+
+	o2hb_debug_failedregions =
+			o2hb_debug_create(O2HB_DEBUG_FAILEDREGIONS,
+					  o2hb_debug_dir,
+					  &o2hb_db_failedregions,
+					  sizeof(*o2hb_db_failedregions),
+					  O2HB_DB_TYPE_FAILEDREGIONS,
+					  sizeof(o2hb_failed_region_bitmap),
+					  O2NM_MAX_REGIONS,
+					  o2hb_failed_region_bitmap);
+	if (!o2hb_debug_failedregions) {
+		mlog_errno(ret);
+		goto bail;
+	}
+
+	ret = 0;
+bail:
+	if (ret)	/* unwind a partial setup */
+		o2hb_exit();
+
+	return ret;
 }
 
 int o2hb_init(void)
@@ -997,24 +1311,12 @@ int o2hb_init(void)
 	INIT_LIST_HEAD(&o2hb_node_events);
 
 	memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));
+	memset(o2hb_region_bitmap, 0, sizeof(o2hb_region_bitmap));
+	memset(o2hb_live_region_bitmap, 0, sizeof(o2hb_live_region_bitmap));
+	memset(o2hb_quorum_region_bitmap, 0, sizeof(o2hb_quorum_region_bitmap));
+	memset(o2hb_failed_region_bitmap, 0, sizeof(o2hb_failed_region_bitmap));
 
-	o2hb_debug_dir = debugfs_create_dir(O2HB_DEBUG_DIR, NULL);
-	if (!o2hb_debug_dir) {
-		mlog_errno(-ENOMEM);
-		return -ENOMEM;
-	}
-
-	o2hb_debug_livenodes = debugfs_create_file(O2HB_DEBUG_LIVENODES,
-						   S_IFREG|S_IRUSR,
-						   o2hb_debug_dir, NULL,
-						   &o2hb_debug_fops);
-	if (!o2hb_debug_livenodes) {
-		mlog_errno(-ENOMEM);
-		debugfs_remove(o2hb_debug_dir);
-		return -ENOMEM;
-	}
-
-	return 0;
+	return o2hb_debug_init();
 }
 
 /* if we're already in a callback then we're already serialized by the sem */
@@ -1078,6 +1380,13 @@ static void o2hb_region_release(struct config_item *item)
 	if (reg->hr_slots)
 		kfree(reg->hr_slots);
 
+	kfree(reg->hr_db_regnum);
+	kfree(reg->hr_db_livenodes);
+	debugfs_remove(reg->hr_debug_livenodes);
+	debugfs_remove(reg->hr_debug_regnum);
+	debugfs_remove(reg->hr_debug_elapsed_time);
+	debugfs_remove(reg->hr_debug_dir);
+
 	spin_lock(&o2hb_live_lock);
 	list_del(&reg->hr_all_item);
 	spin_unlock(&o2hb_live_lock);
@@ -1441,6 +1750,8 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
 	/* Ok, we were woken.  Make sure it wasn't by drop_item() */
 	spin_lock(&o2hb_live_lock);
 	hb_task = reg->hr_task;
+	if (o2hb_global_heartbeat_active())
+		set_bit(reg->hr_region_num, o2hb_live_region_bitmap);
 	spin_unlock(&o2hb_live_lock);
 
 	if (hb_task)
@@ -1448,6 +1759,10 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
 	else
 		ret = -EIO;
 
+	if (hb_task && o2hb_global_heartbeat_active())
+		printk(KERN_NOTICE "o2hb: Heartbeat started on region %s\n",
+		       config_item_name(&reg->hr_item));
+
 out:
 	if (filp)
 		fput(filp);
@@ -1586,21 +1901,94 @@ static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group
 		: NULL;
 }
 
+static int o2hb_debug_region_init(struct o2hb_region *reg, struct dentry *dir)
+{	/*
+	 * Create the debugfs directory for @reg under @dir, named after the
+	 * region's config item, and populate it with the live-nodes, region
+	 * number and elapsed-time files.  Returns 0 on success or -ENOMEM as
+	 * soon as one step fails.
+	 *
+	 * Nothing is undone here on failure.
+	 * NOTE(review): cleanup presumably happens when the caller drops the
+	 * config item and o2hb_region_release() runs - confirm.
+	 */
+	int ret = -ENOMEM;
+
+	reg->hr_debug_dir =
+		debugfs_create_dir(config_item_name(&reg->hr_item), dir);
+	if (!reg->hr_debug_dir) {
+		mlog_errno(ret);
+		goto bail;
+	}
+
+	reg->hr_debug_livenodes =
+			o2hb_debug_create(O2HB_DEBUG_LIVENODES,
+					  reg->hr_debug_dir,
+					  &(reg->hr_db_livenodes),
+					  sizeof(*(reg->hr_db_livenodes)),
+					  O2HB_DB_TYPE_REGION_LIVENODES,
+					  sizeof(reg->hr_live_node_bitmap),
+					  O2NM_MAX_NODES, reg);
+	if (!reg->hr_debug_livenodes) {
+		mlog_errno(ret);
+		goto bail;
+	}
+
+	reg->hr_debug_regnum =
+			o2hb_debug_create(O2HB_DEBUG_REGION_NUMBER,
+					  reg->hr_debug_dir,
+					  &(reg->hr_db_regnum),
+					  sizeof(*(reg->hr_db_regnum)),
+					  O2HB_DB_TYPE_REGION_NUMBER,
+					  0, O2NM_MAX_NODES, reg);
+	if (!reg->hr_debug_regnum) {
+		mlog_errno(ret);
+		goto bail;
+	}
+
+	reg->hr_debug_elapsed_time =
+			o2hb_debug_create(O2HB_DEBUG_REGION_ELAPSED_TIME,
+					  reg->hr_debug_dir,
+					  &(reg->hr_db_elapsed_time),
+					  sizeof(*(reg->hr_db_elapsed_time)),
+					  O2HB_DB_TYPE_REGION_ELAPSED_TIME,
+					  0, 0, reg);
+	if (!reg->hr_debug_elapsed_time) {
+		mlog_errno(ret);
+		goto bail;
+	}
+
+	ret = 0;
+bail:
+	return ret;
+}
+
 static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group,
 							  const char *name)
 {
 	struct o2hb_region *reg = NULL;
+	int ret;
 
 	reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL);
 	if (reg == NULL)
 		return ERR_PTR(-ENOMEM);
 
-	config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);
+	if (strlen(name) > O2HB_MAX_REGION_NAME_LEN)
+		return ERR_PTR(-ENAMETOOLONG);
 
 	spin_lock(&o2hb_live_lock);
+	reg->hr_region_num = 0;
+	if (o2hb_global_heartbeat_active()) {
+		reg->hr_region_num = find_first_zero_bit(o2hb_region_bitmap,
+							 O2NM_MAX_REGIONS);
+		if (reg->hr_region_num >= O2NM_MAX_REGIONS) {
+			spin_unlock(&o2hb_live_lock);
+			return ERR_PTR(-EFBIG);
+		}
+		set_bit(reg->hr_region_num, o2hb_region_bitmap);
+	}
 	list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
 	spin_unlock(&o2hb_live_lock);
 
+	config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);
+
+	ret = o2hb_debug_region_init(reg, o2hb_debug_dir);
+	if (ret) {
+		config_item_put(&reg->hr_item);
+		return ERR_PTR(ret);
+	}
+
 	return &reg->hr_item;
 }
 
@@ -1612,6 +2000,10 @@ static void o2hb_heartbeat_group_drop_item(struct config_group *group,
 
 	/* stop the thread when the user removes the region dir */
 	spin_lock(&o2hb_live_lock);
+	if (o2hb_global_heartbeat_active()) {
+		clear_bit(reg->hr_region_num, o2hb_region_bitmap);
+		clear_bit(reg->hr_region_num, o2hb_live_region_bitmap);
+	}
 	hb_task = reg->hr_task;
 	reg->hr_task = NULL;
 	spin_unlock(&o2hb_live_lock);
@@ -1628,6 +2020,9 @@ static void o2hb_heartbeat_group_drop_item(struct config_group *group,
 		wake_up(&o2hb_steady_queue);
 	}
 
+	if (o2hb_global_heartbeat_active())
+		printk(KERN_NOTICE "o2hb: Heartbeat stopped on region %s\n",
+		       config_item_name(&reg->hr_item));
 	config_item_put(item);
 }
 
@@ -1688,6 +2083,41 @@ static ssize_t o2hb_heartbeat_group_threshold_store(struct o2hb_heartbeat_group
 	return count;
 }
 
+static
+ssize_t o2hb_heartbeat_group_mode_show(struct o2hb_heartbeat_group *group,
+				       char *page)
+{	/* Print the name of the current heartbeat mode, newline terminated, into @page. */
+	return sprintf(page, "%s\n",
+		       o2hb_heartbeat_mode_desc[o2hb_heartbeat_mode]);
+}
+
+static
+ssize_t o2hb_heartbeat_group_mode_store(struct o2hb_heartbeat_group *group,
+					const char *page, size_t count)
+{	/*
+	 * Parse a heartbeat mode name from @page and try to select it.
+	 * Returns @count once the text matches a mode name, whether or not
+	 * the mode could actually be changed, and -EINVAL when the input is
+	 * empty or matches no mode.
+	 *
+	 * NOTE(review): only the first len characters are compared, so a
+	 * prefix of a mode name presumably matches too - confirm intended.
+	 * NOTE(review): page[count - 1] assumes count > 0 - confirm callers.
+	 */
+	unsigned int i;
+	int ret;
+	size_t len;
+
+	len = (page[count - 1] == '\n') ? count - 1 : count;	/* ignore one trailing newline */
+	if (!len)
+		return -EINVAL;
+
+	for (i = 0; i < O2HB_HEARTBEAT_NUM_MODES; ++i) {
+		if (strnicmp(page, o2hb_heartbeat_mode_desc[i], len))	/* case-insensitive */
+			continue;
+
+		ret = o2hb_global_hearbeat_mode_set(i);
+		if (!ret)	/* a failed set is not reported to the writer */
+			printk(KERN_NOTICE "o2hb: Heartbeat mode set to %s\n",
+			       o2hb_heartbeat_mode_desc[i]);
+		return count;
+	}
+
+	return -EINVAL;
+
+}
+
 static struct o2hb_heartbeat_group_attribute o2hb_heartbeat_group_attr_threshold = {
 	.attr	= { .ca_owner = THIS_MODULE,
 		    .ca_name = "dead_threshold",
@@ -1696,8 +2126,17 @@ static struct o2hb_heartbeat_group_attribute o2hb_heartbeat_group_attr_threshold
 	.store	= o2hb_heartbeat_group_threshold_store,
 };
 
+static struct o2hb_heartbeat_group_attribute o2hb_heartbeat_group_attr_mode = {
+	.attr   = { .ca_owner = THIS_MODULE,
+		.ca_name = "mode",
+		.ca_mode = S_IRUGO | S_IWUSR },
+	.show   = o2hb_heartbeat_group_mode_show,
+	.store  = o2hb_heartbeat_group_mode_store,
+};
+
 static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
 	&o2hb_heartbeat_group_attr_threshold.attr,
+	&o2hb_heartbeat_group_attr_mode.attr,
 	NULL,
 };
 
@@ -1963,3 +2402,34 @@ void o2hb_stop_all_regions(void)
 	spin_unlock(&o2hb_live_lock);
 }
 EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);
+
+/*
+ * Copy the names of the registered heartbeat regions into @region_uuids as
+ * consecutive records of O2HB_MAX_REGION_NAME_LEN bytes each.  A name
+ * shorter than a record is zero padded; a name that fills the record is
+ * not NUL terminated.  At most @max_regions records are written.
+ *
+ * Returns the total number of registered regions, which may be larger than
+ * @max_regions.
+ */
+int o2hb_get_all_regions(char *region_uuids, u8 max_regions)
+{
+	struct o2hb_region *reg;
+	const char *name;
+	size_t len;
+	int numregs = 0;
+	char *p;
+
+	spin_lock(&o2hb_live_lock);
+
+	p = region_uuids;
+	list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
+		name = config_item_name(&reg->hr_item);
+		mlog(0, "Region: %s\n", name);
+		if (numregs < max_regions) {
+			/*
+			 * A name may be shorter than a record, so copy only
+			 * the name itself instead of reading past its end.
+			 */
+			len = strnlen(name, O2HB_MAX_REGION_NAME_LEN);
+			memcpy(p, name, len);
+			memset(p + len, 0, O2HB_MAX_REGION_NAME_LEN - len);
+			p += O2HB_MAX_REGION_NAME_LEN;
+		}
+		numregs++;
+	}
+
+	spin_unlock(&o2hb_live_lock);
+
+	return numregs;
+}
+EXPORT_SYMBOL_GPL(o2hb_get_all_regions);
+
+int o2hb_global_heartbeat_active(void)
+{	/* Nonzero when the heartbeat mode is currently O2HB_HEARTBEAT_GLOBAL. */
+	return (o2hb_heartbeat_mode == O2HB_HEARTBEAT_GLOBAL);
+}
+EXPORT_SYMBOL(o2hb_global_heartbeat_active);

+ 4 - 0
fs/ocfs2/cluster/heartbeat.h

@@ -31,6 +31,8 @@
 
 #define O2HB_REGION_TIMEOUT_MS		2000
 
+#define O2HB_MAX_REGION_NAME_LEN	32
+
 /* number of changes to be seen as live */
 #define O2HB_LIVE_THRESHOLD	   2
 /* number of equal samples to be seen as dead */
@@ -81,5 +83,7 @@ int o2hb_check_node_heartbeating(u8 node_num);
 int o2hb_check_node_heartbeating_from_callback(u8 node_num);
 int o2hb_check_local_node_heartbeating(void);
 void o2hb_stop_all_regions(void);
+int o2hb_get_all_regions(char *region_uuids, u8 numregions);
+int o2hb_global_heartbeat_active(void);
 
 #endif /* O2CLUSTER_HEARTBEAT_H */

+ 2 - 1
fs/ocfs2/cluster/masklog.h

@@ -119,7 +119,8 @@
 #define ML_ERROR	0x0000000100000000ULL /* sent to KERN_ERR */
 #define ML_NOTICE	0x0000000200000000ULL /* setn to KERN_NOTICE */
 #define ML_KTHREAD	0x0000000400000000ULL /* kernel thread activity */
-#define	ML_RESERVATIONS	0x0000000800000000ULL /* ocfs2 alloc reservations */
+#define ML_RESERVATIONS	0x0000000800000000ULL /* ocfs2 alloc reservations */
+#define ML_CLUSTER	0x0000001000000000ULL /* cluster stack */
 
 #define MLOG_INITIAL_AND_MASK (ML_ERROR|ML_NOTICE)
 #define MLOG_INITIAL_NOT_MASK (ML_ENTRY|ML_EXIT)

+ 5 - 0
fs/ocfs2/cluster/nodemanager.c

@@ -711,6 +711,8 @@ static struct config_item *o2nm_node_group_make_item(struct config_group *group,
 	config_item_init_type_name(&node->nd_item, name, &o2nm_node_type);
 	spin_lock_init(&node->nd_lock);
 
+	mlog(ML_CLUSTER, "o2nm: Registering node %s\n", name);
+
 	return &node->nd_item;
 }
 
@@ -744,6 +746,9 @@ static void o2nm_node_group_drop_item(struct config_group *group,
 	}
 	write_unlock(&cluster->cl_nodes_lock);
 
+	mlog(ML_CLUSTER, "o2nm: Unregistered node %s\n",
+	     config_item_name(&node->nd_item));
+
 	config_item_put(item);
 }
 

+ 6 - 0
fs/ocfs2/cluster/ocfs2_nodemanager.h

@@ -36,4 +36,10 @@
 /* host name, group name, cluster name all 64 bytes */
 #define O2NM_MAX_NAME_LEN        64    // __NEW_UTS_LEN
 
+/*
+ * Maximum number of global heartbeat regions allowed.
+ * **CAUTION**  Changing this number will break dlm compatibility.
+ */
+#define O2NM_MAX_REGIONS	32
+
 #endif /* _OCFS2_NODEMANAGER_H */

+ 5 - 0
fs/ocfs2/cluster/tcp.c

@@ -1696,6 +1696,9 @@ static void o2net_hb_node_down_cb(struct o2nm_node *node, int node_num,
 {
 	o2quo_hb_down(node_num);
 
+	if (!node)
+		return;
+
 	if (node_num != o2nm_this_node())
 		o2net_disconnect_node(node);
 
@@ -1709,6 +1712,8 @@ static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,
 
 	o2quo_hb_up(node_num);
 
+	BUG_ON(!node);
+
 	/* ensure an immediate connect attempt */
 	nn->nn_last_connect_attempt = jiffies -
 		(msecs_to_jiffies(o2net_reconnect_delay()) + 1);

+ 29 - 4
fs/ocfs2/dcache.c

@@ -40,6 +40,14 @@
 #include "inode.h"
 #include "super.h"
 
+/*
+ * Record the parent directory's current ip_dir_lock_gen in a negative
+ * dentry's d_fsdata.  The dentry must not have an inode attached.
+ */
+void ocfs2_dentry_attach_gen(struct dentry *dentry)
+{
+	struct inode *parent = dentry->d_parent->d_inode;
+	unsigned long gen = OCFS2_I(parent)->ip_dir_lock_gen;
+
+	BUG_ON(dentry->d_inode);
+	dentry->d_fsdata = (void *)gen;
+}
+
 
 static int ocfs2_dentry_revalidate(struct dentry *dentry,
 				   struct nameidata *nd)
@@ -51,11 +59,20 @@ static int ocfs2_dentry_revalidate(struct dentry *dentry,
 	mlog_entry("(0x%p, '%.*s')\n", dentry,
 		   dentry->d_name.len, dentry->d_name.name);
 
-	/* Never trust a negative dentry - force a new lookup. */
+	/* For a negative dentry -
+	 * check the generation number of the parent and compare with the
+	 * one stored in the inode.
+	 */
 	if (inode == NULL) {
-		mlog(0, "negative dentry: %.*s\n", dentry->d_name.len,
-		     dentry->d_name.name);
-		goto bail;
+		unsigned long gen = (unsigned long) dentry->d_fsdata;
+		unsigned long pgen =
+			OCFS2_I(dentry->d_parent->d_inode)->ip_dir_lock_gen;
+		mlog(0, "negative dentry: %.*s parent gen: %lu "
+			"dentry gen: %lu\n",
+			dentry->d_name.len, dentry->d_name.name, pgen, gen);
+		if (gen != pgen)
+			goto bail;
+		goto valid;
 	}
 
 	BUG_ON(!osb);
@@ -96,6 +113,7 @@ static int ocfs2_dentry_revalidate(struct dentry *dentry,
 		goto bail;
 	}
 
+valid:
 	ret = 1;
 
 bail:
@@ -227,6 +245,12 @@ int ocfs2_dentry_attach_lock(struct dentry *dentry,
 	if (!inode)
 		return 0;
 
+	if (!dentry->d_inode && dentry->d_fsdata) {
+		/* Converting a negative dentry to positive
+		   Clear dentry->d_fsdata */
+		dentry->d_fsdata = dl = NULL;
+	}
+
 	if (dl) {
 		mlog_bug_on_msg(dl->dl_parent_blkno != parent_blkno,
 				" \"%.*s\": old parent: %llu, new: %llu\n",
@@ -452,6 +476,7 @@ static void ocfs2_dentry_iput(struct dentry *dentry, struct inode *inode)
 
 out:
 	iput(inode);
+	ocfs2_dentry_attach_gen(dentry);
 }
 
 /*

+ 1 - 0
fs/ocfs2/dcache.h

@@ -64,5 +64,6 @@ void ocfs2_dentry_move(struct dentry *dentry, struct dentry *target,
 		       struct inode *old_dir, struct inode *new_dir);
 
 extern spinlock_t dentry_attach_lock;
+void ocfs2_dentry_attach_gen(struct dentry *dentry);
 
 #endif /* OCFS2_DCACHE_H */

+ 28 - 1
fs/ocfs2/dlm/dlmcommon.h

@@ -445,7 +445,9 @@ enum {
 	DLM_LOCK_REQUEST_MSG,	 /* 515 */
 	DLM_RECO_DATA_DONE_MSG,	 /* 516 */
 	DLM_BEGIN_RECO_MSG,	 /* 517 */
-	DLM_FINALIZE_RECO_MSG	 /* 518 */
+	DLM_FINALIZE_RECO_MSG,	 /* 518 */
+	DLM_QUERY_REGION,	 /* 519 */
+	DLM_QUERY_NODEINFO,	 /* 520 */
 };
 
 struct dlm_reco_node_data
@@ -727,6 +729,31 @@ struct dlm_cancel_join
 	u8 domain[O2NM_MAX_NAME_LEN];
 };
 
+/*
+ * Payload of a DLM_QUERY_REGION message: the sender's heartbeat regions
+ * for one domain.  qr_regions holds qr_numregions names packed back to
+ * back, each occupying exactly O2HB_MAX_REGION_NAME_LEN bytes.
+ */
+struct dlm_query_region {
+	/* node number of the sender */
+	u8 qr_node;
+	/* entries used in qr_regions; zero when the sender uses local hb */
+	u8 qr_numregions;
+	/* number of bytes of qr_domain that hold the domain name */
+	u8 qr_namelen;
+	u8 pad1;
+	u8 qr_domain[O2NM_MAX_NAME_LEN];
+	u8 qr_regions[O2HB_MAX_REGION_NAME_LEN * O2NM_MAX_REGIONS];
+};
+
+/*
+ * One configured node as carried inside a DLM_QUERY_NODEINFO message.
+ * The port and address are kept in network byte order (users convert
+ * the port with ntohs() and print the address with %pI4), so they are
+ * typed as big-endian to let sparse catch byte-order mistakes.  The
+ * layout is unchanged from plain u16/u32.
+ */
+struct dlm_node_info {
+	u8 ni_nodenum;
+	u8 pad1;
+	__be16 ni_ipv4_port;
+	__be32 ni_ipv4_address;
+};
+
+/*
+ * Payload of a DLM_QUERY_NODEINFO message: the sender's view of the
+ * configured nodes, used by the receiver to detect configuration
+ * mismatches while the sender is joining a domain.
+ */
+struct dlm_query_nodeinfo {
+	/* node number of the sender */
+	u8 qn_nodenum;
+	/* number of valid entries at the start of qn_nodes */
+	u8 qn_numnodes;
+	/* number of bytes of qn_domain that hold the domain name */
+	u8 qn_namelen;
+	u8 pad1;
+	u8 qn_domain[O2NM_MAX_NAME_LEN];
+	struct dlm_node_info qn_nodes[O2NM_MAX_NODES];
+};
+
 struct dlm_exit_domain
 {
 	u8 node_idx;

+ 7 - 5
fs/ocfs2/dlm/dlmdebug.c

@@ -493,7 +493,7 @@ static int debug_mle_print(struct dlm_ctxt *dlm, struct debug_buffer *db)
 	struct hlist_head *bucket;
 	struct hlist_node *list;
 	int i, out = 0;
-	unsigned long total = 0, longest = 0, bktcnt;
+	unsigned long total = 0, longest = 0, bucket_count = 0;
 
 	out += snprintf(db->buf + out, db->len - out,
 			"Dumping MLEs for Domain: %s\n", dlm->name);
@@ -505,13 +505,13 @@ static int debug_mle_print(struct dlm_ctxt *dlm, struct debug_buffer *db)
 			mle = hlist_entry(list, struct dlm_master_list_entry,
 					  master_hash_node);
 			++total;
-			++bktcnt;
+			++bucket_count;
 			if (db->len - out < 200)
 				continue;
 			out += dump_mle(mle, db->buf + out, db->len - out);
 		}
-		longest = max(longest, bktcnt);
-		bktcnt = 0;
+		longest = max(longest, bucket_count);
+		bucket_count = 0;
 	}
 	spin_unlock(&dlm->master_lock);
 
@@ -782,7 +782,9 @@ static int debug_state_print(struct dlm_ctxt *dlm, struct debug_buffer *db)
 
 	/* Domain: xxxxxxxxxx  Key: 0xdfbac769 */
 	out += snprintf(db->buf + out, db->len - out,
-			"Domain: %s  Key: 0x%08x\n", dlm->name, dlm->key);
+			"Domain: %s  Key: 0x%08x  Protocol: %d.%d\n",
+			dlm->name, dlm->key, dlm->dlm_locking_proto.pv_major,
+			dlm->dlm_locking_proto.pv_minor);
 
 	/* Thread Pid: xxx  Node: xxx  State: xxxxx */
 	out += snprintf(db->buf + out, db->len - out,

+ 399 - 1
fs/ocfs2/dlm/dlmdomain.c

@@ -128,10 +128,14 @@ static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
  * will have a negotiated version with the same major number and a minor
  * number equal or smaller.  The dlm_ctxt->dlm_locking_proto field should
  * be used to determine what a running domain is actually using.
+ *
+ * New in version 1.1:
+ *	- Message DLM_QUERY_REGION added to support global heartbeat
+ *	- Message DLM_QUERY_NODEINFO added to allow online node removes
  */
 static const struct dlm_protocol_version dlm_protocol = {
 	.pv_major = 1,
-	.pv_minor = 0,
+	.pv_minor = 1,
 };
 
 #define DLM_DOMAIN_BACKOFF_MS 200
@@ -142,6 +146,8 @@ static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
 				     void **ret_data);
 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
 				   void **ret_data);
+static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
+				    void *data, void **ret_data);
 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
 				   void **ret_data);
 static int dlm_protocol_compare(struct dlm_protocol_version *existing,
@@ -921,6 +927,370 @@ static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
 	return 0;
 }
 
+/* Return 1 if @name is one of the first @count fixed-width names in @list. */
+static int dlm_region_listed(const char *name, const char *list, int count)
+{
+	int i;
+
+	for (i = 0; i < count; ++i, list += O2HB_MAX_REGION_NAME_LEN) {
+		if (!memcmp(name, list, O2HB_MAX_REGION_NAME_LEN))
+			return 1;
+	}
+
+	return 0;
+}
+
+/*
+ * Check that the heartbeat regions a joining node reported in @qr are
+ * exactly the regions registered on this node.
+ *
+ * Returns 0 when neither side uses global heartbeat or when the two
+ * region sets are identical, -EINVAL on any mismatch (one side using
+ * global heartbeat and the other not, an out-of-range region count, or
+ * a region known to only one side), and -ENOMEM if the scratch buffer
+ * cannot be allocated.
+ *
+ * dlm_query_region_handler() calls this with dlm_domain_lock and
+ * dlm->spinlock held, so the allocation here must not sleep.
+ */
+static int dlm_match_regions(struct dlm_ctxt *dlm,
+			     struct dlm_query_region *qr)
+{
+	char *local = NULL, *remote = qr->qr_regions;
+	char *l, *r;
+	int localnr, i;
+	int status = 0;
+
+	if (!o2hb_global_heartbeat_active()) {
+		if (qr->qr_numregions) {
+			mlog(ML_ERROR, "Domain %s: Joining node %d has global "
+			     "heartbeat enabled but local node %d does not\n",
+			     qr->qr_domain, qr->qr_node, dlm->node_num);
+			status = -EINVAL;
+		}
+		goto bail;
+	}
+
+	/* Global heartbeat is active locally from here on. */
+	if (!qr->qr_numregions) {
+		mlog(ML_ERROR, "Domain %s: Local node %d has global "
+		     "heartbeat enabled but joining node %d does not\n",
+		     qr->qr_domain, dlm->node_num, qr->qr_node);
+		status = -EINVAL;
+		goto bail;
+	}
+
+	/* The count comes off the wire; never walk past qr_regions[]. */
+	if (qr->qr_numregions > O2NM_MAX_REGIONS) {
+		mlog(ML_ERROR, "Domain %s: Joining node %d sent %d regions, "
+		     "maximum is %d\n", qr->qr_domain, qr->qr_node,
+		     qr->qr_numregions, O2NM_MAX_REGIONS);
+		status = -EINVAL;
+		goto bail;
+	}
+
+	r = remote;
+	for (i = 0; i < qr->qr_numregions; ++i) {
+		mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, r);
+		r += O2HB_MAX_REGION_NAME_LEN;
+	}
+
+	/* Spinlocks are held by the caller: use a non-sleeping allocation. */
+	local = kmalloc(sizeof(qr->qr_regions), GFP_ATOMIC);
+	if (!local) {
+		status = -ENOMEM;
+		goto bail;
+	}
+
+	localnr = o2hb_get_all_regions(local, O2NM_MAX_REGIONS);
+
+	/* every local region must be known to the joining node */
+	l = local;
+	for (i = 0; i < localnr; ++i, l += O2HB_MAX_REGION_NAME_LEN) {
+		if (dlm_region_listed(l, remote, qr->qr_numregions))
+			continue;
+
+		status = -EINVAL;
+		mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
+		     "in local node %d but not in joining node %d\n",
+		     qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, l,
+		     dlm->node_num, qr->qr_node);
+		goto bail;
+	}
+
+	/* every region of the joining node must be registered locally */
+	r = remote;
+	for (i = 0; i < qr->qr_numregions;
+	     ++i, r += O2HB_MAX_REGION_NAME_LEN) {
+		if (dlm_region_listed(r, local, localnr))
+			continue;
+
+		status = -EINVAL;
+		mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
+		     "in joining node %d but not in local node %d\n",
+		     qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, r,
+		     qr->qr_node, dlm->node_num);
+		goto bail;
+	}
+
+bail:
+	kfree(local);
+
+	return status;
+}
+
+/*
+ * Send this node's heartbeat region list for @dlm's domain, as a
+ * DLM_QUERY_REGION message, to every node set in @node_map other than
+ * this node itself.
+ *
+ * Returns 0 if @node_map is empty or every peer replied with status 0.
+ * Otherwise returns -ENOMEM, or the first non-zero send error or remote
+ * status; sending stops at that peer.
+ */
+static int dlm_send_regions(struct dlm_ctxt *dlm, unsigned long *node_map)
+{
+	struct dlm_query_region *qr;
+	int node, idx, status, ret = 0;
+	char *name;
+
+	/* nobody to send to */
+	if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
+		return 0;
+
+	qr = kzalloc(sizeof(*qr), GFP_KERNEL);
+	if (!qr) {
+		ret = -ENOMEM;
+		mlog_errno(ret);
+		return ret;
+	}
+
+	qr->qr_node = dlm->node_num;
+	qr->qr_namelen = strlen(dlm->name);
+	memcpy(qr->qr_domain, dlm->name, qr->qr_namelen);
+	/* with local heartbeat the region count stays zero */
+	if (o2hb_global_heartbeat_active())
+		qr->qr_numregions = o2hb_get_all_regions(qr->qr_regions,
+							 O2NM_MAX_REGIONS);
+
+	for (idx = 0; idx < qr->qr_numregions; ++idx) {
+		name = qr->qr_regions + idx * O2HB_MAX_REGION_NAME_LEN;
+		mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, name);
+	}
+
+	for (node = find_next_bit(node_map, O2NM_MAX_NODES, 0);
+	     node < O2NM_MAX_NODES;
+	     node = find_next_bit(node_map, O2NM_MAX_NODES, node + 1)) {
+		if (node == dlm->node_num)
+			continue;
+
+		mlog(0, "Sending regions to node %d\n", node);
+
+		ret = o2net_send_message(DLM_QUERY_REGION, DLM_MOD_KEY, qr,
+					 sizeof(struct dlm_query_region),
+					 node, &status);
+		/* a successful send reports the remote handler's verdict */
+		if (ret >= 0)
+			ret = status;
+		if (ret) {
+			mlog(ML_ERROR, "Region mismatch %d, node %d\n",
+			     ret, node);
+			break;
+		}
+	}
+
+	kfree(qr);
+	return ret;
+}
+
+/*
+ * Network handler for DLM_QUERY_REGION.
+ *
+ * Looks up the domain named in the message and, with dlm_domain_lock and
+ * the domain's spinlock held, hands the sender's region list to
+ * dlm_match_regions().  Returns that result, or -EINVAL when the domain
+ * is unknown, the sender is not the node currently joining, or the
+ * active dlm protocol is 1.0.
+ */
+static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
+				    void *data, void **ret_data)
+{
+	struct dlm_query_region *qr = (struct dlm_query_region *) msg->buf;
+	struct dlm_ctxt *dlm;
+	int status = -EINVAL;
+
+	mlog(0, "Node %u queries hb regions on domain %s\n", qr->qr_node,
+	     qr->qr_domain);
+
+	spin_lock(&dlm_domain_lock);
+	dlm = __dlm_lookup_domain_full(qr->qr_domain, qr->qr_namelen);
+	if (!dlm) {
+		mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
+		     "before join domain\n", qr->qr_node, qr->qr_domain);
+		goto unlock_domain;
+	}
+
+	spin_lock(&dlm->spinlock);
+	if (dlm->joining_node != qr->qr_node) {
+		mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
+		     "but joining node is %d\n", qr->qr_node, qr->qr_domain,
+		     dlm->joining_node);
+		goto unlock_dlm;
+	}
+
+	/* Support for global heartbeat was added in 1.1 */
+	if (dlm->dlm_locking_proto.pv_major == 1 &&
+	    dlm->dlm_locking_proto.pv_minor == 0) {
+		mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
+		     "but active dlm protocol is %d.%d\n", qr->qr_node,
+		     qr->qr_domain, dlm->dlm_locking_proto.pv_major,
+		     dlm->dlm_locking_proto.pv_minor);
+		goto unlock_dlm;
+	}
+
+	status = dlm_match_regions(dlm, qr);
+
+unlock_dlm:
+	spin_unlock(&dlm->spinlock);
+unlock_domain:
+	spin_unlock(&dlm_domain_lock);
+
+	return status;
+}
+
+/*
+ * Compare the node list sent by a joining node in @qn with the nodes
+ * configured locally.
+ *
+ * For every node number below O2NM_MAX_NODES the node must be present on
+ * both sides or on neither, and when present its number, IPv4 port and
+ * IPv4 address must agree.  Returns 0 if everything matches, -EINVAL at
+ * the first difference.
+ */
+static int dlm_match_nodes(struct dlm_ctxt *dlm, struct dlm_query_nodeinfo *qn)
+{
+	struct o2nm_node *local;
+	struct dlm_node_info *remote;
+	int nodenum, idx;
+	int status = 0;
+
+	for (idx = 0; idx < qn->qn_numnodes; ++idx)
+		mlog(0, "Node %3d, %pI4:%u\n", qn->qn_nodes[idx].ni_nodenum,
+		     &(qn->qn_nodes[idx].ni_ipv4_address),
+		     ntohs(qn->qn_nodes[idx].ni_ipv4_port));
+
+	for (nodenum = 0; nodenum < O2NM_MAX_NODES && !status; ++nodenum) {
+		local = o2nm_get_node_by_num(nodenum);
+
+		/* find the sender's entry for this node number, if any */
+		remote = NULL;
+		for (idx = 0; idx < qn->qn_numnodes; ++idx) {
+			if (qn->qn_nodes[idx].ni_nodenum != nodenum)
+				continue;
+			remote = &(qn->qn_nodes[idx]);
+			break;
+		}
+
+		if (!local && !remote)
+			continue;
+
+		/* exactly one side knows the node, or the details differ */
+		if (!local || !remote)
+			status = -EINVAL;
+		else if ((remote->ni_nodenum != local->nd_num) ||
+			 (remote->ni_ipv4_port != local->nd_ipv4_port) ||
+			 (remote->ni_ipv4_address != local->nd_ipv4_address))
+			status = -EINVAL;
+
+		if (status) {
+			if (remote && !local)
+				mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) "
+				     "registered in joining node %d but not in "
+				     "local node %d\n", qn->qn_domain,
+				     remote->ni_nodenum,
+				     &(remote->ni_ipv4_address),
+				     ntohs(remote->ni_ipv4_port),
+				     qn->qn_nodenum, dlm->node_num);
+			if (local && !remote)
+				mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) "
+				     "registered in local node %d but not in "
+				     "joining node %d\n", qn->qn_domain,
+				     local->nd_num, &(local->nd_ipv4_address),
+				     ntohs(local->nd_ipv4_port),
+				     dlm->node_num, qn->qn_nodenum);
+			BUG_ON((!local && !remote));
+		}
+
+		/* drop the reference taken by o2nm_get_node_by_num() */
+		if (local)
+			o2nm_node_put(local);
+	}
+
+	return status;
+}
+
+/*
+ * Send the locally configured node list for @dlm's domain, as a
+ * DLM_QUERY_NODEINFO message, to every node set in @node_map other than
+ * this node itself.
+ *
+ * Returns 0 if @node_map is empty or every peer replied with status 0.
+ * Otherwise returns -ENOMEM, or the first non-zero send error or remote
+ * status; sending stops at that peer.
+ */
+static int dlm_send_nodeinfo(struct dlm_ctxt *dlm, unsigned long *node_map)
+{
+	struct dlm_query_nodeinfo *qn;
+	struct dlm_node_info *slot;
+	struct o2nm_node *node;
+	int target, num, filled = 0;
+	int status, ret = 0;
+
+	/* nobody to send to */
+	if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
+		return 0;
+
+	qn = kzalloc(sizeof(*qn), GFP_KERNEL);
+	if (!qn) {
+		ret = -ENOMEM;
+		mlog_errno(ret);
+		return ret;
+	}
+
+	/* collect every configured node into consecutive slots */
+	for (num = 0; num < O2NM_MAX_NODES; ++num) {
+		node = o2nm_get_node_by_num(num);
+		if (!node)
+			continue;
+
+		slot = &qn->qn_nodes[filled++];
+		slot->ni_nodenum = node->nd_num;
+		slot->ni_ipv4_port = node->nd_ipv4_port;
+		slot->ni_ipv4_address = node->nd_ipv4_address;
+		mlog(0, "Node %3d, %pI4:%u\n", node->nd_num,
+		     &(node->nd_ipv4_address), ntohs(node->nd_ipv4_port));
+		o2nm_node_put(node);
+	}
+
+	qn->qn_nodenum = dlm->node_num;
+	qn->qn_numnodes = filled;
+	qn->qn_namelen = strlen(dlm->name);
+	memcpy(qn->qn_domain, dlm->name, qn->qn_namelen);
+
+	for (target = find_next_bit(node_map, O2NM_MAX_NODES, 0);
+	     target < O2NM_MAX_NODES;
+	     target = find_next_bit(node_map, O2NM_MAX_NODES, target + 1)) {
+		if (target == dlm->node_num)
+			continue;
+
+		mlog(0, "Sending nodeinfo to node %d\n", target);
+
+		ret = o2net_send_message(DLM_QUERY_NODEINFO, DLM_MOD_KEY,
+					 qn, sizeof(struct dlm_query_nodeinfo),
+					 target, &status);
+		/* a successful send reports the remote handler's verdict */
+		if (ret >= 0)
+			ret = status;
+		if (ret) {
+			mlog(ML_ERROR, "node mismatch %d, node %d\n", ret,
+			     target);
+			break;
+		}
+	}
+
+	kfree(qn);
+	return ret;
+}
+
+/*
+ * Network handler for DLM_QUERY_NODEINFO.
+ *
+ * Looks up the domain named in the message and, with dlm_domain_lock and
+ * the domain's spinlock held, hands the sender's node list to
+ * dlm_match_nodes().  Returns that result, or -EINVAL when the domain is
+ * unknown, the sender is not the node currently joining, or the active
+ * dlm protocol is 1.0.
+ */
+static int dlm_query_nodeinfo_handler(struct o2net_msg *msg, u32 len,
+				      void *data, void **ret_data)
+{
+	struct dlm_query_nodeinfo *qn =
+		(struct dlm_query_nodeinfo *) msg->buf;
+	struct dlm_ctxt *dlm;
+	int status = -EINVAL;
+
+	mlog(0, "Node %u queries nodes on domain %s\n", qn->qn_nodenum,
+	     qn->qn_domain);
+
+	spin_lock(&dlm_domain_lock);
+	dlm = __dlm_lookup_domain_full(qn->qn_domain, qn->qn_namelen);
+	if (!dlm) {
+		mlog(ML_ERROR, "Node %d queried nodes on domain %s before "
+		     "join domain\n", qn->qn_nodenum, qn->qn_domain);
+		goto unlock_domain;
+	}
+
+	spin_lock(&dlm->spinlock);
+	if (dlm->joining_node != qn->qn_nodenum) {
+		mlog(ML_ERROR, "Node %d queried nodes on domain %s but "
+		     "joining node is %d\n", qn->qn_nodenum, qn->qn_domain,
+		     dlm->joining_node);
+		goto unlock_dlm;
+	}
+
+	/* Support for node query was added in 1.1 */
+	if (dlm->dlm_locking_proto.pv_major == 1 &&
+	    dlm->dlm_locking_proto.pv_minor == 0) {
+		mlog(ML_ERROR, "Node %d queried nodes on domain %s "
+		     "but active dlm protocol is %d.%d\n", qn->qn_nodenum,
+		     qn->qn_domain, dlm->dlm_locking_proto.pv_major,
+		     dlm->dlm_locking_proto.pv_minor);
+		goto unlock_dlm;
+	}
+
+	status = dlm_match_nodes(dlm, qn);
+
+unlock_dlm:
+	spin_unlock(&dlm->spinlock);
+unlock_domain:
+	spin_unlock(&dlm_domain_lock);
+
+	return status;
+}
+
 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
 				   void **ret_data)
 {
@@ -1241,6 +1611,20 @@ static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
 	set_bit(dlm->node_num, dlm->domain_map);
 	spin_unlock(&dlm->spinlock);
 
+	/* Support for global heartbeat and node info was added in 1.1 */
+	if (dlm_protocol.pv_major > 1 || dlm_protocol.pv_minor > 0) {
+		status = dlm_send_nodeinfo(dlm, ctxt->yes_resp_map);
+		if (status) {
+			mlog_errno(status);
+			goto bail;
+		}
+		status = dlm_send_regions(dlm, ctxt->yes_resp_map);
+		if (status) {
+			mlog_errno(status);
+			goto bail;
+		}
+	}
+
 	dlm_send_join_asserts(dlm, ctxt->yes_resp_map);
 
 	/* Joined state *must* be set before the joining node
@@ -1807,7 +2191,21 @@ static int dlm_register_net_handlers(void)
 					sizeof(struct dlm_cancel_join),
 					dlm_cancel_join_handler,
 					NULL, NULL, &dlm_join_handlers);
+	if (status)
+		goto bail;
+
+	status = o2net_register_handler(DLM_QUERY_REGION, DLM_MOD_KEY,
+					sizeof(struct dlm_query_region),
+					dlm_query_region_handler,
+					NULL, NULL, &dlm_join_handlers);
 
+	if (status)
+		goto bail;
+
+	status = o2net_register_handler(DLM_QUERY_NODEINFO, DLM_MOD_KEY,
+					sizeof(struct dlm_query_nodeinfo),
+					dlm_query_nodeinfo_handler,
+					NULL, NULL, &dlm_join_handlers);
 bail:
 	if (status < 0)
 		dlm_unregister_net_handlers();

+ 8 - 0
fs/ocfs2/dlmglue.c

@@ -3635,10 +3635,18 @@ static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
 {
 	struct inode *inode;
 	struct address_space *mapping;
+	struct ocfs2_inode_info *oi;
 
        	inode = ocfs2_lock_res_inode(lockres);
 	mapping = inode->i_mapping;
 
+	if (S_ISDIR(inode->i_mode)) {
+		oi = OCFS2_I(inode);
+		oi->ip_dir_lock_gen++;
+		mlog(0, "generation: %u\n", oi->ip_dir_lock_gen);
+		goto out;
+	}
+
 	if (!S_ISREG(inode->i_mode))
 		goto out;
 

+ 40 - 33
fs/ocfs2/file.c

@@ -64,12 +64,6 @@
 
 #include "buffer_head_io.h"
 
-static int ocfs2_sync_inode(struct inode *inode)
-{
-	filemap_fdatawrite(inode->i_mapping);
-	return sync_mapping_buffers(inode->i_mapping);
-}
-
 static int ocfs2_init_file_private(struct inode *inode, struct file *file)
 {
 	struct ocfs2_file_private *fp;
@@ -180,16 +174,12 @@ static int ocfs2_sync_file(struct file *file, int datasync)
 {
 	int err = 0;
 	journal_t *journal;
-	struct dentry *dentry = file->f_path.dentry;
 	struct inode *inode = file->f_mapping->host;
 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
-	mlog_entry("(0x%p, 0x%p, %d, '%.*s')\n", file, dentry, datasync,
-		   dentry->d_name.len, dentry->d_name.name);
-
-	err = ocfs2_sync_inode(dentry->d_inode);
-	if (err)
-		goto bail;
+	mlog_entry("(0x%p, %d, 0x%p, '%.*s')\n", file, datasync,
+		   file->f_path.dentry, file->f_path.dentry->d_name.len,
+		   file->f_path.dentry->d_name.name);
 
 	if (datasync && !(inode->i_state & I_DIRTY_DATASYNC)) {
 		/*
@@ -370,7 +360,7 @@ static int ocfs2_cow_file_pos(struct inode *inode,
 	if (!(ext_flags & OCFS2_EXT_REFCOUNTED))
 		goto out;
 
-	return ocfs2_refcount_cow(inode, fe_bh, cpos, 1, cpos+1);
+	return ocfs2_refcount_cow(inode, NULL, fe_bh, cpos, 1, cpos+1);
 
 out:
 	return status;
@@ -913,8 +903,8 @@ static int ocfs2_zero_extend_get_range(struct inode *inode,
 		zero_clusters = last_cpos - zero_cpos;
 
 	if (needs_cow) {
-		rc = ocfs2_refcount_cow(inode, di_bh, zero_cpos, zero_clusters,
-					UINT_MAX);
+		rc = ocfs2_refcount_cow(inode, NULL, di_bh, zero_cpos,
+					zero_clusters, UINT_MAX);
 		if (rc) {
 			mlog_errno(rc);
 			goto out;
@@ -2062,6 +2052,7 @@ out:
 }
 
 static int ocfs2_prepare_inode_for_refcount(struct inode *inode,
+					    struct file *file,
 					    loff_t pos, size_t count,
 					    int *meta_level)
 {
@@ -2079,7 +2070,7 @@ static int ocfs2_prepare_inode_for_refcount(struct inode *inode,
 
 	*meta_level = 1;
 
-	ret = ocfs2_refcount_cow(inode, di_bh, cpos, clusters, UINT_MAX);
+	ret = ocfs2_refcount_cow(inode, file, di_bh, cpos, clusters, UINT_MAX);
 	if (ret)
 		mlog_errno(ret);
 out:
@@ -2087,7 +2078,7 @@ out:
 	return ret;
 }
 
-static int ocfs2_prepare_inode_for_write(struct dentry *dentry,
+static int ocfs2_prepare_inode_for_write(struct file *file,
 					 loff_t *ppos,
 					 size_t count,
 					 int appending,
@@ -2095,6 +2086,7 @@ static int ocfs2_prepare_inode_for_write(struct dentry *dentry,
 					 int *has_refcount)
 {
 	int ret = 0, meta_level = 0;
+	struct dentry *dentry = file->f_path.dentry;
 	struct inode *inode = dentry->d_inode;
 	loff_t saved_pos, end;
 
@@ -2150,6 +2142,7 @@ static int ocfs2_prepare_inode_for_write(struct dentry *dentry,
 			meta_level = -1;
 
 			ret = ocfs2_prepare_inode_for_refcount(inode,
+							       file,
 							       saved_pos,
 							       count,
 							       &meta_level);
@@ -2232,6 +2225,8 @@ static ssize_t ocfs2_file_aio_write(struct kiocb *iocb,
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file->f_path.dentry->d_inode;
 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
+	int full_coherency = !(osb->s_mount_opt &
+			       OCFS2_MOUNT_COHERENCY_BUFFERED);
 
 	mlog_entry("(0x%p, %u, '%.*s')\n", file,
 		   (unsigned int)nr_segs,
@@ -2255,16 +2250,39 @@ relock:
 		have_alloc_sem = 1;
 	}
 
-	/* concurrent O_DIRECT writes are allowed */
-	rw_level = !direct_io;
+	/*
+	 * Concurrent O_DIRECT writes are allowed with
+	 * mount_option "coherency=buffered".
+	 */
+	rw_level = (!direct_io || full_coherency);
+
 	ret = ocfs2_rw_lock(inode, rw_level);
 	if (ret < 0) {
 		mlog_errno(ret);
 		goto out_sems;
 	}
 
+	/*
+	 * O_DIRECT writes with "coherency=full" need to take EX cluster
+	 * inode_lock to guarantee coherency.
+	 */
+	if (direct_io && full_coherency) {
+		/*
+		 * We need to take and drop the inode lock to force
+		 * other nodes to drop their caches.  Buffered I/O
+		 * already does this in write_begin().
+		 */
+		ret = ocfs2_inode_lock(inode, NULL, 1);
+		if (ret < 0) {
+			mlog_errno(ret);
+			goto out_sems;
+		}
+
+		ocfs2_inode_unlock(inode, 1);
+	}
+
 	can_do_direct = direct_io;
-	ret = ocfs2_prepare_inode_for_write(file->f_path.dentry, ppos,
+	ret = ocfs2_prepare_inode_for_write(file, ppos,
 					    iocb->ki_left, appending,
 					    &can_do_direct, &has_refcount);
 	if (ret < 0) {
@@ -2312,17 +2330,6 @@ relock:
 		written = generic_file_direct_write(iocb, iov, &nr_segs, *ppos,
 						    ppos, count, ocount);
 		if (written < 0) {
-			/*
-			 * direct write may have instantiated a few
-			 * blocks outside i_size. Trim these off again.
-			 * Don't need i_size_read because we hold i_mutex.
-			 *
-			 * XXX(truncate): this looks buggy because ocfs2 did not
-			 * actually implement ->truncate.  Take a look at
-			 * the new truncate sequence and update this accordingly
-			 */
-			if (*ppos + count > inode->i_size)
-				truncate_setsize(inode, inode->i_size);
 			ret = written;
 			goto out_dio;
 		}
@@ -2394,7 +2401,7 @@ static int ocfs2_splice_to_file(struct pipe_inode_info *pipe,
 {
 	int ret;
 
-	ret = ocfs2_prepare_inode_for_write(out->f_path.dentry,	&sd->pos,
+	ret = ocfs2_prepare_inode_for_write(out, &sd->pos,
 					    sd->total_len, 0, NULL, NULL);
 	if (ret < 0) {
 		mlog_errno(ret);

+ 1 - 0
fs/ocfs2/inode.c

@@ -335,6 +335,7 @@ void ocfs2_populate_inode(struct inode *inode, struct ocfs2_dinode *fe,
 		    else
 			    inode->i_fop = &ocfs2_dops_no_plocks;
 		    i_size_write(inode, le64_to_cpu(fe->i_size));
+		    OCFS2_I(inode)->ip_dir_lock_gen = 1;
 		    break;
 	    case S_IFLNK:
 		    if (ocfs2_inode_is_fast_symlink(inode))

+ 5 - 7
fs/ocfs2/inode.h

@@ -46,30 +46,28 @@ struct ocfs2_inode_info
 	/* These fields are protected by ip_lock */
 	spinlock_t			ip_lock;
 	u32				ip_open_count;
-	u32				ip_clusters;
 	struct list_head		ip_io_markers;
+	u32				ip_clusters;
 
+	u16				ip_dyn_features;
 	struct mutex			ip_io_mutex;
-
 	u32				ip_flags; /* see below */
 	u32				ip_attr; /* inode attributes */
-	u16				ip_dyn_features;
 
 	/* protected by recovery_lock. */
 	struct inode			*ip_next_orphan;
 
-	u32				ip_dir_start_lookup;
-
 	struct ocfs2_caching_info	ip_metadata_cache;
-
 	struct ocfs2_extent_map		ip_extent_map;
-
 	struct inode			vfs_inode;
 	struct jbd2_inode		ip_jinode;
 
+	u32				ip_dir_start_lookup;
+
 	/* Only valid if the inode is the dir. */
 	u32				ip_last_used_slot;
 	u64				ip_last_used_group;
+	u32				ip_dir_lock_gen;
 
 	struct ocfs2_alloc_reservation	ip_la_data_resv;
 };

+ 356 - 0
fs/ocfs2/ioctl.c

@@ -26,6 +26,26 @@
 
 #include <linux/ext2_fs.h>
 
+/*
+ * Copy a whole request structure between the kernel object @a and the
+ * user pointer @b; the length is sizeof(@a).  The macros expand to the
+ * copy_from_user()/copy_to_user() call, and callers in this file treat
+ * a non-zero result as failure.  @b is parenthesized so that an
+ * expression argument is cast as a whole.
+ */
+#define o2info_from_user(a, b)	\
+		copy_from_user(&(a), (b), sizeof(a))
+#define o2info_to_user(a, b)	\
+		copy_to_user((typeof(a) __user *)(b), &(a), sizeof(a))
+
+/*
+ * Mark a request as failed, both in the kernel copy and, on a
+ * best-effort basis, in the userspace copy.
+ *
+ * Nothing is returned: the caller is already on an error path (possibly
+ * -EFAULT) and that error is what ioctl(2) reports.  A failing
+ * put_user() here is deliberately ignored.
+ */
+static inline void __o2info_set_request_error(struct ocfs2_info_request *kreq,
+					struct ocfs2_info_request __user *req)
+{
+	__u32 __user *uflags = (__u32 __user *)&(req->ir_flags);
+
+	kreq->ir_flags |= OCFS2_INFO_FL_ERROR;
+	(void)put_user(kreq->ir_flags, uflags);
+}
+
+/* Accepts any request type; it is cast to struct ocfs2_info_request. */
+#define o2info_set_request_error(a, b) \
+		__o2info_set_request_error((struct ocfs2_info_request *)&(a), b)
+
 static int ocfs2_get_inode_attr(struct inode *inode, unsigned *flags)
 {
 	int status;
@@ -109,6 +129,328 @@ bail:
 	return status;
 }
 
+/*
+ * OCFS2_INFO_BLOCKSIZE: report the superblock's block size.
+ *
+ * Copies the request in from @req, sets ib_blocksize, marks the request
+ * OCFS2_INFO_FL_FILLED and copies it back.  Returns 0 on success or
+ * -EFAULT if either user copy fails.
+ */
+int ocfs2_info_handle_blocksize(struct inode *inode,
+				struct ocfs2_info_request __user *req)
+{
+	struct ocfs2_info_blocksize oib;
+
+	/*
+	 * If the copy-in fails, oib was never validly filled; do not echo
+	 * its flags back to userspace, just fail.
+	 */
+	if (o2info_from_user(oib, req))
+		return -EFAULT;
+
+	oib.ib_blocksize = inode->i_sb->s_blocksize;
+	oib.ib_req.ir_flags |= OCFS2_INFO_FL_FILLED;
+
+	if (o2info_to_user(oib, req)) {
+		/* best effort: flag the failed request for userspace */
+		o2info_set_request_error(oib, req);
+		return -EFAULT;
+	}
+
+	return 0;
+}
+
+/*
+ * OCFS2_INFO_CLUSTERSIZE: report osb->s_clustersize.
+ *
+ * Copies the request in from @req, sets ic_clustersize, marks the
+ * request OCFS2_INFO_FL_FILLED and copies it back.  Returns 0 on
+ * success or -EFAULT if either user copy fails.
+ */
+int ocfs2_info_handle_clustersize(struct inode *inode,
+				  struct ocfs2_info_request __user *req)
+{
+	struct ocfs2_info_clustersize oic;
+	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
+
+	/*
+	 * If the copy-in fails, oic was never validly filled; do not echo
+	 * its flags back to userspace, just fail.
+	 */
+	if (o2info_from_user(oic, req))
+		return -EFAULT;
+
+	oic.ic_clustersize = osb->s_clustersize;
+	oic.ic_req.ir_flags |= OCFS2_INFO_FL_FILLED;
+
+	if (o2info_to_user(oic, req)) {
+		/* best effort: flag the failed request for userspace */
+		o2info_set_request_error(oic, req);
+		return -EFAULT;
+	}
+
+	return 0;
+}
+
+/*
+ * OCFS2_INFO_MAXSLOTS: report osb->max_slots.
+ *
+ * Copies the request in from @req, sets im_max_slots, marks the request
+ * OCFS2_INFO_FL_FILLED and copies it back.  Returns 0 on success or
+ * -EFAULT if either user copy fails.
+ */
+int ocfs2_info_handle_maxslots(struct inode *inode,
+			       struct ocfs2_info_request __user *req)
+{
+	struct ocfs2_info_maxslots oim;
+	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
+
+	/*
+	 * If the copy-in fails, oim was never validly filled; do not echo
+	 * its flags back to userspace, just fail.
+	 */
+	if (o2info_from_user(oim, req))
+		return -EFAULT;
+
+	oim.im_max_slots = osb->max_slots;
+	oim.im_req.ir_flags |= OCFS2_INFO_FL_FILLED;
+
+	if (o2info_to_user(oim, req)) {
+		/* best effort: flag the failed request for userspace */
+		o2info_set_request_error(oim, req);
+		return -EFAULT;
+	}
+
+	return 0;
+}
+
+/*
+ * OCFS2_INFO_LABEL: report the volume label.
+ *
+ * Copies the request in from @req, copies OCFS2_MAX_VOL_LABEL_LEN bytes
+ * of osb->vol_label into il_label, marks the request
+ * OCFS2_INFO_FL_FILLED and copies it back.  Returns 0 on success or
+ * -EFAULT if either user copy fails.
+ */
+int ocfs2_info_handle_label(struct inode *inode,
+			    struct ocfs2_info_request __user *req)
+{
+	struct ocfs2_info_label oil;
+	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
+
+	/*
+	 * If the copy-in fails, oil was never validly filled; do not echo
+	 * its flags back to userspace, just fail.
+	 */
+	if (o2info_from_user(oil, req))
+		return -EFAULT;
+
+	memcpy(oil.il_label, osb->vol_label, OCFS2_MAX_VOL_LABEL_LEN);
+	oil.il_req.ir_flags |= OCFS2_INFO_FL_FILLED;
+
+	if (o2info_to_user(oil, req)) {
+		/* best effort: flag the failed request for userspace */
+		o2info_set_request_error(oil, req);
+		return -EFAULT;
+	}
+
+	return 0;
+}
+
+/*
+ * OCFS2_INFO_UUID: report the volume UUID in text form.
+ *
+ * Copies the request in from @req, copies OCFS2_TEXT_UUID_LEN + 1 bytes
+ * of osb->uuid_str into iu_uuid_str, marks the request
+ * OCFS2_INFO_FL_FILLED and copies it back.  Returns 0 on success or
+ * -EFAULT if either user copy fails.
+ */
+int ocfs2_info_handle_uuid(struct inode *inode,
+			   struct ocfs2_info_request __user *req)
+{
+	struct ocfs2_info_uuid oiu;
+	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
+
+	/*
+	 * If the copy-in fails, oiu was never validly filled; do not echo
+	 * its flags back to userspace, just fail.
+	 */
+	if (o2info_from_user(oiu, req))
+		return -EFAULT;
+
+	memcpy(oiu.iu_uuid_str, osb->uuid_str, OCFS2_TEXT_UUID_LEN + 1);
+	oiu.iu_req.ir_flags |= OCFS2_INFO_FL_FILLED;
+
+	if (o2info_to_user(oiu, req)) {
+		/* best effort: flag the failed request for userspace */
+		o2info_set_request_error(oiu, req);
+		return -EFAULT;
+	}
+
+	return 0;
+}
+
+/*
+ * OCFS2_INFO_FS_FEATURES: report the compat, incompat and ro_compat
+ * feature words stored in the ocfs2_super.
+ *
+ * Copies the request in from @req, fills the three feature fields,
+ * marks the request OCFS2_INFO_FL_FILLED and copies it back.  Returns 0
+ * on success or -EFAULT if either user copy fails.
+ */
+int ocfs2_info_handle_fs_features(struct inode *inode,
+				  struct ocfs2_info_request __user *req)
+{
+	struct ocfs2_info_fs_features oif;
+	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
+
+	/*
+	 * If the copy-in fails, oif was never validly filled; do not echo
+	 * its flags back to userspace, just fail.
+	 */
+	if (o2info_from_user(oif, req))
+		return -EFAULT;
+
+	oif.if_compat_features = osb->s_feature_compat;
+	oif.if_incompat_features = osb->s_feature_incompat;
+	oif.if_ro_compat_features = osb->s_feature_ro_compat;
+	oif.if_req.ir_flags |= OCFS2_INFO_FL_FILLED;
+
+	if (o2info_to_user(oif, req)) {
+		/* best effort: flag the failed request for userspace */
+		o2info_set_request_error(oif, req);
+		return -EFAULT;
+	}
+
+	return 0;
+}
+
+/*
+ * OCFS2_INFO_JOURNAL_SIZE: report the i_size of the journal inode.
+ *
+ * Copies the request in from @req, sets ij_journal_size, marks the
+ * request OCFS2_INFO_FL_FILLED and copies it back.  Returns 0 on
+ * success or -EFAULT if either user copy fails.
+ */
+int ocfs2_info_handle_journal_size(struct inode *inode,
+				   struct ocfs2_info_request __user *req)
+{
+	struct ocfs2_info_journal_size oij;
+	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
+
+	/*
+	 * If the copy-in fails, oij was never validly filled; do not echo
+	 * its flags back to userspace, just fail.
+	 */
+	if (o2info_from_user(oij, req))
+		return -EFAULT;
+
+	oij.ij_journal_size = osb->journal->j_inode->i_size;
+	oij.ij_req.ir_flags |= OCFS2_INFO_FL_FILLED;
+
+	if (o2info_to_user(oij, req)) {
+		/* best effort: flag the failed request for userspace */
+		o2info_set_request_error(oij, req);
+		return -EFAULT;
+	}
+
+	return 0;
+}
+
+/*
+ * Handle a request whose code is not recognized: clear
+ * OCFS2_INFO_FL_FILLED in its flags and copy the header back, so that
+ * userspace can see the request was not answered.  Returns 0 on success
+ * or -EFAULT if either user copy fails.
+ */
+int ocfs2_info_handle_unknown(struct inode *inode,
+			      struct ocfs2_info_request __user *req)
+{
+	struct ocfs2_info_request oir;
+
+	/*
+	 * If the copy-in fails, oir was never validly filled; do not echo
+	 * its flags back to userspace, just fail.
+	 */
+	if (o2info_from_user(oir, req))
+		return -EFAULT;
+
+	oir.ir_flags &= ~OCFS2_INFO_FL_FILLED;
+
+	if (o2info_to_user(oir, req)) {
+		/* best effort: flag the failed request for userspace */
+		o2info_set_request_error(oir, req);
+		return -EFAULT;
+	}
+
+	return 0;
+}
+
+/*
+ * Validate one OCFS2_IOC_INFO request and dispatch it.
+ *
+ * - the header is copied in and its magic number checked;
+ * - the request code selects the handler;
+ * - the handler only runs when ir_size equals the size of the
+ *   structure belonging to that code.
+ *
+ * Returns -EFAULT if the header cannot be read, -EINVAL for a bad magic
+ * or a size mismatch, otherwise the handler's result.  Unrecognized
+ * codes go to ocfs2_info_handle_unknown().
+ */
+int ocfs2_info_handle_request(struct inode *inode,
+			      struct ocfs2_info_request __user *req)
+{
+	struct ocfs2_info_request oir;
+	int status;
+
+	if (o2info_from_user(oir, req))
+		return -EFAULT;
+
+	if (oir.ir_magic != OCFS2_INFO_MAGIC)
+		return -EINVAL;
+
+	/* remains -EINVAL when a known code arrives with the wrong size */
+	status = -EINVAL;
+
+	switch (oir.ir_code) {
+	case OCFS2_INFO_BLOCKSIZE:
+		if (oir.ir_size != sizeof(struct ocfs2_info_blocksize))
+			break;
+		status = ocfs2_info_handle_blocksize(inode, req);
+		break;
+	case OCFS2_INFO_CLUSTERSIZE:
+		if (oir.ir_size != sizeof(struct ocfs2_info_clustersize))
+			break;
+		status = ocfs2_info_handle_clustersize(inode, req);
+		break;
+	case OCFS2_INFO_MAXSLOTS:
+		if (oir.ir_size != sizeof(struct ocfs2_info_maxslots))
+			break;
+		status = ocfs2_info_handle_maxslots(inode, req);
+		break;
+	case OCFS2_INFO_LABEL:
+		if (oir.ir_size != sizeof(struct ocfs2_info_label))
+			break;
+		status = ocfs2_info_handle_label(inode, req);
+		break;
+	case OCFS2_INFO_UUID:
+		if (oir.ir_size != sizeof(struct ocfs2_info_uuid))
+			break;
+		status = ocfs2_info_handle_uuid(inode, req);
+		break;
+	case OCFS2_INFO_FS_FEATURES:
+		if (oir.ir_size != sizeof(struct ocfs2_info_fs_features))
+			break;
+		status = ocfs2_info_handle_fs_features(inode, req);
+		break;
+	case OCFS2_INFO_JOURNAL_SIZE:
+		if (oir.ir_size != sizeof(struct ocfs2_info_journal_size))
+			break;
+		status = ocfs2_info_handle_journal_size(inode, req);
+		break;
+	default:
+		status = ocfs2_info_handle_unknown(inode, req);
+		break;
+	}
+
+	return status;
+}
+
+/*
+ * Fetch entry @idx of the userspace array of request addresses that
+ * @info->oi_requests points at, and store it in *@req_addr.
+ *
+ * With @compat_flag set the base address is converted with compat_ptr();
+ * that path BUG()s on kernels built without CONFIG_COMPAT.
+ * Returns 0 on success or -EFAULT if the entry cannot be read.
+ */
+int ocfs2_get_request_ptr(struct ocfs2_info *info, int idx,
+			  u64 *req_addr, int compat_flag)
+{
+	/* base of the user array holding one address per request */
+	u64 __user *bp = NULL;
+
+	if (!compat_flag) {
+		bp = (u64 __user *)(unsigned long)(info->oi_requests);
+	} else {
+#ifdef CONFIG_COMPAT
+		bp = (u64 __user *)(unsigned long)compat_ptr(info->oi_requests);
+#else
+		BUG();
+#endif
+	}
+
+	if (o2info_from_user(*req_addr, bp + idx))
+		return -EFAULT;
+
+	return 0;
+}
+
+/*
+ * OCFS2_IOC_INFO handles an array of requests passed from userspace.
+ *
+ * ocfs2_info_handle() receives a large info aggregation, grabs and
+ * validates the request count from the header, then breaks it into
+ * small pieces so that specific handlers can process them one by one.
+ *
+ * The idea is to keep each separate request small enough to ensure
+ * better backward and forward compatibility, since a small request is
+ * less likely to break if the disk layout changes.
+ *
+ * Returns 0 when every request was handled, -EINVAL if the count
+ * exceeds OCFS2_INFO_MAX_REQUEST, the request array pointer is zero or
+ * an entry in it is zero, otherwise the first error from fetching or
+ * handling a request.  Processing stops at the first failure.
+ */
+int ocfs2_info_handle(struct inode *inode, struct ocfs2_info *info,
+		      int compat_flag)
+{
+	int i, status = 0;
+	u64 req_addr;
+	struct ocfs2_info_request __user *reqp;
+
+	if ((info->oi_count > OCFS2_INFO_MAX_REQUEST) ||
+	    (!info->oi_requests))
+		return -EINVAL;
+
+	for (i = 0; i < info->oi_count; i++) {
+		status = ocfs2_get_request_ptr(info, i, &req_addr, compat_flag);
+		if (status)
+			break;
+
+		/* the value is a userspace address: keep __user on the cast */
+		reqp = (struct ocfs2_info_request __user *)
+				(unsigned long)req_addr;
+		if (!reqp) {
+			status = -EINVAL;
+			break;
+		}
+
+		status = ocfs2_info_handle_request(inode, reqp);
+		if (status)
+			break;
+	}
+
+	return status;
+}
+
 long ocfs2_ioctl(struct file *filp, unsigned int cmd, unsigned long arg)
 {
 	struct inode *inode = filp->f_path.dentry->d_inode;
@@ -120,6 +462,7 @@ long ocfs2_ioctl(struct file *filp, unsigned int cmd, unsigned long arg)
 	struct reflink_arguments args;
 	const char *old_path, *new_path;
 	bool preserve;
+	struct ocfs2_info info;
 
 	switch (cmd) {
 	case OCFS2_IOC_GETFLAGS:
@@ -174,6 +517,12 @@ long ocfs2_ioctl(struct file *filp, unsigned int cmd, unsigned long arg)
 		preserve = (args.preserve != 0);
 
 		return ocfs2_reflink_ioctl(inode, old_path, new_path, preserve);
+	case OCFS2_IOC_INFO:
+		if (copy_from_user(&info, (struct ocfs2_info __user *)arg,
+				   sizeof(struct ocfs2_info)))
+			return -EFAULT;
+
+		return ocfs2_info_handle(inode, &info, 0);
 	default:
 		return -ENOTTY;
 	}
@@ -185,6 +534,7 @@ long ocfs2_compat_ioctl(struct file *file, unsigned cmd, unsigned long arg)
 	bool preserve;
 	struct reflink_arguments args;
 	struct inode *inode = file->f_path.dentry->d_inode;
+	struct ocfs2_info info;
 
 	switch (cmd) {
 	case OCFS2_IOC32_GETFLAGS:
@@ -209,6 +559,12 @@ long ocfs2_compat_ioctl(struct file *file, unsigned cmd, unsigned long arg)
 
 		return ocfs2_reflink_ioctl(inode, compat_ptr(args.old_path),
 					   compat_ptr(args.new_path), preserve);
+	case OCFS2_IOC_INFO:
+		if (copy_from_user(&info, (struct ocfs2_info __user *)arg,
+				   sizeof(struct ocfs2_info)))
+			return -EFAULT;
+
+		return ocfs2_info_handle(inode, &info, 1);
 	default:
 		return -ENOIOCTLCMD;
 	}

+ 4 - 5
fs/ocfs2/journal.c

@@ -301,7 +301,6 @@ static int ocfs2_commit_cache(struct ocfs2_super *osb)
 {
 	int status = 0;
 	unsigned int flushed;
-	unsigned long old_id;
 	struct ocfs2_journal *journal = NULL;
 
 	mlog_entry_void();
@@ -326,7 +325,7 @@ static int ocfs2_commit_cache(struct ocfs2_super *osb)
 		goto finally;
 	}
 
-	old_id = ocfs2_inc_trans_id(journal);
+	ocfs2_inc_trans_id(journal);
 
 	flushed = atomic_read(&journal->j_num_trans);
 	atomic_set(&journal->j_num_trans, 0);
@@ -342,9 +341,6 @@ finally:
 	return status;
 }
 
-/* pass it NULL and it will allocate a new handle object for you.  If
- * you pass it a handle however, it may still return error, in which
- * case it has free'd the passed handle for you. */
 handle_t *ocfs2_start_trans(struct ocfs2_super *osb, int max_buffs)
 {
 	journal_t *journal = osb->journal->j_journal;
@@ -1888,6 +1884,8 @@ void ocfs2_queue_orphan_scan(struct ocfs2_super *osb)
 
 	os = &osb->osb_orphan_scan;
 
+	mlog(0, "Begin orphan scan\n");
+
 	if (atomic_read(&os->os_state) == ORPHAN_SCAN_INACTIVE)
 		goto out;
 
@@ -1920,6 +1918,7 @@ void ocfs2_queue_orphan_scan(struct ocfs2_super *osb)
 unlock:
 	ocfs2_orphan_scan_unlock(osb, seqno);
 out:
+	mlog(0, "Orphan scan completed\n");
 	return;
 }
 

+ 2 - 1
fs/ocfs2/journal.h

@@ -67,11 +67,12 @@ struct ocfs2_journal {
 	struct buffer_head        *j_bh;      /* Journal disk inode block */
 	atomic_t                  j_num_trans; /* Number of transactions
 					        * currently in the system. */
+	spinlock_t                j_lock;
 	unsigned long             j_trans_id;
 	struct rw_semaphore       j_trans_barrier;
 	wait_queue_head_t         j_checkpointed;
 
-	spinlock_t                j_lock;
+	/* both fields protected by j_lock*/
 	struct list_head          j_la_cleanups;
 	struct work_struct        j_recovery_work;
 };

+ 4 - 3
fs/ocfs2/mmap.c

@@ -59,10 +59,11 @@ static int ocfs2_fault(struct vm_area_struct *area, struct vm_fault *vmf)
 	return ret;
 }
 
-static int __ocfs2_page_mkwrite(struct inode *inode, struct buffer_head *di_bh,
+static int __ocfs2_page_mkwrite(struct file *file, struct buffer_head *di_bh,
 				struct page *page)
 {
 	int ret;
+	struct inode *inode = file->f_path.dentry->d_inode;
 	struct address_space *mapping = inode->i_mapping;
 	loff_t pos = page_offset(page);
 	unsigned int len = PAGE_CACHE_SIZE;
@@ -111,7 +112,7 @@ static int __ocfs2_page_mkwrite(struct inode *inode, struct buffer_head *di_bh,
 	if (page->index == last_index)
 		len = ((size - 1) & ~PAGE_CACHE_MASK) + 1;
 
-	ret = ocfs2_write_begin_nolock(mapping, pos, len, 0, &locked_page,
+	ret = ocfs2_write_begin_nolock(file, mapping, pos, len, 0, &locked_page,
 				       &fsdata, di_bh, page);
 	if (ret) {
 		if (ret != -ENOSPC)
@@ -159,7 +160,7 @@ static int ocfs2_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
 	 */
 	down_write(&OCFS2_I(inode)->ip_alloc_sem);
 
-	ret = __ocfs2_page_mkwrite(inode, di_bh, page);
+	ret = __ocfs2_page_mkwrite(vma->vm_file, di_bh, page);
 
 	up_write(&OCFS2_I(inode)->ip_alloc_sem);
 

+ 2 - 1
fs/ocfs2/namei.c

@@ -171,7 +171,8 @@ bail_add:
 			ret = ERR_PTR(status);
 			goto bail_unlock;
 		}
-	}
+	} else
+		ocfs2_dentry_attach_gen(dentry);
 
 bail_unlock:
 	/* Don't drop the cluster lock until *after* the d_add --

+ 51 - 12
fs/ocfs2/ocfs2.h

@@ -150,26 +150,33 @@ typedef void (*ocfs2_lock_callback)(int status, unsigned long data);
 struct ocfs2_lock_res {
 	void                    *l_priv;
 	struct ocfs2_lock_res_ops *l_ops;
-	spinlock_t               l_lock;
+
 
 	struct list_head         l_blocked_list;
 	struct list_head         l_mask_waiters;
 
-	enum ocfs2_lock_type     l_type;
 	unsigned long		 l_flags;
 	char                     l_name[OCFS2_LOCK_ID_MAX_LEN];
-	int                      l_level;
 	unsigned int             l_ro_holders;
 	unsigned int             l_ex_holders;
-	struct ocfs2_dlm_lksb    l_lksb;
+	unsigned char            l_level;
+
+	/* Data packed - type enum ocfs2_lock_type */
+	unsigned char            l_type;
 
 	/* used from AST/BAST funcs. */
-	enum ocfs2_ast_action    l_action;
-	enum ocfs2_unlock_action l_unlock_action;
-	int                      l_requested;
-	int                      l_blocking;
+	/* Data packed - enum type ocfs2_ast_action */
+	unsigned char            l_action;
+	/* Data packed - enum type ocfs2_unlock_action */
+	unsigned char            l_unlock_action;
+	unsigned char            l_requested;
+	unsigned char            l_blocking;
 	unsigned int             l_pending_gen;
 
+	spinlock_t               l_lock;
+
+	struct ocfs2_dlm_lksb    l_lksb;
+
 	wait_queue_head_t        l_event;
 
 	struct list_head         l_debug_list;
@@ -243,7 +250,7 @@ enum ocfs2_local_alloc_state
 
 enum ocfs2_mount_options
 {
-	OCFS2_MOUNT_HB_LOCAL   = 1 << 0, /* Heartbeat started in local mode */
+	OCFS2_MOUNT_HB_LOCAL = 1 << 0, /* Local heartbeat */
 	OCFS2_MOUNT_BARRIER = 1 << 1,	/* Use block barriers */
 	OCFS2_MOUNT_NOINTR  = 1 << 2,   /* Don't catch signals */
 	OCFS2_MOUNT_ERRORS_PANIC = 1 << 3, /* Panic on errors */
@@ -256,6 +263,10 @@ enum ocfs2_mount_options
 						   control lists */
 	OCFS2_MOUNT_USRQUOTA = 1 << 10, /* We support user quotas */
 	OCFS2_MOUNT_GRPQUOTA = 1 << 11, /* We support group quotas */
+	OCFS2_MOUNT_COHERENCY_BUFFERED = 1 << 12, /* Allow concurrent O_DIRECT
+						     writes */
+	OCFS2_MOUNT_HB_NONE = 1 << 13, /* No heartbeat */
+	OCFS2_MOUNT_HB_GLOBAL = 1 << 14, /* Global heartbeat */
 };
 
 #define OCFS2_OSB_SOFT_RO			0x0001
@@ -277,7 +288,8 @@ struct ocfs2_super
 	struct super_block *sb;
 	struct inode *root_inode;
 	struct inode *sys_root_inode;
-	struct inode *system_inodes[NUM_SYSTEM_INODES];
+	struct inode *global_system_inodes[NUM_GLOBAL_SYSTEM_INODES];
+	struct inode **local_system_inodes;
 
 	struct ocfs2_slot_info *slot_info;
 
@@ -368,6 +380,8 @@ struct ocfs2_super
 	struct ocfs2_alloc_stats alloc_stats;
 	char dev_str[20];		/* "major,minor" of the device */
 
+	u8 osb_stackflags;
+
 	char osb_cluster_stack[OCFS2_STACK_LABEL_LEN + 1];
 	struct ocfs2_cluster_connection *cconn;
 	struct ocfs2_lock_res osb_super_lockres;
@@ -601,10 +615,35 @@ static inline int ocfs2_is_soft_readonly(struct ocfs2_super *osb)
 	return ret;
 }
 
-static inline int ocfs2_userspace_stack(struct ocfs2_super *osb)
+static inline int ocfs2_clusterinfo_valid(struct ocfs2_super *osb)
 {
 	return (osb->s_feature_incompat &
-		OCFS2_FEATURE_INCOMPAT_USERSPACE_STACK);
+		(OCFS2_FEATURE_INCOMPAT_USERSPACE_STACK |
+		 OCFS2_FEATURE_INCOMPAT_CLUSTERINFO));
+}
+
+static inline int ocfs2_userspace_stack(struct ocfs2_super *osb)
+{
+	if (ocfs2_clusterinfo_valid(osb) &&
+	    memcmp(osb->osb_cluster_stack, OCFS2_CLASSIC_CLUSTER_STACK,
+		   OCFS2_STACK_LABEL_LEN))
+		return 1;	/* cluster info is valid and the stack label is not "o2cb" */
+	return 0;	/* no valid cluster info, or the label is the classic "o2cb" */
+}
+
+static inline int ocfs2_o2cb_stack(struct ocfs2_super *osb)
+{
+	if (ocfs2_clusterinfo_valid(osb) &&
+	    !memcmp(osb->osb_cluster_stack, OCFS2_CLASSIC_CLUSTER_STACK,
+		   OCFS2_STACK_LABEL_LEN))
+		return 1;	/* cluster info is valid and the stack label equals "o2cb" */
+	return 0;	/* also the result when neither cluster-info incompat bit is set */
+}
+
+static inline int ocfs2_cluster_o2cb_global_heartbeat(struct ocfs2_super *osb)
+{
+	return ocfs2_o2cb_stack(osb) &&	/* stack flags are only consulted for the o2cb stack */
+		(osb->osb_stackflags & OCFS2_CLUSTER_O2CB_GLOBAL_HEARTBEAT);
 }
 
 static inline int ocfs2_mount_local(struct ocfs2_super *osb)

+ 40 - 6
fs/ocfs2/ocfs2_fs.h

@@ -101,7 +101,8 @@
 					 | OCFS2_FEATURE_INCOMPAT_META_ECC \
 					 | OCFS2_FEATURE_INCOMPAT_INDEXED_DIRS \
 					 | OCFS2_FEATURE_INCOMPAT_REFCOUNT_TREE \
-					 | OCFS2_FEATURE_INCOMPAT_DISCONTIG_BG)
+					 | OCFS2_FEATURE_INCOMPAT_DISCONTIG_BG	\
+					 | OCFS2_FEATURE_INCOMPAT_CLUSTERINFO)
 #define OCFS2_FEATURE_RO_COMPAT_SUPP	(OCFS2_FEATURE_RO_COMPAT_UNWRITTEN \
 					 | OCFS2_FEATURE_RO_COMPAT_USRQUOTA \
 					 | OCFS2_FEATURE_RO_COMPAT_GRPQUOTA)
@@ -169,6 +170,13 @@
 /* Discontigous block groups */
 #define OCFS2_FEATURE_INCOMPAT_DISCONTIG_BG	0x2000
 
+/*
+ * Incompat bit to indicate useable clusterinfo with stackflags for all
+ * cluster stacks (userspace and o2cb). If this bit is set,
+ * INCOMPAT_USERSPACE_STACK becomes superfluous and thus should not be set.
+ */
+#define OCFS2_FEATURE_INCOMPAT_CLUSTERINFO	0x4000
+
 /*
  * backup superblock flag is used to indicate that this volume
  * has backup superblocks.
@@ -292,10 +300,13 @@
 #define OCFS2_VOL_UUID_LEN		16
 #define OCFS2_MAX_VOL_LABEL_LEN		64
 
-/* The alternate, userspace stack fields */
+/* The cluster stack fields */
 #define OCFS2_STACK_LABEL_LEN		4
 #define OCFS2_CLUSTER_NAME_LEN		16
 
+/* Classic (historically speaking) cluster stack */
+#define OCFS2_CLASSIC_CLUSTER_STACK	"o2cb"
+
 /* Journal limits (in bytes) */
 #define OCFS2_MIN_JOURNAL_SIZE		(4 * 1024 * 1024)
 
@@ -305,6 +316,11 @@
  */
 #define OCFS2_MIN_XATTR_INLINE_SIZE     256
 
+/*
+ * Cluster info flags (ocfs2_cluster_info.ci_stackflags)
+ */
+#define OCFS2_CLUSTER_O2CB_GLOBAL_HEARTBEAT	(0x01)
+
 struct ocfs2_system_inode_info {
 	char	*si_name;
 	int	si_iflags;
@@ -322,6 +338,7 @@ enum {
 	USER_QUOTA_SYSTEM_INODE,
 	GROUP_QUOTA_SYSTEM_INODE,
 #define OCFS2_LAST_GLOBAL_SYSTEM_INODE GROUP_QUOTA_SYSTEM_INODE
+#define OCFS2_FIRST_LOCAL_SYSTEM_INODE ORPHAN_DIR_SYSTEM_INODE
 	ORPHAN_DIR_SYSTEM_INODE,
 	EXTENT_ALLOC_SYSTEM_INODE,
 	INODE_ALLOC_SYSTEM_INODE,
@@ -330,8 +347,12 @@ enum {
 	TRUNCATE_LOG_SYSTEM_INODE,
 	LOCAL_USER_QUOTA_SYSTEM_INODE,
 	LOCAL_GROUP_QUOTA_SYSTEM_INODE,
+#define OCFS2_LAST_LOCAL_SYSTEM_INODE LOCAL_GROUP_QUOTA_SYSTEM_INODE
 	NUM_SYSTEM_INODES
 };
+#define NUM_GLOBAL_SYSTEM_INODES OCFS2_LAST_GLOBAL_SYSTEM_INODE
+#define NUM_LOCAL_SYSTEM_INODES	\
+		(NUM_SYSTEM_INODES - OCFS2_FIRST_LOCAL_SYSTEM_INODE)
 
 static struct ocfs2_system_inode_info ocfs2_system_inodes[NUM_SYSTEM_INODES] = {
 	/* Global system inodes (single copy) */
@@ -360,6 +381,7 @@ static struct ocfs2_system_inode_info ocfs2_system_inodes[NUM_SYSTEM_INODES] = {
 /* Parameter passed from mount.ocfs2 to module */
 #define OCFS2_HB_NONE			"heartbeat=none"
 #define OCFS2_HB_LOCAL			"heartbeat=local"
+#define OCFS2_HB_GLOBAL			"heartbeat=global"
 
 /*
  * OCFS2 directory file types.  Only the low 3 bits are used.  The
@@ -566,9 +588,21 @@ struct ocfs2_slot_map_extended {
  */
 };
 
+/*
+ * ci_stackflags is only valid if the incompat bit
+ * OCFS2_FEATURE_INCOMPAT_CLUSTERINFO is set.
+ */
 struct ocfs2_cluster_info {
 /*00*/	__u8   ci_stack[OCFS2_STACK_LABEL_LEN];
-	__le32 ci_reserved;
+	union {
+		__le32 ci_reserved;
+		struct {
+			__u8 ci_stackflags;
+			__u8 ci_reserved1;
+			__u8 ci_reserved2;
+			__u8 ci_reserved3;
+		};
+	};
 /*08*/	__u8   ci_cluster[OCFS2_CLUSTER_NAME_LEN];
 /*18*/
 };
@@ -605,9 +639,9 @@ struct ocfs2_super_block {
 					 * group header */
 /*50*/	__u8  s_label[OCFS2_MAX_VOL_LABEL_LEN];	/* Label for mounting, etc. */
 /*90*/	__u8  s_uuid[OCFS2_VOL_UUID_LEN];	/* 128-bit uuid */
-/*A0*/  struct ocfs2_cluster_info s_cluster_info; /* Selected userspace
-						     stack.  Only valid
-						     with INCOMPAT flag. */
+/*A0*/  struct ocfs2_cluster_info s_cluster_info; /* Only valid if either
+						     userspace or clusterinfo
+						     INCOMPAT flag set. */
 /*B8*/	__le16 s_xattr_inline_size;	/* extended attribute inline size
 					   for this fs*/
 	__le16 s_reserved0;

+ 95 - 0
fs/ocfs2/ocfs2_ioctl.h

@@ -76,4 +76,99 @@ struct reflink_arguments {
 };
 #define OCFS2_IOC_REFLINK	_IOW('o', 4, struct reflink_arguments)
 
+/* The following definitions are dedicated to the ocfs2_info_request ioctls. */
+#define OCFS2_INFO_MAX_REQUEST		(50)
+#define OCFS2_TEXT_UUID_LEN		(OCFS2_VOL_UUID_LEN * 2)
+
+/* Magic number of all requests */
+#define OCFS2_INFO_MAGIC		(0x4F32494E)
+
+/*
+ * Always try to separate info request into small pieces to
+ * guarantee the backward&forward compatibility.
+ */
+struct ocfs2_info {
+	__u64 oi_requests;	/* User address of an array of __u64 pointers to requests */
+	__u32 oi_count;		/* Number of entries in oi_requests (at most
+				   OCFS2_INFO_MAX_REQUEST) */
+	__u32 oi_pad;		/* Pads the struct to 16 bytes */
+};
+
+struct ocfs2_info_request {
+/*00*/	__u32 ir_magic;	/* Magic number (see OCFS2_INFO_MAGIC) */
+	__u32 ir_code;	/* Info request code (enum ocfs2_info_type) */
+	__u32 ir_size;	/* Size of request */
+	__u32 ir_flags;	/* Request flags (OCFS2_INFO_FL_*) */
+/*10*/			/* Request specific fields follow this 16-byte header */
+};
+
+struct ocfs2_info_clustersize {
+	struct ocfs2_info_request ic_req;	/* Common request header (first member) */
+	__u32 ic_clustersize;	/* Presumably the cluster size answer; handler not in view */
+	__u32 ic_pad;		/* Pads the struct to 24 bytes */
+};
+
+struct ocfs2_info_blocksize {
+	struct ocfs2_info_request ib_req;	/* Common request header (first member) */
+	__u32 ib_blocksize;	/* Presumably the block size answer; handler not in view */
+	__u32 ib_pad;		/* Pads the struct to 24 bytes */
+};
+
+struct ocfs2_info_maxslots {
+	struct ocfs2_info_request im_req;	/* Common request header (first member) */
+	__u32 im_max_slots;	/* Presumably the maximum slot count; handler not in view */
+	__u32 im_pad;		/* Pads the struct to 24 bytes */
+};
+
+struct ocfs2_info_label {
+	struct ocfs2_info_request il_req;	/* Common request header (first member) */
+	__u8	il_label[OCFS2_MAX_VOL_LABEL_LEN];	/* Same length as the superblock's s_label */
+} __attribute__ ((packed));
+
+struct ocfs2_info_uuid {
+	struct ocfs2_info_request iu_req;	/* Common request header (first member) */
+	__u8	iu_uuid_str[OCFS2_TEXT_UUID_LEN + 1];	/* Two characters per UUID byte plus one
+							   extra byte (presumably a NUL) */
+} __attribute__ ((packed));
+
+struct ocfs2_info_fs_features {
+	struct ocfs2_info_request if_req;	/* Common request header (first member) */
+	__u32 if_compat_features;	/* Presumably the compat feature bits */
+	__u32 if_incompat_features;	/* Presumably the incompat feature bits */
+	__u32 if_ro_compat_features;	/* Presumably the ro-compat feature bits */
+	__u32 if_pad;			/* Pads the struct to 32 bytes */
+};
+
+struct ocfs2_info_journal_size {
+	struct ocfs2_info_request ij_req;	/* Common request header (first member) */
+	__u64 ij_journal_size;	/* 64-bit answer field; units not stated here -- TODO confirm */
+};
+
+/* Codes for ocfs2_info_request */
+enum ocfs2_info_type {
+	OCFS2_INFO_CLUSTERSIZE = 1,	/* Codes start at 1, so 0 is not a defined code */
+	OCFS2_INFO_BLOCKSIZE,
+	OCFS2_INFO_MAXSLOTS,
+	OCFS2_INFO_LABEL,
+	OCFS2_INFO_UUID,
+	OCFS2_INFO_FS_FEATURES,
+	OCFS2_INFO_JOURNAL_SIZE,
+	OCFS2_INFO_NUM_TYPES	/* One past the highest defined code, not a count */
+};
+
+/* Flags for struct ocfs2_info_request */
+/* Filled by the caller */
+#define OCFS2_INFO_FL_NON_COHERENT	(0x00000001)	/* Cluster coherency not
+							   required. This is a hint.
+							   It is up to ocfs2 whether
+							   the request can be fulfilled
+							   without locking. */
+/* Filled by ocfs2 */
+#define OCFS2_INFO_FL_FILLED		(0x40000000)	/* Filesystem understood
+							   this request and
+							   filled in the answer */
+
+#define OCFS2_INFO_FL_ERROR		(0x80000000)	/* Error happened during
+							   request handling. */
+
+#define OCFS2_IOC_INFO		_IOR('o', 5, struct ocfs2_info)
+
 #endif /* OCFS2_IOCTL_H */

+ 41 - 2
fs/ocfs2/refcounttree.c

@@ -49,6 +49,7 @@
 
 struct ocfs2_cow_context {
 	struct inode *inode;
+	struct file *file;
 	u32 cow_start;
 	u32 cow_len;
 	struct ocfs2_extent_tree data_et;
@@ -2932,13 +2933,16 @@ static int ocfs2_duplicate_clusters_by_page(handle_t *handle,
 	u64 new_block = ocfs2_clusters_to_blocks(sb, new_cluster);
 	struct page *page;
 	pgoff_t page_index;
-	unsigned int from, to;
+	unsigned int from, to, readahead_pages;
 	loff_t offset, end, map_end;
 	struct address_space *mapping = context->inode->i_mapping;
 
 	mlog(0, "old_cluster %u, new %u, len %u at offset %u\n", old_cluster,
 	     new_cluster, new_len, cpos);
 
+	readahead_pages =
+		(ocfs2_cow_contig_clusters(sb) <<
+		 OCFS2_SB(sb)->s_clustersize_bits) >> PAGE_CACHE_SHIFT;
 	offset = ((loff_t)cpos) << OCFS2_SB(sb)->s_clustersize_bits;
 	end = offset + (new_len << OCFS2_SB(sb)->s_clustersize_bits);
 	/*
@@ -2969,6 +2973,14 @@ static int ocfs2_duplicate_clusters_by_page(handle_t *handle,
 		if (PAGE_CACHE_SIZE <= OCFS2_SB(sb)->s_clustersize)
 			BUG_ON(PageDirty(page));
 
+		if (PageReadahead(page) && context->file) {
+			page_cache_async_readahead(mapping,
+						   &context->file->f_ra,
+						   context->file,
+						   page, page_index,
+						   readahead_pages);
+		}
+
 		if (!PageUptodate(page)) {
 			ret = block_read_full_page(page, ocfs2_get_block);
 			if (ret) {
@@ -3409,12 +3421,35 @@ static int ocfs2_replace_cow(struct ocfs2_cow_context *context)
 	return ret;
 }
 
+/*
+ * Start synchronous readahead on the pages backing clusters
+ * [start, start + len) of the file, ahead of the CoW work done by the
+ * caller.  Does nothing when no struct file is available, since the
+ * readahead state lives in file->f_ra.  At least one page is requested
+ * even if the cluster range is smaller than a page.
+ */
+static void ocfs2_readahead_for_cow(struct inode *inode,
+				    struct file *file,
+				    u32 start, u32 len)
+{
+	struct address_space *mapping;
+	pgoff_t index;
+	unsigned long num_pages;
+	int cs_bits = OCFS2_SB(inode->i_sb)->s_clustersize_bits;
+
+	if (!file)
+		return;
+
+	mapping = file->f_mapping;
+	/* Widen before shifting so the byte count cannot wrap in 32 bits. */
+	num_pages = ((loff_t)len << cs_bits) >> PAGE_CACHE_SHIFT;
+	if (!num_pages)
+		num_pages = 1;
+
+	index = ((loff_t)start << cs_bits) >> PAGE_CACHE_SHIFT;
+	page_cache_sync_readahead(mapping, &file->f_ra, file,
+				  index, num_pages);
+}
+
 /*
  * Starting at cpos, try to CoW write_len clusters.  Don't CoW
  * past max_cpos.  This will stop when it runs into a hole or an
  * unrefcounted extent.
  */
 static int ocfs2_refcount_cow_hunk(struct inode *inode,
+				   struct file *file,
 				   struct buffer_head *di_bh,
 				   u32 cpos, u32 write_len, u32 max_cpos)
 {
@@ -3443,6 +3478,8 @@ static int ocfs2_refcount_cow_hunk(struct inode *inode,
 
 	BUG_ON(cow_len == 0);
 
+	ocfs2_readahead_for_cow(inode, file, cow_start, cow_len);
+
 	context = kzalloc(sizeof(struct ocfs2_cow_context), GFP_NOFS);
 	if (!context) {
 		ret = -ENOMEM;
@@ -3464,6 +3501,7 @@ static int ocfs2_refcount_cow_hunk(struct inode *inode,
 	context->ref_root_bh = ref_root_bh;
 	context->cow_duplicate_clusters = ocfs2_duplicate_clusters_by_page;
 	context->get_clusters = ocfs2_di_get_clusters;
+	context->file = file;
 
 	ocfs2_init_dinode_extent_tree(&context->data_et,
 				      INODE_CACHE(inode), di_bh);
@@ -3492,6 +3530,7 @@ out:
  * clusters between cpos and cpos+write_len are safe to modify.
  */
 int ocfs2_refcount_cow(struct inode *inode,
+		       struct file *file,
 		       struct buffer_head *di_bh,
 		       u32 cpos, u32 write_len, u32 max_cpos)
 {
@@ -3511,7 +3550,7 @@ int ocfs2_refcount_cow(struct inode *inode,
 			num_clusters = write_len;
 
 		if (ext_flags & OCFS2_EXT_REFCOUNTED) {
-			ret = ocfs2_refcount_cow_hunk(inode, di_bh, cpos,
+			ret = ocfs2_refcount_cow_hunk(inode, file, di_bh, cpos,
 						      num_clusters, max_cpos);
 			if (ret) {
 				mlog_errno(ret);

+ 4 - 3
fs/ocfs2/refcounttree.h

@@ -21,14 +21,14 @@ struct ocfs2_refcount_tree {
 	struct rb_node rf_node;
 	u64 rf_blkno;
 	u32 rf_generation;
+	struct kref rf_getcnt;
 	struct rw_semaphore rf_sem;
 	struct ocfs2_lock_res rf_lockres;
-	struct kref rf_getcnt;
 	int rf_removed;
 
 	/* the following 4 fields are used by caching_info. */
-	struct ocfs2_caching_info rf_ci;
 	spinlock_t rf_lock;
+	struct ocfs2_caching_info rf_ci;
 	struct mutex rf_io_mutex;
 	struct super_block *rf_sb;
 };
@@ -52,7 +52,8 @@ int ocfs2_prepare_refcount_change_for_del(struct inode *inode,
 					  u32 clusters,
 					  int *credits,
 					  int *ref_blocks);
-int ocfs2_refcount_cow(struct inode *inode, struct buffer_head *di_bh,
+int ocfs2_refcount_cow(struct inode *inode,
+		       struct file *filep, struct buffer_head *di_bh,
 		       u32 cpos, u32 write_len, u32 max_cpos);
 
 typedef int (ocfs2_post_refcount_func)(struct inode *inode,

+ 1 - 1
fs/ocfs2/slot_map.c

@@ -357,7 +357,7 @@ static int ocfs2_map_slot_buffers(struct ocfs2_super *osb,
 {
 	int status = 0;
 	u64 blkno;
-	unsigned long long blocks, bytes;
+	unsigned long long blocks, bytes = 0;
 	unsigned int i;
 	struct buffer_head *bh;
 

+ 2 - 0
fs/ocfs2/stack_o2cb.c

@@ -283,6 +283,8 @@ static int o2cb_cluster_connect(struct ocfs2_cluster_connection *conn)
 	/* for now we only have one cluster/node, make sure we see it
 	 * in the heartbeat universe */
 	if (!o2hb_check_local_node_heartbeating()) {
+		if (o2hb_global_heartbeat_active())
+			mlog(ML_ERROR, "Global heartbeat not started\n");
 		rc = -EINVAL;
 		goto out;
 	}

+ 16 - 0
fs/ocfs2/suballoc.c

@@ -1380,6 +1380,14 @@ static inline int ocfs2_block_group_set_bits(handle_t *handle,
 	}
 
 	le16_add_cpu(&bg->bg_free_bits_count, -num_bits);
+	if (le16_to_cpu(bg->bg_free_bits_count) > le16_to_cpu(bg->bg_bits)) {
+		ocfs2_error(alloc_inode->i_sb, "Group descriptor # %llu has bit"
+			    " count %u but claims %u are freed. num_bits %d",
+			    (unsigned long long)le64_to_cpu(bg->bg_blkno),
+			    le16_to_cpu(bg->bg_bits),
+			    le16_to_cpu(bg->bg_free_bits_count), num_bits);
+		return -EROFS;
+	}
 	while(num_bits--)
 		ocfs2_set_bit(bit_off++, bitmap);
 
@@ -2419,6 +2427,14 @@ static int ocfs2_block_group_clear_bits(handle_t *handle,
 				(unsigned long *) undo_bg->bg_bitmap);
 	}
 	le16_add_cpu(&bg->bg_free_bits_count, num_bits);
+	if (le16_to_cpu(bg->bg_free_bits_count) > le16_to_cpu(bg->bg_bits)) {
+		ocfs2_error(alloc_inode->i_sb, "Group descriptor # %llu has bit"
+			    " count %u but claims %u are freed. num_bits %d",
+			    (unsigned long long)le64_to_cpu(bg->bg_blkno),
+			    le16_to_cpu(bg->bg_bits),
+			    le16_to_cpu(bg->bg_free_bits_count), num_bits);
+		return -EROFS;
+	}
 
 	if (undo_fn)
 		jbd_unlock_bh_state(group_bh);

+ 131 - 32
fs/ocfs2/super.c

@@ -162,6 +162,7 @@ enum {
 	Opt_nointr,
 	Opt_hb_none,
 	Opt_hb_local,
+	Opt_hb_global,
 	Opt_data_ordered,
 	Opt_data_writeback,
 	Opt_atime_quantum,
@@ -177,6 +178,8 @@ enum {
 	Opt_noacl,
 	Opt_usrquota,
 	Opt_grpquota,
+	Opt_coherency_buffered,
+	Opt_coherency_full,
 	Opt_resv_level,
 	Opt_dir_resv_level,
 	Opt_err,
@@ -190,6 +193,7 @@ static const match_table_t tokens = {
 	{Opt_nointr, "nointr"},
 	{Opt_hb_none, OCFS2_HB_NONE},
 	{Opt_hb_local, OCFS2_HB_LOCAL},
+	{Opt_hb_global, OCFS2_HB_GLOBAL},
 	{Opt_data_ordered, "data=ordered"},
 	{Opt_data_writeback, "data=writeback"},
 	{Opt_atime_quantum, "atime_quantum=%u"},
@@ -205,6 +209,8 @@ static const match_table_t tokens = {
 	{Opt_noacl, "noacl"},
 	{Opt_usrquota, "usrquota"},
 	{Opt_grpquota, "grpquota"},
+	{Opt_coherency_buffered, "coherency=buffered"},
+	{Opt_coherency_full, "coherency=full"},
 	{Opt_resv_level, "resv_level=%u"},
 	{Opt_dir_resv_level, "dir_resv_level=%u"},
 	{Opt_err, NULL}
@@ -514,11 +520,11 @@ static void ocfs2_release_system_inodes(struct ocfs2_super *osb)
 
 	mlog_entry_void();
 
-	for (i = 0; i < NUM_SYSTEM_INODES; i++) {
-		inode = osb->system_inodes[i];
+	for (i = 0; i < NUM_GLOBAL_SYSTEM_INODES; i++) {
+		inode = osb->global_system_inodes[i];
 		if (inode) {
 			iput(inode);
-			osb->system_inodes[i] = NULL;
+			osb->global_system_inodes[i] = NULL;
 		}
 	}
 
@@ -534,6 +540,20 @@ static void ocfs2_release_system_inodes(struct ocfs2_super *osb)
 		osb->root_inode = NULL;
 	}
 
+	if (!osb->local_system_inodes)
+		goto out;
+
+	for (i = 0; i < NUM_LOCAL_SYSTEM_INODES * osb->max_slots; i++) {
+		if (osb->local_system_inodes[i]) {
+			iput(osb->local_system_inodes[i]);
+			osb->local_system_inodes[i] = NULL;
+		}
+	}
+
+	kfree(osb->local_system_inodes);
+	osb->local_system_inodes = NULL;
+
+out:
 	mlog_exit(0);
 }
 
@@ -608,6 +628,7 @@ static int ocfs2_remount(struct super_block *sb, int *flags, char *data)
 	int ret = 0;
 	struct mount_options parsed_options;
 	struct ocfs2_super *osb = OCFS2_SB(sb);
+	u32 tmp;
 
 	lock_kernel();
 
@@ -617,8 +638,9 @@ static int ocfs2_remount(struct super_block *sb, int *flags, char *data)
 		goto out;
 	}
 
-	if ((osb->s_mount_opt & OCFS2_MOUNT_HB_LOCAL) !=
-	    (parsed_options.mount_opt & OCFS2_MOUNT_HB_LOCAL)) {
+	tmp = OCFS2_MOUNT_HB_LOCAL | OCFS2_MOUNT_HB_GLOBAL |
+		OCFS2_MOUNT_HB_NONE;
+	if ((osb->s_mount_opt & tmp) != (parsed_options.mount_opt & tmp)) {
 		ret = -EINVAL;
 		mlog(ML_ERROR, "Cannot change heartbeat mode on remount\n");
 		goto out;
@@ -809,23 +831,29 @@ bail:
 
 static int ocfs2_verify_heartbeat(struct ocfs2_super *osb)
 {
-	if (ocfs2_mount_local(osb)) {
-		if (osb->s_mount_opt & OCFS2_MOUNT_HB_LOCAL) {
+	u32 hb_enabled = OCFS2_MOUNT_HB_LOCAL | OCFS2_MOUNT_HB_GLOBAL;
+
+	if (osb->s_mount_opt & hb_enabled) {
+		if (ocfs2_mount_local(osb)) {
 			mlog(ML_ERROR, "Cannot heartbeat on a locally "
 			     "mounted device.\n");
 			return -EINVAL;
 		}
-	}
-
-	if (ocfs2_userspace_stack(osb)) {
-		if (osb->s_mount_opt & OCFS2_MOUNT_HB_LOCAL) {
+		if (ocfs2_userspace_stack(osb)) {
 			mlog(ML_ERROR, "Userspace stack expected, but "
 			     "o2cb heartbeat arguments passed to mount\n");
 			return -EINVAL;
 		}
+		if (((osb->s_mount_opt & OCFS2_MOUNT_HB_GLOBAL) &&
+		     !ocfs2_cluster_o2cb_global_heartbeat(osb)) ||
+		    ((osb->s_mount_opt & OCFS2_MOUNT_HB_LOCAL) &&
+		     ocfs2_cluster_o2cb_global_heartbeat(osb))) {
+			mlog(ML_ERROR, "Mismatching o2cb heartbeat modes\n");
+			return -EINVAL;
+		}
 	}
 
-	if (!(osb->s_mount_opt & OCFS2_MOUNT_HB_LOCAL)) {
+	if (!(osb->s_mount_opt & hb_enabled)) {
 		if (!ocfs2_mount_local(osb) && !ocfs2_is_hard_readonly(osb) &&
 		    !ocfs2_userspace_stack(osb)) {
 			mlog(ML_ERROR, "Heartbeat has to be started to mount "
@@ -1291,6 +1319,7 @@ static int ocfs2_parse_options(struct super_block *sb,
 {
 	int status;
 	char *p;
+	u32 tmp;
 
 	mlog_entry("remount: %d, options: \"%s\"\n", is_remount,
 		   options ? options : "(none)");
@@ -1322,7 +1351,10 @@ static int ocfs2_parse_options(struct super_block *sb,
 			mopt->mount_opt |= OCFS2_MOUNT_HB_LOCAL;
 			break;
 		case Opt_hb_none:
-			mopt->mount_opt &= ~OCFS2_MOUNT_HB_LOCAL;
+			mopt->mount_opt |= OCFS2_MOUNT_HB_NONE;
+			break;
+		case Opt_hb_global:
+			mopt->mount_opt |= OCFS2_MOUNT_HB_GLOBAL;
 			break;
 		case Opt_barrier:
 			if (match_int(&args[0], &option)) {
@@ -1438,6 +1470,12 @@ static int ocfs2_parse_options(struct super_block *sb,
 		case Opt_grpquota:
 			mopt->mount_opt |= OCFS2_MOUNT_GRPQUOTA;
 			break;
+		case Opt_coherency_buffered:
+			mopt->mount_opt |= OCFS2_MOUNT_COHERENCY_BUFFERED;
+			break;
+		case Opt_coherency_full:
+			mopt->mount_opt &= ~OCFS2_MOUNT_COHERENCY_BUFFERED;
+			break;
 		case Opt_acl:
 			mopt->mount_opt |= OCFS2_MOUNT_POSIX_ACL;
 			mopt->mount_opt &= ~OCFS2_MOUNT_NO_POSIX_ACL;
@@ -1477,6 +1515,15 @@ static int ocfs2_parse_options(struct super_block *sb,
 		}
 	}
 
+	/* Ensure only one heartbeat mode */
+	tmp = mopt->mount_opt & (OCFS2_MOUNT_HB_LOCAL | OCFS2_MOUNT_HB_GLOBAL |
+				 OCFS2_MOUNT_HB_NONE);
+	if (hweight32(tmp) != 1) {
+		mlog(ML_ERROR, "Invalid heartbeat mount options\n");
+		status = 0;
+		goto bail;
+	}
+
 	status = 1;
 
 bail:
@@ -1490,10 +1537,14 @@ static int ocfs2_show_options(struct seq_file *s, struct vfsmount *mnt)
 	unsigned long opts = osb->s_mount_opt;
 	unsigned int local_alloc_megs;
 
-	if (opts & OCFS2_MOUNT_HB_LOCAL)
-		seq_printf(s, ",_netdev,heartbeat=local");
-	else
-		seq_printf(s, ",heartbeat=none");
+	if (opts & (OCFS2_MOUNT_HB_LOCAL | OCFS2_MOUNT_HB_GLOBAL)) {
+		seq_printf(s, ",_netdev");
+		if (opts & OCFS2_MOUNT_HB_LOCAL)
+			seq_printf(s, ",%s", OCFS2_HB_LOCAL);
+		else
+			seq_printf(s, ",%s", OCFS2_HB_GLOBAL);
+	} else
+		seq_printf(s, ",%s", OCFS2_HB_NONE);
 
 	if (opts & OCFS2_MOUNT_NOINTR)
 		seq_printf(s, ",nointr");
@@ -1536,6 +1587,11 @@ static int ocfs2_show_options(struct seq_file *s, struct vfsmount *mnt)
 	if (opts & OCFS2_MOUNT_GRPQUOTA)
 		seq_printf(s, ",grpquota");
 
+	if (opts & OCFS2_MOUNT_COHERENCY_BUFFERED)
+		seq_printf(s, ",coherency=buffered");
+	else
+		seq_printf(s, ",coherency=full");
+
 	if (opts & OCFS2_MOUNT_NOUSERXATTR)
 		seq_printf(s, ",nouser_xattr");
 	else
@@ -1990,6 +2046,36 @@ static int ocfs2_setup_osb_uuid(struct ocfs2_super *osb, const unsigned char *uu
 	return 0;
 }
 
+/* Make sure entire volume is addressable by our journal.  Requires
+   osb_clusters_at_boot to be valid and for the journal to have been
+   initialized by ocfs2_journal_init().  Returns 0 if the volume is
+   addressable, -EFBIG otherwise. */
+static int ocfs2_journal_addressable(struct ocfs2_super *osb)
+{
+	int status = 0;
+	u64 max_block =
+		ocfs2_clusters_to_blocks(osb->sb,
+					 osb->osb_clusters_at_boot) - 1;
+
+	/* 32-bit block number is always OK. */
+	if (max_block <= (u32)~0ULL)
+		goto out;
+
+	/* Volume is "huge", so see if our journal is new enough to
+	   support it: both the JBD2 superblock compat feature and the
+	   journal's 64BIT incompat feature are required. */
+	if (!(OCFS2_HAS_COMPAT_FEATURE(osb->sb,
+				       OCFS2_FEATURE_COMPAT_JBD2_SB) &&
+	      jbd2_journal_check_used_features(osb->journal->j_journal, 0, 0,
+					       JBD2_FEATURE_INCOMPAT_64BIT))) {
+		/* Terminate the message with a newline like other mlog calls. */
+		mlog(ML_ERROR, "The journal cannot address the entire volume. "
+		     "Enable the 'block64' journal option with tunefs.ocfs2\n");
+		status = -EFBIG;
+		goto out;
+	}
+
+ out:
+	return status;
+}
+
 static int ocfs2_initialize_super(struct super_block *sb,
 				  struct buffer_head *bh,
 				  int sector_size,
@@ -2002,6 +2088,7 @@ static int ocfs2_initialize_super(struct super_block *sb,
 	struct ocfs2_journal *journal;
 	__le32 uuid_net_key;
 	struct ocfs2_super *osb;
+	u64 total_blocks;
 
 	mlog_entry_void();
 
@@ -2060,6 +2147,15 @@ static int ocfs2_initialize_super(struct super_block *sb,
 	snprintf(osb->dev_str, sizeof(osb->dev_str), "%u,%u",
 		 MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
 
+	osb->max_slots = le16_to_cpu(di->id2.i_super.s_max_slots);
+	if (osb->max_slots > OCFS2_MAX_SLOTS || osb->max_slots == 0) {
+		mlog(ML_ERROR, "Invalid number of node slots (%u)\n",
+		     osb->max_slots);
+		status = -EINVAL;
+		goto bail;
+	}
+	mlog(0, "max_slots for this device: %u\n", osb->max_slots);
+
 	ocfs2_orphan_scan_init(osb);
 
 	status = ocfs2_recovery_init(osb);
@@ -2098,15 +2194,6 @@ static int ocfs2_initialize_super(struct super_block *sb,
 		goto bail;
 	}
 
-	osb->max_slots = le16_to_cpu(di->id2.i_super.s_max_slots);
-	if (osb->max_slots > OCFS2_MAX_SLOTS || osb->max_slots == 0) {
-		mlog(ML_ERROR, "Invalid number of node slots (%u)\n",
-		     osb->max_slots);
-		status = -EINVAL;
-		goto bail;
-	}
-	mlog(0, "max_slots for this device: %u\n", osb->max_slots);
-
 	osb->slot_recovery_generations =
 		kcalloc(osb->max_slots, sizeof(*osb->slot_recovery_generations),
 			GFP_KERNEL);
@@ -2149,7 +2236,9 @@ static int ocfs2_initialize_super(struct super_block *sb,
 		goto bail;
 	}
 
-	if (ocfs2_userspace_stack(osb)) {
+	if (ocfs2_clusterinfo_valid(osb)) {
+		osb->osb_stackflags =
+			OCFS2_RAW_SB(di)->s_cluster_info.ci_stackflags;
 		memcpy(osb->osb_cluster_stack,
 		       OCFS2_RAW_SB(di)->s_cluster_info.ci_stack,
 		       OCFS2_STACK_LABEL_LEN);
@@ -2214,11 +2303,15 @@ static int ocfs2_initialize_super(struct super_block *sb,
 		goto bail;
 	}
 
-	if (ocfs2_clusters_to_blocks(osb->sb, le32_to_cpu(di->i_clusters) - 1)
-	    > (u32)~0UL) {
-		mlog(ML_ERROR, "Volume might try to write to blocks beyond "
-		     "what jbd can address in 32 bits.\n");
-		status = -EINVAL;
+	total_blocks = ocfs2_clusters_to_blocks(osb->sb,
+						le32_to_cpu(di->i_clusters));
+
+	status = generic_check_addressable(osb->sb->s_blocksize_bits,
+					   total_blocks);
+	if (status) {
+		mlog(ML_ERROR, "Volume too large "
+		     "to mount safely on this system");
+		status = -EFBIG;
 		goto bail;
 	}
 
@@ -2380,6 +2473,12 @@ static int ocfs2_check_volume(struct ocfs2_super *osb)
 		goto finally;
 	}
 
+	/* Now that journal has been initialized, check to make sure
+	   entire volume is addressable. */
+	status = ocfs2_journal_addressable(osb);
+	if (status)
+		goto finally;
+
 	/* If the journal was unmounted cleanly then we don't want to
 	 * recover anything. Otherwise, journal_load will do that
 	 * dirty work for us :) */

+ 49 - 11
fs/ocfs2/sysfile.c

@@ -44,11 +44,6 @@ static struct inode * _ocfs2_get_system_file_inode(struct ocfs2_super *osb,
 						   int type,
 						   u32 slot);
 
-static inline int is_global_system_inode(int type);
-static inline int is_in_system_inode_array(struct ocfs2_super *osb,
-					   int type,
-					   u32 slot);
-
 #ifdef CONFIG_DEBUG_LOCK_ALLOC
 static struct lock_class_key ocfs2_sysfile_cluster_lock_key[NUM_SYSTEM_INODES];
 #endif
@@ -59,11 +54,52 @@ static inline int is_global_system_inode(int type)
 		type <= OCFS2_LAST_GLOBAL_SYSTEM_INODE;
 }
 
-static inline int is_in_system_inode_array(struct ocfs2_super *osb,
-					   int type,
-					   u32 slot)
+static struct inode **get_local_system_inode(struct ocfs2_super *osb,
+					     int type,
+					     u32 slot)
 {
-	return slot == osb->slot_num || is_global_system_inode(type);
+	int index;
+	struct inode **local_system_inodes, **free = NULL;
+	/* Find the cache cell for local system inode (type, slot). */
+	BUG_ON(slot == OCFS2_INVALID_SLOT);
+	BUG_ON(type < OCFS2_FIRST_LOCAL_SYSTEM_INODE ||
+	       type > OCFS2_LAST_LOCAL_SYSTEM_INODE);
+	/* Read the current array pointer while holding osb_lock. */
+	spin_lock(&osb->osb_lock);
+	local_system_inodes = osb->local_system_inodes;
+	spin_unlock(&osb->osb_lock);
+	/* No array stored in osb yet: allocate one covering every slot. */
+	if (unlikely(!local_system_inodes)) {
+		local_system_inodes = kzalloc(sizeof(struct inode *) *
+					      NUM_LOCAL_SYSTEM_INODES *
+					      osb->max_slots,
+					      GFP_NOFS);
+		if (!local_system_inodes) {
+			mlog_errno(-ENOMEM);
+			/*
+			 * Return NULL here so that ocfs2_get_system_file_inode()
+			 * will try to create an inode and use it. We will try
+			 * to initialize local_system_inodes next time.
+			 */
+			return NULL;
+		}
+		/* Store it under osb_lock unless another caller already did. */
+		spin_lock(&osb->osb_lock);
+		if (osb->local_system_inodes) {
+			/* Someone beat us to it: use theirs, drop ours. */
+			free = local_system_inodes;
+			local_system_inodes = osb->local_system_inodes;
+		} else
+			osb->local_system_inodes = local_system_inodes;
+		spin_unlock(&osb->osb_lock);
+		if (unlikely(free))
+			kfree(free);	/* done after osb_lock is released */
+	}
+	/* One group of NUM_LOCAL_SYSTEM_INODES cells per slot. */
+	index = (slot * NUM_LOCAL_SYSTEM_INODES) +
+		(type - OCFS2_FIRST_LOCAL_SYSTEM_INODE);
+
+	return &local_system_inodes[index];
 }
 
 struct inode *ocfs2_get_system_file_inode(struct ocfs2_super *osb,
@@ -74,8 +110,10 @@ struct inode *ocfs2_get_system_file_inode(struct ocfs2_super *osb,
 	struct inode **arr = NULL;
 
 	/* avoid the lookup if cached in local system file array */
-	if (is_in_system_inode_array(osb, type, slot))
-		arr = &(osb->system_inodes[type]);
+	if (is_global_system_inode(type)) {
+		arr = &(osb->global_system_inodes[type]);
+	} else
+		arr = get_local_system_inode(osb, type, slot);
 
 	if (arr && ((inode = *arr) != NULL)) {
 		/* get a ref in addition to the array ref */

+ 1 - 1
fs/ocfs2/xattr.c

@@ -7081,7 +7081,7 @@ static int ocfs2_reflink_xattr_in_block(struct ocfs2_xattr_reflink *args,
 		goto out;
 	}
 
-	if (!(le16_to_cpu(xb->xb_flags) & OCFS2_XATTR_INDEXED))
+	if (!indexed)
 		ret = ocfs2_reflink_xattr_block(args, blk_bh, new_blk_bh);
 	else
 		ret = ocfs2_reflink_xattr_tree(args, blk_bh, new_blk_bh);

+ 2 - 0
include/linux/fs.h

@@ -2378,6 +2378,8 @@ extern ssize_t simple_write_to_buffer(void *to, size_t available, loff_t *ppos,
 
 extern int generic_file_fsync(struct file *, int);
 
+extern int generic_check_addressable(unsigned, u64);
+
 #ifdef CONFIG_MIGRATION
 extern int buffer_migrate_page(struct address_space *,
 				struct page *, struct page *);