usr/src/uts/common/fs/zfs/dmu_send.c
changeset 10272 a0669934e974
parent 10242 c40d075fbca6
child 10298 a0d52501437c
--- a/usr/src/uts/common/fs/zfs/dmu_send.c	Thu Aug 06 17:39:39 2009 -0700
+++ b/usr/src/uts/common/fs/zfs/dmu_send.c	Thu Aug 06 22:16:07 2009 -0700
@@ -321,31 +321,9 @@
 	dsl_dataset_t *ds; /* the ds to recv into; returned from the syncfunc */
 };
 
-static dsl_dataset_t *
-recv_full_sync_impl(dsl_pool_t *dp, uint64_t dsobj, dmu_objset_type_t type,
-    cred_t *cr, dmu_tx_t *tx)
-{
-	dsl_dataset_t *ds;
-
-	/* This should always work, since we just created it */
-	/* XXX - create should return an owned ds */
-	VERIFY(0 == dsl_dataset_own_obj(dp, dsobj,
-	    DS_MODE_INCONSISTENT, dmu_recv_tag, &ds));
-
-	if (type != DMU_OST_NONE) {
-		(void) dmu_objset_create_impl(dp->dp_spa,
-		    ds, &ds->ds_phys->ds_bp, type, tx);
-	}
-
-	spa_history_internal_log(LOG_DS_REPLAY_FULL_SYNC,
-	    dp->dp_spa, tx, cr, "dataset = %lld", dsobj);
-
-	return (ds);
-}
-
 /* ARGSUSED */
 static int
-recv_full_check(void *arg1, void *arg2, dmu_tx_t *tx)
+recv_new_check(void *arg1, void *arg2, dmu_tx_t *tx)
 {
 	dsl_dir_t *dd = arg1;
 	struct recvbeginsyncarg *rbsa = arg2;
@@ -363,7 +341,7 @@
 		/* make sure it's a snap in the same pool */
 		if (rbsa->origin->ds_dir->dd_pool != dd->dd_pool)
 			return (EXDEV);
-		if (rbsa->origin->ds_phys->ds_num_children == 0)
+		if (!dsl_dataset_is_snapshot(rbsa->origin))
 			return (EINVAL);
 		if (rbsa->origin->ds_phys->ds_guid != rbsa->fromguid)
 			return (ENODEV);
@@ -373,82 +351,31 @@
 }
 
 static void
-recv_full_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
+recv_new_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
 {
 	dsl_dir_t *dd = arg1;
 	struct recvbeginsyncarg *rbsa = arg2;
 	uint64_t flags = DS_FLAG_INCONSISTENT | rbsa->dsflags;
 	uint64_t dsobj;
 
+	/* Create and open new dataset. */
 	dsobj = dsl_dataset_create_sync(dd, strrchr(rbsa->tofs, '/') + 1,
 	    rbsa->origin, flags, cr, tx);
-
-	rbsa->ds = recv_full_sync_impl(dd->dd_pool, dsobj,
-	    rbsa->origin ? DMU_OST_NONE : rbsa->type, cr, tx);
-}
-
-static int
-recv_full_existing_check(void *arg1, void *arg2, dmu_tx_t *tx)
-{
-	dsl_dataset_t *ds = arg1;
-	struct recvbeginsyncarg *rbsa = arg2;
-	int err;
-	struct dsl_ds_destroyarg dsda = {0};
-
-	/* must be a head ds */
-	if (ds->ds_phys->ds_next_snap_obj != 0)
-		return (EINVAL);
+	VERIFY(0 == dsl_dataset_own_obj(dd->dd_pool, dsobj,
+	    DS_MODE_INCONSISTENT, dmu_recv_tag, &rbsa->ds));
 
-	/* must not be a clone ds */
-	if (dsl_dir_is_clone(ds->ds_dir))
-		return (EINVAL);
-
-	dsda.ds = ds;
-	err = dsl_dataset_destroy_check(&dsda, rbsa->tag, tx);
-	if (err)
-		return (err);
-
-	if (rbsa->origin) {
-		/* make sure it's a snap in the same pool */
-		if (rbsa->origin->ds_dir->dd_pool != ds->ds_dir->dd_pool)
-			return (EXDEV);
-		if (rbsa->origin->ds_phys->ds_num_children == 0)
-			return (EINVAL);
-		if (rbsa->origin->ds_phys->ds_guid != rbsa->fromguid)
-			return (ENODEV);
+	if (rbsa->origin == NULL) {
+		(void) dmu_objset_create_impl(dd->dd_pool->dp_spa,
+		    rbsa->ds, &rbsa->ds->ds_phys->ds_bp, rbsa->type, tx);
 	}
 
-	return (0);
-}
-
-static void
-recv_full_existing_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
-{
-	dsl_dataset_t *ds = arg1;
-	struct recvbeginsyncarg *rbsa = arg2;
-	dsl_dir_t *dd = ds->ds_dir;
-	uint64_t flags = DS_FLAG_INCONSISTENT | rbsa->dsflags;
-	uint64_t dsobj;
-	struct dsl_ds_destroyarg dsda = {0};
-
-	/*
-	 * NB: caller must provide an extra hold on the dsl_dir_t, so it
-	 * won't go away when dsl_dataset_destroy_sync() closes the
-	 * dataset.
-	 */
-	dsda.ds = ds;
-	dsl_dataset_destroy_sync(&dsda, rbsa->tag, cr, tx);
-	ASSERT3P(dsda.rm_origin, ==, NULL);
-
-	dsobj = dsl_dataset_create_sync_dd(dd, rbsa->origin, flags, tx);
-
-	rbsa->ds = recv_full_sync_impl(dd->dd_pool, dsobj,
-	    rbsa->origin ? DMU_OST_NONE : rbsa->type, cr, tx);
+	spa_history_internal_log(LOG_DS_REPLAY_FULL_SYNC,
+	    dd->dd_pool->dp_spa, tx, cr, "dataset = %lld", dsobj);
 }
 
 /* ARGSUSED */
 static int
-recv_incremental_check(void *arg1, void *arg2, dmu_tx_t *tx)
+recv_existing_check(void *arg1, void *arg2, dmu_tx_t *tx)
 {
 	dsl_dataset_t *ds = arg1;
 	struct recvbeginsyncarg *rbsa = arg2;
@@ -459,13 +386,17 @@
 	if (!rbsa->force && dsl_dataset_modified_since_lastsnap(ds))
 		return (ETXTBSY);
 
-	/* must already be a snapshot of this fs */
-	if (ds->ds_phys->ds_prev_snap_obj == 0)
-		return (ENODEV);
-
-	/* most recent snapshot must match fromguid */
-	if (ds->ds_prev->ds_phys->ds_guid != rbsa->fromguid)
-		return (ENODEV);
+	if (rbsa->fromguid) {
+		/* if incremental, most recent snapshot must match fromguid */
+		if (ds->ds_prev == NULL)
+			return (ENODEV);
+		if (ds->ds_prev->ds_phys->ds_guid != rbsa->fromguid)
+			return (ENODEV);
+	} else {
+		/* if full, most recent snapshot must be $ORIGIN */
+		if (ds->ds_phys->ds_prev_snap_txg >= TXG_INITIAL)
+			return (ENODEV);
+	}
 
 	/* temporary clone name must not exist */
 	err = zap_lookup(ds->ds_dir->dd_pool->dp_meta_objset,
@@ -488,26 +419,30 @@
 
 /* ARGSUSED */
 static void
-recv_incremental_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
+recv_existing_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
 {
 	dsl_dataset_t *ohds = arg1;
 	struct recvbeginsyncarg *rbsa = arg2;
 	dsl_pool_t *dp = ohds->ds_dir->dd_pool;
-	dsl_dataset_t *ods, *cds;
+	dsl_dataset_t *cds;
 	uint64_t flags = DS_FLAG_INCONSISTENT | rbsa->dsflags;
 	uint64_t dsobj;
 
-	/* create the temporary clone */
-	VERIFY(0 == dsl_dataset_hold_obj(dp, ohds->ds_phys->ds_prev_snap_obj,
-	    FTAG, &ods));
-	dsobj = dsl_dataset_create_sync(ohds->ds_dir,
-	    rbsa->clonelastname, ods, flags, cr, tx);
-	dsl_dataset_rele(ods, FTAG);
-
-	/* open the temporary clone */
+	/* create and open the temporary clone */
+	dsobj = dsl_dataset_create_sync(ohds->ds_dir, rbsa->clonelastname,
+	    ohds->ds_prev, flags, cr, tx);
 	VERIFY(0 == dsl_dataset_own_obj(dp, dsobj,
 	    DS_MODE_INCONSISTENT, dmu_recv_tag, &cds));
 
+	/*
+	 * If we actually created a non-clone, we need to create the
+	 * objset in our new dataset.
+	 */
+	if (BP_IS_HOLE(dsl_dataset_get_blkptr(cds))) {
+		(void) dmu_objset_create_impl(dp->dp_spa,
+		    cds, dsl_dataset_get_blkptr(cds), rbsa->type, tx);
+	}
+
 	/* copy the refquota from the target fs to the clone */
 	if (ohds->ds_quota > 0)
 		dsl_dataset_set_quota_sync(cds, &ohds->ds_quota, cr, tx);
@@ -528,7 +463,7 @@
 {
 	int err = 0;
 	boolean_t byteswap;
-	struct recvbeginsyncarg rbsa;
+	struct recvbeginsyncarg rbsa = { 0 };
 	uint64_t version;
 	int flags;
 	dsl_dataset_t *ds;
@@ -573,17 +508,17 @@
 	/*
 	 * Process the begin in syncing context.
 	 */
-	if (rbsa.fromguid && !(flags & DRR_FLAG_CLONE)) {
-		/* incremental receive */
+
+	/* open the dataset we are logically receiving into */
+	err = dsl_dataset_hold(tofs, dmu_recv_tag, &ds);
+	if (err == 0) {
+		/* target fs already exists; recv into temp clone */
 
-		/* tmp clone name is: tofs/%tosnap" */
-		(void) snprintf(rbsa.clonelastname, sizeof (rbsa.clonelastname),
-		    "%%%s", tosnap);
-
-		/* open the dataset we are logically receiving into */
-		err = dsl_dataset_hold(tofs, dmu_recv_tag, &ds);
-		if (err)
-			return (err);
+		/* Can't recv a clone into an existing fs */
+		if (flags & DRR_FLAG_CLONE) {
+			dsl_dataset_rele(ds, dmu_recv_tag);
+			return (EINVAL);
+		}
 
 		/* must not have an incremental recv already in progress */
 		if (!mutex_tryenter(&ds->ds_recvlock)) {
@@ -591,10 +526,12 @@
 			return (EBUSY);
 		}
 
+		/* tmp clone name is: tofs/%tosnap" */
+		(void) snprintf(rbsa.clonelastname, sizeof (rbsa.clonelastname),
+		    "%%%s", tosnap);
 		rbsa.force = force;
 		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
-		    recv_incremental_check,
-		    recv_incremental_sync, ds, &rbsa, 5);
+		    recv_existing_check, recv_existing_sync, ds, &rbsa, 5);
 		if (err) {
 			mutex_exit(&ds->ds_recvlock);
 			dsl_dataset_rele(ds, dmu_recv_tag);
@@ -602,47 +539,36 @@
 		}
 		drc->drc_logical_ds = ds;
 		drc->drc_real_ds = rbsa.ds;
-	} else {
-		/* create new fs -- full backup or clone */
-		dsl_dir_t *dd = NULL;
-		const char *tail;
+	} else if (err == ENOENT) {
+		/* target fs does not exist; must be a full backup or clone */
+		dsl_dataset_t *parent;
+		char *cp;
 
-		err = dsl_dir_open(tofs, FTAG, &dd, &tail);
+		/*
+		 * If it's a non-clone incremental, we are missing the
+		 * target fs, so fail the recv.
+		 */
+		if (rbsa.fromguid && !(flags & DRR_FLAG_CLONE))
+			return (ENOENT);
+
+		/* Open the parent of tofs */
+		cp = strrchr(tofs, '/');
+		*cp = '\0';
+		err = dsl_dataset_hold(tofs, FTAG, &parent);
+		*cp = '/';
 		if (err)
 			return (err);
-		if (tail == NULL) {
-			if (!force) {
-				dsl_dir_close(dd, FTAG);
-				return (EEXIST);
-			}
 
-			rw_enter(&dd->dd_pool->dp_config_rwlock, RW_READER);
-			err = dsl_dataset_own_obj(dd->dd_pool,
-			    dd->dd_phys->dd_head_dataset_obj,
-			    DS_MODE_INCONSISTENT, FTAG, &ds);
-			rw_exit(&dd->dd_pool->dp_config_rwlock);
-			if (err) {
-				dsl_dir_close(dd, FTAG);
-				return (err);
-			}
-
-			dsl_dataset_make_exclusive(ds, FTAG);
-			err = dsl_sync_task_do(dd->dd_pool,
-			    recv_full_existing_check,
-			    recv_full_existing_sync, ds, &rbsa, 5);
-			dsl_dataset_disown(ds, FTAG);
-		} else {
-			err = dsl_sync_task_do(dd->dd_pool, recv_full_check,
-			    recv_full_sync, dd, &rbsa, 5);
-		}
-		dsl_dir_close(dd, FTAG);
+		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
+		    recv_new_check, recv_new_sync, ds->ds_dir, &rbsa, 5);
+		dsl_dataset_rele(parent, FTAG);
 		if (err)
 			return (err);
 		drc->drc_logical_ds = drc->drc_real_ds = rbsa.ds;
 		drc->drc_newfs = B_TRUE;
 	}
 
-	return (0);
+	return (err);
 }
 
 struct restorearg {
@@ -1079,37 +1005,31 @@
 	ds->ds_phys->ds_flags &= ~DS_FLAG_INCONSISTENT;
 }
 
-int
-dmu_recv_end(dmu_recv_cookie_t *drc)
+static int
+dmu_recv_existing_end(dmu_recv_cookie_t *drc)
 {
 	struct recvendsyncarg resa;
 	dsl_dataset_t *ds = drc->drc_logical_ds;
 	int err;
 
 	/*
-	 * XXX hack; seems the ds is still dirty and
-	 * dsl_pool_zil_clean() expects it to have a ds_user_ptr
-	 * (and zil), but clone_swap() can close it.
+	 * XXX hack; seems the ds is still dirty and dsl_pool_zil_clean()
+	 * expects it to have a ds_user_ptr (and zil), but clone_swap()
+	 * can close it.
 	 */
 	txg_wait_synced(ds->ds_dir->dd_pool, 0);
 
-	if (ds != drc->drc_real_ds) {
-		/* we are doing an online recv */
-		if (dsl_dataset_tryown(ds, FALSE, dmu_recv_tag)) {
-			err = dsl_dataset_clone_swap(drc->drc_real_ds, ds,
-			    drc->drc_force);
-			if (err)
-				dsl_dataset_disown(ds, dmu_recv_tag);
-		} else {
-			err = EBUSY;
-			dsl_dataset_rele(ds, dmu_recv_tag);
-		}
-		/* dsl_dataset_destroy() will disown the ds */
+	if (dsl_dataset_tryown(ds, FALSE, dmu_recv_tag)) {
+		err = dsl_dataset_clone_swap(drc->drc_real_ds, ds,
+		    drc->drc_force);
+		if (err)
+			goto out;
+	} else {
+		mutex_exit(&ds->ds_recvlock);
+		dsl_dataset_rele(ds, dmu_recv_tag);
 		(void) dsl_dataset_destroy(drc->drc_real_ds, dmu_recv_tag,
 		    B_FALSE);
-		mutex_exit(&drc->drc_logical_ds->ds_recvlock);
-		if (err)
-			return (err);
+		return (EBUSY);
 	}
 
 	resa.creation_time = drc->drc_drrb->drr_creation_time;
@@ -1119,17 +1039,52 @@
 	err = dsl_sync_task_do(ds->ds_dir->dd_pool,
 	    recv_end_check, recv_end_sync, ds, &resa, 3);
 	if (err) {
-		if (drc->drc_newfs) {
-			ASSERT(ds == drc->drc_real_ds);
-			(void) dsl_dataset_destroy(ds, dmu_recv_tag,
-			    B_FALSE);
-			return (err);
-		} else {
-			(void) dsl_dataset_rollback(ds, DMU_OST_NONE);
-		}
+		/* swap back */
+		(void) dsl_dataset_clone_swap(drc->drc_real_ds, ds, B_TRUE);
 	}
 
-	/* release the hold from dmu_recv_begin */
+out:
+	mutex_exit(&ds->ds_recvlock);
 	dsl_dataset_disown(ds, dmu_recv_tag);
+	(void) dsl_dataset_destroy(drc->drc_real_ds, dmu_recv_tag, B_FALSE);
 	return (err);
 }
+
+static int
+dmu_recv_new_end(dmu_recv_cookie_t *drc)
+{
+	struct recvendsyncarg resa;
+	dsl_dataset_t *ds = drc->drc_logical_ds;
+	int err;
+
+	/*
+	 * XXX hack; seems the ds is still dirty and dsl_pool_zil_clean()
+	 * expects it to have a ds_user_ptr (and zil), but clone_swap()
+	 * can close it.
+	 */
+	txg_wait_synced(ds->ds_dir->dd_pool, 0);
+
+	resa.creation_time = drc->drc_drrb->drr_creation_time;
+	resa.toguid = drc->drc_drrb->drr_toguid;
+	resa.tosnap = drc->drc_tosnap;
+
+	err = dsl_sync_task_do(ds->ds_dir->dd_pool,
+	    recv_end_check, recv_end_sync, ds, &resa, 3);
+	if (err) {
+		/* clean up the fs we just recv'd into */
+		(void) dsl_dataset_destroy(ds, dmu_recv_tag, B_FALSE);
+	} else {
+		/* release the hold from dmu_recv_begin */
+		dsl_dataset_disown(ds, dmu_recv_tag);
+	}
+	return (err);
+}
+
+int
+dmu_recv_end(dmu_recv_cookie_t *drc)
+{
+	if (drc->drc_logical_ds != drc->drc_real_ds)
+		return (dmu_recv_existing_end(drc));
+	else
+		return (dmu_recv_new_end(drc));
+}