usr/src/uts/common/fs/zfs/dmu_send.c
changeset 5367 c40abbe796be
parent 5326 6752aa2bd5bc
child 5378 111aa1baa84a
--- a/usr/src/uts/common/fs/zfs/dmu_send.c	Mon Oct 29 16:16:37 2007 -0700
+++ b/usr/src/uts/common/fs/zfs/dmu_send.c	Mon Oct 29 17:12:17 2007 -0700
@@ -41,9 +41,12 @@
 #include <sys/zap.h>
 #include <sys/zio_checksum.h>
 
+static char *dmu_recv_tag = "dmu_recv_tag";
+
 struct backuparg {
 	dmu_replay_record_t *drr;
 	vnode_t *vp;
+	offset_t *off;
 	objset_t *os;
 	zio_cksum_t zc;
 	int err;
@@ -59,6 +62,7 @@
 	ba->err = vn_rdwr(UIO_WRITE, ba->vp,
 	    (caddr_t)buf, len,
 	    0, UIO_SYSSPACE, FAPPEND, RLIM64_INFINITY, CRED(), &resid);
+	*ba->off += len;
 	return (ba->err);
 }
 
@@ -217,13 +221,15 @@
 }
 
 int
-dmu_sendbackup(objset_t *tosnap, objset_t *fromsnap, vnode_t *vp)
+dmu_sendbackup(objset_t *tosnap, objset_t *fromsnap, boolean_t fromorigin,
+    vnode_t *vp, offset_t *off)
 {
 	dsl_dataset_t *ds = tosnap->os->os_dsl_dataset;
 	dsl_dataset_t *fromds = fromsnap ? fromsnap->os->os_dsl_dataset : NULL;
 	dmu_replay_record_t *drr;
 	struct backuparg ba;
 	int err;
+	uint64_t fromtxg = 0;
 
 	/* tosnap must be a snapshot */
 	if (ds->ds_phys->ds_next_snap_obj == 0)
@@ -231,25 +237,51 @@
 
 	/* fromsnap must be an earlier snapshot from the same fs as tosnap */
 	if (fromds && (ds->ds_dir != fromds->ds_dir ||
-	    fromds->ds_phys->ds_creation_txg >=
-	    ds->ds_phys->ds_creation_txg))
+	    fromds->ds_phys->ds_creation_txg >= ds->ds_phys->ds_creation_txg))
 		return (EXDEV);
 
+	if (fromorigin) {
+		if (fromsnap)
+			return (EINVAL);
+
+		if (ds->ds_dir->dd_phys->dd_origin_obj != NULL) {
+			dsl_pool_t *dp = ds->ds_dir->dd_pool;
+			rw_enter(&dp->dp_config_rwlock, RW_READER);
+			err = dsl_dataset_open_obj(dp,
+			    ds->ds_dir->dd_phys->dd_origin_obj, NULL,
+			    DS_MODE_NONE, FTAG, &fromds);
+			rw_exit(&dp->dp_config_rwlock);
+			if (err)
+				return (err);
+		} else {
+			fromorigin = B_FALSE;
+		}
+	}
+
+
 	drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
 	drr->drr_type = DRR_BEGIN;
 	drr->drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
-	drr->drr_u.drr_begin.drr_version = DMU_BACKUP_VERSION;
+	drr->drr_u.drr_begin.drr_version = DMU_BACKUP_STREAM_VERSION;
 	drr->drr_u.drr_begin.drr_creation_time =
 	    ds->ds_phys->ds_creation_time;
 	drr->drr_u.drr_begin.drr_type = tosnap->os->os_phys->os_type;
+	if (fromorigin)
+		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CLONE;
 	drr->drr_u.drr_begin.drr_toguid = ds->ds_phys->ds_guid;
 	if (fromds)
 		drr->drr_u.drr_begin.drr_fromguid = fromds->ds_phys->ds_guid;
 	dsl_dataset_name(ds, drr->drr_u.drr_begin.drr_toname);
 
+	if (fromds)
+		fromtxg = fromds->ds_phys->ds_creation_txg;
+	if (fromorigin)
+		dsl_dataset_close(fromds, DS_MODE_NONE, FTAG);
+
 	ba.drr = drr;
 	ba.vp = vp;
 	ba.os = tosnap;
+	ba.off = off;
 	ZIO_SET_CHECKSUM(&ba.zc, 0, 0, 0, 0);
 
 	if (dump_bytes(&ba, drr, sizeof (dmu_replay_record_t))) {
@@ -257,8 +289,7 @@
 		return (ba.err);
 	}
 
-	err = traverse_dsl_dataset(ds,
-	    fromds ? fromds->ds_phys->ds_creation_txg : 0,
+	err = traverse_dsl_dataset(ds, fromtxg,
 	    ADVANCE_PRE | ADVANCE_HOLES | ADVANCE_DATA | ADVANCE_NOLOCK,
 	    backup_cb, &ba);
 
@@ -283,67 +314,203 @@
 	return (0);
 }
 
-struct restorearg {
-	int err;
-	int byteswap;
-	vnode_t *vp;
-	char *buf;
-	uint64_t voff;
-	int buflen; /* number of valid bytes in buf */
-	int bufoff; /* next offset to read */
-	int bufsize; /* amount of memory allocated for buf */
-	zio_cksum_t zc;
+struct recvbeginsyncarg {
+	const char *tofs;
+	const char *tosnap;
+	dsl_dataset_t *origin;
+	uint64_t fromguid;
+	dmu_objset_type_t type;
+	void *tag;
+	boolean_t force;
+	char clonelastname[MAXNAMELEN];
+	dsl_dataset_t *ds; /* the ds to recv into; returned from the syncfunc */
 };
 
+static dsl_dataset_t *
+recv_full_sync_impl(dsl_pool_t *dp, uint64_t dsobj, dmu_objset_type_t type,
+    cred_t *cr, dmu_tx_t *tx)
+{
+	dsl_dataset_t *ds;
+
+	VERIFY(0 == dsl_dataset_open_obj(dp, dsobj, NULL,
+	    DS_MODE_EXCLUSIVE, dmu_recv_tag, &ds));
+
+	if (type != DMU_OST_NONE) {
+		(void) dmu_objset_create_impl(dp->dp_spa,
+		    ds, &ds->ds_phys->ds_bp, type, tx);
+	}
+
+	dmu_buf_will_dirty(ds->ds_dbuf, tx);
+	ds->ds_phys->ds_flags |= DS_FLAG_INCONSISTENT;
+
+	spa_history_internal_log(LOG_DS_REPLAY_FULL_SYNC,
+	    ds->ds_dir->dd_pool->dp_spa, tx, cr, "dataset = %lld",
+	    ds->ds_phys->ds_dir_obj);
+
+	return (ds);
+}
+
+/* ARGSUSED */
 static int
-replay_incremental_check(dsl_dataset_t *ds, struct drr_begin *drrb)
+recv_full_check(void *arg1, void *arg2, dmu_tx_t *tx)
+{
+	dsl_dir_t *dd = arg1;
+	struct recvbeginsyncarg *rbsa = arg2;
+	objset_t *mos = dd->dd_pool->dp_meta_objset;
+	uint64_t val;
+	int err;
+
+	err = zap_lookup(mos, dd->dd_phys->dd_child_dir_zapobj,
+	    strrchr(rbsa->tofs, '/') + 1, sizeof (uint64_t), 1, &val);
+
+	if (err != ENOENT)
+		return (err ? err : EEXIST);
+
+	if (rbsa->origin) {
+		/* make sure it's a snap in the same pool */
+		if (rbsa->origin->ds_dir->dd_pool != dd->dd_pool)
+			return (EXDEV);
+		if (rbsa->origin->ds_phys->ds_num_children == 0)
+			return (EINVAL);
+		if (rbsa->origin->ds_phys->ds_guid != rbsa->fromguid)
+			return (ENODEV);
+	}
+
+	return (0);
+}
+
+static void
+recv_full_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
+{
+	dsl_dir_t *dd = arg1;
+	struct recvbeginsyncarg *rbsa = arg2;
+	uint64_t dsobj;
+
+	dsobj = dsl_dataset_create_sync(dd, strrchr(rbsa->tofs, '/') + 1,
+	    rbsa->origin, cr, tx);
+
+	rbsa->ds = recv_full_sync_impl(dd->dd_pool, dsobj,
+	    rbsa->origin ? DMU_OST_NONE : rbsa->type, cr, tx);
+}
+
+static int
+recv_full_existing_check(void *arg1, void *arg2, dmu_tx_t *tx)
 {
-	const char *snapname;
+	dsl_dataset_t *ds = arg1;
+	struct recvbeginsyncarg *rbsa = arg2;
+	int err;
+
+	/* must be a head ds */
+	if (ds->ds_phys->ds_next_snap_obj != 0)
+		return (EINVAL);
+
+	/* must not be a clone ds */
+	if (ds->ds_prev != NULL)
+		return (EINVAL);
+
+	err = dsl_dataset_destroy_check(ds, rbsa->tag, tx);
+	if (err)
+		return (err);
+
+	if (rbsa->origin) {
+		/* make sure it's a snap in the same pool */
+		if (rbsa->origin->ds_dir->dd_pool != ds->ds_dir->dd_pool)
+			return (EXDEV);
+		if (rbsa->origin->ds_phys->ds_num_children == 0)
+			return (EINVAL);
+		if (rbsa->origin->ds_phys->ds_guid != rbsa->fromguid)
+			return (ENODEV);
+	}
+
+	return (0);
+}
+
+static void
+recv_full_existing_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
+{
+	dsl_dataset_t *ds = arg1;
+	struct recvbeginsyncarg *rbsa = arg2;
+	dsl_dir_t *dd = ds->ds_dir;
+	uint64_t dsobj;
+
+	/*
+	 * NB: caller must provide an extra hold on the dsl_dir_t, so it
+	 * won't go away when dsl_dataset_destroy_sync() closes the
+	 * dataset.
+	 */
+	dsl_dataset_destroy_sync(ds, rbsa->tag, cr, tx);
+
+	dsobj = dsl_dataset_create_sync_impl(dd, rbsa->origin, tx);
+
+	rbsa->ds = recv_full_sync_impl(dd->dd_pool, dsobj,
+	    rbsa->origin ? DMU_OST_NONE : rbsa->type, cr, tx);
+}
+
+/* ARGSUSED */
+static int
+recv_incremental_check(void *arg1, void *arg2, dmu_tx_t *tx)
+{
+	dsl_dataset_t *ds = arg1;
+	struct recvbeginsyncarg *rbsa = arg2;
 	int err;
 	uint64_t val;
 
+	/* must not have any changes since most recent snapshot */
+	if (!rbsa->force && dsl_dataset_modified_since_lastsnap(ds))
+		return (ETXTBSY);
+
 	/* must already be a snapshot of this fs */
 	if (ds->ds_phys->ds_prev_snap_obj == 0)
 		return (ENODEV);
 
 	/* most recent snapshot must match fromguid */
-	if (ds->ds_prev->ds_phys->ds_guid != drrb->drr_fromguid)
+	if (ds->ds_prev->ds_phys->ds_guid != rbsa->fromguid)
 		return (ENODEV);
 
 	/* new snapshot name must not exist */
-	snapname = strrchr(drrb->drr_toname, '@');
-	if (snapname == NULL)
+	err = zap_lookup(ds->ds_dir->dd_pool->dp_meta_objset,
+	    ds->ds_phys->ds_snapnames_zapobj, rbsa->tosnap, 8, 1, &val);
+	if (err == 0)
 		return (EEXIST);
-
-	snapname++;
-	err = zap_lookup(ds->ds_dir->dd_pool->dp_meta_objset,
-	    ds->ds_phys->ds_snapnames_zapobj, snapname, 8, 1, &val);
-	if (err == 0)
-	return (EEXIST);
 	if (err != ENOENT)
-	return (err);
-
+		return (err);
 	return (0);
 }
 
 /* ARGSUSED */
-static int
-replay_offline_incremental_check(void *arg1, void *arg2, dmu_tx_t *tx)
+static void
+recv_online_incremental_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
 {
-	dsl_dataset_t *ds = arg1;
-	struct drr_begin *drrb = arg2;
+	dsl_dataset_t *ohds = arg1;
+	struct recvbeginsyncarg *rbsa = arg2;
+	dsl_pool_t *dp = ohds->ds_dir->dd_pool;
+	dsl_dataset_t *ods, *cds;
+	uint64_t dsobj;
 
-	/* must not have any changes since most recent snapshot */
-	if (dsl_dataset_modified_since_lastsnap(ds))
-		return (ETXTBSY);
+	/* create the temporary clone */
+	VERIFY(0 == dsl_dataset_open_obj(dp, ohds->ds_phys->ds_prev_snap_obj,
+	    NULL, DS_MODE_STANDARD, FTAG, &ods));
+	dsobj = dsl_dataset_create_sync(ohds->ds_dir,
+	    rbsa->clonelastname, ods, cr, tx);
+	dsl_dataset_close(ods, DS_MODE_STANDARD, FTAG);
 
-	return (replay_incremental_check(ds, drrb));
+	/* open the temporary clone */
+	VERIFY(0 == dsl_dataset_open_obj(dp, dsobj, NULL,
+	    DS_MODE_EXCLUSIVE, dmu_recv_tag, &cds));
+
+	dmu_buf_will_dirty(cds->ds_dbuf, tx);
+	cds->ds_phys->ds_flags |= DS_FLAG_INCONSISTENT;
+
+	rbsa->ds = cds;
+
+	spa_history_internal_log(LOG_DS_REPLAY_INC_SYNC,
+	    dp->dp_spa, tx, cr, "dataset = %lld",
+	    cds->ds_phys->ds_dir_obj);
 }
 
 /* ARGSUSED */
 static void
-replay_offline_incremental_sync(void *arg1, void *arg2, cred_t *cr,
-    dmu_tx_t *tx)
+recv_offline_incremental_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
 {
 	dsl_dataset_t *ds = arg1;
 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
@@ -354,200 +521,203 @@
 	    ds->ds_phys->ds_dir_obj);
 }
 
-/* ARGSUSED */
-static int
-replay_full_check(void *arg1, void *arg2, dmu_tx_t *tx)
+/*
+ * NB: callers *MUST* call dmu_recv_stream() if dmu_recv_begin()
+ * succeeds; otherwise we will leak the holds on the datasets.
+ */
+int
+dmu_recv_begin(char *tofs, char *tosnap, struct drr_begin *drrb,
+    boolean_t force, objset_t *origin, boolean_t online, dmu_recv_cookie_t *drc)
 {
-	dsl_dir_t *dd = arg1;
-	struct drr_begin *drrb = arg2;
-	objset_t *mos = dd->dd_pool->dp_meta_objset;
-	char *cp;
-	uint64_t val;
-	int err;
+	int err = 0;
+	boolean_t byteswap;
+	struct recvbeginsyncarg rbsa = { 0 };
+	uint64_t version;
+	int flags;
+	dsl_dataset_t *ds;
+
+	if (drrb->drr_magic == DMU_BACKUP_MAGIC)
+		byteswap = FALSE;
+	else if (drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC))
+		byteswap = TRUE;
+	else
+		return (EINVAL);
+
+	rbsa.tofs = tofs;
+	rbsa.tosnap = tosnap;
+	rbsa.origin = origin ? origin->os->os_dsl_dataset : NULL;
+	rbsa.fromguid = drrb->drr_fromguid;
+	rbsa.type = drrb->drr_type;
+	rbsa.tag = FTAG;
+	version = drrb->drr_version;
+	flags = drrb->drr_flags;
+
+	if (byteswap) {
+		rbsa.type = BSWAP_32(rbsa.type);
+		rbsa.fromguid = BSWAP_64(rbsa.fromguid);
+		version = BSWAP_64(version);
+		flags = BSWAP_32(flags);
+	}
+
+	if (version != DMU_BACKUP_STREAM_VERSION ||
+	    rbsa.type >= DMU_OST_NUMTYPES ||
+	    ((flags & DRR_FLAG_CLONE) && origin == NULL))
+		return (EINVAL);
+
+	bzero(drc, sizeof (dmu_recv_cookie_t));
+	drc->drc_drrb = drrb;
+	drc->drc_tosnap = tosnap;
+	drc->drc_force = force;
+
+	/*
+	 * Process the begin in syncing context.
+	 */
+	if (rbsa.fromguid && !(flags & DRR_FLAG_CLONE) && !online) {
+		/* offline incremental receive */
+		err = dsl_dataset_open(tofs,
+		    DS_MODE_EXCLUSIVE, dmu_recv_tag, &ds);
+		if (err)
+			return (err);
 
-	cp = strchr(drrb->drr_toname, '@');
-	*cp = '\0';
-	err = zap_lookup(mos, dd->dd_phys->dd_child_dir_zapobj,
-	    strrchr(drrb->drr_toname, '/') + 1,
-	    sizeof (uint64_t), 1, &val);
-	*cp = '@';
+		/*
+		 * Only do the rollback if the most recent snapshot
+		 * matches the incremental source
+		 */
+		if (force) {
+			if (ds->ds_prev == NULL ||
+			    ds->ds_prev->ds_phys->ds_guid !=
+			    rbsa.fromguid) {
+				dsl_dataset_close(ds, DS_MODE_EXCLUSIVE,
+				    dmu_recv_tag);
+				return (ENODEV);
+			}
+			(void) dsl_dataset_rollback(ds, DMU_OST_NONE);
+		}
+		rbsa.force = B_FALSE;
+		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
+		    recv_incremental_check,
+		    recv_offline_incremental_sync,
+		    ds, &rbsa, 1);
+		if (err) {
+			dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, dmu_recv_tag);
+			return (err);
+		}
+		drc->drc_logical_ds = drc->drc_real_ds = ds;
+	} else if (rbsa.fromguid && !(flags & DRR_FLAG_CLONE)) {
+		/* online incremental receive */
+
+		/* tmp clone name is: tofs/%tosnap */
+		(void) snprintf(rbsa.clonelastname, sizeof (rbsa.clonelastname),
+		    "%%%s", tosnap);
+
+		/* open the dataset we are logically receiving into */
+		err = dsl_dataset_open(tofs,
+		    DS_MODE_STANDARD, dmu_recv_tag, &ds);
+		if (err)
+			return (err);
 
-	if (err != ENOENT)
-		return (err ? err : EEXIST);
+		rbsa.force = force;
+		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
+		    recv_incremental_check,
+		    recv_online_incremental_sync, ds, &rbsa, 5);
+		if (err) {
+			dsl_dataset_close(ds, DS_MODE_STANDARD, dmu_recv_tag);
+			return (err);
+		}
+		drc->drc_logical_ds = ds;
+		drc->drc_real_ds = rbsa.ds;
+	} else {
+		/* create new fs -- full backup or clone */
+		dsl_dir_t *dd = NULL;
+		const char *tail;
+
+		err = dsl_dir_open(tofs, FTAG, &dd, &tail);
+		if (err)
+			return (err);
+		if (tail == NULL) {
+			if (!force) {
+				dsl_dir_close(dd, FTAG);
+				return (EEXIST);
+			}
+
+			rw_enter(&dd->dd_pool->dp_config_rwlock, RW_READER);
+			err = dsl_dataset_open_obj(dd->dd_pool,
+			    dd->dd_phys->dd_head_dataset_obj, NULL,
+			    DS_MODE_EXCLUSIVE | DS_MODE_INCONSISTENT,
+			    FTAG, &ds);
+			rw_exit(&dd->dd_pool->dp_config_rwlock);
+			if (err) {
+				dsl_dir_close(dd, FTAG);
+				return (err);
+			}
+
+			err = dsl_sync_task_do(dd->dd_pool,
+			    recv_full_existing_check,
+			    recv_full_existing_sync, ds, &rbsa, 5);
+			/* if successful, sync task closes the ds for us */
+			if (err)
+				dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
+		} else {
+			err = dsl_sync_task_do(dd->dd_pool,
+			    recv_full_check, recv_full_sync,
+			    dd, &rbsa, 5);
+			/* errors are returned below, once dd is closed */
+		}
+		dsl_dir_close(dd, FTAG);
+		if (err)
+			return (err);
+		drc->drc_logical_ds = drc->drc_real_ds = rbsa.ds;
+		drc->drc_newfs = B_TRUE;
+	}
+
+	/* downgrade our hold on the ds from EXCLUSIVE to PRIMARY */
+	dsl_dataset_downgrade(drc->drc_real_ds,
+	    DS_MODE_EXCLUSIVE, DS_MODE_PRIMARY);
 
 	return (0);
 }
 
-static void
-replay_full_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
-{
-	dsl_dir_t *dd = arg1;
-	struct drr_begin *drrb = arg2;
-	char *cp;
-	dsl_dataset_t *ds;
-	uint64_t dsobj;
-
-	cp = strchr(drrb->drr_toname, '@');
-	*cp = '\0';
-	dsobj = dsl_dataset_create_sync(dd, strrchr(drrb->drr_toname, '/') + 1,
-	    NULL, tx);
-
-	VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool, dsobj, NULL,
-	    DS_MODE_EXCLUSIVE, FTAG, &ds));
-
-	(void) dmu_objset_create_impl(dsl_dataset_get_spa(ds),
-	    ds, &ds->ds_phys->ds_bp, drrb->drr_type, tx);
-
-	dmu_buf_will_dirty(ds->ds_dbuf, tx);
-	ds->ds_phys->ds_flags |= DS_FLAG_INCONSISTENT;
-
-	spa_history_internal_log(LOG_DS_REPLAY_FULL_SYNC,
-	    ds->ds_dir->dd_pool->dp_spa, tx, cr, "dataset = %lld",
-	    ds->ds_phys->ds_dir_obj);
-
-	*cp = '@';
-
-	dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
-}
-
-struct onlineincarg {
-	dsl_dir_t *dd;
-	dsl_dataset_t *ohds;
-	boolean_t force;
-	const char *cosname;
+struct restorearg {
+	int err;
+	int byteswap;
+	vnode_t *vp;
+	char *buf;
+	uint64_t voff;
+	int bufsize; /* amount of memory allocated for buf */
+	zio_cksum_t cksum;
 };
 
-/* ARGSUSED */
-static int
-replay_online_incremental_check(void *arg1, void *arg2, dmu_tx_t *tx)
-{
-	struct onlineincarg *oia = arg1;
-
-	if (dsl_dataset_modified_since_lastsnap(oia->ohds) && !oia->force)
-		return (ETXTBSY);
-
-	return (replay_incremental_check(oia->ohds, arg2));
-}
-
-/* ARGSUSED */
-static void
-replay_online_incremental_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
-{
-	struct onlineincarg *oia = arg1;
-	dsl_dataset_t *ohds = oia->ohds;
-	dsl_dir_t *dd = oia->dd;
-	dsl_dataset_t *ods, *ds;
-	uint64_t dsobj;
-
-	VERIFY(0 == dsl_dataset_open_obj(ohds->ds_dir->dd_pool,
-	    ohds->ds_phys->ds_prev_snap_obj, NULL,
-	    DS_MODE_STANDARD, FTAG, &ods));
-
-	dsobj = dsl_dataset_create_sync(dd, strrchr(oia->cosname, '/') + 1,
-	    ods, tx);
-
-	/* open the temporary clone */
-	VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool, dsobj, NULL,
-	    DS_MODE_EXCLUSIVE, FTAG, &ds));
-
-	dmu_buf_will_dirty(ds->ds_dbuf, tx);
-	ds->ds_phys->ds_flags |= DS_FLAG_INCONSISTENT;
-
-	spa_history_internal_log(LOG_DS_REPLAY_INC_SYNC,
-	    ds->ds_dir->dd_pool->dp_spa, tx, cr, "dataset = %lld",
-	    ds->ds_phys->ds_dir_obj);
-
-	dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
-	dsl_dataset_close(ods, DS_MODE_STANDARD, FTAG);
-}
-
-static int
-replay_end_check(void *arg1, void *arg2, dmu_tx_t *tx)
-{
-	objset_t *os = arg1;
-	struct drr_begin *drrb = arg2;
-	char *snapname;
-
-	/* XXX verify that drr_toname is in dd */
-
-	snapname = strchr(drrb->drr_toname, '@');
-	if (snapname == NULL)
-		return (EINVAL);
-	snapname++;
-
-	return (dsl_dataset_snapshot_check(os, snapname, tx));
-}
-
-static void
-replay_end_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
-{
-	objset_t *os = arg1;
-	struct drr_begin *drrb = arg2;
-	char *snapname;
-	dsl_dataset_t *ds, *hds;
-
-	snapname = strchr(drrb->drr_toname, '@') + 1;
-
-	dsl_dataset_snapshot_sync(os, snapname, cr, tx);
-
-	/* set snapshot's creation time and guid */
-	hds = os->os->os_dsl_dataset;
-	VERIFY(0 == dsl_dataset_open_obj(hds->ds_dir->dd_pool,
-	    hds->ds_phys->ds_prev_snap_obj, NULL,
-	    DS_MODE_PRIMARY | DS_MODE_READONLY | DS_MODE_INCONSISTENT,
-	    FTAG, &ds));
-
-	dmu_buf_will_dirty(ds->ds_dbuf, tx);
-	ds->ds_phys->ds_creation_time = drrb->drr_creation_time;
-	ds->ds_phys->ds_guid = drrb->drr_toguid;
-	ds->ds_phys->ds_flags &= ~DS_FLAG_INCONSISTENT;
-
-	/* log the end of the receive */
-	spa_history_internal_log(LOG_DS_RECEIVE, ds->ds_dir->dd_pool->dp_spa,
-	    tx, cr, "dataset = %llu", ds->ds_phys->ds_dir_obj);
-
-	dsl_dataset_close(ds, DS_MODE_PRIMARY, FTAG);
-
-	dmu_buf_will_dirty(hds->ds_dbuf, tx);
-	hds->ds_phys->ds_flags &= ~DS_FLAG_INCONSISTENT;
-}
-
 static void *
 restore_read(struct restorearg *ra, int len)
 {
 	void *rv;
+	int done = 0;
 
 	/* some things will require 8-byte alignment, so everything must */
 	ASSERT3U(len % 8, ==, 0);
 
-	while (ra->buflen - ra->bufoff < len) {
+	while (done < len) {
 		ssize_t resid;
-		int leftover = ra->buflen - ra->bufoff;
 
-		(void) memmove(ra->buf, ra->buf + ra->bufoff, leftover);
 		ra->err = vn_rdwr(UIO_READ, ra->vp,
-		    (caddr_t)ra->buf + leftover, ra->bufsize - leftover,
+		    (caddr_t)ra->buf + done, len - done,
 		    ra->voff, UIO_SYSSPACE, FAPPEND,
 		    RLIM64_INFINITY, CRED(), &resid);
 
-		ra->voff += ra->bufsize - leftover - resid;
-		ra->buflen = ra->bufsize - resid;
-		ra->bufoff = 0;
-		if (resid == ra->bufsize - leftover)
+		if (resid == len - done)
 			ra->err = EINVAL;
+		ra->voff += len - done - resid;
+		done = len - resid;
 		if (ra->err)
 			return (NULL);
-		/* Could compute checksum here? */
 	}
 
-	ASSERT3U(ra->bufoff % 8, ==, 0);
-	ASSERT3U(ra->buflen - ra->bufoff, >=, len);
-	rv = ra->buf + ra->bufoff;
-	ra->bufoff += len;
+	ASSERT3U(done, ==, len);
+	rv = ra->buf;
 	if (ra->byteswap)
-		fletcher_4_incremental_byteswap(rv, len, &ra->zc);
+		fletcher_4_incremental_byteswap(rv, len, &ra->cksum);
 	else
-		fletcher_4_incremental_native(rv, len, &ra->zc);
+		fletcher_4_incremental_native(rv, len, &ra->cksum);
 	return (rv);
 }
 
@@ -557,12 +727,14 @@
 #define	DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X))
 #define	DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X))
 	drr->drr_type = BSWAP_32(drr->drr_type);
+	drr->drr_payloadlen = BSWAP_32(drr->drr_payloadlen);
 	switch (drr->drr_type) {
 	case DRR_BEGIN:
 		DO64(drr_begin.drr_magic);
 		DO64(drr_begin.drr_version);
 		DO64(drr_begin.drr_creation_time);
 		DO32(drr_begin.drr_type);
+		DO32(drr_begin.drr_flags);
 		DO64(drr_begin.drr_toguid);
 		DO64(drr_begin.drr_fromguid);
 		break;
@@ -786,52 +958,67 @@
 	return (err);
 }
 
-int
-dmu_recvbackup(char *tosnap, struct drr_begin *drrb, uint64_t *sizep,
-    boolean_t force, boolean_t online, vnode_t *vp, uint64_t voffset,
-    char *cosname)
+static void
+recv_abort_cleanup(dmu_recv_cookie_t *drc)
 {
-	struct restorearg ra;
-	dmu_replay_record_t *drr;
-	char *cp;
-	objset_t *os = NULL;
-	zio_cksum_t pzc;
-	char *clonebuf = NULL;
-	size_t len;
+	if (drc->drc_newfs || drc->drc_real_ds != drc->drc_logical_ds) {
+		/*
+		 * online incremental or new fs: destroy the fs (which
+		 * may be a clone) that we created
+		 */
+		(void) dsl_dataset_destroy(drc->drc_real_ds, dmu_recv_tag);
+		if (drc->drc_real_ds != drc->drc_logical_ds) {
+			dsl_dataset_close(drc->drc_logical_ds,
+			    DS_MODE_STANDARD, dmu_recv_tag);
+		}
+	} else {
+		/*
+		 * offline incremental: rollback to most recent snapshot.
+		 */
+		int lmode = DS_MODE_PRIMARY;
+		if (dsl_dataset_tryupgrade(drc->drc_real_ds,
+		    DS_MODE_PRIMARY, DS_MODE_EXCLUSIVE)) {
+			lmode = DS_MODE_EXCLUSIVE;
+			(void) dsl_dataset_rollback(drc->drc_real_ds,
+			    DMU_OST_NONE);
+		}
+		dsl_dataset_close(drc->drc_real_ds, lmode, dmu_recv_tag);
+	}
+}
 
-	bzero(&ra, sizeof (ra));
-	ra.vp = vp;
-	ra.voff = voffset;
-	ra.bufsize = 1<<20;
-	ra.buf = kmem_alloc(ra.bufsize, KM_SLEEP);
+/*
+ * NB: callers *must* call dmu_recv_end() if this succeeds.
+ */
+int
+dmu_recv_stream(dmu_recv_cookie_t *drc, vnode_t *vp, offset_t *voffp)
+{
+	struct restorearg ra = { 0 };
+	dmu_replay_record_t *drr;
+	objset_t *os;
+	zio_cksum_t pcksum;
+
+	if (drc->drc_drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC))
+		ra.byteswap = TRUE;
 
-	if (drrb->drr_magic == DMU_BACKUP_MAGIC) {
-		ra.byteswap = FALSE;
-	} else if (drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC)) {
-		ra.byteswap = TRUE;
-	} else {
-		ra.err = EINVAL;
-		goto out;
+	{
+		/* compute checksum of drr_begin record */
+		dmu_replay_record_t *drr;
+		drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
+
+		drr->drr_type = DRR_BEGIN;
+		drr->drr_u.drr_begin = *drc->drc_drrb;
+		if (ra.byteswap) {
+			fletcher_4_incremental_byteswap(drr,
+			    sizeof (dmu_replay_record_t), &ra.cksum);
+		} else {
+			fletcher_4_incremental_native(drr,
+			    sizeof (dmu_replay_record_t), &ra.cksum);
+		}
+		kmem_free(drr, sizeof (dmu_replay_record_t));
 	}
 
-	/*
-	 * NB: this assumes that struct drr_begin will be the largest in
-	 * dmu_replay_record_t's drr_u, and thus we don't need to pad it
-	 * with zeros to make it the same length as we wrote out.
-	 */
-	((dmu_replay_record_t *)ra.buf)->drr_type = DRR_BEGIN;
-	((dmu_replay_record_t *)ra.buf)->drr_pad = 0;
-	((dmu_replay_record_t *)ra.buf)->drr_u.drr_begin = *drrb;
 	if (ra.byteswap) {
-		fletcher_4_incremental_byteswap(ra.buf,
-		    sizeof (dmu_replay_record_t), &ra.zc);
-	} else {
-		fletcher_4_incremental_native(ra.buf,
-		    sizeof (dmu_replay_record_t), &ra.zc);
-	}
-	(void) strcpy(drrb->drr_toname, tosnap); /* for the sync funcs */
-
-	if (ra.byteswap) {
+		struct drr_begin *drrb = drc->drc_drrb;
 		drrb->drr_magic = BSWAP_64(drrb->drr_magic);
 		drrb->drr_version = BSWAP_64(drrb->drr_version);
 		drrb->drr_creation_time = BSWAP_64(drrb->drr_creation_time);
@@ -840,133 +1027,26 @@
 		drrb->drr_fromguid = BSWAP_64(drrb->drr_fromguid);
 	}
 
-	ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
-
-	if (drrb->drr_version != DMU_BACKUP_VERSION ||
-	    drrb->drr_type >= DMU_OST_NUMTYPES ||
-	    strchr(drrb->drr_toname, '@') == NULL) {
-		ra.err = EINVAL;
-		goto out;
-	}
-
-	/*
-	 * Process the begin in syncing context.
-	 */
-	if (drrb->drr_fromguid && !online) {
-		/* offline incremental receive */
-
-		dsl_dataset_t *ds = NULL;
-
-		cp = strchr(tosnap, '@');
-		*cp = '\0';
-		ra.err = dsl_dataset_open(tosnap, DS_MODE_EXCLUSIVE, FTAG, &ds);
-		*cp = '@';
-		if (ra.err)
-			goto out;
-
-		/*
-		 * Only do the rollback if the most recent snapshot
-		 * matches the incremental source
-		 */
-		if (force) {
-			if (ds->ds_prev == NULL ||
-			    ds->ds_prev->ds_phys->ds_guid !=
-			    drrb->drr_fromguid) {
-				dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
-				kmem_free(ra.buf, ra.bufsize);
-				return (ENODEV);
-			}
-			(void) dsl_dataset_rollback(ds);
-		}
-		ra.err = dsl_sync_task_do(ds->ds_dir->dd_pool,
-		    replay_offline_incremental_check,
-		    replay_offline_incremental_sync, ds, drrb, 1);
-		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
-	} else if (drrb->drr_fromguid && online) {
-		/* online incremental receive */
-
-		const char *tail;
-		struct onlineincarg oia = { 0 };
-
-		/*
-		 * Get the dsl_dir for the parent of the
-		 * temporary clone.
-		 */
-		cp = strchr(tosnap, '@');
-		*cp = '\0';
+	ra.vp = vp;
+	ra.voff = *voffp;
+	ra.bufsize = 1<<20;
+	ra.buf = kmem_alloc(ra.bufsize, KM_SLEEP);
 
-		/* tmp clone is: tonsap + '/' + '%' + "snapX" */
-		len = strlen(tosnap) + 2 + strlen(cp + 1) + 1;
-		clonebuf = kmem_alloc(len, KM_SLEEP);
-		(void) snprintf(clonebuf, len, "%s%c%c%s%c",
-		    tosnap, '/', '%', cp + 1, '\0');
-		ra.err = dsl_dir_open(tosnap, FTAG, &oia.dd, &tail);
-		*cp = '@';
-		if (ra.err)
-			goto out;
-
-		/* open the dataset we are logically receiving into */
-		*cp = '\0';
-		ra.err = dsl_dataset_open(tosnap, DS_MODE_STANDARD,
-		    FTAG, &oia.ohds);
-		*cp = '@';
-		if (ra.err) {
-			dsl_dir_close(oia.dd, FTAG);
-			goto out;
-		}
-
-		oia.force = force;
-		oia.cosname = clonebuf;
-		ra.err = dsl_sync_task_do(oia.dd->dd_pool,
-		    replay_online_incremental_check,
-		    replay_online_incremental_sync, &oia, drrb, 5);
-		dsl_dataset_close(oia.ohds, DS_MODE_STANDARD, FTAG);
-		dsl_dir_close(oia.dd, FTAG);
-	} else {
-		/* full backup */
-
-		dsl_dir_t *dd = NULL;
-		const char *tail;
-
-		/* can't restore full backup into topmost fs, for now */
-		if (strrchr(drrb->drr_toname, '/') == NULL) {
-			ra.err = EINVAL;
-			goto out;
-		}
-
-		cp = strchr(tosnap, '@');
-		*cp = '\0';
-		ra.err = dsl_dir_open(tosnap, FTAG, &dd, &tail);
-		*cp = '@';
-		if (ra.err)
-			goto out;
-		if (tail == NULL) {
-			ra.err = EEXIST;
-			goto out;
-		}
-
-		ra.err = dsl_sync_task_do(dd->dd_pool, replay_full_check,
-		    replay_full_sync, dd, drrb, 5);
-		dsl_dir_close(dd, FTAG);
-	}
-	if (ra.err)
-		goto out;
+	/* these were verified in dmu_recv_begin */
+	ASSERT(drc->drc_drrb->drr_version == DMU_BACKUP_STREAM_VERSION);
+	ASSERT(drc->drc_drrb->drr_type < DMU_OST_NUMTYPES);
 
 	/*
 	 * Open the objset we are modifying.
 	 */
+	VERIFY(dmu_objset_open_ds(drc->drc_real_ds, DMU_OST_ANY, &os) == 0);
 
-	cp = strchr(tosnap, '@');
-	*cp = '\0';
-	ra.err = dmu_objset_open(clonebuf == NULL ? tosnap : clonebuf,
-	    DMU_OST_ANY, DS_MODE_PRIMARY | DS_MODE_INCONSISTENT, &os);
-	*cp = '@';
-	ASSERT3U(ra.err, ==, 0);
+	ASSERT(drc->drc_real_ds->ds_phys->ds_flags & DS_FLAG_INCONSISTENT);
 
 	/*
 	 * Read records and process them.
 	 */
-	pzc = ra.zc;
+	pcksum = ra.cksum;
 	while (ra.err == 0 &&
 	    NULL != (drr = restore_read(&ra, sizeof (*drr)))) {
 		if (issig(JUSTLOOKING) && issig(FORREAL)) {
@@ -1017,99 +1097,130 @@
 			 * everything before the DRR_END record.
 			 */
 			if (drre.drr_checksum.zc_word[0] != 0 &&
-			    !ZIO_CHECKSUM_EQUAL(drre.drr_checksum, pzc)) {
+			    !ZIO_CHECKSUM_EQUAL(drre.drr_checksum, pcksum)) {
 				ra.err = ECKSUM;
 				goto out;
 			}
-
-			if (clonebuf == NULL) {
-				ra.err = dsl_sync_task_do(dmu_objset_ds(os)->
-				    ds_dir->dd_pool, replay_end_check,
-				    replay_end_sync, os, drrb, 3);
-			}
 			goto out;
 		}
 		default:
 			ra.err = EINVAL;
 			goto out;
 		}
-		pzc = ra.zc;
+		pcksum = ra.cksum;
 	}
 
 out:
-	if (os) {
-		if (drrb->drr_fromguid && online && !ra.err)
-			dmu_objset_name(os, cosname);
-		dmu_objset_close(os);
-	}
+	dmu_objset_close(os);
 
-	/*
-	 * Make sure we don't rollback/destroy unless we actually
-	 * processed the begin properly.  'os' will only be set if this
-	 * is the case.
-	 */
-	if (ra.err && os && tosnap && strchr(tosnap, '@')) {
+	if (ra.err != 0) {
 		/*
 		 * rollback or destroy what we created, so we don't
 		 * leave it in the restoring state.
 		 */
-		dsl_dataset_t *ds;
-		int err;
-
-		cp = strchr(tosnap, '@');
-		*cp = '\0';
-		err = dsl_dataset_open(clonebuf == NULL ? tosnap : clonebuf,
-		    DS_MODE_EXCLUSIVE | DS_MODE_INCONSISTENT,
-		    FTAG, &ds);
-		if (err == 0) {
-			txg_wait_synced(ds->ds_dir->dd_pool, 0);
-			if (drrb->drr_fromguid) {
-				if (clonebuf != NULL) {
-					/*
-					 * online incremental: destroy
-					 * the temporarily created clone.
-					 */
-					dsl_dataset_close(ds, DS_MODE_EXCLUSIVE,
-					    FTAG);
-					(void) dmu_objset_destroy(clonebuf);
-				} else {
-					/*
-					 * offline incremental: rollback to
-					 * most recent snapshot.
-					 */
-					(void) dsl_dataset_rollback(ds);
-					dsl_dataset_close(ds, DS_MODE_EXCLUSIVE,
-					    FTAG);
-				}
-			} else {
-				/* full: destroy whole fs */
-				dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
-				(void) dsl_dataset_destroy(tosnap);
-			}
-		}
-		*cp = '@';
+		txg_wait_synced(drc->drc_real_ds->ds_dir->dd_pool, 0);
+		recv_abort_cleanup(drc);
 	}
 
-	if (clonebuf != NULL)
-		kmem_free(clonebuf, len);
 	kmem_free(ra.buf, ra.bufsize);
-	if (sizep)
-		*sizep = ra.voff;
+	*voffp = ra.voff;
 	return (ra.err);
 }
 
-int
-dmu_replay_end_snapshot(char *name, struct drr_begin *drrb)
+struct recvendsyncarg {
+	char *tosnap;
+	uint64_t creation_time;
+	uint64_t toguid;
+};
+
+static int
+recv_end_check(void *arg1, void *arg2, dmu_tx_t *tx)
+{
+	dsl_dataset_t *ds = arg1;
+	struct recvendsyncarg *resa = arg2;
+
+	return (dsl_dataset_snapshot_check(ds, resa->tosnap, tx));
+}
+
+static void
+recv_end_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
 {
-	objset_t *os;
-	int err;
+	dsl_dataset_t *ds = arg1;
+	struct recvendsyncarg *resa = arg2;
+
+	dsl_dataset_snapshot_sync(ds, resa->tosnap, cr, tx);
+
+	/* set snapshot's creation time and guid */
+	dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
+	ds->ds_prev->ds_phys->ds_creation_time = resa->creation_time;
+	ds->ds_prev->ds_phys->ds_guid = resa->toguid;
+	ds->ds_prev->ds_phys->ds_flags &= ~DS_FLAG_INCONSISTENT;
+
+	dmu_buf_will_dirty(ds->ds_dbuf, tx);
+	ds->ds_phys->ds_flags &= ~DS_FLAG_INCONSISTENT;
+}
+
+int
+dmu_recv_end(dmu_recv_cookie_t *drc)
+{
+	int err = 0;
+	int lmode;
+
+	/*
+	 * XXX hack; seems the ds is still dirty and
+	 * dsl_pool_zil_clean() expects it to have a ds_user_ptr (and
+	 * zil), but clone_swap() can close it.
+	 */
+	txg_wait_synced(drc->drc_real_ds->ds_dir->dd_pool, 0);
 
-	err = dmu_objset_open(name, DMU_OST_ZFS, DS_MODE_STANDARD, &os);
-	if (err)
-		return (err);
+	if (dsl_dataset_tryupgrade(drc->drc_real_ds,
+	    DS_MODE_PRIMARY, DS_MODE_EXCLUSIVE)) {
+		lmode = DS_MODE_EXCLUSIVE;
+	} else {
+		recv_abort_cleanup(drc);
+		return (EBUSY);
+	}
+
+	if (drc->drc_logical_ds != drc->drc_real_ds) {
+		if (err == 0 && dsl_dataset_tryupgrade(drc->drc_logical_ds,
+		    DS_MODE_STANDARD, DS_MODE_EXCLUSIVE)) {
+			lmode = DS_MODE_EXCLUSIVE;
+			err = dsl_dataset_clone_swap(drc->drc_real_ds,
+			    drc->drc_logical_ds, drc->drc_force);
+		} else {
+			lmode = DS_MODE_STANDARD;
+			err = EBUSY;
+		}
+	}
+
+	if (err == 0) {
+		struct recvendsyncarg resa;
 
-	err = dsl_sync_task_do(dmu_objset_ds(os)->ds_dir->dd_pool,
-	    replay_end_check, replay_end_sync, os, drrb, 3);
-	dmu_objset_close(os);
+		resa.creation_time = drc->drc_drrb->drr_creation_time;
+		resa.toguid = drc->drc_drrb->drr_toguid;
+		resa.tosnap = drc->drc_tosnap;
+
+		err = dsl_sync_task_do(drc->drc_real_ds->ds_dir->dd_pool,
+		    recv_end_check, recv_end_sync,
+		    drc->drc_logical_ds, &resa, 3);
+		if (err) {
+			if (drc->drc_newfs) {
+				ASSERT(drc->drc_logical_ds == drc->drc_real_ds);
+				(void) dsl_dataset_destroy(drc->drc_real_ds,
+				    dmu_recv_tag);
+				return (err);
+			} else {
+				(void) dsl_dataset_rollback(drc->drc_logical_ds,
+				    DMU_OST_NONE);
+			}
+		}
+	}
+
+	if (drc->drc_logical_ds != drc->drc_real_ds) {
+		/* dsl_dataset_destroy() will close the ds */
+		(void) dsl_dataset_destroy(drc->drc_real_ds, dmu_recv_tag);
+	}
+	/* close the hold from dmu_recv_begin */
+	dsl_dataset_close(drc->drc_logical_ds, lmode, dmu_recv_tag);
 	return (err);
 }