usr/src/uts/common/fs/zfs/dmu_send.c
changeset 11007 216d8396182e
parent 10922 e2081f502306
child 11012 302f818f6d54
--- a/usr/src/uts/common/fs/zfs/dmu_send.c	Mon Nov 09 11:54:00 2009 +0530
+++ b/usr/src/uts/common/fs/zfs/dmu_send.c	Mon Nov 09 11:04:55 2009 -0700
@@ -38,16 +38,31 @@
 #include <sys/zfs_ioctl.h>
 #include <sys/zap.h>
 #include <sys/zio_checksum.h>
+#include <sys/avl.h>
 
 static char *dmu_recv_tag = "dmu_recv_tag";
 
+/*
+ * The list of data whose inclusion in a send stream can be pending from
+ * one call to backup_cb to another.  Multiple calls to dump_free() and
+ * dump_freeobjects() can be aggregated into a single DRR_FREE or
+ * DRR_FREEOBJECTS replay record.
+ */
+typedef enum {
+	PENDING_NONE,
+	PENDING_FREE,
+	PENDING_FREEOBJECTS
+} pendop_t;
+
 struct backuparg {
 	dmu_replay_record_t *drr;
 	vnode_t *vp;
 	offset_t *off;
 	objset_t *os;
 	zio_cksum_t zc;
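+	/* GUID of the snapshot being sent; copied into each record */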
+	uint64_t toguid;
 	int err;
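+	/* type of record, if any, held back in *drr for aggregation */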
+	pendop_t pending_op;
 };
 
 static int
@@ -68,15 +83,59 @@
 dump_free(struct backuparg *ba, uint64_t object, uint64_t offset,
     uint64_t length)
 {
-	/* write a FREE record */
+	struct drr_free *drrf = &(ba->drr->drr_u.drr_free);
+
+	/*
+	 * If there is a pending op, but it's not PENDING_FREE, push it out,
+	 * since free block aggregation can only be done for blocks of the
+	 * same type (i.e., DRR_FREE records can only be aggregated with
+	 * other DRR_FREE records.  DRR_FREEOBJECTS records can only be
+	 * aggregated with other DRR_FREEOBJECTS records).
+	 */
+	if (ba->pending_op != PENDING_NONE && ba->pending_op != PENDING_FREE) {
+		if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)) != 0)
+			return (EINTR);
+		ba->pending_op = PENDING_NONE;
+	}
+
+	if (ba->pending_op == PENDING_FREE) {
+		/*
+		 * There should never be a PENDING_FREE if length is -1
+		 * (because dump_dnode is the only place where this
+		 * function is called with a -1, and only after flushing
+		 * any pending record).
+		 */
+		ASSERT(length != -1ULL);
+		/*
+		 * Check to see whether this free block can be aggregated
+	 * with the pending one.
+		 */
+		if (drrf->drr_object == object && drrf->drr_offset +
+		    drrf->drr_length == offset) {
+			drrf->drr_length += length;
+			return (0);
+		} else {
+			/* not a continuation.  Push out pending record */
+			if (dump_bytes(ba, ba->drr,
+			    sizeof (dmu_replay_record_t)) != 0)
+				return (EINTR);
+			ba->pending_op = PENDING_NONE;
+		}
+	}
+	/* create a FREE record and make it pending */
 	bzero(ba->drr, sizeof (dmu_replay_record_t));
 	ba->drr->drr_type = DRR_FREE;
-	ba->drr->drr_u.drr_free.drr_object = object;
-	ba->drr->drr_u.drr_free.drr_offset = offset;
-	ba->drr->drr_u.drr_free.drr_length = length;
+	drrf->drr_object = object;
+	drrf->drr_offset = offset;
+	drrf->drr_length = length;
+	drrf->drr_toguid = ba->toguid;
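+	/* a length of -1 (free to end of object) is never aggregated */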
+	if (length == -1ULL) {
+		if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)) != 0)
+			return (EINTR);
+	} else {
+		ba->pending_op = PENDING_FREE;
+	}
 
-	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)))
-		return (EINTR);
 	return (0);
 }
 
@@ -84,17 +143,31 @@
 dump_data(struct backuparg *ba, dmu_object_type_t type,
     uint64_t object, uint64_t offset, int blksz, void *data)
 {
+	struct drr_write *drrw = &(ba->drr->drr_u.drr_write);
+
+	/*
+	 * If there is any kind of pending aggregation (currently either
+	 * a grouping of free objects or free blocks), push it out to
+	 * the stream, since aggregation can't be done across operations
+	 * of different types.
+	 */
+	if (ba->pending_op != PENDING_NONE) {
+		if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)) != 0)
+			return (EINTR);
+		ba->pending_op = PENDING_NONE;
+	}
 	/* write a DATA record */
 	bzero(ba->drr, sizeof (dmu_replay_record_t));
 	ba->drr->drr_type = DRR_WRITE;
-	ba->drr->drr_u.drr_write.drr_object = object;
-	ba->drr->drr_u.drr_write.drr_type = type;
-	ba->drr->drr_u.drr_write.drr_offset = offset;
-	ba->drr->drr_u.drr_write.drr_length = blksz;
+	drrw->drr_object = object;
+	drrw->drr_type = type;
+	drrw->drr_offset = offset;
+	drrw->drr_length = blksz;
+	drrw->drr_toguid = ba->toguid;
 
-	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)))
+	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)) != 0)
 		return (EINTR);
-	if (dump_bytes(ba, data, blksz))
+	if (dump_bytes(ba, data, blksz) != 0)
 		return (EINTR);
 	return (0);
 }
@@ -102,39 +175,80 @@
 static int
 dump_freeobjects(struct backuparg *ba, uint64_t firstobj, uint64_t numobjs)
 {
+	struct drr_freeobjects *drrfo = &(ba->drr->drr_u.drr_freeobjects);
+
+	/*
+	 * If there is a pending op, but it's not PENDING_FREEOBJECTS,
+	 * push it out, since free block aggregation can only be done for
+	 * blocks of the same type (i.e., DRR_FREE records can only be
+	 * aggregated with other DRR_FREE records.  DRR_FREEOBJECTS records
+	 * can only be aggregated with other DRR_FREEOBJECTS records).
+	 */
+	if (ba->pending_op != PENDING_NONE &&
+	    ba->pending_op != PENDING_FREEOBJECTS) {
+		if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)) != 0)
+			return (EINTR);
+		ba->pending_op = PENDING_NONE;
+	}
+	if (ba->pending_op == PENDING_FREEOBJECTS) {
+		/*
+		 * See whether this free object array can be aggregated
+		 * with the pending one.
+		 */
+		if (drrfo->drr_firstobj + drrfo->drr_numobjs == firstobj) {
+			drrfo->drr_numobjs += numobjs;
+			return (0);
+		} else {
+			/* can't be aggregated.  Push out pending record */
+			if (dump_bytes(ba, ba->drr,
+			    sizeof (dmu_replay_record_t)) != 0)
+				return (EINTR);
+			ba->pending_op = PENDING_NONE;
+		}
+	}
+
 	/* write a FREEOBJECTS record */
 	bzero(ba->drr, sizeof (dmu_replay_record_t));
 	ba->drr->drr_type = DRR_FREEOBJECTS;
-	ba->drr->drr_u.drr_freeobjects.drr_firstobj = firstobj;
-	ba->drr->drr_u.drr_freeobjects.drr_numobjs = numobjs;
+	drrfo->drr_firstobj = firstobj;
+	drrfo->drr_numobjs = numobjs;
+	drrfo->drr_toguid = ba->toguid;
 
-	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)))
-		return (EINTR);
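+	/* hold the record back; a later call may extend it */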
+	ba->pending_op = PENDING_FREEOBJECTS;
+
 	return (0);
 }
 
 static int
 dump_dnode(struct backuparg *ba, uint64_t object, dnode_phys_t *dnp)
 {
+	struct drr_object *drro = &(ba->drr->drr_u.drr_object);
+
 	if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)
 		return (dump_freeobjects(ba, object, 1));
 
+	if (ba->pending_op != PENDING_NONE) {
+		if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)) != 0)
+			return (EINTR);
+		ba->pending_op = PENDING_NONE;
+	}
+
 	/* write an OBJECT record */
 	bzero(ba->drr, sizeof (dmu_replay_record_t));
 	ba->drr->drr_type = DRR_OBJECT;
-	ba->drr->drr_u.drr_object.drr_object = object;
-	ba->drr->drr_u.drr_object.drr_type = dnp->dn_type;
-	ba->drr->drr_u.drr_object.drr_bonustype = dnp->dn_bonustype;
-	ba->drr->drr_u.drr_object.drr_blksz =
-	    dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
-	ba->drr->drr_u.drr_object.drr_bonuslen = dnp->dn_bonuslen;
-	ba->drr->drr_u.drr_object.drr_checksum = dnp->dn_checksum;
-	ba->drr->drr_u.drr_object.drr_compress = dnp->dn_compress;
+	drro->drr_object = object;
+	drro->drr_type = dnp->dn_type;
+	drro->drr_bonustype = dnp->dn_bonustype;
+	drro->drr_blksz = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
+	drro->drr_bonuslen = dnp->dn_bonuslen;
+	drro->drr_checksumtype = dnp->dn_checksum;
+	drro->drr_compress = dnp->dn_compress;
+	drro->drr_toguid = ba->toguid;
 
-	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)))
+	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)) != 0)
 		return (EINTR);
 
-	if (dump_bytes(ba, DN_BONUS(dnp), P2ROUNDUP(dnp->dn_bonuslen, 8)))
+	if (dump_bytes(ba, DN_BONUS(dnp), P2ROUNDUP(dnp->dn_bonuslen, 8)) != 0)
 		return (EINTR);
 
 	/* free anything past the end of the file */
@@ -256,7 +370,8 @@
 	drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
 	drr->drr_type = DRR_BEGIN;
 	drr->drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
-	drr->drr_u.drr_begin.drr_version = DMU_BACKUP_STREAM_VERSION;
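+	/* mark this stream as a plain substream in versioninfo */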
+	DMU_SET_STREAM_HDRTYPE(drr->drr_u.drr_begin.drr_versioninfo,
+	    DMU_SUBSTREAM);
 	drr->drr_u.drr_begin.drr_creation_time =
 	    ds->ds_phys->ds_creation_time;
 	drr->drr_u.drr_begin.drr_type = tosnap->os_phys->os_type;
@@ -279,9 +394,11 @@
 	ba.vp = vp;
 	ba.os = tosnap;
 	ba.off = off;
+	ba.toguid = ds->ds_phys->ds_guid;
 	ZIO_SET_CHECKSUM(&ba.zc, 0, 0, 0, 0);
+	ba.pending_op = PENDING_NONE;
 
-	if (dump_bytes(&ba, drr, sizeof (dmu_replay_record_t))) {
+	if (dump_bytes(&ba, drr, sizeof (dmu_replay_record_t)) != 0) {
 		kmem_free(drr, sizeof (dmu_replay_record_t));
 		return (ba.err);
 	}
@@ -289,6 +406,10 @@
 	err = traverse_dataset(ds, fromtxg, TRAVERSE_PRE | TRAVERSE_PREFETCH,
 	    backup_cb, &ba);
 
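+	/* flush any record still pending after the traversal */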
+	if (ba.pending_op != PENDING_NONE)
+		if (dump_bytes(&ba, drr, sizeof (dmu_replay_record_t)) != 0)
+			err = EINTR;
+
 	if (err) {
 		if (err == EINTR && ba.err)
 			err = ba.err;
@@ -299,8 +420,9 @@
 	bzero(drr, sizeof (dmu_replay_record_t));
 	drr->drr_type = DRR_END;
 	drr->drr_u.drr_end.drr_checksum = ba.zc;
+	drr->drr_u.drr_end.drr_toguid = ba.toguid;
 
-	if (dump_bytes(&ba, drr, sizeof (dmu_replay_record_t))) {
+	if (dump_bytes(&ba, drr, sizeof (dmu_replay_record_t)) != 0) {
 		kmem_free(drr, sizeof (dmu_replay_record_t));
 		return (ba.err);
 	}
@@ -459,13 +581,13 @@
  * succeeds; otherwise we will leak the holds on the datasets.
  */
 int
-dmu_recv_begin(char *tofs, char *tosnap, struct drr_begin *drrb,
+dmu_recv_begin(char *tofs, char *tosnap, char *top_ds, struct drr_begin *drrb,
     boolean_t force, objset_t *origin, dmu_recv_cookie_t *drc)
 {
 	int err = 0;
 	boolean_t byteswap;
 	struct recvbeginsyncarg rbsa = { 0 };
-	uint64_t version;
+	uint64_t versioninfo;
 	int flags;
 	dsl_dataset_t *ds;
 
@@ -483,17 +605,17 @@
 	rbsa.type = drrb->drr_type;
 	rbsa.tag = FTAG;
 	rbsa.dsflags = 0;
-	version = drrb->drr_version;
+	versioninfo = drrb->drr_versioninfo;
 	flags = drrb->drr_flags;
 
 	if (byteswap) {
 		rbsa.type = BSWAP_32(rbsa.type);
 		rbsa.fromguid = BSWAP_64(rbsa.fromguid);
-		version = BSWAP_64(version);
+		versioninfo = BSWAP_64(versioninfo);
 		flags = BSWAP_32(flags);
 	}
 
-	if (version != DMU_BACKUP_STREAM_VERSION ||
+	if (DMU_GET_STREAM_HDRTYPE(versioninfo) == DMU_COMPOUNDSTREAM ||
 	    rbsa.type >= DMU_OST_NUMTYPES ||
 	    ((flags & DRR_FLAG_CLONE) && origin == NULL))
 		return (EINVAL);
@@ -504,6 +626,7 @@
 	bzero(drc, sizeof (dmu_recv_cookie_t));
 	drc->drc_drrb = drrb;
 	drc->drc_tosnap = tosnap;
+	drc->drc_top_ds = top_ds;
 	drc->drc_force = force;
 
 	/*
@@ -579,8 +702,85 @@
 	uint64_t voff;
 	int bufsize; /* amount of memory allocated for buf */
 	zio_cksum_t cksum;
+	avl_tree_t guid_to_ds_map;
 };
 
+typedef struct guid_map_entry {
+	uint64_t	guid;
+	dsl_dataset_t	*gme_ds;
+	avl_node_t	avlnode;
+} guid_map_entry_t;
+
+static int
+guid_compare(const void *arg1, const void *arg2)
+{
+	const guid_map_entry_t *gmep1 = arg1;
+	const guid_map_entry_t *gmep2 = arg2;
+
+	if (gmep1->guid < gmep2->guid)
+		return (-1);
+	else if (gmep1->guid > gmep2->guid)
+		return (1);
+	return (0);
+}
+
+/*
+ * This function is a callback used by dmu_objset_find() (which
+ * enumerates the object sets) to build an avl tree that maps guids
+ * to datasets.  The resulting table is used when processing DRR_WRITE_BYREF
+ * send stream records.  These records, which are used in dedup'ed
+ * streams, do not contain data themselves, but refer to a copy
+ * of the data block that has already been written because it was
+ * earlier in the stream.  That previous copy is identified by the
+ * guid of the dataset with the referenced data.
+ */
+int
+find_ds_by_guid(char *name, void *arg)
+{
+	dsl_dataset_t *ds, *snapds;
+	avl_tree_t *guid_map = arg;
+	guid_map_entry_t *gmep;
+	dsl_pool_t	*dp;
+	int err;
+	uint64_t lastobj, firstobj;
+
+	if (dsl_dataset_hold(name, FTAG, &ds) != 0)
+		return (0);
+
+	dp = ds->ds_dir->dd_pool;
+	rw_enter(&dp->dp_config_rwlock, RW_READER);
+	firstobj = ds->ds_dir->dd_phys->dd_origin_obj;
+	lastobj = ds->ds_phys->ds_prev_snap_obj;
+
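+	/* walk the snapshot chain from newest back to the origin */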
+	while (lastobj != firstobj) {
+		err = dsl_dataset_hold_obj(dp, lastobj, guid_map, &snapds);
+		if (err) {
+			/*
+			 * Skip this snapshot and move on. It's not
+			 * clear why this would ever happen, but the
+			 * remainder of the snapshot stream can be
+			 * processed.
+			 */
+			rw_exit(&dp->dp_config_rwlock);
+			dsl_dataset_rele(ds, FTAG);
+			return (0);
+		}
+
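+		/* map this snapshot's guid to the (held) dataset */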
+		gmep = kmem_alloc(sizeof (guid_map_entry_t), KM_SLEEP);
+		gmep->guid = snapds->ds_phys->ds_guid;
+		gmep->gme_ds = snapds;
+		avl_add(guid_map, gmep);
+		lastobj = snapds->ds_phys->ds_prev_snap_obj;
+	}
+
+	rw_exit(&dp->dp_config_rwlock);
+	dsl_dataset_rele(ds, FTAG);
+
+	return (0);
+}
+
 static void *
 restore_read(struct restorearg *ra, int len)
 {
@@ -625,7 +825,7 @@
 	switch (drr->drr_type) {
 	case DRR_BEGIN:
 		DO64(drr_begin.drr_magic);
-		DO64(drr_begin.drr_version);
+		DO64(drr_begin.drr_versioninfo);
 		DO64(drr_begin.drr_creation_time);
 		DO32(drr_begin.drr_type);
 		DO32(drr_begin.drr_flags);
@@ -639,27 +839,49 @@
 		DO32(drr_object.drr_bonustype);
 		DO32(drr_object.drr_blksz);
 		DO32(drr_object.drr_bonuslen);
+		DO64(drr_object.drr_toguid);
 		break;
 	case DRR_FREEOBJECTS:
 		DO64(drr_freeobjects.drr_firstobj);
 		DO64(drr_freeobjects.drr_numobjs);
+		DO64(drr_freeobjects.drr_toguid);
 		break;
 	case DRR_WRITE:
 		DO64(drr_write.drr_object);
 		DO32(drr_write.drr_type);
 		DO64(drr_write.drr_offset);
 		DO64(drr_write.drr_length);
+		DO64(drr_write.drr_toguid);
+		DO64(drr_write.drr_blkcksum.zc_word[0]);
+		DO64(drr_write.drr_blkcksum.zc_word[1]);
+		DO64(drr_write.drr_blkcksum.zc_word[2]);
+		DO64(drr_write.drr_blkcksum.zc_word[3]);
+		break;
+	case DRR_WRITE_BYREF:
+		DO64(drr_write_byref.drr_object);
+		DO64(drr_write_byref.drr_offset);
+		DO64(drr_write_byref.drr_length);
+		DO64(drr_write_byref.drr_toguid);
+		DO64(drr_write_byref.drr_refguid);
+		DO64(drr_write_byref.drr_refobject);
+		DO64(drr_write_byref.drr_refoffset);
+		DO64(drr_write_byref.drr_blkcksum.zc_word[0]);
+		DO64(drr_write_byref.drr_blkcksum.zc_word[1]);
+		DO64(drr_write_byref.drr_blkcksum.zc_word[2]);
+		DO64(drr_write_byref.drr_blkcksum.zc_word[3]);
 		break;
 	case DRR_FREE:
 		DO64(drr_free.drr_object);
 		DO64(drr_free.drr_offset);
 		DO64(drr_free.drr_length);
+		DO64(drr_free.drr_toguid);
 		break;
 	case DRR_END:
 		DO64(drr_end.drr_checksum.zc_word[0]);
 		DO64(drr_end.drr_checksum.zc_word[1]);
 		DO64(drr_end.drr_checksum.zc_word[2]);
 		DO64(drr_end.drr_checksum.zc_word[3]);
+		DO64(drr_end.drr_toguid);
 		break;
 	}
 #undef DO64
@@ -676,7 +898,7 @@
 	if (drro->drr_type == DMU_OT_NONE ||
 	    drro->drr_type >= DMU_OT_NUMTYPES ||
 	    drro->drr_bonustype >= DMU_OT_NUMTYPES ||
-	    drro->drr_checksum >= ZIO_CHECKSUM_FUNCTIONS ||
+	    drro->drr_checksumtype >= ZIO_CHECKSUM_FUNCTIONS ||
 	    drro->drr_compress >= ZIO_COMPRESS_FUNCTIONS ||
 	    P2PHASE(drro->drr_blksz, SPA_MINBLOCKSIZE) ||
 	    drro->drr_blksz < SPA_MINBLOCKSIZE ||
@@ -726,7 +948,8 @@
 		return (err);
 	}
 
-	dmu_object_set_checksum(os, drro->drr_object, drro->drr_checksum, tx);
+	dmu_object_set_checksum(os, drro->drr_object, drro->drr_checksumtype,
+	    tx);
 	dmu_object_set_compress(os, drro->drr_object, drro->drr_compress, tx);
 
 	if (data != NULL) {
@@ -808,6 +1031,64 @@
 	return (0);
 }
 
+/*
+ * Handle a DRR_WRITE_BYREF record.  This record is used in dedup'ed
+ * streams to refer to a copy of the data that is already on the
+ * system because it came in earlier in the stream.  This function
+ * finds the earlier copy of the data, and uses that copy instead of
+ * data from the stream to fulfill this write.
+ */
+static int
+restore_write_byref(struct restorearg *ra, objset_t *os,
+    struct drr_write_byref *drrwbr)
+{
+	dmu_tx_t *tx;
+	int err;
+	guid_map_entry_t gmesrch;
+	guid_map_entry_t *gmep;
+	avl_index_t	where;
+	objset_t *ref_os = NULL;
+	dmu_buf_t *dbp;
+
+	if (drrwbr->drr_offset + drrwbr->drr_length < drrwbr->drr_offset)
+		return (EINVAL);
+
+	/*
+	 * If the GUID of the referenced dataset is different from the
+	 * GUID of the target dataset, find the referenced dataset.
+	 */
+	if (drrwbr->drr_toguid != drrwbr->drr_refguid) {
+		gmesrch.guid = drrwbr->drr_refguid;
+		if ((gmep = avl_find(&ra->guid_to_ds_map, &gmesrch,
+		    &where)) == NULL) {
+			return (EINVAL);
+		}
+		if (dmu_objset_from_ds(gmep->gme_ds, &ref_os))
+			return (EINVAL);
+	} else {
+		ref_os = os;
+	}
+
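+	/* hold the buffer containing the earlier copy of the data */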
+	if (err = dmu_buf_hold(ref_os, drrwbr->drr_refobject,
+	    drrwbr->drr_refoffset, FTAG, &dbp))
+		return (err);
+
+	tx = dmu_tx_create(os);
+
+	dmu_tx_hold_write(tx, drrwbr->drr_object,
+	    drrwbr->drr_offset, drrwbr->drr_length);
+	err = dmu_tx_assign(tx, TXG_WAIT);
+	if (err) {
+		dmu_tx_abort(tx);
+		return (err);
+	}
+	dmu_write(os, drrwbr->drr_object,
+	    drrwbr->drr_offset, drrwbr->drr_length, dbp->db_data, tx);
+	dmu_buf_rele(dbp, FTAG);
+	dmu_tx_commit(tx);
+	return (0);
+}
+
 /* ARGSUSED */
 static int
 restore_free(struct restorearg *ra, objset_t *os,
@@ -837,6 +1118,8 @@
 	dmu_replay_record_t *drr;
 	objset_t *os;
 	zio_cksum_t pcksum;
+	guid_map_entry_t *gmep;
+	int featureflags;
 
 	if (drc->drc_drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC))
 		ra.byteswap = TRUE;
@@ -861,7 +1144,7 @@
 	if (ra.byteswap) {
 		struct drr_begin *drrb = drc->drc_drrb;
 		drrb->drr_magic = BSWAP_64(drrb->drr_magic);
-		drrb->drr_version = BSWAP_64(drrb->drr_version);
+		drrb->drr_versioninfo = BSWAP_64(drrb->drr_versioninfo);
 		drrb->drr_creation_time = BSWAP_64(drrb->drr_creation_time);
 		drrb->drr_type = BSWAP_32(drrb->drr_type);
 		drrb->drr_toguid = BSWAP_64(drrb->drr_toguid);
@@ -874,7 +1157,8 @@
 	ra.buf = kmem_alloc(ra.bufsize, KM_SLEEP);
 
 	/* these were verified in dmu_recv_begin */
-	ASSERT(drc->drc_drrb->drr_version == DMU_BACKUP_STREAM_VERSION);
+	ASSERT(DMU_GET_STREAM_HDRTYPE(drc->drc_drrb->drr_versioninfo) ==
+	    DMU_SUBSTREAM);
 	ASSERT(drc->drc_drrb->drr_type < DMU_OST_NUMTYPES);
 
 	/*
@@ -884,6 +1168,18 @@
 
 	ASSERT(drc->drc_real_ds->ds_phys->ds_flags & DS_FLAG_INCONSISTENT);
 
+	featureflags = DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo);
+
+	/* if this stream is dedup'ed, set up the avl tree for guid mapping */
+	if (featureflags & DMU_BACKUP_FEATURE_DEDUP) {
+		avl_create(&ra.guid_to_ds_map, guid_compare,
+		    sizeof (guid_map_entry_t),
+		    offsetof(guid_map_entry_t, avlnode));
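+		/* populate the map from datasets under the top-level one */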
+		(void) dmu_objset_find(drc->drc_top_ds, find_ds_by_guid,
+		    (void *)&ra.guid_to_ds_map,
+		    DS_FIND_CHILDREN);
+	}
+
 	/*
 	 * Read records and process them.
 	 */
@@ -923,6 +1219,13 @@
 			ra.err = restore_write(&ra, os, &drrw);
 			break;
 		}
+		case DRR_WRITE_BYREF:
+		{
+			struct drr_write_byref drrwbr =
+			    drr->drr_u.drr_write_byref;
+			ra.err = restore_write_byref(&ra, os, &drrwbr);
+			break;
+		}
 		case DRR_FREE:
 		{
 			struct drr_free drrf = drr->drr_u.drr_free;
@@ -965,6 +1268,16 @@
 		}
 	}
 
+	if (featureflags & DMU_BACKUP_FEATURE_DEDUP) {
+		void *cookie = NULL;
+
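+		/* drop the snapshot holds taken in find_ds_by_guid() */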
+		while (gmep = avl_destroy_nodes(&ra.guid_to_ds_map, &cookie)) {
+			dsl_dataset_rele(gmep->gme_ds, &ra.guid_to_ds_map);
+			kmem_free(gmep, sizeof (guid_map_entry_t));
+		}
+		avl_destroy(&ra.guid_to_ds_map);
+	}
+
 	kmem_free(ra.buf, ra.bufsize);
 	*voffp = ra.voff;
 	return (ra.err);