PSARC/2009/557 ZFS send dedup
authorLori Alt <Lori.Alt@Sun.COM>
Mon, 09 Nov 2009 11:04:55 -0700
changeset 11007 216d8396182e
parent 11006 4fe66eb82610
child 11008 a22f9e01151a
PSARC/2009/557 ZFS send dedup 6812638 zfs send intra-stream dedup 6887817 want snapshot filtering for zfs send 6812603 zfs send can aggregate free records
usr/src/cmd/zfs/zfs_main.c
usr/src/cmd/zstreamdump/zstreamdump.c
usr/src/lib/libzfs/Makefile.com
usr/src/lib/libzfs/common/libzfs.h
usr/src/lib/libzfs/common/libzfs_sendrecv.c
usr/src/lib/libzfs/common/libzfs_util.c
usr/src/uts/common/fs/zfs/dmu_send.c
usr/src/uts/common/fs/zfs/sys/dmu.h
usr/src/uts/common/fs/zfs/sys/zfs_ioctl.h
usr/src/uts/common/fs/zfs/zfs_ioctl.c
--- a/usr/src/cmd/zfs/zfs_main.c	Mon Nov 09 11:54:00 2009 +0530
+++ b/usr/src/cmd/zfs/zfs_main.c	Mon Nov 09 11:04:55 2009 -0700
@@ -232,7 +232,7 @@
 	case HELP_ROLLBACK:
 		return (gettext("\trollback [-rRf] <snapshot>\n"));
 	case HELP_SEND:
-		return (gettext("\tsend [-R] [-[iI] snapshot] <snapshot>\n"));
+		return (gettext("\tsend [-RD] [-[iI] snapshot] <snapshot>\n"));
 	case HELP_SET:
 		return (gettext("\tset <property=value> "
 		    "<filesystem|volume|snapshot> ...\n"));
@@ -2488,8 +2488,8 @@
 }
 
 /*
- * zfs send [-v] -R [-i|-I <@snap>] <fs@snap>
- * zfs send [-v] [-i|-I <@snap>] <fs@snap>
+ * zfs send [-vD] -R [-i|-I <@snap>] <fs@snap>
+ * zfs send [-vD] [-i|-I <@snap>] <fs@snap>
  *
  * Send a backup stream to stdout.
  */
@@ -2500,14 +2500,11 @@
 	char *toname = NULL;
 	char *cp;
 	zfs_handle_t *zhp;
-	boolean_t doall = B_FALSE;
-	boolean_t replicate = B_FALSE;
-	boolean_t fromorigin = B_FALSE;
-	boolean_t verbose = B_FALSE;
+	sendflags_t flags = { 0 };
 	int c, err;
 
 	/* check options */
-	while ((c = getopt(argc, argv, ":i:I:Rv")) != -1) {
+	while ((c = getopt(argc, argv, ":i:I:RDv")) != -1) {
 		switch (c) {
 		case 'i':
 			if (fromname)
@@ -2518,13 +2515,16 @@
 			if (fromname)
 				usage(B_FALSE);
 			fromname = optarg;
-			doall = B_TRUE;
+			flags.doall = B_TRUE;
 			break;
 		case 'R':
-			replicate = B_TRUE;
+			flags.replicate = B_TRUE;
 			break;
 		case 'v':
-			verbose = B_TRUE;
+			flags.verbose = B_TRUE;
+			break;
+		case 'D':
+			flags.dedup = B_TRUE;
 			break;
 		case ':':
 			(void) fprintf(stderr, gettext("missing argument for "
@@ -2584,7 +2584,7 @@
 
 		if (strcmp(origin, fromname) == 0) {
 			fromname = NULL;
-			fromorigin = B_TRUE;
+			flags.fromorigin = B_TRUE;
 		} else {
 			*cp = '\0';
 			if (cp != fromname && strcmp(argv[0], fromname)) {
@@ -2602,11 +2602,10 @@
 		}
 	}
 
-	if (replicate && fromname == NULL)
-		doall = B_TRUE;
-
-	err = zfs_send(zhp, fromname, toname, replicate, doall, fromorigin,
-	    verbose, STDOUT_FILENO);
+	if (flags.replicate && fromname == NULL)
+		flags.doall = B_TRUE;
+
+	err = zfs_send(zhp, fromname, toname, flags, STDOUT_FILENO, NULL, 0);
 	zfs_close(zhp);
 
 	return (err != 0);
--- a/usr/src/cmd/zstreamdump/zstreamdump.c	Mon Nov 09 11:54:00 2009 +0530
+++ b/usr/src/cmd/zstreamdump/zstreamdump.c	Mon Nov 09 11:04:55 2009 -0700
@@ -86,6 +86,7 @@
 	struct drr_object *drro = &thedrr.drr_u.drr_object;
 	struct drr_freeobjects *drrfo = &thedrr.drr_u.drr_freeobjects;
 	struct drr_write *drrw = &thedrr.drr_u.drr_write;
+	struct drr_write_byref *drrwbr = &thedrr.drr_u.drr_write_byref;
 	struct drr_free *drrf = &thedrr.drr_u.drr_free;
 	char c;
 	boolean_t verbose = B_FALSE;
@@ -172,7 +173,8 @@
 		case DRR_BEGIN:
 			if (do_byteswap) {
 				drrb->drr_magic = BSWAP_64(drrb->drr_magic);
-				drrb->drr_version = BSWAP_64(drrb->drr_version);
+				drrb->drr_versioninfo =
+				    BSWAP_64(drrb->drr_versioninfo);
 				drrb->drr_creation_time =
 				    BSWAP_64(drrb->drr_creation_time);
 				drrb->drr_type = BSWAP_32(drrb->drr_type);
@@ -183,8 +185,10 @@
 			}
 
 			(void) printf("BEGIN record\n");
-			(void) printf("\tversion = %llx\n",
-			    (u_longlong_t)drrb->drr_version);
+			(void) printf("\thdrtype = %lld\n",
+			    DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo));
+			(void) printf("\tfeatures = %llx\n",
+			    DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo));
 			(void) printf("\tmagic = %llx\n",
 			    (u_longlong_t)drrb->drr_magic);
 			(void) printf("\tcreation_time = %llx\n",
@@ -199,8 +203,8 @@
 			if (verbose)
 				(void) printf("\n");
 
-			if (drrb->drr_version == 2 &&
-			    drr->drr_payloadlen != 0) {
+			if ((DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
+			    DMU_COMPOUNDSTREAM) && drr->drr_payloadlen != 0) {
 				nvlist_t *nv;
 				int sz = drr->drr_payloadlen;
 
@@ -264,6 +268,7 @@
 				drro->drr_blksz = BSWAP_32(drro->drr_blksz);
 				drro->drr_bonuslen =
 				    BSWAP_32(drro->drr_bonuslen);
+				drro->drr_toguid = BSWAP_64(drro->drr_toguid);
 			}
 			if (verbose) {
 				(void) printf("OBJECT object = %llu type = %u "
@@ -286,6 +291,7 @@
 				    BSWAP_64(drrfo->drr_firstobj);
 				drrfo->drr_numobjs =
 				    BSWAP_64(drrfo->drr_numobjs);
+				drrfo->drr_toguid = BSWAP_64(drrfo->drr_toguid);
 			}
 			if (verbose) {
 				(void) printf("FREEOBJECTS firstobj = %llu "
@@ -301,6 +307,7 @@
 				drrw->drr_type = BSWAP_32(drrw->drr_type);
 				drrw->drr_offset = BSWAP_64(drrw->drr_offset);
 				drrw->drr_length = BSWAP_64(drrw->drr_length);
+				drrw->drr_toguid = BSWAP_64(drrw->drr_toguid);
 			}
 			if (verbose) {
 				(void) printf("WRITE object = %llu type = %u "
@@ -314,6 +321,38 @@
 			total_write_size += drrw->drr_length;
 			break;
 
+		case DRR_WRITE_BYREF:
+			if (do_byteswap) {
+				drrwbr->drr_object =
+				    BSWAP_64(drrwbr->drr_object);
+				drrwbr->drr_offset =
+				    BSWAP_64(drrwbr->drr_offset);
+				drrwbr->drr_length =
+				    BSWAP_64(drrwbr->drr_length);
+				drrwbr->drr_toguid =
+				    BSWAP_64(drrwbr->drr_toguid);
+				drrwbr->drr_refguid =
+				    BSWAP_64(drrwbr->drr_refguid);
+				drrwbr->drr_refobject =
+				    BSWAP_64(drrwbr->drr_refobject);
+				drrwbr->drr_refoffset =
+				    BSWAP_64(drrwbr->drr_refoffset);
+			}
+			if (verbose) {
+				(void) printf("WRITE_BYREF object = %llu "
+				    "offset = %llu length = %llu\n"
+				    "toguid = %llx refguid = %llx\n"
+				    "refobject = %llu refoffset = %llu\n",
+				    (u_longlong_t)drrwbr->drr_object,
+				    (u_longlong_t)drrwbr->drr_offset,
+				    (u_longlong_t)drrwbr->drr_length,
+				    (u_longlong_t)drrwbr->drr_toguid,
+				    (u_longlong_t)drrwbr->drr_refguid,
+				    (u_longlong_t)drrwbr->drr_refobject,
+				    (u_longlong_t)drrwbr->drr_refoffset);
+			}
+			break;
+
 		case DRR_FREE:
 			if (do_byteswap) {
 				drrf->drr_object = BSWAP_64(drrf->drr_object);
--- a/usr/src/lib/libzfs/Makefile.com	Mon Nov 09 11:54:00 2009 +0530
+++ b/usr/src/lib/libzfs/Makefile.com	Mon Nov 09 11:04:55 2009 -0700
@@ -67,7 +67,7 @@
 C99MODE=	-xc99=%all
 C99LMODE=	-Xc99=%all
 LDLIBS +=	-lc -lm -ldevid -lgen -lnvpair -luutil -lavl -lefi \
-	-ladm -lidmap -ltsol
+	-ladm -lidmap -ltsol -lmd -lumem
 CPPFLAGS +=	$(INCS) -D_REENTRANT
 
 SRCS=	$(OBJS_COMMON:%.o=$(SRCDIR)/%.c)	\
--- a/usr/src/lib/libzfs/common/libzfs.h	Mon Nov 09 11:54:00 2009 +0530
+++ b/usr/src/lib/libzfs/common/libzfs.h	Mon Nov 09 11:04:55 2009 -0700
@@ -118,6 +118,8 @@
 	EZFS_REFTAG_RELE,	/* snapshot release: tag not found */
 	EZFS_REFTAG_HOLD,	/* snapshot hold: tag already exists */
 	EZFS_TAGTOOLONG,	/* snapshot hold/rele: tag too long */
+	EZFS_PIPEFAILED,	/* pipe create failed */
+	EZFS_THREADCREATEFAILED, /* thread create failed */
 	EZFS_UNKNOWN
 };
 
@@ -470,8 +472,29 @@
 extern int zfs_snapshot(libzfs_handle_t *, const char *, boolean_t, nvlist_t *);
 extern int zfs_rollback(zfs_handle_t *, zfs_handle_t *, boolean_t);
 extern int zfs_rename(zfs_handle_t *, const char *, boolean_t);
+
+typedef struct sendflags {
+	/* print informational messages (ie, -v was specified) */
+	int verbose : 1;
+
+	/* recursive send  (i.e., -R) */
+	int replicate : 1;
+
+	/* for incrementals, do all intermediate snapshots */
+	int doall : 1; /* (i.e., -I) */
+
+	/* if dataset is a clone, do incremental from its origin */
+	int fromorigin : 1;
+
+	/* do deduplication */
+	int dedup : 1;
+} sendflags_t;
+
+typedef boolean_t (snapfilter_cb_t)(zfs_handle_t *, void *);
+
 extern int zfs_send(zfs_handle_t *, const char *, const char *,
-    boolean_t, boolean_t, boolean_t, boolean_t, int);
+    sendflags_t, int, snapfilter_cb_t, void *);
+
 extern int zfs_promote(zfs_handle_t *);
 extern int zfs_hold(zfs_handle_t *, const char *, const char *, boolean_t,
     boolean_t);
--- a/usr/src/lib/libzfs/common/libzfs_sendrecv.c	Mon Nov 09 11:54:00 2009 +0530
+++ b/usr/src/lib/libzfs/common/libzfs_sendrecv.c	Mon Nov 09 11:04:55 2009 -0700
@@ -35,6 +35,8 @@
 #include <stddef.h>
 #include <fcntl.h>
 #include <sys/mount.h>
+#include <pthread.h>
+#include <umem.h>
 
 #include <libzfs.h>
 
@@ -42,10 +44,351 @@
 #include "zfs_prop.h"
 #include "zfs_fletcher.h"
 #include "libzfs_impl.h"
+#include <sha2.h>
 
 static int zfs_receive_impl(libzfs_handle_t *, const char *, recvflags_t,
     int, avl_tree_t *, char **);
 
+static const zio_cksum_t zero_cksum = { 0 };
+
+typedef struct dedup_arg {
+	int	inputfd;
+	int	outputfd;
+	libzfs_handle_t  *dedup_hdl;
+} dedup_arg_t;
+
+typedef struct dataref {
+	uint64_t ref_guid;
+	uint64_t ref_object;
+	uint64_t ref_offset;
+} dataref_t;
+
+typedef struct dedup_entry {
+	struct dedup_entry	*dde_next;
+	zio_cksum_t dde_chksum;
+	dataref_t dde_ref;
+} dedup_entry_t;
+
+#define	MAX_DDT_PHYSMEM_PERCENT		20
+#define	SMALLEST_POSSIBLE_MAX_DDT_MB		128
+
+typedef struct dedup_table {
+	dedup_entry_t	**dedup_hash_array;
+	umem_cache_t	*ddecache;
+	uint64_t	max_ddt_size;  /* max dedup table size in bytes */
+	uint64_t	cur_ddt_size;  /* current dedup table size in bytes */
+	uint64_t	ddt_count;
+	int		numhashbits;
+	boolean_t	ddt_full;
+} dedup_table_t;
+
+static int
+high_order_bit(uint64_t n)
+{
+	int count;
+
+	for (count = 0; n != 0; count++)
+		n >>= 1;
+	return (count);
+}
+
+static size_t
+ssread(void *buf, size_t len, FILE *stream)
+{
+	size_t outlen;
+
+	if ((outlen = fread(buf, len, 1, stream)) == 0)
+		return (0);
+
+	return (outlen);
+}
+
+static void
+ddt_hash_append(libzfs_handle_t *hdl, dedup_table_t *ddt, dedup_entry_t **ddepp,
+    zio_cksum_t *cs, dataref_t *dr)
+{
+	dedup_entry_t	*dde;
+
+	if (ddt->cur_ddt_size >= ddt->max_ddt_size) {
+		if (ddt->ddt_full == B_FALSE) {
+			zfs_error_aux(hdl, dgettext(TEXT_DOMAIN,
+			    "Dedup table full.  Deduplication will continue "
+			    "with existing table entries"));
+			ddt->ddt_full = B_TRUE;
+		}
+		return;
+	}
+
+	if ((dde = umem_cache_alloc(ddt->ddecache, UMEM_DEFAULT))
+	    != NULL) {
+		assert(*ddepp == NULL);
+		dde->dde_next = NULL;
+		dde->dde_chksum = *cs;
+		dde->dde_ref = *dr;
+		*ddepp = dde;
+		ddt->cur_ddt_size += sizeof (dedup_entry_t);
+		ddt->ddt_count++;
+	}
+}
+
+/*
+ * Using the specified dedup table, do a lookup for an entry with
+ * the checksum cs.  If found, return the block's reference info
+ * in *dr. Otherwise, insert a new entry in the dedup table, using
+ * the reference information specified by *dr.
+ *
+ * return value:  true - entry was found
+ *		  false - entry was not found
+ */
+static boolean_t
+ddt_update(libzfs_handle_t *hdl, dedup_table_t *ddt, zio_cksum_t *cs,
+    dataref_t *dr)
+{
+	uint32_t hashcode;
+	dedup_entry_t **ddepp;
+
+	hashcode = BF64_GET(cs->zc_word[0], 0, ddt->numhashbits);
+
+	for (ddepp = &(ddt->dedup_hash_array[hashcode]); *ddepp != NULL;
+	    ddepp = &((*ddepp)->dde_next)) {
+		if (ZIO_CHECKSUM_EQUAL(((*ddepp)->dde_chksum), *cs)) {
+			*dr = (*ddepp)->dde_ref;
+			return (B_TRUE);
+		}
+	}
+	ddt_hash_append(hdl, ddt, ddepp, cs, dr);
+	return (B_FALSE);
+}
+
+static int
+cksum_and_write(const void *buf, uint64_t len, zio_cksum_t *zc, int outfd)
+{
+	fletcher_4_incremental_native(buf, len, zc);
+	return (write(outfd, buf, len));
+}
+
+/*
+ * This function is started in a separate thread when the dedup option
+ * has been requested.  The main send thread determines the list of
+ * snapshots to be included in the send stream and makes the ioctl calls
+ * for each one.  But instead of having the ioctl send the output to the
+ * the output fd specified by the caller of zfs_send()), the
+ * ioctl is told to direct the output to a pipe, which is read by the
+ * alternate thread running THIS function.  This function does the
+ * dedup'ing by:
+ *  1. building a dedup table (the DDT)
+ *  2. doing checksums on each data block and inserting a record in the DDT
+ *  3. looking for matching checksums, and
+ *  4.  sending a DRR_WRITE_BYREF record instead of a write record whenever
+ *      a duplicate block is found.
+ * The output of this function then goes to the output fd requested
+ * by the caller of zfs_send().
+ */
+static void *
+cksummer(void *arg)
+{
+	dedup_arg_t *dda = arg;
+	char *buf = malloc(1<<20);
+	dmu_replay_record_t thedrr;
+	dmu_replay_record_t *drr = &thedrr;
+	struct drr_begin *drrb = &thedrr.drr_u.drr_begin;
+	struct drr_end *drre = &thedrr.drr_u.drr_end;
+	struct drr_object *drro = &thedrr.drr_u.drr_object;
+	struct drr_write *drrw = &thedrr.drr_u.drr_write;
+	FILE *ofp;
+	int outfd;
+	dmu_replay_record_t wbr_drr;
+	struct drr_write_byref *wbr_drrr = &wbr_drr.drr_u.drr_write_byref;
+	dedup_table_t ddt;
+	zio_cksum_t stream_cksum;
+	uint64_t physmem = sysconf(_SC_PHYS_PAGES) * sysconf(_SC_PAGESIZE);
+	uint64_t numbuckets;
+
+	ddt.max_ddt_size =
+	    MAX((physmem * MAX_DDT_PHYSMEM_PERCENT)/100,
+	    SMALLEST_POSSIBLE_MAX_DDT_MB<<20);
+
+	numbuckets = ddt.max_ddt_size/(sizeof (dedup_entry_t));
+
+	/*
+	 * numbuckets must be a power of 2.  Increase number to
+	 * a power of 2 if necessary.
+	 */
+	if (!ISP2(numbuckets))
+		numbuckets = 1 << high_order_bit(numbuckets);
+
+	ddt.dedup_hash_array = calloc(numbuckets, sizeof (dedup_entry_t *));
+	ddt.ddecache = umem_cache_create("dde", sizeof (dedup_entry_t), 0,
+	    NULL, NULL, NULL, NULL, NULL, 0);
+	ddt.cur_ddt_size = numbuckets * sizeof (dedup_entry_t *);
+	ddt.numhashbits = high_order_bit(numbuckets) - 1;
+	ddt.ddt_full = B_FALSE;
+
+	/* Initialize the write-by-reference block. */
+	wbr_drr.drr_type = DRR_WRITE_BYREF;
+	wbr_drr.drr_payloadlen = 0;
+
+	outfd = dda->outputfd;
+	ofp = fdopen(dda->inputfd, "r");
+	while (ssread(drr, sizeof (dmu_replay_record_t), ofp) != 0) {
+
+		switch (drr->drr_type) {
+		case DRR_BEGIN:
+		{
+			int	fflags;
+			ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);
+
+			/* set the DEDUP feature flag for this stream */
+			fflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
+			fflags |= DMU_BACKUP_FEATURE_DEDUP;
+			DMU_SET_FEATUREFLAGS(drrb->drr_versioninfo, fflags);
+
+			if (cksum_and_write(drr, sizeof (dmu_replay_record_t),
+			    &stream_cksum, outfd) == -1)
+				goto out;
+			if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
+			    DMU_COMPOUNDSTREAM && drr->drr_payloadlen != 0) {
+				int sz = drr->drr_payloadlen;
+
+				if (sz > 1<<20) {
+					free(buf);
+					buf = malloc(sz);
+				}
+				(void) ssread(buf, sz, ofp);
+				if (ferror(stdin))
+					perror("fread");
+				if (cksum_and_write(buf, sz, &stream_cksum,
+				    outfd) == -1)
+					goto out;
+			}
+			break;
+		}
+
+		case DRR_END:
+		{
+			/* use the recalculated checksum */
+			ZIO_SET_CHECKSUM(&drre->drr_checksum,
+			    stream_cksum.zc_word[0], stream_cksum.zc_word[1],
+			    stream_cksum.zc_word[2], stream_cksum.zc_word[3]);
+			if ((write(outfd, drr,
+			    sizeof (dmu_replay_record_t))) == -1)
+				goto out;
+			break;
+		}
+
+		case DRR_OBJECT:
+		{
+			if (cksum_and_write(drr, sizeof (dmu_replay_record_t),
+			    &stream_cksum, outfd) == -1)
+				goto out;
+			if (drro->drr_bonuslen > 0) {
+				(void) ssread(buf,
+				    P2ROUNDUP((uint64_t)drro->drr_bonuslen, 8),
+				    ofp);
+				if (cksum_and_write(buf,
+				    P2ROUNDUP((uint64_t)drro->drr_bonuslen, 8),
+				    &stream_cksum, outfd) == -1)
+					goto out;
+			}
+			break;
+		}
+
+		case DRR_FREEOBJECTS:
+		{
+			if (cksum_and_write(drr, sizeof (dmu_replay_record_t),
+			    &stream_cksum, outfd) == -1)
+				goto out;
+			break;
+		}
+
+		case DRR_WRITE:
+		{
+			dataref_t	dataref;
+
+			(void) ssread(buf, drrw->drr_length, ofp);
+			/*
+			 * If the block doesn't already have a dedup
+			 * checksum, calculate one.
+			 */
+			if (ZIO_CHECKSUM_EQUAL(drrw->drr_blkcksum,
+			    zero_cksum)) {
+				SHA256_CTX	ctx;
+				zio_cksum_t	tmpsha256;
+
+				SHA256Init(&ctx);
+				SHA256Update(&ctx, buf, drrw->drr_length);
+				SHA256Final(&tmpsha256, &ctx);
+				drrw->drr_blkcksum.zc_word[0] =
+				    BE_64(tmpsha256.zc_word[0]);
+				drrw->drr_blkcksum.zc_word[1] =
+				    BE_64(tmpsha256.zc_word[1]);
+				drrw->drr_blkcksum.zc_word[2] =
+				    BE_64(tmpsha256.zc_word[2]);
+				drrw->drr_blkcksum.zc_word[3] =
+				    BE_64(tmpsha256.zc_word[3]);
+			}
+
+			dataref.ref_guid = drrw->drr_toguid;
+			dataref.ref_object = drrw->drr_object;
+			dataref.ref_offset = drrw->drr_offset;
+
+			if (ddt_update(dda->dedup_hdl, &ddt,
+			    &drrw->drr_blkcksum, &dataref)) {
+				/* block already present in stream */
+				wbr_drrr->drr_object = drrw->drr_object;
+				wbr_drrr->drr_offset = drrw->drr_offset;
+				wbr_drrr->drr_length = drrw->drr_length;
+				wbr_drrr->drr_toguid = drrw->drr_toguid;
+				wbr_drrr->drr_refguid = dataref.ref_guid;
+				wbr_drrr->drr_refobject =
+				    dataref.ref_object;
+				wbr_drrr->drr_refoffset =
+				    dataref.ref_offset;
+
+				wbr_drrr->drr_blkcksum = drrw->drr_blkcksum;
+
+				if (cksum_and_write(&wbr_drr,
+				    sizeof (dmu_replay_record_t), &stream_cksum,
+				    outfd) == -1)
+					goto out;
+			} else {
+				/* block not previously seen */
+				if (cksum_and_write(drr,
+				    sizeof (dmu_replay_record_t), &stream_cksum,
+				    outfd) == -1)
+					goto out;
+				if (cksum_and_write(buf,
+				    drrw->drr_length,
+				    &stream_cksum, outfd) == -1)
+					goto out;
+			}
+			break;
+		}
+
+		case DRR_FREE:
+		{
+			if (cksum_and_write(drr, sizeof (dmu_replay_record_t),
+			    &stream_cksum, outfd) == -1)
+				goto out;
+			break;
+		}
+
+		default:
+			(void) printf("INVALID record type 0x%x\n",
+			    drr->drr_type);
+			/* should never happen, so assert */
+			assert(B_FALSE);
+		}
+	}
+out:
+	umem_cache_destroy(ddt.ddecache);
+	free(ddt.dedup_hash_array);
+	free(buf);
+	(void) fclose(ofp);
+
+	return (NULL);
+}
+
 /*
  * Routines for dealing with the AVL tree of fs-nvlists
  */
@@ -451,13 +794,15 @@
 	/* these are all just the short snapname (the part after the @) */
 	const char *fromsnap;
 	const char *tosnap;
-	char lastsnap[ZFS_MAXNAMELEN];
+	char prevsnap[ZFS_MAXNAMELEN];
 	boolean_t seenfrom, seento, replicate, doall, fromorigin;
 	boolean_t verbose;
 	int outfd;
 	boolean_t err;
 	nvlist_t *fss;
 	avl_tree_t *fsavl;
+	snapfilter_cb_t *filter_cb;
+	void *filter_cb_arg;
 } send_dump_data_t;
 
 /*
@@ -535,7 +880,7 @@
 	if (sdd->fromsnap && !sdd->seenfrom &&
 	    strcmp(sdd->fromsnap, thissnap) == 0) {
 		sdd->seenfrom = B_TRUE;
-		(void) strcpy(sdd->lastsnap, thissnap);
+		(void) strcpy(sdd->prevsnap, thissnap);
 		zfs_close(zhp);
 		return (0);
 	}
@@ -545,20 +890,38 @@
 		return (0);
 	}
 
+	if (strcmp(sdd->tosnap, thissnap) == 0)
+		sdd->seento = B_TRUE;
+
+	/*
+	 * If a filter function exists, call it to determine whether
+	 * this snapshot will be sent.
+	 */
+	if (sdd->filter_cb != NULL &&
+	    sdd->filter_cb(zhp, sdd->filter_cb_arg) == B_FALSE) {
+		/*
+		 * This snapshot is filtered out.  Don't send it, and don't
+		 * set prevsnap, so it will be as if this snapshot didn't
+		 * exist, and the next accepted snapshot will be sent as
+		 * an incremental from the last accepted one, or as the
+		 * first (and full) snapshot in the case of a replication,
+		 * non-incremental send.
+		 */
+		zfs_close(zhp);
+		return (0);
+	}
+
 	/* send it */
 	if (sdd->verbose) {
 		(void) fprintf(stderr, "sending from @%s to %s\n",
-		    sdd->lastsnap, zhp->zfs_name);
+		    sdd->prevsnap, zhp->zfs_name);
 	}
 
-	err = dump_ioctl(zhp, sdd->lastsnap,
-	    sdd->lastsnap[0] == '\0' && (sdd->fromorigin || sdd->replicate),
+	err = dump_ioctl(zhp, sdd->prevsnap,
+	    sdd->prevsnap[0] == '\0' && (sdd->fromorigin || sdd->replicate),
 	    sdd->outfd);
 
-	if (!sdd->seento && strcmp(sdd->tosnap, thissnap) == 0)
-		sdd->seento = B_TRUE;
-
-	(void) strcpy(sdd->lastsnap, thissnap);
+	(void) strcpy(sdd->prevsnap, thissnap);
 	zfs_close(zhp);
 	return (err);
 }
@@ -598,7 +961,7 @@
 	}
 
 	if (sdd->doall) {
-		sdd->seenfrom = sdd->seento = sdd->lastsnap[0] = 0;
+		sdd->seenfrom = sdd->seento = sdd->prevsnap[0] = 0;
 		if (sdd->fromsnap == NULL || missingfrom)
 			sdd->seenfrom = B_TRUE;
 
@@ -635,10 +998,14 @@
 		if (snapzhp == NULL) {
 			rv = -1;
 		} else {
-			rv = dump_ioctl(snapzhp,
-			    missingfrom ? NULL : sdd->fromsnap,
-			    sdd->fromorigin || missingfrom,
-			    sdd->outfd);
+			if (sdd->filter_cb == NULL ||
+			    sdd->filter_cb(snapzhp, sdd->filter_cb_arg) ==
+			    B_TRUE) {
+				rv = dump_ioctl(snapzhp,
+				    missingfrom ? NULL : sdd->fromsnap,
+				    sdd->fromorigin || missingfrom,
+				    sdd->outfd);
+			}
 			sdd->seento = B_TRUE;
 			zfs_close(snapzhp);
 		}
@@ -678,12 +1045,12 @@
 		origin_nv = fsavl_find(sdd->fsavl, origin_guid, NULL);
 		if (origin_nv &&
 		    nvlist_lookup_boolean(origin_nv, "sent") == ENOENT) {
-			/*
-			 * origin has not been sent yet;
-			 * skip this clone.
-			 */
-			needagain = B_TRUE;
-			continue;
+				/*
+				 * origin has not been sent yet;
+				 * skip this clone.
+				 */
+				needagain = B_TRUE;
+				continue;
 		}
 
 		zhp = zfs_open(rzhp->zfs_hdl, fsname, ZFS_TYPE_DATASET);
@@ -714,15 +1081,15 @@
  *	 is TRUE.
  *
  * The send stream is recursive (i.e. dumps a hierarchy of snapshots) and
- * uses a special header (with a version field of DMU_BACKUP_HEADER_VERSION)
+ * uses a special header (with a hdrtype field of DMU_COMPOUNDSTREAM)
  * if "replicate" is set.  If "doall" is set, dump all the intermediate
- * snapshots. The DMU_BACKUP_HEADER_VERSION header is used in the "doall"
+ * snapshots. The DMU_COMPOUNDSTREAM header is used in the "doall"
  * case too.
  */
 int
 zfs_send(zfs_handle_t *zhp, const char *fromsnap, const char *tosnap,
-    boolean_t replicate, boolean_t doall, boolean_t fromorigin,
-    boolean_t verbose, int outfd)
+    sendflags_t flags, int outfd, snapfilter_cb_t filter_func,
+    void *cb_arg)
 {
 	char errbuf[1024];
 	send_dump_data_t sdd = { 0 };
@@ -733,6 +1100,10 @@
 	static uint64_t holdseq;
 	int spa_version;
 	boolean_t holdsnaps = B_FALSE;
+	pthread_t tid;
+	int pipefd[2];
+	dedup_arg_t dda = { 0 };
+	int featureflags = 0;
 
 	(void) snprintf(errbuf, sizeof (errbuf), dgettext(TEXT_DOMAIN,
 	    "cannot send '%s'"), zhp->zfs_name);
@@ -747,13 +1118,32 @@
 	    spa_version >= SPA_VERSION_USERREFS)
 		holdsnaps = B_TRUE;
 
-	if (replicate || doall) {
+	if (flags.dedup) {
+		featureflags |= DMU_BACKUP_FEATURE_DEDUP;
+		if (err = pipe(pipefd)) {
+			zfs_error_aux(zhp->zfs_hdl, strerror(errno));
+			return (zfs_error(zhp->zfs_hdl, EZFS_PIPEFAILED,
+			    errbuf));
+		}
+		dda.outputfd = outfd;
+		dda.inputfd = pipefd[1];
+		dda.dedup_hdl = zhp->zfs_hdl;
+		if (err = pthread_create(&tid, NULL, cksummer, &dda)) {
+			(void) close(pipefd[0]);
+			(void) close(pipefd[1]);
+			zfs_error_aux(zhp->zfs_hdl, strerror(errno));
+			return (zfs_error(zhp->zfs_hdl,
+			    EZFS_THREADCREATEFAILED, errbuf));
+		}
+	}
+
+	if (flags.replicate || flags.doall) {
 		dmu_replay_record_t drr = { 0 };
 		char *packbuf = NULL;
 		size_t buflen = 0;
 		zio_cksum_t zc = { 0 };
 
-		assert(fromsnap || doall);
+		assert(fromsnap || flags.doall);
 
 		if (holdsnaps) {
 			(void) snprintf(holdtag, sizeof (holdtag),
@@ -762,9 +1152,11 @@
 			err = zfs_hold_range(zhp, fromsnap, tosnap,
 			    holdtag, B_TRUE);
 			if (err)
-				return (err);
+				goto err_out;
 		}
-		if (replicate) {
+
+
+		if (flags.replicate) {
 			nvlist_t *hdrnv;
 
 			VERIFY(0 == nvlist_alloc(&hdrnv, NV_UNIQUE_NAME, 0));
@@ -781,7 +1173,7 @@
 					(void) zfs_release_range(zhp, fromsnap,
 					    tosnap, holdtag);
 				}
-				return (err);
+				goto err_out;
 			}
 			VERIFY(0 == nvlist_add_nvlist(hdrnv, "fss", fss));
 			err = nvlist_pack(hdrnv, &packbuf, &buflen,
@@ -794,26 +1186,26 @@
 					(void) zfs_release_range(zhp, fromsnap,
 					    tosnap, holdtag);
 				}
-				return (zfs_standard_error(zhp->zfs_hdl,
-				    err, errbuf));
+				goto stderr_out;
 			}
 		}
 
 		/* write first begin record */
 		drr.drr_type = DRR_BEGIN;
 		drr.drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
-		drr.drr_u.drr_begin.drr_version = DMU_BACKUP_HEADER_VERSION;
+		DMU_SET_STREAM_HDRTYPE(drr.drr_u.drr_begin.drr_versioninfo,
+		    DMU_COMPOUNDSTREAM);
+		DMU_SET_FEATUREFLAGS(drr.drr_u.drr_begin.drr_versioninfo,
+		    featureflags);
 		(void) snprintf(drr.drr_u.drr_begin.drr_toname,
 		    sizeof (drr.drr_u.drr_begin.drr_toname),
 		    "%s@%s", zhp->zfs_name, tosnap);
 		drr.drr_payloadlen = buflen;
-		fletcher_4_incremental_native(&drr, sizeof (drr), &zc);
-		err = write(outfd, &drr, sizeof (drr));
+		err = cksum_and_write(&drr, sizeof (drr), &zc, outfd);
 
 		/* write header nvlist */
-		if (err != -1) {
-			fletcher_4_incremental_native(packbuf, buflen, &zc);
-			err = write(outfd, packbuf, buflen);
+		if (err != -1 && flags.replicate) {
+			err = cksum_and_write(packbuf, buflen, &zc, outfd);
 		}
 		free(packbuf);
 		if (err == -1) {
@@ -823,8 +1215,8 @@
 				(void) zfs_release_range(zhp, fromsnap, tosnap,
 				    holdtag);
 			}
-			return (zfs_standard_error(zhp->zfs_hdl,
-			    errno, errbuf));
+			err = errno;
+			goto stderr_out;
 		}
 
 		/* write end record */
@@ -840,8 +1232,8 @@
 					(void) zfs_release_range(zhp, fromsnap,
 					    tosnap, holdtag);
 				}
-				return (zfs_standard_error(zhp->zfs_hdl,
-				    errno, errbuf));
+				err = errno;
+				goto stderr_out;
 			}
 		}
 	}
@@ -849,18 +1241,27 @@
 	/* dump each stream */
 	sdd.fromsnap = fromsnap;
 	sdd.tosnap = tosnap;
-	sdd.outfd = outfd;
-	sdd.replicate = replicate;
-	sdd.doall = doall;
-	sdd.fromorigin = fromorigin;
+	if (flags.dedup)
+		sdd.outfd = pipefd[0];
+	else
+		sdd.outfd = outfd;
+	sdd.replicate = flags.replicate;
+	sdd.doall = flags.doall;
+	sdd.fromorigin = flags.fromorigin;
 	sdd.fss = fss;
 	sdd.fsavl = fsavl;
-	sdd.verbose = verbose;
+	sdd.verbose = flags.verbose;
+	sdd.filter_cb = filter_func;
+	sdd.filter_cb_arg = cb_arg;
 	err = dump_filesystems(zhp, &sdd);
 	fsavl_destroy(fsavl);
 	nvlist_free(fss);
 
-	if (replicate || doall) {
+	if (flags.dedup) {
+		(void) close(pipefd[0]);
+		(void) pthread_join(tid, NULL);
+	}
+	if (flags.replicate || flags.doall) {
 		/*
 		 * write final end record.  NB: want to do this even if
 		 * there was some error, because it might not be totally
@@ -879,6 +1280,16 @@
 	}
 
 	return (err || sdd.err);
+
+stderr_out:
+	err = zfs_standard_error(zhp->zfs_hdl, err, errbuf);
+err_out:
+	if (flags.dedup) {
+		(void) pthread_cancel(tid);
+		(void) pthread_join(tid, NULL);
+		(void) close(pipefd[0]);
+	}
+	return (err);
 }
 
 /*
@@ -1459,7 +1870,8 @@
 
 	assert(drr->drr_type == DRR_BEGIN);
 	assert(drr->drr_u.drr_begin.drr_magic == DMU_BACKUP_MAGIC);
-	assert(drr->drr_u.drr_begin.drr_version == DMU_BACKUP_HEADER_VERSION);
+	assert(DMU_GET_STREAM_HDRTYPE(drr->drr_u.drr_begin.drr_versioninfo) ==
+	    DMU_COMPOUNDSTREAM);
 
 	/*
 	 * Read in the nvlist from the stream.
@@ -1582,6 +1994,10 @@
 {
 	dmu_replay_record_t *drr;
 	void *buf = malloc(1<<20);
+	char errbuf[1024];
+
+	(void) snprintf(errbuf, sizeof (errbuf), dgettext(TEXT_DOMAIN,
+	    "cannot receive:"));
 
 	/* XXX would be great to use lseek if possible... */
 	drr = buf;
@@ -1594,7 +2010,11 @@
 		switch (drr->drr_type) {
 		case DRR_BEGIN:
 			/* NB: not to be used on v2 stream packages */
-			assert(drr->drr_payloadlen == 0);
+			if (drr->drr_payloadlen != 0) {
+				zfs_error_aux(hdl, dgettext(TEXT_DOMAIN,
+				    "invalid substream header"));
+				return (zfs_error(hdl, EZFS_BADSTREAM, errbuf));
+			}
 			break;
 
 		case DRR_END:
@@ -1621,12 +2041,15 @@
 			    drr->drr_u.drr_write.drr_length, B_FALSE, NULL);
 			break;
 
+		case DRR_WRITE_BYREF:
 		case DRR_FREEOBJECTS:
 		case DRR_FREE:
 			break;
 
 		default:
-			assert(!"invalid record type");
+			zfs_error_aux(hdl, dgettext(TEXT_DOMAIN,
+			    "invalid record type"));
+			return (zfs_error(hdl, EZFS_BADSTREAM, errbuf));
 		}
 	}
 
@@ -1728,6 +2151,7 @@
 	/*
 	 * Determine name of destination snapshot, store in zc_value.
 	 */
+	(void) strcpy(zc.zc_top_ds, tosnap);
 	(void) strcpy(zc.zc_value, tosnap);
 	(void) strncat(zc.zc_value, drrb->drr_toname+choplen,
 	    sizeof (zc.zc_value));
@@ -2068,6 +2492,8 @@
 	struct drr_begin *drrb = &drr.drr_u.drr_begin;
 	char errbuf[1024];
 	zio_cksum_t zcksum = { 0 };
+	uint64_t featureflags;
+	int hdrtype;
 
 	(void) snprintf(errbuf, sizeof (errbuf), dgettext(TEXT_DOMAIN,
 	    "cannot receive"));
@@ -2105,7 +2531,7 @@
 		drr.drr_type = BSWAP_32(drr.drr_type);
 		drr.drr_payloadlen = BSWAP_32(drr.drr_payloadlen);
 		drrb->drr_magic = BSWAP_64(drrb->drr_magic);
-		drrb->drr_version = BSWAP_64(drrb->drr_version);
+		drrb->drr_versioninfo = BSWAP_64(drrb->drr_versioninfo);
 		drrb->drr_creation_time = BSWAP_64(drrb->drr_creation_time);
 		drrb->drr_type = BSWAP_32(drrb->drr_type);
 		drrb->drr_flags = BSWAP_32(drrb->drr_flags);
@@ -2119,23 +2545,31 @@
 		return (zfs_error(hdl, EZFS_BADSTREAM, errbuf));
 	}
 
+	featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
+	hdrtype = DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo);
+
+	if (!DMU_STREAM_SUPPORTED(featureflags) ||
+	    (hdrtype != DMU_SUBSTREAM && hdrtype != DMU_COMPOUNDSTREAM)) {
+		zfs_error_aux(hdl, dgettext(TEXT_DOMAIN,
+		    "stream has unsupported feature, feature flags = %lx"),
+		    featureflags);
+		return (zfs_error(hdl, EZFS_BADSTREAM, errbuf));
+	}
+
 	if (strchr(drrb->drr_toname, '@') == NULL) {
 		zfs_error_aux(hdl, dgettext(TEXT_DOMAIN, "invalid "
 		    "stream (bad snapshot name)"));
 		return (zfs_error(hdl, EZFS_BADSTREAM, errbuf));
 	}
 
-	if (drrb->drr_version == DMU_BACKUP_STREAM_VERSION) {
+	if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) == DMU_SUBSTREAM) {
 		return (zfs_receive_one(hdl, infd, tosnap, flags,
 		    &drr, &drr_noswap, stream_avl, top_zfs));
-	} else if (drrb->drr_version == DMU_BACKUP_HEADER_VERSION) {
+	} else {  /* must be DMU_COMPOUNDSTREAM */
+		assert(DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
+		    DMU_COMPOUNDSTREAM);
 		return (zfs_receive_package(hdl, infd, tosnap, flags,
 		    &drr, &zcksum, top_zfs));
-	} else {
-		zfs_error_aux(hdl, dgettext(TEXT_DOMAIN,
-		    "stream is unsupported version %llu"),
-		    drrb->drr_version);
-		return (zfs_error(hdl, EZFS_BADSTREAM, errbuf));
 	}
 }
 
--- a/usr/src/lib/libzfs/common/libzfs_util.c	Mon Nov 09 11:54:00 2009 +0530
+++ b/usr/src/lib/libzfs/common/libzfs_util.c	Mon Nov 09 11:04:55 2009 -0700
@@ -216,6 +216,10 @@
 		    "dataset"));
 	case EZFS_TAGTOOLONG:
 		return (dgettext(TEXT_DOMAIN, "tag too long"));
+	case EZFS_PIPEFAILED:
+		return (dgettext(TEXT_DOMAIN, "pipe create failed"));
+	case EZFS_THREADCREATEFAILED:
+		return (dgettext(TEXT_DOMAIN, "thread create failed"));
 	case EZFS_UNKNOWN:
 		return (dgettext(TEXT_DOMAIN, "unknown error"));
 	default:
--- a/usr/src/uts/common/fs/zfs/dmu_send.c	Mon Nov 09 11:54:00 2009 +0530
+++ b/usr/src/uts/common/fs/zfs/dmu_send.c	Mon Nov 09 11:04:55 2009 -0700
@@ -38,16 +38,31 @@
 #include <sys/zfs_ioctl.h>
 #include <sys/zap.h>
 #include <sys/zio_checksum.h>
+#include <sys/avl.h>
 
 static char *dmu_recv_tag = "dmu_recv_tag";
 
+/*
+ * The list of data whose inclusion in a send stream can be pending from
+ * one call to backup_cb to another.  Multiple calls to dump_free() and
+ * dump_freeobjects() can be aggregated into a single DRR_FREE or
+ * DRR_FREEOBJECTS replay record.
+ */
+typedef enum {
+	PENDING_NONE,
+	PENDING_FREE,
+	PENDING_FREEOBJECTS
+} pendop_t;
+
 struct backuparg {
 	dmu_replay_record_t *drr;
 	vnode_t *vp;
 	offset_t *off;
 	objset_t *os;
 	zio_cksum_t zc;
+	uint64_t toguid;
 	int err;
+	pendop_t pending_op;
 };
 
 static int
@@ -68,15 +83,59 @@
 dump_free(struct backuparg *ba, uint64_t object, uint64_t offset,
     uint64_t length)
 {
-	/* write a FREE record */
+	struct drr_free *drrf = &(ba->drr->drr_u.drr_free);
+
+	/*
+	 * If there is a pending op, but it's not PENDING_FREE, push it out,
+	 * since free block aggregation can only be done for blocks of the
+	 * same type (i.e., DRR_FREE records can only be aggregated with
+	 * other DRR_FREE records.  DRR_FREEOBJECTS records can only be
+	 * aggregated with other DRR_FREEOBJECTS records.
+	 */
+	if (ba->pending_op != PENDING_NONE && ba->pending_op != PENDING_FREE) {
+		if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)) != 0)
+			return (EINTR);
+		ba->pending_op = PENDING_NONE;
+	}
+
+	if (ba->pending_op == PENDING_FREE) {
+		/*
+		 * There should never be a PENDING_FREE if length is -1
+		 * (because dump_dnode is the only place where this
+		 * function is called with a -1, and only after flushing
+		 * any pending record).
+		 */
+		ASSERT(length != -1ULL);
+		/*
+		 * Check to see whether this free block can be aggregated
+		 * with pending one.
+		 */
+		if (drrf->drr_object == object && drrf->drr_offset +
+		    drrf->drr_length == offset) {
+			drrf->drr_length += length;
+			return (0);
+		} else {
+			/* not a continuation.  Push out pending record */
+			if (dump_bytes(ba, ba->drr,
+			    sizeof (dmu_replay_record_t)) != 0)
+				return (EINTR);
+			ba->pending_op = PENDING_NONE;
+		}
+	}
+	/* create a FREE record and make it pending */
 	bzero(ba->drr, sizeof (dmu_replay_record_t));
 	ba->drr->drr_type = DRR_FREE;
-	ba->drr->drr_u.drr_free.drr_object = object;
-	ba->drr->drr_u.drr_free.drr_offset = offset;
-	ba->drr->drr_u.drr_free.drr_length = length;
+	drrf->drr_object = object;
+	drrf->drr_offset = offset;
+	drrf->drr_length = length;
+	drrf->drr_toguid = ba->toguid;
+	if (length == -1ULL) {
+		if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)) != 0)
+			return (EINTR);
+	} else {
+		ba->pending_op = PENDING_FREE;
+	}
 
-	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)))
-		return (EINTR);
 	return (0);
 }
 
@@ -84,17 +143,31 @@
 dump_data(struct backuparg *ba, dmu_object_type_t type,
     uint64_t object, uint64_t offset, int blksz, void *data)
 {
+	struct drr_write *drrw = &(ba->drr->drr_u.drr_write);
+
+	/*
+	 * If there is any kind of pending aggregation (currently either
+	 * a grouping of free objects or free blocks), push it out to
+	 * the stream, since aggregation can't be done across operations
+	 * of different types.
+	 */
+	if (ba->pending_op != PENDING_NONE) {
+		if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)) != 0)
+			return (EINTR);
+		ba->pending_op = PENDING_NONE;
+	}
 	/* write a DATA record */
 	bzero(ba->drr, sizeof (dmu_replay_record_t));
 	ba->drr->drr_type = DRR_WRITE;
-	ba->drr->drr_u.drr_write.drr_object = object;
-	ba->drr->drr_u.drr_write.drr_type = type;
-	ba->drr->drr_u.drr_write.drr_offset = offset;
-	ba->drr->drr_u.drr_write.drr_length = blksz;
+	drrw->drr_object = object;
+	drrw->drr_type = type;
+	drrw->drr_offset = offset;
+	drrw->drr_length = blksz;
+	drrw->drr_toguid = ba->toguid;
 
-	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)))
+	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)) != 0)
 		return (EINTR);
-	if (dump_bytes(ba, data, blksz))
+	if (dump_bytes(ba, data, blksz) != 0)
 		return (EINTR);
 	return (0);
 }
@@ -102,39 +175,80 @@
 static int
 dump_freeobjects(struct backuparg *ba, uint64_t firstobj, uint64_t numobjs)
 {
+	struct drr_freeobjects *drrfo = &(ba->drr->drr_u.drr_freeobjects);
+
+	/*
+	 * If there is a pending op, but it's not PENDING_FREEOBJECTS,
+	 * push it out, since free block aggregation can only be done for
+	 * blocks of the same type (i.e., DRR_FREE records can only be
+	 * aggregated with other DRR_FREE records.  DRR_FREEOBJECTS records
+	 * can only be aggregated with other DRR_FREEOBJECTS records.
+	 */
+	if (ba->pending_op != PENDING_NONE &&
+	    ba->pending_op != PENDING_FREEOBJECTS) {
+		if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)) != 0)
+			return (EINTR);
+		ba->pending_op = PENDING_NONE;
+	}
+	if (ba->pending_op == PENDING_FREEOBJECTS) {
+		/*
+		 * See whether this free object array can be aggregated
+		 * with pending one
+		 */
+		if (drrfo->drr_firstobj + drrfo->drr_numobjs == firstobj) {
+			drrfo->drr_numobjs += numobjs;
+			return (0);
+		} else {
+			/* can't be aggregated.  Push out pending record */
+			if (dump_bytes(ba, ba->drr,
+			    sizeof (dmu_replay_record_t)) != 0)
+				return (EINTR);
+			ba->pending_op = PENDING_NONE;
+		}
+	}
+
 	/* write a FREEOBJECTS record */
 	bzero(ba->drr, sizeof (dmu_replay_record_t));
 	ba->drr->drr_type = DRR_FREEOBJECTS;
-	ba->drr->drr_u.drr_freeobjects.drr_firstobj = firstobj;
-	ba->drr->drr_u.drr_freeobjects.drr_numobjs = numobjs;
+	drrfo->drr_firstobj = firstobj;
+	drrfo->drr_numobjs = numobjs;
+	drrfo->drr_toguid = ba->toguid;
 
-	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)))
-		return (EINTR);
+	ba->pending_op = PENDING_FREEOBJECTS;
+
 	return (0);
 }
 
 static int
 dump_dnode(struct backuparg *ba, uint64_t object, dnode_phys_t *dnp)
 {
+	struct drr_object *drro = &(ba->drr->drr_u.drr_object);
+
 	if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)
 		return (dump_freeobjects(ba, object, 1));
 
+	if (ba->pending_op != PENDING_NONE) {
+		if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)) != 0)
+			return (EINTR);
+		ba->pending_op = PENDING_NONE;
+	}
+
 	/* write an OBJECT record */
 	bzero(ba->drr, sizeof (dmu_replay_record_t));
 	ba->drr->drr_type = DRR_OBJECT;
-	ba->drr->drr_u.drr_object.drr_object = object;
-	ba->drr->drr_u.drr_object.drr_type = dnp->dn_type;
-	ba->drr->drr_u.drr_object.drr_bonustype = dnp->dn_bonustype;
-	ba->drr->drr_u.drr_object.drr_blksz =
-	    dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
-	ba->drr->drr_u.drr_object.drr_bonuslen = dnp->dn_bonuslen;
-	ba->drr->drr_u.drr_object.drr_checksum = dnp->dn_checksum;
-	ba->drr->drr_u.drr_object.drr_compress = dnp->dn_compress;
+	drro->drr_object = object;
+	drro->drr_type = dnp->dn_type;
+	drro->drr_bonustype = dnp->dn_bonustype;
+	drro->drr_blksz = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
+	drro->drr_bonuslen = dnp->dn_bonuslen;
+	drro->drr_checksumtype = dnp->dn_checksum;
+	drro->drr_compress = dnp->dn_compress;
+	drro->drr_toguid = ba->toguid;
 
-	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)))
+	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)) != 0)
 		return (EINTR);
 
-	if (dump_bytes(ba, DN_BONUS(dnp), P2ROUNDUP(dnp->dn_bonuslen, 8)))
+	if (dump_bytes(ba, DN_BONUS(dnp), P2ROUNDUP(dnp->dn_bonuslen, 8)) != 0)
 		return (EINTR);
 
 	/* free anything past the end of the file */
@@ -256,7 +370,8 @@
 	drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
 	drr->drr_type = DRR_BEGIN;
 	drr->drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
-	drr->drr_u.drr_begin.drr_version = DMU_BACKUP_STREAM_VERSION;
+	DMU_SET_STREAM_HDRTYPE(drr->drr_u.drr_begin.drr_versioninfo,
+	    DMU_SUBSTREAM);
 	drr->drr_u.drr_begin.drr_creation_time =
 	    ds->ds_phys->ds_creation_time;
 	drr->drr_u.drr_begin.drr_type = tosnap->os_phys->os_type;
@@ -279,9 +394,11 @@
 	ba.vp = vp;
 	ba.os = tosnap;
 	ba.off = off;
+	ba.toguid = ds->ds_phys->ds_guid;
 	ZIO_SET_CHECKSUM(&ba.zc, 0, 0, 0, 0);
+	ba.pending_op = PENDING_NONE;
 
-	if (dump_bytes(&ba, drr, sizeof (dmu_replay_record_t))) {
+	if (dump_bytes(&ba, drr, sizeof (dmu_replay_record_t)) != 0) {
 		kmem_free(drr, sizeof (dmu_replay_record_t));
 		return (ba.err);
 	}
@@ -289,6 +406,10 @@
 	err = traverse_dataset(ds, fromtxg, TRAVERSE_PRE | TRAVERSE_PREFETCH,
 	    backup_cb, &ba);
 
+	if (ba.pending_op != PENDING_NONE)
+		if (dump_bytes(&ba, drr, sizeof (dmu_replay_record_t)) != 0)
+			err = EINTR;
+
 	if (err) {
 		if (err == EINTR && ba.err)
 			err = ba.err;
@@ -299,8 +420,9 @@
 	bzero(drr, sizeof (dmu_replay_record_t));
 	drr->drr_type = DRR_END;
 	drr->drr_u.drr_end.drr_checksum = ba.zc;
+	drr->drr_u.drr_end.drr_toguid = ba.toguid;
 
-	if (dump_bytes(&ba, drr, sizeof (dmu_replay_record_t))) {
+	if (dump_bytes(&ba, drr, sizeof (dmu_replay_record_t)) != 0) {
 		kmem_free(drr, sizeof (dmu_replay_record_t));
 		return (ba.err);
 	}
@@ -459,13 +581,13 @@
  * succeeds; otherwise we will leak the holds on the datasets.
  */
 int
-dmu_recv_begin(char *tofs, char *tosnap, struct drr_begin *drrb,
+dmu_recv_begin(char *tofs, char *tosnap, char *top_ds, struct drr_begin *drrb,
     boolean_t force, objset_t *origin, dmu_recv_cookie_t *drc)
 {
 	int err = 0;
 	boolean_t byteswap;
 	struct recvbeginsyncarg rbsa = { 0 };
-	uint64_t version;
+	uint64_t versioninfo;
 	int flags;
 	dsl_dataset_t *ds;
 
@@ -483,17 +605,17 @@
 	rbsa.type = drrb->drr_type;
 	rbsa.tag = FTAG;
 	rbsa.dsflags = 0;
-	version = drrb->drr_version;
+	versioninfo = drrb->drr_versioninfo;
 	flags = drrb->drr_flags;
 
 	if (byteswap) {
 		rbsa.type = BSWAP_32(rbsa.type);
 		rbsa.fromguid = BSWAP_64(rbsa.fromguid);
-		version = BSWAP_64(version);
+		versioninfo = BSWAP_64(versioninfo);
 		flags = BSWAP_32(flags);
 	}
 
-	if (version != DMU_BACKUP_STREAM_VERSION ||
+	if (DMU_GET_STREAM_HDRTYPE(versioninfo) == DMU_COMPOUNDSTREAM ||
 	    rbsa.type >= DMU_OST_NUMTYPES ||
 	    ((flags & DRR_FLAG_CLONE) && origin == NULL))
 		return (EINVAL);
@@ -504,6 +626,7 @@
 	bzero(drc, sizeof (dmu_recv_cookie_t));
 	drc->drc_drrb = drrb;
 	drc->drc_tosnap = tosnap;
+	drc->drc_top_ds = top_ds;
 	drc->drc_force = force;
 
 	/*
@@ -579,8 +702,85 @@
 	uint64_t voff;
 	int bufsize; /* amount of memory allocated for buf */
 	zio_cksum_t cksum;
+	avl_tree_t guid_to_ds_map;
 };
 
+typedef struct guid_map_entry {
+	uint64_t	guid;
+	dsl_dataset_t	*gme_ds;
+	avl_node_t	avlnode;
+} guid_map_entry_t;
+
+static int
+guid_compare(const void *arg1, const void *arg2)
+{
+	const guid_map_entry_t *gmep1 = arg1;
+	const guid_map_entry_t *gmep2 = arg2;
+
+	if (gmep1->guid < gmep2->guid)
+		return (-1);
+	else if (gmep1->guid > gmep2->guid)
+		return (1);
+	return (0);
+}
+
+/*
+ * This function is a callback used by dmu_objset_find() (which
+ * enumerates the object sets) to build an avl tree that maps guids
+ * to datasets.  The resulting table is used when processing DRR_WRITE_BYREF
+ * send stream records.  These records, which are used in dedup'ed
+ * streams, do not contain data themselves, but refer to a copy
+ * of the data block that has already been written because it was
+ * earlier in the stream.  That previous copy is identified by the
+ * guid of the dataset with the referenced data.
+ */
+int
+find_ds_by_guid(char *name, void *arg)
+{
+	dsl_dataset_t *ds, *snapds;
+	avl_tree_t *guid_map = arg;
+	guid_map_entry_t *gmep;
+	guid_map_entry_t gmesrch;
+	dsl_pool_t	*dp;
+	int err;
+	uint64_t lastobj, firstobj;
+
+	if (dsl_dataset_hold(name, FTAG, &ds) != 0)
+		return (0);
+
+	dp = ds->ds_dir->dd_pool;
+	rw_enter(&dp->dp_config_rwlock, RW_READER);
+	firstobj = ds->ds_dir->dd_phys->dd_origin_obj;
+	lastobj = ds->ds_phys->ds_prev_snap_obj;
+
+	while (lastobj != firstobj) {
+		err = dsl_dataset_hold_obj(dp, lastobj, guid_map, &snapds);
+		if (err) {
+			/*
+			 * Skip this snapshot and move on. It's not
+			 * clear why this would ever happen, but the
+			 * remainder of the snapshot streadm can be
+			 * processed.
+			 */
+			rw_exit(&dp->dp_config_rwlock);
+			dsl_dataset_rele(ds, FTAG);
+			return (0);
+		}
+
+		gmesrch.guid = snapds->ds_phys->ds_guid;
+		gmep = kmem_alloc(sizeof (guid_map_entry_t), KM_SLEEP);
+		gmep->guid = snapds->ds_phys->ds_guid;
+		gmep->gme_ds = snapds;
+		avl_add(guid_map, gmep);
+		lastobj = snapds->ds_phys->ds_prev_snap_obj;
+	}
+
+	rw_exit(&dp->dp_config_rwlock);
+	dsl_dataset_rele(ds, FTAG);
+
+	return (0);
+}
+
 static void *
 restore_read(struct restorearg *ra, int len)
 {
@@ -625,7 +825,7 @@
 	switch (drr->drr_type) {
 	case DRR_BEGIN:
 		DO64(drr_begin.drr_magic);
-		DO64(drr_begin.drr_version);
+		DO64(drr_begin.drr_versioninfo);
 		DO64(drr_begin.drr_creation_time);
 		DO32(drr_begin.drr_type);
 		DO32(drr_begin.drr_flags);
@@ -639,27 +839,49 @@
 		DO32(drr_object.drr_bonustype);
 		DO32(drr_object.drr_blksz);
 		DO32(drr_object.drr_bonuslen);
+		DO64(drr_object.drr_toguid);
 		break;
 	case DRR_FREEOBJECTS:
 		DO64(drr_freeobjects.drr_firstobj);
 		DO64(drr_freeobjects.drr_numobjs);
+		DO64(drr_freeobjects.drr_toguid);
 		break;
 	case DRR_WRITE:
 		DO64(drr_write.drr_object);
 		DO32(drr_write.drr_type);
 		DO64(drr_write.drr_offset);
 		DO64(drr_write.drr_length);
+		DO64(drr_write.drr_toguid);
+		DO64(drr_write.drr_blkcksum.zc_word[0]);
+		DO64(drr_write.drr_blkcksum.zc_word[1]);
+		DO64(drr_write.drr_blkcksum.zc_word[2]);
+		DO64(drr_write.drr_blkcksum.zc_word[3]);
+		break;
+	case DRR_WRITE_BYREF:
+		DO64(drr_write_byref.drr_object);
+		DO64(drr_write_byref.drr_offset);
+		DO64(drr_write_byref.drr_length);
+		DO64(drr_write_byref.drr_toguid);
+		DO64(drr_write_byref.drr_refguid);
+		DO64(drr_write_byref.drr_refobject);
+		DO64(drr_write_byref.drr_refoffset);
+		DO64(drr_write_byref.drr_blkcksum.zc_word[0]);
+		DO64(drr_write_byref.drr_blkcksum.zc_word[1]);
+		DO64(drr_write_byref.drr_blkcksum.zc_word[2]);
+		DO64(drr_write_byref.drr_blkcksum.zc_word[3]);
 		break;
 	case DRR_FREE:
 		DO64(drr_free.drr_object);
 		DO64(drr_free.drr_offset);
 		DO64(drr_free.drr_length);
+		DO64(drr_free.drr_toguid);
 		break;
 	case DRR_END:
 		DO64(drr_end.drr_checksum.zc_word[0]);
 		DO64(drr_end.drr_checksum.zc_word[1]);
 		DO64(drr_end.drr_checksum.zc_word[2]);
 		DO64(drr_end.drr_checksum.zc_word[3]);
+		DO64(drr_end.drr_toguid);
 		break;
 	}
 #undef DO64
@@ -676,7 +898,7 @@
 	if (drro->drr_type == DMU_OT_NONE ||
 	    drro->drr_type >= DMU_OT_NUMTYPES ||
 	    drro->drr_bonustype >= DMU_OT_NUMTYPES ||
-	    drro->drr_checksum >= ZIO_CHECKSUM_FUNCTIONS ||
+	    drro->drr_checksumtype >= ZIO_CHECKSUM_FUNCTIONS ||
 	    drro->drr_compress >= ZIO_COMPRESS_FUNCTIONS ||
 	    P2PHASE(drro->drr_blksz, SPA_MINBLOCKSIZE) ||
 	    drro->drr_blksz < SPA_MINBLOCKSIZE ||
@@ -726,7 +948,8 @@
 		return (err);
 	}
 
-	dmu_object_set_checksum(os, drro->drr_object, drro->drr_checksum, tx);
+	dmu_object_set_checksum(os, drro->drr_object, drro->drr_checksumtype,
+	    tx);
 	dmu_object_set_compress(os, drro->drr_object, drro->drr_compress, tx);
 
 	if (data != NULL) {
@@ -808,6 +1031,64 @@
 	return (0);
 }
 
+/*
+ * Handle a DRR_WRITE_BYREF record.  This record is used in dedup'ed
+ * streams to refer to a copy of the data that is already on the
+ * system because it came in earlier in the stream.  This function
+ * finds the earlier copy of the data, and uses that copy instead of
+ * data from the stream to fulfill this write.
+ */
+static int
+restore_write_byref(struct restorearg *ra, objset_t *os,
+    struct drr_write_byref *drrwbr)
+{
+	dmu_tx_t *tx;
+	int err;
+	guid_map_entry_t gmesrch;
+	guid_map_entry_t *gmep;
+	avl_index_t	where;
+	objset_t *ref_os = NULL;
+	dmu_buf_t *dbp;
+
+	if (drrwbr->drr_offset + drrwbr->drr_length < drrwbr->drr_offset)
+		return (EINVAL);
+
+	/*
+	 * If the GUID of the referenced dataset is different from the
+	 * GUID of the target dataset, find the referenced dataset.
+	 */
+	if (drrwbr->drr_toguid != drrwbr->drr_refguid) {
+		gmesrch.guid = drrwbr->drr_refguid;
+		if ((gmep = avl_find(&ra->guid_to_ds_map, &gmesrch,
+		    &where)) == NULL) {
+			return (EINVAL);
+		}
+		if (dmu_objset_from_ds(gmep->gme_ds, &ref_os))
+			return (EINVAL);
+	} else {
+		ref_os = os;
+	}
+
+	if (err = dmu_buf_hold(ref_os, drrwbr->drr_refobject,
+	    drrwbr->drr_refoffset, FTAG, &dbp))
+		return (err);
+
+	tx = dmu_tx_create(os);
+
+	dmu_tx_hold_write(tx, drrwbr->drr_object,
+	    drrwbr->drr_offset, drrwbr->drr_length);
+	err = dmu_tx_assign(tx, TXG_WAIT);
+	if (err) {
+		dmu_tx_abort(tx);
+		return (err);
+	}
+	dmu_write(os, drrwbr->drr_object,
+	    drrwbr->drr_offset, drrwbr->drr_length, dbp->db_data, tx);
+	dmu_buf_rele(dbp, FTAG);
+	dmu_tx_commit(tx);
+	return (0);
+}
+
 /* ARGSUSED */
 static int
 restore_free(struct restorearg *ra, objset_t *os,
@@ -837,6 +1118,8 @@
 	dmu_replay_record_t *drr;
 	objset_t *os;
 	zio_cksum_t pcksum;
+	guid_map_entry_t *gmep;
+	int featureflags;
 
 	if (drc->drc_drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC))
 		ra.byteswap = TRUE;
@@ -861,7 +1144,7 @@
 	if (ra.byteswap) {
 		struct drr_begin *drrb = drc->drc_drrb;
 		drrb->drr_magic = BSWAP_64(drrb->drr_magic);
-		drrb->drr_version = BSWAP_64(drrb->drr_version);
+		drrb->drr_versioninfo = BSWAP_64(drrb->drr_versioninfo);
 		drrb->drr_creation_time = BSWAP_64(drrb->drr_creation_time);
 		drrb->drr_type = BSWAP_32(drrb->drr_type);
 		drrb->drr_toguid = BSWAP_64(drrb->drr_toguid);
@@ -874,7 +1157,8 @@
 	ra.buf = kmem_alloc(ra.bufsize, KM_SLEEP);
 
 	/* these were verified in dmu_recv_begin */
-	ASSERT(drc->drc_drrb->drr_version == DMU_BACKUP_STREAM_VERSION);
+	ASSERT(DMU_GET_STREAM_HDRTYPE(drc->drc_drrb->drr_versioninfo) ==
+	    DMU_SUBSTREAM);
 	ASSERT(drc->drc_drrb->drr_type < DMU_OST_NUMTYPES);
 
 	/*
@@ -884,6 +1168,18 @@
 
 	ASSERT(drc->drc_real_ds->ds_phys->ds_flags & DS_FLAG_INCONSISTENT);
 
+	featureflags = DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo);
+
+	/* if this stream is dedup'ed, set up the avl tree for guid mapping */
+	if (featureflags & DMU_BACKUP_FEATURE_DEDUP) {
+		avl_create(&ra.guid_to_ds_map, guid_compare,
+		    sizeof (guid_map_entry_t),
+		    offsetof(guid_map_entry_t, avlnode));
+		(void) dmu_objset_find(drc->drc_top_ds, find_ds_by_guid,
+		    (void *)&ra.guid_to_ds_map,
+		    DS_FIND_CHILDREN);
+	}
+
 	/*
 	 * Read records and process them.
 	 */
@@ -923,6 +1219,13 @@
 			ra.err = restore_write(&ra, os, &drrw);
 			break;
 		}
+		case DRR_WRITE_BYREF:
+		{
+			struct drr_write_byref drrwbr =
+			    drr->drr_u.drr_write_byref;
+			ra.err = restore_write_byref(&ra, os, &drrwbr);
+			break;
+		}
 		case DRR_FREE:
 		{
 			struct drr_free drrf = drr->drr_u.drr_free;
@@ -965,6 +1268,16 @@
 		}
 	}
 
+	if (featureflags & DMU_BACKUP_FEATURE_DEDUP) {
+		void *cookie = NULL;
+
+		while (gmep = avl_destroy_nodes(&ra.guid_to_ds_map, &cookie)) {
+			dsl_dataset_rele(gmep->gme_ds, &ra.guid_to_ds_map);
+			kmem_free(gmep, sizeof (guid_map_entry_t));
+		}
+		avl_destroy(&ra.guid_to_ds_map);
+	}
+
 	kmem_free(ra.buf, ra.bufsize);
 	*voffp = ra.voff;
 	return (ra.err);
--- a/usr/src/uts/common/fs/zfs/sys/dmu.h	Mon Nov 09 11:54:00 2009 +0530
+++ b/usr/src/uts/common/fs/zfs/sys/dmu.h	Mon Nov 09 11:04:55 2009 -0700
@@ -679,11 +679,12 @@
 	struct dsl_dataset *drc_real_ds;
 	struct drr_begin *drc_drrb;
 	char *drc_tosnap;
+	char *drc_top_ds;
 	boolean_t drc_newfs;
 	boolean_t drc_force;
 } dmu_recv_cookie_t;
 
-int dmu_recv_begin(char *tofs, char *tosnap, struct drr_begin *,
+int dmu_recv_begin(char *tofs, char *tosnap, char *topds, struct drr_begin *,
     boolean_t force, objset_t *origin, dmu_recv_cookie_t *);
 int dmu_recv_stream(dmu_recv_cookie_t *drc, struct vnode *vp, offset_t *voffp);
 int dmu_recv_end(dmu_recv_cookie_t *drc);
--- a/usr/src/uts/common/fs/zfs/sys/zfs_ioctl.h	Mon Nov 09 11:54:00 2009 +0530
+++ b/usr/src/uts/common/fs/zfs/sys/zfs_ioctl.h	Mon Nov 09 11:04:55 2009 -0700
@@ -30,6 +30,7 @@
 #include <sys/dmu.h>
 #include <sys/zio.h>
 #include <sys/dsl_deleg.h>
+#include <sys/spa.h>
 
 #ifdef _KERNEL
 #include <sys/nvpair.h>
@@ -45,8 +46,56 @@
 #define	ZFS_SNAPDIR_HIDDEN		0
 #define	ZFS_SNAPDIR_VISIBLE		1
 
-#define	DMU_BACKUP_STREAM_VERSION (1ULL)
-#define	DMU_BACKUP_HEADER_VERSION (2ULL)
+/*
+ * Field manipulation macros for the drr_versioninfo field of the
+ * send stream header.
+ */
+
+/*
+ * Header types for zfs send streams.
+ */
+typedef enum drr_headertype {
+	DMU_SUBSTREAM = 0x1,
+	DMU_COMPOUNDSTREAM = 0x2
+} drr_headertype_t;
+
+#define	DMU_GET_STREAM_HDRTYPE(vi)	BF64_GET((vi), 0, 2)
+#define	DMU_SET_STREAM_HDRTYPE(vi, x)	BF64_SET((vi), 0, 2, x)
+
+#define	DMU_GET_FEATUREFLAGS(vi)	BF64_GET((vi), 2, 30)
+#define	DMU_SET_FEATUREFLAGS(vi, x)	BF64_SET((vi), 2, 30, x)
+
+/*
+ * Feature flags for zfs send streams (flags in drr_versioninfo)
+ */
+
+#define	DMU_BACKUP_FEATURE_DEDUP	(0x1)
+
+/*
+ * Mask of all supported backup features
+ */
+#define	DMU_BACKUP_FEATURE_MASK	(DMU_BACKUP_FEATURE_DEDUP)
+
+/* Are all features in the given flag word currently supported? */
+#define	DMU_STREAM_SUPPORTED(x)	(!((x) & ~DMU_BACKUP_FEATURE_MASK))
+
+/*
+ * The drr_versioninfo field of the dmu_replay_record has the
+ * following layout:
+ *
+ *	64	56	48	40	32	24	16	8	0
+ *	+-------+-------+-------+-------+-------+-------+-------+-------+
+ *  	|		reserved	|        feature-flags	    |C|S|
+ *	+-------+-------+-------+-------+-------+-------+-------+-------+
+ *
+ * The low order two bits indicate the header type: SUBSTREAM (0x1)
+ * or COMPOUNDSTREAM (0x2).  Using two bits for this is historical:
+ * this field used to be a version number, where the two version types
+ * were 1 and 2.  Using two bits for this allows earlier versions of
+ * the code to be able to recognize send streams that don't use any
+ * of the features indicated by feature flags.
+ */
+
 #define	DMU_BACKUP_MAGIC 0x2F5bacbacULL
 
 #define	DRR_FLAG_CLONE		(1<<0)
@@ -58,13 +107,14 @@
 typedef struct dmu_replay_record {
 	enum {
 		DRR_BEGIN, DRR_OBJECT, DRR_FREEOBJECTS,
-		DRR_WRITE, DRR_FREE, DRR_END, DRR_NUMTYPES
+		DRR_WRITE, DRR_FREE, DRR_END, DRR_WRITE_BYREF,
+		DRR_NUMTYPES
 	} drr_type;
 	uint32_t drr_payloadlen;
 	union {
 		struct drr_begin {
 			uint64_t drr_magic;
-			uint64_t drr_version;
+			uint64_t drr_versioninfo; /* was drr_version */
 			uint64_t drr_creation_time;
 			dmu_objset_type_t drr_type;
 			uint32_t drr_flags;
@@ -74,6 +124,7 @@
 		} drr_begin;
 		struct drr_end {
 			zio_cksum_t drr_checksum;
+			uint64_t drr_toguid;
 		} drr_end;
 		struct drr_object {
 			uint64_t drr_object;
@@ -81,14 +132,16 @@
 			dmu_object_type_t drr_bonustype;
 			uint32_t drr_blksz;
 			uint32_t drr_bonuslen;
-			uint8_t drr_checksum;
+			uint8_t drr_checksumtype;
 			uint8_t drr_compress;
 			uint8_t drr_pad[6];
+			uint64_t drr_toguid;
 			/* bonus content follows */
 		} drr_object;
 		struct drr_freeobjects {
 			uint64_t drr_firstobj;
 			uint64_t drr_numobjs;
+			uint64_t drr_toguid;
 		} drr_freeobjects;
 		struct drr_write {
 			uint64_t drr_object;
@@ -96,13 +149,32 @@
 			uint32_t drr_pad;
 			uint64_t drr_offset;
 			uint64_t drr_length;
+			uint64_t drr_toguid;
+			uint8_t drr_checksumtype;
+			uint8_t drr_pad2[7];
+			zio_cksum_t drr_blkcksum;
 			/* content follows */
 		} drr_write;
 		struct drr_free {
 			uint64_t drr_object;
 			uint64_t drr_offset;
 			uint64_t drr_length;
+			uint64_t drr_toguid;
 		} drr_free;
+		struct drr_write_byref {
+			/* where to put the data */
+			uint64_t drr_object;
+			uint64_t drr_offset;
+			uint64_t drr_length;
+			uint64_t drr_toguid;
+			/* where to find the prior copy of the data */
+			uint64_t drr_refguid;
+			uint64_t drr_refobject;
+			uint64_t drr_refoffset;
+			uint8_t drr_checksumtype;
+			uint8_t drr_pad[7];
+			zio_cksum_t drr_blkcksum;
+		} drr_write_byref;
 	} drr_u;
 } dmu_replay_record_t;
 
@@ -150,6 +222,7 @@
 	char		zc_name[MAXPATHLEN];
 	char		zc_value[MAXPATHLEN * 2];
 	char		zc_string[MAXNAMELEN];
+	char		zc_top_ds[MAXPATHLEN];
 	uint64_t	zc_guid;
 	uint64_t	zc_nvlist_conf;		/* really (char *) */
 	uint64_t	zc_nvlist_conf_size;
--- a/usr/src/uts/common/fs/zfs/zfs_ioctl.c	Mon Nov 09 11:54:00 2009 +0530
+++ b/usr/src/uts/common/fs/zfs/zfs_ioctl.c	Mon Nov 09 11:04:55 2009 -0700
@@ -2922,8 +2922,8 @@
 			goto out;
 	}
 
-	error = dmu_recv_begin(tofs, tosnap, &zc->zc_begin_record,
-	    force, origin, &drc);
+	error = dmu_recv_begin(tofs, tosnap, zc->zc_top_ds,
+	    &zc->zc_begin_record, force, origin, &drc);
 	if (origin)
 		dmu_objset_rele(origin, FTAG);
 	if (error)