6913010 assertion failed: error||lr->lr_length <= zp->z_blksz, file: ../../common/fs/zfs/zfs_vnops.c
authorNeil Perrin <Neil.Perrin@Sun.COM>
Fri, 25 Jun 2010 15:50:31 -0600
changeset 12699 36aebb51164a
parent 12698 119eaf3c5982
child 12700 0f31afc2abec
6913010 assertion failed: error||lr->lr_length <= zp->z_blksz, file: ../../common/fs/zfs/zfs_vnops.c 6620948 slow progress with stress tests 6478419 Should push all synchronous transactions in zil_commit_writer() 6916723 ZIL: Unnecessary scanning of the intent log tx chain. 6598837 zil_commit() might wait unnecessarily 6535172 zil_sync causing long hold times on zl_lock 6916703 ZIL: race on setting z_last_itx 6579989 zl_lock hold time can still be improved
usr/src/cmd/ztest/ztest.c
usr/src/uts/common/fs/zfs/dsl_pool.c
usr/src/uts/common/fs/zfs/sys/zfs_znode.h
usr/src/uts/common/fs/zfs/sys/zil.h
usr/src/uts/common/fs/zfs/sys/zil_impl.h
usr/src/uts/common/fs/zfs/zfs_log.c
usr/src/uts/common/fs/zfs/zfs_vfsops.c
usr/src/uts/common/fs/zfs/zfs_vnops.c
usr/src/uts/common/fs/zfs/zfs_znode.c
usr/src/uts/common/fs/zfs/zil.c
usr/src/uts/common/fs/zfs/zvol.c
usr/src/uts/common/io/comstar/lu/stmf_sbd/sbd_zvol.c
--- a/usr/src/cmd/ztest/ztest.c	Fri Jun 25 17:30:31 2010 -0400
+++ b/usr/src/cmd/ztest/ztest.c	Fri Jun 25 15:50:31 2010 -0600
@@ -1102,7 +1102,7 @@
 #define	lrz_bonustype	lr_rdev
 #define	lrz_bonuslen	lr_crtime[1]
 
-static uint64_t
+static void
 ztest_log_create(ztest_ds_t *zd, dmu_tx_t *tx, lr_create_t *lr)
 {
 	char *name = (void *)(lr + 1);		/* name follows lr */
@@ -1110,40 +1110,41 @@
 	itx_t *itx;
 
 	if (zil_replaying(zd->zd_zilog, tx))
-		return (0);
+		return;
 
 	itx = zil_itx_create(TX_CREATE, sizeof (*lr) + namesize);
 	bcopy(&lr->lr_common + 1, &itx->itx_lr + 1,
 	    sizeof (*lr) + namesize - sizeof (lr_t));
 
-	return (zil_itx_assign(zd->zd_zilog, itx, tx));
+	zil_itx_assign(zd->zd_zilog, itx, tx);
 }
 
-static uint64_t
-ztest_log_remove(ztest_ds_t *zd, dmu_tx_t *tx, lr_remove_t *lr)
+static void
+ztest_log_remove(ztest_ds_t *zd, dmu_tx_t *tx, lr_remove_t *lr, uint64_t object)
 {
 	char *name = (void *)(lr + 1);		/* name follows lr */
 	size_t namesize = strlen(name) + 1;
 	itx_t *itx;
 
 	if (zil_replaying(zd->zd_zilog, tx))
-		return (0);
+		return;
 
 	itx = zil_itx_create(TX_REMOVE, sizeof (*lr) + namesize);
 	bcopy(&lr->lr_common + 1, &itx->itx_lr + 1,
 	    sizeof (*lr) + namesize - sizeof (lr_t));
 
-	return (zil_itx_assign(zd->zd_zilog, itx, tx));
+	itx->itx_private = (void *)object;
+	zil_itx_assign(zd->zd_zilog, itx, tx);
 }
 
-static uint64_t
+static void
 ztest_log_write(ztest_ds_t *zd, dmu_tx_t *tx, lr_write_t *lr)
 {
 	itx_t *itx;
 	itx_wr_state_t write_state = ztest_random(WR_NUM_STATES);
 
 	if (zil_replaying(zd->zd_zilog, tx))
-		return (0);
+		return;
 
 	if (lr->lr_length > ZIL_MAX_LOG_DATA)
 		write_state = WR_INDIRECT;
@@ -1166,37 +1167,39 @@
 	bcopy(&lr->lr_common + 1, &itx->itx_lr + 1,
 	    sizeof (*lr) - sizeof (lr_t));
 
-	return (zil_itx_assign(zd->zd_zilog, itx, tx));
+	zil_itx_assign(zd->zd_zilog, itx, tx);
 }
 
-static uint64_t
+static void
 ztest_log_truncate(ztest_ds_t *zd, dmu_tx_t *tx, lr_truncate_t *lr)
 {
 	itx_t *itx;
 
 	if (zil_replaying(zd->zd_zilog, tx))
-		return (0);
+		return;
 
 	itx = zil_itx_create(TX_TRUNCATE, sizeof (*lr));
 	bcopy(&lr->lr_common + 1, &itx->itx_lr + 1,
 	    sizeof (*lr) - sizeof (lr_t));
 
-	return (zil_itx_assign(zd->zd_zilog, itx, tx));
+	itx->itx_sync = B_FALSE;
+	zil_itx_assign(zd->zd_zilog, itx, tx);
 }
 
-static uint64_t
+static void
 ztest_log_setattr(ztest_ds_t *zd, dmu_tx_t *tx, lr_setattr_t *lr)
 {
 	itx_t *itx;
 
 	if (zil_replaying(zd->zd_zilog, tx))
-		return (0);
+		return;
 
 	itx = zil_itx_create(TX_SETATTR, sizeof (*lr));
 	bcopy(&lr->lr_common + 1, &itx->itx_lr + 1,
 	    sizeof (*lr) - sizeof (lr_t));
 
-	return (zil_itx_assign(zd->zd_zilog, itx, tx));
+	itx->itx_sync = B_FALSE;
+	zil_itx_assign(zd->zd_zilog, itx, tx);
 }
 
 /*
@@ -1328,7 +1331,7 @@
 
 	VERIFY3U(0, ==, zap_remove(os, lr->lr_doid, name, tx));
 
-	(void) ztest_log_remove(zd, tx, lr);
+	(void) ztest_log_remove(zd, tx, lr, object);
 
 	dmu_tx_commit(tx);
 
@@ -2045,7 +2048,7 @@
 {
 	zilog_t *zilog = zd->zd_zilog;
 
-	zil_commit(zilog, UINT64_MAX, ztest_random(ZTEST_OBJECTS));
+	zil_commit(zilog, ztest_random(ZTEST_OBJECTS));
 
 	/*
 	 * Remember the committed values in zd, which is in parent/child
@@ -5266,7 +5269,7 @@
 	 */
 	while (BP_IS_HOLE(&zd->zd_zilog->zl_header->zh_log)) {
 		ztest_dmu_object_alloc_free(zd, 0);
-		zil_commit(zd->zd_zilog, UINT64_MAX, 0);
+		zil_commit(zd->zd_zilog, 0);
 	}
 
 	txg_wait_synced(spa_get_dsl(spa), 0);
@@ -5293,7 +5296,7 @@
 	/*
 	 * Commit all of the changes we just generated.
 	 */
-	zil_commit(zd->zd_zilog, UINT64_MAX, 0);
+	zil_commit(zd->zd_zilog, 0);
 	txg_wait_synced(spa_get_dsl(spa), 0);
 
 	/*
--- a/usr/src/uts/common/fs/zfs/dsl_pool.c	Fri Jun 25 17:30:31 2010 -0400
+++ b/usr/src/uts/common/fs/zfs/dsl_pool.c	Fri Jun 25 15:50:31 2010 -0600
@@ -451,7 +451,7 @@
 	while (ds = list_head(&dp->dp_synced_datasets)) {
 		list_remove(&dp->dp_synced_datasets, ds);
 		os = ds->ds_objset;
-		zil_clean(os->os_zil);
+		zil_clean(os->os_zil, txg);
 		ASSERT(!dmu_objset_is_dirty(os, txg));
 		dmu_buf_rele(ds->ds_dbuf, ds);
 	}
--- a/usr/src/uts/common/fs/zfs/sys/zfs_znode.h	Fri Jun 25 17:30:31 2010 -0400
+++ b/usr/src/uts/common/fs/zfs/sys/zfs_znode.h	Fri Jun 25 15:50:31 2010 -0600
@@ -191,7 +191,6 @@
 	uint_t		z_blksz;	/* block size in bytes */
 	uint_t		z_seq;		/* modification sequence number */
 	uint64_t	z_mapcnt;	/* number of pages mapped to file */
-	uint64_t	z_last_itx;	/* last ZIL itx on this znode */
 	uint64_t	z_gen;		/* generation (cached) */
 	uint64_t	z_size;		/* file size (cached) */
 	uint64_t	z_atime[2];	/* atime (cached) */
@@ -321,13 +320,14 @@
 extern int zfs_log_create_txtype(zil_create_t, vsecattr_t *vsecp,
     vattr_t *vap);
 extern void zfs_log_remove(zilog_t *zilog, dmu_tx_t *tx, uint64_t txtype,
-    znode_t *dzp, char *name);
+    znode_t *dzp, char *name, uint64_t foid);
+#define	ZFS_NO_OBJECT	0	/* no object id */
 extern void zfs_log_link(zilog_t *zilog, dmu_tx_t *tx, uint64_t txtype,
     znode_t *dzp, znode_t *zp, char *name);
 extern void zfs_log_symlink(zilog_t *zilog, dmu_tx_t *tx, uint64_t txtype,
     znode_t *dzp, znode_t *zp, char *name, char *link);
 extern void zfs_log_rename(zilog_t *zilog, dmu_tx_t *tx, uint64_t txtype,
-    znode_t *sdzp, char *sname, znode_t *tdzp, char *dname, znode_t *szp);
+    znode_t *sdzp, char *sname, znode_t *tdzp, char *dname);
 extern void zfs_log_write(zilog_t *zilog, dmu_tx_t *tx, int txtype,
     znode_t *zp, offset_t off, ssize_t len, int ioflag);
 extern void zfs_log_truncate(zilog_t *zilog, dmu_tx_t *tx, int txtype,
--- a/usr/src/uts/common/fs/zfs/sys/zil.h	Fri Jun 25 17:30:31 2010 -0400
+++ b/usr/src/uts/common/fs/zfs/sys/zil.h	Fri Jun 25 15:50:31 2010 -0600
@@ -169,18 +169,14 @@
 	(txtype) == TX_ACL ||		\
 	(txtype) == TX_WRITE2)
 
-
 /*
  * Format of log records.
  * The fields are carefully defined to allow them to be aligned
  * and sized the same on sparc & intel architectures.
  * Each log record has a common structure at the beginning.
  *
- * Note, lrc_seq holds two different sequence numbers. Whilst in memory
- * it contains the transaction sequence number.  The log record on
- * disk holds the sequence number of all log records which is used to
- * ensure we don't replay the same record.  The two sequence numbers are
- * different because the transactions can now be pushed out of order.
+ * The log record on disk (lrc_seq) holds the sequence number of all log
+ * records which is used to ensure we don't replay the same record.
  */
 typedef struct {			/* common log record header */
 	uint64_t	lrc_txtype;	/* intent log transaction type */
@@ -402,15 +398,15 @@
 
 extern itx_t	*zil_itx_create(uint64_t txtype, size_t lrsize);
 extern void	zil_itx_destroy(itx_t *itx);
-extern uint64_t zil_itx_assign(zilog_t *zilog, itx_t *itx, dmu_tx_t *tx);
+extern void	zil_itx_assign(zilog_t *zilog, itx_t *itx, dmu_tx_t *tx);
 
-extern void	zil_commit(zilog_t *zilog, uint64_t seq, uint64_t oid);
+extern void	zil_commit(zilog_t *zilog, uint64_t oid);
 
 extern int	zil_vdev_offline(const char *osname, void *txarg);
 extern int	zil_claim(const char *osname, void *txarg);
 extern int	zil_check_log_chain(const char *osname, void *txarg);
 extern void	zil_sync(zilog_t *zilog, dmu_tx_t *tx);
-extern void	zil_clean(zilog_t *zilog);
+extern void	zil_clean(zilog_t *zilog, uint64_t synced_txg);
 
 extern int	zil_suspend(zilog_t *zilog);
 extern void	zil_resume(zilog_t *zilog);
--- a/usr/src/uts/common/fs/zfs/sys/zil_impl.h	Fri Jun 25 17:30:31 2010 -0400
+++ b/usr/src/uts/common/fs/zfs/sys/zil_impl.h	Fri Jun 25 15:50:31 2010 -0600
@@ -50,6 +50,28 @@
 } lwb_t;
 
 /*
+ * Intent log transaction lists
+ */
+typedef struct itxs {
+	list_t		i_sync_list;	/* list of synchronous itxs */
+	avl_tree_t	i_async_tree;	/* tree of foids for async itxs */
+} itxs_t;
+
+typedef struct itxg {
+	kmutex_t	itxg_lock;	/* lock for this structure */
+	uint64_t	itxg_txg;	/* txg for this chain */
+	uint64_t	itxg_sod;	/* total size on disk for this txg */
+	itxs_t		*itxg_itxs;	/* sync and async itxs */
+} itxg_t;
+
+/* for async nodes we build up an AVL tree of lists of async itxs per file */
+typedef struct itx_async_node {
+	uint64_t	ia_foid;	/* file object id */
+	list_t		ia_list;	/* list of async itxs for this foid */
+	avl_node_t	ia_node;	/* AVL tree linkage */
+} itx_async_node_t;
+
+/*
  * Vdev flushing: during a zil_commit(), we build up an AVL tree of the vdevs
  * we've touched so we know which ones need a write cache flush at the end.
  */
@@ -71,9 +93,7 @@
 	objset_t	*zl_os;		/* object set we're logging */
 	zil_get_data_t	*zl_get_data;	/* callback to get object content */
 	zio_t		*zl_root_zio;	/* log writer root zio */
-	uint64_t	zl_itx_seq;	/* next in-core itx sequence number */
 	uint64_t	zl_lr_seq;	/* on-disk log record sequence number */
-	uint64_t	zl_commit_seq;	/* committed upto this number */
 	uint64_t	zl_commit_lr_seq; /* last committed on-disk lr seq */
 	uint64_t	zl_destroy_txg;	/* txg of last zil_destroy() */
 	uint64_t	zl_replayed_seq[TXG_SIZE]; /* last replayed rec seq */
@@ -93,10 +113,13 @@
 	uint64_t	zl_parse_lr_seq; /* highest lr seq on last parse */
 	uint64_t	zl_parse_blk_count; /* number of blocks parsed */
 	uint64_t	zl_parse_lr_count; /* number of log records parsed */
-	list_t		zl_itx_list;	/* in-memory itx list */
+	uint64_t	zl_next_batch;	/* next batch number */
+	uint64_t	zl_com_batch;	/* committed batch number */
+	kcondvar_t	zl_cv_batch[2];	/* batch condition variables */
+	itxg_t		zl_itxg[TXG_SIZE]; /* intent log txg chains */
+	list_t		zl_itx_commit_list; /* itx list to be committed */
 	uint64_t	zl_itx_list_sz;	/* total size of records on list */
 	uint64_t	zl_cur_used;	/* current commit log size used */
-	uint64_t	zl_prev_used;	/* previous commit log size used */
 	list_t		zl_lwb_list;	/* in-flight log write list */
 	kmutex_t	zl_vdev_lock;	/* protects zl_vdev_tree */
 	avl_tree_t	zl_vdev_tree;	/* vdevs to flush in zil_commit() */
--- a/usr/src/uts/common/fs/zfs/zfs_log.c	Fri Jun 25 17:30:31 2010 -0400
+++ b/usr/src/uts/common/fs/zfs/zfs_log.c	Fri Jun 25 15:50:31 2010 -0600
@@ -19,8 +19,7 @@
  * CDDL HEADER END
  */
 /*
- * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
- * Use is subject to license terms.
+ * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  */
 
 #include <sys/types.h>
@@ -231,7 +230,6 @@
     zfs_fuid_info_t *fuidp, vattr_t *vap)
 {
 	itx_t *itx;
-	uint64_t seq;
 	lr_create_t *lr;
 	lr_acl_create_t *lracl;
 	size_t aclsize;
@@ -333,9 +331,7 @@
 	 */
 	bcopy(name, end, namesize);
 
-	seq = zil_itx_assign(zilog, itx, tx);
-	dzp->z_last_itx = seq;
-	zp->z_last_itx = seq;
+	zil_itx_assign(zilog, itx, tx);
 }
 
 /*
@@ -343,10 +339,9 @@
  */
 void
 zfs_log_remove(zilog_t *zilog, dmu_tx_t *tx, uint64_t txtype,
-	znode_t *dzp, char *name)
+	znode_t *dzp, char *name, uint64_t foid)
 {
 	itx_t *itx;
-	uint64_t seq;
 	lr_remove_t *lr;
 	size_t namesize = strlen(name) + 1;
 
@@ -358,8 +353,10 @@
 	lr->lr_doid = dzp->z_id;
 	bcopy(name, (char *)(lr + 1), namesize);
 
-	seq = zil_itx_assign(zilog, itx, tx);
-	dzp->z_last_itx = seq;
+	/* pass the object id through itx_private */
+	itx->itx_private = (void *)foid;
+
+	zil_itx_assign(zilog, itx, tx);
 }
 
 /*
@@ -370,7 +367,6 @@
 	znode_t *dzp, znode_t *zp, char *name)
 {
 	itx_t *itx;
-	uint64_t seq;
 	lr_link_t *lr;
 	size_t namesize = strlen(name) + 1;
 
@@ -383,9 +379,7 @@
 	lr->lr_link_obj = zp->z_id;
 	bcopy(name, (char *)(lr + 1), namesize);
 
-	seq = zil_itx_assign(zilog, itx, tx);
-	dzp->z_last_itx = seq;
-	zp->z_last_itx = seq;
+	zil_itx_assign(zilog, itx, tx);
 }
 
 /*
@@ -396,7 +390,6 @@
     znode_t *dzp, znode_t *zp, char *name, char *link)
 {
 	itx_t *itx;
-	uint64_t seq;
 	lr_create_t *lr;
 	size_t namesize = strlen(name) + 1;
 	size_t linksize = strlen(link) + 1;
@@ -418,9 +411,7 @@
 	bcopy(name, (char *)(lr + 1), namesize);
 	bcopy(link, (char *)(lr + 1) + namesize, linksize);
 
-	seq = zil_itx_assign(zilog, itx, tx);
-	dzp->z_last_itx = seq;
-	zp->z_last_itx = seq;
+	zil_itx_assign(zilog, itx, tx);
 }
 
 /*
@@ -428,10 +419,9 @@
  */
 void
 zfs_log_rename(zilog_t *zilog, dmu_tx_t *tx, uint64_t txtype,
-	znode_t *sdzp, char *sname, znode_t *tdzp, char *dname, znode_t *szp)
+	znode_t *sdzp, char *sname, znode_t *tdzp, char *dname)
 {
 	itx_t *itx;
-	uint64_t seq;
 	lr_rename_t *lr;
 	size_t snamesize = strlen(sname) + 1;
 	size_t dnamesize = strlen(dname) + 1;
@@ -446,10 +436,7 @@
 	bcopy(sname, (char *)(lr + 1), snamesize);
 	bcopy(dname, (char *)(lr + 1) + snamesize, dnamesize);
 
-	seq = zil_itx_assign(zilog, itx, tx);
-	sdzp->z_last_itx = seq;
-	tdzp->z_last_itx = seq;
-	szp->z_last_itx = seq;
+	zil_itx_assign(zilog, itx, tx);
 }
 
 /*
@@ -520,13 +507,11 @@
 
 		itx->itx_private = zp->z_zfsvfs;
 
-		if ((zp->z_sync_cnt != 0) || (fsync_cnt != 0) ||
-		    (ioflag & (FSYNC | FDSYNC)))
-			itx->itx_sync = B_TRUE;
-		else
+		if (!(ioflag & (FSYNC | FDSYNC)) && (zp->z_sync_cnt == 0) &&
+		    (fsync_cnt == 0))
 			itx->itx_sync = B_FALSE;
 
-		zp->z_last_itx = zil_itx_assign(zilog, itx, tx);
+		zil_itx_assign(zilog, itx, tx);
 
 		off += len;
 		resid -= len;
@@ -541,7 +526,6 @@
 	znode_t *zp, uint64_t off, uint64_t len)
 {
 	itx_t *itx;
-	uint64_t seq;
 	lr_truncate_t *lr;
 
 	if (zil_replaying(zilog, tx) || zp->z_unlinked)
@@ -554,8 +538,7 @@
 	lr->lr_length = len;
 
 	itx->itx_sync = (zp->z_sync_cnt != 0);
-	seq = zil_itx_assign(zilog, itx, tx);
-	zp->z_last_itx = seq;
+	zil_itx_assign(zilog, itx, tx);
 }
 
 /*
@@ -566,7 +549,6 @@
 	znode_t *zp, vattr_t *vap, uint_t mask_applied, zfs_fuid_info_t *fuidp)
 {
 	itx_t		*itx;
-	uint64_t	seq;
 	lr_setattr_t	*lr;
 	xvattr_t	*xvap = (xvattr_t *)vap;
 	size_t		recsize = sizeof (lr_setattr_t);
@@ -618,8 +600,7 @@
 		(void) zfs_log_fuid_domains(fuidp, start);
 
 	itx->itx_sync = (zp->z_sync_cnt != 0);
-	seq = zil_itx_assign(zilog, itx, tx);
-	zp->z_last_itx = seq;
+	zil_itx_assign(zilog, itx, tx);
 }
 
 /*
@@ -630,7 +611,6 @@
     vsecattr_t *vsecp, zfs_fuid_info_t *fuidp)
 {
 	itx_t *itx;
-	uint64_t seq;
 	lr_acl_v0_t *lrv0;
 	lr_acl_t *lr;
 	int txtype;
@@ -686,6 +666,5 @@
 	}
 
 	itx->itx_sync = (zp->z_sync_cnt != 0);
-	seq = zil_itx_assign(zilog, itx, tx);
-	zp->z_last_itx = seq;
+	zil_itx_assign(zilog, itx, tx);
 }
--- a/usr/src/uts/common/fs/zfs/zfs_vfsops.c	Fri Jun 25 17:30:31 2010 -0400
+++ b/usr/src/uts/common/fs/zfs/zfs_vfsops.c	Fri Jun 25 15:50:31 2010 -0600
@@ -166,7 +166,7 @@
 		}
 
 		if (zfsvfs->z_log != NULL)
-			zil_commit(zfsvfs->z_log, UINT64_MAX, 0);
+			zil_commit(zfsvfs->z_log, 0);
 
 		ZFS_EXIT(zfsvfs);
 	} else {
--- a/usr/src/uts/common/fs/zfs/zfs_vnops.c	Fri Jun 25 17:30:31 2010 -0400
+++ b/usr/src/uts/common/fs/zfs/zfs_vnops.c	Fri Jun 25 15:50:31 2010 -0600
@@ -132,7 +132,7 @@
  *  (6)	At the end of each vnode op, the DMU tx must always commit,
  *	regardless of whether there were any errors.
  *
- *  (7)	After dropping all locks, invoke zil_commit(zilog, seq, foid)
+ *  (7)	After dropping all locks, invoke zil_commit(zilog, foid)
  *	to ensure that synchronous semantics are provided when necessary.
  *
  * In general, this is how things should be ordered in each vnode op:
@@ -164,7 +164,7 @@
  *	rw_exit(...);			// drop locks
  *	zfs_dirent_unlock(dl);		// unlock directory entry
  *	VN_RELE(...);			// release held vnodes
- *	zil_commit(zilog, seq, foid);	// synchronous when necessary
+ *	zil_commit(zilog, foid);	// synchronous when necessary
  *	ZFS_EXIT(zfsvfs);		// finished in zfs
  *	return (error);			// done, report error
  */
@@ -490,7 +490,7 @@
 	 * If we're in FRSYNC mode, sync out this znode before reading it.
 	 */
 	if (ioflag & FRSYNC || zfsvfs->z_os->os_sync == ZFS_SYNC_ALWAYS)
-		zil_commit(zfsvfs->z_log, zp->z_last_itx, zp->z_id);
+		zil_commit(zfsvfs->z_log, zp->z_id);
 
 	/*
 	 * Lock the range against changes.
@@ -917,7 +917,7 @@
 
 	if (ioflag & (FSYNC | FDSYNC) ||
 	    zfsvfs->z_os->os_sync == ZFS_SYNC_ALWAYS)
-		zil_commit(zilog, zp->z_last_itx, zp->z_id);
+		zil_commit(zilog, zp->z_id);
 
 	ZFS_EXIT(zfsvfs);
 	return (0);
@@ -1496,7 +1496,7 @@
 	}
 
 	if (zfsvfs->z_os->os_sync == ZFS_SYNC_ALWAYS)
-		zil_commit(zilog, UINT64_MAX, 0);
+		zil_commit(zilog, 0);
 
 	ZFS_EXIT(zfsvfs);
 	return (error);
@@ -1706,7 +1706,7 @@
 	txtype = TX_REMOVE;
 	if (flags & FIGNORECASE)
 		txtype |= TX_CI;
-	zfs_log_remove(zilog, tx, txtype, dzp, name);
+	zfs_log_remove(zilog, tx, txtype, dzp, name, zp->z_id);
 
 	dmu_tx_commit(tx);
 out:
@@ -1721,7 +1721,7 @@
 		VN_RELE(ZTOV(xzp));
 
 	if (zfsvfs->z_os->os_sync == ZFS_SYNC_ALWAYS)
-		zil_commit(zilog, UINT64_MAX, 0);
+		zil_commit(zilog, 0);
 
 	ZFS_EXIT(zfsvfs);
 	return (error);
@@ -1903,7 +1903,7 @@
 	zfs_dirent_unlock(dl);
 
 	if (zfsvfs->z_os->os_sync == ZFS_SYNC_ALWAYS)
-		zil_commit(zilog, UINT64_MAX, 0);
+		zil_commit(zilog, 0);
 
 	ZFS_EXIT(zfsvfs);
 	return (0);
@@ -2018,7 +2018,7 @@
 		uint64_t txtype = TX_RMDIR;
 		if (flags & FIGNORECASE)
 			txtype |= TX_CI;
-		zfs_log_remove(zilog, tx, txtype, dzp, name);
+		zfs_log_remove(zilog, tx, txtype, dzp, name, ZFS_NO_OBJECT);
 	}
 
 	dmu_tx_commit(tx);
@@ -2031,7 +2031,7 @@
 	VN_RELE(vp);
 
 	if (zfsvfs->z_os->os_sync == ZFS_SYNC_ALWAYS)
-		zil_commit(zilog, UINT64_MAX, 0);
+		zil_commit(zilog, 0);
 
 	ZFS_EXIT(zfsvfs);
 	return (error);
@@ -2350,7 +2350,7 @@
 	if (zfsvfs->z_os->os_sync != ZFS_SYNC_DISABLED) {
 		ZFS_ENTER(zfsvfs);
 		ZFS_VERIFY_ZP(zp);
-		zil_commit(zfsvfs->z_log, zp->z_last_itx, zp->z_id);
+		zil_commit(zfsvfs->z_log, zp->z_id);
 		ZFS_EXIT(zfsvfs);
 	}
 	return (0);
@@ -3165,7 +3165,7 @@
 
 out2:
 	if (zfsvfs->z_os->os_sync == ZFS_SYNC_ALWAYS)
-		zil_commit(zilog, UINT64_MAX, 0);
+		zil_commit(zilog, 0);
 
 	ZFS_EXIT(zfsvfs);
 	return (err);
@@ -3573,8 +3573,7 @@
 			if (error == 0) {
 				zfs_log_rename(zilog, tx, TX_RENAME |
 				    (flags & FIGNORECASE ? TX_CI : 0),
-				    sdzp, sdl->dl_name, tdzp, tdl->dl_name,
-				    szp);
+				    sdzp, sdl->dl_name, tdzp, tdl->dl_name);
 
 				/*
 				 * Update path information for the target vnode
@@ -3617,7 +3616,7 @@
 		VN_RELE(ZTOV(tzp));
 
 	if (zfsvfs->z_os->os_sync == ZFS_SYNC_ALWAYS)
-		zil_commit(zilog, UINT64_MAX, 0);
+		zil_commit(zilog, 0);
 
 	ZFS_EXIT(zfsvfs);
 	return (error);
@@ -3770,7 +3769,7 @@
 	VN_RELE(ZTOV(zp));
 
 	if (zfsvfs->z_os->os_sync == ZFS_SYNC_ALWAYS)
-		zil_commit(zilog, UINT64_MAX, 0);
+		zil_commit(zilog, 0);
 
 	ZFS_EXIT(zfsvfs);
 	return (error);
@@ -3965,7 +3964,7 @@
 	}
 
 	if (zfsvfs->z_os->os_sync == ZFS_SYNC_ALWAYS)
-		zil_commit(zilog, UINT64_MAX, 0);
+		zil_commit(zilog, 0);
 
 	ZFS_EXIT(zfsvfs);
 	return (error);
@@ -4202,7 +4201,7 @@
 out:
 	zfs_range_unlock(rl);
 	if ((flags & B_ASYNC) == 0 || zfsvfs->z_os->os_sync == ZFS_SYNC_ALWAYS)
-		zil_commit(zfsvfs->z_log, UINT64_MAX, zp->z_id);
+		zil_commit(zfsvfs->z_log, zp->z_id);
 	ZFS_EXIT(zfsvfs);
 	return (error);
 }
@@ -4857,7 +4856,7 @@
 	error = zfs_setacl(zp, vsecp, skipaclchk, cr);
 
 	if (zfsvfs->z_os->os_sync == ZFS_SYNC_ALWAYS)
-		zil_commit(zilog, UINT64_MAX, 0);
+		zil_commit(zilog, 0);
 
 	ZFS_EXIT(zfsvfs);
 	return (error);
--- a/usr/src/uts/common/fs/zfs/zfs_znode.c	Fri Jun 25 17:30:31 2010 -0400
+++ b/usr/src/uts/common/fs/zfs/zfs_znode.c	Fri Jun 25 15:50:31 2010 -0600
@@ -194,7 +194,6 @@
 	nzp->z_blksz = ozp->z_blksz;
 	nzp->z_seq = ozp->z_seq;
 	nzp->z_mapcnt = ozp->z_mapcnt;
-	nzp->z_last_itx = ozp->z_last_itx;
 	nzp->z_gen = ozp->z_gen;
 	nzp->z_sync_cnt = ozp->z_sync_cnt;
 	nzp->z_is_sa = ozp->z_is_sa;
@@ -643,7 +642,6 @@
 	zp->z_unlinked = 0;
 	zp->z_atime_dirty = 0;
 	zp->z_mapcnt = 0;
-	zp->z_last_itx = 0;
 	zp->z_id = db->db_object;
 	zp->z_blksz = blksz;
 	zp->z_seq = 0x7A4653;
--- a/usr/src/uts/common/fs/zfs/zil.c	Fri Jun 25 17:30:31 2010 -0400
+++ b/usr/src/uts/common/fs/zfs/zil.c	Fri Jun 25 15:50:31 2010 -0600
@@ -84,6 +84,15 @@
     sizeof (zil_chain_t)) == (lwb->lwb_sz - lwb->lwb_nused))
 
 
+/*
+ * ziltest is by and large an ugly hack, but very useful in
+ * checking replay without tedious work.
+ * When running ziltest we want to keep all itx's and so maintain
+ * a single list in the zl_itxg[] that uses a high txg: ZILTEST_TXG
+ * We subtract TXG_CONCURRENT_STATES to allow for common code.
+ */
+#define	ZILTEST_TXG (UINT64_MAX - TXG_CONCURRENT_STATES)
+
 static int
 zil_bp_compare(const void *x1, const void *x2)
 {
@@ -661,8 +670,8 @@
 static int
 zil_vdev_compare(const void *x1, const void *x2)
 {
-	uint64_t v1 = ((zil_vdev_node_t *)x1)->zv_vdev;
-	uint64_t v2 = ((zil_vdev_node_t *)x2)->zv_vdev;
+	const uint64_t v1 = ((zil_vdev_node_t *)x1)->zv_vdev;
+	const uint64_t v2 = ((zil_vdev_node_t *)x2)->zv_vdev;
 
 	if (v1 < v2)
 		return (-1);
@@ -1045,6 +1054,7 @@
 	itx->itx_lr.lrc_reclen = lrsize;
 	itx->itx_sod = lrsize; /* if write & WR_NEED_COPY will be increased */
 	itx->itx_lr.lrc_seq = 0;	/* defensive */
+	itx->itx_sync = B_TRUE;		/* default is synchronous */
 
 	return (itx);
 }
@@ -1055,64 +1065,177 @@
 	kmem_free(itx, offsetof(itx_t, itx_lr) + itx->itx_lr.lrc_reclen);
 }
 
-uint64_t
-zil_itx_assign(zilog_t *zilog, itx_t *itx, dmu_tx_t *tx)
+/*
+ * Free up the sync and async itxs. The itxs_t has already been detached
+ * so no locks are needed.
+ */
+static void
+zil_itxg_clean(itxs_t *itxs)
 {
-	uint64_t seq;
+	itx_t *itx;
+	list_t *list;
+	avl_tree_t *t;
+	void *cookie;
+	itx_async_node_t *ian;
 
-	ASSERT(itx->itx_lr.lrc_seq == 0);
-	ASSERT(!zilog->zl_replay);
+	list = &itxs->i_sync_list;
+	while ((itx = list_head(list)) != NULL) {
+		list_remove(list, itx);
+		kmem_free(itx, offsetof(itx_t, itx_lr) +
+		    itx->itx_lr.lrc_reclen);
+	}
 
-	mutex_enter(&zilog->zl_lock);
-	list_insert_tail(&zilog->zl_itx_list, itx);
-	zilog->zl_itx_list_sz += itx->itx_sod;
-	itx->itx_lr.lrc_txg = dmu_tx_get_txg(tx);
-	itx->itx_lr.lrc_seq = seq = ++zilog->zl_itx_seq;
-	mutex_exit(&zilog->zl_lock);
+	cookie = NULL;
+	t = &itxs->i_async_tree;
+	while ((ian = avl_destroy_nodes(t, &cookie)) != NULL) {
+		list = &ian->ia_list;
+		while ((itx = list_head(list)) != NULL) {
+			list_remove(list, itx);
+			kmem_free(itx, offsetof(itx_t, itx_lr) +
+			    itx->itx_lr.lrc_reclen);
+		}
+		list_destroy(list);
+		kmem_free(ian, sizeof (itx_async_node_t));
+	}
+	avl_destroy(t);
 
-	return (seq);
+	kmem_free(itxs, sizeof (itxs_t));
+}
+
+static int
+zil_aitx_compare(const void *x1, const void *x2)
+{
+	const uint64_t o1 = ((itx_async_node_t *)x1)->ia_foid;
+	const uint64_t o2 = ((itx_async_node_t *)x2)->ia_foid;
+
+	if (o1 < o2)
+		return (-1);
+	if (o1 > o2)
+		return (1);
+
+	return (0);
 }
 
 /*
- * Free up all in-memory intent log transactions that have now been synced.
+ * Remove all async itx with the given oid.
  */
-static void
-zil_itx_clean(zilog_t *zilog)
+void
+zil_remove_async(zilog_t *zilog, uint64_t oid)
 {
-	uint64_t synced_txg = spa_last_synced_txg(zilog->zl_spa);
-	uint64_t freeze_txg = spa_freeze_txg(zilog->zl_spa);
+	uint64_t otxg, txg;
+	itx_async_node_t *ian;
+	avl_tree_t *t;
+	avl_index_t where;
 	list_t clean_list;
 	itx_t *itx;
 
+	ASSERT(oid != 0);
 	list_create(&clean_list, sizeof (itx_t), offsetof(itx_t, itx_node));
 
-	mutex_enter(&zilog->zl_lock);
-	/* wait for a log writer to finish walking list */
-	while (zilog->zl_writer) {
-		cv_wait(&zilog->zl_cv_writer, &zilog->zl_lock);
+	if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX) /* ziltest support */
+		otxg = ZILTEST_TXG;
+	else
+		otxg = spa_last_synced_txg(zilog->zl_spa) + 1;
+
+	for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
+		itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];
+
+		mutex_enter(&itxg->itxg_lock);
+		if (itxg->itxg_txg != txg) {
+			mutex_exit(&itxg->itxg_lock);
+			continue;
+		}
+
+		/*
+		 * Locate the object node and append its list.
+		 */
+		t = &itxg->itxg_itxs->i_async_tree;
+		ian = avl_find(t, &oid, &where);
+		if (ian != NULL)
+			list_move_tail(&clean_list, &ian->ia_list);
+		mutex_exit(&itxg->itxg_lock);
 	}
+	while ((itx = list_head(&clean_list)) != NULL) {
+		list_remove(&clean_list, itx);
+		kmem_free(itx, offsetof(itx_t, itx_lr) +
+		    itx->itx_lr.lrc_reclen);
+	}
+	list_destroy(&clean_list);
+}
+
+void
+zil_itx_assign(zilog_t *zilog, itx_t *itx, dmu_tx_t *tx)
+{
+	uint64_t txg;
+	itxg_t *itxg;
+	itxs_t *itxs, *clean = NULL;
 
 	/*
-	 * Move the sync'd log transactions to a separate list so we can call
-	 * kmem_free without holding the zl_lock.
-	 *
-	 * There is no need to set zl_writer as we don't drop zl_lock here
+	 * Object ids can be re-instantiated in the same or next txg so
+	 * remove any async transactions to avoid future leaks.
+	 * This can happen if a fsync occurs on the re-instantiated
+	 * object for a WR_INDIRECT or WR_NEED_COPY write, which gets
+	 * the new file data and flushes a write record for the old object.
 	 */
-	while ((itx = list_head(&zilog->zl_itx_list)) != NULL &&
-	    itx->itx_lr.lrc_txg <= MIN(synced_txg, freeze_txg)) {
-		list_remove(&zilog->zl_itx_list, itx);
-		zilog->zl_itx_list_sz -= itx->itx_sod;
-		list_insert_tail(&clean_list, itx);
+	if ((itx->itx_lr.lrc_txtype & ~TX_CI) == TX_REMOVE)
+		zil_remove_async(zilog, (uint64_t)itx->itx_private);
+
+	if (spa_freeze_txg(zilog->zl_spa) !=  UINT64_MAX)
+		txg = ZILTEST_TXG;
+	else
+		txg = dmu_tx_get_txg(tx);
+
+	itxg = &zilog->zl_itxg[txg & TXG_MASK];
+	mutex_enter(&itxg->itxg_lock);
+	itxs = itxg->itxg_itxs;
+	if (itxg->itxg_txg != txg) {
+		if (itxs != NULL) {
+			/*
+			 * The zil_clean callback hasn't got around to cleaning
+			 * this itxg. Save the itxs for release below.
+			 * This should be rare.
+			 */
+			atomic_add_64(&zilog->zl_itx_list_sz, -itxg->itxg_sod);
+			itxg->itxg_sod = 0;
+			clean = itxg->itxg_itxs;
+		}
+		ASSERT(itxg->itxg_sod == 0);
+		itxg->itxg_txg = txg;
+		itxs = itxg->itxg_itxs = kmem_zalloc(sizeof (itxs_t), KM_SLEEP);
+
+		list_create(&itxs->i_sync_list, sizeof (itx_t),
+		    offsetof(itx_t, itx_node));
+		avl_create(&itxs->i_async_tree, zil_aitx_compare,
+		    sizeof (itx_async_node_t),
+		    offsetof(itx_async_node_t, ia_node));
 	}
-	cv_broadcast(&zilog->zl_cv_writer);
-	mutex_exit(&zilog->zl_lock);
+	if (itx->itx_sync) {
+		list_insert_tail(&itxs->i_sync_list, itx);
+		atomic_add_64(&zilog->zl_itx_list_sz, itx->itx_sod);
+		itxg->itxg_sod += itx->itx_sod;
+	} else {
+		avl_tree_t *t = &itxs->i_async_tree;
+		uint64_t foid = ((lr_ooo_t *)&itx->itx_lr)->lr_foid;
+		itx_async_node_t *ian;
+		avl_index_t where;
 
-	/* destroy sync'd log transactions */
-	while ((itx = list_head(&clean_list)) != NULL) {
-		list_remove(&clean_list, itx);
-		zil_itx_destroy(itx);
+		ian = avl_find(t, &foid, &where);
+		if (ian == NULL) {
+			ian = kmem_alloc(sizeof (itx_async_node_t), KM_SLEEP);
+			list_create(&ian->ia_list, sizeof (itx_t),
+			    offsetof(itx_t, itx_node));
+			ian->ia_foid = foid;
+			avl_insert(t, ian, where);
+		}
+		list_insert_tail(&ian->ia_list, itx);
 	}
-	list_destroy(&clean_list);
+
+	itx->itx_lr.lrc_txg = dmu_tx_get_txg(tx);
+	mutex_exit(&itxg->itxg_lock);
+
+	/* Release the old itxs now we've dropped the lock */
+	if (clean != NULL)
+		zil_itxg_clean(clean);
 }
 
 /*
@@ -1120,125 +1243,178 @@
  * synced then start up a taskq to free them.
  */
 void
-zil_clean(zilog_t *zilog)
+zil_clean(zilog_t *zilog, uint64_t synced_txg)
 {
-	itx_t *itx;
+	itxg_t *itxg = &zilog->zl_itxg[synced_txg & TXG_MASK];
+	itxs_t *clean_me;
+
+	mutex_enter(&itxg->itxg_lock);
+	if (itxg->itxg_itxs == NULL || itxg->itxg_txg == ZILTEST_TXG) {
+		mutex_exit(&itxg->itxg_lock);
+		return;
+	}
+	ASSERT3U(itxg->itxg_txg, <=, synced_txg);
+	ASSERT(itxg->itxg_txg != 0);
+	ASSERT(zilog->zl_clean_taskq != NULL);
+	atomic_add_64(&zilog->zl_itx_list_sz, -itxg->itxg_sod);
+	itxg->itxg_sod = 0;
+	clean_me = itxg->itxg_itxs;
+	itxg->itxg_itxs = NULL;
+	itxg->itxg_txg = 0;
+	mutex_exit(&itxg->itxg_lock);
+	/*
+	 * Preferably start a task queue to free up the old itxs but
+	 * if taskq_dispatch can't allocate resources to do that then
+	 * free it in-line. This should be rare. Note, using TQ_SLEEP
+	 * created a bad performance problem.
+	 */
+	if (taskq_dispatch(zilog->zl_clean_taskq,
+	    (void (*)(void *))zil_itxg_clean, clean_me, TQ_NOSLEEP) == NULL)
+		zil_itxg_clean(clean_me);
+}
+
+/*
+ * Get the list of itxs to commit into zl_itx_commit_list.
+ */
+void
+zil_get_commit_list(zilog_t *zilog)
+{
+	uint64_t otxg, txg;
+	list_t *commit_list = &zilog->zl_itx_commit_list;
+	uint64_t push_sod = 0;
+
+	if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX) /* ziltest support */
+		otxg = ZILTEST_TXG;
+	else
+		otxg = spa_last_synced_txg(zilog->zl_spa) + 1;
+
+	for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
+		itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];
+
+		mutex_enter(&itxg->itxg_lock);
+		if (itxg->itxg_txg != txg) {
+			mutex_exit(&itxg->itxg_lock);
+			continue;
+		}
 
-	mutex_enter(&zilog->zl_lock);
-	itx = list_head(&zilog->zl_itx_list);
-	if ((itx != NULL) &&
-	    (itx->itx_lr.lrc_txg <= spa_last_synced_txg(zilog->zl_spa))) {
-		(void) taskq_dispatch(zilog->zl_clean_taskq,
-		    (task_func_t *)zil_itx_clean, zilog, TQ_NOSLEEP);
+		list_move_tail(commit_list, &itxg->itxg_itxs->i_sync_list);
+		push_sod += itxg->itxg_sod;
+		itxg->itxg_sod = 0;
+
+		mutex_exit(&itxg->itxg_lock);
 	}
-	mutex_exit(&zilog->zl_lock);
+	atomic_add_64(&zilog->zl_itx_list_sz, -push_sod);
+}
+
+/*
+ * Move the async itxs for a specified object to commit into sync lists.
+ */
+void
+zil_async_to_sync(zilog_t *zilog, uint64_t foid)
+{
+	uint64_t otxg, txg;
+	itx_async_node_t *ian;
+	avl_tree_t *t;
+	avl_index_t where;
+
+	if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX) /* ziltest support */
+		otxg = ZILTEST_TXG;
+	else
+		otxg = spa_last_synced_txg(zilog->zl_spa) + 1;
+
+	for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
+		itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];
+
+		mutex_enter(&itxg->itxg_lock);
+		if (itxg->itxg_txg != txg) {
+			mutex_exit(&itxg->itxg_lock);
+			continue;
+		}
+
+		/*
+		 * If a foid is specified then find that node and append its
+		 * list. Otherwise walk the tree appending all the lists
+		 * to the sync list. We add to the end rather than the
+		 * beginning to ensure the create has happened.
+		 */
+		t = &itxg->itxg_itxs->i_async_tree;
+		if (foid != 0) {
+			ian = avl_find(t, &foid, &where);
+			if (ian != NULL) {
+				list_move_tail(&itxg->itxg_itxs->i_sync_list,
+				    &ian->ia_list);
+			}
+		} else {
+			void *cookie = NULL;
+
+			while ((ian = avl_destroy_nodes(t, &cookie)) != NULL) {
+				list_move_tail(&itxg->itxg_itxs->i_sync_list,
+				    &ian->ia_list);
+				list_destroy(&ian->ia_list);
+				kmem_free(ian, sizeof (itx_async_node_t));
+			}
+		}
+		mutex_exit(&itxg->itxg_lock);
+	}
 }
 
 static void
-zil_commit_writer(zilog_t *zilog, uint64_t seq, uint64_t foid)
+zil_commit_writer(zilog_t *zilog)
 {
 	uint64_t txg;
-	uint64_t commit_seq = 0;
-	itx_t *itx, *itx_next;
+	itx_t *itx;
 	lwb_t *lwb;
-	spa_t *spa;
+	spa_t *spa = zilog->zl_spa;
 	int error = 0;
 
-	zilog->zl_writer = B_TRUE;
 	ASSERT(zilog->zl_root_zio == NULL);
-	spa = zilog->zl_spa;
+
+	mutex_exit(&zilog->zl_lock);
+
+	zil_get_commit_list(zilog);
+
+	/*
+	 * Return if there's nothing to commit before we dirty the fs by
+	 * calling zil_create().
+	 */
+	if (list_head(&zilog->zl_itx_commit_list) == NULL) {
+		mutex_enter(&zilog->zl_lock);
+		return;
+	}
 
 	if (zilog->zl_suspend) {
 		lwb = NULL;
 	} else {
 		lwb = list_tail(&zilog->zl_lwb_list);
-		if (lwb == NULL) {
-			/*
-			 * Return if there's nothing to flush before we
-			 * dirty the fs by calling zil_create()
-			 */
-			if (list_is_empty(&zilog->zl_itx_list)) {
-				zilog->zl_writer = B_FALSE;
-				return;
-			}
-			mutex_exit(&zilog->zl_lock);
+		if (lwb == NULL)
 			lwb = zil_create(zilog);
-			mutex_enter(&zilog->zl_lock);
-		}
 	}
-	ASSERT(lwb == NULL || lwb->lwb_zio == NULL);
-
-	/* Loop through in-memory log transactions filling log blocks. */
-	DTRACE_PROBE1(zil__cw1, zilog_t *, zilog);
 
-	for (itx = list_head(&zilog->zl_itx_list); itx; itx = itx_next) {
-		/*
-		 * Save the next pointer.  Even though we drop zl_lock below,
-		 * all threads that can remove itx list entries (other writers
-		 * and zil_itx_clean()) can't do so until they have zl_writer.
-		 */
-		itx_next = list_next(&zilog->zl_itx_list, itx);
-
-		/*
-		 * Determine whether to push this itx.
-		 * Push all transactions related to specified foid and
-		 * all other transactions except those that can be logged
-		 * out of order (TX_WRITE, TX_TRUNCATE, TX_SETATTR, TX_ACL)
-		 * for all other files.
-		 *
-		 * If foid == 0 (meaning "push all foids") or
-		 * itx->itx_sync is set (meaning O_[D]SYNC), push regardless.
-		 */
-		if (foid != 0 && !itx->itx_sync &&
-		    TX_OOO(itx->itx_lr.lrc_txtype) &&
-		    ((lr_ooo_t *)&itx->itx_lr)->lr_foid != foid)
-			continue; /* skip this record */
-
-		if ((itx->itx_lr.lrc_seq > seq) &&
-		    ((lwb == NULL) || (LWB_EMPTY(lwb)) ||
-		    (lwb->lwb_nused + itx->itx_sod > lwb->lwb_sz)))
-			break;
-
-		list_remove(&zilog->zl_itx_list, itx);
-		zilog->zl_itx_list_sz -= itx->itx_sod;
-
-		mutex_exit(&zilog->zl_lock);
-
+	DTRACE_PROBE1(zil__cw1, zilog_t *, zilog);
+	while (itx = list_head(&zilog->zl_itx_commit_list)) {
 		txg = itx->itx_lr.lrc_txg;
 		ASSERT(txg);
 
-		if (txg > spa_last_synced_txg(spa) ||
-		    txg > spa_freeze_txg(spa))
+		if (txg > spa_last_synced_txg(spa) || txg > spa_freeze_txg(spa))
 			lwb = zil_lwb_commit(zilog, itx, lwb);
-
-		zil_itx_destroy(itx);
-
-		mutex_enter(&zilog->zl_lock);
+		list_remove(&zilog->zl_itx_commit_list, itx);
+		kmem_free(itx, offsetof(itx_t, itx_lr)
+		    + itx->itx_lr.lrc_reclen);
 	}
 	DTRACE_PROBE1(zil__cw2, zilog_t *, zilog);
-	/* determine commit sequence number */
-	itx = list_head(&zilog->zl_itx_list);
-	if (itx)
-		commit_seq = itx->itx_lr.lrc_seq - 1;
-	else
-		commit_seq = zilog->zl_itx_seq;
-	mutex_exit(&zilog->zl_lock);
 
 	/* write the last block out */
 	if (lwb != NULL && lwb->lwb_zio != NULL)
 		lwb = zil_lwb_write_start(zilog, lwb);
 
-	zilog->zl_prev_used = zilog->zl_cur_used;
 	zilog->zl_cur_used = 0;
 
 	/*
 	 * Wait if necessary for the log blocks to be on stable storage.
 	 */
 	if (zilog->zl_root_zio) {
-		DTRACE_PROBE1(zil__cw3, zilog_t *, zilog);
 		error = zio_wait(zilog->zl_root_zio);
 		zilog->zl_root_zio = NULL;
-		DTRACE_PROBE1(zil__cw4, zilog_t *, zilog);
 		zil_flush_vdevs(zilog);
 	}
 
@@ -1246,10 +1422,6 @@
 		txg_wait_synced(zilog->zl_dmu_pool, 0);
 
 	mutex_enter(&zilog->zl_lock);
-	zilog->zl_writer = B_FALSE;
-
-	ASSERT3U(commit_seq, >=, zilog->zl_commit_seq);
-	zilog->zl_commit_seq = commit_seq;
 
 	/*
 	 * Remember the highest committed log sequence number for ztest.
@@ -1261,58 +1433,61 @@
 }
 
 /*
- * Push zfs transactions to stable storage up to the supplied sequence number.
+ * Commit zfs transactions to stable storage.
  * If foid is 0 push out all transactions, otherwise push only those
- * for that file or might have been used to create that file.
+ * for that object or might reference that object.
+ *
+ * itxs are committed in batches. In a heavily stressed zil there will be
+ * a commit writer thread who is writing out a bunch of itxs to the log
+ * for a set of committing threads (cthreads) in the same batch as the writer.
+ * Those cthreads are all waiting on the same cv for that batch.
+ *
+ * There will also be a different and growing batch of threads that are
+ * waiting to commit (qthreads). When the committing batch completes
+ * a transition occurs such that the cthreads exit and the qthreads become
+ * cthreads. One of the new cthreads becomes the writer thread for the
+ * batch. Any new threads arriving become new qthreads.
+ *
+ * Only 2 condition variables are needed and there's no transition
+ * between the two cvs needed. They just flip-flop between qthreads
+ * and cthreads.
+ *
+ * Using this scheme we can efficiently wakeup up only those threads
+ * that have been committed.
  */
 void
-zil_commit(zilog_t *zilog, uint64_t seq, uint64_t foid)
+zil_commit(zilog_t *zilog, uint64_t foid)
 {
-	if (zilog->zl_sync == ZFS_SYNC_DISABLED || seq == 0)
+	uint64_t mybatch;
+
+	if (zilog->zl_sync == ZFS_SYNC_DISABLED)
 		return;
 
+	/* move the async itxs for the foid to the sync queues */
+	zil_async_to_sync(zilog, foid);
+
 	mutex_enter(&zilog->zl_lock);
-
-	seq = MIN(seq, zilog->zl_itx_seq);	/* cap seq at largest itx seq */
-
+	mybatch = zilog->zl_next_batch;
 	while (zilog->zl_writer) {
-		cv_wait(&zilog->zl_cv_writer, &zilog->zl_lock);
-		if (seq <= zilog->zl_commit_seq) {
+		cv_wait(&zilog->zl_cv_batch[mybatch & 1], &zilog->zl_lock);
+		if (mybatch <= zilog->zl_com_batch) {
 			mutex_exit(&zilog->zl_lock);
 			return;
 		}
 	}
-	zil_commit_writer(zilog, seq, foid); /* drops zl_lock */
-	/* wake up others waiting on the commit */
-	cv_broadcast(&zilog->zl_cv_writer);
-	mutex_exit(&zilog->zl_lock);
-}
-
-/*
- * Report whether all transactions are committed.
- */
-static boolean_t
-zil_is_committed(zilog_t *zilog)
-{
-	lwb_t *lwb;
-	boolean_t committed;
 
-	mutex_enter(&zilog->zl_lock);
-
-	while (zilog->zl_writer)
-		cv_wait(&zilog->zl_cv_writer, &zilog->zl_lock);
+	zilog->zl_next_batch++;
+	zilog->zl_writer = B_TRUE;
+	zil_commit_writer(zilog);
+	zilog->zl_com_batch = mybatch;
+	zilog->zl_writer = B_FALSE;
+	mutex_exit(&zilog->zl_lock);
 
-	if (!list_is_empty(&zilog->zl_itx_list))
-		committed = B_FALSE;		/* unpushed transactions */
-	else if ((lwb = list_head(&zilog->zl_lwb_list)) == NULL)
-		committed = B_TRUE;		/* intent log never used */
-	else if (list_next(&zilog->zl_lwb_list, lwb) != NULL)
-		committed = B_FALSE;		/* zil_sync() not done yet */
-	else
-		committed = B_TRUE;		/* everything synced */
+	/* wake up one thread to become the next writer */
+	cv_signal(&zilog->zl_cv_batch[(mybatch+1) & 1]);
 
-	mutex_exit(&zilog->zl_lock);
-	return (committed);
+	/* wake up all threads waiting for this batch to be committed */
+	cv_broadcast(&zilog->zl_cv_batch[mybatch & 1]);
 }
 
 /*
@@ -1425,15 +1600,21 @@
 	zilog->zl_destroy_txg = TXG_INITIAL - 1;
 	zilog->zl_logbias = dmu_objset_logbias(os);
 	zilog->zl_sync = dmu_objset_syncprop(os);
+	zilog->zl_next_batch = 1;
 
 	mutex_init(&zilog->zl_lock, NULL, MUTEX_DEFAULT, NULL);
 
-	list_create(&zilog->zl_itx_list, sizeof (itx_t),
-	    offsetof(itx_t, itx_node));
+	for (int i = 0; i < TXG_SIZE; i++) {
+		mutex_init(&zilog->zl_itxg[i].itxg_lock, NULL,
+		    MUTEX_DEFAULT, NULL);
+	}
 
 	list_create(&zilog->zl_lwb_list, sizeof (lwb_t),
 	    offsetof(lwb_t, lwb_node));
 
+	list_create(&zilog->zl_itx_commit_list, sizeof (itx_t),
+	    offsetof(itx_t, itx_node));
+
 	mutex_init(&zilog->zl_vdev_lock, NULL, MUTEX_DEFAULT, NULL);
 
 	avl_create(&zilog->zl_vdev_tree, zil_vdev_compare,
@@ -1441,6 +1622,8 @@
 
 	cv_init(&zilog->zl_cv_writer, NULL, CV_DEFAULT, NULL);
 	cv_init(&zilog->zl_cv_suspend, NULL, CV_DEFAULT, NULL);
+	cv_init(&zilog->zl_cv_batch[0], NULL, CV_DEFAULT, NULL);
+	cv_init(&zilog->zl_cv_batch[1], NULL, CV_DEFAULT, NULL);
 
 	return (zilog);
 }
@@ -1448,27 +1631,46 @@
 void
 zil_free(zilog_t *zilog)
 {
-	lwb_t *lwb;
+	lwb_t *head_lwb;
 
 	zilog->zl_stop_sync = 1;
 
-	while ((lwb = list_head(&zilog->zl_lwb_list)) != NULL) {
-		list_remove(&zilog->zl_lwb_list, lwb);
-		if (lwb->lwb_buf != NULL)
-			zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
-		kmem_cache_free(zil_lwb_cache, lwb);
+	/*
+	 * After zil_close() there should only be one lwb with a buffer.
+	 */
+	head_lwb = list_head(&zilog->zl_lwb_list);
+	if (head_lwb) {
+		ASSERT(head_lwb == list_tail(&zilog->zl_lwb_list));
+		list_remove(&zilog->zl_lwb_list, head_lwb);
+		kmem_cache_free(zil_lwb_cache, head_lwb);
 	}
 	list_destroy(&zilog->zl_lwb_list);
 
 	avl_destroy(&zilog->zl_vdev_tree);
 	mutex_destroy(&zilog->zl_vdev_lock);
 
-	ASSERT(list_head(&zilog->zl_itx_list) == NULL);
-	list_destroy(&zilog->zl_itx_list);
+	ASSERT(list_is_empty(&zilog->zl_itx_commit_list));
+	list_destroy(&zilog->zl_itx_commit_list);
+
+	for (int i = 0; i < TXG_SIZE; i++) {
+		/*
+		 * It's possible for an itx to be generated that doesn't dirty
+		 * a txg (e.g. ztest TX_TRUNCATE). So there's no zil_clean()
+		 * callback to remove the entry. We remove those here.
+		 *
+		 * Also free up the ziltest itxs.
+		 */
+		if (zilog->zl_itxg[i].itxg_itxs)
+			zil_itxg_clean(zilog->zl_itxg[i].itxg_itxs);
+		mutex_destroy(&zilog->zl_itxg[i].itxg_lock);
+	}
+
 	mutex_destroy(&zilog->zl_lock);
 
 	cv_destroy(&zilog->zl_cv_writer);
 	cv_destroy(&zilog->zl_cv_suspend);
+	cv_destroy(&zilog->zl_cv_batch[0]);
+	cv_destroy(&zilog->zl_cv_batch[1]);
 
 	kmem_free(zilog, sizeof (zilog_t));
 }
@@ -1494,26 +1696,28 @@
 void
 zil_close(zilog_t *zilog)
 {
+	lwb_t *tail_lwb;
+	uint64_t txg = 0;
+
+	zil_commit(zilog, 0); /* commit all itx */
+
 	/*
-	 * If the log isn't already committed, mark the objset dirty
-	 * (so zil_sync() will be called) and wait for that txg to sync.
+	 * The lwb_max_txg for the stubby lwb will reflect the last activity
+	 * for the zil.  After a txg_wait_synced() on the txg we know all the
+	 * callbacks have occurred that may clean the zil.  Only then can we
+	 * destroy the zl_clean_taskq.
 	 */
-	if (!zil_is_committed(zilog)) {
-		uint64_t txg;
-		dmu_tx_t *tx = dmu_tx_create(zilog->zl_os);
-		VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
-		dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
-		txg = dmu_tx_get_txg(tx);
-		dmu_tx_commit(tx);
+	mutex_enter(&zilog->zl_lock);
+	tail_lwb = list_tail(&zilog->zl_lwb_list);
+	if (tail_lwb != NULL)
+		txg = tail_lwb->lwb_max_txg;
+	mutex_exit(&zilog->zl_lock);
+	if (txg)
 		txg_wait_synced(zilog->zl_dmu_pool, txg);
-	}
 
 	taskq_destroy(zilog->zl_clean_taskq);
 	zilog->zl_clean_taskq = NULL;
 	zilog->zl_get_data = NULL;
-
-	zil_itx_clean(zilog);
-	ASSERT(list_head(&zilog->zl_itx_list) == NULL);
 }
 
 /*
@@ -1545,15 +1749,7 @@
 	zilog->zl_suspending = B_TRUE;
 	mutex_exit(&zilog->zl_lock);
 
-	zil_commit(zilog, UINT64_MAX, 0);
-
-	/*
-	 * Wait for any in-flight log writes to complete.
-	 */
-	mutex_enter(&zilog->zl_lock);
-	while (zilog->zl_writer)
-		cv_wait(&zilog->zl_cv_writer, &zilog->zl_lock);
-	mutex_exit(&zilog->zl_lock);
+	zil_commit(zilog, 0);
 
 	zil_destroy(zilog, B_FALSE);
 
--- a/usr/src/uts/common/fs/zfs/zvol.c	Fri Jun 25 17:30:31 2010 -0400
+++ b/usr/src/uts/common/fs/zfs/zvol.c	Fri Jun 25 15:50:31 2010 -0600
@@ -1028,7 +1028,7 @@
 		itx->itx_private = zv;
 		itx->itx_sync = sync;
 
-		(void) zil_itx_assign(zilog, itx, tx);
+		zil_itx_assign(zilog, itx, tx);
 
 		off += len;
 		resid -= len;
@@ -1221,7 +1221,7 @@
 		bioerror(bp, off > volsize ? EINVAL : error);
 
 	if (sync)
-		zil_commit(zv->zv_zilog, UINT64_MAX, ZVOL_OBJ);
+		zil_commit(zv->zv_zilog, ZVOL_OBJ);
 	biodone(bp);
 
 	return (0);
@@ -1375,7 +1375,7 @@
 	}
 	zfs_range_unlock(rl);
 	if (sync)
-		zil_commit(zv->zv_zilog, UINT64_MAX, ZVOL_OBJ);
+		zil_commit(zv->zv_zilog, ZVOL_OBJ);
 	return (error);
 }
 
@@ -1569,7 +1569,7 @@
 	case DKIOCFLUSHWRITECACHE:
 		dkc = (struct dk_callback *)arg;
 		mutex_exit(&zfsdev_state_lock);
-		zil_commit(zv->zv_zilog, UINT64_MAX, ZVOL_OBJ);
+		zil_commit(zv->zv_zilog, ZVOL_OBJ);
 		if ((flag & FKIOCTL) && dkc != NULL && dkc->dkc_callback) {
 			(*dkc->dkc_callback)(dkc->dkc_cookie, error);
 			error = 0;
@@ -1598,7 +1598,7 @@
 			} else {
 				zv->zv_flags &= ~ZVOL_WCE;
 				mutex_exit(&zfsdev_state_lock);
-				zil_commit(zv->zv_zilog, UINT64_MAX, ZVOL_OBJ);
+				zil_commit(zv->zv_zilog, ZVOL_OBJ);
 			}
 			return (0);
 		}
--- a/usr/src/uts/common/io/comstar/lu/stmf_sbd/sbd_zvol.c	Fri Jun 25 17:30:31 2010 -0400
+++ b/usr/src/uts/common/io/comstar/lu/stmf_sbd/sbd_zvol.c	Fri Jun 25 15:50:31 2010 -0600
@@ -378,7 +378,7 @@
 	    sizeof (arc_buf_t *) * dbuf->db_sglist_length);
 	zvio->zvio_abp = NULL;
 	if (sync && (flags & ZVIO_COMMIT))
-		zil_commit(sl->sl_zvol_zil_hdl, UINT64_MAX, ZVOL_OBJ);
+		zil_commit(sl->sl_zvol_zil_hdl, ZVOL_OBJ);
 	return (0);
 }
 
@@ -454,7 +454,7 @@
 	}
 	zfs_range_unlock(rl);
 	if (sync && (flags & ZVIO_COMMIT))
-		zil_commit(sl->sl_zvol_zil_hdl, UINT64_MAX, ZVOL_OBJ);
+		zil_commit(sl->sl_zvol_zil_hdl, ZVOL_OBJ);
 	if (error == ECKSUM)
 		error = EIO;
 	return (error);