usr/src/uts/common/fs/zfs/zil.c
changeset 10922 e2081f502306
parent 10921 8aac17999e4d
child 11066 cebb50cbe4f9
comparison of parent 10921:8aac17999e4d with 10922:e2081f502306
    23  * Use is subject to license terms.
    23  * Use is subject to license terms.
    24  */
    24  */
    25 
    25 
    26 #include <sys/zfs_context.h>
    26 #include <sys/zfs_context.h>
    27 #include <sys/spa.h>
    27 #include <sys/spa.h>
    28 #include <sys/spa_impl.h>
       
    29 #include <sys/dmu.h>
    28 #include <sys/dmu.h>
    30 #include <sys/zap.h>
    29 #include <sys/zap.h>
    31 #include <sys/arc.h>
    30 #include <sys/arc.h>
    32 #include <sys/stat.h>
    31 #include <sys/stat.h>
    33 #include <sys/resource.h>
    32 #include <sys/resource.h>
    78 static kmem_cache_t *zil_lwb_cache;
    77 static kmem_cache_t *zil_lwb_cache;
    79 
    78 
    80 static boolean_t zil_empty(zilog_t *zilog);
    79 static boolean_t zil_empty(zilog_t *zilog);
    81 
    80 
    82 static int
    81 static int
    83 zil_dva_compare(const void *x1, const void *x2)
    82 zil_bp_compare(const void *x1, const void *x2)
    84 {
    83 {
    85 	const dva_t *dva1 = x1;
    84 	const dva_t *dva1 = &((zil_bp_node_t *)x1)->zn_dva;
    86 	const dva_t *dva2 = x2;
    85 	const dva_t *dva2 = &((zil_bp_node_t *)x2)->zn_dva;
    87 
    86 
    88 	if (DVA_GET_VDEV(dva1) < DVA_GET_VDEV(dva2))
    87 	if (DVA_GET_VDEV(dva1) < DVA_GET_VDEV(dva2))
    89 		return (-1);
    88 		return (-1);
    90 	if (DVA_GET_VDEV(dva1) > DVA_GET_VDEV(dva2))
    89 	if (DVA_GET_VDEV(dva1) > DVA_GET_VDEV(dva2))
    91 		return (1);
    90 		return (1);
    97 
    96 
    98 	return (0);
    97 	return (0);
    99 }
    98 }
   100 
    99 
   101 static void
   100 static void
   102 zil_dva_tree_init(avl_tree_t *t)
   101 zil_bp_tree_init(zilog_t *zilog)
   103 {
   102 {
   104 	avl_create(t, zil_dva_compare, sizeof (zil_dva_node_t),
   103 	avl_create(&zilog->zl_bp_tree, zil_bp_compare,
   105 	    offsetof(zil_dva_node_t, zn_node));
   104 	    sizeof (zil_bp_node_t), offsetof(zil_bp_node_t, zn_node));
   106 }
   105 }
   107 
   106 
   108 static void
   107 static void
   109 zil_dva_tree_fini(avl_tree_t *t)
   108 zil_bp_tree_fini(zilog_t *zilog)
   110 {
   109 {
   111 	zil_dva_node_t *zn;
   110 	avl_tree_t *t = &zilog->zl_bp_tree;
       
   111 	zil_bp_node_t *zn;
   112 	void *cookie = NULL;
   112 	void *cookie = NULL;
   113 
   113 
   114 	while ((zn = avl_destroy_nodes(t, &cookie)) != NULL)
   114 	while ((zn = avl_destroy_nodes(t, &cookie)) != NULL)
   115 		kmem_free(zn, sizeof (zil_dva_node_t));
   115 		kmem_free(zn, sizeof (zil_bp_node_t));
   116 
   116 
   117 	avl_destroy(t);
   117 	avl_destroy(t);
   118 }
   118 }
   119 
   119 
   120 static int
   120 int
   121 zil_dva_tree_add(avl_tree_t *t, dva_t *dva)
   121 zil_bp_tree_add(zilog_t *zilog, const blkptr_t *bp)
   122 {
   122 {
   123 	zil_dva_node_t *zn;
   123 	avl_tree_t *t = &zilog->zl_bp_tree;
       
   124 	const dva_t *dva = BP_IDENTITY(bp);
       
   125 	zil_bp_node_t *zn;
   124 	avl_index_t where;
   126 	avl_index_t where;
   125 
   127 
   126 	if (avl_find(t, dva, &where) != NULL)
   128 	if (avl_find(t, dva, &where) != NULL)
   127 		return (EEXIST);
   129 		return (EEXIST);
   128 
   130 
   129 	zn = kmem_alloc(sizeof (zil_dva_node_t), KM_SLEEP);
   131 	zn = kmem_alloc(sizeof (zil_bp_node_t), KM_SLEEP);
   130 	zn->zn_dva = *dva;
   132 	zn->zn_dva = *dva;
   131 	avl_insert(t, zn, where);
   133 	avl_insert(t, zn, where);
   132 
   134 
   133 	return (0);
   135 	return (0);
   134 }
   136 }
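
This changeset renames the per-zilog DVA tree (zl_dva_tree) into a block-pointer tree (zl_bp_tree) keyed by BP_IDENTITY(bp), the block's first DVA, so that the zil_parse() callbacks claim or free each block at most once. The sketch below shows the same insert-once, EEXIST-on-repeat pattern in userland, using POSIX tsearch(3) in place of the kernel AVL routines; fake_dva_t and claim_once() are made-up names, not part of the source.

#include <errno.h>
#include <search.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct fake_dva {
    uint64_t vdev;      /* stand-in for DVA_GET_VDEV() */
    uint64_t offset;    /* stand-in for DVA_GET_OFFSET() */
} fake_dva_t;

static int
dva_compare(const void *x1, const void *x2)
{
    const fake_dva_t *d1 = x1;
    const fake_dva_t *d2 = x2;

    if (d1->vdev != d2->vdev)
        return (d1->vdev < d2->vdev ? -1 : 1);
    if (d1->offset != d2->offset)
        return (d1->offset < d2->offset ? -1 : 1);
    return (0);
}

/* Insert a DVA into the tree; report EEXIST if it was already there. */
static int
claim_once(void **root, const fake_dva_t *dva)
{
    fake_dva_t *zn = malloc(sizeof (*zn));
    fake_dva_t **found;

    if (zn == NULL)
        return (ENOMEM);
    *zn = *dva;
    found = (fake_dva_t **)tsearch(zn, root, dva_compare);
    if (found == NULL) {
        free(zn);
        return (ENOMEM);
    }
    if (*found != zn) {
        free(zn);       /* a duplicate: the original stays in the tree */
        return (EEXIST);
    }
    return (0);
}

int
main(void)
{
    void *root = NULL;
    fake_dva_t dva = { 1, 0x2000 };

    printf("first  claim: %d\n", claim_once(&root, &dva));  /* 0 */
    printf("second claim: %d\n", claim_once(&root, &dva));  /* EEXIST */
    return (0);
}
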
   149 	zc->zc_word[ZIL_ZC_OBJSET] = dmu_objset_id(zilog->zl_os);
   151 	zc->zc_word[ZIL_ZC_OBJSET] = dmu_objset_id(zilog->zl_os);
   150 	zc->zc_word[ZIL_ZC_SEQ] = 1ULL;
   152 	zc->zc_word[ZIL_ZC_SEQ] = 1ULL;
   151 }
   153 }
   152 
   154 
   153 /*
   155 /*
   154  * Read a log block, make sure it's valid, and byteswap it if necessary.
   156  * Read a log block and make sure it's valid.
   155  */
   157  */
   156 static int
   158 static int
   157 zil_read_log_block(zilog_t *zilog, const blkptr_t *bp, arc_buf_t **abufpp)
   159 zil_read_log_block(zilog_t *zilog, const blkptr_t *bp, blkptr_t *nbp, void *dst)
   158 {
   160 {
   159 	blkptr_t blk = *bp;
   161 	enum zio_flag zio_flags = ZIO_FLAG_CANFAIL;
       
   162 	uint32_t aflags = ARC_WAIT;
       
   163 	arc_buf_t *abuf = NULL;
   160 	zbookmark_t zb;
   164 	zbookmark_t zb;
   161 	uint32_t aflags = ARC_WAIT;
       
   162 	int error;
   165 	int error;
   163 
   166 
   164 	zb.zb_objset = bp->blk_cksum.zc_word[ZIL_ZC_OBJSET];
   167 	if (zilog->zl_header->zh_claim_txg == 0)
   165 	zb.zb_object = 0;
   168 		zio_flags |= ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB;
   166 	zb.zb_level = -1;
   169 
   167 	zb.zb_blkid = bp->blk_cksum.zc_word[ZIL_ZC_SEQ];
   170 	if (!(zilog->zl_header->zh_flags & ZIL_CLAIM_LR_SEQ_VALID))
   168 
   171 		zio_flags |= ZIO_FLAG_SPECULATIVE;
   169 	*abufpp = NULL;
   172 
   170 
   173 	SET_BOOKMARK(&zb, bp->blk_cksum.zc_word[ZIL_ZC_OBJSET],
   171 	/*
   174 	    ZB_ZIL_OBJECT, ZB_ZIL_LEVEL, bp->blk_cksum.zc_word[ZIL_ZC_SEQ]);
   172 	 * We shouldn't be doing any scrubbing while we're doing log
   175 
   173 	 * replay, it's OK to not lock.
   176 	error = arc_read_nolock(NULL, zilog->zl_spa, bp, arc_getbuf_func, &abuf,
   174 	 */
   177 	    ZIO_PRIORITY_SYNC_READ, zio_flags, &aflags, &zb);
   175 	error = arc_read_nolock(NULL, zilog->zl_spa, &blk,
       
   176 	    arc_getbuf_func, abufpp, ZIO_PRIORITY_SYNC_READ, ZIO_FLAG_CANFAIL |
       
   177 	    ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB, &aflags, &zb);
       
   178 
   178 
   179 	if (error == 0) {
   179 	if (error == 0) {
   180 		char *data = (*abufpp)->b_data;
   180 		char *data = abuf->b_data;
   181 		uint64_t blksz = BP_GET_LSIZE(bp);
   181 		uint64_t size = BP_GET_LSIZE(bp);
   182 		zil_trailer_t *ztp = (zil_trailer_t *)(data + blksz) - 1;
   182 		zil_trailer_t *ztp = (zil_trailer_t *)(data + size) - 1;
   183 		zio_cksum_t cksum = bp->blk_cksum;
   183 		zio_cksum_t cksum = bp->blk_cksum;
       
   184 
       
   185 		bcopy(data, dst, size);
       
   186 		*nbp = ztp->zit_next_blk;
   184 
   187 
   185 		/*
   188 		/*
   186 		 * Validate the checksummed log block.
   189 		 * Validate the checksummed log block.
   187 		 *
   190 		 *
   188 		 * Sequence numbers should be... sequential.  The checksum
   191 		 * Sequence numbers should be... sequential.  The checksum
   192 		 */
   195 		 */
   193 		cksum.zc_word[ZIL_ZC_SEQ]++;
   196 		cksum.zc_word[ZIL_ZC_SEQ]++;
   194 
   197 
   195 		if (bcmp(&cksum, &ztp->zit_next_blk.blk_cksum,
   198 		if (bcmp(&cksum, &ztp->zit_next_blk.blk_cksum,
   196 		    sizeof (cksum)) || BP_IS_HOLE(&ztp->zit_next_blk) ||
   199 		    sizeof (cksum)) || BP_IS_HOLE(&ztp->zit_next_blk) ||
   197 		    (ztp->zit_nused > (blksz - sizeof (zil_trailer_t)))) {
   200 		    (ztp->zit_nused > (size - sizeof (zil_trailer_t))))
   198 			error = ECKSUM;
   201 			error = ECKSUM;
   199 		}
   202 
   200 
   203 		VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
   201 		if (error) {
   204 	}
   202 			VERIFY(arc_buf_remove_ref(*abufpp, abufpp) == 1);
       
   203 			*abufpp = NULL;
       
   204 		}
       
   205 	}
       
   206 
       
   207 	dprintf("error %d on %llu:%llu\n", error, zb.zb_objset, zb.zb_blkid);
       
   208 
   205 
   209 	return (error);
   206 	return (error);
   210 }
   207 }
   211 
   208 
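
The validity test above relies on the ZIL's self-describing chain: each log block's trailer (zil_trailer_t) carries the blkptr of the next block, and the checksum words of that blkptr encode (objset, sequence). A block is accepted only when the trailer's next-block checksum equals this block's own checksum with the ZIL_ZC_SEQ word bumped by one; a mismatch, a hole, or an oversized zit_nused marks the end of the chain. Here is a self-contained sketch of that rule with toy types; toy_cksum_t, toy_trailer_t and chain_check() are hypothetical stand-ins, not the on-disk format.

#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#ifndef ECKSUM
#define ECKSUM  EBADMSG     /* stand-in where ECKSUM is not defined */
#endif

#define TOY_ZC_SEQ  1       /* index of the sequence word */

typedef struct toy_cksum {
    uint64_t zc_word[2];    /* (objset, seq) */
} toy_cksum_t;

typedef struct toy_trailer {
    toy_cksum_t next_cksum; /* identity of the next log block */
    uint64_t    nused;      /* bytes of records used in this block */
} toy_trailer_t;

/*
 * Accept a block only if its trailer names a successor whose sequence
 * is exactly one greater than our own, and the used-bytes count fits.
 */
static int
chain_check(const toy_cksum_t *self, const toy_trailer_t *ztp, uint64_t blksz)
{
    toy_cksum_t expect = *self;

    expect.zc_word[TOY_ZC_SEQ]++;
    if (memcmp(&expect, &ztp->next_cksum, sizeof (expect)) != 0)
        return (ECKSUM);    /* end of chain */
    if (ztp->nused > blksz - sizeof (toy_trailer_t))
        return (ECKSUM);    /* corrupt trailer */
    return (0);
}

int
main(void)
{
    toy_cksum_t self = { { 42, 7 } };               /* objset 42, seq 7 */
    toy_trailer_t good = { { { 42, 8 } }, 512 };
    toy_trailer_t stale = { { { 42, 9 } }, 512 };   /* skipped a sequence */

    printf("good:  %d\n", chain_check(&self, &good, 4096));     /* 0 */
    printf("stale: %d\n", chain_check(&self, &stale, 4096));    /* ECKSUM */
    return (0);
}
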
   212 /*
   209 /*
       
   210  * Read a TX_WRITE log data block.
       
   211  */
       
   212 static int
       
   213 zil_read_log_data(zilog_t *zilog, const lr_write_t *lr, void *wbuf)
       
   214 {
       
   215 	enum zio_flag zio_flags = ZIO_FLAG_CANFAIL;
       
   216 	const blkptr_t *bp = &lr->lr_blkptr;
       
   217 	uint32_t aflags = ARC_WAIT;
       
   218 	arc_buf_t *abuf = NULL;
       
   219 	zbookmark_t zb;
       
   220 	int error;
       
   221 
       
   222 	if (BP_IS_HOLE(bp)) {
       
   223 		if (wbuf != NULL)
       
   224 			bzero(wbuf, MAX(BP_GET_LSIZE(bp), lr->lr_length));
       
   225 		return (0);
       
   226 	}
       
   227 
       
   228 	if (zilog->zl_header->zh_claim_txg == 0)
       
   229 		zio_flags |= ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB;
       
   230 
       
   231 	SET_BOOKMARK(&zb, dmu_objset_id(zilog->zl_os), lr->lr_foid,
       
   232 	    ZB_ZIL_LEVEL, lr->lr_offset / BP_GET_LSIZE(bp));
       
   233 
       
   234 	error = arc_read_nolock(NULL, zilog->zl_spa, bp, arc_getbuf_func, &abuf,
       
   235 	    ZIO_PRIORITY_SYNC_READ, zio_flags, &aflags, &zb);
       
   236 
       
   237 	if (error == 0) {
       
   238 		if (wbuf != NULL)
       
   239 			bcopy(abuf->b_data, wbuf, arc_buf_size(abuf));
       
   240 		(void) arc_buf_remove_ref(abuf, &abuf);
       
   241 	}
       
   242 
       
   243 	return (error);
       
   244 }
       
   245 
       
   246 /*
   213  * Parse the intent log, and call parse_func for each valid record within.
   247  * Parse the intent log, and call parse_func for each valid record within.
   214  * Return the highest sequence number.
   248  */
   215  */
   249 int
   216 uint64_t
       
   217 zil_parse(zilog_t *zilog, zil_parse_blk_func_t *parse_blk_func,
   250 zil_parse(zilog_t *zilog, zil_parse_blk_func_t *parse_blk_func,
   218     zil_parse_lr_func_t *parse_lr_func, void *arg, uint64_t txg)
   251     zil_parse_lr_func_t *parse_lr_func, void *arg, uint64_t txg)
   219 {
   252 {
   220 	const zil_header_t *zh = zilog->zl_header;
   253 	const zil_header_t *zh = zilog->zl_header;
   221 	uint64_t claim_seq = zh->zh_claim_seq;
   254 	boolean_t claimed = !!zh->zh_claim_txg;
   222 	uint64_t seq = 0;
   255 	uint64_t claim_blk_seq = claimed ? zh->zh_claim_blk_seq : UINT64_MAX;
   223 	uint64_t max_seq = 0;
   256 	uint64_t claim_lr_seq = claimed ? zh->zh_claim_lr_seq : UINT64_MAX;
   224 	blkptr_t blk = zh->zh_log;
   257 	uint64_t max_blk_seq = 0;
   225 	arc_buf_t *abuf;
   258 	uint64_t max_lr_seq = 0;
       
   259 	uint64_t blk_count = 0;
       
   260 	uint64_t lr_count = 0;
       
   261 	blkptr_t blk, next_blk;
   226 	char *lrbuf, *lrp;
   262 	char *lrbuf, *lrp;
   227 	zil_trailer_t *ztp;
   263 	int error = 0;
   228 	int reclen, error;
   264 
   229 
   265 	/*
   230 	if (BP_IS_HOLE(&blk))
   266 	 * Old logs didn't record the maximum zh_claim_lr_seq.
   231 		return (max_seq);
   267 	 */
       
   268 	if (!(zh->zh_flags & ZIL_CLAIM_LR_SEQ_VALID))
       
   269 		claim_lr_seq = UINT64_MAX;
   232 
   270 
   233 	/*
   271 	/*
   234 	 * Starting at the block pointed to by zh_log we read the log chain.
   272 	 * Starting at the block pointed to by zh_log we read the log chain.
   235 	 * For each block in the chain we strongly check that block to
   273 	 * For each block in the chain we strongly check that block to
   236 	 * ensure its validity.  We stop when an invalid block is found.
   274 	 * ensure its validity.  We stop when an invalid block is found.
   237 	 * For each block pointer in the chain we call parse_blk_func().
   275 	 * For each block pointer in the chain we call parse_blk_func().
   238 	 * For each record in each valid block we call parse_lr_func().
   276 	 * For each record in each valid block we call parse_lr_func().
   239 	 * If the log has been claimed, stop if we encounter a sequence
   277 	 * If the log has been claimed, stop if we encounter a sequence
   240 	 * number greater than the highest claimed sequence number.
   278 	 * number greater than the highest claimed sequence number.
   241 	 */
   279 	 */
   242 	zil_dva_tree_init(&zilog->zl_dva_tree);
   280 	lrbuf = zio_buf_alloc(SPA_MAXBLOCKSIZE);
   243 	for (;;) {
   281 	zil_bp_tree_init(zilog);
   244 		seq = blk.blk_cksum.zc_word[ZIL_ZC_SEQ];
   282 
   245 
   283 	for (blk = zh->zh_log; !BP_IS_HOLE(&blk); blk = next_blk) {
   246 		if (claim_seq != 0 && seq > claim_seq)
   284 		zil_trailer_t *ztp =
       
   285 		    (zil_trailer_t *)(lrbuf + BP_GET_LSIZE(&blk)) - 1;
       
   286 		uint64_t blk_seq = blk.blk_cksum.zc_word[ZIL_ZC_SEQ];
       
   287 		int reclen;
       
   288 
       
   289 		if (blk_seq > claim_blk_seq)
   247 			break;
   290 			break;
   248 
   291 		if ((error = parse_blk_func(zilog, &blk, arg, txg)) != 0)
   249 		ASSERT(max_seq < seq);
   292 			break;
   250 		max_seq = seq;
   293 		ASSERT(max_blk_seq < blk_seq);
   251 
   294 		max_blk_seq = blk_seq;
   252 		error = zil_read_log_block(zilog, &blk, &abuf);
   295 		blk_count++;
   253 
   296 
   254 		if (parse_blk_func != NULL)
   297 		if (max_lr_seq == claim_lr_seq && max_blk_seq == claim_blk_seq)
   255 			parse_blk_func(zilog, &blk, arg, txg);
   298 			break;
   256 
   299 
       
   300 		error = zil_read_log_block(zilog, &blk, &next_blk, lrbuf);
   257 		if (error)
   301 		if (error)
   258 			break;
   302 			break;
   259 
       
   260 		lrbuf = abuf->b_data;
       
   261 		ztp = (zil_trailer_t *)(lrbuf + BP_GET_LSIZE(&blk)) - 1;
       
   262 		blk = ztp->zit_next_blk;
       
   263 
       
   264 		if (parse_lr_func == NULL) {
       
   265 			VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
       
   266 			continue;
       
   267 		}
       
   268 
   303 
   269 		for (lrp = lrbuf; lrp < lrbuf + ztp->zit_nused; lrp += reclen) {
   304 		for (lrp = lrbuf; lrp < lrbuf + ztp->zit_nused; lrp += reclen) {
   270 			lr_t *lr = (lr_t *)lrp;
   305 			lr_t *lr = (lr_t *)lrp;
   271 			reclen = lr->lrc_reclen;
   306 			reclen = lr->lrc_reclen;
   272 			ASSERT3U(reclen, >=, sizeof (lr_t));
   307 			ASSERT3U(reclen, >=, sizeof (lr_t));
   273 			parse_lr_func(zilog, lr, arg, txg);
   308 			if (lr->lrc_seq > claim_lr_seq)
       
   309 				goto done;
       
   310 			if ((error = parse_lr_func(zilog, lr, arg, txg)) != 0)
       
   311 				goto done;
       
   312 			ASSERT(max_lr_seq < lr->lrc_seq);
       
   313 			max_lr_seq = lr->lrc_seq;
       
   314 			lr_count++;
   274 		}
   315 		}
   275 		VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
   316 	}
   276 	}
   317 done:
   277 	zil_dva_tree_fini(&zilog->zl_dva_tree);
   318 	zilog->zl_parse_error = error;
   278 
   319 	zilog->zl_parse_blk_seq = max_blk_seq;
   279 	return (max_seq);
   320 	zilog->zl_parse_lr_seq = max_lr_seq;
       
   321 	zilog->zl_parse_blk_count = blk_count;
       
   322 	zilog->zl_parse_lr_count = lr_count;
       
   323 
       
   324 	ASSERT(!claimed || !(zh->zh_flags & ZIL_CLAIM_LR_SEQ_VALID) ||
       
   325 	    (max_blk_seq == claim_blk_seq && max_lr_seq == claim_lr_seq));
       
   326 
       
   327 	zil_bp_tree_fini(zilog);
       
   328 	zio_buf_free(lrbuf, SPA_MAXBLOCKSIZE);
       
   329 
       
   330 	return (error);
       
   331 }
       
   332 
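
An interface change worth noting here: zil_parse() now returns an error code instead of the highest sequence number (the maxima it observed are stored in zl_parse_blk_seq and zl_parse_lr_seq), and both callbacks return int so that either one can stop the walk early. The generic sketch below illustrates only that callback contract; walk_chain(), on_blk() and on_rec() are invented names, not the ZFS API.

#include <errno.h>
#include <stdint.h>
#include <stdio.h>

typedef int blk_cb_t(uint64_t blk_seq, void *arg);
typedef int rec_cb_t(uint64_t rec_seq, void *arg);

/*
 * Visit each block and then the records in it; stop, and return the
 * callback's error, as soon as either callback reports one.
 */
static int
walk_chain(const uint64_t *blk_seqs, int nblks,
    blk_cb_t *blk_func, rec_cb_t *rec_func, void *arg)
{
    int error = 0;

    for (int b = 0; b < nblks && error == 0; b++) {
        if ((error = blk_func(blk_seqs[b], arg)) != 0)
            break;
        /* pretend each block carries two records */
        for (uint64_t r = 0; r < 2 && error == 0; r++)
            error = rec_func(blk_seqs[b] * 10 + r, arg);
    }
    return (error);
}

static int
on_blk(uint64_t blk_seq, void *arg)
{
    (*(uint64_t *)arg)++;       /* count the blocks visited */
    return (0);
}

static int
on_rec(uint64_t rec_seq, void *arg)
{
    return (rec_seq >= 30 ? EIO : 0);   /* pretend record 30 is unreadable */
}

int
main(void)
{
    uint64_t blk_seqs[] = { 1, 2, 3, 4 };
    uint64_t nvisited = 0;
    int error = walk_chain(blk_seqs, 4, on_blk, on_rec, &nvisited);

    printf("stopped with error %d after %llu blocks\n",
        error, (unsigned long long)nvisited);   /* EIO after 3 blocks */
    return (0);
}
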
       
   333 static int
       
   334 zil_claim_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t first_txg)
       
   335 {
       
   336 	/*
       
   337 	 * Claim log block if not already committed and not already claimed.
       
   338 	 * If tx == NULL, just verify that the block is claimable.
       
   339 	 */
       
   340 	if (bp->blk_birth < first_txg || zil_bp_tree_add(zilog, bp) != 0)
       
   341 		return (0);
       
   342 
       
   343 	return (zio_wait(zio_claim(NULL, zilog->zl_spa,
       
   344 	    tx == NULL ? 0 : first_txg, bp, spa_claim_notify, NULL,
       
   345 	    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB)));
       
   346 }
       
   347 
       
   348 static int
       
   349 zil_claim_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t first_txg)
       
   350 {
       
   351 	lr_write_t *lr = (lr_write_t *)lrc;
       
   352 	int error;
       
   353 
       
   354 	if (lrc->lrc_txtype != TX_WRITE)
       
   355 		return (0);
       
   356 
       
   357 	/*
       
   358 	 * If the block is not readable, don't claim it.  This can happen
       
   359 	 * in normal operation when a log block is written to disk before
       
   360 	 * some of the dmu_sync() blocks it points to.  In this case, the
       
   361 	 * transaction cannot have been committed to anyone (we would have
       
   362 	 * waited for all writes to be stable first), so it is semantically
       
   363 	 * correct to declare this the end of the log.
       
   364 	 */
       
   365 	if (lr->lr_blkptr.blk_birth >= first_txg &&
       
   366 	    (error = zil_read_log_data(zilog, lr, NULL)) != 0)
       
   367 		return (error);
       
   368 
       
   369 	return (zil_claim_log_block(zilog, &lr->lr_blkptr, tx, first_txg));
   280 }
   370 }
   281 
   371 
   282 /* ARGSUSED */
   372 /* ARGSUSED */
   283 static void
   373 static int
   284 zil_claim_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t first_txg)
       
   285 {
       
   286 	spa_t *spa = zilog->zl_spa;
       
   287 	int err;
       
   288 
       
   289 	/*
       
   290 	 * Claim log block if not already committed and not already claimed.
       
   291 	 */
       
   292 	if (bp->blk_birth >= first_txg &&
       
   293 	    zil_dva_tree_add(&zilog->zl_dva_tree, BP_IDENTITY(bp)) == 0) {
       
   294 		err = zio_wait(zio_claim(NULL, spa, first_txg, bp, NULL, NULL,
       
   295 		    ZIO_FLAG_MUSTSUCCEED));
       
   296 		ASSERT(err == 0);
       
   297 	}
       
   298 }
       
   299 
       
   300 static void
       
   301 zil_claim_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t first_txg)
       
   302 {
       
   303 	if (lrc->lrc_txtype == TX_WRITE) {
       
   304 		lr_write_t *lr = (lr_write_t *)lrc;
       
   305 		zil_claim_log_block(zilog, &lr->lr_blkptr, tx, first_txg);
       
   306 	}
       
   307 }
       
   308 
       
   309 /* ARGSUSED */
       
   310 static void
       
   311 zil_free_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t claim_txg)
   374 zil_free_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t claim_txg)
   312 {
   375 {
   313 	zio_free_blk(zilog->zl_spa, bp, dmu_tx_get_txg(tx));
   376 	zio_free_zil(zilog->zl_spa, dmu_tx_get_txg(tx), bp);
   314 }
   377 
   315 
   378 	return (0);
   316 static void
   379 }
       
   380 
       
   381 static int
   317 zil_free_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t claim_txg)
   382 zil_free_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t claim_txg)
   318 {
   383 {
       
   384 	lr_write_t *lr = (lr_write_t *)lrc;
       
   385 	blkptr_t *bp = &lr->lr_blkptr;
       
   386 
   319 	/*
   387 	/*
   320 	 * If we previously claimed it, we need to free it.
   388 	 * If we previously claimed it, we need to free it.
   321 	 */
   389 	 */
   322 	if (claim_txg != 0 && lrc->lrc_txtype == TX_WRITE) {
   390 	if (claim_txg != 0 && lrc->lrc_txtype == TX_WRITE &&
   323 		lr_write_t *lr = (lr_write_t *)lrc;
   391 	    bp->blk_birth >= claim_txg && zil_bp_tree_add(zilog, bp) == 0)
   324 		blkptr_t *bp = &lr->lr_blkptr;
   392 		zio_free(zilog->zl_spa, dmu_tx_get_txg(tx), bp);
   325 		if (bp->blk_birth >= claim_txg &&
   393 
   326 		    !zil_dva_tree_add(&zilog->zl_dva_tree, BP_IDENTITY(bp))) {
   394 	return (0);
   327 			(void) arc_free(NULL, zilog->zl_spa,
       
   328 			    dmu_tx_get_txg(tx), bp, NULL, NULL, ARC_WAIT);
       
   329 		}
       
   330 	}
       
   331 }
   395 }
   332 
   396 
   333 /*
   397 /*
   334  * Create an on-disk intent log.
   398  * Create an on-disk intent log.
   335  */
   399  */
   357 	 * If we don't already have an initial log block or we have one
   421 	 * If we don't already have an initial log block or we have one
   358 	 * but it's the wrong endianness then allocate one.
   422 	 * but it's the wrong endianness then allocate one.
   359 	 */
   423 	 */
   360 	if (BP_IS_HOLE(&blk) || BP_SHOULD_BYTESWAP(&blk)) {
   424 	if (BP_IS_HOLE(&blk) || BP_SHOULD_BYTESWAP(&blk)) {
   361 		tx = dmu_tx_create(zilog->zl_os);
   425 		tx = dmu_tx_create(zilog->zl_os);
   362 		(void) dmu_tx_assign(tx, TXG_WAIT);
   426 		VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
   363 		dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
   427 		dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
   364 		txg = dmu_tx_get_txg(tx);
   428 		txg = dmu_tx_get_txg(tx);
   365 
   429 
   366 		if (!BP_IS_HOLE(&blk)) {
   430 		if (!BP_IS_HOLE(&blk)) {
   367 			zio_free_blk(zilog->zl_spa, &blk, txg);
   431 			zio_free_zil(zilog->zl_spa, txg, &blk);
   368 			BP_ZERO(&blk);
   432 			BP_ZERO(&blk);
   369 		}
   433 		}
   370 
   434 
   371 		error = zio_alloc_blk(zilog->zl_spa, ZIL_MIN_BLKSZ, &blk,
   435 		error = zio_alloc_zil(zilog->zl_spa, txg, &blk, NULL,
   372 		    NULL, txg, zilog->zl_logbias == ZFS_LOGBIAS_LATENCY);
   436 		    ZIL_MIN_BLKSZ, zilog->zl_logbias == ZFS_LOGBIAS_LATENCY);
   373 
   437 
   374 		if (error == 0)
   438 		if (error == 0)
   375 			zil_init_log_chain(zilog, &blk);
   439 			zil_init_log_chain(zilog, &blk);
   376 	}
   440 	}
   377 
   441 
   385 		lwb->lwb_nused = 0;
   449 		lwb->lwb_nused = 0;
   386 		lwb->lwb_sz = BP_GET_LSIZE(&lwb->lwb_blk);
   450 		lwb->lwb_sz = BP_GET_LSIZE(&lwb->lwb_blk);
   387 		lwb->lwb_buf = zio_buf_alloc(lwb->lwb_sz);
   451 		lwb->lwb_buf = zio_buf_alloc(lwb->lwb_sz);
   388 		lwb->lwb_max_txg = txg;
   452 		lwb->lwb_max_txg = txg;
   389 		lwb->lwb_zio = NULL;
   453 		lwb->lwb_zio = NULL;
       
   454 		lwb->lwb_tx = NULL;
   390 
   455 
   391 		mutex_enter(&zilog->zl_lock);
   456 		mutex_enter(&zilog->zl_lock);
   392 		list_insert_tail(&zilog->zl_lwb_list, lwb);
   457 		list_insert_tail(&zilog->zl_lwb_list, lwb);
   393 		mutex_exit(&zilog->zl_lock);
   458 		mutex_exit(&zilog->zl_lock);
   394 	}
   459 	}
   426 	/*
   491 	/*
   427 	 * Wait for any previous destroy to complete.
   492 	 * Wait for any previous destroy to complete.
   428 	 */
   493 	 */
   429 	txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
   494 	txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
   430 
   495 
       
   496 	zilog->zl_old_header = *zh;		/* debugging aid */
       
   497 
   431 	if (BP_IS_HOLE(&zh->zh_log))
   498 	if (BP_IS_HOLE(&zh->zh_log))
   432 		return;
   499 		return;
   433 
   500 
   434 	tx = dmu_tx_create(zilog->zl_os);
   501 	tx = dmu_tx_create(zilog->zl_os);
   435 	(void) dmu_tx_assign(tx, TXG_WAIT);
   502 	VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
   436 	dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
   503 	dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
   437 	txg = dmu_tx_get_txg(tx);
   504 	txg = dmu_tx_get_txg(tx);
   438 
   505 
   439 	mutex_enter(&zilog->zl_lock);
   506 	mutex_enter(&zilog->zl_lock);
   440 
   507 
   441 	ASSERT3U(zilog->zl_destroy_txg, <, txg);
   508 	ASSERT3U(zilog->zl_destroy_txg, <, txg);
   442 	zilog->zl_destroy_txg = txg;
   509 	zilog->zl_destroy_txg = txg;
       
   510 	zilog->zl_keep_first = keep_first;
   443 
   511 
   444 	if (!list_is_empty(&zilog->zl_lwb_list)) {
   512 	if (!list_is_empty(&zilog->zl_lwb_list)) {
   445 		ASSERT(zh->zh_claim_txg == 0);
   513 		ASSERT(zh->zh_claim_txg == 0);
   446 		zilog->zl_keep_first = B_FALSE;
   514 		ASSERT(!keep_first);
   447 		while ((lwb = list_head(&zilog->zl_lwb_list)) != NULL) {
   515 		while ((lwb = list_head(&zilog->zl_lwb_list)) != NULL) {
   448 			list_remove(&zilog->zl_lwb_list, lwb);
   516 			list_remove(&zilog->zl_lwb_list, lwb);
   449 			if (lwb->lwb_buf != NULL)
   517 			if (lwb->lwb_buf != NULL)
   450 				zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
   518 				zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
   451 			zio_free_blk(zilog->zl_spa, &lwb->lwb_blk, txg);
   519 			zio_free_zil(zilog->zl_spa, txg, &lwb->lwb_blk);
   452 			kmem_cache_free(zil_lwb_cache, lwb);
   520 			kmem_cache_free(zil_lwb_cache, lwb);
   453 		}
   521 		}
   454 	} else {
   522 	} else if (!keep_first) {
   455 		zilog->zl_keep_first = keep_first;
   523 		(void) zil_parse(zilog, zil_free_log_block,
   456 		if (zh->zh_flags & ZIL_REPLAY_NEEDED) {
   524 		    zil_free_log_record, tx, zh->zh_claim_txg);
   457 			ASSERT(!keep_first);
       
   458 			(void) zil_parse(zilog, zil_free_log_block,
       
   459 			    zil_free_log_record, tx, zh->zh_claim_txg);
       
   460 		} else {
       
   461 			/*
       
   462 			 * Would like to assert zil_empty() but that
       
   463 			 * would force us to read the log chain which
       
   464 			 * requires us to do I/O to the log. This is
       
   465 			 * overkill since we really just want to destroy
       
   466 			 * the chain anyway.
       
   467 			 */
       
   468 			if (!keep_first) {
       
   469 				blkptr_t bp = zh->zh_log;
       
   470 				zio_free_blk(zilog->zl_spa, &bp, txg);
       
   471 			}
       
   472 		}
       
   473 	}
   525 	}
   474 	mutex_exit(&zilog->zl_lock);
   526 	mutex_exit(&zilog->zl_lock);
   475 
   527 
   476 	dmu_tx_commit(tx);
   528 	dmu_tx_commit(tx);
   477 }
       
   478 
       
   479 /*
       
   480  * return true if the initial log block is not valid
       
   481  */
       
   482 static boolean_t
       
   483 zil_empty(zilog_t *zilog)
       
   484 {
       
   485 	const zil_header_t *zh = zilog->zl_header;
       
   486 	arc_buf_t *abuf = NULL;
       
   487 
       
   488 	if (BP_IS_HOLE(&zh->zh_log))
       
   489 		return (B_TRUE);
       
   490 
       
   491 	if (zil_read_log_block(zilog, &zh->zh_log, &abuf) != 0)
       
   492 		return (B_TRUE);
       
   493 
       
   494 	VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
       
   495 	return (B_FALSE);
       
   496 }
   529 }
   497 
   530 
   498 int
   531 int
   499 zil_claim(char *osname, void *txarg)
   532 zil_claim(char *osname, void *txarg)
   500 {
   533 {
   512 	}
   545 	}
   513 
   546 
   514 	zilog = dmu_objset_zil(os);
   547 	zilog = dmu_objset_zil(os);
   515 	zh = zil_header_in_syncing_context(zilog);
   548 	zh = zil_header_in_syncing_context(zilog);
   516 
   549 
   517 	if (zilog->zl_spa->spa_log_state == SPA_LOG_CLEAR) {
   550 	if (spa_get_log_state(zilog->zl_spa) == SPA_LOG_CLEAR) {
   518 		if (!BP_IS_HOLE(&zh->zh_log))
   551 		if (!BP_IS_HOLE(&zh->zh_log))
   519 			zio_free_blk(zilog->zl_spa, &zh->zh_log, first_txg);
   552 			zio_free_zil(zilog->zl_spa, first_txg, &zh->zh_log);
   520 		BP_ZERO(&zh->zh_log);
   553 		BP_ZERO(&zh->zh_log);
   521 		dsl_dataset_dirty(dmu_objset_ds(os), tx);
   554 		dsl_dataset_dirty(dmu_objset_ds(os), tx);
   522 		dmu_objset_rele(os, FTAG);
   555 		dmu_objset_rele(os, FTAG);
   523 		return (0);
   556 		return (0);
   524 	}
       
   525 
       
   526 	/*
       
   527 	 * Record here whether the zil has any records to replay.
       
   528 	 * If the header block pointer is null or the block points
       
   529 	 * to the stubby then we know there are no valid log records.
       
    530 	 * We use the header to store this state as the zilog gets
       
   531 	 * freed later in dmu_objset_close().
       
   532 	 * The flags (and the rest of the header fields) are cleared in
       
   533 	 * zil_sync() as a result of a zil_destroy(), after replaying the log.
       
   534 	 *
       
   535 	 * Note, the intent log can be empty but still need the
       
   536 	 * stubby to be claimed.
       
   537 	 */
       
   538 	if (!zil_empty(zilog)) {
       
   539 		zh->zh_flags |= ZIL_REPLAY_NEEDED;
       
   540 		dsl_dataset_dirty(dmu_objset_ds(os), tx);
       
   541 	}
   557 	}
   542 
   558 
   543 	/*
   559 	/*
   544 	 * Claim all log blocks if we haven't already done so, and remember
   560 	 * Claim all log blocks if we haven't already done so, and remember
   545 	 * the highest claimed sequence number.  This ensures that if we can
   561 	 * the highest claimed sequence number.  This ensures that if we can
   547 	 * but we can read the entire log later, we will not try to replay
   563 	 * but we can read the entire log later, we will not try to replay
   548 	 * or destroy beyond the last block we successfully claimed.
   564 	 * or destroy beyond the last block we successfully claimed.
   549 	 */
   565 	 */
   550 	ASSERT3U(zh->zh_claim_txg, <=, first_txg);
   566 	ASSERT3U(zh->zh_claim_txg, <=, first_txg);
   551 	if (zh->zh_claim_txg == 0 && !BP_IS_HOLE(&zh->zh_log)) {
   567 	if (zh->zh_claim_txg == 0 && !BP_IS_HOLE(&zh->zh_log)) {
       
   568 		(void) zil_parse(zilog, zil_claim_log_block,
       
   569 		    zil_claim_log_record, tx, first_txg);
   552 		zh->zh_claim_txg = first_txg;
   570 		zh->zh_claim_txg = first_txg;
   553 		zh->zh_claim_seq = zil_parse(zilog, zil_claim_log_block,
   571 		zh->zh_claim_blk_seq = zilog->zl_parse_blk_seq;
   554 		    zil_claim_log_record, tx, first_txg);
   572 		zh->zh_claim_lr_seq = zilog->zl_parse_lr_seq;
       
   573 		if (zilog->zl_parse_lr_count || zilog->zl_parse_blk_count > 1)
       
   574 			zh->zh_flags |= ZIL_REPLAY_NEEDED;
       
   575 		zh->zh_flags |= ZIL_CLAIM_LR_SEQ_VALID;
   555 		dsl_dataset_dirty(dmu_objset_ds(os), tx);
   576 		dsl_dataset_dirty(dmu_objset_ds(os), tx);
   556 	}
   577 	}
   557 
   578 
   558 	ASSERT3U(first_txg, ==, (spa_last_synced_txg(zilog->zl_spa) + 1));
   579 	ASSERT3U(first_txg, ==, (spa_last_synced_txg(zilog->zl_spa) + 1));
   559 	dmu_objset_rele(os, FTAG);
   580 	dmu_objset_rele(os, FTAG);
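
After the claim pass, ZIL_REPLAY_NEEDED is derived from what zil_parse() counted: any log record, or more than one log block, means there is something to replay, while a single block holding no records is just the initial stub (the "stubby" the older comments mention) and can be claimed without scheduling replay. A small worked restatement of that predicate; needs_replay() is a hypothetical helper, not in the source.

#include <stdio.h>

static int
needs_replay(unsigned long long blk_count, unsigned long long lr_count)
{
    return (lr_count != 0 || blk_count > 1);
}

int
main(void)
{
    printf("%d\n", needs_replay(1, 0));  /* 0: just the stubby block */
    printf("%d\n", needs_replay(1, 3));  /* 1: records to replay */
    printf("%d\n", needs_replay(2, 0));  /* 1: a chained block, even if empty */
    return (0);
}
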
   563 /*
   584 /*
   564  * Check the log by walking the log chain.
   585  * Check the log by walking the log chain.
   565  * Checksum errors are ok as they indicate the end of the chain.
   586  * Checksum errors are ok as they indicate the end of the chain.
   566  * Any other error (no device or read failure) returns an error.
   587  * Any other error (no device or read failure) returns an error.
   567  */
   588  */
   568 /* ARGSUSED */
       
   569 int
   589 int
   570 zil_check_log_chain(char *osname, void *txarg)
   590 zil_check_log_chain(char *osname, void *tx)
   571 {
   591 {
   572 	zilog_t *zilog;
   592 	zilog_t *zilog;
   573 	zil_header_t *zh;
       
   574 	blkptr_t blk;
       
   575 	arc_buf_t *abuf;
       
   576 	objset_t *os;
   593 	objset_t *os;
   577 	char *lrbuf;
       
   578 	zil_trailer_t *ztp;
       
   579 	int error;
   594 	int error;
       
   595 
       
   596 	ASSERT(tx == NULL);
   580 
   597 
   581 	error = dmu_objset_hold(osname, FTAG, &os);
   598 	error = dmu_objset_hold(osname, FTAG, &os);
   582 	if (error) {
   599 	if (error) {
   583 		cmn_err(CE_WARN, "can't open objset for %s", osname);
   600 		cmn_err(CE_WARN, "can't open objset for %s", osname);
   584 		return (0);
   601 		return (0);
   585 	}
   602 	}
   586 
   603 
   587 	zilog = dmu_objset_zil(os);
   604 	zilog = dmu_objset_zil(os);
   588 	zh = zil_header_in_syncing_context(zilog);
   605 
   589 	blk = zh->zh_log;
   606 	/*
   590 	if (BP_IS_HOLE(&blk)) {
   607 	 * Because tx == NULL, zil_claim_log_block() will not actually claim
   591 		dmu_objset_rele(os, FTAG);
   608 	 * any blocks, but just determine whether it is possible to do so.
   592 		return (0); /* no chain */
   609 	 * In addition to checking the log chain, zil_claim_log_block()
   593 	}
   610 	 * will invoke zio_claim() with a done func of spa_claim_notify(),
   594 
   611 	 * which will update spa_max_claim_txg.  See spa_load() for details.
   595 	for (;;) {
   612 	 */
   596 		error = zil_read_log_block(zilog, &blk, &abuf);
   613 	error = zil_parse(zilog, zil_claim_log_block, zil_claim_log_record, tx,
   597 		if (error)
   614 	    zilog->zl_header->zh_claim_txg ? -1ULL : spa_first_txg(os->os_spa));
   598 			break;
   615 
   599 		lrbuf = abuf->b_data;
       
   600 		ztp = (zil_trailer_t *)(lrbuf + BP_GET_LSIZE(&blk)) - 1;
       
   601 		blk = ztp->zit_next_blk;
       
   602 		VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
       
   603 	}
       
   604 	dmu_objset_rele(os, FTAG);
   616 	dmu_objset_rele(os, FTAG);
   605 	if (error == ECKSUM)
   617 
   606 		return (0); /* normal end of chain */
   618 	return ((error == ECKSUM || error == ENOENT) ? 0 : error);
   607 	return (error);
       
   608 }
   619 }
   609 
   620 
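
As the comment above explains, even this verify-only pass (tx == NULL) issues zio_claim() with spa_claim_notify() as its done callback, so the pool can track the largest block birth txg seen in any intent log; spa_load() consults that maximum afterwards. Below is a userland sketch of the "remember the maximum in a completion callback" idea; claim_notify() and max_claim_txg are stand-ins, and the kernel version performs the update under a lock.

#include <stdint.h>
#include <stdio.h>

static uint64_t max_claim_txg;

/* Completion-callback analogue: remember the largest birth txg claimed. */
static void
claim_notify(int io_error, uint64_t blk_birth)
{
    if (io_error != 0)
        return;                 /* failed claims don't count */
    if (blk_birth > max_claim_txg)
        max_claim_txg = blk_birth;
}

int
main(void)
{
    claim_notify(0, 100);
    claim_notify(0, 97);
    claim_notify(5, 200);       /* failed claim: ignored */
    claim_notify(0, 103);
    printf("max claim txg: %llu\n", (unsigned long long)max_claim_txg);
    return (0);
}
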
   610 static int
   621 static int
   611 zil_vdev_compare(const void *x1, const void *x2)
   622 zil_vdev_compare(const void *x1, const void *x2)
   612 {
   623 {
   620 
   631 
   621 	return (0);
   632 	return (0);
   622 }
   633 }
   623 
   634 
   624 void
   635 void
   625 zil_add_block(zilog_t *zilog, blkptr_t *bp)
   636 zil_add_block(zilog_t *zilog, const blkptr_t *bp)
   626 {
   637 {
   627 	avl_tree_t *t = &zilog->zl_vdev_tree;
   638 	avl_tree_t *t = &zilog->zl_vdev_tree;
   628 	avl_index_t where;
   639 	avl_index_t where;
   629 	zil_vdev_node_t *zv, zvsearch;
   640 	zil_vdev_node_t *zv, zvsearch;
   630 	int ndvas = BP_GET_NDVAS(bp);
   641 	int ndvas = BP_GET_NDVAS(bp);
   696 static void
   707 static void
   697 zil_lwb_write_done(zio_t *zio)
   708 zil_lwb_write_done(zio_t *zio)
   698 {
   709 {
   699 	lwb_t *lwb = zio->io_private;
   710 	lwb_t *lwb = zio->io_private;
   700 	zilog_t *zilog = lwb->lwb_zilog;
   711 	zilog_t *zilog = lwb->lwb_zilog;
       
   712 	dmu_tx_t *tx = lwb->lwb_tx;
   701 
   713 
   702 	ASSERT(BP_GET_COMPRESS(zio->io_bp) == ZIO_COMPRESS_OFF);
   714 	ASSERT(BP_GET_COMPRESS(zio->io_bp) == ZIO_COMPRESS_OFF);
   703 	ASSERT(BP_GET_CHECKSUM(zio->io_bp) == ZIO_CHECKSUM_ZILOG);
   715 	ASSERT(BP_GET_CHECKSUM(zio->io_bp) == ZIO_CHECKSUM_ZILOG);
   704 	ASSERT(BP_GET_TYPE(zio->io_bp) == DMU_OT_INTENT_LOG);
   716 	ASSERT(BP_GET_TYPE(zio->io_bp) == DMU_OT_INTENT_LOG);
   705 	ASSERT(BP_GET_LEVEL(zio->io_bp) == 0);
   717 	ASSERT(BP_GET_LEVEL(zio->io_bp) == 0);
   717 	 * the lwb if lwb_buf is null.
   729 	 * the lwb if lwb_buf is null.
   718 	 */
   730 	 */
   719 	zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
   731 	zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
   720 	mutex_enter(&zilog->zl_lock);
   732 	mutex_enter(&zilog->zl_lock);
   721 	lwb->lwb_buf = NULL;
   733 	lwb->lwb_buf = NULL;
   722 	if (zio->io_error)
   734 	lwb->lwb_tx = NULL;
   723 		zilog->zl_log_error = B_TRUE;
   735 	mutex_exit(&zilog->zl_lock);
   724 
   736 
   725 	/*
   737 	/*
   726 	 * Now that we've written this log block, we have a stable pointer
   738 	 * Now that we've written this log block, we have a stable pointer
   727 	 * to the next block in the chain, so it's OK to let the txg in
   739 	 * to the next block in the chain, so it's OK to let the txg in
   728 	 * which we allocated the next block sync. We still have the
   740 	 * which we allocated the next block sync.
   729 	 * zl_lock to ensure zil_sync doesn't kmem free the lwb.
   741 	 */
   730 	 */
   742 	dmu_tx_commit(tx);
   731 	txg_rele_to_sync(&lwb->lwb_txgh);
       
   732 	mutex_exit(&zilog->zl_lock);
       
   733 }
   743 }
   734 
   744 
   735 /*
   745 /*
   736  * Initialize the io for a log block.
   746  * Initialize the io for a log block.
   737  */
   747  */
   738 static void
   748 static void
   739 zil_lwb_write_init(zilog_t *zilog, lwb_t *lwb)
   749 zil_lwb_write_init(zilog_t *zilog, lwb_t *lwb)
   740 {
   750 {
   741 	zbookmark_t zb;
   751 	zbookmark_t zb;
   742 
   752 
   743 	zb.zb_objset = lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_OBJSET];
   753 	SET_BOOKMARK(&zb, lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_OBJSET],
   744 	zb.zb_object = 0;
   754 	    ZB_ZIL_OBJECT, ZB_ZIL_LEVEL,
   745 	zb.zb_level = -1;
   755 	    lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_SEQ]);
   746 	zb.zb_blkid = lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_SEQ];
       
   747 
   756 
   748 	if (zilog->zl_root_zio == NULL) {
   757 	if (zilog->zl_root_zio == NULL) {
   749 		zilog->zl_root_zio = zio_root(zilog->zl_spa, NULL, NULL,
   758 		zilog->zl_root_zio = zio_root(zilog->zl_spa, NULL, NULL,
   750 		    ZIO_FLAG_CANFAIL);
   759 		    ZIO_FLAG_CANFAIL);
   751 	}
   760 	}
   776 {
   785 {
   777 	lwb_t *nlwb;
   786 	lwb_t *nlwb;
   778 	zil_trailer_t *ztp = (zil_trailer_t *)(lwb->lwb_buf + lwb->lwb_sz) - 1;
   787 	zil_trailer_t *ztp = (zil_trailer_t *)(lwb->lwb_buf + lwb->lwb_sz) - 1;
   779 	spa_t *spa = zilog->zl_spa;
   788 	spa_t *spa = zilog->zl_spa;
   780 	blkptr_t *bp = &ztp->zit_next_blk;
   789 	blkptr_t *bp = &ztp->zit_next_blk;
       
   790 	dmu_tx_t *tx;
   781 	uint64_t txg;
   791 	uint64_t txg;
   782 	uint64_t zil_blksz;
   792 	uint64_t zil_blksz;
   783 	int error;
   793 	int error;
   784 
   794 
   785 	ASSERT(lwb->lwb_nused <= ZIL_BLK_DATA_SZ(lwb));
   795 	ASSERT(lwb->lwb_nused <= ZIL_BLK_DATA_SZ(lwb));
   787 	/*
   797 	/*
   788 	 * Allocate the next block and save its address in this block
   798 	 * Allocate the next block and save its address in this block
   789 	 * before writing it in order to establish the log chain.
   799 	 * before writing it in order to establish the log chain.
   790 	 * Note that if the allocation of nlwb synced before we wrote
   800 	 * Note that if the allocation of nlwb synced before we wrote
   791 	 * the block that points at it (lwb), we'd leak it if we crashed.
   801 	 * the block that points at it (lwb), we'd leak it if we crashed.
   792 	 * Therefore, we don't do txg_rele_to_sync() until zil_lwb_write_done().
   802 	 * Therefore, we don't do dmu_tx_commit() until zil_lwb_write_done().
   793 	 */
   803 	 * We dirty the dataset to ensure that zil_sync() will be called
   794 	txg = txg_hold_open(zilog->zl_dmu_pool, &lwb->lwb_txgh);
   804 	 * to clean up in the event of allocation failure or I/O failure.
   795 	txg_rele_to_quiesce(&lwb->lwb_txgh);
   805 	 */
       
   806 	tx = dmu_tx_create(zilog->zl_os);
       
   807 	VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
       
   808 	dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
       
   809 	txg = dmu_tx_get_txg(tx);
       
   810 
       
   811 	lwb->lwb_tx = tx;
   796 
   812 
   797 	/*
   813 	/*
   798 	 * Pick a ZIL blocksize. We request a size that is the
   814 	 * Pick a ZIL blocksize. We request a size that is the
   799 	 * maximum of the previous used size, the current used size and
   815 	 * maximum of the previous used size, the current used size and
   800 	 * the amount waiting in the queue.
   816 	 * the amount waiting in the queue.
   806 	if (zil_blksz > ZIL_MAX_BLKSZ)
   822 	if (zil_blksz > ZIL_MAX_BLKSZ)
   807 		zil_blksz = ZIL_MAX_BLKSZ;
   823 		zil_blksz = ZIL_MAX_BLKSZ;
   808 
   824 
   809 	BP_ZERO(bp);
   825 	BP_ZERO(bp);
   810 	/* pass the old blkptr in order to spread log blocks across devs */
   826 	/* pass the old blkptr in order to spread log blocks across devs */
   811 	error = zio_alloc_blk(spa, zil_blksz, bp, &lwb->lwb_blk, txg,
   827 	error = zio_alloc_zil(spa, txg, bp, &lwb->lwb_blk, zil_blksz,
   812 	    USE_SLOG(zilog));
   828 	    USE_SLOG(zilog));
   813 	if (error) {
   829 	if (error) {
   814 		dmu_tx_t *tx = dmu_tx_create_assigned(zilog->zl_dmu_pool, txg);
       
   815 
       
   816 		/*
   830 		/*
   817 		 * We dirty the dataset to ensure that zil_sync() will
   831 		 * Since we've just experienced an allocation failure,
   818 		 * be called to remove this lwb from our zl_lwb_list.
       
   819 		 * Failing to do so, may leave an lwb with a NULL lwb_buf
       
   820 		 * hanging around on the zl_lwb_list.
       
   821 		 */
       
   822 		dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
       
   823 		dmu_tx_commit(tx);
       
   824 
       
   825 		/*
       
   826 		 * Since we've just experienced an allocation failure so we
       
   827 		 * terminate the current lwb and send it on its way.
   832 		 * terminate the current lwb and send it on its way.
   828 		 */
   833 		 */
   829 		ztp->zit_pad = 0;
   834 		ztp->zit_pad = 0;
   830 		ztp->zit_nused = lwb->lwb_nused;
   835 		ztp->zit_nused = lwb->lwb_nused;
   831 		ztp->zit_bt.zbt_cksum = lwb->lwb_blk.blk_cksum;
   836 		ztp->zit_bt.zbt_cksum = lwb->lwb_blk.blk_cksum;
   846 
   851 
   847 	/*
   852 	/*
   848 	 * Allocate a new log write buffer (lwb).
   853 	 * Allocate a new log write buffer (lwb).
   849 	 */
   854 	 */
   850 	nlwb = kmem_cache_alloc(zil_lwb_cache, KM_SLEEP);
   855 	nlwb = kmem_cache_alloc(zil_lwb_cache, KM_SLEEP);
   851 
       
   852 	nlwb->lwb_zilog = zilog;
   856 	nlwb->lwb_zilog = zilog;
   853 	nlwb->lwb_blk = *bp;
   857 	nlwb->lwb_blk = *bp;
   854 	nlwb->lwb_nused = 0;
   858 	nlwb->lwb_nused = 0;
   855 	nlwb->lwb_sz = BP_GET_LSIZE(&nlwb->lwb_blk);
   859 	nlwb->lwb_sz = BP_GET_LSIZE(&nlwb->lwb_blk);
   856 	nlwb->lwb_buf = zio_buf_alloc(nlwb->lwb_sz);
   860 	nlwb->lwb_buf = zio_buf_alloc(nlwb->lwb_sz);
   857 	nlwb->lwb_max_txg = txg;
   861 	nlwb->lwb_max_txg = txg;
   858 	nlwb->lwb_zio = NULL;
   862 	nlwb->lwb_zio = NULL;
       
   863 	nlwb->lwb_tx = NULL;
   859 
   864 
   860 	/*
   865 	/*
   861 	 * Put new lwb at the end of the log chain
   866 	 * Put new lwb at the end of the log chain
   862 	 */
   867 	 */
   863 	mutex_enter(&zilog->zl_lock);
   868 	mutex_enter(&zilog->zl_lock);
   868 	zil_add_block(zilog, &lwb->lwb_blk);
   873 	zil_add_block(zilog, &lwb->lwb_blk);
   869 
   874 
   870 	/*
   875 	/*
   871 	 * kick off the write for the old log block
   876 	 * kick off the write for the old log block
   872 	 */
   877 	 */
   873 	dprintf_bp(&lwb->lwb_blk, "lwb %p txg %llu: ", lwb, txg);
       
   874 	ASSERT(lwb->lwb_zio);
   878 	ASSERT(lwb->lwb_zio);
   875 	zio_nowait(lwb->lwb_zio);
   879 	zio_nowait(lwb->lwb_zio);
   876 
   880 
   877 	return (nlwb);
   881 	return (nlwb);
   878 }
   882 }
   879 
   883 
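
zil_lwb_write_start() realizes the comment above: the successor block is allocated first, its blkptr is stored in the current block's trailer, and only then is the current block's write issued; the transaction saved in lwb_tx is not committed until zil_lwb_write_done(), so the txg that allocated the successor cannot sync before the block pointing at it is stable. A userland sketch of the same "name your successor before writing yourself" chaining follows; the chain.log format and rec_t are made up for illustration.

#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define REC_SZ  64

typedef struct rec {
    uint64_t next_off;          /* where the successor will live */
    uint64_t seq;
    char     payload[REC_SZ - 16];
} rec_t;

int
main(void)
{
    FILE *fp = fopen("chain.log", "w+b");
    uint64_t off = 0;
    int i;

    if (fp == NULL)
        return (1);
    for (i = 1; i <= 4; i++) {
        rec_t r;
        uint64_t next = off + REC_SZ;   /* "allocate" the successor first */

        memset(&r, 0, sizeof (r));
        r.next_off = next;              /* record it before writing this block */
        r.seq = i;
        (void) snprintf(r.payload, sizeof (r.payload), "record %d", i);
        (void) fseek(fp, (long)off, SEEK_SET);
        (void) fwrite(&r, sizeof (r), 1, fp);
        off = next;
    }
    (void) fclose(fp);
    return (0);
}

A reader can then follow next_off from record to record exactly the way zil_parse() follows zit_next_blk through the trailers.
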
   880 static lwb_t *
   884 static lwb_t *
   881 zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
   885 zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
   882 {
   886 {
   883 	lr_t *lrc = &itx->itx_lr; /* common log record */
   887 	lr_t *lrc = &itx->itx_lr; /* common log record */
   884 	lr_write_t *lr = (lr_write_t *)lrc;
   888 	lr_write_t *lrw = (lr_write_t *)lrc;
       
   889 	char *lr_buf;
   885 	uint64_t txg = lrc->lrc_txg;
   890 	uint64_t txg = lrc->lrc_txg;
   886 	uint64_t reclen = lrc->lrc_reclen;
   891 	uint64_t reclen = lrc->lrc_reclen;
   887 	uint64_t dlen;
   892 	uint64_t dlen = 0;
   888 
   893 
   889 	if (lwb == NULL)
   894 	if (lwb == NULL)
   890 		return (NULL);
   895 		return (NULL);
       
   896 
   891 	ASSERT(lwb->lwb_buf != NULL);
   897 	ASSERT(lwb->lwb_buf != NULL);
   892 
   898 
   893 	if (lrc->lrc_txtype == TX_WRITE && itx->itx_wr_state == WR_NEED_COPY)
   899 	if (lrc->lrc_txtype == TX_WRITE && itx->itx_wr_state == WR_NEED_COPY)
   894 		dlen = P2ROUNDUP_TYPED(
   900 		dlen = P2ROUNDUP_TYPED(
   895 		    lr->lr_length, sizeof (uint64_t), uint64_t);
   901 		    lrw->lr_length, sizeof (uint64_t), uint64_t);
   896 	else
       
   897 		dlen = 0;
       
   898 
   902 
   899 	zilog->zl_cur_used += (reclen + dlen);
   903 	zilog->zl_cur_used += (reclen + dlen);
   900 
   904 
   901 	zil_lwb_write_init(zilog, lwb);
   905 	zil_lwb_write_init(zilog, lwb);
   902 
   906 
   913 			txg_wait_synced(zilog->zl_dmu_pool, txg);
   917 			txg_wait_synced(zilog->zl_dmu_pool, txg);
   914 			return (lwb);
   918 			return (lwb);
   915 		}
   919 		}
   916 	}
   920 	}
   917 
   921 
   918 	/*
   922 	lr_buf = lwb->lwb_buf + lwb->lwb_nused;
   919 	 * Update the lrc_seq, to be log record sequence number. See zil.h
   923 	bcopy(lrc, lr_buf, reclen);
   920 	 * Then copy the record to the log buffer.
   924 	lrc = (lr_t *)lr_buf;
   921 	 */
   925 	lrw = (lr_write_t *)lrc;
   922 	lrc->lrc_seq = ++zilog->zl_lr_seq; /* we are single threaded */
       
   923 	bcopy(lrc, lwb->lwb_buf + lwb->lwb_nused, reclen);
       
   924 
   926 
   925 	/*
   927 	/*
   926 	 * If it's a write, fetch the data or get its blkptr as appropriate.
   928 	 * If it's a write, fetch the data or get its blkptr as appropriate.
   927 	 */
   929 	 */
   928 	if (lrc->lrc_txtype == TX_WRITE) {
   930 	if (lrc->lrc_txtype == TX_WRITE) {
   930 			txg_wait_synced(zilog->zl_dmu_pool, txg);
   932 			txg_wait_synced(zilog->zl_dmu_pool, txg);
   931 		if (itx->itx_wr_state != WR_COPIED) {
   933 		if (itx->itx_wr_state != WR_COPIED) {
   932 			char *dbuf;
   934 			char *dbuf;
   933 			int error;
   935 			int error;
   934 
   936 
   935 			/* alignment is guaranteed */
       
   936 			lr = (lr_write_t *)(lwb->lwb_buf + lwb->lwb_nused);
       
   937 			if (dlen) {
   937 			if (dlen) {
   938 				ASSERT(itx->itx_wr_state == WR_NEED_COPY);
   938 				ASSERT(itx->itx_wr_state == WR_NEED_COPY);
   939 				dbuf = lwb->lwb_buf + lwb->lwb_nused + reclen;
   939 				dbuf = lr_buf + reclen;
   940 				lr->lr_common.lrc_reclen += dlen;
   940 				lrw->lr_common.lrc_reclen += dlen;
   941 			} else {
   941 			} else {
   942 				ASSERT(itx->itx_wr_state == WR_INDIRECT);
   942 				ASSERT(itx->itx_wr_state == WR_INDIRECT);
   943 				dbuf = NULL;
   943 				dbuf = NULL;
   944 			}
   944 			}
   945 			error = zilog->zl_get_data(
   945 			error = zilog->zl_get_data(
   946 			    itx->itx_private, lr, dbuf, lwb->lwb_zio);
   946 			    itx->itx_private, lrw, dbuf, lwb->lwb_zio);
   947 			if (error == EIO) {
   947 			if (error == EIO) {
   948 				txg_wait_synced(zilog->zl_dmu_pool, txg);
   948 				txg_wait_synced(zilog->zl_dmu_pool, txg);
   949 				return (lwb);
   949 				return (lwb);
   950 			}
   950 			}
   951 			if (error) {
   951 			if (error) {
   954 				return (lwb);
   954 				return (lwb);
   955 			}
   955 			}
   956 		}
   956 		}
   957 	}
   957 	}
   958 
   958 
       
   959 	/*
       
   960 	 * We're actually making an entry, so update lrc_seq to be the
       
   961 	 * log record sequence number.  Note that this is generally not
       
   962 	 * equal to the itx sequence number because not all transactions
       
   963 	 * are synchronous, and sometimes spa_sync() gets there first.
       
   964 	 */
       
   965 	lrc->lrc_seq = ++zilog->zl_lr_seq; /* we are single threaded */
   959 	lwb->lwb_nused += reclen + dlen;
   966 	lwb->lwb_nused += reclen + dlen;
   960 	lwb->lwb_max_txg = MAX(lwb->lwb_max_txg, txg);
   967 	lwb->lwb_max_txg = MAX(lwb->lwb_max_txg, txg);
   961 	ASSERT3U(lwb->lwb_nused, <=, ZIL_BLK_DATA_SZ(lwb));
   968 	ASSERT3U(lwb->lwb_nused, <=, ZIL_BLK_DATA_SZ(lwb));
   962 	ASSERT3U(P2PHASE(lwb->lwb_nused, sizeof (uint64_t)), ==, 0);
   969 	ASSERT3U(P2PHASE(lwb->lwb_nused, sizeof (uint64_t)), ==, 0);
   963 
   970 
   978 	itx->itx_lr.lrc_seq = 0;	/* defensive */
   985 	itx->itx_lr.lrc_seq = 0;	/* defensive */
   979 
   986 
   980 	return (itx);
   987 	return (itx);
   981 }
   988 }
   982 
   989 
       
   990 void
       
   991 zil_itx_destroy(itx_t *itx)
       
   992 {
       
   993 	kmem_free(itx, offsetof(itx_t, itx_lr) + itx->itx_lr.lrc_reclen);
       
   994 }
       
   995 
   983 uint64_t
   996 uint64_t
   984 zil_itx_assign(zilog_t *zilog, itx_t *itx, dmu_tx_t *tx)
   997 zil_itx_assign(zilog_t *zilog, itx_t *itx, dmu_tx_t *tx)
   985 {
   998 {
   986 	uint64_t seq;
   999 	uint64_t seq;
   987 
  1000 
   988 	ASSERT(itx->itx_lr.lrc_seq == 0);
  1001 	ASSERT(itx->itx_lr.lrc_seq == 0);
       
  1002 	ASSERT(!zilog->zl_replay);
   989 
  1003 
   990 	mutex_enter(&zilog->zl_lock);
  1004 	mutex_enter(&zilog->zl_lock);
   991 	list_insert_tail(&zilog->zl_itx_list, itx);
  1005 	list_insert_tail(&zilog->zl_itx_list, itx);
   992 	zilog->zl_itx_list_sz += itx->itx_sod;
  1006 	zilog->zl_itx_list_sz += itx->itx_sod;
   993 	itx->itx_lr.lrc_txg = dmu_tx_get_txg(tx);
  1007 	itx->itx_lr.lrc_txg = dmu_tx_get_txg(tx);
  1032 	mutex_exit(&zilog->zl_lock);
  1046 	mutex_exit(&zilog->zl_lock);
  1033 
  1047 
  1034 	/* destroy sync'd log transactions */
  1048 	/* destroy sync'd log transactions */
  1035 	while ((itx = list_head(&clean_list)) != NULL) {
  1049 	while ((itx = list_head(&clean_list)) != NULL) {
  1036 		list_remove(&clean_list, itx);
  1050 		list_remove(&clean_list, itx);
  1037 		kmem_free(itx, offsetof(itx_t, itx_lr)
  1051 		zil_itx_destroy(itx);
  1038 		    + itx->itx_lr.lrc_reclen);
       
  1039 	}
  1052 	}
  1040 	list_destroy(&clean_list);
  1053 	list_destroy(&clean_list);
  1041 }
  1054 }
  1042 
  1055 
  1043 /*
  1056 /*
  1062 static void
  1075 static void
  1063 zil_commit_writer(zilog_t *zilog, uint64_t seq, uint64_t foid)
  1076 zil_commit_writer(zilog_t *zilog, uint64_t seq, uint64_t foid)
  1064 {
  1077 {
  1065 	uint64_t txg;
  1078 	uint64_t txg;
  1066 	uint64_t commit_seq = 0;
  1079 	uint64_t commit_seq = 0;
  1067 	itx_t *itx, *itx_next = (itx_t *)-1;
  1080 	itx_t *itx, *itx_next;
  1068 	lwb_t *lwb;
  1081 	lwb_t *lwb;
  1069 	spa_t *spa;
  1082 	spa_t *spa;
       
  1083 	int error = 0;
  1070 
  1084 
  1071 	zilog->zl_writer = B_TRUE;
  1085 	zilog->zl_writer = B_TRUE;
  1072 	ASSERT(zilog->zl_root_zio == NULL);
  1086 	ASSERT(zilog->zl_root_zio == NULL);
  1073 	spa = zilog->zl_spa;
  1087 	spa = zilog->zl_spa;
  1074 
  1088 
  1092 		}
  1106 		}
  1093 	}
  1107 	}
  1094 
  1108 
  1095 	/* Loop through in-memory log transactions filling log blocks. */
  1109 	/* Loop through in-memory log transactions filling log blocks. */
  1096 	DTRACE_PROBE1(zil__cw1, zilog_t *, zilog);
  1110 	DTRACE_PROBE1(zil__cw1, zilog_t *, zilog);
  1097 	for (;;) {
  1111 
       
  1112 	for (itx = list_head(&zilog->zl_itx_list); itx; itx = itx_next) {
  1098 		/*
  1113 		/*
  1099 		 * Find the next itx to push:
  1114 		 * Save the next pointer.  Even though we drop zl_lock below,
  1100 		 * Push all transactions related to specified foid and all
  1115 		 * all threads that can remove itx list entries (other writers
  1101 		 * other transactions except TX_WRITE, TX_TRUNCATE,
  1116 		 * and zil_itx_clean()) can't do so until they have zl_writer.
  1102 		 * TX_SETATTR and TX_ACL for all other files.
       
  1103 		 */
  1117 		 */
  1104 		if (itx_next != (itx_t *)-1)
  1118 		itx_next = list_next(&zilog->zl_itx_list, itx);
  1105 			itx = itx_next;
  1119 
  1106 		else
  1120 		/*
  1107 			itx = list_head(&zilog->zl_itx_list);
  1121 		 * Determine whether to push this itx.
  1108 		for (; itx != NULL; itx = list_next(&zilog->zl_itx_list, itx)) {
  1122 		 * Push all transactions related to specified foid and
  1109 			if (foid == 0) /* push all foids? */
  1123 		 * all other transactions except those that can be logged
  1110 				break;
  1124 		 * out of order (TX_WRITE, TX_TRUNCATE, TX_SETATTR, TX_ACL)
  1111 			if (itx->itx_sync) /* push all O_[D]SYNC */
  1125 		 * for all other files.
  1112 				break;
  1126 		 *
  1113 			switch (itx->itx_lr.lrc_txtype) {
  1127 		 * If foid == 0 (meaning "push all foids") or
  1114 			case TX_SETATTR:
  1128 		 * itx->itx_sync is set (meaning O_[D]SYNC), push regardless.
  1115 			case TX_WRITE:
  1129 		 */
  1116 			case TX_TRUNCATE:
  1130 		if (foid != 0 && !itx->itx_sync &&
  1117 			case TX_ACL:
  1131 		    TX_OOO(itx->itx_lr.lrc_txtype) &&
  1118 				/* lr_foid is same offset for these records */
  1132 		    ((lr_ooo_t *)&itx->itx_lr)->lr_foid != foid)
  1119 				if (((lr_write_t *)&itx->itx_lr)->lr_foid
  1133 			continue; /* skip this record */
  1120 				    != foid) {
       
  1121 					continue; /* skip this record */
       
  1122 				}
       
  1123 			}
       
  1124 			break;
       
  1125 		}
       
  1126 		if (itx == NULL)
       
  1127 			break;
       
  1128 
  1134 
  1129 		if ((itx->itx_lr.lrc_seq > seq) &&
  1135 		if ((itx->itx_lr.lrc_seq > seq) &&
  1130 		    ((lwb == NULL) || (lwb->lwb_nused == 0) ||
  1136 		    ((lwb == NULL) || (lwb->lwb_nused == 0) ||
  1131 		    (lwb->lwb_nused + itx->itx_sod > ZIL_BLK_DATA_SZ(lwb)))) {
  1137 		    (lwb->lwb_nused + itx->itx_sod > ZIL_BLK_DATA_SZ(lwb))))
  1132 			break;
  1138 			break;
  1133 		}
  1139 
  1134 
       
  1135 		/*
       
  1136 		 * Save the next pointer.  Even though we soon drop
       
  1137 		 * zl_lock all threads that may change the list
       
  1138 		 * (another writer or zil_itx_clean) can't do so until
       
  1139 		 * they have zl_writer.
       
  1140 		 */
       
  1141 		itx_next = list_next(&zilog->zl_itx_list, itx);
       
  1142 		list_remove(&zilog->zl_itx_list, itx);
  1140 		list_remove(&zilog->zl_itx_list, itx);
  1143 		zilog->zl_itx_list_sz -= itx->itx_sod;
  1141 		zilog->zl_itx_list_sz -= itx->itx_sod;
       
  1142 
  1144 		mutex_exit(&zilog->zl_lock);
  1143 		mutex_exit(&zilog->zl_lock);
       
  1144 
  1145 		txg = itx->itx_lr.lrc_txg;
  1145 		txg = itx->itx_lr.lrc_txg;
  1146 		ASSERT(txg);
  1146 		ASSERT(txg);
  1147 
  1147 
  1148 		if (txg > spa_last_synced_txg(spa) ||
  1148 		if (txg > spa_last_synced_txg(spa) ||
  1149 		    txg > spa_freeze_txg(spa))
  1149 		    txg > spa_freeze_txg(spa))
  1150 			lwb = zil_lwb_commit(zilog, itx, lwb);
  1150 			lwb = zil_lwb_commit(zilog, itx, lwb);
  1151 		kmem_free(itx, offsetof(itx_t, itx_lr)
  1151 
  1152 		    + itx->itx_lr.lrc_reclen);
  1152 		zil_itx_destroy(itx);
       
  1153 
  1153 		mutex_enter(&zilog->zl_lock);
  1154 		mutex_enter(&zilog->zl_lock);
  1154 	}
  1155 	}
  1155 	DTRACE_PROBE1(zil__cw2, zilog_t *, zilog);
  1156 	DTRACE_PROBE1(zil__cw2, zilog_t *, zilog);
  1156 	/* determine commit sequence number */
  1157 	/* determine commit sequence number */
  1157 	itx = list_head(&zilog->zl_itx_list);
  1158 	itx = list_head(&zilog->zl_itx_list);
  1158 	if (itx)
  1159 	if (itx)
  1159 		commit_seq = itx->itx_lr.lrc_seq;
  1160 		commit_seq = itx->itx_lr.lrc_seq - 1;
  1160 	else
  1161 	else
  1161 		commit_seq = zilog->zl_itx_seq;
  1162 		commit_seq = zilog->zl_itx_seq;
  1162 	mutex_exit(&zilog->zl_lock);
  1163 	mutex_exit(&zilog->zl_lock);
  1163 
  1164 
  1164 	/* write the last block out */
  1165 	/* write the last block out */
  1171 	/*
  1172 	/*
  1172 	 * Wait if necessary for the log blocks to be on stable storage.
  1173 	 * Wait if necessary for the log blocks to be on stable storage.
  1173 	 */
  1174 	 */
  1174 	if (zilog->zl_root_zio) {
  1175 	if (zilog->zl_root_zio) {
  1175 		DTRACE_PROBE1(zil__cw3, zilog_t *, zilog);
  1176 		DTRACE_PROBE1(zil__cw3, zilog_t *, zilog);
  1176 		(void) zio_wait(zilog->zl_root_zio);
  1177 		error = zio_wait(zilog->zl_root_zio);
  1177 		zilog->zl_root_zio = NULL;
  1178 		zilog->zl_root_zio = NULL;
  1178 		DTRACE_PROBE1(zil__cw4, zilog_t *, zilog);
  1179 		DTRACE_PROBE1(zil__cw4, zilog_t *, zilog);
  1179 		zil_flush_vdevs(zilog);
  1180 		zil_flush_vdevs(zilog);
  1180 	}
  1181 	}
  1181 
  1182 
  1182 	if (zilog->zl_log_error || lwb == NULL) {
  1183 	if (error || lwb == NULL)
  1183 		zilog->zl_log_error = 0;
       
  1184 		txg_wait_synced(zilog->zl_dmu_pool, 0);
  1184 		txg_wait_synced(zilog->zl_dmu_pool, 0);
  1185 	}
       
  1186 
  1185 
  1187 	mutex_enter(&zilog->zl_lock);
  1186 	mutex_enter(&zilog->zl_lock);
  1188 	zilog->zl_writer = B_FALSE;
  1187 	zilog->zl_writer = B_FALSE;
  1189 
  1188 
  1190 	ASSERT3U(commit_seq, >=, zilog->zl_commit_seq);
  1189 	ASSERT3U(commit_seq, >=, zilog->zl_commit_seq);
  1191 	zilog->zl_commit_seq = commit_seq;
  1190 	zilog->zl_commit_seq = commit_seq;
       
  1191 
       
  1192 	/*
       
  1193 	 * Remember the highest committed log sequence number for ztest.
       
  1194 	 * We only update this value when all the log writes succeeded,
       
  1195 	 * because ztest wants to ASSERT that it got the whole log chain.
       
  1196 	 */
       
  1197 	if (error == 0 && lwb != NULL)
       
  1198 		zilog->zl_commit_lr_seq = zilog->zl_lr_seq;
  1192 }
  1199 }
  1193 
  1200 
  1194 /*
  1201 /*
  1195  * Push zfs transactions to stable storage up to the supplied sequence number.
  1202  * Push zfs transactions to stable storage up to the supplied sequence number.
  1196  * If foid is 0 push out all transactions, otherwise push only those
  1203  * If foid is 0 push out all transactions, otherwise push only those
  1206 
  1213 
  1207 	seq = MIN(seq, zilog->zl_itx_seq);	/* cap seq at largest itx seq */
  1214 	seq = MIN(seq, zilog->zl_itx_seq);	/* cap seq at largest itx seq */
  1208 
  1215 
  1209 	while (zilog->zl_writer) {
  1216 	while (zilog->zl_writer) {
  1210 		cv_wait(&zilog->zl_cv_writer, &zilog->zl_lock);
  1217 		cv_wait(&zilog->zl_cv_writer, &zilog->zl_lock);
  1211 		if (seq < zilog->zl_commit_seq) {
  1218 		if (seq <= zilog->zl_commit_seq) {
  1212 			mutex_exit(&zilog->zl_lock);
  1219 			mutex_exit(&zilog->zl_lock);
  1213 			return;
  1220 			return;
  1214 		}
  1221 		}
  1215 	}
  1222 	}
  1216 	zil_commit_writer(zilog, seq, foid); /* drops zl_lock */
  1223 	zil_commit_writer(zilog, seq, foid); /* drops zl_lock */
  1218 	cv_broadcast(&zilog->zl_cv_writer);
  1225 	cv_broadcast(&zilog->zl_cv_writer);
  1219 	mutex_exit(&zilog->zl_lock);
  1226 	mutex_exit(&zilog->zl_lock);
  1220 }
  1227 }
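To ground the contract described in the comment above (seq caps which itxs are pushed; foid narrows the push to one object, 0 means everything), here is a caller-side sketch. Only zil_commit() itself is real; the wrapper name and its parameters are placeholders, not code from this changeset.

	/*
	 * Sketch only: an fsync-style caller pushing the intent log for a
	 * single object.  "example_fsync_path" and its parameters are
	 * placeholders.
	 */
	static void
	example_fsync_path(zilog_t *zl, uint64_t last_seq, uint64_t object_id)
	{
		/*
		 * Wait until every itx for this object, up to last_seq, is
		 * on stable storage (or until the txg has synced after a
		 * log write error).  Passing foid == 0 instead would push
		 * all outstanding itxs.
		 */
		zil_commit(zl, last_seq, object_id);
	}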
  1221 
  1228 
  1222 /*
  1229 /*
       
  1230  * Report whether all transactions are committed.
       
  1231  */
       
  1232 static boolean_t
       
  1233 zil_is_committed(zilog_t *zilog)
       
  1234 {
       
  1235 	lwb_t *lwb;
       
  1236 	boolean_t committed;
       
  1237 
       
  1238 	mutex_enter(&zilog->zl_lock);
       
  1239 
       
  1240 	while (zilog->zl_writer)
       
  1241 		cv_wait(&zilog->zl_cv_writer, &zilog->zl_lock);
       
  1242 
       
  1243 	if (!list_is_empty(&zilog->zl_itx_list))
       
  1244 		committed = B_FALSE;		/* unpushed transactions */
       
  1245 	else if ((lwb = list_head(&zilog->zl_lwb_list)) == NULL)
       
  1246 		committed = B_TRUE;		/* intent log never used */
       
  1247 	else if (list_next(&zilog->zl_lwb_list, lwb) != NULL)
       
  1248 		committed = B_FALSE;		/* zil_sync() not done yet */
       
  1249 	else
       
  1250 		committed = B_TRUE;		/* everything synced */
       
  1251 
       
  1252 	mutex_exit(&zilog->zl_lock);
       
  1253 	return (committed);
       
  1254 }
       
  1255 
       
  1256 /*
  1223  * Called in syncing context to free committed log blocks and update log header.
  1257  * Called in syncing context to free committed log blocks and update log header.
  1224  */
  1258  */
  1225 void
  1259 void
  1226 zil_sync(zilog_t *zilog, dmu_tx_t *tx)
  1260 zil_sync(zilog_t *zilog, dmu_tx_t *tx)
  1227 {
  1261 {
  1228 	zil_header_t *zh = zil_header_in_syncing_context(zilog);
  1262 	zil_header_t *zh = zil_header_in_syncing_context(zilog);
  1229 	uint64_t txg = dmu_tx_get_txg(tx);
  1263 	uint64_t txg = dmu_tx_get_txg(tx);
  1230 	spa_t *spa = zilog->zl_spa;
  1264 	spa_t *spa = zilog->zl_spa;
       
  1265 	uint64_t *replayed_seq = &zilog->zl_replayed_seq[txg & TXG_MASK];
  1231 	lwb_t *lwb;
  1266 	lwb_t *lwb;
  1232 
  1267 
  1233 	/*
  1268 	/*
  1234 	 * We don't zero out zl_destroy_txg, so make sure we don't try
  1269 	 * We don't zero out zl_destroy_txg, so make sure we don't try
  1235 	 * to destroy it twice.
  1270 	 * to destroy it twice.
  1239 
  1274 
  1240 	mutex_enter(&zilog->zl_lock);
  1275 	mutex_enter(&zilog->zl_lock);
  1241 
  1276 
  1242 	ASSERT(zilog->zl_stop_sync == 0);
  1277 	ASSERT(zilog->zl_stop_sync == 0);
  1243 
  1278 
  1244 	zh->zh_replay_seq = zilog->zl_replayed_seq[txg & TXG_MASK];
  1279 	if (*replayed_seq != 0) {
       
  1280 		ASSERT(zh->zh_replay_seq < *replayed_seq);
       
  1281 		zh->zh_replay_seq = *replayed_seq;
       
  1282 		*replayed_seq = 0;
       
  1283 	}
  1245 
  1284 
  1246 	if (zilog->zl_destroy_txg == txg) {
  1285 	if (zilog->zl_destroy_txg == txg) {
  1247 		blkptr_t blk = zh->zh_log;
  1286 		blkptr_t blk = zh->zh_log;
  1248 
  1287 
  1249 		ASSERT(list_head(&zilog->zl_lwb_list) == NULL);
  1288 		ASSERT(list_head(&zilog->zl_lwb_list) == NULL);
  1268 	while ((lwb = list_head(&zilog->zl_lwb_list)) != NULL) {
  1307 	while ((lwb = list_head(&zilog->zl_lwb_list)) != NULL) {
  1269 		zh->zh_log = lwb->lwb_blk;
  1308 		zh->zh_log = lwb->lwb_blk;
  1270 		if (lwb->lwb_buf != NULL || lwb->lwb_max_txg > txg)
  1309 		if (lwb->lwb_buf != NULL || lwb->lwb_max_txg > txg)
  1271 			break;
  1310 			break;
  1272 		list_remove(&zilog->zl_lwb_list, lwb);
  1311 		list_remove(&zilog->zl_lwb_list, lwb);
  1273 		zio_free_blk(spa, &lwb->lwb_blk, txg);
  1312 		zio_free_zil(spa, txg, &lwb->lwb_blk);
  1274 		kmem_cache_free(zil_lwb_cache, lwb);
  1313 		kmem_cache_free(zil_lwb_cache, lwb);
  1275 
  1314 
  1276 		/*
  1315 		/*
  1277 		 * If we don't have anything left in the lwb list then
  1316 		 * If we don't have anything left in the lwb list then
  1278 		 * we've had an allocation failure and we need to zero
  1317 		 * we've had an allocation failure and we need to zero
  1391 	 * (so zil_sync() will be called) and wait for that txg to sync.
  1430 	 * (so zil_sync() will be called) and wait for that txg to sync.
  1392 	 */
  1431 	 */
  1393 	if (!zil_is_committed(zilog)) {
  1432 	if (!zil_is_committed(zilog)) {
  1394 		uint64_t txg;
  1433 		uint64_t txg;
  1395 		dmu_tx_t *tx = dmu_tx_create(zilog->zl_os);
  1434 		dmu_tx_t *tx = dmu_tx_create(zilog->zl_os);
  1396 		(void) dmu_tx_assign(tx, TXG_WAIT);
  1435 		VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
  1397 		dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
  1436 		dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
  1398 		txg = dmu_tx_get_txg(tx);
  1437 		txg = dmu_tx_get_txg(tx);
  1399 		dmu_tx_commit(tx);
  1438 		dmu_tx_commit(tx);
  1400 		txg_wait_synced(zilog->zl_dmu_pool, txg);
  1439 		txg_wait_synced(zilog->zl_dmu_pool, txg);
  1401 	}
  1440 	}
  1464 	ASSERT(zilog->zl_suspend != 0);
  1503 	ASSERT(zilog->zl_suspend != 0);
  1465 	zilog->zl_suspend--;
  1504 	zilog->zl_suspend--;
  1466 	mutex_exit(&zilog->zl_lock);
  1505 	mutex_exit(&zilog->zl_lock);
  1467 }
  1506 }
  1468 
  1507 
  1469 /*
       
  1470  * Read in the data for the dmu_sync()ed block, and change the log
       
  1471  * record to write this whole block.
       
  1472  */
       
  1473 void
       
  1474 zil_get_replay_data(zilog_t *zilog, lr_write_t *lr)
       
  1475 {
       
  1476 	blkptr_t *wbp = &lr->lr_blkptr;
       
  1477 	char *wbuf = (char *)(lr + 1); /* data follows lr_write_t */
       
  1478 	uint64_t blksz;
       
  1479 
       
  1480 	if (BP_IS_HOLE(wbp)) {	/* compressed to a hole */
       
  1481 		blksz = BP_GET_LSIZE(&lr->lr_blkptr);
       
  1482 		/*
       
  1483 		 * If the blksz is zero then we must be replaying a log
       
  1484 		 * from a version prior to setting the blksize of null blocks.
       
  1485 		 * So we just zero the actual write size requested.
       
  1486 		 */
       
  1487 		if (blksz == 0) {
       
  1488 			bzero(wbuf, lr->lr_length);
       
  1489 			return;
       
  1490 		}
       
  1491 		bzero(wbuf, blksz);
       
  1492 	} else {
       
  1493 		/*
       
  1494 		 * A subsequent write may have overwritten this block, in which
       
  1495 		 * case wbp may have been freed and reallocated, and our
       
  1496 		 * read of wbp may fail with a checksum error.  We can safely
       
  1497 		 * ignore this because the later write will provide the
       
  1498 		 * correct data.
       
  1499 		 */
       
  1500 		zbookmark_t zb;
       
  1501 
       
  1502 		zb.zb_objset = dmu_objset_id(zilog->zl_os);
       
  1503 		zb.zb_object = lr->lr_foid;
       
  1504 		zb.zb_level = 0;
       
  1505 		zb.zb_blkid = -1; /* unknown */
       
  1506 
       
  1507 		blksz = BP_GET_LSIZE(&lr->lr_blkptr);
       
  1508 		(void) zio_wait(zio_read(NULL, zilog->zl_spa, wbp, wbuf, blksz,
       
  1509 		    NULL, NULL, ZIO_PRIORITY_SYNC_READ,
       
  1510 		    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE, &zb));
       
  1511 	}
       
  1512 	lr->lr_offset -= lr->lr_offset % blksz;
       
  1513 	lr->lr_length = blksz;
       
  1514 }
       
  1515 
       
  1516 typedef struct zil_replay_arg {
  1508 typedef struct zil_replay_arg {
  1517 	objset_t	*zr_os;
       
  1518 	zil_replay_func_t **zr_replay;
  1509 	zil_replay_func_t **zr_replay;
  1519 	void		*zr_arg;
  1510 	void		*zr_arg;
  1520 	boolean_t	zr_byteswap;
  1511 	boolean_t	zr_byteswap;
  1521 	char		*zr_lrbuf;
  1512 	char		*zr_lr;
  1522 } zil_replay_arg_t;
  1513 } zil_replay_arg_t;
  1523 
  1514 
  1524 static void
  1515 static int
       
  1516 zil_replay_error(zilog_t *zilog, lr_t *lr, int error)
       
  1517 {
       
  1518 	char name[MAXNAMELEN];
       
  1519 
       
  1520 	zilog->zl_replaying_seq--;	/* didn't actually replay this one */
       
  1521 
       
  1522 	dmu_objset_name(zilog->zl_os, name);
       
  1523 
       
  1524 	cmn_err(CE_WARN, "ZFS replay transaction error %d, "
       
  1525 	    "dataset %s, seq 0x%llx, txtype %llu %s\n", error, name,
       
  1526 	    (u_longlong_t)lr->lrc_seq,
       
  1527 	    (u_longlong_t)(lr->lrc_txtype & ~TX_CI),
       
  1528 	    (lr->lrc_txtype & TX_CI) ? "CI" : "");
       
  1529 
       
  1530 	return (error);
       
  1531 }
       
  1532 
       
  1533 static int
  1525 zil_replay_log_record(zilog_t *zilog, lr_t *lr, void *zra, uint64_t claim_txg)
  1534 zil_replay_log_record(zilog_t *zilog, lr_t *lr, void *zra, uint64_t claim_txg)
  1526 {
  1535 {
  1527 	zil_replay_arg_t *zr = zra;
  1536 	zil_replay_arg_t *zr = zra;
  1528 	const zil_header_t *zh = zilog->zl_header;
  1537 	const zil_header_t *zh = zilog->zl_header;
  1529 	uint64_t reclen = lr->lrc_reclen;
  1538 	uint64_t reclen = lr->lrc_reclen;
  1530 	uint64_t txtype = lr->lrc_txtype;
  1539 	uint64_t txtype = lr->lrc_txtype;
  1531 	char *name;
  1540 	int error = 0;
  1532 	int pass, error;
  1541 
  1533 
  1542 	zilog->zl_replaying_seq = lr->lrc_seq;
  1534 	if (!zilog->zl_replay)			/* giving up */
  1543 
  1535 		return;
  1544 	if (lr->lrc_seq <= zh->zh_replay_seq)	/* already replayed */
       
  1545 		return (0);
  1536 
  1546 
  1537 	if (lr->lrc_txg < claim_txg)		/* already committed */
  1547 	if (lr->lrc_txg < claim_txg)		/* already committed */
  1538 		return;
  1548 		return (0);
  1539 
       
  1540 	if (lr->lrc_seq <= zh->zh_replay_seq)	/* already replayed */
       
  1541 		return;
       
  1542 
  1549 
  1543 	/* Strip case-insensitive bit, still present in log record */
  1550 	/* Strip case-insensitive bit, still present in log record */
  1544 	txtype &= ~TX_CI;
  1551 	txtype &= ~TX_CI;
  1545 
  1552 
  1546 	if (txtype == 0 || txtype >= TX_MAX_TYPE) {
  1553 	if (txtype == 0 || txtype >= TX_MAX_TYPE)
  1547 		error = EINVAL;
  1554 		return (zil_replay_error(zilog, lr, EINVAL));
  1548 		goto bad;
  1555 
       
  1556 	/*
       
  1557 	 * If this record type can be logged out of order, the object
       
  1558 	 * (lr_foid) may no longer exist.  That's legitimate, not an error.
       
  1559 	 */
       
  1560 	if (TX_OOO(txtype)) {
       
  1561 		error = dmu_object_info(zilog->zl_os,
       
  1562 		    ((lr_ooo_t *)lr)->lr_foid, NULL);
       
  1563 		if (error == ENOENT || error == EEXIST)
       
  1564 			return (0);
  1549 	}
  1565 	}
  1550 
  1566 
  1551 	/*
  1567 	/*
  1552 	 * Make a copy of the data so we can revise and extend it.
  1568 	 * Make a copy of the data so we can revise and extend it.
  1553 	 */
  1569 	 */
  1554 	bcopy(lr, zr->zr_lrbuf, reclen);
  1570 	bcopy(lr, zr->zr_lr, reclen);
       
  1571 
       
  1572 	/*
       
  1573 	 * If this is a TX_WRITE with a blkptr, suck in the data.
       
  1574 	 */
       
  1575 	if (txtype == TX_WRITE && reclen == sizeof (lr_write_t)) {
       
  1576 		error = zil_read_log_data(zilog, (lr_write_t *)lr,
       
  1577 		    zr->zr_lr + reclen);
       
  1578 		if (error)
       
  1579 			return (zil_replay_error(zilog, lr, error));
       
  1580 	}
  1555 
  1581 
  1556 	/*
  1582 	/*
  1557 	 * The log block containing this lr may have been byteswapped
  1583 	 * The log block containing this lr may have been byteswapped
  1558 	 * so that we can easily examine common fields like lrc_txtype.
  1584 	 * so that we can easily examine common fields like lrc_txtype.
  1559 	 * However, the log is a mix of different data types, and only the
  1585 	 * However, the log is a mix of different record types, and only the
  1560 	 * replay vectors know how to byteswap their records.  Therefore, if
  1586 	 * replay vectors know how to byteswap their records.  Therefore, if
  1561 	 * the lr was byteswapped, undo it before invoking the replay vector.
  1587 	 * the lr was byteswapped, undo it before invoking the replay vector.
  1562 	 */
  1588 	 */
  1563 	if (zr->zr_byteswap)
  1589 	if (zr->zr_byteswap)
  1564 		byteswap_uint64_array(zr->zr_lrbuf, reclen);
  1590 		byteswap_uint64_array(zr->zr_lr, reclen);
  1565 
  1591 
  1566 	/*
  1592 	/*
  1567 	 * We must now do two things atomically: replay this log record,
  1593 	 * We must now do two things atomically: replay this log record,
  1568 	 * and update the log header sequence number to reflect the fact that
  1594 	 * and update the log header sequence number to reflect the fact that
  1569 	 * we did so. At the end of each replay function the sequence number
  1595 	 * we did so. At the end of each replay function the sequence number
  1570 	 * is updated if we are in replay mode.
  1596 	 * is updated if we are in replay mode.
  1571 	 */
  1597 	 */
  1572 	for (pass = 1; pass <= 2; pass++) {
  1598 	error = zr->zr_replay[txtype](zr->zr_arg, zr->zr_lr, zr->zr_byteswap);
  1573 		zilog->zl_replaying_seq = lr->lrc_seq;
  1599 	if (error) {
  1574 		/* Only byteswap (if needed) on the 1st pass.  */
       
  1575 		error = zr->zr_replay[txtype](zr->zr_arg, zr->zr_lrbuf,
       
  1576 		    zr->zr_byteswap && pass == 1);
       
  1577 
       
  1578 		if (!error)
       
  1579 			return;
       
  1580 
       
  1581 		/*
  1600 		/*
  1582 		 * The DMU's dnode layer doesn't see removes until the txg
  1601 		 * The DMU's dnode layer doesn't see removes until the txg
  1583 		 * commits, so a subsequent claim can spuriously fail with
  1602 		 * commits, so a subsequent claim can spuriously fail with
  1584 		 * EEXIST. So if we receive any error we try syncing out
  1603 		 * EEXIST. So if we receive any error we try syncing out
  1585 		 * any removes then retry the transaction.
  1604 		 * any removes then retry the transaction.  Note that we
       
  1605 		 * specify B_FALSE for byteswap now, so we don't do it twice.
  1586 		 */
  1606 		 */
  1587 		if (pass == 1)
  1607 		txg_wait_synced(spa_get_dsl(zilog->zl_spa), 0);
  1588 			txg_wait_synced(spa_get_dsl(zilog->zl_spa), 0);
  1608 		error = zr->zr_replay[txtype](zr->zr_arg, zr->zr_lr, B_FALSE);
  1589 	}
  1609 		if (error)
  1590 
  1610 			return (zil_replay_error(zilog, lr, error));
  1591 bad:
  1611 	}
  1592 	ASSERT(error);
  1612 	return (0);
  1593 	name = kmem_alloc(MAXNAMELEN, KM_SLEEP);
       
  1594 	dmu_objset_name(zr->zr_os, name);
       
  1595 	cmn_err(CE_WARN, "ZFS replay transaction error %d, "
       
  1596 	    "dataset %s, seq 0x%llx, txtype %llu %s\n",
       
  1597 	    error, name, (u_longlong_t)lr->lrc_seq, (u_longlong_t)txtype,
       
  1598 	    (lr->lrc_txtype & TX_CI) ? "CI" : "");
       
  1599 	zilog->zl_replay = B_FALSE;
       
  1600 	kmem_free(name, MAXNAMELEN);
       
  1601 }
  1613 }
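Since zil_replay_log_record() above dispatches through zr_replay[txtype] with the (arg, record buffer, byteswap) triple, the consumer is expected to supply one handler per transaction type. The sketch below shows that shape; the typedef mirrors the call site above, and every identifier prefixed with example_ is a placeholder rather than code from this file.

	/* Sketch only: the handler shape implied by the dispatch above. */
	typedef int example_replay_func_t(void *arg, char *lr, boolean_t byteswap);

	static int example_replay_create(void *arg, char *lr, boolean_t byteswap);
	static int example_replay_write(void *arg, char *lr, boolean_t byteswap);
	static int example_replay_unimpl(void *arg, char *lr, boolean_t byteswap);

	static example_replay_func_t *example_replay_vector[TX_MAX_TYPE];

	static void
	example_replay_vector_init(void)
	{
		int i;

		/*
		 * Every slot needs a callable entry: once txtype passes the
		 * 0 < txtype < TX_MAX_TYPE check above, the dispatcher calls
		 * zr_replay[txtype]() unconditionally.
		 */
		for (i = 0; i < TX_MAX_TYPE; i++)
			example_replay_vector[i] = example_replay_unimpl;

		example_replay_vector[TX_CREATE] = example_replay_create;
		example_replay_vector[TX_WRITE] = example_replay_write;
	}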
  1602 
  1614 
  1603 /* ARGSUSED */
  1615 /* ARGSUSED */
  1604 static void
  1616 static int
  1605 zil_incr_blks(zilog_t *zilog, blkptr_t *bp, void *arg, uint64_t claim_txg)
  1617 zil_incr_blks(zilog_t *zilog, blkptr_t *bp, void *arg, uint64_t claim_txg)
  1606 {
  1618 {
  1607 	zilog->zl_replay_blks++;
  1619 	zilog->zl_replay_blks++;
       
  1620 
       
  1621 	return (0);
  1608 }
  1622 }
  1609 
  1623 
  1610 /*
  1624 /*
  1611  * If this dataset has a non-empty intent log, replay it and destroy it.
  1625  * If this dataset has a non-empty intent log, replay it and destroy it.
  1612  */
  1626  */
  1620 	if ((zh->zh_flags & ZIL_REPLAY_NEEDED) == 0) {
  1634 	if ((zh->zh_flags & ZIL_REPLAY_NEEDED) == 0) {
  1621 		zil_destroy(zilog, B_TRUE);
  1635 		zil_destroy(zilog, B_TRUE);
  1622 		return;
  1636 		return;
  1623 	}
  1637 	}
  1624 
  1638 
  1625 	zr.zr_os = os;
       
  1626 	zr.zr_replay = replay_func;
  1639 	zr.zr_replay = replay_func;
  1627 	zr.zr_arg = arg;
  1640 	zr.zr_arg = arg;
  1628 	zr.zr_byteswap = BP_SHOULD_BYTESWAP(&zh->zh_log);
  1641 	zr.zr_byteswap = BP_SHOULD_BYTESWAP(&zh->zh_log);
  1629 	zr.zr_lrbuf = kmem_alloc(2 * SPA_MAXBLOCKSIZE, KM_SLEEP);
  1642 	zr.zr_lr = kmem_alloc(2 * SPA_MAXBLOCKSIZE, KM_SLEEP);
  1630 
  1643 
  1631 	/*
  1644 	/*
  1632 	 * Wait for in-progress removes to sync before starting replay.
  1645 	 * Wait for in-progress removes to sync before starting replay.
  1633 	 */
  1646 	 */
  1634 	txg_wait_synced(zilog->zl_dmu_pool, 0);
  1647 	txg_wait_synced(zilog->zl_dmu_pool, 0);
  1636 	zilog->zl_replay = B_TRUE;
  1649 	zilog->zl_replay = B_TRUE;
  1637 	zilog->zl_replay_time = lbolt;
  1650 	zilog->zl_replay_time = lbolt;
  1638 	ASSERT(zilog->zl_replay_blks == 0);
  1651 	ASSERT(zilog->zl_replay_blks == 0);
  1639 	(void) zil_parse(zilog, zil_incr_blks, zil_replay_log_record, &zr,
  1652 	(void) zil_parse(zilog, zil_incr_blks, zil_replay_log_record, &zr,
  1640 	    zh->zh_claim_txg);
  1653 	    zh->zh_claim_txg);
  1641 	kmem_free(zr.zr_lrbuf, 2 * SPA_MAXBLOCKSIZE);
  1654 	kmem_free(zr.zr_lr, 2 * SPA_MAXBLOCKSIZE);
  1642 
  1655 
  1643 	zil_destroy(zilog, B_FALSE);
  1656 	zil_destroy(zilog, B_FALSE);
  1644 	txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
  1657 	txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
  1645 	zilog->zl_replay = B_FALSE;
  1658 	zilog->zl_replay = B_FALSE;
  1646 }
  1659 }
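For completeness, a sketch of how a dataset owner would hand a replay vector to zil_replay() at first use. The argument order is inferred from how zr_arg and zr_replay are populated above; the caller name, private-data pointer, and the example_replay_vector from the previous sketch (the real table would use the zil_replay_func_t type from the ZIL headers) are placeholders.

	/*
	 * Sketch only: wiring a replay vector into zil_replay() when a
	 * dataset is first brought into use.
	 */
	static void
	example_dataset_setup(objset_t *os, void *fs_private)
	{
		/* Replays (and then destroys) any outstanding intent log. */
		zil_replay(os, fs_private, example_replay_vector);
	}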
  1647 
  1660 
  1648 /*
  1661 boolean_t
  1649  * Report whether all transactions are committed
  1662 zil_replaying(zilog_t *zilog, dmu_tx_t *tx)
  1650  */
  1663 {
  1651 int
  1664 	if (zilog == NULL)
  1652 zil_is_committed(zilog_t *zilog)
  1665 		return (B_TRUE);
  1653 {
  1666 
  1654 	lwb_t *lwb;
  1667 	if (zilog->zl_replay) {
  1655 	int ret;
  1668 		dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
  1656 
  1669 		zilog->zl_replayed_seq[dmu_tx_get_txg(tx) & TXG_MASK] =
  1657 	mutex_enter(&zilog->zl_lock);
  1670 		    zilog->zl_replaying_seq;
  1658 	while (zilog->zl_writer)
  1671 		return (B_TRUE);
  1659 		cv_wait(&zilog->zl_cv_writer, &zilog->zl_lock);
  1672 	}
  1660 
  1673 
  1661 	/* recent unpushed intent log transactions? */
  1674 	return (B_FALSE);
  1662 	if (!list_is_empty(&zilog->zl_itx_list)) {
       
  1663 		ret = B_FALSE;
       
  1664 		goto out;
       
  1665 	}
       
  1666 
       
  1667 	/* intent log never used? */
       
  1668 	lwb = list_head(&zilog->zl_lwb_list);
       
  1669 	if (lwb == NULL) {
       
  1670 		ret = B_TRUE;
       
  1671 		goto out;
       
  1672 	}
       
  1673 
       
  1674 	/*
       
  1675 	 * more than 1 log buffer means zil_sync() hasn't yet freed
       
  1676 	 * entries after a txg has committed
       
  1677 	 */
       
  1678 	if (list_next(&zilog->zl_lwb_list, lwb)) {
       
  1679 		ret = B_FALSE;
       
  1680 		goto out;
       
  1681 	}
       
  1682 
       
  1683 	ASSERT(zil_empty(zilog));
       
  1684 	ret = B_TRUE;
       
  1685 out:
       
  1686 	cv_broadcast(&zilog->zl_cv_writer);
       
  1687 	mutex_exit(&zilog->zl_lock);
       
  1688 	return (ret);
       
  1689 }
  1675 }
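zil_replaying() above is the hook an intent-log producer uses to avoid logging the replay itself while still recording the replayed sequence number in the dirtied txg. A producer-side sketch follows, with placeholder names everywhere except zil_replaying():

	/*
	 * Sketch only: a zfs_log_*-style function bails out during replay.
	 */
	static void
	example_log_op(zilog_t *zilog, dmu_tx_t *tx /* , op-specific arguments */)
	{
		if (zil_replaying(zilog, tx))
			return;		/* replay in progress: don't re-log it */

		/* ... otherwise build and assign the itx for this operation ... */
	}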
  1690 
  1676 
  1691 /* ARGSUSED */
  1677 /* ARGSUSED */
  1692 int
  1678 int
  1693 zil_vdev_offline(char *osname, void *arg)
  1679 zil_vdev_offline(char *osname, void *arg)