PSARC 2009/510 ZFS received properties
authorTom Erickson <Tom.Erickson@Sun.COM>
Mon, 09 Nov 2009 20:45:32 -0800
changeset 11022 63ab26072e41
parent 11021 c5323cd5fa92
child 11023 53231798f4a8
PSARC 2009/510 ZFS received properties 6850025 want to preserve local properties for zfs_recv 6839260 want zfs send with properties 6855486 zfs_receive should keep trying to set properties even after some fail 6850030 snapshots on read-only dataset shouldn't affect zfs_receive
usr/src/cmd/truss/codes.c
usr/src/cmd/zfs/zfs_iter.c
usr/src/cmd/zfs/zfs_iter.h
usr/src/cmd/zfs/zfs_main.c
usr/src/cmd/zpool/zpool_main.c
usr/src/cmd/ztest/ztest.c
usr/src/common/nvpair/nvpair.c
usr/src/grub/capability
usr/src/grub/grub-0.97/stage2/zfs-include/zfs.h
usr/src/lib/libnvpair/mapfile-vers
usr/src/lib/libzfs/common/libzfs.h
usr/src/lib/libzfs/common/libzfs_dataset.c
usr/src/lib/libzfs/common/libzfs_impl.h
usr/src/lib/libzfs/common/libzfs_sendrecv.c
usr/src/lib/libzfs/common/libzfs_util.c
usr/src/lib/libzfs/common/mapfile-vers
usr/src/lib/libzpool/common/sys/zfs_context.h
usr/src/uts/common/fs/zfs/dmu_objset.c
usr/src/uts/common/fs/zfs/dmu_send.c
usr/src/uts/common/fs/zfs/dsl_dataset.c
usr/src/uts/common/fs/zfs/dsl_deleg.c
usr/src/uts/common/fs/zfs/dsl_dir.c
usr/src/uts/common/fs/zfs/dsl_prop.c
usr/src/uts/common/fs/zfs/spa.c
usr/src/uts/common/fs/zfs/spa_misc.c
usr/src/uts/common/fs/zfs/sys/dsl_dataset.h
usr/src/uts/common/fs/zfs/sys/dsl_dir.h
usr/src/uts/common/fs/zfs/sys/dsl_prop.h
usr/src/uts/common/fs/zfs/sys/zap.h
usr/src/uts/common/fs/zfs/sys/zfs_context.h
usr/src/uts/common/fs/zfs/zap.c
usr/src/uts/common/fs/zfs/zap_micro.c
usr/src/uts/common/fs/zfs/zfs_fuid.c
usr/src/uts/common/fs/zfs/zfs_ioctl.c
usr/src/uts/common/fs/zfs/zfs_vfsops.c
usr/src/uts/common/fs/zfs/zvol.c
usr/src/uts/common/sys/fs/zfs.h
usr/src/uts/common/sys/nvpair.h
--- a/usr/src/cmd/truss/codes.c	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/cmd/truss/codes.c	Mon Nov 09 20:45:32 2009 -0800
@@ -1247,6 +1247,8 @@
 		"zfs_cmd_t" },
 	{ (uint_t)ZFS_IOC_GET_HOLDS,		"ZFS_IOC_GET_HOLDS",
 		"zfs_cmd_t" },
+	{ (uint_t)ZFS_IOC_OBJSET_RECVD_PROPS,	"ZFS_IOC_OBJSET_RECVD_PROPS",
+		"zfs_cmd_t" },
 
 	/* kssl ioctls */
 	{ (uint_t)KSSL_ADD_ENTRY,		"KSSL_ADD_ENTRY",
--- a/usr/src/cmd/zfs/zfs_iter.c	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/cmd/zfs/zfs_iter.c	Mon Nov 09 20:45:32 2009 -0800
@@ -107,7 +107,8 @@
 					zfs_prune_proplist(zhp,
 					    cb->cb_props_table);
 
-				if (zfs_expand_proplist(zhp, cb->cb_proplist)
+				if (zfs_expand_proplist(zhp, cb->cb_proplist,
+				    (cb->cb_flags & ZFS_ITER_RECVD_PROPS))
 				    != 0) {
 					free(node);
 					return (-1);
--- a/usr/src/cmd/zfs/zfs_iter.h	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/cmd/zfs/zfs_iter.h	Mon Nov 09 20:45:32 2009 -0800
@@ -42,6 +42,7 @@
 #define	ZFS_ITER_ARGS_CAN_BE_PATHS (1 << 1)
 #define	ZFS_ITER_PROP_LISTSNAPS    (1 << 2)
 #define	ZFS_ITER_DEPTH_LIMIT	   (1 << 3)
+#define	ZFS_ITER_RECVD_PROPS	   (1 << 4)
 
 int zfs_for_each(int, char **, int options, zfs_type_t,
     zfs_sort_column_t *, zprop_list_t **, int, zfs_iter_f, void *);
--- a/usr/src/cmd/zfs/zfs_main.c	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/cmd/zfs/zfs_main.c	Mon Nov 09 20:45:32 2009 -0800
@@ -157,7 +157,7 @@
 	{ "list",	zfs_do_list,		HELP_LIST		},
 	{ NULL },
 	{ "set",	zfs_do_set,		HELP_SET		},
-	{ "get", 	zfs_do_get,		HELP_GET		},
+	{ "get",	zfs_do_get,		HELP_GET		},
 	{ "inherit",	zfs_do_inherit,		HELP_INHERIT		},
 	{ "upgrade",	zfs_do_upgrade,		HELP_UPGRADE		},
 	{ "userspace",	zfs_do_userspace,	HELP_USERSPACE		},
@@ -201,11 +201,11 @@
 		    "\tdestroy [-rRd] <snapshot>\n"));
 	case HELP_GET:
 		return (gettext("\tget [-rHp] [-d max] "
-		    "[-o field[,...]] [-s source[,...]]\n"
+		    "[-o \"all\" | field[,...]] [-s source[,...]]\n"
 		    "\t    <\"all\" | property[,...]> "
 		    "[filesystem|volume|snapshot] ...\n"));
 	case HELP_INHERIT:
-		return (gettext("\tinherit [-r] <property> "
+		return (gettext("\tinherit [-rS] <property> "
 		    "<filesystem|volume|snapshot> ...\n"));
 	case HELP_UPGRADE:
 		return (gettext("\tupgrade [-v]\n"
@@ -232,7 +232,7 @@
 	case HELP_ROLLBACK:
 		return (gettext("\trollback [-rRf] <snapshot>\n"));
 	case HELP_SEND:
-		return (gettext("\tsend [-RD] [-[iI] snapshot] <snapshot>\n"));
+		return (gettext("\tsend [-RDp] [-[iI] snapshot] <snapshot>\n"));
 	case HELP_SET:
 		return (gettext("\tset <property=value> "
 		    "<filesystem|volume|snapshot> ...\n"));
@@ -787,9 +787,9 @@
  * zfs destroy [-rRf] <fs, vol>
  * zfs destroy [-rRd] <snap>
  *
- * 	-r	Recursively destroy all children
- * 	-R	Recursively destroy all dependents, including clones
- * 	-f	Force unmounting of any dependents
+ *	-r	Recursively destroy all children
+ *	-R	Recursively destroy all dependents, including clones
+ *	-f	Force unmounting of any dependents
  *	-d	If we can't destroy now, mark for deferred destruction
  *
  * Destroys the given dataset.  By default, it will unmount any filesystems,
@@ -1068,18 +1068,32 @@
 	return (0);
 }
 
+static boolean_t
+is_recvd_column(zprop_get_cbdata_t *cbp)
+{
+	int i;
+	zfs_get_column_t col;
+
+	for (i = 0; i < ZFS_GET_NCOLS &&
+	    (col = cbp->cb_columns[i]) != GET_COL_NONE; i++)
+		if (col == GET_COL_RECVD)
+			return (B_TRUE);
+	return (B_FALSE);
+}
+
 /*
- * zfs get [-rHp] [-o field[,field]...] [-s source[,source]...]
- * 	< all | property[,property]... > < fs | snap | vol > ...
+ * zfs get [-rHp] [-o all | field[,field]...] [-s source[,source]...]
+ *	< all | property[,property]... > < fs | snap | vol > ...
  *
  *	-r	recurse over any child datasets
  *	-H	scripted mode.  Headers are stripped, and fields are separated
  *		by tabs instead of spaces.
- *	-o	Set of fields to display.  One of "name,property,value,source".
- *		Default is all four.
+ *	-o	Set of fields to display.  One of "name,property,value,
+ *		received,source". Default is "name,property,value,source".
+ *		"all" is an alias for all five.
  *	-s	Set of sources to allow.  One of
- *		"local,default,inherited,temporary,none".  Default is all
- *		five.
+ *		"local,default,inherited,received,temporary,none".  Default is
+ *		all six.
  *	-p	Display values in parsable (literal) format.
  *
  *  Prints properties for the given datasets.  The user can control which
@@ -1093,16 +1107,19 @@
 get_callback(zfs_handle_t *zhp, void *data)
 {
 	char buf[ZFS_MAXPROPLEN];
+	char rbuf[ZFS_MAXPROPLEN];
 	zprop_source_t sourcetype;
 	char source[ZFS_MAXNAMELEN];
 	zprop_get_cbdata_t *cbp = data;
-	nvlist_t *userprop = zfs_get_user_props(zhp);
+	nvlist_t *user_props = zfs_get_user_props(zhp);
 	zprop_list_t *pl = cbp->cb_proplist;
 	nvlist_t *propval;
 	char *strval;
 	char *sourceval;
+	boolean_t received = is_recvd_column(cbp);
 
 	for (; pl != NULL; pl = pl->pl_next) {
+		char *recvdval = NULL;
 		/*
 		 * Skip the special fake placeholder.  This will also skip over
 		 * the name property when 'all' is specified.
@@ -1129,9 +1146,14 @@
 				(void) strlcpy(buf, "-", sizeof (buf));
 			}
 
+			if (received && (zfs_prop_get_recvd(zhp,
+			    zfs_prop_to_name(pl->pl_prop), rbuf, sizeof (rbuf),
+			    cbp->cb_literal) == 0))
+				recvdval = rbuf;
+
 			zprop_print_one_property(zfs_get_name(zhp), cbp,
 			    zfs_prop_to_name(pl->pl_prop),
-			    buf, sourcetype, source);
+			    buf, sourcetype, source, recvdval);
 		} else if (zfs_prop_userquota(pl->pl_user_prop)) {
 			sourcetype = ZPROP_SRC_LOCAL;
 
@@ -1142,9 +1164,9 @@
 			}
 
 			zprop_print_one_property(zfs_get_name(zhp), cbp,
-			    pl->pl_user_prop, buf, sourcetype, source);
+			    pl->pl_user_prop, buf, sourcetype, source, NULL);
 		} else {
-			if (nvlist_lookup_nvlist(userprop,
+			if (nvlist_lookup_nvlist(user_props,
 			    pl->pl_user_prop, &propval) != 0) {
 				if (pl->pl_all)
 					continue;
@@ -1159,6 +1181,9 @@
 				if (strcmp(sourceval,
 				    zfs_get_name(zhp)) == 0) {
 					sourcetype = ZPROP_SRC_LOCAL;
+				} else if (strcmp(sourceval,
+				    ZPROP_SOURCE_VAL_RECVD) == 0) {
+					sourcetype = ZPROP_SRC_RECEIVED;
 				} else {
 					sourcetype = ZPROP_SRC_INHERITED;
 					(void) strlcpy(source,
@@ -1166,9 +1191,14 @@
 				}
 			}
 
+			if (received && (zfs_prop_get_recvd(zhp,
+			    pl->pl_user_prop, rbuf, sizeof (rbuf),
+			    cbp->cb_literal) == 0))
+				recvdval = rbuf;
+
 			zprop_print_one_property(zfs_get_name(zhp), cbp,
 			    pl->pl_user_prop, strval, sourcetype,
-			    source);
+			    source, recvdval);
 		}
 	}
 
@@ -1224,10 +1254,10 @@
 			i = 0;
 			while (*optarg != '\0') {
 				static char *col_subopts[] =
-				    { "name", "property", "value", "source",
-				    NULL };
-
-				if (i == 4) {
+				    { "name", "property", "value", "received",
+				    "source", "all", NULL };
+
+				if (i == ZFS_GET_NCOLS) {
 					(void) fprintf(stderr, gettext("too "
 					    "many fields given to -o "
 					    "option\n"));
@@ -1246,8 +1276,28 @@
 					cb.cb_columns[i++] = GET_COL_VALUE;
 					break;
 				case 3:
+					cb.cb_columns[i++] = GET_COL_RECVD;
+					flags |= ZFS_ITER_RECVD_PROPS;
+					break;
+				case 4:
 					cb.cb_columns[i++] = GET_COL_SOURCE;
 					break;
+				case 5:
+					if (i > 0) {
+						(void) fprintf(stderr,
+						    gettext("\"all\" conflicts "
+						    "with specific fields "
+						    "given to -o option\n"));
+						usage(B_FALSE);
+					}
+					cb.cb_columns[0] = GET_COL_NAME;
+					cb.cb_columns[1] = GET_COL_PROPERTY;
+					cb.cb_columns[2] = GET_COL_VALUE;
+					cb.cb_columns[3] = GET_COL_RECVD;
+					cb.cb_columns[4] = GET_COL_SOURCE;
+					flags |= ZFS_ITER_RECVD_PROPS;
+					i = ZFS_GET_NCOLS;
+					break;
 				default:
 					(void) fprintf(stderr,
 					    gettext("invalid column name "
@@ -1262,7 +1312,8 @@
 			while (*optarg != '\0') {
 				static char *source_subopts[] = {
 					"local", "default", "inherited",
-					"temporary", "none", NULL };
+					"received", "temporary", "none",
+					NULL };
 
 				switch (getsubopt(&optarg, source_subopts,
 				    &value)) {
@@ -1276,9 +1327,12 @@
 					cb.cb_sources |= ZPROP_SRC_INHERITED;
 					break;
 				case 3:
+					cb.cb_sources |= ZPROP_SRC_RECEIVED;
+					break;
+				case 4:
 					cb.cb_sources |= ZPROP_SRC_TEMPORARY;
 					break;
-				case 4:
+				case 5:
 					cb.cb_sources |= ZPROP_SRC_NONE;
 					break;
 				default:
@@ -1345,9 +1399,10 @@
 }
 
 /*
- * inherit [-r] <property> <fs|vol> ...
+ * inherit [-rS] <property> <fs|vol> ...
  *
- * 	-r	Recurse over all children
+ *	-r	Recurse over all children
+ *	-S	Revert to received value, if any
  *
  * For each dataset specified on the command line, inherit the given property
  * from its parent.  Inheriting a property at the pool level will cause it to
@@ -1356,11 +1411,16 @@
  * local modifications for each dataset.
  */
 
+typedef struct inherit_cbdata {
+	const char *cb_propname;
+	boolean_t cb_received;
+} inherit_cbdata_t;
+
 static int
 inherit_recurse_cb(zfs_handle_t *zhp, void *data)
 {
-	char *propname = data;
-	zfs_prop_t prop = zfs_name_to_prop(propname);
+	inherit_cbdata_t *cb = data;
+	zfs_prop_t prop = zfs_name_to_prop(cb->cb_propname);
 
 	/*
 	 * If we're doing it recursively, then ignore properties that
@@ -1370,15 +1430,15 @@
 	    !zfs_prop_valid_for_type(prop, zfs_get_type(zhp)))
 		return (0);
 
-	return (zfs_prop_inherit(zhp, propname) != 0);
+	return (zfs_prop_inherit(zhp, cb->cb_propname, cb->cb_received) != 0);
 }
 
 static int
 inherit_cb(zfs_handle_t *zhp, void *data)
 {
-	char *propname = data;
-
-	return (zfs_prop_inherit(zhp, propname) != 0);
+	inherit_cbdata_t *cb = data;
+
+	return (zfs_prop_inherit(zhp, cb->cb_propname, cb->cb_received) != 0);
 }
 
 static int
@@ -1386,16 +1446,21 @@
 {
 	int c;
 	zfs_prop_t prop;
+	inherit_cbdata_t cb = { 0 };
 	char *propname;
 	int ret;
 	int flags = 0;
+	boolean_t received = B_FALSE;
 
 	/* check options */
-	while ((c = getopt(argc, argv, "r")) != -1) {
+	while ((c = getopt(argc, argv, "rS")) != -1) {
 		switch (c) {
 		case 'r':
 			flags |= ZFS_ITER_RECURSE;
 			break;
+		case 'S':
+			received = B_TRUE;
+			break;
 		case '?':
 		default:
 			(void) fprintf(stderr, gettext("invalid option '%c'\n"),
@@ -1428,7 +1493,7 @@
 			    propname);
 			return (1);
 		}
-		if (!zfs_prop_inheritable(prop)) {
+		if (!zfs_prop_inheritable(prop) && !received) {
 			(void) fprintf(stderr, gettext("'%s' property cannot "
 			    "be inherited\n"), propname);
 			if (prop == ZFS_PROP_QUOTA ||
@@ -1445,12 +1510,15 @@
 		usage(B_FALSE);
 	}
 
+	cb.cb_propname = propname;
+	cb.cb_received = received;
+
 	if (flags & ZFS_ITER_RECURSE) {
 		ret = zfs_for_each(argc, argv, flags, ZFS_TYPE_DATASET,
-		    NULL, NULL, 0, inherit_recurse_cb, propname);
+		    NULL, NULL, 0, inherit_recurse_cb, &cb);
 	} else {
 		ret = zfs_for_each(argc, argv, flags, ZFS_TYPE_DATASET,
-		    NULL, NULL, 0, inherit_cb, propname);
+		    NULL, NULL, 0, inherit_cb, &cb);
 	}
 
 	return (ret);
@@ -1775,11 +1843,11 @@
  *      [-s property [-s property]...] [-S property [-S property]...]
  *      <dataset> ...
  *
- * 	-r	Recurse over all children
- * 	-d	Limit recursion by depth.
- * 	-H	Scripted mode; elide headers and separate columns by tabs
- * 	-o	Control which fields to display.
- * 	-t	Control which object types to display.
+ *	-r	Recurse over all children
+ *	-d	Limit recursion by depth.
+ *	-H	Scripted mode; elide headers and separate columns by tabs
+ *	-o	Control which fields to display.
+ *	-t	Control which object types to display.
  *	-s	Specify sort columns, descending order.
  *	-S	Specify sort columns, ascending order.
  *
@@ -2176,9 +2244,9 @@
 /*
  * zfs rollback [-rRf] <snapshot>
  *
- * 	-r	Delete any intervening snapshots before doing rollback
- * 	-R	Delete any snapshots and their clones
- * 	-f	ignored for backwards compatability
+ *	-r	Delete any intervening snapshots before doing rollback
+ *	-R	Delete any snapshots and their clones
+ *	-f	ignored for backwards compatability
  *
  * Given a filesystem, rollback to a specific snapshot, discarding any changes
  * since then and making it the active dataset.  If more recent snapshots exist,
@@ -2488,8 +2556,8 @@
 }
 
 /*
- * zfs send [-vD] -R [-i|-I <@snap>] <fs@snap>
- * zfs send [-vD] [-i|-I <@snap>] <fs@snap>
+ * zfs send [-vDp] -R [-i|-I <@snap>] <fs@snap>
+ * zfs send [-vDp] [-i|-I <@snap>] <fs@snap>
  *
  * Send a backup stream to stdout.
  */
@@ -2504,7 +2572,7 @@
 	int c, err;
 
 	/* check options */
-	while ((c = getopt(argc, argv, ":i:I:RDv")) != -1) {
+	while ((c = getopt(argc, argv, ":i:I:RDpv")) != -1) {
 		switch (c) {
 		case 'i':
 			if (fromname)
@@ -2520,6 +2588,9 @@
 		case 'R':
 			flags.replicate = B_TRUE;
 			break;
+		case 'p':
+			flags.props = B_TRUE;
+			break;
 		case 'v':
 			flags.verbose = B_TRUE;
 			break;
@@ -2620,9 +2691,8 @@
 zfs_do_receive(int argc, char **argv)
 {
 	int c, err;
-	recvflags_t flags;
-
-	bzero(&flags, sizeof (recvflags_t));
+	recvflags_t flags = { 0 };
+
 	/* check options */
 	while ((c = getopt(argc, argv, ":dnuvF")) != -1) {
 		switch (c) {
@@ -2762,7 +2832,7 @@
 /*
  * zfs hold [-r] [-t] <tag> <snap> ...
  *
- * 	-r	Recursively hold
+ *	-r	Recursively hold
  *	-t	Temporary hold (hidden option)
  *
  * Apply a user-hold with the given tag to the list of snapshots.
@@ -2776,7 +2846,7 @@
 /*
  * zfs release [-r] <tag> <snap> ...
  *
- * 	-r	Recursively release
+ *	-r	Recursively release
  *
  * Release a user-hold with the given tag from the list of snapshots.
  */
--- a/usr/src/cmd/zpool/zpool_main.c	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/cmd/zpool/zpool_main.c	Mon Nov 09 20:45:32 2009 -0800
@@ -3677,6 +3677,7 @@
 		(void) printf(gettext(" 20  Compression using zle "
 		    "(zero-length encoding)\n"));
 		(void) printf(gettext(" 21  Deduplication\n"));
+		(void) printf(gettext(" 22  Received properties\n"));
 		(void) printf(gettext("\nFor more information on a particular "
 		    "version, including supported releases, see:\n\n"));
 		(void) printf("http://www.opensolaris.org/os/community/zfs/"
@@ -3892,7 +3893,8 @@
 			continue;
 
 		zprop_print_one_property(zpool_get_name(zhp), cbp,
-		    zpool_prop_to_name(pl->pl_prop), value, srctype, NULL);
+		    zpool_prop_to_name(pl->pl_prop), value, srctype, NULL,
+		    NULL);
 	}
 	return (0);
 }
--- a/usr/src/cmd/ztest/ztest.c	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/cmd/ztest/ztest.c	Mon Nov 09 20:45:32 2009 -0800
@@ -815,8 +815,9 @@
 	uint64_t curval;
 	int error;
 
-	error = dsl_prop_set(osname, propname, sizeof (value),
-	    inherit ? 0 : 1, &value);
+	error = dsl_prop_set(osname, propname,
+	    (inherit ? ZPROP_SRC_NONE : ZPROP_SRC_LOCAL),
+	    sizeof (value), 1, &value);
 
 	if (error == ENOSPC) {
 		ztest_record_enospc(FTAG);
--- a/usr/src/common/nvpair/nvpair.c	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/common/nvpair/nvpair.c	Mon Nov 09 20:45:32 2009 -0800
@@ -20,12 +20,10 @@
  */
 
 /*
- * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
+ * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
  * Use is subject to license terms.
  */
 
-#pragma ident	"%Z%%M%	%I%	%E% SMI"
-
 #include <sys/stropts.h>
 #include <sys/debug.h>
 #include <sys/isa_defs.h>
@@ -692,6 +690,18 @@
 	return (ENOENT);
 }
 
+int
+nvlist_remove_nvpair(nvlist_t *nvl, nvpair_t *nvp)
+{
+	if (nvl == NULL || nvp == NULL)
+		return (EINVAL);
+
+	nvp_buf_unlink(nvl, nvp);
+	nvpair_free(nvp);
+	nvp_buf_free(nvl, nvp);
+	return (0);
+}
+
 /*
  * This function calculates the size of an nvpair value.
  *
@@ -1162,6 +1172,42 @@
 	return (curr != NULL ? &curr->nvi_nvp : NULL);
 }
 
+nvpair_t *
+nvlist_prev_nvpair(nvlist_t *nvl, nvpair_t *nvp)
+{
+	nvpriv_t *priv;
+	i_nvp_t *curr;
+
+	if (nvl == NULL ||
+	    (priv = (nvpriv_t *)(uintptr_t)nvl->nvl_priv) == NULL)
+		return (NULL);
+
+	curr = NVPAIR2I_NVP(nvp);
+
+	if (nvp == NULL)
+		curr = priv->nvp_last;
+	else if (priv->nvp_curr == curr || nvlist_contains_nvp(nvl, nvp))
+		curr = curr->nvi_prev;
+	else
+		curr = NULL;
+
+	priv->nvp_curr = curr;
+
+	return (curr != NULL ? &curr->nvi_nvp : NULL);
+}
+
+boolean_t
+nvlist_empty(nvlist_t *nvl)
+{
+	nvpriv_t *priv;
+
+	if (nvl == NULL ||
+	    (priv = (nvpriv_t *)(uintptr_t)nvl->nvl_priv) == NULL)
+		return (B_TRUE);
+
+	return (priv->nvp_list == NULL);
+}
+
 char *
 nvpair_name(nvpair_t *nvp)
 {
--- a/usr/src/grub/capability	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/grub/capability	Mon Nov 09 20:45:32 2009 -0800
@@ -40,7 +40,7 @@
 # This file and the associated version are Solaris specific and are
 # not a part of the open source distribution of GRUB.
 #
-VERSION=13
+VERSION=14
 dboot
 xVM
 zfs
--- a/usr/src/grub/grub-0.97/stage2/zfs-include/zfs.h	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/grub/grub-0.97/stage2/zfs-include/zfs.h	Mon Nov 09 20:45:32 2009 -0800
@@ -27,7 +27,7 @@
 /*
  * On-disk version number.
  */
-#define	SPA_VERSION			21ULL
+#define	SPA_VERSION			22ULL
 
 /*
  * The following are configuration names used in the nvlist describing a pool's
--- a/usr/src/lib/libnvpair/mapfile-vers	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/lib/libnvpair/mapfile-vers	Mon Nov 09 20:45:32 2009 -0800
@@ -40,7 +40,10 @@
 SUNW_1.3 {
     global:
 	nvlist_add_double;
+	nvlist_empty;
 	nvlist_lookup_double;
+	nvlist_prev_nvpair;
+	nvlist_remove_nvpair;
 	nvpair_value_double;
 } SUNW_1.2.2;
 
--- a/usr/src/lib/libzfs/common/libzfs.h	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/lib/libzfs/common/libzfs.h	Mon Nov 09 20:45:32 2009 -0800
@@ -382,6 +382,8 @@
 extern int zfs_prop_set(zfs_handle_t *, const char *, const char *);
 extern int zfs_prop_get(zfs_handle_t *, zfs_prop_t, char *, size_t,
     zprop_source_t *, char *, size_t, boolean_t);
+extern int zfs_prop_get_recvd(zfs_handle_t *, const char *, char *, size_t,
+    boolean_t);
 extern int zfs_prop_get_numeric(zfs_handle_t *, zfs_prop_t, uint64_t *,
     zprop_source_t *, char *, size_t);
 extern int zfs_prop_get_userquota_int(zfs_handle_t *zhp, const char *propname,
@@ -389,10 +391,11 @@
 extern int zfs_prop_get_userquota(zfs_handle_t *zhp, const char *propname,
     char *propbuf, int proplen, boolean_t literal);
 extern uint64_t zfs_prop_get_int(zfs_handle_t *, zfs_prop_t);
-extern int zfs_prop_inherit(zfs_handle_t *, const char *);
+extern int zfs_prop_inherit(zfs_handle_t *, const char *, boolean_t);
 extern const char *zfs_prop_values(zfs_prop_t);
 extern int zfs_prop_is_string(zfs_prop_t prop);
 extern nvlist_t *zfs_get_user_props(zfs_handle_t *);
+extern nvlist_t *zfs_get_recvd_props(zfs_handle_t *);
 
 typedef struct zprop_list {
 	int		pl_prop;
@@ -400,10 +403,11 @@
 	struct zprop_list *pl_next;
 	boolean_t	pl_all;
 	size_t		pl_width;
+	size_t		pl_recvd_width;
 	boolean_t	pl_fixed;
 } zprop_list_t;
 
-extern int zfs_expand_proplist(zfs_handle_t *, zprop_list_t **);
+extern int zfs_expand_proplist(zfs_handle_t *, zprop_list_t **, boolean_t);
 extern void zfs_prune_proplist(zfs_handle_t *, uint8_t *);
 
 #define	ZFS_MOUNTPOINT_NONE	"none"
@@ -427,13 +431,24 @@
     zfs_type_t);
 extern void zprop_free_list(zprop_list_t *);
 
+#define	ZFS_GET_NCOLS	5
+
+typedef enum {
+	GET_COL_NONE,
+	GET_COL_NAME,
+	GET_COL_PROPERTY,
+	GET_COL_VALUE,
+	GET_COL_RECVD,
+	GET_COL_SOURCE
+} zfs_get_column_t;
+
 /*
  * Functions for printing zfs or zpool properties
  */
 typedef struct zprop_get_cbdata {
 	int cb_sources;
-	int cb_columns[4];
-	int cb_colwidths[5];
+	zfs_get_column_t cb_columns[ZFS_GET_NCOLS];
+	int cb_colwidths[ZFS_GET_NCOLS + 1];
 	boolean_t cb_scripted;
 	boolean_t cb_literal;
 	boolean_t cb_first;
@@ -442,12 +457,8 @@
 } zprop_get_cbdata_t;
 
 void zprop_print_one_property(const char *, zprop_get_cbdata_t *,
-    const char *, const char *, zprop_source_t, const char *);
-
-#define	GET_COL_NAME		1
-#define	GET_COL_PROPERTY	2
-#define	GET_COL_VALUE		3
-#define	GET_COL_SOURCE		4
+    const char *, const char *, zprop_source_t, const char *,
+    const char *);
 
 /*
  * Iterator functions.
@@ -477,17 +488,20 @@
 	/* print informational messages (ie, -v was specified) */
 	int verbose : 1;
 
-	/* recursive send  (i.e., -R) */
+	/* recursive send  (ie, -R) */
 	int replicate : 1;
 
 	/* for incrementals, do all intermediate snapshots */
-	int doall : 1; /* (i.e., -I) */
+	int doall : 1; /* (ie, -I) */
 
 	/* if dataset is a clone, do incremental from its origin */
 	int fromorigin : 1;
 
 	/* do deduplication */
 	int dedup : 1;
+
+	/* send properties (ie, -p) */
+	int props : 1;
 } sendflags_t;
 
 typedef boolean_t (snapfilter_cb_t)(zfs_handle_t *, void *);
--- a/usr/src/lib/libzfs/common/libzfs_dataset.c	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/lib/libzfs/common/libzfs_dataset.c	Mon Nov 09 20:45:32 2009 -0800
@@ -336,6 +336,44 @@
 	return (0);
 }
 
+/*
+ * Utility function to get the received properties of the given object.
+ */
+static int
+get_recvd_props_ioctl(zfs_handle_t *zhp)
+{
+	libzfs_handle_t *hdl = zhp->zfs_hdl;
+	nvlist_t *recvdprops;
+	zfs_cmd_t zc = { 0 };
+	int err;
+
+	if (zcmd_alloc_dst_nvlist(hdl, &zc, 0) != 0)
+		return (-1);
+
+	(void) strlcpy(zc.zc_name, zhp->zfs_name, sizeof (zc.zc_name));
+
+	while (ioctl(hdl->libzfs_fd, ZFS_IOC_OBJSET_RECVD_PROPS, &zc) != 0) {
+		if (errno == ENOMEM) {
+			if (zcmd_expand_dst_nvlist(hdl, &zc) != 0) {
+				return (-1);
+			}
+		} else {
+			zcmd_free_nvlists(&zc);
+			return (-1);
+		}
+	}
+
+	err = zcmd_read_dst_nvlist(zhp->zfs_hdl, &zc, &recvdprops);
+	zcmd_free_nvlists(&zc);
+	if (err != 0)
+		return (-1);
+
+	nvlist_free(zhp->zfs_recvd_props);
+	zhp->zfs_recvd_props = recvdprops;
+
+	return (0);
+}
+
 static int
 put_stats_zhdl(zfs_handle_t *zhp, zfs_cmd_t *zc)
 {
@@ -522,6 +560,7 @@
 		free(zhp->zfs_mntopts);
 	nvlist_free(zhp->zfs_props);
 	nvlist_free(zhp->zfs_user_props);
+	nvlist_free(zhp->zfs_recvd_props);
 	free(zhp);
 }
 
@@ -1218,6 +1257,82 @@
 	return (NULL);
 }
 
+void
+zfs_setprop_error(libzfs_handle_t *hdl, zfs_prop_t prop, int err,
+    char *errbuf)
+{
+	switch (err) {
+
+	case ENOSPC:
+		/*
+		 * For quotas and reservations, ENOSPC indicates
+		 * something different; setting a quota or reservation
+		 * doesn't use any disk space.
+		 */
+		switch (prop) {
+		case ZFS_PROP_QUOTA:
+		case ZFS_PROP_REFQUOTA:
+			zfs_error_aux(hdl, dgettext(TEXT_DOMAIN,
+			    "size is less than current used or "
+			    "reserved space"));
+			(void) zfs_error(hdl, EZFS_PROPSPACE, errbuf);
+			break;
+
+		case ZFS_PROP_RESERVATION:
+		case ZFS_PROP_REFRESERVATION:
+			zfs_error_aux(hdl, dgettext(TEXT_DOMAIN,
+			    "size is greater than available space"));
+			(void) zfs_error(hdl, EZFS_PROPSPACE, errbuf);
+			break;
+
+		default:
+			(void) zfs_standard_error(hdl, err, errbuf);
+			break;
+		}
+		break;
+
+	case EBUSY:
+		(void) zfs_standard_error(hdl, EBUSY, errbuf);
+		break;
+
+	case EROFS:
+		(void) zfs_error(hdl, EZFS_DSREADONLY, errbuf);
+		break;
+
+	case ENOTSUP:
+		zfs_error_aux(hdl, dgettext(TEXT_DOMAIN,
+		    "pool and or dataset must be upgraded to set this "
+		    "property or value"));
+		(void) zfs_error(hdl, EZFS_BADVERSION, errbuf);
+		break;
+
+	case ERANGE:
+		if (prop == ZFS_PROP_COMPRESSION) {
+			(void) zfs_error_aux(hdl, dgettext(TEXT_DOMAIN,
+			    "property setting is not allowed on "
+			    "bootable datasets"));
+			(void) zfs_error(hdl, EZFS_NOTSUP, errbuf);
+		} else {
+			(void) zfs_standard_error(hdl, err, errbuf);
+		}
+		break;
+
+	case EOVERFLOW:
+		/*
+		 * This platform can't address a volume this big.
+		 */
+#ifdef _ILP32
+		if (prop == ZFS_PROP_VOLSIZE) {
+			(void) zfs_error(hdl, EZFS_VOLTOOBIG, errbuf);
+			break;
+		}
+#endif
+		/* FALLTHROUGH */
+	default:
+		(void) zfs_standard_error(hdl, err, errbuf);
+	}
+}
+
 /*
  * Given a property name and value, set the property for the given dataset.
  */
@@ -1286,76 +1401,7 @@
 	ret = zfs_ioctl(hdl, ZFS_IOC_SET_PROP, &zc);
 
 	if (ret != 0) {
-		switch (errno) {
-
-		case ENOSPC:
-			/*
-			 * For quotas and reservations, ENOSPC indicates
-			 * something different; setting a quota or reservation
-			 * doesn't use any disk space.
-			 */
-			switch (prop) {
-			case ZFS_PROP_QUOTA:
-			case ZFS_PROP_REFQUOTA:
-				zfs_error_aux(hdl, dgettext(TEXT_DOMAIN,
-				    "size is less than current used or "
-				    "reserved space"));
-				(void) zfs_error(hdl, EZFS_PROPSPACE, errbuf);
-				break;
-
-			case ZFS_PROP_RESERVATION:
-			case ZFS_PROP_REFRESERVATION:
-				zfs_error_aux(hdl, dgettext(TEXT_DOMAIN,
-				    "size is greater than available space"));
-				(void) zfs_error(hdl, EZFS_PROPSPACE, errbuf);
-				break;
-
-			default:
-				(void) zfs_standard_error(hdl, errno, errbuf);
-				break;
-			}
-			break;
-
-		case EBUSY:
-			(void) zfs_standard_error(hdl, EBUSY, errbuf);
-			break;
-
-		case EROFS:
-			(void) zfs_error(hdl, EZFS_DSREADONLY, errbuf);
-			break;
-
-		case ENOTSUP:
-			zfs_error_aux(hdl, dgettext(TEXT_DOMAIN,
-			    "pool and or dataset must be upgraded to set this "
-			    "property or value"));
-			(void) zfs_error(hdl, EZFS_BADVERSION, errbuf);
-			break;
-
-		case ERANGE:
-			if (prop == ZFS_PROP_COMPRESSION) {
-				(void) zfs_error_aux(hdl, dgettext(TEXT_DOMAIN,
-				    "property setting is not allowed on "
-				    "bootable datasets"));
-				(void) zfs_error(hdl, EZFS_NOTSUP, errbuf);
-			} else {
-				(void) zfs_standard_error(hdl, errno, errbuf);
-			}
-			break;
-
-		case EOVERFLOW:
-			/*
-			 * This platform can't address a volume this big.
-			 */
-#ifdef _ILP32
-			if (prop == ZFS_PROP_VOLSIZE) {
-				(void) zfs_error(hdl, EZFS_VOLTOOBIG, errbuf);
-				break;
-			}
-#endif
-			/* FALLTHROUGH */
-		default:
-			(void) zfs_standard_error(hdl, errno, errbuf);
-		}
+		zfs_setprop_error(hdl, prop, errno, errbuf);
 	} else {
 		if (do_prefix)
 			ret = changelist_postfix(cl);
@@ -1377,10 +1423,11 @@
 }
 
 /*
- * Given a property, inherit the value from the parent dataset.
+ * Given a property, inherit the value from the parent dataset, or if received
+ * is TRUE, revert to the received value, if any.
  */
 int
-zfs_prop_inherit(zfs_handle_t *zhp, const char *propname)
+zfs_prop_inherit(zfs_handle_t *zhp, const char *propname, boolean_t received)
 {
 	zfs_cmd_t zc = { 0 };
 	int ret;
@@ -1392,6 +1439,7 @@
 	(void) snprintf(errbuf, sizeof (errbuf), dgettext(TEXT_DOMAIN,
 	    "cannot inherit %s for '%s'"), propname, zhp->zfs_name);
 
+	zc.zc_cookie = received;
 	if ((prop = zfs_name_to_prop(propname)) == ZPROP_INVAL) {
 		/*
 		 * For user properties, the amount of work we have to do is very
@@ -1418,7 +1466,7 @@
 	if (zfs_prop_readonly(prop))
 		return (zfs_error(hdl, EZFS_PROPREADONLY, errbuf));
 
-	if (!zfs_prop_inheritable(prop))
+	if (!zfs_prop_inheritable(prop) && !received)
 		return (zfs_error(hdl, EZFS_PROPNONINHERIT, errbuf));
 
 	/*
@@ -1523,6 +1571,26 @@
 	return (value);
 }
 
+static boolean_t
+zfs_is_recvd_props_mode(zfs_handle_t *zhp)
+{
+	return (zhp->zfs_props == zhp->zfs_recvd_props);
+}
+
+static void
+zfs_set_recvd_props_mode(zfs_handle_t *zhp, uint64_t *cookie)
+{
+	*cookie = (uint64_t)(uintptr_t)zhp->zfs_props;
+	zhp->zfs_props = zhp->zfs_recvd_props;
+}
+
+static void
+zfs_unset_recvd_props_mode(zfs_handle_t *zhp, uint64_t *cookie)
+{
+	zhp->zfs_props = (nvlist_t *)(uintptr_t)*cookie;
+	*cookie = 0;
+}
+
 /*
  * Internal function for getting a numeric property.  Both zfs_prop_get() and
  * zfs_prop_get_int() are built using this interface.
@@ -1541,6 +1609,7 @@
 	struct mnttab mnt;
 	char *mntopt_on = NULL;
 	char *mntopt_off = NULL;
+	boolean_t received = zfs_is_recvd_props_mode(zhp);
 
 	*source = NULL;
 
@@ -1616,6 +1685,9 @@
 	case ZFS_PROP_NBMAND:
 		*val = getprop_uint64(zhp, prop, source);
 
+		if (received)
+			break;
+
 		if (hasmntopt(&mnt, mntopt_on) && !*val) {
 			*val = B_TRUE;
 			if (src)
@@ -1628,22 +1700,16 @@
 		break;
 
 	case ZFS_PROP_CANMOUNT:
-		*val = getprop_uint64(zhp, prop, source);
-		if (*val != ZFS_CANMOUNT_ON)
-			*source = zhp->zfs_name;
-		else
-			*source = "";	/* default */
-		break;
-
 	case ZFS_PROP_QUOTA:
 	case ZFS_PROP_REFQUOTA:
 	case ZFS_PROP_RESERVATION:
 	case ZFS_PROP_REFRESERVATION:
 		*val = getprop_uint64(zhp, prop, source);
-		if (*val == 0)
-			*source = "";	/* default */
-		else
+
+		if (*source == NULL) {
+			/* not default, must be local */
 			*source = zhp->zfs_name;
+		}
 		break;
 
 	case ZFS_PROP_MOUNTED:
@@ -1719,6 +1785,8 @@
 		*srctype = ZPROP_SRC_NONE;
 	} else if (source[0] == '\0') {
 		*srctype = ZPROP_SRC_DEFAULT;
+	} else if (strstr(source, ZPROP_SOURCE_VAL_RECVD) != NULL) {
+		*srctype = ZPROP_SRC_RECEIVED;
 	} else {
 		if (strcmp(source, zhp->zfs_name) == 0) {
 			*srctype = ZPROP_SRC_LOCAL;
@@ -1730,6 +1798,43 @@
 
 }
 
+int
+zfs_prop_get_recvd(zfs_handle_t *zhp, const char *propname, char *propbuf,
+    size_t proplen, boolean_t literal)
+{
+	zfs_prop_t prop;
+	int err = 0;
+
+	if (zhp->zfs_recvd_props == NULL)
+		if (get_recvd_props_ioctl(zhp) != 0)
+			return (-1);
+
+	prop = zfs_name_to_prop(propname);
+
+	if (prop != ZPROP_INVAL) {
+		uint64_t cookie;
+		if (!nvlist_exists(zhp->zfs_recvd_props, propname))
+			return (-1);
+		zfs_set_recvd_props_mode(zhp, &cookie);
+		err = zfs_prop_get(zhp, prop, propbuf, proplen,
+		    NULL, NULL, 0, literal);
+		zfs_unset_recvd_props_mode(zhp, &cookie);
+	} else if (zfs_prop_userquota(propname)) {
+		return (-1);
+	} else {
+		nvlist_t *propval;
+		char *recvdval;
+		if (nvlist_lookup_nvlist(zhp->zfs_recvd_props,
+		    propname, &propval) != 0)
+			return (-1);
+		verify(nvlist_lookup_string(propval, ZPROP_VALUE,
+		    &recvdval) == 0);
+		(void) strlcpy(propbuf, recvdval, proplen);
+	}
+
+	return (err == 0 ? 0 : -1);
+}
+
 /*
  * Retrieve a property from the given object.  If 'literal' is specified, then
  * numbers are left as exact values.  Otherwise, numbers are converted to a
@@ -1745,6 +1850,7 @@
 	uint64_t val;
 	char *str;
 	const char *strval;
+	boolean_t received = zfs_is_recvd_props_mode(zhp);
 
 	/*
 	 * Check to see if this property applies to our object
@@ -1752,6 +1858,9 @@
 	if (!zfs_prop_valid_for_type(prop, zhp->zfs_type))
 		return (-1);
 
+	if (received && zfs_prop_readonly(prop))
+		return (-1);
+
 	if (src)
 		*src = ZPROP_SRC_NONE;
 
@@ -3478,6 +3587,15 @@
 	return (zhp->zfs_user_props);
 }
 
+nvlist_t *
+zfs_get_recvd_props(zfs_handle_t *zhp)
+{
+	if (zhp->zfs_recvd_props == NULL)
+		if (get_recvd_props_ioctl(zhp) != 0)
+			return (NULL);
+	return (zhp->zfs_recvd_props);
+}
+
 /*
  * This function is used by 'zfs list' to determine the exact set of columns to
  * display, and their maximum widths.  This does two main things:
@@ -3487,10 +3605,12 @@
  *        for new unique user properties and add them to the list.
  *
  *      - For non fixed-width properties, keep track of the maximum width seen
- *        so that we can size the column appropriately.
+ *        so that we can size the column appropriately. If the user has
+ *        requested received property values, we also need to compute the width
+ *        of the RECEIVED column.
  */
 int
-zfs_expand_proplist(zfs_handle_t *zhp, zprop_list_t **plp)
+zfs_expand_proplist(zfs_handle_t *zhp, zprop_list_t **plp, boolean_t received)
 {
 	libzfs_handle_t *hdl = zhp->zfs_hdl;
 	zprop_list_t *entry;
@@ -3561,12 +3681,24 @@
 				if (strlen(buf) > entry->pl_width)
 					entry->pl_width = strlen(buf);
 			}
-		} else if (nvlist_lookup_nvlist(userprops,
-		    entry->pl_user_prop, &propval)  == 0) {
-			verify(nvlist_lookup_string(propval,
-			    ZPROP_VALUE, &strval) == 0);
-			if (strlen(strval) > entry->pl_width)
-				entry->pl_width = strlen(strval);
+			if (received && zfs_prop_get_recvd(zhp,
+			    zfs_prop_to_name(entry->pl_prop),
+			    buf, sizeof (buf), B_FALSE) == 0)
+				if (strlen(buf) > entry->pl_recvd_width)
+					entry->pl_recvd_width = strlen(buf);
+		} else {
+			if (nvlist_lookup_nvlist(userprops, entry->pl_user_prop,
+			    &propval) == 0) {
+				verify(nvlist_lookup_string(propval,
+				    ZPROP_VALUE, &strval) == 0);
+				if (strlen(strval) > entry->pl_width)
+					entry->pl_width = strlen(strval);
+			}
+			if (received && zfs_prop_get_recvd(zhp,
+			    entry->pl_user_prop,
+			    buf, sizeof (buf), B_FALSE) == 0)
+				if (strlen(buf) > entry->pl_recvd_width)
+					entry->pl_recvd_width = strlen(buf);
 		}
 	}
 
--- a/usr/src/lib/libzfs/common/libzfs_impl.h	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/lib/libzfs/common/libzfs_impl.h	Mon Nov 09 20:45:32 2009 -0800
@@ -92,6 +92,7 @@
 	dmu_objset_stats_t zfs_dmustats;
 	nvlist_t *zfs_props;
 	nvlist_t *zfs_user_props;
+	nvlist_t *zfs_recvd_props;
 	boolean_t zfs_mntcheck;
 	char *zfs_mntopts;
 	uint8_t *zfs_props_table;
--- a/usr/src/lib/libzfs/common/libzfs_sendrecv.c	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/lib/libzfs/common/libzfs_sendrecv.c	Mon Nov 09 20:45:32 2009 -0800
@@ -46,6 +46,9 @@
 #include "libzfs_impl.h"
 #include <sha2.h>
 
+/* in libzfs_dataset.c */
+extern void zfs_setprop_error(libzfs_handle_t *, zfs_prop_t, int, char *);
+
 static int zfs_receive_impl(libzfs_handle_t *, const char *, recvflags_t,
     int, avl_tree_t *, char **);
 
@@ -510,6 +513,7 @@
 	nvlist_t *snapprops;
 	const char *fromsnap;
 	const char *tosnap;
+	boolean_t recursive;
 
 	/*
 	 * The header nvlist is of the following format:
@@ -597,18 +601,30 @@
 		if (prop == ZFS_PROP_QUOTA || prop == ZFS_PROP_RESERVATION ||
 		    prop == ZFS_PROP_REFQUOTA ||
 		    prop == ZFS_PROP_REFRESERVATION) {
-			/* these guys are modifyable, but have no source */
+			char *source;
 			uint64_t value;
 			verify(nvlist_lookup_uint64(propnv,
 			    ZPROP_VALUE, &value) == 0);
 			if (zhp->zfs_type == ZFS_TYPE_SNAPSHOT)
 				continue;
+			/*
+			 * May have no source before SPA_VERSION_RECVD_PROPS,
+			 * but is still modifiable.
+			 */
+			if (nvlist_lookup_string(propnv,
+			    ZPROP_SOURCE, &source) == 0) {
+				if ((strcmp(source, zhp->zfs_name) != 0) &&
+				    (strcmp(source,
+				    ZPROP_SOURCE_VAL_RECVD) != 0))
+					continue;
+			}
 		} else {
 			char *source;
 			if (nvlist_lookup_string(propnv,
 			    ZPROP_SOURCE, &source) != 0)
 				continue;
-			if (strcmp(source, zhp->zfs_name) != 0)
+			if ((strcmp(source, zhp->zfs_name) != 0) &&
+			    (strcmp(source, ZPROP_SOURCE_VAL_RECVD) != 0))
 				continue;
 		}
 
@@ -637,7 +653,7 @@
 {
 	send_data_t *sd = arg;
 	nvlist_t *nvfs, *nv;
-	int rv;
+	int rv = 0;
 	uint64_t parent_fromsnap_guid_save = sd->parent_fromsnap_guid;
 	uint64_t guid = zhp->zfs_dmustats.dds_guid;
 	char guidstring[64];
@@ -679,7 +695,8 @@
 	nvlist_free(nvfs);
 
 	/* iterate over children */
-	rv = zfs_iter_filesystems(zhp, send_iterate_fs, sd);
+	if (sd->recursive)
+		rv = zfs_iter_filesystems(zhp, send_iterate_fs, sd);
 
 	sd->parent_fromsnap_guid = parent_fromsnap_guid_save;
 
@@ -689,7 +706,7 @@
 
 static int
 gather_nvlist(libzfs_handle_t *hdl, const char *fsname, const char *fromsnap,
-    const char *tosnap, nvlist_t **nvlp, avl_tree_t **avlp)
+    const char *tosnap, boolean_t recursive, nvlist_t **nvlp, avl_tree_t **avlp)
 {
 	zfs_handle_t *zhp;
 	send_data_t sd = { 0 };
@@ -702,6 +719,7 @@
 	VERIFY(0 == nvlist_alloc(&sd.fss, NV_UNIQUE_NAME, 0));
 	sd.fromsnap = fromsnap;
 	sd.tosnap = tosnap;
+	sd.recursive = recursive;
 
 	if ((error = send_iterate_fs(zhp, &sd)) != 0) {
 		nvlist_free(sd.fss);
@@ -1045,12 +1063,12 @@
 		origin_nv = fsavl_find(sdd->fsavl, origin_guid, NULL);
 		if (origin_nv &&
 		    nvlist_lookup_boolean(origin_nv, "sent") == ENOENT) {
-				/*
-				 * origin has not been sent yet;
-				 * skip this clone.
-				 */
-				needagain = B_TRUE;
-				continue;
+			/*
+			 * origin has not been sent yet;
+			 * skip this clone.
+			 */
+			needagain = B_TRUE;
+			continue;
 		}
 
 		zhp = zfs_open(rzhp->zfs_hdl, fsname, ZFS_TYPE_DATASET);
@@ -1084,7 +1102,7 @@
  * uses a special header (with a hdrtype field of DMU_COMPOUNDSTREAM)
  * if "replicate" is set.  If "doall" is set, dump all the intermediate
  * snapshots. The DMU_COMPOUNDSTREAM header is used in the "doall"
- * case too.
+ * case too. If "props" is set, send properties.
  */
 int
 zfs_send(zfs_handle_t *zhp, const char *fromsnap, const char *tosnap,
@@ -1137,14 +1155,12 @@
 		}
 	}
 
-	if (flags.replicate || flags.doall) {
+	if (flags.replicate || flags.doall || flags.props) {
 		dmu_replay_record_t drr = { 0 };
 		char *packbuf = NULL;
 		size_t buflen = 0;
 		zio_cksum_t zc = { 0 };
 
-		assert(fromsnap || flags.doall);
-
 		if (holdsnaps) {
 			(void) snprintf(holdtag, sizeof (holdtag),
 			    ".send-%d-%llu", getpid(), (u_longlong_t)holdseq);
@@ -1155,8 +1171,7 @@
 				goto err_out;
 		}
 
-
-		if (flags.replicate) {
+		if (flags.replicate || flags.props) {
 			nvlist_t *hdrnv;
 
 			VERIFY(0 == nvlist_alloc(&hdrnv, NV_UNIQUE_NAME, 0));
@@ -1165,9 +1180,13 @@
 				    "fromsnap", fromsnap));
 			}
 			VERIFY(0 == nvlist_add_string(hdrnv, "tosnap", tosnap));
+			if (!flags.replicate) {
+				VERIFY(0 == nvlist_add_boolean(hdrnv,
+				    "not_recursive"));
+			}
 
 			err = gather_nvlist(zhp->zfs_hdl, zhp->zfs_name,
-			    fromsnap, tosnap, &fss, &fsavl);
+			    fromsnap, tosnap, flags.replicate, &fss, &fsavl);
 			if (err) {
 				if (holdsnaps) {
 					(void) zfs_release_range(zhp, fromsnap,
@@ -1204,7 +1223,7 @@
 		err = cksum_and_write(&drr, sizeof (drr), &zc, outfd);
 
 		/* write header nvlist */
-		if (err != -1 && flags.replicate) {
+		if (err != -1 && packbuf != NULL) {
 			err = cksum_and_write(packbuf, buflen, &zc, outfd);
 		}
 		free(packbuf);
@@ -1261,7 +1280,8 @@
 		(void) close(pipefd[0]);
 		(void) pthread_join(tid, NULL);
 	}
-	if (flags.replicate || flags.doall) {
+
+	if (flags.replicate || flags.doall || flags.props) {
 		/*
 		 * write final end record.  NB: want to do this even if
 		 * there was some error, because it might not be totally
@@ -1471,12 +1491,13 @@
 	changelist_free(clp);
 
 	/*
-	 * Deferred destroy should always succeed. Since we can't tell
-	 * if it destroyed the dataset or just marked it for deferred
-	 * destroy, always do the rename just in case.
+	 * Deferred destroy might destroy the snapshot or only mark it to be
+	 * destroyed later, and it returns success in either case.
 	 */
-	if (err != 0 || defer)
+	if (err != 0 || (defer && zfs_dataset_exists(hdl, name,
+	    ZFS_TYPE_SNAPSHOT))) {
 		err = recv_rename(hdl, name, NULL, baselen, newname, flags);
+	}
 
 	return (err);
 }
@@ -1592,12 +1613,15 @@
 	char *tosnap, *fromsnap;
 	char newname[ZFS_MAXNAMELEN];
 	int error;
-	boolean_t needagain, progress;
+	boolean_t needagain, progress, recursive;
 	char *s1, *s2;
 
 	VERIFY(0 == nvlist_lookup_string(stream_nv, "fromsnap", &fromsnap));
 	VERIFY(0 == nvlist_lookup_string(stream_nv, "tosnap", &tosnap));
 
+	recursive = (nvlist_lookup_boolean(stream_nv, "not_recursive") ==
+	    ENOENT);
+
 	if (flags.dryrun)
 		return (0);
 
@@ -1605,7 +1629,7 @@
 	needagain = progress = B_FALSE;
 
 	if ((error = gather_nvlist(hdl, tofs, fromsnap, NULL,
-	    &local_nv, &local_avl)) != 0)
+	    recursive, &local_nv, &local_avl)) != 0)
 		return (error);
 
 	/*
@@ -1728,7 +1752,7 @@
 			    stream_snapname, &props)) {
 				zfs_cmd_t zc = { 0 };
 
-				zc.zc_cookie = B_TRUE; /* clear current props */
+				zc.zc_cookie = B_TRUE; /* received */
 				(void) snprintf(zc.zc_name, sizeof (zc.zc_name),
 				    "%s@%s", fsname, nvpair_name(snapelem));
 				if (zcmd_write_src_nvlist(hdl, &zc,
@@ -1877,12 +1901,7 @@
 	 * Read in the nvlist from the stream.
 	 */
 	if (drr->drr_payloadlen != 0) {
-		if (!flags.isprefix) {
-			zfs_error_aux(hdl, dgettext(TEXT_DOMAIN,
-			    "must use -d to receive replication "
-			    "(send -R) stream"));
-			return (zfs_error(hdl, EZFS_BADSTREAM, errbuf));
-		}
+		boolean_t recursive;
 
 		error = recv_read_nvlist(hdl, fd, drr->drr_payloadlen,
 		    &stream_nv, flags.byteswap, zc);
@@ -1890,6 +1909,16 @@
 			error = zfs_error(hdl, EZFS_BADSTREAM, errbuf);
 			goto out;
 		}
+
+		recursive = (nvlist_lookup_boolean(stream_nv,
+		    "not_recursive") == ENOENT);
+
+		if (recursive && !flags.isprefix) {
+			zfs_error_aux(hdl, dgettext(TEXT_DOMAIN,
+			    "must use -d to receive replication "
+			    "(send -R) stream"));
+			return (zfs_error(hdl, EZFS_BADSTREAM, errbuf));
+		}
 	}
 
 	/*
@@ -1989,6 +2018,19 @@
 	return (error);
 }
 
+static void
+trunc_prop_errs(int truncated)
+{
+	ASSERT(truncated != 0);
+
+	if (truncated == 1)
+		(void) fprintf(stderr, dgettext(TEXT_DOMAIN,
+		    "1 more property could not be set\n"));
+	else
+		(void) fprintf(stderr, dgettext(TEXT_DOMAIN,
+		    "%d more properties could not be set\n"), truncated);
+}
+
 static int
 recv_skip(libzfs_handle_t *hdl, int fd, boolean_t byteswap)
 {
@@ -2072,12 +2114,14 @@
 	char *cp;
 	struct drr_begin *drrb = &drr->drr_u.drr_begin;
 	char errbuf[1024];
+	char prop_errbuf[1024];
 	char chopprefix[ZFS_MAXNAMELEN];
 	boolean_t newfs = B_FALSE;
 	boolean_t stream_wantsnewfs;
 	uint64_t parent_snapguid = 0;
 	prop_changelist_t *clp = NULL;
 	nvlist_t *snapprops_nvlist = NULL;
+	zprop_errflags_t prop_errflags;
 
 	begin_time = time(NULL);
 
@@ -2337,14 +2381,52 @@
 		return (recv_skip(hdl, infd, flags.byteswap));
 	}
 
+	zc.zc_nvlist_dst = (uint64_t)(uintptr_t)prop_errbuf;
+	zc.zc_nvlist_dst_size = sizeof (prop_errbuf);
+
 	err = ioctl_err = zfs_ioctl(hdl, ZFS_IOC_RECV, &zc);
 	ioctl_errno = errno;
+	prop_errflags = (zprop_errflags_t)zc.zc_obj;
+
+	if (err == 0) {
+		nvlist_t *prop_errors;
+		VERIFY(0 == nvlist_unpack((void *)(uintptr_t)zc.zc_nvlist_dst,
+		    zc.zc_nvlist_dst_size, &prop_errors, 0));
+
+		nvpair_t *prop_err = NULL;
+
+		while ((prop_err = nvlist_next_nvpair(prop_errors,
+		    prop_err)) != NULL) {
+			char tbuf[1024];
+			zfs_prop_t prop;
+			int intval;
+
+			prop = zfs_name_to_prop(nvpair_name(prop_err));
+			(void) nvpair_value_int32(prop_err, &intval);
+			if (strcmp(nvpair_name(prop_err),
+			    ZPROP_N_MORE_ERRORS) == 0) {
+				trunc_prop_errs(intval);
+				break;
+			} else {
+				(void) snprintf(tbuf, sizeof (tbuf),
+				    dgettext(TEXT_DOMAIN,
+				    "cannot receive %s property on %s"),
+				    nvpair_name(prop_err), zc.zc_name);
+				zfs_setprop_error(hdl, prop, intval, tbuf);
+			}
+		}
+		nvlist_free(prop_errors);
+	}
+
+	zc.zc_nvlist_dst = 0;
+	zc.zc_nvlist_dst_size = 0;
 	zcmd_free_nvlists(&zc);
 
 	if (err == 0 && snapprops_nvlist) {
 		zfs_cmd_t zc2 = { 0 };
 
 		(void) strcpy(zc2.zc_name, zc.zc_value);
+		zc2.zc_cookie = B_TRUE; /* received */
 		if (zcmd_write_src_nvlist(hdl, &zc2, snapprops_nvlist) == 0) {
 			(void) zfs_ioctl(hdl, ZFS_IOC_SET_PROP, &zc2);
 			zcmd_free_nvlists(&zc2);
@@ -2367,7 +2449,7 @@
 		 * get a strange "does not exist" error message.
 		 */
 		*cp = '\0';
-		if (gather_nvlist(hdl, zc.zc_value, NULL, NULL,
+		if (gather_nvlist(hdl, zc.zc_value, NULL, NULL, B_FALSE,
 		    &local_nv, &local_avl) == 0) {
 			*cp = '@';
 			fs = fsavl_find(local_avl, drrb->drr_toguid, NULL);
@@ -2379,14 +2461,13 @@
 					(void) printf("snap %s already exists; "
 					    "ignoring\n", zc.zc_value);
 				}
-				ioctl_err = recv_skip(hdl, infd,
+				err = ioctl_err = recv_skip(hdl, infd,
 				    flags.byteswap);
 			}
 		}
 		*cp = '@';
 	}
 
-
 	if (ioctl_err != 0) {
 		switch (ioctl_errno) {
 		case ENODEV:
@@ -2463,6 +2544,19 @@
 		changelist_free(clp);
 	}
 
+	if (prop_errflags & ZPROP_ERR_NOCLEAR) {
+		(void) fprintf(stderr, dgettext(TEXT_DOMAIN, "Warning: "
+		    "failed to clear unreceived properties on %s"),
+		    zc.zc_name);
+		(void) fprintf(stderr, "\n");
+	}
+	if (prop_errflags & ZPROP_ERR_NORESTORE) {
+		(void) fprintf(stderr, dgettext(TEXT_DOMAIN, "Warning: "
+		    "failed to restore original properties on %s"),
+		    zc.zc_name);
+		(void) fprintf(stderr, "\n");
+	}
+
 	if (err || ioctl_err)
 		return (-1);
 
--- a/usr/src/lib/libzfs/common/libzfs_util.c	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/lib/libzfs/common/libzfs_util.c	Mon Nov 09 20:45:32 2009 -0800
@@ -380,7 +380,7 @@
 		zfs_verror(hdl, EZFS_POOLUNAVAIL, fmt, ap);
 		break;
 	default:
-		zfs_error_aux(hdl, strerror(errno));
+		zfs_error_aux(hdl, strerror(error));
 		zfs_verror(hdl, EZFS_UNKNOWN, fmt, ap);
 		break;
 	}
@@ -815,6 +815,8 @@
 	    "PROPERTY"));
 	cbp->cb_colwidths[GET_COL_VALUE] = strlen(dgettext(TEXT_DOMAIN,
 	    "VALUE"));
+	cbp->cb_colwidths[GET_COL_RECVD] = strlen(dgettext(TEXT_DOMAIN,
+	    "RECEIVED"));
 	cbp->cb_colwidths[GET_COL_SOURCE] = strlen(dgettext(TEXT_DOMAIN,
 	    "SOURCE"));
 
@@ -828,7 +830,7 @@
 	 * inheriting from the longest name.  This is acceptable because in the
 	 * majority of cases 'SOURCE' is the last column displayed, and we don't
 	 * use the width anyway.  Note that the 'VALUE' column can be oversized,
-	 * if the name of the property is much longer the any values we find.
+	 * if the name of the property is much longer than any values we find.
 	 */
 	for (pl = cbp->cb_proplist; pl != NULL; pl = pl->pl_next) {
 		/*
@@ -859,6 +861,11 @@
 		    pl->pl_width > cbp->cb_colwidths[GET_COL_VALUE])
 			cbp->cb_colwidths[GET_COL_VALUE] = pl->pl_width;
 
+		/* 'RECEIVED' column. */
+		if (pl != cbp->cb_proplist &&
+		    pl->pl_recvd_width > cbp->cb_colwidths[GET_COL_RECVD])
+			cbp->cb_colwidths[GET_COL_RECVD] = pl->pl_recvd_width;
+
 		/*
 		 * 'NAME' and 'SOURCE' columns
 		 */
@@ -874,7 +881,7 @@
 	/*
 	 * Now go through and print the headers.
 	 */
-	for (i = 0; i < 4; i++) {
+	for (i = 0; i < ZFS_GET_NCOLS; i++) {
 		switch (cbp->cb_columns[i]) {
 		case GET_COL_NAME:
 			title = dgettext(TEXT_DOMAIN, "NAME");
@@ -885,6 +892,9 @@
 		case GET_COL_VALUE:
 			title = dgettext(TEXT_DOMAIN, "VALUE");
 			break;
+		case GET_COL_RECVD:
+			title = dgettext(TEXT_DOMAIN, "RECEIVED");
+			break;
 		case GET_COL_SOURCE:
 			title = dgettext(TEXT_DOMAIN, "SOURCE");
 			break;
@@ -893,7 +903,8 @@
 		}
 
 		if (title != NULL) {
-			if (i == 3 || cbp->cb_columns[i + 1] == 0)
+			if (i == (ZFS_GET_NCOLS - 1) ||
+			    cbp->cb_columns[i + 1] == GET_COL_NONE)
 				(void) printf("%s", title);
 			else
 				(void) printf("%-*s  ",
@@ -911,7 +922,7 @@
 void
 zprop_print_one_property(const char *name, zprop_get_cbdata_t *cbp,
     const char *propname, const char *value, zprop_source_t sourcetype,
-    const char *source)
+    const char *source, const char *recvd_value)
 {
 	int i;
 	const char *str;
@@ -926,7 +937,7 @@
 	if (cbp->cb_first)
 		zprop_print_headers(cbp, cbp->cb_type);
 
-	for (i = 0; i < 4; i++) {
+	for (i = 0; i < ZFS_GET_NCOLS; i++) {
 		switch (cbp->cb_columns[i]) {
 		case GET_COL_NAME:
 			str = name;
@@ -963,14 +974,21 @@
 				    "inherited from %s", source);
 				str = buf;
 				break;
+			case ZPROP_SRC_RECEIVED:
+				str = "received";
+				break;
 			}
 			break;
 
+		case GET_COL_RECVD:
+			str = (recvd_value == NULL ? "-" : recvd_value);
+			break;
+
 		default:
 			continue;
 		}
 
-		if (cbp->cb_columns[i + 1] == 0)
+		if (cbp->cb_columns[i + 1] == GET_COL_NONE)
 			(void) printf("%s", str);
 		else if (cbp->cb_scripted)
 			(void) printf("%s\t", str);
@@ -978,7 +996,6 @@
 			(void) printf("%-*s  ",
 			    cbp->cb_colwidths[cbp->cb_columns[i]],
 			    str);
-
 	}
 
 	(void) printf("\n");
--- a/usr/src/lib/libzfs/common/mapfile-vers	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/lib/libzfs/common/mapfile-vers	Mon Nov 09 20:45:32 2009 -0800
@@ -102,6 +102,7 @@
 	zfs_prop_get;
 	zfs_prop_get_int;
 	zfs_prop_get_numeric;
+	zfs_prop_get_recvd;
 	zfs_prop_get_table;
 	zfs_prop_get_userquota_int;
 	zfs_prop_get_userquota;
--- a/usr/src/lib/libzpool/common/sys/zfs_context.h	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/lib/libzpool/common/sys/zfs_context.h	Mon Nov 09 20:45:32 2009 -0800
@@ -75,6 +75,7 @@
 #include <sys/u8_textprep.h>
 #include <sys/sysevent/eventdefs.h>
 #include <sys/sysevent/dev.h>
+#include <sys/sunddi.h>
 
 /*
  * Debugging
--- a/usr/src/uts/common/fs/zfs/dmu_objset.c	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/uts/common/fs/zfs/dmu_objset.c	Mon Nov 09 20:45:32 2009 -0800
@@ -769,8 +769,12 @@
 
 	dsl_dataset_snapshot_sync(ds, sn->snapname, cr, tx);
 
-	if (sn->props)
-		dsl_props_set_sync(ds->ds_prev, sn->props, cr, tx);
+	if (sn->props) {
+		dsl_props_arg_t pa;
+		pa.pa_props = sn->props;
+		pa.pa_source = ZPROP_SRC_LOCAL;
+		dsl_props_set_sync(ds->ds_prev, &pa, cr, tx);
+	}
 }
 
 static int
--- a/usr/src/uts/common/fs/zfs/dmu_send.c	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/uts/common/fs/zfs/dmu_send.c	Mon Nov 09 20:45:32 2009 -0800
@@ -33,6 +33,7 @@
 #include <sys/dmu_traverse.h>
 #include <sys/dsl_dataset.h>
 #include <sys/dsl_dir.h>
+#include <sys/dsl_prop.h>
 #include <sys/dsl_pool.h>
 #include <sys/dsl_synctask.h>
 #include <sys/zfs_ioctl.h>
@@ -514,8 +515,34 @@
 		/* if incremental, most recent snapshot must match fromguid */
 		if (ds->ds_prev == NULL)
 			return (ENODEV);
-		if (ds->ds_prev->ds_phys->ds_guid != rbsa->fromguid)
-			return (ENODEV);
+
+		/*
+		 * most recent snapshot must match fromguid, or there are no
+		 * changes since the fromguid one
+		 */
+		if (ds->ds_prev->ds_phys->ds_guid != rbsa->fromguid) {
+			uint64_t birth = ds->ds_prev->ds_phys->ds_bp.blk_birth;
+			uint64_t obj = ds->ds_prev->ds_phys->ds_prev_snap_obj;
+			while (obj != 0) {
+				dsl_dataset_t *snap;
+				err = dsl_dataset_hold_obj(ds->ds_dir->dd_pool,
+				    obj, FTAG, &snap);
+				if (err)
+					return (ENODEV);
+				if (snap->ds_phys->ds_creation_txg < birth) {
+					dsl_dataset_rele(snap, FTAG);
+					return (ENODEV);
+				}
+				if (snap->ds_phys->ds_guid == rbsa->fromguid) {
+					dsl_dataset_rele(snap, FTAG);
+					break; /* it's ok */
+				}
+				obj = snap->ds_phys->ds_prev_snap_obj;
+				dsl_dataset_rele(snap, FTAG);
+			}
+			if (obj == 0)
+				return (ENODEV);
+		}
 	} else {
 		/* if full, most recent snapshot must be $ORIGIN */
 		if (ds->ds_phys->ds_prev_snap_txg >= TXG_INITIAL)
@@ -566,10 +593,6 @@
 		    cds, dsl_dataset_get_blkptr(cds), rbsa->type, tx);
 	}
 
-	/* copy the refquota from the target fs to the clone */
-	if (ohds->ds_quota > 0)
-		dsl_dataset_set_quota_sync(cds, &ohds->ds_quota, cr, tx);
-
 	rbsa->ds = cds;
 
 	spa_history_internal_log(LOG_DS_REPLAY_INC_SYNC,
--- a/usr/src/uts/common/fs/zfs/dsl_dataset.c	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/uts/common/fs/zfs/dsl_dataset.c	Mon Nov 09 20:45:32 2009 -0800
@@ -38,7 +38,6 @@
 #include <sys/zfs_ioctl.h>
 #include <sys/spa.h>
 #include <sys/zfs_znode.h>
-#include <sys/sunddi.h>
 #include <sys/zvol.h>
 
 static char *dsl_reaper = "the grim reaper";
@@ -1005,7 +1004,8 @@
 	objset_t *os;
 	dsl_dir_t *dd;
 	uint64_t obj;
-	struct dsl_ds_destroyarg dsda = {0};
+	struct dsl_ds_destroyarg dsda = { 0 };
+	dsl_dataset_t dummy_ds = { 0 };
 
 	dsda.ds = ds;
 
@@ -1029,6 +1029,8 @@
 	}
 
 	dd = ds->ds_dir;
+	dummy_ds.ds_dir = dd;
+	dummy_ds.ds_object = ds->ds_object;
 
 	/*
 	 * Check for errors and mark this ds as inconsistent, in
@@ -1123,7 +1125,7 @@
 		dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
 		    dsl_dataset_destroy_sync, &dsda, tag, 0);
 		dsl_sync_task_create(dstg, dsl_dir_destroy_check,
-		    dsl_dir_destroy_sync, dd, FTAG, 0);
+		    dsl_dir_destroy_sync, &dummy_ds, FTAG, 0);
 		err = dsl_sync_task_group_wait(dstg);
 		dsl_sync_task_group_destroy(dstg);
 
@@ -1524,8 +1526,15 @@
 
 	/* Remove our reservation */
 	if (ds->ds_reserved != 0) {
-		uint64_t val = 0;
-		dsl_dataset_set_reservation_sync(ds, &val, cr, tx);
+		dsl_prop_setarg_t psa;
+		uint64_t value = 0;
+
+		dsl_prop_setarg_init_uint64(&psa, "refreservation",
+		    (ZPROP_SRC_NONE | ZPROP_SRC_LOCAL | ZPROP_SRC_RECEIVED),
+		    &value);
+		psa.psa_effective_value = 0;	/* predict default value */
+
+		dsl_dataset_set_reservation_sync(ds, &psa, cr, tx);
 		ASSERT3U(ds->ds_reserved, ==, 0);
 	}
 
@@ -2019,7 +2028,8 @@
 	    dsl_dataset_unique(ds));
 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_OBJSETID,
 	    ds->ds_object);
-	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USERREFS, ds->ds_userrefs);
+	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USERREFS,
+	    ds->ds_userrefs);
 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_DEFER_DESTROY,
 	    DS_IS_DEFER_DESTROY(ds) ? 1 : 0);
 
@@ -3081,62 +3091,70 @@
 dsl_dataset_set_quota_check(void *arg1, void *arg2, dmu_tx_t *tx)
 {
 	dsl_dataset_t *ds = arg1;
-	uint64_t *quotap = arg2;
-	uint64_t new_quota = *quotap;
+	dsl_prop_setarg_t *psa = arg2;
+	int err;
 
 	if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_REFQUOTA)
 		return (ENOTSUP);
 
-	if (new_quota == 0)
+	if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
+		return (err);
+
+	if (psa->psa_effective_value == 0)
 		return (0);
 
-	if (new_quota < ds->ds_phys->ds_used_bytes ||
-	    new_quota < ds->ds_reserved)
+	if (psa->psa_effective_value < ds->ds_phys->ds_used_bytes ||
+	    psa->psa_effective_value < ds->ds_reserved)
 		return (ENOSPC);
 
 	return (0);
 }
 
-/* ARGSUSED */
+extern void dsl_prop_set_sync(void *, void *, cred_t *, dmu_tx_t *);
+
 void
 dsl_dataset_set_quota_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
 {
 	dsl_dataset_t *ds = arg1;
-	uint64_t *quotap = arg2;
-	uint64_t new_quota = *quotap;
-
-	dmu_buf_will_dirty(ds->ds_dbuf, tx);
-
-	ds->ds_quota = new_quota;
-
-	dsl_dir_prop_set_uint64_sync(ds->ds_dir, "refquota", new_quota, cr, tx);
-
-	spa_history_internal_log(LOG_DS_REFQUOTA, ds->ds_dir->dd_pool->dp_spa,
-	    tx, cr, "%lld dataset = %llu ",
-	    (longlong_t)new_quota, ds->ds_object);
+	dsl_prop_setarg_t *psa = arg2;
+	uint64_t effective_value = psa->psa_effective_value;
+
+	dsl_prop_set_sync(ds, psa, cr, tx);
+	DSL_PROP_CHECK_PREDICTION(ds->ds_dir, psa);
+
+	if (ds->ds_quota != effective_value) {
+		dmu_buf_will_dirty(ds->ds_dbuf, tx);
+		ds->ds_quota = effective_value;
+
+		spa_history_internal_log(LOG_DS_REFQUOTA,
+		    ds->ds_dir->dd_pool->dp_spa, tx, cr, "%lld dataset = %llu ",
+		    (longlong_t)ds->ds_quota, ds->ds_object);
+	}
 }
 
 int
-dsl_dataset_set_quota(const char *dsname, uint64_t quota)
+dsl_dataset_set_quota(const char *dsname, zprop_source_t source, uint64_t quota)
 {
 	dsl_dataset_t *ds;
+	dsl_prop_setarg_t psa;
 	int err;
 
+	dsl_prop_setarg_init_uint64(&psa, "refquota", source, &quota);
+
 	err = dsl_dataset_hold(dsname, FTAG, &ds);
 	if (err)
 		return (err);
 
-	if (quota != ds->ds_quota) {
-		/*
-		 * If someone removes a file, then tries to set the quota, we
-		 * want to make sure the file freeing takes effect.
-		 */
-		txg_wait_open(ds->ds_dir->dd_pool, 0);
-
-		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
-		    dsl_dataset_set_quota_check, dsl_dataset_set_quota_sync,
-		    ds, &quota, 0);
-	}
+	/*
+	 * If someone removes a file, then tries to set the quota, we
+	 * want to make sure the file freeing takes effect.
+	 */
+	txg_wait_open(ds->ds_dir->dd_pool, 0);
+
+	err = dsl_sync_task_do(ds->ds_dir->dd_pool,
+	    dsl_dataset_set_quota_check, dsl_dataset_set_quota_sync,
+	    ds, &psa, 0);
+
 	dsl_dataset_rele(ds, FTAG);
 	return (err);
 }
@@ -3145,9 +3163,10 @@
 dsl_dataset_set_reservation_check(void *arg1, void *arg2, dmu_tx_t *tx)
 {
 	dsl_dataset_t *ds = arg1;
-	uint64_t *reservationp = arg2;
-	uint64_t new_reservation = *reservationp;
+	dsl_prop_setarg_t *psa = arg2;
+	uint64_t effective_value;
 	uint64_t unique;
+	int err;
 
 	if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
 	    SPA_VERSION_REFRESERVATION)
@@ -3156,6 +3175,11 @@
 	if (dsl_dataset_is_snapshot(ds))
 		return (EINVAL);
 
+	if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
+		return (err);
+
+	effective_value = psa->psa_effective_value;
+
 	/*
 	 * If we are doing the preliminary check in open context, the
 	 * space estimates may be inaccurate.
@@ -3167,14 +3191,14 @@
 	unique = dsl_dataset_unique(ds);
 	mutex_exit(&ds->ds_lock);
 
-	if (MAX(unique, new_reservation) > MAX(unique, ds->ds_reserved)) {
-		uint64_t delta = MAX(unique, new_reservation) -
+	if (MAX(unique, effective_value) > MAX(unique, ds->ds_reserved)) {
+		uint64_t delta = MAX(unique, effective_value) -
 		    MAX(unique, ds->ds_reserved);
 
 		if (delta > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
 			return (ENOSPC);
 		if (ds->ds_quota > 0 &&
-		    new_reservation > ds->ds_quota)
+		    effective_value > ds->ds_quota)
 			return (ENOSPC);
 	}
 
@@ -3187,44 +3211,51 @@
     dmu_tx_t *tx)
 {
 	dsl_dataset_t *ds = arg1;
-	uint64_t *reservationp = arg2;
-	uint64_t new_reservation = *reservationp;
+	dsl_prop_setarg_t *psa = arg2;
+	uint64_t effective_value = psa->psa_effective_value;
 	uint64_t unique;
 	int64_t delta;
 
+	dsl_prop_set_sync(ds, psa, cr, tx);
+	DSL_PROP_CHECK_PREDICTION(ds->ds_dir, psa);
+
 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
 
 	mutex_enter(&ds->ds_dir->dd_lock);
 	mutex_enter(&ds->ds_lock);
 	unique = dsl_dataset_unique(ds);
-	delta = MAX(0, (int64_t)(new_reservation - unique)) -
+	delta = MAX(0, (int64_t)(effective_value - unique)) -
 	    MAX(0, (int64_t)(ds->ds_reserved - unique));
-	ds->ds_reserved = new_reservation;
+	ds->ds_reserved = effective_value;
 	mutex_exit(&ds->ds_lock);
 
 	dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV, delta, 0, 0, tx);
 	mutex_exit(&ds->ds_dir->dd_lock);
-	dsl_dir_prop_set_uint64_sync(ds->ds_dir, "refreservation",
-	    new_reservation, cr, tx);
 
 	spa_history_internal_log(LOG_DS_REFRESERV,
 	    ds->ds_dir->dd_pool->dp_spa, tx, cr, "%lld dataset = %llu",
-	    (longlong_t)new_reservation, ds->ds_object);
+	    (longlong_t)effective_value, ds->ds_object);
 }
 
 int
-dsl_dataset_set_reservation(const char *dsname, uint64_t reservation)
+dsl_dataset_set_reservation(const char *dsname, zprop_source_t source,
+    uint64_t reservation)
 {
 	dsl_dataset_t *ds;
+	dsl_prop_setarg_t psa;
 	int err;
 
+	dsl_prop_setarg_init_uint64(&psa, "refreservation", source,
+	    &reservation);
+
 	err = dsl_dataset_hold(dsname, FTAG, &ds);
 	if (err)
 		return (err);
 
 	err = dsl_sync_task_do(ds->ds_dir->dd_pool,
 	    dsl_dataset_set_reservation_check,
-	    dsl_dataset_set_reservation_sync, ds, &reservation, 0);
+	    dsl_dataset_set_reservation_sync, ds, &psa, 0);
+
 	dsl_dataset_rele(ds, FTAG);
 	return (err);
 }
--- a/usr/src/uts/common/fs/zfs/dsl_deleg.c	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/uts/common/fs/zfs/dsl_deleg.c	Mon Nov 09 20:45:32 2009 -0800
@@ -587,7 +587,7 @@
 
 			if (dsl_prop_get_dd(dd,
 			    zfs_prop_to_name(ZFS_PROP_ZONED),
-			    8, 1, &zoned, NULL) != 0)
+			    8, 1, &zoned, NULL, B_FALSE) != 0)
 				break;
 			if (!zoned)
 				break;
--- a/usr/src/uts/common/fs/zfs/dsl_dir.c	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/uts/common/fs/zfs/dsl_dir.c	Mon Nov 09 20:45:32 2009 -0800
@@ -430,7 +430,8 @@
 int
 dsl_dir_destroy_check(void *arg1, void *arg2, dmu_tx_t *tx)
 {
-	dsl_dir_t *dd = arg1;
+	dsl_dataset_t *ds = arg1;
+	dsl_dir_t *dd = ds->ds_dir;
 	dsl_pool_t *dp = dd->dd_pool;
 	objset_t *mos = dp->dp_meta_objset;
 	int err;
@@ -459,17 +460,25 @@
 void
 dsl_dir_destroy_sync(void *arg1, void *tag, cred_t *cr, dmu_tx_t *tx)
 {
-	dsl_dir_t *dd = arg1;
+	dsl_dataset_t *ds = arg1;
+	dsl_dir_t *dd = ds->ds_dir;
 	objset_t *mos = dd->dd_pool->dp_meta_objset;
-	uint64_t val, obj;
+	dsl_prop_setarg_t psa;
+	uint64_t value = 0;
+	uint64_t obj;
 	dd_used_t t;
 
 	ASSERT(RW_WRITE_HELD(&dd->dd_pool->dp_config_rwlock));
 	ASSERT(dd->dd_phys->dd_head_dataset_obj == 0);
 
 	/* Remove our reservation. */
-	val = 0;
-	dsl_dir_set_reservation_sync(dd, &val, cr, tx);
+	dsl_prop_setarg_init_uint64(&psa, "reservation",
+	    (ZPROP_SRC_NONE | ZPROP_SRC_LOCAL | ZPROP_SRC_RECEIVED),
+	    &value);
+	psa.psa_effective_value = 0;	/* predict default value */
+
+	dsl_dir_set_reservation_sync(ds, &psa, cr, tx);
+
 	ASSERT3U(dd->dd_phys->dd_used_bytes, ==, 0);
 	ASSERT3U(dd->dd_phys->dd_reserved, ==, 0);
 	for (t = 0; t < DD_USED_NUM; t++)
@@ -995,13 +1004,16 @@
 static int
 dsl_dir_set_quota_check(void *arg1, void *arg2, dmu_tx_t *tx)
 {
-	dsl_dir_t *dd = arg1;
-	uint64_t *quotap = arg2;
-	uint64_t new_quota = *quotap;
-	int err = 0;
+	dsl_dataset_t *ds = arg1;
+	dsl_dir_t *dd = ds->ds_dir;
+	dsl_prop_setarg_t *psa = arg2;
+	int err;
 	uint64_t towrite;
 
-	if (new_quota == 0)
+	if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
+		return (err);
+
+	if (psa->psa_effective_value == 0)
 		return (0);
 
 	mutex_enter(&dd->dd_lock);
@@ -1013,64 +1025,89 @@
 	 */
 	towrite = dsl_dir_space_towrite(dd);
 	if ((dmu_tx_is_syncing(tx) || towrite == 0) &&
-	    (new_quota < dd->dd_phys->dd_reserved ||
-	    new_quota < dd->dd_phys->dd_used_bytes + towrite)) {
+	    (psa->psa_effective_value < dd->dd_phys->dd_reserved ||
+	    psa->psa_effective_value < dd->dd_phys->dd_used_bytes + towrite)) {
 		err = ENOSPC;
 	}
 	mutex_exit(&dd->dd_lock);
 	return (err);
 }
 
+extern void dsl_prop_set_sync(void *, void *, cred_t *, dmu_tx_t *);
+
 /* ARGSUSED */
 static void
 dsl_dir_set_quota_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
 {
-	dsl_dir_t *dd = arg1;
-	uint64_t *quotap = arg2;
-	uint64_t new_quota = *quotap;
+	dsl_dataset_t *ds = arg1;
+	dsl_dir_t *dd = ds->ds_dir;
+	dsl_prop_setarg_t *psa = arg2;
+	uint64_t effective_value = psa->psa_effective_value;
+
+	dsl_prop_set_sync(ds, psa, cr, tx);
+	DSL_PROP_CHECK_PREDICTION(dd, psa);
 
 	dmu_buf_will_dirty(dd->dd_dbuf, tx);
 
 	mutex_enter(&dd->dd_lock);
-	dd->dd_phys->dd_quota = new_quota;
+	dd->dd_phys->dd_quota = effective_value;
 	mutex_exit(&dd->dd_lock);
 
 	spa_history_internal_log(LOG_DS_QUOTA, dd->dd_pool->dp_spa,
 	    tx, cr, "%lld dataset = %llu ",
-	    (longlong_t)new_quota, dd->dd_phys->dd_head_dataset_obj);
+	    (longlong_t)effective_value, dd->dd_phys->dd_head_dataset_obj);
 }
 
 int
-dsl_dir_set_quota(const char *ddname, uint64_t quota)
+dsl_dir_set_quota(const char *ddname, zprop_source_t source, uint64_t quota)
 {
 	dsl_dir_t *dd;
+	dsl_dataset_t *ds;
+	dsl_prop_setarg_t psa;
 	int err;
 
-	err = dsl_dir_open(ddname, FTAG, &dd, NULL);
+	dsl_prop_setarg_init_uint64(&psa, "quota", source, &quota);
+
+	err = dsl_dataset_hold(ddname, FTAG, &ds);
 	if (err)
 		return (err);
 
-	if (quota != dd->dd_phys->dd_quota) {
-		/*
-		 * If someone removes a file, then tries to set the quota, we
-		 * want to make sure the file freeing takes effect.
-		 */
-		txg_wait_open(dd->dd_pool, 0);
+	err = dsl_dir_open(ddname, FTAG, &dd, NULL);
+	if (err) {
+		dsl_dataset_rele(ds, FTAG);
+		return (err);
+	}
+
+	ASSERT(ds->ds_dir == dd);
 
-		err = dsl_sync_task_do(dd->dd_pool, dsl_dir_set_quota_check,
-		    dsl_dir_set_quota_sync, dd, &quota, 0);
-	}
+	/*
+	 * If someone removes a file, then tries to set the quota, we want to
+	 * make sure the file freeing takes effect.
+	 */
+	txg_wait_open(dd->dd_pool, 0);
+
+	err = dsl_sync_task_do(dd->dd_pool, dsl_dir_set_quota_check,
+	    dsl_dir_set_quota_sync, ds, &psa, 0);
+
 	dsl_dir_close(dd, FTAG);
+	dsl_dataset_rele(ds, FTAG);
 	return (err);
 }
 
 int
 dsl_dir_set_reservation_check(void *arg1, void *arg2, dmu_tx_t *tx)
 {
-	dsl_dir_t *dd = arg1;
-	uint64_t *reservationp = arg2;
-	uint64_t new_reservation = *reservationp;
+	dsl_dataset_t *ds = arg1;
+	dsl_dir_t *dd = ds->ds_dir;
+	dsl_prop_setarg_t *psa = arg2;
+	uint64_t effective_value;
 	uint64_t used, avail;
+	int err;
+
+	if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
+		return (err);
+
+	effective_value = psa->psa_effective_value;
 
 	/*
 	 * If we are doing the preliminary check in open context, the
@@ -1090,14 +1127,14 @@
 		avail = dsl_pool_adjustedsize(dd->dd_pool, B_FALSE) - used;
 	}
 
-	if (MAX(used, new_reservation) > MAX(used, dd->dd_phys->dd_reserved)) {
-		uint64_t delta = MAX(used, new_reservation) -
+	if (MAX(used, effective_value) > MAX(used, dd->dd_phys->dd_reserved)) {
+		uint64_t delta = MAX(used, effective_value) -
 		    MAX(used, dd->dd_phys->dd_reserved);
 
 		if (delta > avail)
 			return (ENOSPC);
 		if (dd->dd_phys->dd_quota > 0 &&
-		    new_reservation > dd->dd_phys->dd_quota)
+		    effective_value > dd->dd_phys->dd_quota)
 			return (ENOSPC);
 	}
 
@@ -1108,19 +1145,23 @@
 static void
 dsl_dir_set_reservation_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
 {
-	dsl_dir_t *dd = arg1;
-	uint64_t *reservationp = arg2;
-	uint64_t new_reservation = *reservationp;
+	dsl_dataset_t *ds = arg1;
+	dsl_dir_t *dd = ds->ds_dir;
+	dsl_prop_setarg_t *psa = arg2;
+	uint64_t effective_value = psa->psa_effective_value;
 	uint64_t used;
 	int64_t delta;
 
+	dsl_prop_set_sync(ds, psa, cr, tx);
+	DSL_PROP_CHECK_PREDICTION(dd, psa);
+
 	dmu_buf_will_dirty(dd->dd_dbuf, tx);
 
 	mutex_enter(&dd->dd_lock);
 	used = dd->dd_phys->dd_used_bytes;
-	delta = MAX(used, new_reservation) -
+	delta = MAX(used, effective_value) -
 	    MAX(used, dd->dd_phys->dd_reserved);
-	dd->dd_phys->dd_reserved = new_reservation;
+	dd->dd_phys->dd_reserved = effective_value;
 
 	if (dd->dd_parent != NULL) {
 		/* Roll up this additional usage into our ancestors */
@@ -1131,21 +1172,37 @@
 
 	spa_history_internal_log(LOG_DS_RESERVATION, dd->dd_pool->dp_spa,
 	    tx, cr, "%lld dataset = %llu",
-	    (longlong_t)new_reservation, dd->dd_phys->dd_head_dataset_obj);
+	    (longlong_t)effective_value, dd->dd_phys->dd_head_dataset_obj);
 }
 
 int
-dsl_dir_set_reservation(const char *ddname, uint64_t reservation)
+dsl_dir_set_reservation(const char *ddname, zprop_source_t source,
+    uint64_t reservation)
 {
 	dsl_dir_t *dd;
+	dsl_dataset_t *ds;
+	dsl_prop_setarg_t psa;
 	int err;
 
+	dsl_prop_setarg_init_uint64(&psa, "reservation", source, &reservation);
+
+	err = dsl_dataset_hold(ddname, FTAG, &ds);
+	if (err)
+		return (err);
+
 	err = dsl_dir_open(ddname, FTAG, &dd, NULL);
-	if (err)
+	if (err) {
+		dsl_dataset_rele(ds, FTAG);
 		return (err);
+	}
+
+	ASSERT(ds->ds_dir == dd);
+
 	err = dsl_sync_task_do(dd->dd_pool, dsl_dir_set_reservation_check,
-	    dsl_dir_set_reservation_sync, dd, &reservation, 0);
+	    dsl_dir_set_reservation_sync, ds, &psa, 0);
+
 	dsl_dir_close(dd, FTAG);
+	dsl_dataset_rele(ds, FTAG);
 	return (err);
 }
 
--- a/usr/src/uts/common/fs/zfs/dsl_prop.c	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/uts/common/fs/zfs/dsl_prop.c	Mon Nov 09 20:45:32 2009 -0800
@@ -23,6 +23,7 @@
  * Use is subject to license terms.
  */
 
+#include <sys/zfs_context.h>
 #include <sys/dmu.h>
 #include <sys/dmu_objset.h>
 #include <sys/dmu_tx.h>
@@ -36,8 +37,11 @@
 
 #include "zfs_prop.h"
 
+#define	ZPROP_INHERIT_SUFFIX "$inherit"
+#define	ZPROP_RECVD_SUFFIX "$recvd"
+
 static int
-dodefault(const char *propname, int intsz, int numint, void *buf)
+dodefault(const char *propname, int intsz, int numints, void *buf)
 {
 	zfs_prop_t prop;
 
@@ -54,9 +58,9 @@
 		if (intsz != 1)
 			return (EOVERFLOW);
 		(void) strncpy(buf, zfs_prop_default_string(prop),
-		    numint);
+		    numints);
 	} else {
-		if (intsz != 8 || numint < 1)
+		if (intsz != 8 || numints < 1)
 			return (EOVERFLOW);
 
 		*(uint64_t *)buf = zfs_prop_default_numeric(prop);
@@ -67,11 +71,16 @@
 
 int
 dsl_prop_get_dd(dsl_dir_t *dd, const char *propname,
-    int intsz, int numint, void *buf, char *setpoint)
+    int intsz, int numints, void *buf, char *setpoint, boolean_t snapshot)
 {
 	int err = ENOENT;
+	dsl_dir_t *target = dd;
 	objset_t *mos = dd->dd_pool->dp_meta_objset;
 	zfs_prop_t prop;
+	boolean_t inheritable;
+	boolean_t inheriting = B_FALSE;
+	char *inheritstr;
+	char *recvdstr;
 
 	ASSERT(RW_LOCK_HELD(&dd->dd_pool->dp_config_rwlock));
 
@@ -79,51 +88,135 @@
 		setpoint[0] = '\0';
 
 	prop = zfs_name_to_prop(propname);
+	inheritable = (prop == ZPROP_INVAL || zfs_prop_inheritable(prop));
+	inheritstr = kmem_asprintf("%s%s", propname, ZPROP_INHERIT_SUFFIX);
+	recvdstr = kmem_asprintf("%s%s", propname, ZPROP_RECVD_SUFFIX);
 
 	/*
-	 * Note: dd may be NULL, therefore we shouldn't dereference it
-	 * ouside this loop.
+	 * Note: dd may become NULL, therefore we shouldn't dereference it
+	 * after this loop.
 	 */
 	for (; dd != NULL; dd = dd->dd_parent) {
 		ASSERT(RW_LOCK_HELD(&dd->dd_pool->dp_config_rwlock));
-		err = zap_lookup(mos, dd->dd_phys->dd_props_zapobj,
-		    propname, intsz, numint, buf);
+
+		if (dd != target || snapshot) {
+			if (!inheritable)
+				break;
+			inheriting = B_TRUE;
+		}
+
+		/* Check for a local value. */
+		err = zap_lookup(mos, dd->dd_phys->dd_props_zapobj, propname,
+		    intsz, numints, buf);
 		if (err != ENOENT) {
-			if (setpoint)
+			if (setpoint != NULL && err == 0)
 				dsl_dir_name(dd, setpoint);
 			break;
 		}
 
 		/*
-		 * Break out of this loop for non-inheritable properties.
+		 * Skip the check for a received value if there is an explicit
+		 * inheritance entry.
 		 */
-		if (prop != ZPROP_INVAL && !zfs_prop_inheritable(prop))
+		err = zap_contains(mos, dd->dd_phys->dd_props_zapobj,
+		    inheritstr);
+		if (err != 0 && err != ENOENT)
 			break;
+
+		if (err == ENOENT) {
+			/* Check for a received value. */
+			err = zap_lookup(mos, dd->dd_phys->dd_props_zapobj,
+			    recvdstr, intsz, numints, buf);
+			if (err != ENOENT) {
+				if (setpoint != NULL && err == 0) {
+					if (inheriting) {
+						dsl_dir_name(dd, setpoint);
+					} else {
+						(void) strcpy(setpoint,
+						    ZPROP_SOURCE_VAL_RECVD);
+					}
+				}
+				break;
+			}
+		}
+
+		/*
+		 * If we found an explicit inheritance entry, err is zero even
+		 * though we haven't yet found the value, so reinitializing err
+		 * at the end of the loop (instead of at the beginning) ensures
+		 * that err has a valid post-loop value.
+		 */
+		err = ENOENT;
 	}
+
 	if (err == ENOENT)
-		err = dodefault(propname, intsz, numint, buf);
+		err = dodefault(propname, intsz, numints, buf);
+
+	strfree(inheritstr);
+	strfree(recvdstr);
 
 	return (err);
 }
 
 int
 dsl_prop_get_ds(dsl_dataset_t *ds, const char *propname,
-    int intsz, int numint, void *buf, char *setpoint)
+    int intsz, int numints, void *buf, char *setpoint)
 {
-	ASSERT(RW_LOCK_HELD(&ds->ds_dir->dd_pool->dp_config_rwlock));
+	zfs_prop_t prop = zfs_name_to_prop(propname);
+	boolean_t inheritable;
+	boolean_t snapshot;
+	uint64_t zapobj;
 
-	if (ds->ds_phys->ds_props_obj) {
-		int err = zap_lookup(ds->ds_dir->dd_pool->dp_meta_objset,
-		    ds->ds_phys->ds_props_obj, propname, intsz, numint, buf);
+	ASSERT(RW_LOCK_HELD(&ds->ds_dir->dd_pool->dp_config_rwlock));
+	inheritable = (prop == ZPROP_INVAL || zfs_prop_inheritable(prop));
+	snapshot = (ds->ds_phys != NULL && dsl_dataset_is_snapshot(ds));
+	zapobj = (ds->ds_phys == NULL ? 0 : ds->ds_phys->ds_props_obj);
+
+	if (zapobj != 0) {
+		objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
+		int err;
+
+		ASSERT(snapshot);
+
+		/* Check for a local value. */
+		err = zap_lookup(mos, zapobj, propname, intsz, numints, buf);
 		if (err != ENOENT) {
-			if (setpoint)
+			if (setpoint != NULL && err == 0)
 				dsl_dataset_name(ds, setpoint);
 			return (err);
 		}
+
+		/*
+		 * Skip the check for a received value if there is an explicit
+		 * inheritance entry.
+		 */
+		if (inheritable) {
+			char *inheritstr = kmem_asprintf("%s%s", propname,
+			    ZPROP_INHERIT_SUFFIX);
+			err = zap_contains(mos, zapobj, inheritstr);
+			strfree(inheritstr);
+			if (err != 0 && err != ENOENT)
+				return (err);
+		}
+
+		if (err == ENOENT) {
+			/* Check for a received value. */
+			char *recvdstr = kmem_asprintf("%s%s", propname,
+			    ZPROP_RECVD_SUFFIX);
+			err = zap_lookup(mos, zapobj, recvdstr,
+			    intsz, numints, buf);
+			strfree(recvdstr);
+			if (err != ENOENT) {
+				if (setpoint != NULL && err == 0)
+					(void) strcpy(setpoint,
+					    ZPROP_SOURCE_VAL_RECVD);
+				return (err);
+			}
+		}
 	}
 
 	return (dsl_prop_get_dd(ds->ds_dir, propname,
-	    intsz, numint, buf, setpoint));
+	    intsz, numints, buf, setpoint, snapshot));
 }
 
 /*
@@ -209,6 +302,137 @@
 	return (dsl_prop_get(ddname, propname, 8, 1, valuep, setpoint));
 }
 
+void
+dsl_prop_setarg_init_uint64(dsl_prop_setarg_t *psa, const char *propname,
+    zprop_source_t source, uint64_t *value)
+{
+	psa->psa_name = propname;
+	psa->psa_source = source;
+	psa->psa_intsz = 8;
+	psa->psa_numints = 1;
+	psa->psa_value = value;
+
+	psa->psa_effective_value = -1ULL;
+}
+
+/*
+ * Predict the effective value of the given special property if it were set with
+ * the given value and source. This is not a general purpose function. It exists
+ * only to handle the special requirements of the quota and reservation
+ * properties. The fact that these properties are non-inheritable greatly
+ * simplifies the prediction logic.
+ *
+ * Returns 0 on success, a positive error code on failure, or -1 if called with
+ * a property not handled by this function.
+ */
+int
+dsl_prop_predict_sync(dsl_dir_t *dd, dsl_prop_setarg_t *psa)
+{
+	const char *propname = psa->psa_name;
+	zfs_prop_t prop = zfs_name_to_prop(propname);
+	zprop_source_t source = psa->psa_source;
+	objset_t *mos;
+	uint64_t zapobj;
+	uint64_t version;
+	char *recvdstr;
+	int err = 0;
+
+	switch (prop) {
+	case ZFS_PROP_QUOTA:
+	case ZFS_PROP_RESERVATION:
+	case ZFS_PROP_REFQUOTA:
+	case ZFS_PROP_REFRESERVATION:
+		break;
+	default:
+		return (-1);
+	}
+
+	mos = dd->dd_pool->dp_meta_objset;
+	zapobj = dd->dd_phys->dd_props_zapobj;
+	recvdstr = kmem_asprintf("%s%s", propname, ZPROP_RECVD_SUFFIX);
+
+	version = spa_version(dd->dd_pool->dp_spa);
+	if (version < SPA_VERSION_RECVD_PROPS) {
+		if (source & ZPROP_SRC_NONE)
+			source = ZPROP_SRC_NONE;
+		else if (source & ZPROP_SRC_RECEIVED)
+			source = ZPROP_SRC_LOCAL;
+	}
+
+	switch (source) {
+	case ZPROP_SRC_NONE:
+		/* Revert to the received value, if any. */
+		err = zap_lookup(mos, zapobj, recvdstr, 8, 1,
+		    &psa->psa_effective_value);
+		if (err == ENOENT)
+			psa->psa_effective_value = 0;
+		break;
+	case ZPROP_SRC_LOCAL:
+		psa->psa_effective_value = *(uint64_t *)psa->psa_value;
+		break;
+	case ZPROP_SRC_RECEIVED:
+		/*
+		 * If there's no local setting, then the new received value will
+		 * be the effective value.
+		 */
+		err = zap_lookup(mos, zapobj, propname, 8, 1,
+		    &psa->psa_effective_value);
+		if (err == ENOENT)
+			psa->psa_effective_value = *(uint64_t *)psa->psa_value;
+		break;
+	case (ZPROP_SRC_NONE | ZPROP_SRC_RECEIVED):
+		/*
+		 * We're clearing the received value, so the local setting (if
+		 * it exists) remains the effective value.
+		 */
+		err = zap_lookup(mos, zapobj, propname, 8, 1,
+		    &psa->psa_effective_value);
+		if (err == ENOENT)
+			psa->psa_effective_value = 0;
+		break;
+	default:
+		cmn_err(CE_PANIC, "unexpected property source: %d", source);
+	}
+
+	strfree(recvdstr);
+
+	if (err == ENOENT)
+		return (0);
+
+	return (err);
+}
+
+#ifdef	ZFS_DEBUG
+void
+dsl_prop_check_prediction(dsl_dir_t *dd, dsl_prop_setarg_t *psa)
+{
+	zfs_prop_t prop = zfs_name_to_prop(psa->psa_name);
+	uint64_t intval;
+	char setpoint[MAXNAMELEN];
+	uint64_t version = spa_version(dd->dd_pool->dp_spa);
+	int err;
+
+	if (version < SPA_VERSION_RECVD_PROPS) {
+		switch (prop) {
+		case ZFS_PROP_QUOTA:
+		case ZFS_PROP_RESERVATION:
+			return;
+		}
+	}
+
+	err = dsl_prop_get_dd(dd, psa->psa_name, 8, 1, &intval,
+	    setpoint, B_FALSE);
+	if (err == 0 && intval != psa->psa_effective_value) {
+		cmn_err(CE_PANIC, "%s property, source: %x, "
+		    "predicted effective value: %llu, "
+		    "actual effective value: %llu (setpoint: %s)",
+		    psa->psa_name, psa->psa_source,
+		    (unsigned long long)psa->psa_effective_value,
+		    (unsigned long long)intval, setpoint);
+	}
+}
+#endif
+
 /*
  * Unregister this callback.  Return 0 on success, ENOENT if ddname is
  * invalid, ENOMSG if no matching callback registered.
@@ -276,7 +500,6 @@
 	zap_cursor_t zc;
 	zap_attribute_t *za;
 	int err;
-	uint64_t dummyval;
 
 	ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));
 	err = dsl_dir_open_obj(dp, ddobj, NULL, FTAG, &dd);
@@ -288,8 +511,7 @@
 		 * If the prop is set here, then this change is not
 		 * being inherited here or below; stop the recursion.
 		 */
-		err = zap_lookup(mos, dd->dd_phys->dd_props_zapobj, propname,
-		    8, 1, &dummyval);
+		err = zap_contains(mos, dd->dd_phys->dd_props_zapobj, propname);
 		if (err == 0) {
 			dsl_dir_close(dd, FTAG);
 			return;
@@ -309,8 +531,7 @@
 		 * If the property is set on this ds, then it is not
 		 * inherited here; don't call the callback.
 		 */
-		if (propobj && 0 == zap_lookup(mos, propobj, propname,
-		    8, 1, &dummyval))
+		if (propobj && 0 == zap_contains(mos, propobj, propname))
 			continue;
 
 		cbr->cbr_func(cbr->cbr_arg, value);
@@ -330,30 +551,28 @@
 	dsl_dir_close(dd, FTAG);
 }
 
-struct prop_set_arg {
-	const char *name;
-	int intsz;
-	int numints;
-	const void *buf;
-};
-
-
-static void
+void
 dsl_prop_set_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
 {
 	dsl_dataset_t *ds = arg1;
-	struct prop_set_arg *psa = arg2;
+	dsl_prop_setarg_t *psa = arg2;
 	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
-	uint64_t zapobj, intval;
+	uint64_t zapobj, intval, dummy;
 	int isint;
 	char valbuf[32];
-	char *valstr;
-
-	isint = (dodefault(psa->name, 8, 1, &intval) == 0);
+	char *valstr = NULL;
+	char *inheritstr;
+	char *recvdstr;
+	char *tbuf = NULL;
+	int err;
+	uint64_t version = spa_version(ds->ds_dir->dd_pool->dp_spa);
+	const char *propname = psa->psa_name;
+	zprop_source_t source = psa->psa_source;
 
-	if (dsl_dataset_is_snapshot(ds)) {
-		ASSERT(spa_version(ds->ds_dir->dd_pool->dp_spa) >=
-		    SPA_VERSION_SNAP_PROPS);
+	isint = (dodefault(propname, 8, 1, &intval) == 0);
+
+	if (ds->ds_phys != NULL && dsl_dataset_is_snapshot(ds)) {
+		ASSERT(version >= SPA_VERSION_SNAP_PROPS);
 		if (ds->ds_phys->ds_props_obj == 0) {
 			dmu_buf_will_dirty(ds->ds_dbuf, tx);
 			ds->ds_phys->ds_props_obj =
@@ -365,22 +584,96 @@
 		zapobj = ds->ds_dir->dd_phys->dd_props_zapobj;
 	}
 
-	if (psa->numints == 0) {
-		int err = zap_remove(mos, zapobj, psa->name, tx);
-		ASSERT(err == 0 || err == ENOENT);
-		if (isint) {
-			VERIFY(0 == dsl_prop_get_ds(ds,
-			    psa->name, 8, 1, &intval, NULL));
-		}
-	} else {
-		VERIFY(0 == zap_update(mos, zapobj, psa->name,
-		    psa->intsz, psa->numints, psa->buf, tx));
-		if (isint)
-			intval = *(uint64_t *)psa->buf;
+	if (version < SPA_VERSION_RECVD_PROPS) {
+		zfs_prop_t prop = zfs_name_to_prop(propname);
+		if (prop == ZFS_PROP_QUOTA || prop == ZFS_PROP_RESERVATION)
+			return;
+
+		if (source & ZPROP_SRC_NONE)
+			source = ZPROP_SRC_NONE;
+		else if (source & ZPROP_SRC_RECEIVED)
+			source = ZPROP_SRC_LOCAL;
 	}
 
+	inheritstr = kmem_asprintf("%s%s", propname, ZPROP_INHERIT_SUFFIX);
+	recvdstr = kmem_asprintf("%s%s", propname, ZPROP_RECVD_SUFFIX);
+
+	switch (source) {
+	case ZPROP_SRC_NONE:
+		/*
+		 * revert to received value, if any (inherit -S)
+		 * - remove propname
+		 * - remove propname$inherit
+		 */
+		err = zap_remove(mos, zapobj, propname, tx);
+		ASSERT(err == 0 || err == ENOENT);
+		err = zap_remove(mos, zapobj, inheritstr, tx);
+		ASSERT(err == 0 || err == ENOENT);
+		break;
+	case ZPROP_SRC_LOCAL:
+		/*
+		 * remove propname$inherit
+		 * set propname -> value
+		 */
+		err = zap_remove(mos, zapobj, inheritstr, tx);
+		ASSERT(err == 0 || err == ENOENT);
+		VERIFY(0 == zap_update(mos, zapobj, propname,
+		    psa->psa_intsz, psa->psa_numints, psa->psa_value, tx));
+		break;
+	case ZPROP_SRC_INHERITED:
+		/*
+		 * explicitly inherit
+		 * - remove propname
+		 * - set propname$inherit
+		 */
+		err = zap_remove(mos, zapobj, propname, tx);
+		ASSERT(err == 0 || err == ENOENT);
+		if (version >= SPA_VERSION_RECVD_PROPS &&
+		    zap_contains(mos, zapobj, ZPROP_HAS_RECVD) == 0) {
+			dummy = 0;
+			err = zap_update(mos, zapobj, inheritstr,
+			    8, 1, &dummy, tx);
+			ASSERT(err == 0);
+		}
+		break;
+	case ZPROP_SRC_RECEIVED:
+		/*
+		 * set propname$recvd -> value
+		 */
+		err = zap_update(mos, zapobj, recvdstr,
+		    psa->psa_intsz, psa->psa_numints, psa->psa_value, tx);
+		ASSERT(err == 0);
+		break;
+	case (ZPROP_SRC_NONE | ZPROP_SRC_LOCAL | ZPROP_SRC_RECEIVED):
+		/*
+		 * clear local and received settings
+		 * - remove propname
+		 * - remove propname$inherit
+		 * - remove propname$recvd
+		 */
+		err = zap_remove(mos, zapobj, propname, tx);
+		ASSERT(err == 0 || err == ENOENT);
+		err = zap_remove(mos, zapobj, inheritstr, tx);
+		ASSERT(err == 0 || err == ENOENT);
+		/* FALLTHRU */
+	case (ZPROP_SRC_NONE | ZPROP_SRC_RECEIVED):
+		/*
+		 * remove propname$recvd
+		 */
+		err = zap_remove(mos, zapobj, recvdstr, tx);
+		ASSERT(err == 0 || err == ENOENT);
+		break;
+	default:
+		cmn_err(CE_PANIC, "unexpected property source: %d", source);
+	}
+
+	strfree(inheritstr);
+	strfree(recvdstr);
+
 	if (isint) {
-		if (dsl_dataset_is_snapshot(ds)) {
+		VERIFY(0 == dsl_prop_get_ds(ds, propname, 8, 1, &intval, NULL));
+
+		if (ds->ds_phys != NULL && dsl_dataset_is_snapshot(ds)) {
 			dsl_prop_cb_record_t *cbr;
 			/*
 			 * It's a snapshot; nothing can inherit this
@@ -391,50 +684,77 @@
 			for (cbr = list_head(&ds->ds_dir->dd_prop_cbs); cbr;
 			    cbr = list_next(&ds->ds_dir->dd_prop_cbs, cbr)) {
 				if (cbr->cbr_ds == ds &&
-				    strcmp(cbr->cbr_propname, psa->name) == 0)
+				    strcmp(cbr->cbr_propname, propname) == 0)
 					cbr->cbr_func(cbr->cbr_arg, intval);
 			}
 			mutex_exit(&ds->ds_dir->dd_lock);
 		} else {
 			dsl_prop_changed_notify(ds->ds_dir->dd_pool,
-			    ds->ds_dir->dd_object, psa->name, intval, TRUE);
+			    ds->ds_dir->dd_object, propname, intval, TRUE);
 		}
-	}
-	if (isint) {
+
 		(void) snprintf(valbuf, sizeof (valbuf),
 		    "%lld", (longlong_t)intval);
 		valstr = valbuf;
 	} else {
-		valstr = (char *)psa->buf;
+		if (source == ZPROP_SRC_LOCAL) {
+			valstr = (char *)psa->psa_value;
+		} else {
+			tbuf = kmem_alloc(ZAP_MAXVALUELEN, KM_SLEEP);
+			if (dsl_prop_get_ds(ds, propname, 1,
+			    ZAP_MAXVALUELEN, tbuf, NULL) == 0)
+				valstr = tbuf;
+		}
 	}
-	spa_history_internal_log((psa->numints == 0) ? LOG_DS_INHERIT :
+
+	spa_history_internal_log((source == ZPROP_SRC_NONE ||
+	    source == ZPROP_SRC_INHERITED) ? LOG_DS_INHERIT :
 	    LOG_DS_PROPSET, ds->ds_dir->dd_pool->dp_spa, tx, cr,
-	    "%s=%s dataset = %llu", psa->name, valstr, ds->ds_object);
+	    "%s=%s dataset = %llu", propname,
+	    (valstr == NULL ? "" : valstr), ds->ds_object);
+
+	if (tbuf != NULL)
+		kmem_free(tbuf, ZAP_MAXVALUELEN);
 }
 
 void
 dsl_props_set_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
 {
 	dsl_dataset_t *ds = arg1;
-	nvlist_t *nvl = arg2;
+	dsl_props_arg_t *pa = arg2;
+	nvlist_t *props = pa->pa_props;
+	dsl_prop_setarg_t psa;
 	nvpair_t *elem = NULL;
 
-	while ((elem = nvlist_next_nvpair(nvl, elem)) != NULL) {
-		struct prop_set_arg psa;
+	psa.psa_source = pa->pa_source;
 
-		psa.name = nvpair_name(elem);
+	while ((elem = nvlist_next_nvpair(props, elem)) != NULL) {
+		nvpair_t *pair = elem;
+
+		psa.psa_name = nvpair_name(pair);
 
-		if (nvpair_type(elem) == DATA_TYPE_STRING) {
-			VERIFY(nvpair_value_string(elem,
-			    (char **)&psa.buf) == 0);
-			psa.intsz = 1;
-			psa.numints = strlen(psa.buf) + 1;
+		if (nvpair_type(pair) == DATA_TYPE_NVLIST) {
+			/*
+			 * dsl_prop_get_all_impl() returns properties in this
+			 * format.
+			 */
+			nvlist_t *attrs;
+			VERIFY(nvpair_value_nvlist(pair, &attrs) == 0);
+			VERIFY(nvlist_lookup_nvpair(attrs, ZPROP_VALUE,
+			    &pair) == 0);
+		}
+
+		if (nvpair_type(pair) == DATA_TYPE_STRING) {
+			VERIFY(nvpair_value_string(pair,
+			    (char **)&psa.psa_value) == 0);
+			psa.psa_intsz = 1;
+			psa.psa_numints = strlen(psa.psa_value) + 1;
 		} else {
 			uint64_t intval;
-			VERIFY(nvpair_value_uint64(elem, &intval) == 0);
-			psa.intsz = sizeof (intval);
-			psa.numints = 1;
-			psa.buf = &intval;
+			VERIFY(nvpair_value_uint64(pair, &intval) == 0);
+			psa.psa_intsz = sizeof (intval);
+			psa.psa_numints = 1;
+			psa.psa_value = &intval;
 		}
 		dsl_prop_set_sync(ds, &psa, cr, tx);
 	}
@@ -459,13 +779,13 @@
 }
 
 int
-dsl_prop_set(const char *dsname, const char *propname,
+dsl_prop_set(const char *dsname, const char *propname, zprop_source_t source,
     int intsz, int numints, const void *buf)
 {
 	dsl_dataset_t *ds;
 	uint64_t version;
 	int err;
-	struct prop_set_arg psa;
+	dsl_prop_setarg_t psa;
 
 	/*
 	 * We must do these checks before we get to the syncfunc, since
@@ -490,10 +810,13 @@
 		return (ENOTSUP);
 	}
 
-	psa.name = propname;
-	psa.intsz = intsz;
-	psa.numints = numints;
-	psa.buf = buf;
+	psa.psa_name = propname;
+	psa.psa_source = source;
+	psa.psa_intsz = intsz;
+	psa.psa_numints = numints;
+	psa.psa_value = buf;
+	psa.psa_effective_value = -1ULL;
+
 	err = dsl_sync_task_do(ds->ds_dir->dd_pool,
 	    NULL, dsl_prop_set_sync, ds, &psa, 2);
 
@@ -502,11 +825,12 @@
 }
 
 int
-dsl_props_set(const char *dsname, nvlist_t *nvl)
+dsl_props_set(const char *dsname, zprop_source_t source, nvlist_t *props)
 {
 	dsl_dataset_t *ds;
 	uint64_t version;
 	nvpair_t *elem = NULL;
+	dsl_props_arg_t pa;
 	int err;
 
 	if (err = dsl_dataset_hold(dsname, FTAG, &ds))
@@ -515,7 +839,7 @@
 	 * Do these checks before the syncfunc, since it can't fail.
 	 */
 	version = spa_version(ds->ds_dir->dd_pool->dp_spa);
-	while ((elem = nvlist_next_nvpair(nvl, elem)) != NULL) {
+	while ((elem = nvlist_next_nvpair(props, elem)) != NULL) {
 		if (strlen(nvpair_name(elem)) >= ZAP_MAXNAMELEN) {
 			dsl_dataset_rele(ds, FTAG);
 			return (ENAMETOOLONG);
@@ -538,129 +862,281 @@
 		return (ENOTSUP);
 	}
 
+	pa.pa_props = props;
+	pa.pa_source = source;
+
 	err = dsl_sync_task_do(ds->ds_dir->dd_pool,
-	    NULL, dsl_props_set_sync, ds, nvl, 2);
+	    NULL, dsl_props_set_sync, ds, &pa, 2);
 
 	dsl_dataset_rele(ds, FTAG);
 	return (err);
 }
 
+typedef enum dsl_prop_getflags {
+	DSL_PROP_GET_INHERITING = 0x1,	/* searching parent of target ds */
+	DSL_PROP_GET_SNAPSHOT = 0x2,	/* snapshot dataset */
+	DSL_PROP_GET_LOCAL = 0x4,	/* local properties */
+	DSL_PROP_GET_RECEIVED = 0x8	/* received properties */
+} dsl_prop_getflags_t;
+
+static int
+dsl_prop_get_all_impl(objset_t *mos, uint64_t propobj,
+    const char *setpoint, dsl_prop_getflags_t flags, nvlist_t *nv)
+{
+	zap_cursor_t zc;
+	zap_attribute_t za;
+	int err = 0;
+
+	for (zap_cursor_init(&zc, mos, propobj);
+	    (err = zap_cursor_retrieve(&zc, &za)) == 0;
+	    zap_cursor_advance(&zc)) {
+		nvlist_t *propval;
+		zfs_prop_t prop;
+		char buf[ZAP_MAXNAMELEN];
+		char *valstr;
+		const char *suffix;
+		const char *propname;
+		const char *source;
+
+		suffix = strchr(za.za_name, '$');
+
+		if (suffix == NULL) {
+			/*
+			 * Skip local properties if we only want received
+			 * properties.
+			 */
+			if (flags & DSL_PROP_GET_RECEIVED)
+				continue;
+
+			propname = za.za_name;
+			source = setpoint;
+		} else if (strcmp(suffix, ZPROP_INHERIT_SUFFIX) == 0) {
+			/* Skip explicitly inherited entries. */
+			continue;
+		} else if (strcmp(suffix, ZPROP_RECVD_SUFFIX) == 0) {
+			if (flags & DSL_PROP_GET_LOCAL)
+				continue;
+
+			(void) strncpy(buf, za.za_name, (suffix - za.za_name));
+			buf[suffix - za.za_name] = '\0';
+			propname = buf;
+
+			if (!(flags & DSL_PROP_GET_RECEIVED)) {
+				/* Skip if locally overridden. */
+				err = zap_contains(mos, propobj, propname);
+				if (err == 0)
+					continue;
+				if (err != ENOENT)
+					break;
+
+				/* Skip if explicitly inherited. */
+				valstr = kmem_asprintf("%s%s", propname,
+				    ZPROP_INHERIT_SUFFIX);
+				err = zap_contains(mos, propobj, valstr);
+				strfree(valstr);
+				if (err == 0)
+					continue;
+				if (err != ENOENT)
+					break;
+			}
+
+			source = ((flags & DSL_PROP_GET_INHERITING) ?
+			    setpoint : ZPROP_SOURCE_VAL_RECVD);
+		} else {
+			/*
+			 * For backward compatibility, skip suffixes we don't
+			 * recognize.
+			 */
+			continue;
+		}
+
+		prop = zfs_name_to_prop(propname);
+
+		/* Skip non-inheritable properties. */
+		if ((flags & DSL_PROP_GET_INHERITING) && prop != ZPROP_INVAL &&
+		    !zfs_prop_inheritable(prop))
+			continue;
+
+		/* Skip properties not valid for this type. */
+		if ((flags & DSL_PROP_GET_SNAPSHOT) && prop != ZPROP_INVAL &&
+		    !zfs_prop_valid_for_type(prop, ZFS_TYPE_SNAPSHOT))
+			continue;
+
+		/* Skip properties already defined. */
+		if (nvlist_exists(nv, propname))
+			continue;
+
+		VERIFY(nvlist_alloc(&propval, NV_UNIQUE_NAME, KM_SLEEP) == 0);
+		if (za.za_integer_length == 1) {
+			/*
+			 * String property
+			 */
+			char *tmp = kmem_alloc(za.za_num_integers,
+			    KM_SLEEP);
+			err = zap_lookup(mos, propobj,
+			    za.za_name, 1, za.za_num_integers, tmp);
+			if (err != 0) {
+				kmem_free(tmp, za.za_num_integers);
+				break;
+			}
+			VERIFY(nvlist_add_string(propval, ZPROP_VALUE,
+			    tmp) == 0);
+			kmem_free(tmp, za.za_num_integers);
+		} else {
+			/*
+			 * Integer property
+			 */
+			ASSERT(za.za_integer_length == 8);
+			(void) nvlist_add_uint64(propval, ZPROP_VALUE,
+			    za.za_first_integer);
+		}
+
+		VERIFY(nvlist_add_string(propval, ZPROP_SOURCE, source) == 0);
+		VERIFY(nvlist_add_nvlist(nv, propname, propval) == 0);
+		nvlist_free(propval);
+	}
+	zap_cursor_fini(&zc);
+	if (err == ENOENT)
+		err = 0;
+	return (err);
+}
+
 /*
  * Iterate over all properties for this dataset and return them in an nvlist.
  */
-int
-dsl_prop_get_all(objset_t *os, nvlist_t **nvp, boolean_t local)
+static int
+dsl_prop_get_all_ds(dsl_dataset_t *ds, nvlist_t **nvp,
+    dsl_prop_getflags_t flags)
 {
-	dsl_dataset_t *ds = os->os_dsl_dataset;
 	dsl_dir_t *dd = ds->ds_dir;
-	boolean_t snapshot = dsl_dataset_is_snapshot(ds);
-	int err = 0;
 	dsl_pool_t *dp = dd->dd_pool;
 	objset_t *mos = dp->dp_meta_objset;
-	uint64_t propobj = ds->ds_phys->ds_props_obj;
+	int err = 0;
+	char setpoint[MAXNAMELEN];
 
 	VERIFY(nvlist_alloc(nvp, NV_UNIQUE_NAME, KM_SLEEP) == 0);
 
-	if (local && snapshot && !propobj)
-		return (0);
+	if (dsl_dataset_is_snapshot(ds))
+		flags |= DSL_PROP_GET_SNAPSHOT;
 
 	rw_enter(&dp->dp_config_rwlock, RW_READER);
-	while (dd != NULL) {
-		char setpoint[MAXNAMELEN];
-		zap_cursor_t zc;
-		zap_attribute_t za;
-		dsl_dir_t *dd_next;
 
-		if (propobj) {
-			dsl_dataset_name(ds, setpoint);
-			dd_next = dd;
-		} else {
-			dsl_dir_name(dd, setpoint);
-			propobj = dd->dd_phys->dd_props_zapobj;
-			dd_next = dd->dd_parent;
-		}
+	if (ds->ds_phys->ds_props_obj != 0) {
+		ASSERT(flags & DSL_PROP_GET_SNAPSHOT);
+		dsl_dataset_name(ds, setpoint);
+		err = dsl_prop_get_all_impl(mos, ds->ds_phys->ds_props_obj,
+		    setpoint, flags, *nvp);
+		if (err)
+			goto out;
+	}
 
-		for (zap_cursor_init(&zc, mos, propobj);
-		    (err = zap_cursor_retrieve(&zc, &za)) == 0;
-		    zap_cursor_advance(&zc)) {
-			nvlist_t *propval;
-			zfs_prop_t prop = zfs_name_to_prop(za.za_name);
+	for (; dd != NULL; dd = dd->dd_parent) {
+		if (dd != ds->ds_dir || (flags & DSL_PROP_GET_SNAPSHOT)) {
+			if (flags & (DSL_PROP_GET_LOCAL |
+			    DSL_PROP_GET_RECEIVED))
+				break;
+			flags |= DSL_PROP_GET_INHERITING;
+		}
+		dsl_dir_name(dd, setpoint);
+		err = dsl_prop_get_all_impl(mos, dd->dd_phys->dd_props_zapobj,
+		    setpoint, flags, *nvp);
+		if (err)
+			break;
+	}
+out:
+	rw_exit(&dp->dp_config_rwlock);
+	return (err);
+}
 
-			/* Skip non-inheritable properties. */
-			if (prop != ZPROP_INVAL &&
-			    !zfs_prop_inheritable(prop) &&
-			    (dd != ds->ds_dir || (snapshot && dd != dd_next)))
-				continue;
+boolean_t
+dsl_prop_get_hasrecvd(objset_t *os)
+{
+	dsl_dataset_t *ds = os->os_dsl_dataset;
+	int rc;
+	uint64_t dummy;
 
-			/* Skip properties not valid for this type. */
-			if (snapshot && prop != ZPROP_INVAL &&
-			    !zfs_prop_valid_for_type(prop, ZFS_TYPE_SNAPSHOT))
-				continue;
-
-			/* Skip properties already defined */
-			if (nvlist_lookup_nvlist(*nvp, za.za_name,
-			    &propval) == 0)
-				continue;
+	rw_enter(&ds->ds_dir->dd_pool->dp_config_rwlock, RW_READER);
+	rc = dsl_prop_get_ds(ds, ZPROP_HAS_RECVD, 8, 1, &dummy, NULL);
+	rw_exit(&ds->ds_dir->dd_pool->dp_config_rwlock);
+	ASSERT(rc != 0 || spa_version(os->os_spa) >= SPA_VERSION_RECVD_PROPS);
+	return (rc == 0);
+}
 
-			VERIFY(nvlist_alloc(&propval, NV_UNIQUE_NAME,
-			    KM_SLEEP) == 0);
-			if (za.za_integer_length == 1) {
-				/*
-				 * String property
-				 */
-				char *tmp = kmem_alloc(za.za_num_integers,
-				    KM_SLEEP);
-				err = zap_lookup(mos, propobj,
-				    za.za_name, 1, za.za_num_integers, tmp);
-				if (err != 0) {
-					kmem_free(tmp, za.za_num_integers);
-					break;
-				}
-				VERIFY(nvlist_add_string(propval, ZPROP_VALUE,
-				    tmp) == 0);
-				kmem_free(tmp, za.za_num_integers);
-			} else {
-				/*
-				 * Integer property
-				 */
-				ASSERT(za.za_integer_length == 8);
-				(void) nvlist_add_uint64(propval, ZPROP_VALUE,
-				    za.za_first_integer);
-			}
+static void
+dsl_prop_set_hasrecvd_impl(objset_t *os, zprop_source_t source)
+{
+	dsl_dataset_t *ds = os->os_dsl_dataset;
+	uint64_t dummy = 0;
+	dsl_prop_setarg_t psa;
+
+	if (spa_version(os->os_spa) < SPA_VERSION_RECVD_PROPS)
+		return;
+
+	dsl_prop_setarg_init_uint64(&psa, ZPROP_HAS_RECVD, source, &dummy);
+
+	(void) dsl_sync_task_do(ds->ds_dir->dd_pool, NULL,
+	    dsl_prop_set_sync, ds, &psa, 2);
+}
 
-			VERIFY(nvlist_add_string(propval, ZPROP_SOURCE,
-			    setpoint) == 0);
-			VERIFY(nvlist_add_nvlist(*nvp, za.za_name,
-			    propval) == 0);
-			nvlist_free(propval);
-		}
-		zap_cursor_fini(&zc);
+/*
+ * Call after successfully receiving properties to ensure that only the first
+ * receive on or after SPA_VERSION_RECVD_PROPS blows away local properties.
+ */
+void
+dsl_prop_set_hasrecvd(objset_t *os)
+{
+	if (dsl_prop_get_hasrecvd(os)) {
+		ASSERT(spa_version(os->os_spa) >= SPA_VERSION_RECVD_PROPS);
+		return;
+	}
+	dsl_prop_set_hasrecvd_impl(os, ZPROP_SRC_LOCAL);
+}
 
-		if (err != ENOENT)
-			break;
-		err = 0;
-		/*
-		 * If we are just after the props that have been set
-		 * locally, then we are done after the first iteration.
-		 */
-		if (local)
-			break;
-		dd = dd_next;
-		propobj = 0;
-	}
-	rw_exit(&dp->dp_config_rwlock);
+void
+dsl_prop_unset_hasrecvd(objset_t *os)
+{
+	dsl_prop_set_hasrecvd_impl(os, ZPROP_SRC_NONE);
+}
+
+int
+dsl_prop_get_all(objset_t *os, nvlist_t **nvp)
+{
+	return (dsl_prop_get_all_ds(os->os_dsl_dataset, nvp, 0));
+}
 
-	return (err);
+int
+dsl_prop_get_received(objset_t *os, nvlist_t **nvp)
+{
+	/*
+	 * Received properties are not distinguishable from local properties
+	 * until the dataset has received properties on or after
+	 * SPA_VERSION_RECVD_PROPS.
+	 */
+	dsl_prop_getflags_t flags = (dsl_prop_get_hasrecvd(os) ?
+	    DSL_PROP_GET_RECEIVED : DSL_PROP_GET_LOCAL);
+	return (dsl_prop_get_all_ds(os->os_dsl_dataset, nvp, flags));
 }
 
 void
 dsl_prop_nvlist_add_uint64(nvlist_t *nv, zfs_prop_t prop, uint64_t value)
 {
 	nvlist_t *propval;
+	const char *propname = zfs_prop_to_name(prop);
+	uint64_t default_value;
+
+	if (nvlist_lookup_nvlist(nv, propname, &propval) == 0) {
+		VERIFY(nvlist_add_uint64(propval, ZPROP_VALUE, value) == 0);
+		return;
+	}
 
 	VERIFY(nvlist_alloc(&propval, NV_UNIQUE_NAME, KM_SLEEP) == 0);
 	VERIFY(nvlist_add_uint64(propval, ZPROP_VALUE, value) == 0);
-	VERIFY(nvlist_add_nvlist(nv, zfs_prop_to_name(prop), propval) == 0);
+	/* Indicate the default source if we can. */
+	if (dodefault(propname, 8, 1, &default_value) == 0 &&
+	    value == default_value) {
+		VERIFY(nvlist_add_string(propval, ZPROP_SOURCE, "") == 0);
+	}
+	VERIFY(nvlist_add_nvlist(nv, propname, propval) == 0);
 	nvlist_free(propval);
 }
 
@@ -668,9 +1144,15 @@
 dsl_prop_nvlist_add_string(nvlist_t *nv, zfs_prop_t prop, const char *value)
 {
 	nvlist_t *propval;
+	const char *propname = zfs_prop_to_name(prop);
+
+	if (nvlist_lookup_nvlist(nv, propname, &propval) == 0) {
+		VERIFY(nvlist_add_string(propval, ZPROP_VALUE, value) == 0);
+		return;
+	}
 
 	VERIFY(nvlist_alloc(&propval, NV_UNIQUE_NAME, KM_SLEEP) == 0);
 	VERIFY(nvlist_add_string(propval, ZPROP_VALUE, value) == 0);
-	VERIFY(nvlist_add_nvlist(nv, zfs_prop_to_name(prop), propval) == 0);
+	VERIFY(nvlist_add_nvlist(nv, propname, propval) == 0);
 	nvlist_free(propval);
 }
--- a/usr/src/uts/common/fs/zfs/spa.c	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/uts/common/fs/zfs/spa.c	Mon Nov 09 20:45:32 2009 -0800
@@ -58,7 +58,6 @@
 #include <sys/arc.h>
 #include <sys/callb.h>
 #include <sys/systeminfo.h>
-#include <sys/sunddi.h>
 #include <sys/spa_boot.h>
 #include <sys/zfs_ioctl.h>
 
--- a/usr/src/uts/common/fs/zfs/spa_misc.c	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/uts/common/fs/zfs/spa_misc.c	Mon Nov 09 20:45:32 2009 -0800
@@ -43,7 +43,6 @@
 #include <sys/dsl_prop.h>
 #include <sys/fs/zfs.h>
 #include <sys/metaslab_impl.h>
-#include <sys/sunddi.h>
 #include <sys/arc.h>
 #include <sys/ddt.h>
 #include "zfs_prop.h"
--- a/usr/src/uts/common/fs/zfs/sys/dsl_dataset.h	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/uts/common/fs/zfs/sys/dsl_dataset.h	Mon Nov 09 20:45:32 2009 -0800
@@ -233,10 +233,12 @@
 int dsl_dataset_check_quota(dsl_dataset_t *ds, boolean_t check_quota,
     uint64_t asize, uint64_t inflight, uint64_t *used,
     uint64_t *ref_rsrv);
-int dsl_dataset_set_quota(const char *dsname, uint64_t quota);
+int dsl_dataset_set_quota(const char *dsname, zprop_source_t source,
+    uint64_t quota);
 void dsl_dataset_set_quota_sync(void *arg1, void *arg2, cred_t *cr,
     dmu_tx_t *tx);
-int dsl_dataset_set_reservation(const char *dsname, uint64_t reservation);
+int dsl_dataset_set_reservation(const char *dsname, zprop_source_t source,
+    uint64_t reservation);
 
 int dsl_destroy_inconsistent(char *dsname, void *arg);
 
--- a/usr/src/uts/common/fs/zfs/sys/dsl_dir.h	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/uts/common/fs/zfs/sys/dsl_dir.h	Mon Nov 09 20:45:32 2009 -0800
@@ -126,8 +126,10 @@
     int64_t used, int64_t compressed, int64_t uncompressed, dmu_tx_t *tx);
 void dsl_dir_transfer_space(dsl_dir_t *dd, int64_t delta,
     dd_used_t oldtype, dd_used_t newtype, dmu_tx_t *tx);
-int dsl_dir_set_quota(const char *ddname, uint64_t quota);
-int dsl_dir_set_reservation(const char *ddname, uint64_t reservation);
+int dsl_dir_set_quota(const char *ddname, zprop_source_t source,
+    uint64_t quota);
+int dsl_dir_set_reservation(const char *ddname, zprop_source_t source,
+    uint64_t reservation);
 int dsl_dir_rename(dsl_dir_t *dd, const char *newname);
 int dsl_dir_transfer_possible(dsl_dir_t *sdd, dsl_dir_t *tdd, uint64_t space);
 int dsl_dir_set_reservation_check(void *arg1, void *arg2, dmu_tx_t *tx);
--- a/usr/src/uts/common/fs/zfs/sys/dsl_prop.h	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/uts/common/fs/zfs/sys/dsl_prop.h	Mon Nov 09 20:45:32 2009 -0800
@@ -49,6 +49,25 @@
 	void *cbr_arg;
 } dsl_prop_cb_record_t;
 
+typedef struct dsl_props_arg {
+	nvlist_t *pa_props;
+	zprop_source_t pa_source;
+} dsl_props_arg_t;
+
+typedef struct dsl_prop_set_arg {
+	const char *psa_name;
+	zprop_source_t psa_source;
+	int psa_intsz;
+	int psa_numints;
+	const void *psa_value;
+
+	/*
+	 * Used to handle the special requirements of the quota and reservation
+	 * properties.
+	 */
+	uint64_t psa_effective_value;
+} dsl_prop_setarg_t;
+
 int dsl_prop_register(struct dsl_dataset *ds, const char *propname,
     dsl_prop_changed_cb_t *callback, void *cbarg);
 int dsl_prop_unregister(struct dsl_dataset *ds, const char *propname,
@@ -59,19 +78,37 @@
     int intsz, int numints, void *buf, char *setpoint);
 int dsl_prop_get_integer(const char *ddname, const char *propname,
     uint64_t *valuep, char *setpoint);
-int dsl_prop_get_all(objset_t *os, nvlist_t **nvp, boolean_t local);
+int dsl_prop_get_all(objset_t *os, nvlist_t **nvp);
+int dsl_prop_get_received(objset_t *os, nvlist_t **nvp);
 int dsl_prop_get_ds(struct dsl_dataset *ds, const char *propname,
     int intsz, int numints, void *buf, char *setpoint);
 int dsl_prop_get_dd(struct dsl_dir *dd, const char *propname,
-    int intsz, int numints, void *buf, char *setpoint);
+    int intsz, int numints, void *buf, char *setpoint,
+    boolean_t snapshot);
 
 dsl_syncfunc_t dsl_props_set_sync;
 int dsl_prop_set(const char *ddname, const char *propname,
-    int intsz, int numints, const void *buf);
-int dsl_props_set(const char *dsname, nvlist_t *nvl);
+    zprop_source_t source, int intsz, int numints, const void *buf);
+int dsl_props_set(const char *dsname, zprop_source_t source, nvlist_t *nvl);
 void dsl_dir_prop_set_uint64_sync(dsl_dir_t *dd, const char *name, uint64_t val,
     cred_t *cr, dmu_tx_t *tx);
 
+void dsl_prop_setarg_init_uint64(dsl_prop_setarg_t *psa, const char *propname,
+    zprop_source_t source, uint64_t *value);
+int dsl_prop_predict_sync(dsl_dir_t *dd, dsl_prop_setarg_t *psa);
+#ifdef	ZFS_DEBUG
+void dsl_prop_check_prediction(dsl_dir_t *dd, dsl_prop_setarg_t *psa);
+#define	DSL_PROP_CHECK_PREDICTION(dd, psa)	\
+	dsl_prop_check_prediction((dd), (psa))
+#else
+#define	DSL_PROP_CHECK_PREDICTION(dd, psa)	/* nothing */
+#endif
+
+/* flag first receive on or after SPA_VERSION_RECVD_PROPS */
+boolean_t dsl_prop_get_hasrecvd(objset_t *os);
+void dsl_prop_set_hasrecvd(objset_t *os);
+void dsl_prop_unset_hasrecvd(objset_t *os);
+
 void dsl_prop_nvlist_add_uint64(nvlist_t *nv, zfs_prop_t prop, uint64_t value);
 void dsl_prop_nvlist_add_string(nvlist_t *nv,
     zfs_prop_t prop, const char *value);
--- a/usr/src/uts/common/fs/zfs/sys/zap.h	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/uts/common/fs/zfs/sys/zap.h	Mon Nov 09 20:45:32 2009 -0800
@@ -197,6 +197,7 @@
     boolean_t *normalization_conflictp);
 int zap_lookup_uint64(objset_t *os, uint64_t zapobj, const uint64_t *key,
     int key_numints, uint64_t integer_size, uint64_t num_integers, void *buf);
+int zap_contains(objset_t *ds, uint64_t zapobj, const char *name);
 
 int zap_count_write(objset_t *os, uint64_t zapobj, const char *name,
     int add, uint64_t *towrite, uint64_t *tooverwrite);
--- a/usr/src/uts/common/fs/zfs/sys/zfs_context.h	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/uts/common/fs/zfs/sys/zfs_context.h	Mon Nov 09 20:45:32 2009 -0800
@@ -62,6 +62,7 @@
 #include <sys/sysevent/eventdefs.h>
 #include <sys/sysevent/dev.h>
 #include <sys/fm/util.h>
+#include <sys/sunddi.h>
 
 #define	CPU_SEQID	(CPU->cpu_seqid)
 
--- a/usr/src/uts/common/fs/zfs/zap.c	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/uts/common/fs/zfs/zap.c	Mon Nov 09 20:45:32 2009 -0800
@@ -706,13 +706,17 @@
 	}
 }
 
+static int
+fzap_checkname(zap_name_t *zn)
+{
+	if (zn->zn_key_orig_len > ZAP_MAXNAMELEN)
+		return (ENAMETOOLONG);
+	return (0);
+}
 
 static int
-fzap_checksize(zap_name_t *zn, uint64_t integer_size, uint64_t num_integers)
+fzap_checksize(uint64_t integer_size, uint64_t num_integers)
 {
-	if (zn->zn_key_orig_len > ZAP_MAXNAMELEN)
-		return (E2BIG);
-
 	/* Only integer sizes supported by C */
 	switch (integer_size) {
 	case 1:
@@ -730,6 +734,16 @@
 	return (0);
 }
 
+static int
+fzap_check(zap_name_t *zn, uint64_t integer_size, uint64_t num_integers)
+{
+	int err;
+
+	if ((err = fzap_checkname(zn)) != 0)
+		return (err);
+	return (fzap_checksize(integer_size, num_integers));
+}
+
 /*
  * Routines for manipulating attributes.
  */
@@ -742,8 +756,7 @@
 	int err;
 	zap_entry_handle_t zeh;
 
-	err = fzap_checksize(zn, integer_size, num_integers);
-	if (err != 0)
+	if ((err = fzap_checkname(zn)) != 0)
 		return (err);
 
 	err = zap_deref_leaf(zn->zn_zap, zn->zn_hash, NULL, RW_READER, &l);
@@ -751,6 +764,11 @@
 		return (err);
 	err = zap_leaf_lookup(l, zn, &zeh);
 	if (err == 0) {
+		if ((err = fzap_checksize(integer_size, num_integers)) != 0) {
+			zap_put_leaf(l);
+			return (err);
+		}
+
 		err = zap_entry_read(&zeh, integer_size, num_integers, buf);
 		(void) zap_entry_read_name(zn->zn_zap, &zeh, rn_len, realname);
 		if (ncp) {
@@ -775,7 +793,7 @@
 
 	ASSERT(RW_LOCK_HELD(&zap->zap_rwlock));
 	ASSERT(!zap->zap_ismicro);
-	ASSERT(fzap_checksize(zn, integer_size, num_integers) == 0);
+	ASSERT(fzap_check(zn, integer_size, num_integers) == 0);
 
 	err = zap_deref_leaf(zap, zn->zn_hash, tx, RW_WRITER, &l);
 	if (err != 0)
@@ -812,7 +830,7 @@
     uint64_t integer_size, uint64_t num_integers,
     const void *val, dmu_tx_t *tx)
 {
-	int err = fzap_checksize(zn, integer_size, num_integers);
+	int err = fzap_check(zn, integer_size, num_integers);
 	if (err != 0)
 		return (err);
 
@@ -830,7 +848,7 @@
 	zap_t *zap = zn->zn_zap;
 
 	ASSERT(RW_LOCK_HELD(&zap->zap_rwlock));
-	err = fzap_checksize(zn, integer_size, num_integers);
+	err = fzap_check(zn, integer_size, num_integers);
 	if (err != 0)
 		return (err);
 
@@ -1018,8 +1036,6 @@
 	zap_entry_handle_t zeh;
 	zap_leaf_t *l;
 
-	/* memset(za, 0xba, sizeof (zap_attribute_t)); */
-
 	/* retrieve the next entry at or after zc_hash/zc_cd */
 	/* if no entry, return ENOENT */
 
@@ -1117,7 +1133,7 @@
 	zap_entry_handle_t zeh;
 
 	if (zn->zn_key_orig_len > ZAP_MAXNAMELEN)
-		return (E2BIG);
+		return (ENAMETOOLONG);
 
 	err = zap_deref_leaf(zc->zc_zap, zn->zn_hash, NULL, RW_READER, &l);
 	if (err != 0)
--- a/usr/src/uts/common/fs/zfs/zap_micro.c	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/uts/common/fs/zfs/zap_micro.c	Mon Nov 09 20:45:32 2009 -0800
@@ -816,6 +816,16 @@
 }
 
 int
+zap_contains(objset_t *os, uint64_t zapobj, const char *name)
+{
+	int err = (zap_lookup_norm(os, zapobj, name, 0,
+	    0, NULL, MT_EXACT, NULL, 0, NULL));
+	if (err == EOVERFLOW || err == EINVAL)
+		err = 0; /* found, but skipped reading the value */
+	return (err);
+}
+
+int
 zap_length(objset_t *os, uint64_t zapobj, const char *name,
     uint64_t *integer_size, uint64_t *num_integers)
 {
--- a/usr/src/uts/common/fs/zfs/zfs_fuid.c	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/uts/common/fs/zfs/zfs_fuid.c	Mon Nov 09 20:45:32 2009 -0800
@@ -24,7 +24,6 @@
  */
 
 #include <sys/zfs_context.h>
-#include <sys/sunddi.h>
 #include <sys/dmu.h>
 #include <sys/avl.h>
 #include <sys/zap.h>
--- a/usr/src/uts/common/fs/zfs/zfs_ioctl.c	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/uts/common/fs/zfs/zfs_ioctl.c	Mon Nov 09 20:45:32 2009 -0800
@@ -102,13 +102,14 @@
 	ZFS_DELEG_PERM_GROUPQUOTA,
 };
 
-static char *setsl_tag = "setsl_tag";
-
 static int zfs_ioc_userspace_upgrade(zfs_cmd_t *zc);
-static void clear_props(char *dataset, nvlist_t *props, nvlist_t *newprops);
+static int zfs_check_settable(const char *name, nvpair_t *property,
+    cred_t *cr);
+static int zfs_check_clearable(char *dataset, nvlist_t *props,
+    nvlist_t **errors);
 static int zfs_fill_zplprops_root(uint64_t, nvlist_t *, nvlist_t *,
     boolean_t *);
-int zfs_set_prop_nvlist(const char *, nvlist_t *);
+int zfs_set_prop_nvlist(const char *, zprop_source_t, nvlist_t *, nvlist_t **);
 
 /* _NOTE(PRINTFLIKE(4)) - this is printf-like, but lint is too whiney */
 void
@@ -334,14 +335,9 @@
  * Policy for setting the security label property.
  *
  * Returns 0 for success, non-zero for access and other errors.
- *
- * If the objset is non-NULL upon return, the caller is responsible
- * for dis-owning it, using the tag: setsl_tag.
- *
  */
 static int
-zfs_set_slabel_policy(const char *name, char *strval, cred_t *cr,
-    objset_t **osp)
+zfs_set_slabel_policy(const char *name, char *strval, cred_t *cr)
 {
 	char		ds_hexsl[MAXNAMELEN];
 	bslabel_t	ds_sl, new_sl;
@@ -394,16 +390,20 @@
 	 * mounted (or isn't a dataset, doesn't exist, ...).
 	 */
 	if (strcasecmp(ds_hexsl, ZFS_MLSLABEL_DEFAULT) != 0) {
-		ASSERT(osp != NULL);
+		objset_t *os;
+		static char *setsl_tag = "setsl_tag";
+
 		/*
 		 * Try to own the dataset; abort if there is any error,
 		 * (e.g., already mounted, in use, or other error).
 		 */
 		error = dmu_objset_own(name, DMU_OST_ZFS, B_TRUE,
-		    setsl_tag, osp);
+		    setsl_tag, &os);
 		if (error)
 			return (EPERM);
 
+		dmu_objset_disown(os, setsl_tag);
+
 		if (new_default) {
 			needed_priv = PRIV_FILE_DOWNGRADE_SL;
 			goto out_check;
@@ -429,8 +429,11 @@
 }
 
 static int
-zfs_secpolicy_setprop(const char *name, zfs_prop_t prop, cred_t *cr)
+zfs_secpolicy_setprop(const char *dsname, zfs_prop_t prop, nvpair_t *propval,
+    cred_t *cr)
 {
+	char *strval;
+
 	/*
 	 * Check permissions for special properties.
 	 */
@@ -452,10 +455,10 @@
 			 * quota on things *under* (ie. contained by)
 			 * the thing they own.
 			 */
-			if (dsl_prop_get_integer(name, "zoned", &zoned,
+			if (dsl_prop_get_integer(dsname, "zoned", &zoned,
 			    setpoint))
 				return (EPERM);
-			if (!zoned || strlen(name) <= strlen(setpoint))
+			if (!zoned || strlen(dsname) <= strlen(setpoint))
 				return (EPERM);
 		}
 		break;
@@ -463,10 +466,18 @@
 	case ZFS_PROP_MLSLABEL:
 		if (!is_system_labeled())
 			return (EPERM);
+
+		if (nvpair_value_string(propval, &strval) == 0) {
+			int err;
+
+			err = zfs_set_slabel_policy(dsname, strval, CRED());
+			if (err != 0)
+				return (err);
+		}
 		break;
 	}
 
-	return (zfs_secpolicy_write_perms(name, zfs_prop_to_name(prop), cr));
+	return (zfs_secpolicy_write_perms(dsname, zfs_prop_to_name(prop), cr));
 }
 
 int
@@ -602,7 +613,7 @@
 int
 zfs_secpolicy_rename_perms(const char *from, const char *to, cred_t *cr)
 {
-	char 	parentname[MAXNAMELEN];
+	char	parentname[MAXNAMELEN];
 	int	error;
 
 	if ((error = zfs_secpolicy_write_perms(from,
@@ -637,7 +648,7 @@
 static int
 zfs_secpolicy_promote(zfs_cmd_t *zc, cred_t *cr)
 {
-	char 	parentname[MAXNAMELEN];
+	char	parentname[MAXNAMELEN];
 	objset_t *clone;
 	int error;
 
@@ -709,8 +720,8 @@
 static int
 zfs_secpolicy_create(zfs_cmd_t *zc, cred_t *cr)
 {
-	char 	parentname[MAXNAMELEN];
-	int 	error;
+	char	parentname[MAXNAMELEN];
+	int	error;
 
 	if ((error = zfs_get_parent(zc->zc_name, parentname,
 	    sizeof (parentname))) != 0)
@@ -779,9 +790,8 @@
 		return (zfs_secpolicy_write_perms(zc->zc_name,
 		    ZFS_DELEG_PERM_USERPROP, cr));
 	} else {
-		if (!zfs_prop_inheritable(prop))
-			return (EINVAL);
-		return (zfs_secpolicy_setprop(zc->zc_name, prop, cr));
+		return (zfs_secpolicy_setprop(zc->zc_name, prop,
+		    NULL, cr));
 	}
 }
 
@@ -831,7 +841,8 @@
 static int
 zfs_secpolicy_userspace_upgrade(zfs_cmd_t *zc, cred_t *cr)
 {
-	return (zfs_secpolicy_setprop(zc->zc_name, ZFS_PROP_VERSION, cr));
+	return (zfs_secpolicy_setprop(zc->zc_name, ZFS_PROP_VERSION,
+	    NULL, cr));
 }
 
 static int
@@ -884,6 +895,41 @@
 }
 
 static int
+fit_error_list(zfs_cmd_t *zc, nvlist_t **errors)
+{
+	size_t size;
+
+	VERIFY(nvlist_size(*errors, &size, NV_ENCODE_NATIVE) == 0);
+
+	if (size > zc->zc_nvlist_dst_size) {
+		nvpair_t *more_errors;
+		int n = 0;
+
+		if (zc->zc_nvlist_dst_size < 1024)
+			return (ENOMEM);
+
+		VERIFY(nvlist_add_int32(*errors, ZPROP_N_MORE_ERRORS, 0) == 0);
+		more_errors = nvlist_prev_nvpair(*errors, NULL);
+
+		do {
+			nvpair_t *pair = nvlist_prev_nvpair(*errors,
+			    more_errors);
+			VERIFY(nvlist_remove_nvpair(*errors, pair) == 0);
+			n++;
+			VERIFY(nvlist_size(*errors, &size,
+			    NV_ENCODE_NATIVE) == 0);
+		} while (size > zc->zc_nvlist_dst_size);
+
+		VERIFY(nvlist_remove_nvpair(*errors, more_errors) == 0);
+		VERIFY(nvlist_add_int32(*errors, ZPROP_N_MORE_ERRORS, n) == 0);
+		ASSERT(nvlist_size(*errors, &size, NV_ENCODE_NATIVE) == 0);
+		ASSERT(size <= zc->zc_nvlist_dst_size);
+	}
+
+	return (0);
+}
+
+static int
 put_nvlist(zfs_cmd_t *zc, nvlist_t *nvl)
 {
 	char *packed = NULL;
@@ -1026,8 +1072,8 @@
 	/*
 	 * Set the remaining root properties
 	 */
-	if (!error &&
-	    (error = zfs_set_prop_nvlist(zc->zc_name, rootprops)) != 0)
+	if (!error && (error = zfs_set_prop_nvlist(zc->zc_name,
+	    ZPROP_SRC_LOCAL, rootprops, NULL)) != 0)
 		(void) spa_destroy(zc->zc_name);
 
 	if (buf != NULL)
@@ -1487,7 +1533,7 @@
 	dmu_objset_fast_stat(os, &zc->zc_objset_stats);
 
 	if (zc->zc_nvlist_dst != 0 &&
-	    (error = dsl_prop_get_all(os, &nv, FALSE)) == 0) {
+	    (error = dsl_prop_get_all(os, &nv)) == 0) {
 		dmu_objset_stats(os, nv);
 		/*
 		 * NB: zvol_get_stats() will read the objset contents,
@@ -1508,6 +1554,49 @@
 	return (error);
 }
 
+/*
+ * inputs:
+ * zc_name		name of filesystem
+ * zc_nvlist_dst_size	size of buffer for property nvlist
+ *
+ * outputs:
+ * zc_nvlist_dst	received property nvlist
+ * zc_nvlist_dst_size	size of received property nvlist
+ *
+ * Gets received properties (distinct from local properties on or after
+ * SPA_VERSION_RECVD_PROPS) for callers who want to differentiate received from
+ * local property values.
+ */
+static int
+zfs_ioc_objset_recvd_props(zfs_cmd_t *zc)
+{
+	objset_t *os = NULL;
+	int error;
+	nvlist_t *nv;
+
+	if (error = dmu_objset_hold(zc->zc_name, FTAG, &os))
+		return (error);
+
+	/*
+	 * Without this check, we would return local property values if the
+	 * caller has not already received properties on or after
+	 * SPA_VERSION_RECVD_PROPS.
+	 */
+	if (!dsl_prop_get_hasrecvd(os)) {
+		dmu_objset_rele(os, FTAG);
+		return (ENOTSUP);
+	}
+
+	if (zc->zc_nvlist_dst != 0 &&
+	    (error = dsl_prop_get_received(os, &nv)) == 0) {
+		error = put_nvlist(zc, nv);
+		nvlist_free(nv);
+	}
+
+	dmu_objset_rele(os, FTAG);
+	return (error);
+}
+
 static int
 nvl_add_zplprop(objset_t *os, nvlist_t *props, zfs_prop_t prop)
 {
@@ -1698,293 +1787,289 @@
 	return (error);
 }
 
-int
-zfs_set_prop_nvlist(const char *name, nvlist_t *nvl)
+static int
+zfs_prop_set_userquota(const char *dsname, nvpair_t *pair)
+{
+	const char *propname = nvpair_name(pair);
+	uint64_t *valary;
+	unsigned int vallen;
+	const char *domain;
+	zfs_userquota_prop_t type;
+	uint64_t rid;
+	uint64_t quota;
+	zfsvfs_t *zfsvfs;
+	int err;
+
+	if (nvpair_type(pair) == DATA_TYPE_NVLIST) {
+		nvlist_t *attrs;
+		VERIFY(nvpair_value_nvlist(pair, &attrs) == 0);
+		VERIFY(nvlist_lookup_nvpair(attrs, ZPROP_VALUE,
+		    &pair) == 0);
+	}
+
+	VERIFY(nvpair_value_uint64_array(pair, &valary, &vallen) == 0);
+	VERIFY(vallen == 3);
+	type = valary[0];
+	rid = valary[1];
+	quota = valary[2];
+	/*
+	 * The propname is encoded as
+	 * userquota@<rid>-<domain>.
+	 */
+	domain = strchr(propname, '-') + 1;
+
+	err = zfsvfs_hold(dsname, FTAG, &zfsvfs);
+	if (err == 0) {
+		err = zfs_set_userquota(zfsvfs, type, domain, rid, quota);
+		zfsvfs_rele(zfsvfs, FTAG);
+	}
+
+	return (err);
+}
+
+/*
+ * If the named property is one that has a special function to set its value,
+ * return 0 on success and a positive error code on failure; otherwise if it is
+ * not one of the special properties handled by this function, return -1.
+ *
+ * XXX: It would be better for callers of the properety interface if we handled
+ * these special cases in dsl_prop.c (in the dsl layer).
+ */
+static int
+zfs_prop_set_special(const char *dsname, zprop_source_t source,
+    nvpair_t *pair)
 {
-	nvpair_t *elem;
-	int error = 0;
+	const char *propname = nvpair_name(pair);
+	zfs_prop_t prop = zfs_name_to_prop(propname);
+	uint64_t intval;
+	int err;
+
+	if (prop == ZPROP_INVAL) {
+		if (zfs_prop_userquota(propname))
+			return (zfs_prop_set_userquota(dsname, pair));
+		return (-1);
+	}
+
+	if (nvpair_type(pair) == DATA_TYPE_NVLIST) {
+		nvlist_t *attrs;
+		VERIFY(nvpair_value_nvlist(pair, &attrs) == 0);
+		VERIFY(nvlist_lookup_nvpair(attrs, ZPROP_VALUE,
+		    &pair) == 0);
+	}
+
+	if (zfs_prop_get_type(prop) == PROP_TYPE_STRING)
+		return (-1);
+
+	VERIFY(0 == nvpair_value_uint64(pair, &intval));
+
+	switch (prop) {
+	case ZFS_PROP_QUOTA:
+		err = dsl_dir_set_quota(dsname, source, intval);
+		break;
+	case ZFS_PROP_REFQUOTA:
+		err = dsl_dataset_set_quota(dsname, source, intval);
+		break;
+	case ZFS_PROP_RESERVATION:
+		err = dsl_dir_set_reservation(dsname, source, intval);
+		break;
+	case ZFS_PROP_REFRESERVATION:
+		err = dsl_dataset_set_reservation(dsname, source, intval);
+		break;
+	case ZFS_PROP_VOLSIZE:
+		err = zvol_set_volsize(dsname, ddi_driver_major(zfs_dip),
+		    intval);
+		break;
+	case ZFS_PROP_VERSION:
+	{
+		zfsvfs_t *zfsvfs;
+
+		if ((err = zfsvfs_hold(dsname, FTAG, &zfsvfs)) != 0)
+			break;
+
+		err = zfs_set_version(zfsvfs, intval);
+		zfsvfs_rele(zfsvfs, FTAG);
+
+		if (err == 0 && intval >= ZPL_VERSION_USERSPACE) {
+			zfs_cmd_t zc = { 0 };
+			(void) strcpy(zc.zc_name, dsname);
+			(void) zfs_ioc_userspace_upgrade(&zc);
+		}
+		break;
+	}
+
+	default:
+		err = -1;
+	}
+
+	return (err);
+}
+
+/*
+ * This function is best effort. If it fails to set any of the given properties,
+ * it continues to set as many as it can and returns the first error
+ * encountered. If the caller provides a non-NULL errlist, it also gives the
+ * complete list of names of all the properties it failed to set along with the
+ * corresponding error numbers. The caller is responsible for freeing the
+ * returned errlist.
+ *
+ * If every property is set successfully, zero is returned and the list pointed
+ * at by errlist is NULL.
+ */
+int
+zfs_set_prop_nvlist(const char *dsname, zprop_source_t source, nvlist_t *nvl,
+    nvlist_t **errlist)
+{
+	nvpair_t *pair;
+	nvpair_t *propval;
+	int err, rv = 0;
 	uint64_t intval;
 	char *strval;
 	nvlist_t *genericnvl;
-	boolean_t issnap = (strchr(name, '@') != NULL);
-
-	/*
-	 * First validate permission to set all of the properties
-	 */
-	elem = NULL;
-	while ((elem = nvlist_next_nvpair(nvl, elem)) != NULL) {
-		const char *propname = nvpair_name(elem);
-		zfs_prop_t prop = zfs_name_to_prop(propname);
-
-		if (prop == ZPROP_INVAL) {
-			/*
-			 * If this is a user-defined property, it must be a
-			 * string, and there is no further validation to do.
-			 */
-			if (zfs_prop_user(propname) &&
-			    nvpair_type(elem) == DATA_TYPE_STRING) {
-				if (error = zfs_secpolicy_write_perms(name,
-				    ZFS_DELEG_PERM_USERPROP, CRED()))
-					return (error);
-				continue;
-			}
-
-			if (!issnap && zfs_prop_userquota(propname) &&
-			    nvpair_type(elem) == DATA_TYPE_UINT64_ARRAY) {
-				const char *perm;
-				const char *up = zfs_userquota_prop_prefixes
-				    [ZFS_PROP_USERQUOTA];
-				if (strncmp(propname, up, strlen(up)) == 0)
-					perm = ZFS_DELEG_PERM_USERQUOTA;
-				else
-					perm = ZFS_DELEG_PERM_GROUPQUOTA;
-				if (error = zfs_secpolicy_write_perms(name,
-				    perm, CRED()))
-					return (error);
-				continue;
-			}
-
-			return (EINVAL);
-		}
-
-		if (issnap)
-			return (EINVAL);
-
-		if ((error = zfs_secpolicy_setprop(name, prop, CRED())) != 0)
-			return (error);
-
-		/*
-		 * Check that this value is valid for this pool version
-		 */
-		switch (prop) {
-		case ZFS_PROP_COMPRESSION:
-			/*
-			 * If the user specified gzip compression, make sure
-			 * the SPA supports it. We ignore any errors here since
-			 * we'll catch them later.
-			 */
-			if (nvpair_type(elem) == DATA_TYPE_UINT64 &&
-			    nvpair_value_uint64(elem, &intval) == 0) {
-				if (intval >= ZIO_COMPRESS_GZIP_1 &&
-				    intval <= ZIO_COMPRESS_GZIP_9 &&
-				    zfs_earlier_version(name,
-				    SPA_VERSION_GZIP_COMPRESSION))
-					return (ENOTSUP);
-
-				if (intval == ZIO_COMPRESS_ZLE &&
-				    zfs_earlier_version(name,
-				    SPA_VERSION_ZLE_COMPRESSION))
-					return (ENOTSUP);
-
-				/*
-				 * If this is a bootable dataset then
-				 * verify that the compression algorithm
-				 * is supported for booting. We must return
-				 * something other than ENOTSUP since it
-				 * implies a downrev pool version.
-				 */
-				if (zfs_is_bootfs(name) &&
-				    !BOOTFS_COMPRESS_VALID(intval))
-					return (ERANGE);
-			}
-			break;
-
-		case ZFS_PROP_COPIES:
-			if (zfs_earlier_version(name, SPA_VERSION_DITTO_BLOCKS))
-				return (ENOTSUP);
-			break;
-
-		case ZFS_PROP_DEDUP:
-			if (zfs_earlier_version(name, SPA_VERSION_DEDUP))
-				return (ENOTSUP);
-			break;
-
-		case ZFS_PROP_SHARESMB:
-			if (zpl_earlier_version(name, ZPL_VERSION_FUID))
-				return (ENOTSUP);
-			break;
-
-		case ZFS_PROP_ACLINHERIT:
-			if (nvpair_type(elem) == DATA_TYPE_UINT64 &&
-			    nvpair_value_uint64(elem, &intval) == 0)
-				if (intval == ZFS_ACL_PASSTHROUGH_X &&
-				    zfs_earlier_version(name,
-				    SPA_VERSION_PASSTHROUGH_X))
-					return (ENOTSUP);
-		}
-	}
+	nvlist_t *errors;
+	nvlist_t *retrynvl;
 
 	VERIFY(nvlist_alloc(&genericnvl, NV_UNIQUE_NAME, KM_SLEEP) == 0);
-	elem = NULL;
-	while ((elem = nvlist_next_nvpair(nvl, elem)) != NULL) {
-		const char *propname = nvpair_name(elem);
+	VERIFY(nvlist_alloc(&errors, NV_UNIQUE_NAME, KM_SLEEP) == 0);
+	VERIFY(nvlist_alloc(&retrynvl, NV_UNIQUE_NAME, KM_SLEEP) == 0);
+
+retry:
+	pair = NULL;
+	while ((pair = nvlist_next_nvpair(nvl, pair)) != NULL) {
+		const char *propname = nvpair_name(pair);
 		zfs_prop_t prop = zfs_name_to_prop(propname);
 
-		if (prop == ZPROP_INVAL) {
-			if (zfs_prop_userquota(propname)) {
-				uint64_t *valary;
-				unsigned int vallen;
-				const char *domain;
-				zfs_userquota_prop_t type;
-				uint64_t rid;
-				uint64_t quota;
-				zfsvfs_t *zfsvfs;
-
-				VERIFY(nvpair_value_uint64_array(elem,
-				    &valary, &vallen) == 0);
-				VERIFY(vallen == 3);
-				type = valary[0];
-				rid = valary[1];
-				quota = valary[2];
-				/*
-				 * The propname is encoded as
-				 * userquota@<rid>-<domain>.
-				 */
-				domain = strchr(propname, '-') + 1;
-
-				error = zfsvfs_hold(name, FTAG, &zfsvfs);
-				if (error == 0) {
-					error = zfs_set_userquota(zfsvfs,
-					    type, domain, rid, quota);
-					zfsvfs_rele(zfsvfs, FTAG);
-				}
-				if (error == 0)
-					continue;
-				else
-					goto out;
-			} else if (zfs_prop_user(propname)) {
-				VERIFY(nvpair_value_string(elem, &strval) == 0);
-				error = dsl_prop_set(name, propname, 1,
-				    strlen(strval) + 1, strval);
-				if (error == 0)
-					continue;
-				else
-					goto out;
-			}
+		/* decode the property value */
+		propval = pair;
+		if (nvpair_type(pair) == DATA_TYPE_NVLIST) {
+			nvlist_t *attrs;
+			VERIFY(nvpair_value_nvlist(pair, &attrs) == 0);
+			VERIFY(nvlist_lookup_nvpair(attrs, ZPROP_VALUE,
+			    &propval) == 0);
 		}
 
-		switch (prop) {
-		case ZFS_PROP_QUOTA:
-			if ((error = nvpair_value_uint64(elem, &intval)) != 0 ||
-			    (error = dsl_dir_set_quota(name, intval)) != 0)
-				goto out;
-			break;
-
-		case ZFS_PROP_REFQUOTA:
-			if ((error = nvpair_value_uint64(elem, &intval)) != 0 ||
-			    (error = dsl_dataset_set_quota(name, intval)) != 0)
-				goto out;
-			break;
-
-		case ZFS_PROP_RESERVATION:
-			if ((error = nvpair_value_uint64(elem, &intval)) != 0 ||
-			    (error = dsl_dir_set_reservation(name,
-			    intval)) != 0)
-				goto out;
-			break;
-
-		case ZFS_PROP_REFRESERVATION:
-			if ((error = nvpair_value_uint64(elem, &intval)) != 0 ||
-			    (error = dsl_dataset_set_reservation(name,
-			    intval)) != 0)
-				goto out;
-			break;
-
-		case ZFS_PROP_VOLSIZE:
-			if ((error = nvpair_value_uint64(elem, &intval)) != 0 ||
-			    (error = zvol_set_volsize(name,
-			    ddi_driver_major(zfs_dip), intval)) != 0)
-				goto out;
-			break;
-
-		case ZFS_PROP_VERSION:
-		{
-			zfsvfs_t *zfsvfs;
-
-			if ((error = nvpair_value_uint64(elem, &intval)) != 0)
-				goto out;
-			if ((error = zfsvfs_hold(name, FTAG, &zfsvfs)) != 0)
-				goto out;
-			error = zfs_set_version(zfsvfs, intval);
-			zfsvfs_rele(zfsvfs, FTAG);
-
-			if (error == 0 && intval >= ZPL_VERSION_USERSPACE) {
-				zfs_cmd_t zc = { 0 };
-				(void) strcpy(zc.zc_name, name);
-				(void) zfs_ioc_userspace_upgrade(&zc);
+		/* Validate value type */
+		if (prop == ZPROP_INVAL) {
+			if (zfs_prop_user(propname)) {
+				if (nvpair_type(propval) != DATA_TYPE_STRING)
+					err = EINVAL;
+			} else if (zfs_prop_userquota(propname)) {
+				if (nvpair_type(propval) !=
+				    DATA_TYPE_UINT64_ARRAY)
+					err = EINVAL;
 			}
-			if (error)
-				goto out;
-			break;
-		}
-
-		case ZFS_PROP_MLSLABEL:
-		{
-			objset_t *os = NULL;
-
-			if ((error = nvpair_value_string(elem, &strval)) != 0)
-				goto out;
-			if ((error = zfs_set_slabel_policy(name, strval,
-			    CRED(), &os)) != 0) {
-				/* error; first release the dataset if needed */
-				if (os)
-					dmu_objset_disown(os, setsl_tag);
-				goto out;
-			}
-
-			error = nvlist_add_nvpair(genericnvl, elem);
-			if (os)
-				dmu_objset_disown(os, setsl_tag);
-			if (error != 0)
-				goto out;
-			break;
-		}
-
-		default:
-			if (nvpair_type(elem) == DATA_TYPE_STRING) {
-				if (zfs_prop_get_type(prop) !=
-				    PROP_TYPE_STRING) {
-					error = EINVAL;
-					goto out;
-				}
-			} else if (nvpair_type(elem) == DATA_TYPE_UINT64) {
+		} else {
+			if (nvpair_type(propval) == DATA_TYPE_STRING) {
+				if (zfs_prop_get_type(prop) != PROP_TYPE_STRING)
+					err = EINVAL;
+			} else if (nvpair_type(propval) == DATA_TYPE_UINT64) {
 				const char *unused;
 
-				VERIFY(nvpair_value_uint64(elem, &intval) == 0);
+				VERIFY(nvpair_value_uint64(propval,
+				    &intval) == 0);
 
 				switch (zfs_prop_get_type(prop)) {
 				case PROP_TYPE_NUMBER:
 					break;
 				case PROP_TYPE_STRING:
-					error = EINVAL;
-					goto out;
+					err = EINVAL;
+					break;
 				case PROP_TYPE_INDEX:
 					if (zfs_prop_index_to_string(prop,
-					    intval, &unused) != 0) {
-						error = EINVAL;
-						goto out;
-					}
+					    intval, &unused) != 0)
+						err = EINVAL;
 					break;
 				default:
 					cmn_err(CE_PANIC,
 					    "unknown property type");
-					break;
 				}
 			} else {
-				error = EINVAL;
-				goto out;
+				err = EINVAL;
+			}
+		}
+
+		/* Validate permissions */
+		if (err == 0)
+			err = zfs_check_settable(dsname, pair, CRED());
+
+		if (err == 0) {
+			err = zfs_prop_set_special(dsname, source, pair);
+			if (err == -1) {
+				/*
+				 * For better performance we build up a list of
+				 * properties to set in a single transaction.
+				 */
+				err = nvlist_add_nvpair(genericnvl, pair);
+			} else if (err != 0 && nvl != retrynvl) {
+				/*
+				 * This may be a spurious error caused by
+				 * receiving quota and reservation out of order.
+				 * Try again in a second pass.
+				 */
+				err = nvlist_add_nvpair(retrynvl, pair);
 			}
-			if ((error = nvlist_add_nvpair(genericnvl, elem)) != 0)
-				goto out;
+		}
+
+		if (err != 0)
+			VERIFY(nvlist_add_int32(errors, propname, err) == 0);
+	}
+
+	if (nvl != retrynvl && !nvlist_empty(retrynvl)) {
+		nvl = retrynvl;
+		goto retry;
+	}
+
+	if (!nvlist_empty(genericnvl) &&
+	    dsl_props_set(dsname, source, genericnvl) != 0) {
+		/*
+		 * If this fails, we still want to set as many properties as we
+		 * can, so try setting them individually.
+		 */
+		pair = NULL;
+		while ((pair = nvlist_next_nvpair(genericnvl, pair)) != NULL) {
+			const char *propname = nvpair_name(pair);
+
+			propval = pair;
+			if (nvpair_type(pair) == DATA_TYPE_NVLIST) {
+				nvlist_t *attrs;
+				VERIFY(nvpair_value_nvlist(pair, &attrs) == 0);
+				VERIFY(nvlist_lookup_nvpair(attrs, ZPROP_VALUE,
+				    &propval) == 0);
+			}
+
+			if (nvpair_type(propval) == DATA_TYPE_STRING) {
+				VERIFY(nvpair_value_string(propval,
+				    &strval) == 0);
+				err = dsl_prop_set(dsname, propname, source, 1,
+				    strlen(strval) + 1, strval);
+			} else {
+				VERIFY(nvpair_value_uint64(propval,
+				    &intval) == 0);
+				err = dsl_prop_set(dsname, propname, source, 8,
+				    1, &intval);
+			}
+
+			if (err != 0) {
+				VERIFY(nvlist_add_int32(errors, propname,
+				    err) == 0);
+			}
 		}
 	}
-
-	if (nvlist_next_nvpair(genericnvl, NULL) != NULL) {
-		error = dsl_props_set(name, genericnvl);
+	nvlist_free(genericnvl);
+	nvlist_free(retrynvl);
+
+	if ((pair = nvlist_next_nvpair(errors, NULL)) == NULL) {
+		nvlist_free(errors);
+		errors = NULL;
+	} else {
+		VERIFY(nvpair_value_int32(pair, &rv) == 0);
 	}
-out:
-	nvlist_free(genericnvl);
-	return (error);
+
+	if (errlist == NULL)
+		nvlist_free(errors);
+	else
+		*errlist = errors;
+
+	return (rv);
 }
 
 /*
@@ -1993,15 +2078,15 @@
 static int
 zfs_check_userprops(char *fsname, nvlist_t *nvl)
 {
-	nvpair_t *elem = NULL;
+	nvpair_t *pair = NULL;
 	int error = 0;
 
-	while ((elem = nvlist_next_nvpair(nvl, elem)) != NULL) {
-		const char *propname = nvpair_name(elem);
+	while ((pair = nvlist_next_nvpair(nvl, pair)) != NULL) {
+		const char *propname = nvpair_name(pair);
 		char *valstr;
 
 		if (!zfs_prop_user(propname) ||
-		    nvpair_type(elem) != DATA_TYPE_STRING)
+		    nvpair_type(pair) != DATA_TYPE_STRING)
 			return (EINVAL);
 
 		if (error = zfs_secpolicy_write_perms(fsname,
@@ -2011,48 +2096,96 @@
 		if (strlen(propname) >= ZAP_MAXNAMELEN)
 			return (ENAMETOOLONG);
 
-		VERIFY(nvpair_value_string(elem, &valstr) == 0);
+		VERIFY(nvpair_value_string(pair, &valstr) == 0);
 		if (strlen(valstr) >= ZAP_MAXVALUELEN)
 			return (E2BIG);
 	}
 	return (0);
 }
 
+static void
+props_skip(nvlist_t *props, nvlist_t *skipped, nvlist_t **newprops)
+{
+	nvpair_t *pair;
+
+	VERIFY(nvlist_alloc(newprops, NV_UNIQUE_NAME, KM_SLEEP) == 0);
+
+	pair = NULL;
+	while ((pair = nvlist_next_nvpair(props, pair)) != NULL) {
+		if (nvlist_exists(skipped, nvpair_name(pair)))
+			continue;
+
+		VERIFY(nvlist_add_nvpair(*newprops, pair) == 0);
+	}
+}
+
+static int
+clear_received_props(objset_t *os, const char *fs, nvlist_t *props,
+    nvlist_t *skipped)
+{
+	int err = 0;
+	nvlist_t *cleared_props = NULL;
+	props_skip(props, skipped, &cleared_props);
+	if (!nvlist_empty(cleared_props)) {
+		/*
+		 * Acts on local properties until the dataset has received
+		 * properties at least once on or after SPA_VERSION_RECVD_PROPS.
+		 */
+		zprop_source_t flags = (ZPROP_SRC_NONE |
+		    (dsl_prop_get_hasrecvd(os) ? ZPROP_SRC_RECEIVED : 0));
+		err = zfs_set_prop_nvlist(fs, flags, cleared_props, NULL);
+	}
+	nvlist_free(cleared_props);
+	return (err);
+}
+
 /*
  * inputs:
  * zc_name		name of filesystem
  * zc_value		name of property to set
  * zc_nvlist_src{_size}	nvlist of properties to apply
- * zc_cookie		clear existing local props?
+ * zc_cookie		received properties flag
  *
- * outputs:		none
+ * outputs:
+ * zc_nvlist_dst{_size} error for each unapplied received property
  */
 static int
 zfs_ioc_set_prop(zfs_cmd_t *zc)
 {
 	nvlist_t *nvl;
+	boolean_t received = zc->zc_cookie;
+	zprop_source_t source = (received ? ZPROP_SRC_RECEIVED :
+	    ZPROP_SRC_LOCAL);
+	nvlist_t *errors = NULL;
 	int error;
 
 	if ((error = get_nvlist(zc->zc_nvlist_src, zc->zc_nvlist_src_size,
 	    zc->zc_iflags, &nvl)) != 0)
 		return (error);
 
-	if (zc->zc_cookie) {
+	if (received) {
 		nvlist_t *origprops;
 		objset_t *os;
 
 		if (dmu_objset_hold(zc->zc_name, FTAG, &os) == 0) {
-			if (dsl_prop_get_all(os, &origprops, TRUE) == 0) {
-				clear_props(zc->zc_name, origprops, nvl);
+			if (dsl_prop_get_received(os, &origprops) == 0) {
+				(void) clear_received_props(os,
+				    zc->zc_name, origprops, nvl);
 				nvlist_free(origprops);
 			}
+
+			dsl_prop_set_hasrecvd(os);
 			dmu_objset_rele(os, FTAG);
 		}
-
 	}
 
-	error = zfs_set_prop_nvlist(zc->zc_name, nvl);
-
+	error = zfs_set_prop_nvlist(zc->zc_name, source, nvl, &errors);
+
+	if (zc->zc_nvlist_dst != NULL && errors != NULL) {
+		(void) put_nvlist(zc, errors);
+	}
+
+	nvlist_free(errors);
 	nvlist_free(nvl);
 	return (error);
 }
@@ -2061,14 +2194,72 @@
  * inputs:
  * zc_name		name of filesystem
  * zc_value		name of property to inherit
+ * zc_cookie		revert to received value if TRUE
  *
  * outputs:		none
  */
 static int
 zfs_ioc_inherit_prop(zfs_cmd_t *zc)
 {
+	const char *propname = zc->zc_value;
+	zfs_prop_t prop = zfs_name_to_prop(propname);
+	boolean_t received = zc->zc_cookie;
+	zprop_source_t source = (received
+	    ? ZPROP_SRC_NONE		/* revert to received value, if any */
+	    : ZPROP_SRC_INHERITED);	/* explicitly inherit */
+
+	if (received) {
+		nvlist_t *dummy;
+		nvpair_t *pair;
+		zprop_type_t type;
+		int err;
+
+		/*
+		 * zfs_prop_set_special() expects properties in the form of an
+		 * nvpair with type info.
+		 */
+		if (prop == ZPROP_INVAL) {
+			if (!zfs_prop_user(propname))
+				return (EINVAL);
+
+			type = PROP_TYPE_STRING;
+		} else {
+			type = zfs_prop_get_type(prop);
+		}
+
+		VERIFY(nvlist_alloc(&dummy, NV_UNIQUE_NAME, KM_SLEEP) == 0);
+
+		switch (type) {
+		case PROP_TYPE_STRING:
+			VERIFY(0 == nvlist_add_string(dummy, propname, ""));
+			break;
+		case PROP_TYPE_NUMBER:
+		case PROP_TYPE_INDEX:
+			VERIFY(0 == nvlist_add_uint64(dummy, propname, 0));
+			break;
+		default:
+			nvlist_free(dummy);
+			return (EINVAL);
+		}
+
+		pair = nvlist_next_nvpair(dummy, NULL);
+		err = zfs_prop_set_special(zc->zc_name, source, pair);
+		nvlist_free(dummy);
+		if (err != -1)
+			return (err); /* special property already handled */
+	} else {
+		/*
+		 * Only check this in the non-received case. We want to allow
+		 * 'inherit -S' to revert non-inheritable properties like quota
+		 * and reservation to the received or default values even though
+		 * they are not considered inheritable.
+		 */
+		if (prop != ZPROP_INVAL && !zfs_prop_inheritable(prop))
+			return (EINVAL);
+	}
+
 	/* the property name has been validated by zfs_secpolicy_inherit() */
-	return (dsl_prop_set(zc->zc_name, zc->zc_value, 0, 0, NULL));
+	return (dsl_prop_set(zc->zc_name, zc->zc_value, source, 0, 0, NULL));
 }
 
 static int
@@ -2077,20 +2268,20 @@
 	nvlist_t *props;
 	spa_t *spa;
 	int error;
-	nvpair_t *elem;
-
-	if ((error = get_nvlist(zc->zc_nvlist_src, zc->zc_nvlist_src_size,
-	    zc->zc_iflags, &props)))
+	nvpair_t *pair;
+
+	if (error = get_nvlist(zc->zc_nvlist_src, zc->zc_nvlist_src_size,
+	    zc->zc_iflags, &props))
 		return (error);
 
 	/*
 	 * If the only property is the configfile, then just do a spa_lookup()
 	 * to handle the faulted case.
 	 */
-	elem = nvlist_next_nvpair(props, NULL);
-	if (elem != NULL && strcmp(nvpair_name(elem),
+	pair = nvlist_next_nvpair(props, NULL);
+	if (pair != NULL && strcmp(nvpair_name(pair),
 	    zpool_prop_to_name(ZPOOL_PROP_CACHEFILE)) == 0 &&
-	    nvlist_next_nvpair(props, elem) == NULL) {
+	    nvlist_next_nvpair(props, pair) == NULL) {
 		mutex_enter(&spa_namespace_lock);
 		if ((spa = spa_lookup(zc->zc_name)) != NULL) {
 			spa_configfile_set(spa, props, B_FALSE);
@@ -2582,7 +2773,9 @@
 	 * It would be nice to do this atomically.
 	 */
 	if (error == 0) {
-		if ((error = zfs_set_prop_nvlist(zc->zc_name, nvprops)) != 0)
+		error = zfs_set_prop_nvlist(zc->zc_name, ZPROP_SRC_LOCAL,
+		    nvprops, NULL);
+		if (error != 0)
 			(void) dmu_objset_destroy(zc->zc_name, B_FALSE);
 	}
 	nvlist_free(nvprops);
@@ -2618,7 +2811,7 @@
 	if (error)
 		goto out;
 
-	if (nvprops != NULL && nvlist_next_nvpair(nvprops, NULL) != NULL &&
+	if (!nvlist_empty(nvprops) &&
 	    zfs_earlier_version(zc->zc_name, SPA_VERSION_SNAP_PROPS)) {
 		error = ENOTSUP;
 		goto out;
@@ -2834,28 +3027,262 @@
 	return (dmu_objset_rename(zc->zc_name, zc->zc_value, recursive));
 }
 
-static void
-clear_props(char *dataset, nvlist_t *props, nvlist_t *newprops)
+static int
+zfs_check_settable(const char *dsname, nvpair_t *pair, cred_t *cr)
+{
+	const char *propname = nvpair_name(pair);
+	boolean_t issnap = (strchr(dsname, '@') != NULL);
+	zfs_prop_t prop = zfs_name_to_prop(propname);
+	uint64_t intval;
+	int err;
+
+	if (prop == ZPROP_INVAL) {
+		if (zfs_prop_user(propname)) {
+			if (err = zfs_secpolicy_write_perms(dsname,
+			    ZFS_DELEG_PERM_USERPROP, cr))
+				return (err);
+			return (0);
+		}
+
+		if (!issnap && zfs_prop_userquota(propname)) {
+			const char *perm = NULL;
+			const char *uq_prefix =
+			    zfs_userquota_prop_prefixes[ZFS_PROP_USERQUOTA];
+			const char *gq_prefix =
+			    zfs_userquota_prop_prefixes[ZFS_PROP_GROUPQUOTA];
+
+			if (strncmp(propname, uq_prefix,
+			    strlen(uq_prefix)) == 0) {
+				perm = ZFS_DELEG_PERM_USERQUOTA;
+			} else if (strncmp(propname, gq_prefix,
+			    strlen(gq_prefix)) == 0) {
+				perm = ZFS_DELEG_PERM_GROUPQUOTA;
+			} else {
+				/* USERUSED and GROUPUSED are read-only */
+				return (EINVAL);
+			}
+
+			if (err = zfs_secpolicy_write_perms(dsname, perm, cr))
+				return (err);
+			return (0);
+		}
+
+		return (EINVAL);
+	}
+
+	if (issnap)
+		return (EINVAL);
+
+	if (nvpair_type(pair) == DATA_TYPE_NVLIST) {
+		/*
+		 * dsl_prop_get_all_impl() returns properties in this
+		 * format.
+		 */
+		nvlist_t *attrs;
+		VERIFY(nvpair_value_nvlist(pair, &attrs) == 0);
+		VERIFY(nvlist_lookup_nvpair(attrs, ZPROP_VALUE,
+		    &pair) == 0);
+	}
+
+	/*
+	 * Check that this value is valid for this pool version
+	 */
+	switch (prop) {
+	case ZFS_PROP_COMPRESSION:
+		/*
+		 * If the user specified gzip compression, make sure
+		 * the SPA supports it. We ignore any errors here since
+		 * we'll catch them later.
+		 */
+		if (nvpair_type(pair) == DATA_TYPE_UINT64 &&
+		    nvpair_value_uint64(pair, &intval) == 0) {
+			if (intval >= ZIO_COMPRESS_GZIP_1 &&
+			    intval <= ZIO_COMPRESS_GZIP_9 &&
+			    zfs_earlier_version(dsname,
+			    SPA_VERSION_GZIP_COMPRESSION)) {
+				return (ENOTSUP);
+			}
+
+			if (intval == ZIO_COMPRESS_ZLE &&
+			    zfs_earlier_version(dsname,
+			    SPA_VERSION_ZLE_COMPRESSION))
+				return (ENOTSUP);
+
+			/*
+			 * If this is a bootable dataset then
+			 * verify that the compression algorithm
+			 * is supported for booting. We must return
+			 * something other than ENOTSUP since it
+			 * implies a downrev pool version.
+			 */
+			if (zfs_is_bootfs(dsname) &&
+			    !BOOTFS_COMPRESS_VALID(intval)) {
+				return (ERANGE);
+			}
+		}
+		break;
+
+	case ZFS_PROP_COPIES:
+		if (zfs_earlier_version(dsname, SPA_VERSION_DITTO_BLOCKS))
+			return (ENOTSUP);
+		break;
+
+	case ZFS_PROP_DEDUP:
+		if (zfs_earlier_version(dsname, SPA_VERSION_DEDUP))
+			return (ENOTSUP);
+		break;
+
+	case ZFS_PROP_SHARESMB:
+		if (zpl_earlier_version(dsname, ZPL_VERSION_FUID))
+			return (ENOTSUP);
+		break;
+
+	case ZFS_PROP_ACLINHERIT:
+		if (nvpair_type(pair) == DATA_TYPE_UINT64 &&
+		    nvpair_value_uint64(pair, &intval) == 0) {
+			if (intval == ZFS_ACL_PASSTHROUGH_X &&
+			    zfs_earlier_version(dsname,
+			    SPA_VERSION_PASSTHROUGH_X))
+				return (ENOTSUP);
+		}
+		break;
+	}
+
+	return (zfs_secpolicy_setprop(dsname, prop, pair, CRED()));
+}
+
+/*
+ * Removes properties from the given props list that fail permission checks
+ * needed to clear them and to restore them in case of a receive error. For each
+ * property, make sure we have both set and inherit permissions.
+ *
+ * Returns the first error encountered if any permission checks fail. If the
+ * caller provides a non-NULL errlist, it also gives the complete list of names
+ * of all the properties that failed a permission check along with the
+ * corresponding error numbers. The caller is responsible for freeing the
+ * returned errlist.
+ *
+ * If every property checks out successfully, zero is returned and the list
+ * pointed at by errlist is NULL.
+ */
+static int
+zfs_check_clearable(char *dataset, nvlist_t *props, nvlist_t **errlist)
 {
 	zfs_cmd_t *zc;
-	nvpair_t *prop;
+	nvpair_t *pair, *next_pair;
+	nvlist_t *errors;
+	int err, rv = 0;
 
 	if (props == NULL)
-		return;
+		return (0);
+
+	VERIFY(nvlist_alloc(&errors, NV_UNIQUE_NAME, KM_SLEEP) == 0);
+
 	zc = kmem_alloc(sizeof (zfs_cmd_t), KM_SLEEP);
 	(void) strcpy(zc->zc_name, dataset);
-	for (prop = nvlist_next_nvpair(props, NULL); prop;
-	    prop = nvlist_next_nvpair(props, prop)) {
-		if (newprops != NULL &&
-		    nvlist_exists(newprops, nvpair_name(prop)))
-			continue;
-		(void) strcpy(zc->zc_value, nvpair_name(prop));
-		if (zfs_secpolicy_inherit(zc, CRED()) == 0)
-			(void) zfs_ioc_inherit_prop(zc);
+	pair = nvlist_next_nvpair(props, NULL);
+	while (pair != NULL) {
+		next_pair = nvlist_next_nvpair(props, pair);
+
+		(void) strcpy(zc->zc_value, nvpair_name(pair));
+		if ((err = zfs_check_settable(dataset, pair, CRED())) != 0 ||
+		    (err = zfs_secpolicy_inherit(zc, CRED())) != 0) {
+			VERIFY(nvlist_remove_nvpair(props, pair) == 0);
+			VERIFY(nvlist_add_int32(errors,
+			    zc->zc_value, err) == 0);
+		}
+		pair = next_pair;
 	}
 	kmem_free(zc, sizeof (zfs_cmd_t));
+
+	if ((pair = nvlist_next_nvpair(errors, NULL)) == NULL) {
+		nvlist_free(errors);
+		errors = NULL;
+	} else {
+		VERIFY(nvpair_value_int32(pair, &rv) == 0);
+	}
+
+	if (errlist == NULL)
+		nvlist_free(errors);
+	else
+		*errlist = errors;
+
+	return (rv);
 }
 
+static boolean_t
+propval_equals(nvpair_t *p1, nvpair_t *p2)
+{
+	if (nvpair_type(p1) == DATA_TYPE_NVLIST) {
+		/* dsl_prop_get_all_impl() format */
+		nvlist_t *attrs;
+		VERIFY(nvpair_value_nvlist(p1, &attrs) == 0);
+		VERIFY(nvlist_lookup_nvpair(attrs, ZPROP_VALUE,
+		    &p1) == 0);
+	}
+
+	if (nvpair_type(p2) == DATA_TYPE_NVLIST) {
+		nvlist_t *attrs;
+		VERIFY(nvpair_value_nvlist(p2, &attrs) == 0);
+		VERIFY(nvlist_lookup_nvpair(attrs, ZPROP_VALUE,
+		    &p2) == 0);
+	}
+
+	if (nvpair_type(p1) != nvpair_type(p2))
+		return (B_FALSE);
+
+	if (nvpair_type(p1) == DATA_TYPE_STRING) {
+		char *valstr1, *valstr2;
+
+		VERIFY(nvpair_value_string(p1, (char **)&valstr1) == 0);
+		VERIFY(nvpair_value_string(p2, (char **)&valstr2) == 0);
+		return (strcmp(valstr1, valstr2) == 0);
+	} else {
+		uint64_t intval1, intval2;
+
+		VERIFY(nvpair_value_uint64(p1, &intval1) == 0);
+		VERIFY(nvpair_value_uint64(p2, &intval2) == 0);
+		return (intval1 == intval2);
+	}
+}
+
+/*
+ * Remove properties from props if they are not going to change (as determined
+ * by comparison with origprops). Remove them from origprops as well, since we
+ * do not need to clear or restore properties that won't change.
+ */
+static void
+props_reduce(nvlist_t *props, nvlist_t *origprops)
+{
+	nvpair_t *pair, *next_pair;
+
+	if (origprops == NULL)
+		return; /* all props need to be received */
+
+	pair = nvlist_next_nvpair(props, NULL);
+	while (pair != NULL) {
+		const char *propname = nvpair_name(pair);
+		nvpair_t *match;
+
+		next_pair = nvlist_next_nvpair(props, pair);
+
+		if ((nvlist_lookup_nvpair(origprops, propname,
+		    &match) != 0) || !propval_equals(pair, match))
+			goto next; /* need to set received value */
+
+		/* don't clear the existing received value */
+		(void) nvlist_remove_nvpair(origprops, match);
+		/* don't bother receiving the property */
+		(void) nvlist_remove_nvpair(props, pair);
+next:
+		pair = next_pair;
+	}
+}
+
+#ifdef	DEBUG
+static boolean_t zfs_ioc_recv_inject_err;
+#endif
+
 /*
  * inputs:
  * zc_name		name of containing filesystem
@@ -2868,6 +3295,8 @@
  *
  * outputs:
  * zc_cookie		number of bytes read
+ * zc_nvlist_dst{_size} error for each unapplied received property
+ * zc_obj		zprop_errflags_t
  */
 static int
 zfs_ioc_recv(zfs_cmd_t *zc)
@@ -2876,13 +3305,17 @@
 	objset_t *os;
 	dmu_recv_cookie_t drc;
 	boolean_t force = (boolean_t)zc->zc_guid;
-	int error, fd;
+	int fd;
+	int error = 0;
+	int props_error = 0;
+	nvlist_t *errors;
 	offset_t off;
-	nvlist_t *props = NULL;
-	nvlist_t *origprops = NULL;
+	nvlist_t *props = NULL; /* sent properties */
+	nvlist_t *origprops = NULL; /* existing properties */
 	objset_t *origin = NULL;
 	char *tosnap;
 	char tofs[ZFS_MAXNAMELEN];
+	boolean_t first_recvd_props = B_FALSE;
 
 	if (dataset_namecheck(zc->zc_value, NULL, NULL) != 0 ||
 	    strchr(zc->zc_value, '@') == NULL ||
@@ -2891,8 +3324,7 @@
 
 	(void) strcpy(tofs, zc->zc_value);
 	tosnap = strchr(tofs, '@');
-	*tosnap = '\0';
-	tosnap++;
+	*tosnap++ = '\0';
 
 	if (zc->zc_nvlist_src != NULL &&
 	    (error = get_nvlist(zc->zc_nvlist_src, zc->zc_nvlist_src_size,
@@ -2906,12 +3338,36 @@
 		return (EBADF);
 	}
 
+	VERIFY(nvlist_alloc(&errors, NV_UNIQUE_NAME, KM_SLEEP) == 0);
+
 	if (props && dmu_objset_hold(tofs, FTAG, &os) == 0) {
+		if ((spa_version(os->os_spa) >= SPA_VERSION_RECVD_PROPS) &&
+		    !dsl_prop_get_hasrecvd(os)) {
+			first_recvd_props = B_TRUE;
+		}
+
 		/*
-		 * If new properties are supplied, they are to completely
-		 * replace the existing ones, so stash away the existing ones.
+		 * If new received properties are supplied, they are to
+		 * completely replace the existing received properties, so stash
+		 * away the existing ones.
 		 */
-		(void) dsl_prop_get_all(os, &origprops, B_TRUE);
+		if (dsl_prop_get_received(os, &origprops) == 0) {
+			nvlist_t *errlist = NULL;
+			/*
+			 * Don't bother writing a property if its value won't
+			 * change (and avoid the unnecessary security checks).
+			 *
+			 * The first receive after SPA_VERSION_RECVD_PROPS is a
+			 * special case where we blow away all local properties
+			 * regardless.
+			 */
+			if (!first_recvd_props)
+				props_reduce(props, origprops);
+			if (zfs_check_clearable(tofs, origprops,
+			    &errlist) != 0)
+				(void) nvlist_merge(errors, errlist, 0);
+			nvlist_free(errlist);
+		}
 
 		dmu_objset_rele(os, FTAG);
 	}
@@ -2930,15 +3386,42 @@
 		goto out;
 
 	/*
-	 * Reset properties.  We do this before we receive the stream
-	 * so that the properties are applied to the new data.
+	 * Set properties before we receive the stream so that they are applied
+	 * to the new data. Note that we must call dmu_recv_stream() if
+	 * dmu_recv_begin() succeeds.
 	 */
 	if (props) {
-		clear_props(tofs, origprops, props);
+		nvlist_t *errlist;
+
+		if (dmu_objset_from_ds(drc.drc_logical_ds, &os) == 0) {
+			if (drc.drc_newfs) {
+				if (spa_version(os->os_spa) >=
+				    SPA_VERSION_RECVD_PROPS)
+					first_recvd_props = B_TRUE;
+			} else if (origprops != NULL) {
+				if (clear_received_props(os, tofs, origprops,
+				    first_recvd_props ? NULL : props) != 0)
+					zc->zc_obj |= ZPROP_ERR_NOCLEAR;
+			} else {
+				zc->zc_obj |= ZPROP_ERR_NOCLEAR;
+			}
+			dsl_prop_set_hasrecvd(os);
+		} else if (!drc.drc_newfs) {
+			zc->zc_obj |= ZPROP_ERR_NOCLEAR;
+		}
+
+		(void) zfs_set_prop_nvlist(tofs, ZPROP_SRC_RECEIVED,
+		    props, &errlist);
+		(void) nvlist_merge(errors, errlist, 0);
+		nvlist_free(errlist);
+	}
+
+	if (fit_error_list(zc, &errors) != 0 || put_nvlist(zc, errors) != 0) {
 		/*
-		 * XXX - Note, this is all-or-nothing; should be best-effort.
+		 * Caller made zc->zc_nvlist_dst less than the minimum expected
+		 * size or supplied an invalid address.
 		 */
-		(void) zfs_set_prop_nvlist(tofs, props);
+		props_error = EINVAL;
 	}
 
 	off = fp->f_offset;
@@ -2973,17 +3456,64 @@
 	if (VOP_SEEK(fp->f_vnode, fp->f_offset, &off, NULL) == 0)
 		fp->f_offset = off;
 
+#ifdef	DEBUG
+	if (zfs_ioc_recv_inject_err) {
+		zfs_ioc_recv_inject_err = B_FALSE;
+		error = 1;
+	}
+#endif
 	/*
 	 * On error, restore the original props.
 	 */
 	if (error && props) {
-		clear_props(tofs, props, NULL);
-		(void) zfs_set_prop_nvlist(tofs, origprops);
+		if (dmu_objset_hold(tofs, FTAG, &os) == 0) {
+			if (clear_received_props(os, tofs, props, NULL) != 0) {
+				/*
+				 * We failed to clear the received properties.
+				 * Since we may have left a $recvd value on the
+				 * system, we can't clear the $hasrecvd flag.
+				 */
+				zc->zc_obj |= ZPROP_ERR_NORESTORE;
+			} else if (first_recvd_props) {
+				dsl_prop_unset_hasrecvd(os);
+			}
+			dmu_objset_rele(os, FTAG);
+		} else if (!drc.drc_newfs) {
+			/* We failed to clear the received properties. */
+			zc->zc_obj |= ZPROP_ERR_NORESTORE;
+		}
+
+		if (origprops == NULL && !drc.drc_newfs) {
+			/* We failed to stash the original properties. */
+			zc->zc_obj |= ZPROP_ERR_NORESTORE;
+		}
+
+		/*
+		 * dsl_props_set() will not convert RECEIVED to LOCAL on or
+		 * after SPA_VERSION_RECVD_PROPS, so we need to specify LOCAL
+		 * explictly if we're restoring local properties cleared in the
+		 * first new-style receive.
+		 */
+		if (origprops != NULL &&
+		    zfs_set_prop_nvlist(tofs, (first_recvd_props ?
+		    ZPROP_SRC_LOCAL : ZPROP_SRC_RECEIVED),
+		    origprops, NULL) != 0) {
+			/*
+			 * We stashed the original properties but failed to
+			 * restore them.
+			 */
+			zc->zc_obj |= ZPROP_ERR_NORESTORE;
+		}
 	}
 out:
 	nvlist_free(props);
 	nvlist_free(origprops);
+	nvlist_free(errors);
 	releasef(fd);
+
+	if (error == 0)
+		error = props_error;
+
 	return (error);
 }
 
@@ -3767,7 +4297,9 @@
 	{ zfs_ioc_release, zfs_secpolicy_release, DATASET_NAME, B_TRUE,
 	    B_TRUE },
 	{ zfs_ioc_get_holds, zfs_secpolicy_read, DATASET_NAME, B_FALSE,
-	    B_TRUE }
+	    B_TRUE },
+	{ zfs_ioc_objset_recvd_props, zfs_secpolicy_read, DATASET_NAME, B_FALSE,
+	    B_FALSE }
 };
 
 int
--- a/usr/src/uts/common/fs/zfs/zfs_vfsops.c	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/uts/common/fs/zfs/zfs_vfsops.c	Mon Nov 09 20:45:32 2009 -0800
@@ -1356,7 +1356,7 @@
 
 		if (l_to_str_internal(mnt_sl, &str) == 0 &&
 		    dsl_prop_set(osname, zfs_prop_to_name(ZFS_PROP_MLSLABEL),
-		    1, strlen(str) + 1, str) == 0)
+		    ZPROP_SRC_LOCAL, 1, strlen(str) + 1, str) == 0)
 			retv = 0;
 		if (str != NULL)
 			kmem_free(str, strlen(str) + 1);
--- a/usr/src/uts/common/fs/zfs/zvol.c	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/uts/common/fs/zfs/zvol.c	Mon Nov 09 20:45:32 2009 -0800
@@ -130,8 +130,9 @@
  */
 int zvol_maxphys = DMU_MAX_ACCESS/2;
 
+extern int zfs_set_prop_nvlist(const char *, zprop_source_t,
+    nvlist_t *, nvlist_t **);
 static int zvol_remove_zv(zvol_state_t *);
-extern int zfs_set_prop_nvlist(const char *, nvlist_t *);
 static int zvol_get_data(void *arg, lr_write_t *lr, char *buf, zio_t *zio);
 static int zvol_dumpify(zvol_state_t *zv);
 static int zvol_dump_fini(zvol_state_t *zv);
@@ -1659,7 +1660,8 @@
 		    zfs_prop_to_name(ZFS_PROP_CHECKSUM),
 		    ZIO_CHECKSUM_OFF) == 0);
 
-		error = zfs_set_prop_nvlist(zv->zv_name, nv);
+		error = zfs_set_prop_nvlist(zv->zv_name, ZPROP_SRC_LOCAL,
+		    nv, NULL);
 		nvlist_free(nv);
 
 		if (error)
@@ -1766,7 +1768,8 @@
 	    zfs_prop_to_name(ZFS_PROP_COMPRESSION), compress);
 	(void) nvlist_add_uint64(nv,
 	    zfs_prop_to_name(ZFS_PROP_REFRESERVATION), refresrv);
-	(void) zfs_set_prop_nvlist(zv->zv_name, nv);
+	(void) zfs_set_prop_nvlist(zv->zv_name, ZPROP_SRC_LOCAL,
+	    nv, NULL);
 	nvlist_free(nv);
 
 	zvol_free_extents(zv);
--- a/usr/src/uts/common/sys/fs/zfs.h	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/uts/common/sys/fs/zfs.h	Mon Nov 09 20:45:32 2009 -0800
@@ -173,10 +173,27 @@
 	ZPROP_SRC_DEFAULT = 0x2,
 	ZPROP_SRC_TEMPORARY = 0x4,
 	ZPROP_SRC_LOCAL = 0x8,
-	ZPROP_SRC_INHERITED = 0x10
+	ZPROP_SRC_INHERITED = 0x10,
+	ZPROP_SRC_RECEIVED = 0x20
 } zprop_source_t;
 
-#define	ZPROP_SRC_ALL	0x1f
+#define	ZPROP_SRC_ALL	0x3f
+
+#define	ZPROP_SOURCE_VAL_RECVD	"$recvd"
+#define	ZPROP_N_MORE_ERRORS	"N_MORE_ERRORS"
+/*
+ * Dataset flag implemented as a special entry in the props zap object
+ * indicating that the dataset has received properties on or after
+ * SPA_VERSION_RECVD_PROPS. The first such receive blows away local properties
+ * just as it did in earlier versions, and thereafter, local properties are
+ * preserved.
+ */
+#define	ZPROP_HAS_RECVD		"$hasrecvd"
+
+typedef enum {
+	ZPROP_ERR_NOCLEAR = 0x1, /* failure to clear existing props */
+	ZPROP_ERR_NORESTORE = 0x2 /* failure to restore props on error */
+} zprop_errflags_t;
 
 typedef int (*zprop_func)(int, void *);
 
@@ -198,7 +215,7 @@
 const char *zfs_prop_to_name(zfs_prop_t);
 zfs_prop_t zfs_name_to_prop(const char *);
 boolean_t zfs_prop_user(const char *);
-boolean_t zfs_prop_userquota(const char *name);
+boolean_t zfs_prop_userquota(const char *);
 int zfs_prop_index_to_string(zfs_prop_t, uint64_t, const char **);
 int zfs_prop_string_to_index(zfs_prop_t, const char *, uint64_t *);
 uint64_t zfs_prop_random_value(zfs_prop_t, uint64_t seed);
@@ -306,14 +323,15 @@
 #define	SPA_VERSION_19			19ULL
 #define	SPA_VERSION_20			20ULL
 #define	SPA_VERSION_21			21ULL
+#define	SPA_VERSION_22			22ULL
 /*
  * When bumping up SPA_VERSION, make sure GRUB ZFS understands the on-disk
  * format change. Go to usr/src/grub/grub-0.97/stage2/{zfs-include/, fsys_zfs*},
  * and do the appropriate changes.  Also bump the version number in
  * usr/src/grub/capability.
  */
-#define	SPA_VERSION			SPA_VERSION_21
-#define	SPA_VERSION_STRING		"21"
+#define	SPA_VERSION			SPA_VERSION_22
+#define	SPA_VERSION_STRING		"22"
 
 /*
  * Symbolic names for the changes that caused a SPA_VERSION switch.
@@ -356,6 +374,7 @@
 #define	SPA_VERSION_HOLES		SPA_VERSION_19
 #define	SPA_VERSION_ZLE_COMPRESSION	SPA_VERSION_20
 #define	SPA_VERSION_DEDUP		SPA_VERSION_21
+#define	SPA_VERSION_RECVD_PROPS		SPA_VERSION_22
 
 /*
  * ZPL version - rev'd whenever an incompatible on-disk format change
@@ -659,7 +678,8 @@
 	ZFS_IOC_USERSPACE_UPGRADE,
 	ZFS_IOC_HOLD,
 	ZFS_IOC_RELEASE,
-	ZFS_IOC_GET_HOLDS
+	ZFS_IOC_GET_HOLDS,
+	ZFS_IOC_OBJSET_RECVD_PROPS
 } zfs_ioc_t;
 
 /*
--- a/usr/src/uts/common/sys/nvpair.h	Mon Nov 09 20:01:32 2009 -0800
+++ b/usr/src/uts/common/sys/nvpair.h	Mon Nov 09 20:45:32 2009 -0800
@@ -19,15 +19,13 @@
  * CDDL HEADER END
  */
 /*
- * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
+ * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
  * Use is subject to license terms.
  */
 
 #ifndef	_SYS_NVPAIR_H
 #define	_SYS_NVPAIR_H
 
-#pragma ident	"%Z%%M%	%I%	%E% SMI"
-
 #include <sys/types.h>
 #include <sys/errno.h>
 #include <sys/va_list.h>
@@ -199,6 +197,7 @@
 
 int nvlist_remove(nvlist_t *, const char *, data_type_t);
 int nvlist_remove_all(nvlist_t *, const char *);
+int nvlist_remove_nvpair(nvlist_t *, nvpair_t *);
 
 int nvlist_lookup_boolean(nvlist_t *, const char *);
 int nvlist_lookup_boolean_value(nvlist_t *, const char *, boolean_t *);
@@ -237,9 +236,11 @@
 int nvlist_lookup_nvpair_embedded_index(nvlist_t *, const char *, nvpair_t **,
     int *, char **);
 boolean_t nvlist_exists(nvlist_t *, const char *);
+boolean_t nvlist_empty(nvlist_t *);
 
 /* processing nvpair */
 nvpair_t *nvlist_next_nvpair(nvlist_t *, nvpair_t *);
+nvpair_t *nvlist_prev_nvpair(nvlist_t *, nvpair_t *);
 char *nvpair_name(nvpair_t *);
 data_type_t nvpair_type(nvpair_t *);
 int nvpair_type_is_array(nvpair_t *);