usr/src/uts/common/fs/zfs/dsl_dataset.c
changeset 10242 c40d075fbca6
parent 10204 83c3a84aecef
child 10268 cb380b2e9410
--- a/usr/src/uts/common/fs/zfs/dsl_dataset.c	Sat Aug 01 11:59:47 2009 -0700
+++ b/usr/src/uts/common/fs/zfs/dsl_dataset.c	Sat Aug 01 15:09:50 2009 -0600
@@ -39,6 +39,7 @@
 #include <sys/spa.h>
 #include <sys/zfs_znode.h>
 #include <sys/sunddi.h>
+#include <sys/zvol.h>
 
 static char *dsl_reaper = "the grim reaper";
 
@@ -409,8 +410,15 @@
 					dsl_dataset_rele(origin, FTAG);
 				}
 			}
-		} else if (zfs_flags & ZFS_DEBUG_SNAPNAMES) {
-			err = dsl_dataset_get_snapname(ds);
+		} else {
+			if (zfs_flags & ZFS_DEBUG_SNAPNAMES)
+				err = dsl_dataset_get_snapname(ds);
+			if (err == 0 && ds->ds_phys->ds_userrefs_obj != 0) {
+				err = zap_count(
+				    ds->ds_dir->dd_pool->dp_meta_objset,
+				    ds->ds_phys->ds_userrefs_obj,
+				    &ds->ds_userrefs);
+			}
 		}
 
 		if (err == 0 && !dsl_dataset_is_snapshot(ds)) {
@@ -849,6 +857,7 @@
 	dsl_sync_task_group_t *dstg;
 	char *snapname;
 	char *failed;
+	boolean_t defer;
 };
 
 static int
@@ -856,23 +865,30 @@
 {
 	struct destroyarg *da = arg;
 	dsl_dataset_t *ds;
-	char *cp;
 	int err;
-
-	(void) strcat(name, "@");
-	(void) strcat(name, da->snapname);
-	err = dsl_dataset_own(name, DS_MODE_READONLY | DS_MODE_INCONSISTENT,
+	char *dsname;
+	size_t buflen;
+
+	/* alloc a buffer to hold name@snapname, plus the terminating NULL */
+	buflen = strlen(name) + strlen(da->snapname) + 2;
+	dsname = kmem_alloc(buflen, KM_SLEEP);
+	(void) snprintf(dsname, buflen, "%s@%s", name, da->snapname);
+	err = dsl_dataset_own(dsname, DS_MODE_READONLY | DS_MODE_INCONSISTENT,
 	    da->dstg, &ds);
-	cp = strchr(name, '@');
-	*cp = '\0';
+	kmem_free(dsname, buflen);
 	if (err == 0) {
+		struct dsl_ds_destroyarg *dsda;
+
 		dsl_dataset_make_exclusive(ds, da->dstg);
 		if (ds->ds_user_ptr) {
 			ds->ds_user_evict_func(ds, ds->ds_user_ptr);
 			ds->ds_user_ptr = NULL;
 		}
+		dsda = kmem_zalloc(sizeof (struct dsl_ds_destroyarg), KM_SLEEP);
+		dsda->ds = ds;
+		dsda->defer = da->defer;
 		dsl_sync_task_create(da->dstg, dsl_dataset_destroy_check,
-		    dsl_dataset_destroy_sync, ds, da->dstg, 0);
+		    dsl_dataset_destroy_sync, dsda, da->dstg, 0);
 	} else if (err == ENOENT) {
 		err = 0;
 	} else {
@@ -886,7 +902,7 @@
  */
 #pragma weak dmu_snapshots_destroy = dsl_snapshots_destroy
 int
-dsl_snapshots_destroy(char *fsname, char *snapname)
+dsl_snapshots_destroy(char *fsname, char *snapname, boolean_t defer)
 {
 	int err;
 	struct destroyarg da;
@@ -899,6 +915,7 @@
 	da.dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
 	da.snapname = snapname;
 	da.failed = fsname;
+	da.defer = defer;
 
 	err = dmu_objset_find(fsname,
 	    dsl_snapshot_destroy_one, &da, DS_FIND_CHILDREN);
@@ -908,7 +925,9 @@
 
 	for (dst = list_head(&da.dstg->dstg_tasks); dst;
 	    dst = list_next(&da.dstg->dstg_tasks, dst)) {
-		dsl_dataset_t *ds = dst->dst_arg1;
+		struct dsl_ds_destroyarg *dsda = dst->dst_arg1;
+		dsl_dataset_t *ds = dsda->ds;
+
 		/*
 		 * Return the file system name that triggered the error
 		 */
@@ -916,7 +935,9 @@
 			dsl_dataset_name(ds, fsname);
 			*strchr(fsname, '@') = '\0';
 		}
+		ASSERT3P(dsda->rm_origin, ==, NULL);
 		dsl_dataset_disown(ds, da.dstg);
+		kmem_free(dsda, sizeof (struct dsl_ds_destroyarg));
 	}
 
 	dsl_sync_task_group_destroy(da.dstg);
@@ -924,18 +945,100 @@
 	return (err);
 }
 
+static boolean_t
+dsl_dataset_might_destroy_origin(dsl_dataset_t *ds)
+{
+	boolean_t might_destroy = B_FALSE;
+
+	mutex_enter(&ds->ds_lock);
+	if (ds->ds_phys->ds_num_children == 2 && ds->ds_userrefs == 0 &&
+	    DS_IS_DEFER_DESTROY(ds))
+		might_destroy = B_TRUE;
+	mutex_exit(&ds->ds_lock);
+
+	return (might_destroy);
+}
+
+#ifdef _KERNEL
+static int
+dsl_dataset_zvol_cleanup(dsl_dataset_t *ds, const char *name)
+{
+	int error;
+	objset_t *os;
+
+	error = dmu_objset_open_ds(ds, DMU_OST_ANY, &os);
+	if (error)
+		return (error);
+
+	if (dmu_objset_type(os) == DMU_OST_ZVOL)
+		error = zvol_remove_minor(name);
+	dmu_objset_close(os);
+
+	return (error);
+}
+#endif
+
+/*
+ * If we're removing a clone, and these three conditions are true:
+ *	1) the clone's origin has no other children
+ *	2) the clone's origin has no user references
+ *	3) the clone's origin has been marked for deferred destruction
+ * Then, prepare to remove the origin as part of this sync task group.
+ */
+static int
+dsl_dataset_origin_rm_prep(struct dsl_ds_destroyarg *dsda, void *tag)
+{
+	dsl_dataset_t *ds = dsda->ds;
+	dsl_dataset_t *origin = ds->ds_prev;
+
+	if (dsl_dataset_might_destroy_origin(origin)) {
+		char *name;
+		int namelen;
+		int error;
+
+		namelen = dsl_dataset_namelen(origin) + 1;
+		name = kmem_alloc(namelen, KM_SLEEP);
+		dsl_dataset_name(origin, name);
+#ifdef _KERNEL
+		error = zfs_unmount_snap(name, NULL);
+		if (error) {
+			kmem_free(name, namelen);
+			return (error);
+		}
+		error = dsl_dataset_zvol_cleanup(origin, name);
+		if (error) {
+			kmem_free(name, namelen);
+			return (error);
+		}
+#endif
+		error = dsl_dataset_own(name,
+		    DS_MODE_READONLY | DS_MODE_INCONSISTENT,
+		    tag, &origin);
+		kmem_free(name, namelen);
+		if (error)
+			return (error);
+		dsda->rm_origin = origin;
+		dsl_dataset_make_exclusive(origin, tag);
+	}
+
+	return (0);
+}
+
 /*
  * ds must be opened as OWNER.  On return (whether successful or not),
  * ds will be closed and caller can no longer dereference it.
  */
 int
-dsl_dataset_destroy(dsl_dataset_t *ds, void *tag)
+dsl_dataset_destroy(dsl_dataset_t *ds, void *tag, boolean_t defer)
 {
 	int err;
 	dsl_sync_task_group_t *dstg;
 	objset_t *os;
 	dsl_dir_t *dd;
 	uint64_t obj;
+	struct dsl_ds_destroyarg dsda = {0};
+
+	dsda.ds = ds;
 
 	if (dsl_dataset_is_snapshot(ds)) {
 		/* Destroying a snapshot is simpler */
@@ -945,9 +1048,12 @@
 			ds->ds_user_evict_func(ds, ds->ds_user_ptr);
 			ds->ds_user_ptr = NULL;
 		}
+		/* NOTE: defer is always B_FALSE for non-snapshots */
+		dsda.defer = defer;
 		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
 		    dsl_dataset_destroy_check, dsl_dataset_destroy_sync,
-		    ds, tag, 0);
+		    &dsda, tag, 0);
+		ASSERT3P(dsda.rm_origin, ==, NULL);
 		goto out;
 	}
 
@@ -1028,13 +1134,45 @@
 		ds->ds_user_evict_func(ds, ds->ds_user_ptr);
 		ds->ds_user_ptr = NULL;
 	}
-	dstg = dsl_sync_task_group_create(ds->ds_dir->dd_pool);
-	dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
-	    dsl_dataset_destroy_sync, ds, tag, 0);
-	dsl_sync_task_create(dstg, dsl_dir_destroy_check,
-	    dsl_dir_destroy_sync, dd, FTAG, 0);
-	err = dsl_sync_task_group_wait(dstg);
-	dsl_sync_task_group_destroy(dstg);
+
+	/*
+	 * If we're removing a clone, we might also need to remove its
+	 * origin.
+	 */
+	do {
+		dsda.need_prep = B_FALSE;
+		if (dsl_dir_is_clone(dd)) {
+			err = dsl_dataset_origin_rm_prep(&dsda, tag);
+			if (err) {
+				dsl_dir_close(dd, FTAG);
+				goto out;
+			}
+		}
+
+		dstg = dsl_sync_task_group_create(ds->ds_dir->dd_pool);
+		dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
+		    dsl_dataset_destroy_sync, &dsda, tag, 0);
+		dsl_sync_task_create(dstg, dsl_dir_destroy_check,
+		    dsl_dir_destroy_sync, dd, FTAG, 0);
+		err = dsl_sync_task_group_wait(dstg);
+		dsl_sync_task_group_destroy(dstg);
+
+		/*
+		 * We could be racing against 'zfs release' or 'zfs destroy -d'
+		 * on the origin snap, in which case we can get EBUSY if we
+		 * needed to destroy the origin snap but were not ready to
+		 * do so.
+		 */
+		if (dsda.need_prep) {
+			ASSERT(err == EBUSY);
+			ASSERT(dsl_dir_is_clone(dd));
+			ASSERT(dsda.rm_origin == NULL);
+		}
+	} while (dsda.need_prep);
+
+	if (dsda.rm_origin != NULL)
+		dsl_dataset_disown(dsda.rm_origin, tag);
+
 	/* if it is successful, dsl_dir_destroy_sync will close the dd */
 	if (err)
 		dsl_dir_close(dd, FTAG);
@@ -1391,18 +1529,63 @@
 	    cr, "dataset = %llu", ds->ds_object);
 }
 
+static int
+dsl_dataset_origin_check(struct dsl_ds_destroyarg *dsda, void *tag,
+    dmu_tx_t *tx)
+{
+	dsl_dataset_t *ds = dsda->ds;
+	dsl_dataset_t *ds_prev = ds->ds_prev;
+
+	if (dsl_dataset_might_destroy_origin(ds_prev)) {
+		struct dsl_ds_destroyarg ndsda = {0};
+
+		/*
+		 * If we're not prepared to remove the origin, don't remove
+		 * the clone either.
+		 */
+		if (dsda->rm_origin == NULL) {
+			dsda->need_prep = B_TRUE;
+			return (EBUSY);
+		}
+
+		ndsda.ds = ds_prev;
+		ndsda.is_origin_rm = B_TRUE;
+		return (dsl_dataset_destroy_check(&ndsda, tag, tx));
+	}
+
+	/*
+	 * If we're not going to remove the origin after all,
+	 * undo the open context setup.
+	 */
+	if (dsda->rm_origin != NULL) {
+		dsl_dataset_disown(dsda->rm_origin, tag);
+		dsda->rm_origin = NULL;
+	}
+
+	return (0);
+}
+
 /* ARGSUSED */
 int
 dsl_dataset_destroy_check(void *arg1, void *arg2, dmu_tx_t *tx)
 {
-	dsl_dataset_t *ds = arg1;
+	struct dsl_ds_destroyarg *dsda = arg1;
+	dsl_dataset_t *ds = dsda->ds;
 
 	/* we have an owner hold, so noone else can destroy us */
 	ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
 
-	/* Can't delete a branch point. */
-	if (ds->ds_phys->ds_num_children > 1)
-		return (EEXIST);
+	/*
+	 * Only allow deferred destroy on pools that support it.
+	 * NOTE: deferred destroy is only supported on snapshots.
+	 */
+	if (dsda->defer) {
+		if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
+		    SPA_VERSION_USERREFS)
+			return (ENOTSUP);
+		ASSERT(dsl_dataset_is_snapshot(ds));
+		return (0);
+	}
 
 	/*
 	 * Can't delete a head dataset if there are snapshots of it.
@@ -1420,6 +1603,31 @@
 	if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg)
 		return (EAGAIN);
 
+	if (dsl_dataset_is_snapshot(ds)) {
+		/*
+		 * If this snapshot has an elevated user reference count,
+		 * we can't destroy it yet.
+		 */
+		if (ds->ds_userrefs > 0 && !dsda->releasing)
+			return (EBUSY);
+
+		mutex_enter(&ds->ds_lock);
+		/*
+		 * Can't delete a branch point. However, if we're destroying
+		 * a clone and removing its origin due to it having a user
+		 * hold count of 0 and having been marked for deferred destroy,
+		 * it's OK for the origin to have a single clone.
+		 */
+		if (ds->ds_phys->ds_num_children >
+		    (dsda->is_origin_rm ? 2 : 1)) {
+			mutex_exit(&ds->ds_lock);
+			return (EEXIST);
+		}
+		mutex_exit(&ds->ds_lock);
+	} else if (dsl_dir_is_clone(ds->ds_dir)) {
+		return (dsl_dataset_origin_check(dsda, arg2, tx));
+	}
+
 	/* XXX we should do some i/o error checking... */
 	return (0);
 }
@@ -1467,7 +1675,8 @@
 void
 dsl_dataset_destroy_sync(void *arg1, void *tag, cred_t *cr, dmu_tx_t *tx)
 {
-	dsl_dataset_t *ds = arg1;
+	struct dsl_ds_destroyarg *dsda = arg1;
+	dsl_dataset_t *ds = dsda->ds;
 	zio_t *zio;
 	int err;
 	int after_branch_point = FALSE;
@@ -1477,11 +1686,20 @@
 	uint64_t obj;
 
 	ASSERT(ds->ds_owner);
-	ASSERT3U(ds->ds_phys->ds_num_children, <=, 1);
+	ASSERT(dsda->defer || ds->ds_phys->ds_num_children <= 1);
 	ASSERT(ds->ds_prev == NULL ||
 	    ds->ds_prev->ds_phys->ds_next_snap_obj != ds->ds_object);
 	ASSERT3U(ds->ds_phys->ds_bp.blk_birth, <=, tx->tx_txg);
 
+	if (dsda->defer) {
+		ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);
+		if (ds->ds_userrefs > 0 || ds->ds_phys->ds_num_children > 1) {
+			dmu_buf_will_dirty(ds->ds_dbuf, tx);
+			ds->ds_phys->ds_flags |= DS_FLAG_DEFER_DESTROY;
+			return;
+		}
+	}
+
 	/* signal any waiters that this dataset is going away */
 	mutex_enter(&ds->ds_lock);
 	ds->ds_owner = dsl_reaper;
@@ -1527,6 +1745,20 @@
 			/* This clone is toast. */
 			ASSERT(ds_prev->ds_phys->ds_num_children > 1);
 			ds_prev->ds_phys->ds_num_children--;
+
+			/*
+			 * If the clone's origin has no other clones, no
+			 * user holds, and has been marked for deferred
+			 * deletion, then we should have done the necessary
+			 * destroy setup for it.
+			 */
+			if (ds_prev->ds_phys->ds_num_children == 1 &&
+			    ds_prev->ds_userrefs == 0 &&
+			    DS_IS_DEFER_DESTROY(ds_prev)) {
+				ASSERT3P(dsda->rm_origin, !=, NULL);
+			} else {
+				ASSERT3P(dsda->rm_origin, ==, NULL);
+			}
 		} else if (!after_branch_point) {
 			ds_prev->ds_phys->ds_next_snap_obj =
 			    ds->ds_phys->ds_next_snap_obj;
@@ -1739,10 +1971,32 @@
 	}
 	if (ds->ds_phys->ds_props_obj != 0)
 		VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_props_obj, tx));
+	if (ds->ds_phys->ds_userrefs_obj != 0)
+		VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_userrefs_obj, tx));
 	dsl_dir_close(ds->ds_dir, ds);
 	ds->ds_dir = NULL;
 	dsl_dataset_drain_refs(ds, tag);
 	VERIFY(0 == dmu_object_free(mos, obj, tx));
+
+	if (dsda->rm_origin) {
+		/*
+		 * Remove the origin of the clone we just destroyed.
+		 */
+		dsl_dataset_t *origin = ds->ds_prev;
+		struct dsl_ds_destroyarg ndsda = {0};
+
+		ASSERT3P(origin, ==, dsda->rm_origin);
+		if (origin->ds_user_ptr) {
+			origin->ds_user_evict_func(origin, origin->ds_user_ptr);
+			origin->ds_user_ptr = NULL;
+		}
+
+		dsl_dataset_rele(origin, tag);
+		ds->ds_prev = NULL;
+
+		ndsda.ds = origin;
+		dsl_dataset_destroy_sync(&ndsda, tag, cr, tx);
+	}
 }
 
 static int
@@ -1957,6 +2211,9 @@
 	    ds->ds_reserved);
 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_GUID,
 	    ds->ds_phys->ds_guid);
+	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USERREFS, ds->ds_userrefs);
+	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_DEFER_DESTROY,
+	    DS_IS_DEFER_DESTROY(ds) ? 1 : 0);
 
 	if (ds->ds_phys->ds_next_snap_obj) {
 		/*
@@ -3025,7 +3282,7 @@
 
 	ds->ds_quota = new_quota;
 
-	dsl_prop_set_uint64_sync(ds->ds_dir, "refquota", new_quota, cr, tx);
+	dsl_dir_prop_set_uint64_sync(ds->ds_dir, "refquota", new_quota, cr, tx);
 
 	spa_history_internal_log(LOG_DS_REFQUOTA, ds->ds_dir->dd_pool->dp_spa,
 	    tx, cr, "%lld dataset = %llu ",
@@ -3120,7 +3377,7 @@
 
 	dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV, delta, 0, 0, tx);
 	mutex_exit(&ds->ds_dir->dd_lock);
-	dsl_prop_set_uint64_sync(ds->ds_dir, "refreservation",
+	dsl_dir_prop_set_uint64_sync(ds->ds_dir, "refreservation",
 	    new_reservation, cr, tx);
 
 	spa_history_internal_log(LOG_DS_REFRESERV,
@@ -3144,3 +3401,421 @@
 	dsl_dataset_rele(ds, FTAG);
 	return (err);
 }
+
+static int
+dsl_dataset_user_hold_check(void *arg1, void *arg2, dmu_tx_t *tx)
+{
+	dsl_dataset_t *ds = arg1;
+	char *htag = arg2;
+	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
+	int error = 0;
+
+	if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
+		return (ENOTSUP);
+
+	if (!dsl_dataset_is_snapshot(ds))
+		return (EINVAL);
+
+	if (strlen(htag) >= ZAP_MAXNAMELEN)
+		return (ENAMETOOLONG);
+
+	/* tags must be unique */
+	mutex_enter(&ds->ds_lock);
+	if (ds->ds_phys->ds_userrefs_obj) {
+		error = zap_lookup(mos, ds->ds_phys->ds_userrefs_obj, htag,
+		    8, 1, tx);
+		if (error == 0)
+			error = EEXIST;
+		else if (error == ENOENT)
+			error = 0;
+	}
+	mutex_exit(&ds->ds_lock);
+
+	return (error);
+}
+
+static void
+dsl_dataset_user_hold_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
+{
+	dsl_dataset_t *ds = arg1;
+	char *htag = arg2;
+	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
+	time_t now = gethrestime_sec();
+	uint64_t zapobj;
+
+	mutex_enter(&ds->ds_lock);
+	if (ds->ds_phys->ds_userrefs_obj == 0) {
+		/*
+		 * This is the first user hold for this dataset.  Create
+		 * the userrefs zap object.
+		 */
+		dmu_buf_will_dirty(ds->ds_dbuf, tx);
+		zapobj = ds->ds_phys->ds_userrefs_obj =
+		    zap_create(mos, DMU_OT_USERREFS, DMU_OT_NONE, 0, tx);
+	} else {
+		zapobj = ds->ds_phys->ds_userrefs_obj;
+	}
+	ds->ds_userrefs++;
+	mutex_exit(&ds->ds_lock);
+
+	VERIFY(0 == zap_add(mos, zapobj, htag, 8, 1, &now, tx));
+
+	spa_history_internal_log(LOG_DS_USER_HOLD,
+	    ds->ds_dir->dd_pool->dp_spa, tx, cr, "<%s> dataset = %llu",
+	    htag, ds->ds_object);
+}
+
+struct dsl_ds_holdarg {
+	dsl_sync_task_group_t *dstg;
+	char *htag;
+	char *snapname;
+	boolean_t recursive;
+	char failed[MAXPATHLEN];
+};
+
+static int
+dsl_dataset_user_hold_one(char *dsname, void *arg)
+{
+	struct dsl_ds_holdarg *ha = arg;
+	dsl_dataset_t *ds;
+	int error;
+	char *name;
+	size_t buflen;
+
+	/* alloc a buffer to hold dsname@snapname plus terminating NULL */
+	buflen = strlen(dsname) + strlen(ha->snapname) + 2;
+	name = kmem_alloc(buflen, KM_SLEEP);
+	(void) snprintf(name, buflen, "%s@%s", dsname, ha->snapname);
+	error = dsl_dataset_hold(name, ha->dstg, &ds);
+	kmem_free(name, buflen);
+	if (error == 0) {
+		dsl_sync_task_create(ha->dstg, dsl_dataset_user_hold_check,
+		    dsl_dataset_user_hold_sync, ds, ha->htag, 0);
+	} else if (error == ENOENT && ha->recursive) {
+		error = 0;
+	} else {
+		(void) strcpy(ha->failed, dsname);
+	}
+	return (error);
+}
+
+int
+dsl_dataset_user_hold(char *dsname, char *snapname, char *htag,
+    boolean_t recursive)
+{
+	struct dsl_ds_holdarg *ha;
+	dsl_sync_task_t *dst;
+	spa_t *spa;
+	int error;
+
+	ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
+
+	(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
+
+	error = spa_open(dsname, &spa, FTAG);
+	if (error) {
+		kmem_free(ha, sizeof (struct dsl_ds_holdarg));
+		return (error);
+	}
+
+	ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
+	ha->htag = htag;
+	ha->snapname = snapname;
+	ha->recursive = recursive;
+	if (recursive) {
+		error = dmu_objset_find(dsname, dsl_dataset_user_hold_one,
+		    ha, DS_FIND_CHILDREN);
+	} else {
+		error = dsl_dataset_user_hold_one(dsname, ha);
+	}
+	if (error == 0)
+		error = dsl_sync_task_group_wait(ha->dstg);
+
+	for (dst = list_head(&ha->dstg->dstg_tasks); dst;
+	    dst = list_next(&ha->dstg->dstg_tasks, dst)) {
+		dsl_dataset_t *ds = dst->dst_arg1;
+
+		if (dst->dst_err) {
+			dsl_dataset_name(ds, ha->failed);
+			*strchr(ha->failed, '@') = '\0';
+		}
+		dsl_dataset_rele(ds, ha->dstg);
+	}
+
+	if (error)
+		(void) strcpy(dsname, ha->failed);
+
+	dsl_sync_task_group_destroy(ha->dstg);
+	kmem_free(ha, sizeof (struct dsl_ds_holdarg));
+	spa_close(spa, FTAG);
+	return (error);
+}
+
+struct dsl_ds_releasearg {
+	dsl_dataset_t *ds;
+	const char *htag;
+	boolean_t own;		/* do we own or just hold ds? */
+};
+
+static int
+dsl_dataset_release_might_destroy(dsl_dataset_t *ds, const char *htag,
+    boolean_t *might_destroy)
+{
+	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
+	uint64_t zapobj;
+	uint64_t tmp;
+	int error;
+
+	*might_destroy = B_FALSE;
+
+	mutex_enter(&ds->ds_lock);
+	zapobj = ds->ds_phys->ds_userrefs_obj;
+	if (zapobj == 0) {
+		/* The tag can't possibly exist */
+		mutex_exit(&ds->ds_lock);
+		return (ESRCH);
+	}
+
+	/* Make sure the tag exists */
+	error = zap_lookup(mos, zapobj, htag, 8, 1, &tmp);
+	if (error) {
+		mutex_exit(&ds->ds_lock);
+		if (error == ENOENT)
+			error = ESRCH;
+		return (error);
+	}
+
+	if (ds->ds_userrefs == 1 && ds->ds_phys->ds_num_children == 1 &&
+	    DS_IS_DEFER_DESTROY(ds))
+		*might_destroy = B_TRUE;
+
+	mutex_exit(&ds->ds_lock);
+	return (0);
+}
+
+static int
+dsl_dataset_user_release_check(void *arg1, void *tag, dmu_tx_t *tx)
+{
+	struct dsl_ds_releasearg *ra = arg1;
+	dsl_dataset_t *ds = ra->ds;
+	boolean_t might_destroy;
+	int error;
+
+	if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
+		return (ENOTSUP);
+
+	error = dsl_dataset_release_might_destroy(ds, ra->htag, &might_destroy);
+	if (error)
+		return (error);
+
+	if (might_destroy) {
+		struct dsl_ds_destroyarg dsda = {0};
+
+		if (dmu_tx_is_syncing(tx)) {
+			/*
+			 * If we're not prepared to remove the snapshot,
+			 * we can't allow the release to happen right now.
+			 */
+			if (!ra->own)
+				return (EBUSY);
+			if (ds->ds_user_ptr) {
+				ds->ds_user_evict_func(ds, ds->ds_user_ptr);
+				ds->ds_user_ptr = NULL;
+			}
+		}
+		dsda.ds = ds;
+		dsda.releasing = B_TRUE;
+		return (dsl_dataset_destroy_check(&dsda, tag, tx));
+	}
+
+	return (0);
+}
+
+static void
+dsl_dataset_user_release_sync(void *arg1, void *tag, cred_t *cr, dmu_tx_t *tx)
+{
+	struct dsl_ds_releasearg *ra = arg1;
+	dsl_dataset_t *ds = ra->ds;
+	spa_t *spa = ds->ds_dir->dd_pool->dp_spa;
+	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
+	uint64_t zapobj;
+	uint64_t dsobj = ds->ds_object;
+	uint64_t refs;
+
+	mutex_enter(&ds->ds_lock);
+	ds->ds_userrefs--;
+	refs = ds->ds_userrefs;
+	mutex_exit(&ds->ds_lock);
+	zapobj = ds->ds_phys->ds_userrefs_obj;
+	VERIFY(0 == zap_remove(mos, zapobj, ra->htag, tx));
+	if (ds->ds_userrefs == 0 && ds->ds_phys->ds_num_children == 1 &&
+	    DS_IS_DEFER_DESTROY(ds)) {
+		struct dsl_ds_destroyarg dsda = {0};
+
+		ASSERT(ra->own);
+		dsda.ds = ds;
+		dsda.releasing = B_TRUE;
+		/* We already did the destroy_check */
+		dsl_dataset_destroy_sync(&dsda, tag, cr, tx);
+	}
+
+	spa_history_internal_log(LOG_DS_USER_RELEASE,
+	    spa, tx, cr, "<%s> %lld dataset = %llu",
+	    ra->htag, (longlong_t)refs, dsobj);
+}
+
+static int
+dsl_dataset_user_release_one(char *dsname, void *arg)
+{
+	struct dsl_ds_holdarg *ha = arg;
+	struct dsl_ds_releasearg *ra;
+	dsl_dataset_t *ds;
+	int error;
+	void *dtag = ha->dstg;
+	char *name;
+	size_t buflen;
+	boolean_t own = B_FALSE;
+	boolean_t might_destroy;
+
+	if (strlen(ha->htag) >= ZAP_MAXNAMELEN)
+		return (ENAMETOOLONG);
+
+	/* alloc a buffer to hold dsname@snapname, plus the terminating NULL */
+	buflen = strlen(dsname) + strlen(ha->snapname) + 2;
+	name = kmem_alloc(buflen, KM_SLEEP);
+	(void) snprintf(name, buflen, "%s@%s", dsname, ha->snapname);
+	error = dsl_dataset_hold(name, dtag, &ds);
+	kmem_free(name, buflen);
+	if (error == ENOENT && ha->recursive)
+		return (0);
+	(void) strcpy(ha->failed, dsname);
+	if (error)
+		return (error);
+
+	ASSERT(dsl_dataset_is_snapshot(ds));
+
+	error = dsl_dataset_release_might_destroy(ds, ha->htag, &might_destroy);
+	if (error) {
+		dsl_dataset_rele(ds, dtag);
+		return (error);
+	}
+
+	if (might_destroy) {
+#ifdef _KERNEL
+		error = zfs_unmount_snap(name, NULL);
+		if (error) {
+			dsl_dataset_rele(ds, dtag);
+			return (error);
+		}
+		error = dsl_dataset_zvol_cleanup(ds, name);
+		if (error) {
+			dsl_dataset_rele(ds, dtag);
+			return (error);
+		}
+#endif
+		if (!dsl_dataset_tryown(ds,
+		    DS_MODE_READONLY | DS_MODE_INCONSISTENT, dtag)) {
+			dsl_dataset_rele(ds, dtag);
+			return (EBUSY);
+		} else {
+			own = B_TRUE;
+			dsl_dataset_make_exclusive(ds, dtag);
+		}
+	}
+
+	ra = kmem_alloc(sizeof (struct dsl_ds_releasearg), KM_SLEEP);
+	ra->ds = ds;
+	ra->htag = ha->htag;
+	ra->own = own;
+	dsl_sync_task_create(ha->dstg, dsl_dataset_user_release_check,
+	    dsl_dataset_user_release_sync, ra, dtag, 0);
+
+	return (0);
+}
+
+int
+dsl_dataset_user_release(char *dsname, char *snapname, char *htag,
+    boolean_t recursive)
+{
+	struct dsl_ds_holdarg *ha;
+	dsl_sync_task_t *dst;
+	spa_t *spa;
+	int error;
+
+	ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
+
+	(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
+
+	error = spa_open(dsname, &spa, FTAG);
+	if (error) {
+		kmem_free(ha, sizeof (struct dsl_ds_holdarg));
+		return (error);
+	}
+
+	ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
+	ha->htag = htag;
+	ha->snapname = snapname;
+	ha->recursive = recursive;
+	if (recursive) {
+		error = dmu_objset_find(dsname, dsl_dataset_user_release_one,
+		    ha, DS_FIND_CHILDREN);
+	} else {
+		error = dsl_dataset_user_release_one(dsname, ha);
+	}
+	if (error == 0)
+		error = dsl_sync_task_group_wait(ha->dstg);
+
+	for (dst = list_head(&ha->dstg->dstg_tasks); dst;
+	    dst = list_next(&ha->dstg->dstg_tasks, dst)) {
+		struct dsl_ds_releasearg *ra = dst->dst_arg1;
+		dsl_dataset_t *ds = ra->ds;
+
+		if (dst->dst_err)
+			dsl_dataset_name(ds, ha->failed);
+
+		if (ra->own)
+			dsl_dataset_disown(ds, ha->dstg);
+		else
+			dsl_dataset_rele(ds, ha->dstg);
+
+		kmem_free(ra, sizeof (struct dsl_ds_releasearg));
+	}
+
+	if (error)
+		(void) strcpy(dsname, ha->failed);
+
+	dsl_sync_task_group_destroy(ha->dstg);
+	kmem_free(ha, sizeof (struct dsl_ds_holdarg));
+	spa_close(spa, FTAG);
+	return (error);
+}
+
+int
+dsl_dataset_get_holds(const char *dsname, nvlist_t **nvp)
+{
+	dsl_dataset_t *ds;
+	int err;
+
+	err = dsl_dataset_hold(dsname, FTAG, &ds);
+	if (err)
+		return (err);
+
+	VERIFY(0 == nvlist_alloc(nvp, NV_UNIQUE_NAME, KM_SLEEP));
+	if (ds->ds_phys->ds_userrefs_obj != 0) {
+		zap_attribute_t *za;
+		zap_cursor_t zc;
+
+		za = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);
+		for (zap_cursor_init(&zc, ds->ds_dir->dd_pool->dp_meta_objset,
+		    ds->ds_phys->ds_userrefs_obj);
+		    zap_cursor_retrieve(&zc, za) == 0;
+		    zap_cursor_advance(&zc)) {
+			VERIFY(0 == nvlist_add_uint64(*nvp, za->za_name,
+			    za->za_first_integer));
+		}
+		zap_cursor_fini(&zc);
+		kmem_free(za, sizeof (zap_attribute_t));
+	}
+	dsl_dataset_rele(ds, FTAG);
+	return (0);
+}