17551576 publication api should not modify package manifests

author      Yiteng Zhang <yiteng.zhang@oracle.com>
date        Fri, 24 Jun 2016 18:04:04 -0700
changeset   3381 a37c074e1170
parent      3380 e06a977abe3c
child       3382 b6e95be081d0

files:
src/modules/actions/signature.py
src/modules/client/transport/fileobj.py
src/modules/client/transport/repo.py
src/modules/client/transport/transport.py
src/modules/digest.py
src/modules/misc.py
src/modules/publish/transaction.py
src/modules/server/depot.py
src/modules/server/repository.py
src/modules/server/transaction.py
src/pull.py
src/tests/cli/t_pkgrecv.py
src/tests/cli/t_pkgsend.py
src/tests/cli/t_pkgsign.py
src/util/publish/pkgmerge.py
--- a/src/modules/actions/signature.py	Fri Jun 24 10:34:56 2016 -0700
+++ b/src/modules/actions/signature.py	Fri Jun 24 18:04:04 2016 -0700
@@ -155,8 +155,7 @@
 
                         chain_csizes.append(csize)
                         for attr in chashes:
-                                chain_chshes[attr].append(
-                                    chashes[attr].hexdigest())
+                                chain_chshes[attr].append(chashes[attr])
 
                 # Remove any unused hash attributes.
                 for cattrs in (chain_hshes, chain_chshes):
@@ -269,7 +268,7 @@
                         shutil.rmtree(tmp_dir)
                         tmp_a.attrs["pkg.csize"] = csize
                         for attr in chashes:
-                                tmp_a.attrs[attr] = chashes[attr].hexdigest()
+                                tmp_a.attrs[attr] = chashes[attr]
                 elif self.hash:
                         tmp_a.hash = self.hash
                         for attr in digest.DEFAULT_HASH_ATTRS:
@@ -303,8 +302,7 @@
                         shutil.rmtree(tmp_dir)
                         csizes.append(csize)
                         for attr in chashes:
-                                chain_chashes[attr].append(
-                                    chashes[attr].hexdigest())
+                                chain_chashes[attr].append(chashes[attr])
 
                 if chain_hashes:
                         for attr in digest.DEFAULT_CHAIN_ATTRS:
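
The signature.py hunks above track a contract change in misc.compute_compressed_attrs() (see the misc.py hunks below): chash values now come back as hex strings rather than live hash objects, so callers store them directly instead of calling .hexdigest(). A minimal standalone sketch of the new contract, using plain gzip/hashlib in place of pkg's PkgGzipFile:

    import gzip
    import hashlib
    import io

    def compressed_attrs_sketch(data):
            """Compress 'data'; return (csize, chashes) with hex strings."""
            buf = io.BytesIO()
            with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
                    gz.write(data)
            cdata = buf.getvalue()
            return str(len(cdata)), {"chash": hashlib.sha1(cdata).hexdigest()}

    csize, chashes = compressed_attrs_sketch(b"payload")
    attrs = {"pkg.csize": csize}
    for attr in chashes:
            attrs[attr] = chashes[attr]     # already a hex string
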
--- a/src/modules/client/transport/fileobj.py	Fri Jun 24 10:34:56 2016 -0700
+++ b/src/modules/client/transport/fileobj.py	Fri Jun 24 18:04:04 2016 -0700
@@ -240,6 +240,12 @@
 
         # Header and message methods
 
+        @property
+        def headers(self):
+                if not self.__headers_arrived:
+                        self.__fill_headers()
+                return self.__headers
+
         def get_http_message(self):
                 """Return the status message that may be included
                 with a numerical HTTP response code.  Not all HTTP
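
The new 'headers' property gives callers dictionary-style access to all response headers while keeping the lazy behavior: nothing is read until the first access, and later accesses reuse the cached dictionary. get_compressed_attrs() in repo.py below iterates it to collect X-Ipkg-Attr values. A sketch of the pattern, with assumed names:

    class LazyHeaders(object):
            """Fetch headers from a supplied callable on first access."""

            def __init__(self, fetch):
                    self.__fetch = fetch      # callable returning a dict
                    self.__headers = {}
                    self.__arrived = False

            @property
            def headers(self):
                    if not self.__arrived:
                            self.__headers = self.__fetch()
                            self.__arrived = True
                    return self.__headers

    resp = LazyHeaders(lambda: {"Content-Length": "512"})
    assert resp.headers["Content-Length"] == "512"
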
--- a/src/modules/client/transport/repo.py	Fri Jun 24 10:34:56 2016 -0700
+++ b/src/modules/client/transport/repo.py	Fri Jun 24 18:04:04 2016 -0700
@@ -49,7 +49,7 @@
 import pkg.server.repository as svr_repo
 import pkg.server.query_parser as sqp
 
-from pkg.misc import N_, force_str
+from pkg.misc import N_, compute_compressed_attrs, EmptyDict
 
 class TransportRepo(object):
         """The TransportRepo class handles transport requests.
@@ -142,7 +142,11 @@
 
                 raise NotImplementedError
 
-        def publish_add_file(self, action, header=None, trans_id=None):
+        def publish_add_file(self, pth, header=None, trans_id=None,
+            basename=None, progtrack=None):
+                raise NotImplementedError
+
+        def publish_add_manifest(self, pth, header=None, trans_id=None):
                 raise NotImplementedError
 
         def publish_abandon(self, header=None, trans_id=None):
@@ -208,6 +212,18 @@
 
                 raise NotImplementedError
 
+        def get_compressed_attrs(self, fhash, header=None, pub=None,
+            trans_id=None, hashes=True):
+                """Given a fhash, returns a tuple of (csize, chashes) where
+                'csize' is the size of the file in the repository and 'chashes'
+                is a dictionary containing any hashes of the compressed data
+                known by the repository.  If the repository cannot provide the
+                hash information or 'hashes' is False, chashes will be an empty
+                dictionary.  If the repository does not have the file, a tuple
+                of (None, None) will be returned instead."""
+
+                raise NotImplementedError
+
         def build_refetch_header(self, header):
                 """Based on existing header contents, build a header that
                 should be used for a subsequent retry when fetching content
@@ -814,13 +830,21 @@
                     progclass=progclass, progtrack=progtrack)
                 self.__check_response_body(fobj)
 
-        def publish_add_file(self, pth, header=None, trans_id=None):
+        def publish_add_file(self, pth, header=None, trans_id=None,
+            basename=None, progtrack=None):
                 """The publish operation that adds content to a repository.
-                The action must be populated with a data property.
                 Callers may supply a header, and should supply a transaction
                 id in trans_id."""
 
                 attrs = {}
+                progclass = None
+
+                if progtrack:
+                        progclass = FileProgress
+
+                if basename:
+                        attrs["basename"] = basename
+
                 baseurl = self.__get_request_url("file/1/")
                 requesturl = urljoin(baseurl, trans_id)
 
@@ -832,7 +856,33 @@
                 if header:
                         headers.update(header)
 
-                fobj = self._post_url(requesturl, header=headers, data_fp=pth)
+                fobj = self._post_url(requesturl, header=headers, data_fp=pth,
+                    progclass=progclass, progtrack=progtrack)
+                self.__check_response_body(fobj)
+
+        def publish_add_manifest(self, pth, header=None, trans_id=None):
+                """The publish operation that adds content to a repository.
+                Callers may supply a header, and should supply a transaction
+                id in trans_id."""
+
+                baseurl = self.__get_request_url("manifest/1/")
+                requesturl = urljoin(baseurl, trans_id)
+                # Compress the manifest for the HTTPRepo case.
+                size = int(os.path.getsize(pth))
+                with open(pth, "rb") as f:
+                        data = f.read()
+                basename = os.path.basename(pth) + ".gz"
+                dirname = os.path.dirname(pth)
+                pathname = os.path.join(dirname, basename)
+                compute_compressed_attrs(basename,
+                    data=data, size=size, compress_dir=dirname)
+
+                headers = {}
+                if header:
+                        headers.update(header)
+
+                fobj = self._post_url(requesturl, header=headers,
+                    data_fp=pathname)
                 self.__check_response_body(fobj)
 
         def publish_abandon(self, header=None, trans_id=None):
@@ -1065,6 +1115,51 @@
 
                 return True
 
+        def get_compressed_attrs(self, fhash, header=None, pub=None,
+            trans_id=None, hashes=True):
+                """Given a fhash, returns a tuple of (csize, chashes) where
+                'csize' is the size of the file in the repository and 'chashes'
+                is a dictionary containing any hashes of the compressed data
+                known by the repository.  If the repository cannot provide the
+                hash information or 'hashes' is False, chashes will be an empty
+                dictionary.  If the repository does not have the file, a tuple
+                of (None, None) will be returned instead."""
+
+                # If the publisher's prefix isn't contained in trans_id,
+                # assume the server doesn't have the file.
+                pfx = getattr(pub, "prefix", None)
+                if (pfx and trans_id and
+                    quote("pkg://{0}/".format(pfx), safe='') not in trans_id):
+                        return (None, None)
+
+                # If caller requests hashes and server supports providing them
+                # (v2 of file operation), then attempt to retrieve size and
+                # hashes.  Otherwise, fall back to the v0 file operation,
+                # which only returns the size (and so is faster).
+                if hashes and self.supports_version("file", [2]) > -1:
+                        version = 2
+                else:
+                        version = 0
+
+                baseurl = self.__get_request_url("file/{0}/".format(version),
+                    pub=pub)
+                requesturl = urljoin(baseurl, fhash)
+
+                try:
+                        # see if repository has file
+                        resp = self._fetch_url_header(requesturl, header)
+                        resp.read()
+                        csize = resp.getheader("Content-Length", None)
+                        chashes = dict(
+                            val.split("=", 1)
+                            for hdr, val in six.iteritems(resp.headers)
+                            if hdr.lower().startswith("x-ipkg-attr")
+                        )
+                        return (csize, chashes)
+                except Exception:
+                        # repository transport issue or does not have file
+                        return (None, None)
+
         def build_refetch_header(self, header):
                 """For HTTP requests that have failed due to corrupt content,
                 if that request didn't specify 'Cache-control: no-cache' in
@@ -1593,7 +1688,7 @@
                     "catalog": ["1"],
                     "close": ["0"],
                     "file": ["0", "1"],
-                    "manifest": ["0"],
+                    "manifest": ["0", "1"],
                     "open": ["0"],
                     "publisher": ["0", "1"],
                     "search": ["1"],
@@ -1642,12 +1737,33 @@
                                 sz = int(action.attrs.get("pkg.size", 0))
                                 progtrack.progress_callback(0, 0, sz, sz)
 
-        def publish_add_file(self, pth, header=None, trans_id=None):
+        def publish_add_file(self, pth, header=None, trans_id=None,
+            basename=None, progtrack=None):
                 """The publish operation that adds a file to an existing
                 transaction."""
 
+                progclass = None
+                if progtrack:
+                        progclass = FileProgress
+                        progtrack = progclass(progtrack)
+
+                try:
+                        self._frepo.add_file(trans_id, pth, basename)
+                except svr_repo.RepositoryError as e:
+                        if progtrack:
+                                progtrack.abort()
+                        raise tx.TransportOperationError(str(e))
+                else:
+                        if progtrack:
+                                sz = int(os.path.getsize(pth))
+                                progtrack.progress_callback(0, 0, sz, sz)
+
+        def publish_add_manifest(self, pth, header=None, trans_id=None):
+                """The publish operation that adds a manifest to an existing
+                transaction."""
+
                 try:
-                        self._frepo.add_file(trans_id, pth)
+                        self._frepo.add_manifest(trans_id, pth)
                 except svr_repo.RepositoryError as e:
                         raise tx.TransportOperationError(str(e))
 
@@ -1814,6 +1930,39 @@
 
                 return True
 
+        def get_compressed_attrs(self, fhash, header=None, pub=None,
+            trans_id=None, hashes=True):
+                """Given a fhash, returns a tuple of (csize, chashes) where
+                'csize' is the size of the file in the repository and 'chashes'
+                is a dictionary containing any hashes of the compressed data
+                known by the repository.  If the repository cannot provide the
+                hash information or 'hashes' is False, chashes will be an empty
+                dictionary.  If the repository does not have the file, a tuple
+                of (None, None) will be returned instead."""
+
+                # If the publisher's prefix isn't contained in trans_id,
+                # assume the server doesn't have the file.
+                pfx = getattr(pub, "prefix", None)
+                if (pfx and trans_id and
+                    quote("pkg://{0}/".format(pfx), safe='') not in trans_id):
+                        return (None, None)
+
+                try:
+                        # see if repository has file
+                        fpath = self._frepo.file(fhash, pub=pfx)
+                        if hashes:
+                                csize, chashes = compute_compressed_attrs(fhash,
+                                    file_path=fpath)
+                        else:
+                                csize = os.stat(fpath).st_size
+                                chashes = EmptyDict
+                        return (csize, chashes)
+                except (EnvironmentError,
+                        svr_repo.RepositoryError,
+                        svr_repo.RepositoryFileNotFoundError):
+                        # repository transport issue or does not have file
+                        return (None, None)
+
         def build_refetch_header(self, header):
                 """Pointless to attempt refetch of corrupt content for
                 this protocol."""
--- a/src/modules/client/transport/transport.py	Fri Jun 24 10:34:56 2016 -0700
+++ b/src/modules/client/transport/transport.py	Fri Jun 24 18:04:04 2016 -0700
@@ -84,6 +84,9 @@
 
                 self.pkg_pub_map = None
                 self.alt_pubs = None
+                # An integer that indicates the maximum number of times the
+                # transport should check whether a file needs to be uploaded.
+                self.max_transfer_checks = 20
 
         def add_cache(self, path, layout=None, pub=None, readonly=True):
                 """Adds the directory specified by 'path' as a location to read
@@ -511,7 +514,6 @@
         user_agent = property(__get_user_agent,
             doc="A string that identifies the user agent for the transport.")
 
-
 class LockedTransport(object):
         """A decorator class that wraps transport functions, calling
         their lock and unlock methods.  Due to implementation differences
@@ -566,6 +568,9 @@
                 self.stats = tstats.RepoChooser()
                 self.repo_status = {}
                 self.__tmp_crls = {}
+                # Used to record those actions that will have their payload
+                # transferred.
+                self.__hashes = set()
                 # Used to record those CRLs which are unreachable during the
                 # current operation.
                 self.__bad_crls = set()
@@ -1245,6 +1250,29 @@
                 raise failures
 
         @LockedTransport()
+        def get_compressed_attrs(self, fhash, pub=None, trans_id=None,
+            hashes=True):
+                """Given a fhash, returns a tuple of (csize, chashes) where
+                'csize' is the size of the file in the repository and 'chashes'
+                is a dictionary containing any hashes of the compressed data
+                known by the repository.  If the repository cannot provide the
+                hash information or 'hashes' is False, chashes will be an empty
+                dictionary.  If the repository does not have the file, a tuple
+                of (None, None) will be returned instead."""
+
+                failures = tx.TransportFailures()
+                # A failure here won't cause a correctness issue; it may
+                # simply mean the repository doesn't have the file, so don't
+                # try more than once.
+                retry_count = 1
+                header = self.__build_header(uuid=self.__get_uuid(pub))
+
+                for d, retries in self.__gen_repo(pub, retry_count,
+                    origin_only=True, single_repository=True):
+                        return d.get_compressed_attrs(fhash, header,
+                            pub=pub, trans_id=trans_id, hashes=hashes)
+
+        @LockedTransport()
         def get_manifest(self, fmri, excludes=misc.EmptyI, intent=None,
             ccancel=None, pub=None, content_only=False, alt_repo=None):
                 """Given a fmri, and optional excludes, return a manifest
@@ -2770,10 +2798,11 @@
                 raise failures
 
         @LockedTransport()
-        def publish_add_file(self, pub, pth, trans_id=None):
+        def publish_add_file(self, pub, pth, trans_id=None, basename=None,
+            progtrack=None):
                 """Perform the 'add_file' publication operation to the publisher
-                supplied in pub.  The caller should include the action in the
-                action argument. The transaction-id is passed in trans_id."""
+                supplied in pub.  The caller should include the path in the
+                pth argument. The transaction-id is passed in trans_id."""
 
                 failures = tx.TransportFailures()
                 retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
@@ -2788,6 +2817,41 @@
                     versions=[1]):
                         try:
                                 d.publish_add_file(pth, header=header,
+                                    trans_id=trans_id, basename=basename,
+                                    progtrack=progtrack)
+                                return
+                        except tx.ExcessiveTransientFailure as ex:
+                                # If an endpoint experienced so many failures
+                                # that we just gave up, grab the list of
+                                # failures that it contains
+                                failures.extend(ex.failures)
+                        except tx.TransportException as e:
+                                if e.retryable:
+                                        failures.append(e)
+                                else:
+                                        raise
+
+                raise failures
+
+        @LockedTransport()
+        def publish_add_manifest(self, pub, pth, trans_id=None):
+                """Perform the 'add_manifest' publication operation to the publisher
+                supplied in pub.  The caller should include the path in the
+                pth argument. The transaction-id is passed in trans_id."""
+
+                failures = tx.TransportFailures()
+                retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
+                header = self.__build_header(uuid=self.__get_uuid(pub))
+
+                # Call setup if the transport isn't configured or was shutdown.
+                if not self.__engine:
+                        self.__setup()
+
+                for d, retries, v in self.__gen_repo(pub, retry_count,
+                    origin_only=True, single_repository=True,
+                    operation="manifest", versions=[1]):
+                        try:
+                                d.publish_add_manifest(pth, header=header,
                                     trans_id=trans_id)
                                 return
                         except tx.ExcessiveTransientFailure as ex:
@@ -3140,6 +3204,73 @@
                                 return False
                 return True
 
+        def supports_version(self, pub, op, verlist):
+                """Returns version-id of highest supported version.
+                If the version is not supported, or no data is available,
+                -1 is returned instead."""
+
+                retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
+
+                # Call setup if transport isn't configured, or was shutdown.
+                if not self.__engine:
+                        self.__setup()
+
+                # For backward compatibility, we pass version 0 to __gen_repo
+                # so that an unsupported-operation exception won't be raised
+                # when a higher version, such as manifest/1, is unsupported.
+                for d, retries, v in self.__gen_repo(pub, retry_count,
+                    origin_only=True, single_repository=True,
+                    operation=op, versions=[0]):
+                        return d.supports_version(op, verlist)
+
+        def get_transfer_info(self, pub):
+                """Return a tuple of (compressed, hashes) where 'compressed'
+                indicates whether files can be transferred compressed and
+                'hashes', the set of hashes of those actions that will have
+                their payload transferred."""
+
+                compressed = self.supports_version(pub, 'manifest', [1]) > -1
+                return compressed, self.__hashes
+
+        def get_transfer_size(self, pub, actions):
+                """Return estimated transfer size given a list of actions that
+                will have their payload transferred."""
+
+                for d, retries in self.__gen_repo(pub, 1,
+                    origin_only=True, single_repository=True):
+                        scheme, netloc, path, params, query, fragment = \
+                            urlparse(d._url, "http", allow_fragments=0)
+                        break
+
+                local = scheme == "file"
+                sendb = 0
+                uploaded = 0
+                support = self.supports_version(pub, "manifest", [1]) > -1
+                for a in actions:
+                        if not a.has_payload:
+                                continue
+                        if not support:
+                                sendb += int(a.attrs.get("pkg.size", 0))
+                                continue
+                        if a.hash not in self.__hashes:
+                                if (local or uploaded <
+                                     self.cfg.max_transfer_checks):
+                                        # If the repository is local
+                                        # (filesystem-based) or less than
+                                        # max_transfer_checks, call
+                                        # get_compressed_attrs()...
+                                        has_file, dummy = \
+                                            self.get_compressed_attrs(
+                                            a.hash, pub=pub, hashes=False)
+                                        if has_file:
+                                                continue
+                                # If server doesn't have file, assume it will be
+                                # uploaded.
+                                sendb += int(a.attrs.get("pkg.csize", 0))
+                                self.__hashes.add(a.hash)
+                                uploaded += 1
+                return sendb
+
 
 class MultiXfr(object):
         """A transport object for performing multiple simultaneous
--- a/src/modules/digest.py	Fri Jun 24 10:34:56 2016 -0700
+++ b/src/modules/digest.py	Fri Jun 24 18:04:04 2016 -0700
@@ -75,6 +75,13 @@
 # using the "most preferred" hash. See get_preferred_hash(..),
 # get_least_preferred_hash(..) and get_common_preferred_hash(..)
 #
+
+LEGACY_HASH_ATTRS = ["hash"]
+LEGACY_CHASH_ATTRS = ["chash"]
+LEGACY_CONTENT_HASH_ATTRS = ["elfhash"]
+LEGACY_CHAIN_ATTRS = ["chain"]
+LEGACY_CHAIN_CHASH_ATTRS = ["chain.chashes"]
+
 if DebugValues["hash"] == "sha1+sha512_256" and sha512_supported:
         # Simulate pkg(7) where SHA-1 and SHA-512/256 are used for publication
         DEFAULT_HASH_ATTRS = ["hash", "pkg.hash.sha512_256"]
--- a/src/modules/misc.py	Fri Jun 24 10:34:56 2016 -0700
+++ b/src/modules/misc.py	Fri Jun 24 18:04:04 2016 -0700
@@ -642,11 +642,55 @@
                 hash_results[attr] = hash_results[attr].hexdigest()
         return hash_results, content.read()
 
-def compute_compressed_attrs(fname, file_path, data, size, compress_dir,
-    bufsz=64*1024, chash_attrs=None, chash_algs=None):
+
+class _GZWriteWrapper(object):
+        """Used by compute_compressed_attrs to calculate data size and compute
+        hashes as the data is written instead of having to read the written data
+        again later."""
+
+        def __init__(self, path, chashes):
+                """If path is None, the data will be discarded immediately after
+                computing size and hashes."""
+
+                if path:
+                        self._ofile = open(path, "wb")
+                else:
+                        self._ofile = None
+                self._chashes = chashes
+                self._size = 0
+
+        def close(self):
+                """Close the file."""
+                if self._ofile:
+                        self._ofile.close()
+                        self._ofile = None
+
+        def flush(self):
+                """Flush the file."""
+                if self._ofile:
+                        self._ofile.flush()
+
+        @property
+        def size(self):
+                """Return the size of the file."""
+                return self._size
+
+        def write(self, data):
+                """Write data to the file and compute the hashes of the data."""
+                if self._ofile:
+                        self._ofile.write(data)
+                self._size += len(data)
+                for chash_attr in self._chashes:
+                        self._chashes[chash_attr].update(
+                            data) # pylint: disable=E1101
+
+
+def compute_compressed_attrs(fname, file_path=None, data=None, size=None,
+    compress_dir=None, bufsz=64*1024, chash_attrs=None, chash_algs=None):
         """Returns the size and one or more hashes of the compressed data.  If
         the file located at file_path doesn't exist or isn't gzipped, it creates
-        a file in compress_dir named fname.
+        a file in compress_dir named fname.  If compress_dir is None, the
+        attributes are calculated but no data will be written.
 
         'chash_attrs' is a list of the chash attributes we should compute, with
         'chash_algs' being a dictionary that maps the attribute names to the
@@ -658,6 +702,10 @@
         if chash_algs is None:
                 chash_algs = digest.CHASH_ALGS
 
+        chashes = {}
+        for chash_attr in chash_attrs:
+                chashes[chash_attr] = chash_algs[chash_attr]()
+
         #
         # This check prevents compressing a file which is already compressed.
         # This takes CPU load off the depot on large imports of mostly-the-same
@@ -675,8 +723,13 @@
                         opath = file_path
 
         if fileneeded:
-                opath = os.path.join(compress_dir, fname)
-                ofile = PkgGzipFile(opath, "wb")
+                if compress_dir:
+                        opath = os.path.join(compress_dir, fname)
+                else:
+                        opath = None
+
+                fobj = _GZWriteWrapper(opath, chashes)
+                ofile = PkgGzipFile(mode="wb", fileobj=fobj)
 
                 nbuf = size // bufsz
 
@@ -688,32 +741,33 @@
                 m = nbuf * bufsz
                 ofile.write(data[m:])
                 ofile.close()
-
-        data = None
-
-        # Now that the file has been compressed, determine its
-        # size.
-        fs = os.stat(opath)
-        csize = str(fs.st_size)
+                fobj.close()
+                csize = str(fobj.size)
+                for attr in chashes:
+                        chashes[attr] = chashes[attr].hexdigest()
+                return csize, chashes
 
         # Compute the SHA hash of the compressed file.  In order for this to
         # work correctly, we have to use the PkgGzipFile class.  It omits
         # filename and timestamp information from the gzip header, allowing us
         # to generate deterministic hashes for different files with identical
         # content.
-        cfile = open(opath, "rb")
-        chashes = {}
-        for chash_attr in chash_attrs:
-                chashes[chash_attr] = chash_algs[chash_attr]()
-        while True:
-                cdata = cfile.read(bufsz)
-                # cdata is bytes
-                if cdata == b"":
-                        break
-                for chash_attr in chashes:
-                        chashes[chash_attr].update(
-                            cdata) # pylint: disable=E1101
-        cfile.close()
+        fs = os.stat(opath)
+        csize = str(fs.st_size)
+        with open(opath, "rb") as cfile:
+                while True:
+                        cdata = cfile.read(bufsz)
+                        # cdata is bytes
+                        if cdata == b"":
+                                break
+                        for chash_attr in chashes:
+                                chashes[chash_attr].update(
+                                    cdata) # pylint: disable=E1101
+
+        # The returned dictionary can now be populated with the hexdigests
+        # instead of the hash objects themselves.
+        for attr in chashes:
+                chashes[attr] = chashes[attr].hexdigest()
         return csize, chashes
 
 class ProcFS(object):
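
_GZWriteWrapper lets compute_compressed_attrs() size and hash the compressed stream as it is produced, instead of re-reading the output file, and skip writing entirely when compress_dir is None. The trick is simply a file-like sink handed to the gzip writer; a standalone sketch (the real code uses PkgGzipFile, which omits the filename and timestamp so the hashes are deterministic):

    import gzip
    import hashlib

    class HashingSink(object):
            """File-like object that sizes and hashes everything written."""

            def __init__(self):
                    self.size = 0
                    self.sha1 = hashlib.sha1()

            def write(self, data):
                    self.size += len(data)
                    self.sha1.update(data)

            def flush(self):
                    pass

    sink = HashingSink()
    gz = gzip.GzipFile(mode="wb", fileobj=sink)
    gz.write(b"payload bytes")
    gz.close()
    csize, chash = sink.size, sink.sha1.hexdigest()
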
--- a/src/modules/publish/transaction.py	Fri Jun 24 10:34:56 2016 -0700
+++ b/src/modules/publish/transaction.py	Fri Jun 24 18:04:04 2016 -0700
@@ -29,12 +29,23 @@
 though the other classes can be referred to for documentation purposes."""
 
 import os
+import shutil
 import six
 from six.moves.urllib.parse import quote, unquote, urlparse, urlunparse
+import tempfile
 
 from pkg.misc import EmptyDict
 import pkg.actions as actions
 import pkg.config as cfg
+import pkg.digest as digest
+
+# If elf module is supported, we will extract ELF information.
+try:
+        import pkg.elf as elf
+        haveelf = True
+except ImportError:
+        haveelf = False
+import pkg.misc as misc
 import pkg.portable.util as os_util
 import pkg.server.repository as sr
 import pkg.client.api_errors as apx
@@ -133,7 +144,7 @@
                 self.progtrack = progtrack
                 self.trans_id = trans_id
 
-        def add(self, action):
+        def add(self, action, exact=False, path=None):
                 """Adds an action and its related content to an in-flight
                 transaction.  Returns nothing."""
 
@@ -206,8 +217,16 @@
                 self.progtrack = progtrack
                 self.transport = xport
                 self.publisher = pub
+                self.__local = False
+                self.__uploaded = 0
+                self.__uploads = {}
+                self.__transactions = {}
+                self._tmpdir = None
+                self._append_mode = False
+                self._upload_mode = None
 
                 if scheme == "file":
+                        self.__local = True
                         self.create_file_repo(repo_props=repo_props,
                             create_repo=create_repo)
                 elif scheme != "file" and create_repo:
@@ -252,7 +271,7 @@
                 self.transport.publish_cache_repository(self.publisher, repo)
 
 
-        def add(self, action):
+        def add(self, action, exact=False, path=None):
                 """Adds an action and its related content to an in-flight
                 transaction.  Returns nothing."""
 
@@ -264,6 +283,22 @@
                         raise TransactionOperationError("add",
                             trans_id=self.trans_id, msg=str(e))
 
+                # If the server supports it, we'll upload the manifest as-is
+                # by accumulating the manifest contents in self.__transactions.
+                man = self.__transactions.get(self.trans_id)
+                if man is not None:
+                        try:
+                                self._process_action(action, exact=exact,
+                                    path=path)
+                        except apx.TransportError as e:
+                                msg = str(e)
+                                raise TransactionOperationError("add",
+                                    trans_id=self.trans_id, msg=msg)
+                        self.__transactions[self.trans_id] = man + \
+                            str(action) + "\n"
+                        return
+
+                # Fall back to the older per-action publication logic.
                 try:
                         self.transport.publish_add(self.publisher,
                             action=action, trans_id=self.trans_id,
@@ -273,7 +308,175 @@
                         raise TransactionOperationError("add",
                             trans_id=self.trans_id, msg=msg)
 
-        def add_file(self, pth):
+        def __get_elf_attrs(self, action, fname, data):
+                """Helper function to get the ELF information."""
+
+                if not haveelf or data[:4] != b"\x7fELF" or (
+                    "elfarch" in action.attrs and
+                    "elfbits" in action.attrs and
+                    "elfhash" in action.attrs):
+                        return misc.EmptyDict
+
+                elf_name = os.path.join(self._tmpdir,
+                    ".temp-{0}".format(fname))
+                with open(elf_name, "wb") as elf_file:
+                        elf_file.write(data)
+
+                try:
+                        elf_info = elf.get_info(elf_name)
+                except elf.ElfError as e:
+                        raise TransactionError(e)
+
+                attrs = {}
+                try:
+                        # Check which content checksums to
+                        # compute and add to the action
+                        elf256 = "pkg.content-type.sha256"
+                        elf1 = "elfhash"
+
+                        if elf256 in \
+                            digest.DEFAULT_CONTENT_HASH_ATTRS:
+                                get_sha256 = True
+                        else:
+                                get_sha256 = False
+
+                        if elf1 in \
+                            digest.DEFAULT_CONTENT_HASH_ATTRS:
+                                get_sha1 = True
+                        else:
+                                get_sha1 = False
+
+                        hashes = elf.get_hashes(elf_name,
+                            sha1=get_sha1, sha256=get_sha256)
+
+                        if get_sha1:
+                                attrs[elf1] = hashes[elf1]
+
+                        if get_sha256:
+                                attrs[elf256] = \
+                                    hashes[elf256]
+
+                except elf.ElfError:
+                        pass
+                attrs["elfbits"] = str(elf_info["bits"])
+                attrs["elfarch"] = elf_info["arch"]
+                os.unlink(elf_name)
+                return attrs
+
+        def __get_compressed_attrs(self, fhash, data, size):
+                """Given a fhash, data, and size of a file, returns a tuple
+                of (csize, chashes) where 'csize' is the size of the file
+                in the repository and 'chashes' is a dictionary containing
+                any hashes of the compressed data known by the repository."""
+
+                if self.__local or self.__uploaded < \
+                    self.transport.cfg.max_transfer_checks:
+                        # If the repository is local (filesystem-based) or
+                        # number of files uploaded is less than
+                        # max_transfer_checks, call get_compressed_attrs()...
+                        csize, chashes = self.transport.get_compressed_attrs(
+                            fhash, pub=self.publisher, trans_id=self.trans_id)
+                else:
+                        # ...or the repository is not filesystem-based and
+                        # enough files are missing that we want to avoid the
+                        # overhead of calling get_compressed_attrs().
+                        csize, chashes = None, None
+
+                if chashes:
+                        # If any of the default chash attributes we need is
+                        # missing from the repository's answer, discard them
+                        # all so they are recomputed below.
+                        for k in digest.DEFAULT_CHASH_ATTRS:
+                                if k not in chashes:
+                                        chashes = None
+                                        break
+                return csize, chashes
+
+        def _process_action(self, action, exact=False, path=None):
+                """Adds all expected attributes to the provided action and
+                upload the file for the action if needed.
+
+                If 'exact' is True and 'path' is 'None', the action won't
+                be modified and no file will be uploaded.
+
+                If 'exact' is True and a 'path' is provided, the file of that
+                path will be uploaded as-is (it is assumed that the file is
+                already in repository format).
+                """
+
+                if self._append_mode and action.name != "signature":
+                        raise TransactionOperationError(non_sig=True)
+
+                size = int(action.attrs.get("pkg.size", 0))
+
+                if action.has_payload and size <= 0:
+                        # XXX hack for empty files
+                        action.data = lambda: open(os.devnull, "rb")
+
+                if action.data is None:
+                        return
+
+                if exact:
+                        if path:
+                                self.add_file(path, basename=action.hash,
+                                    progtrack=self.progtrack)
+                        return
+
+                # Get all hashes for this action.
+                hashes, data = misc.get_data_digest(action.data(),
+                    length=size, return_content=True,
+                    hash_attrs=digest.DEFAULT_HASH_ATTRS,
+                    hash_algs=digest.HASH_ALGS)
+                # Set the hash member for backwards compatibility and
+                # remove it from the dictionary.
+                action.hash = hashes.pop("hash", None)
+                action.attrs.update(hashes)
+
+                # Now set the hash value that will be used for storing the file
+                # in the repository.
+                hash_attr, hash_val, hash_func = \
+                    digest.get_least_preferred_hash(action)
+                fname = hash_val
+
+                hdata = self.__uploads.get(fname)
+                if hdata is not None:
+                        elf_attrs, csize, chashes = hdata
+                else:
+                        # We haven't processed this file before, determine if
+                        # it needs to be uploaded and what information the
+                        # repository knows about it.
+                        elf_attrs = self.__get_elf_attrs(action, fname, data)
+                        csize, chashes = self.__get_compressed_attrs(fname,
+                            data, size)
+
+                        # A 'csize' of None means the file must be uploaded.
+                        fileneeded = csize is None
+                        if fileneeded:
+                                fpath = os.path.join(self._tmpdir, fname)
+                                csize, chashes = misc.compute_compressed_attrs(
+                                    fname, data=data, size=size,
+                                    compress_dir=self._tmpdir)
+                                # Upload the compressed file for each action.
+                                self.add_file(fpath, basename=fname,
+                                    progtrack=self.progtrack)
+                                os.unlink(fpath)
+                                self.__uploaded += 1
+                        elif not chashes:
+                                # The repository already has the file but
+                                # could not provide all of the desired
+                                # hashes, so call compute_compressed_attrs()
+                                # in a way that computes them without
+                                # writing the file again.
+                                csize, chashes = misc.compute_compressed_attrs(
+                                    fname, data=data, size=size)
+
+                        self.__uploads[fname] = (elf_attrs, csize, chashes)
+
+                action.attrs.update(elf_attrs)
+                action.attrs.update(chashes)
+                action.attrs["pkg.csize"] = csize
+
+        def add_file(self, pth, basename=None, progtrack=None):
                 """Adds an additional file to the inflight transaction so that
                 it will be available for retrieval once the transaction is
                 closed."""
@@ -286,12 +489,39 @@
 
                 try:
                         self.transport.publish_add_file(self.publisher,
-                            pth=pth, trans_id=self.trans_id)
+                            pth=pth, trans_id=self.trans_id, basename=basename,
+                            progtrack=progtrack)
                 except apx.TransportError as e:
                         msg = str(e)
                         raise TransactionOperationError("add_file",
                             trans_id=self.trans_id, msg=msg)
 
+        def add_manifest(self, pth):
+                """Adds an additional manifest to the inflight transaction so
+                that it will be available for retrieval once the transaction is
+                closed."""
+
+                if not os.path.isfile(pth):
+                        raise TransactionOperationError("add_manifest",
+                            trans_id=self.trans_id, msg=str(_("The file to "
+                            "be added is not a file.  The path given was {0}.").format(
+                            pth)))
+
+                try:
+                        self.transport.publish_add_manifest(self.publisher,
+                            pth=pth, trans_id=self.trans_id)
+                except apx.TransportError as e:
+                        msg = str(e)
+                        raise TransactionOperationError("add_manifest",
+                            trans_id=self.trans_id, msg=msg)
+
+        def _cleanup_upload(self):
+                """Remove any temporary files generated in upload mode."""
+
+                if self._tmpdir:
+                        # we don't care if this fails.
+                        shutil.rmtree(self._tmpdir, ignore_errors=True)
+
         def close(self, abandon=False, add_to_catalog=True):
                 """Ends an in-flight transaction.  Returns a tuple containing
                 a package fmri (if applicable) and the final state of the
@@ -306,6 +536,7 @@
                 """
 
                 if abandon:
+                        self.__transactions.pop(self.trans_id, None)
                         try:
                                 state, fmri = self.transport.publish_abandon(
                                     self.publisher, trans_id=self.trans_id)
@@ -313,7 +544,19 @@
                                 msg = str(e)
                                 raise TransactionOperationError("abandon",
                                     trans_id=self.trans_id, msg=msg)
+                        finally:
+                                self._cleanup_upload()
+
                 else:
+                        man = self.__transactions.get(self.trans_id)
+                        if man is not None:
+                                # Upload the accumulated manifest before closing.
+                                path = os.path.join(self._tmpdir, "manifest")
+                                with open(path, "w") as f:
+                                        f.write(man)
+                                self.add_manifest(path)
+                                self.__transactions.pop(self.trans_id, None)
+
                         try:
                                 state, fmri = self.transport.publish_close(
                                     self.publisher, trans_id=self.trans_id,
@@ -322,9 +565,34 @@
                                 msg = str(e)
                                 raise TransactionOperationError("close",
                                     trans_id=self.trans_id, msg=msg)
+                        finally:
+                                self._cleanup_upload()
 
                 return state, fmri
 
+        def _init_upload(self):
+                """Initialization for upload mode."""
+
+                if self._upload_mode or self._upload_mode is not None:
+                        return
+
+                op = "init_upload"
+                try:
+                        self._upload_mode = self.transport.supports_version(
+                            self.publisher, "manifest", [1]) > -1
+                except apx.TransportError as e:
+                        msg = str(e)
+                        raise TransactionOperationError(op,
+                            trans_id=self.trans_id, msg=msg)
+
+                if not self._upload_mode:
+                        return
+
+                # Create temporary directory and initialize self.__transactions.
+                temp_root = misc.config_temp_root()
+                self._tmpdir = tempfile.mkdtemp(dir=temp_root)
+                self.__transactions.setdefault(self.trans_id, "")
+
         def open(self):
                 """Starts an in-flight transaction. Returns a URL-encoded
                 transaction ID on success."""
@@ -347,12 +615,15 @@
                             msg=_("Unknown failure; no transaction ID provided"
                             " in response."))
 
+                self._init_upload()
+
                 return self.trans_id
 
         def append(self):
                 """Starts an in-flight transaction to append to an existing
                 manifest. Returns a URL-encoded transaction ID on success."""
 
+                self._append_mode = True
                 trans_id = None
 
                 try:
@@ -371,6 +642,8 @@
                             msg=_("Unknown failure; no transaction ID provided"
                             " in response."))
 
+                self._init_upload()
+
                 return self.trans_id
 
         def refresh_index(self):
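
Taken together, these changes move manifest construction entirely to the client: open() probes for manifest/1 support and enters upload mode, add() hashes and compresses each payload locally, uploads files the repository reports missing, and appends str(action) to the accumulated manifest, and close() uploads that manifest verbatim — which is what keeps the publication API from rewriting manifests. A condensed sketch of the resulting flow (xport, pub, and actions are assumed to be an initialized transport, publisher, and action list, as in pkgsend):

    import pkg.publish.transaction as trans

    t = trans.Transaction("http://localhost:10000",
        pkg_name="example@1.0,5.11-0.1", xport=xport, pub=pub)
    t.open()                  # probes manifest/1; enables upload mode
    for action in actions:
            t.add(action)     # uploads payload, accumulates str(action)
    t.close()                 # writes the manifest and uploads it as-is
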
--- a/src/modules/server/depot.py	Fri Jun 24 10:34:56 2016 -0700
+++ b/src/modules/server/depot.py	Fri Jun 24 18:04:04 2016 -0700
@@ -641,6 +641,28 @@
 
         manifest_0._cp_config = { "response.stream": True }
 
+        def manifest_1(self, *tokens):
+                """Outputs the contents of the manifest or uploads the
+                manifest."""
+
+                method = cherrypy.request.method
+                if method == "GET":
+                        return self.manifest_0(*tokens)
+                elif method in ("POST", "PUT"):
+                        return self.__upload_manifest(*tokens)
+                raise cherrypy.HTTPError(http_client.METHOD_NOT_ALLOWED,
+                    "{0} is not allowed".format(method))
+
+        # We need to prevent cherrypy from processing the request body so that
+        # the manifest code can parse the request body itself.  We also need
+        # to raise the timeout, since the default of five minutes is not
+        # enough for a slow connection to upload content.
+        manifest_1._cp_config = {
+            "request.process_request_body": False,
+            "response.timeout": 3600,
+            "response.stream": True
+        }
+
         @staticmethod
         def _tar_stream_close(**kwargs):
                 """This is a special function to finish a tar_stream-based
@@ -715,6 +737,48 @@
             "response.stream": True
         }
 
+        def file_2(self, *tokens):
+                """Outputs the contents of the file, named by the SHA hash
+                name in the request path, directly to the client."""
+
+                method = cherrypy.request.method
+                if method == "HEAD":
+                        try:
+                                fhash = tokens[0]
+                        except IndexError:
+                                fhash = None
+
+                        try:
+                                fpath = self.repo.file(fhash,
+                                    pub=self._get_req_pub())
+                        except srepo.RepositoryFileNotFoundError as e:
+                                raise cherrypy.HTTPError(http_client.NOT_FOUND,
+                                    str(e))
+                        except srepo.RepositoryError as e:
+                                # Treat any remaining repository error as a 404,
+                                # but log the error and include the real failure
+                                # information.
+                                cherrypy.log("Request failed: {0}".format(
+                                    str(e)))
+                                raise cherrypy.HTTPError(http_client.NOT_FOUND,
+                                    str(e))
+
+                        csize, chashes = misc.compute_compressed_attrs(fhash,
+                            file_path=fpath)
+                        response = cherrypy.response
+                        for i, attr in enumerate(chashes):
+                                response.headers["X-Ipkg-Attr-{0}".format(i)] = \
+                                    "{0}={1}".format(attr, chashes[attr])
+
+                        # set expiration of response to one day
+                        self.__set_response_expires("file", 86400, 86400)
+
+                        return serve_file(fpath, "application/data")
+
+                return self.file_1(*tokens)
+
+        file_2._cp_config = { "response.stream": True }
+
         @cherrypy.tools.response_headers(headers=[("Pragma", "no-cache"),
             ("Cache-Control", "no-cache, no-transform, must-revalidate"),
             ("Expires", 0)])
@@ -1045,9 +1109,46 @@
                         raise cherrypy.HTTPError(http_client.BAD_REQUEST,
                             _("file/1 must be sent a file."))
                 data = request.rfile
+                attrs = dict(
+                    val.split("=", 1)
+                    for hdr, val in request.headers.items()
+                    if hdr.lower().startswith("x-ipkg-setattr")
+                )
+                basename = attrs.get("basename", None)
+                try:
+                        self.repo.add_file(trans_id, data, basename, size)
+                except srepo.RepositoryError as e:
+                        # Assume a bad request was made.  A 404 can't be
+                        # returned here as misc.versioned_urlopen will interpret
+                        # that to mean that the server doesn't support this
+                        # operation.
+                        raise cherrypy.HTTPError(http_client.BAD_REQUEST, str(e))
+                response.headers["Content-Length"] = "0"
+                return response.body
+
+        def __upload_manifest(self, *tokens):
+                """Adds a file to an in-flight transaction for the Transaction
+                ID specified in the request path.  The content is expected to be
+                in the request body.  Returns no output."""
 
                 try:
-                        self.repo.add_file(trans_id, data, size)
+                        # cherrypy decoded it, but we actually need it encoded.
+                        trans_id = quote(tokens[0], "")
+                except IndexError:
+                        trans_id = None
+
+                request = cherrypy.request
+                response = cherrypy.response
+
+                size = int(request.headers.get("Content-Length", 0))
+                if size < 0:
+                        raise cherrypy.HTTPError(http_client.BAD_REQUEST,
+                            _("manifest/1 must be sent a file."))
+                data = request.rfile
+
+                try:
+                        self.repo.add_manifest(trans_id, data)
                 except srepo.RepositoryError as e:
                         # Assume a bad request was made.  A 404 can't be
                         # returned here as misc.versioned_urlopen will interpret
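
On the wire, manifest/1 is just a POST of the gzip-compressed manifest body to manifest/1/<trans_id> (a GET falls through to the manifest/0 behavior), and file/2 adds the HEAD probe used by get_compressed_attrs(). A client-side sketch of the manifest upload; the real client goes through the transport engine, so the names and URL here are assumed:

    import gzip
    import io
    from six.moves.urllib.request import Request, urlopen

    def upload_manifest(depot_url, trans_id, manifest_text):
            """POST a gzip-compressed manifest to manifest/1/<trans_id>."""
            body = io.BytesIO()
            with gzip.GzipFile(fileobj=body, mode="wb") as gz:
                    gz.write(manifest_text.encode("utf-8"))
            req = Request("{0}/manifest/1/{1}".format(depot_url, trans_id),
                data=body.getvalue())
            req.add_header("Content-Type", "application/data")
            return urlopen(req)
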
--- a/src/modules/server/repository.py	Fri Jun 24 10:34:56 2016 -0700
+++ b/src/modules/server/repository.py	Fri Jun 24 18:04:04 2016 -0700
@@ -1162,7 +1162,7 @@
                 finally:
                         self.__unlock_rstore()
 
-        def add_file(self, trans_id, data, size=None):
+        def add_file(self, trans_id, data, basename=None, size=None):
                 """Adds a file to an in-flight transaction.
 
                 'trans_id' is the identifier of a transaction that
@@ -1171,6 +1171,8 @@
                 'data' is the string object containing the payload of the
                 file to add.
 
+                'basename' is an optional name; if provided, the file's
+                payload is stored as-is under that name.
+
                 'size' is an optional integer value indicating the size of
                 the provided payload.
                 """
@@ -1184,7 +1186,31 @@
 
                 t = self.__get_transaction(trans_id)
                 try:
-                        t.add_file(data, size)
+                        t.add_file(data, basename, size)
+                except trans.TransactionError as e:
+                        raise RepositoryError(e)
+                return
+
+        def add_manifest(self, trans_id, data):
+                """Adds a manifest to an in-flight transaction.
+
+                'trans_id' is the identifier of a transaction that
+                the manifest should be added to.
+
+                'data' is the string object containing the payload of the
+                manifest to add.
+                """
+
+                if self.mirror:
+                        raise RepositoryMirrorError()
+                if self.read_only:
+                        raise RepositoryReadOnlyError()
+                if not self.trans_root:
+                        raise RepositoryUnsupportedOperationError()
+
+                t = self.__get_transaction(trans_id)
+                try:
+                        t.add_manifest(data)
                 except trans.TransactionError as e:
                         raise RepositoryError(e)
                 return
@@ -3512,12 +3538,20 @@
                                 continue
                         rstore.add_content(refresh_index=refresh_index)
 
-        def add_file(self, trans_id, data, size=None):
+        def add_file(self, trans_id, data, basename=None, size=None):
                 """Adds a file to a transaction with the specified Transaction
                 ID."""
 
                 rstore = self.get_trans_rstore(trans_id)
-                return rstore.add_file(trans_id, data=data, size=size)
+                return rstore.add_file(trans_id, data=data, basename=basename,
+                    size=size)
+
+        def add_manifest(self, trans_id, data):
+                """Adds a manifest to a transaction with the specified
+                Transaction ID."""
+
+                rstore = self.get_trans_rstore(trans_id)
+                return rstore.add_manifest(trans_id, data=data)
 
         def rebuild(self, build_catalog=True, build_index=False, pub=None):
                 """Rebuilds the repository catalog and search indexes using the
--- a/src/modules/server/transaction.py	Fri Jun 24 10:34:56 2016 -0700
+++ b/src/modules/server/transaction.py	Fri Jun 24 18:04:04 2016 -0700
@@ -33,6 +33,7 @@
 import shutil
 import six
 import time
+import zlib
 from six.moves.urllib.parse import quote, unquote
 
 import pkg.actions as actions
@@ -472,7 +473,7 @@
                         # get all hashes for this action
                         hashes, data = misc.get_data_digest(action.data(),
                             length=size, return_content=True,
-                            hash_attrs=digest.DEFAULT_HASH_ATTRS,
+                            hash_attrs=digest.LEGACY_HASH_ATTRS,
                             hash_algs=digest.HASH_ALGS)
 
                         # set the hash member for backwards compatibility and
@@ -510,13 +511,13 @@
                                         elf1 = "elfhash"
 
                                         if elf256 in \
-                                            digest.DEFAULT_CONTENT_HASH_ATTRS:
+                                            digest.LEGACY_CONTENT_HASH_ATTRS:
                                                 get_sha256 = True
                                         else:
                                                 get_sha256 = False
 
                                         if elf1 in \
-                                            digest.DEFAULT_CONTENT_HASH_ATTRS:
+                                            digest.LEGACY_CONTENT_HASH_ATTRS:
                                                 get_sha1 = True
                                         else:
                                                 get_sha1 = False
@@ -550,10 +551,8 @@
                         csize, chashes = misc.compute_compressed_attrs(
                             fname, dst_path, data, size, self.dir)
                         for attr in chashes:
-                                action.attrs[attr] = chashes[attr].hexdigest()
+                                action.attrs[attr] = chashes[attr]
                         action.attrs["pkg.csize"] = csize
-                        chash = None
-                        data = None
 
                 self.remaining_payload_cnt = \
                     len(action.attrs.get("chain.sizes", "").split())
@@ -613,8 +612,39 @@
 
                 self.types_found.add(action.name)
 
-        def add_file(self, f, size=None):
+        def add_file(self, f, basename=None, size=None):
                 """Adds the file to the Transaction."""
+
+                # If a basename is provided, just store the file as-is
+                # under that name.
+                if basename:
+                        fileneeded = True
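+                        # rstore.file() returns the path to a file already
+                        # stored in the repository and raises otherwise, in
+                        # which case the payload must be staged in the
+                        # transaction directory.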
+                        try:
+                                dst_path = self.rstore.file(basename)
+                                fileneeded = False
+                        except Exception:
+                                dst_path = os.path.join(self.dir, basename)
+
+                        if not fileneeded:
+                                return
+
+                        if isinstance(f, six.string_types):
+                                portable.copyfile(f, dst_path)
+                                return
+
+                        bufsz = 128 * 1024
+                        # 'size' may be None when the caller does not know
+                        # the payload length; guard the comparison.
+                        if size is not None and bufsz > size:
+                                bufsz = size
+
+                        with open(dst_path, "wb") as wf:
+                                while True:
+                                        data = f.read(bufsz)
+                                        # A binary stream returns b"" at EOF.
+                                        if not data:
+                                                break
+                                        wf.write(data)
+                        return
+
                 hashes, data = misc.get_data_digest(f, length=size,
                     return_content=True, hash_attrs=digest.DEFAULT_HASH_ATTRS,
                     hash_algs=digest.HASH_ALGS)
@@ -636,15 +666,82 @@
                                 raise
                         dst_path = None
 
-                csize, chashes = misc.compute_compressed_attrs(fname, dst_path,
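+                # Only the side effect matters here:
+                # compute_compressed_attrs() writes the compressed payload
+                # into the transaction directory; the computed attributes
+                # are not needed for a raw file add.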
+                misc.compute_compressed_attrs(fname, dst_path,
                     data, size, self.dir,
                     chash_attrs=digest.DEFAULT_CHASH_ATTRS,
                     chash_algs=digest.CHASH_ALGS)
-                chashes = None
-                data = None
 
                 self.remaining_payload_cnt -= 1
 
+        def add_manifest(self, f):
+                """Adds the manifest to the Transaction."""
+
+                if isinstance(f, six.string_types):
+                        f = open(f, "rb")
+                # Store the manifest file.
+                fpath = os.path.join(self.dir, "manifest")
+                with open(fpath, "ab+") as wf:
+                        try:
+                                misc.gunzip_from_stream(f, wf, ignore_hash=True)
+                                wf.seek(0)
+                                content = wf.read()
+                        except zlib.error:
+                                # No need to decompress it if it's not a gzipped
+                                # file.
+                                f.seek(0)
+                                content = f.read()
+                                wf.write(content)
+                # Do some sanity checking on packages marked or being marked
+                # obsolete or renamed.
+                m = pkg.manifest.Manifest()
+                m.set_content(misc.force_str(content))
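+                # These are the same consistency checks that add() applies
+                # to actions submitted individually.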
+                for action in m.gen_actions():
+                        if action.name == "set" and \
+                            action.attrs["name"] == "pkg.obsolete" and \
+                            action.attrs["value"] == "true":
+                                self.obsolete = True
+                                if self.types_found.difference(
+                                    set(("set", "signature"))):
+                                        raise TransactionOperationError(_("An obsolete "
+                                            "package cannot contain actions other than "
+                                            "'set' and 'signature'."))
+                        elif action.name == "set" and \
+                            action.attrs["name"] == "pkg.renamed" and \
+                            action.attrs["value"] == "true":
+                                self.renamed = True
+                                if self.types_found.difference(
+                                    set(("depend", "set", "signature"))):
+                                        raise TransactionOperationError(_("A renamed "
+                                            "package cannot contain actions other than "
+                                            "'set', 'depend', and 'signature'."))
+
+                        if not self.has_reqdeps and action.name == "depend" and \
+                            action.attrs["type"] == "require":
+                                self.has_reqdeps = True
+
+                        if self.obsolete and self.renamed:
+                                # Reset either obsolete or renamed, depending on which
+                                # action this was.
+                                if action.attrs["name"] == "pkg.obsolete":
+                                        self.obsolete = False
+                                else:
+                                        self.renamed = False
+                                raise TransactionOperationError(_("A package may not "
+                                    " be marked for both obsoletion and renaming."))
+                        elif self.obsolete and action.name not in ("set", "signature"):
+                                raise TransactionOperationError(_("A '{type}' action "
+                                    "cannot be present in an obsolete package: "
+                                    "{action}").format(
+                                    type=action.name, action=action))
+                        elif self.renamed and action.name not in \
+                            ("depend", "set", "signature"):
+                                raise TransactionOperationError(_("A '{type}' action "
+                                    "cannot be present in a renamed package: "
+                                    "{action}").format(
+                                    type=action.name, action=action))
+
+                        self.types_found.add(action.name)
+
         def accept_publish(self, add_to_catalog=True):
                 """Transaction meets consistency criteria, and can be published.
                 Publish, making appropriate catalog entries."""
--- a/src/pull.py	Fri Jun 24 10:34:56 2016 -0700
+++ b/src/pull.py	Fri Jun 24 18:04:04 2016 -0700
@@ -321,8 +321,10 @@
         sendb = 0
         sendcb = 0
 
+        hashes = set()
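+        # Retrievals are coalesced by hash, so count each distinct payload
+        # only once.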
         for a in mfst.gen_actions():
-                if a.has_payload:
+                if a.has_payload and a.hash not in hashes:
+                        hashes.add(a.hash)
                         getb += get_pkg_otw_size(a)
                         getf += 1
                         sendb += int(a.attrs.get("pkg.size", 0))
@@ -336,9 +338,11 @@
         """Takes a manifest and a multi object and adds the hashes to the multi
         object."""
 
+        hashes = set()
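+        # A payload shared by several actions needs to be added only once.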
         for a in mfst.gen_actions():
-                if a.has_payload:
+                if a.has_payload and a.hash not in hashes:
                         multi.add_action(a)
+                        hashes.add(a.hash)
 
 def prune(fmri_list, all_versions, all_timestamps):
         """Returns a filtered version of fmri_list based on the provided
@@ -1518,12 +1522,11 @@
                                 # mogrify is done.
                                 nm = m
 
-                        getb, getf, sendb, sendcb = get_sizes(nm)
+                        getb, getf = get_sizes(nm)[:2]
                         if republish:
-                                # For now, normal republication always uses
-                                # uncompressed data as already compressed data
-                                # is not supported for publication.
-                                send_bytes += sendb
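+                                # The transport knows whether payloads can
+                                # be sent compressed and sizes the transfer
+                                # accordingly.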
+                                send_bytes += dest_xport.get_transfer_size(
+                                    new_targ_pubs[nf.publisher],
+                                    nm.gen_actions())
 
                         # Store a mapping between new fmri and new manifest for
                         # future use.
@@ -1574,7 +1577,14 @@
                         continue
 
                 processed = 0
+                uploads = set()
                 pkgs_to_get = sorted(pkgs_to_get)
+                hashes = set()
+                if republish and pkgs_to_get:
+                        # If files can be transferred compressed, keep them
+                        # compressed in the source.
+                        keep_compressed, hashes = dest_xport.get_transfer_info(
+                            new_targ_pubs[pkgs_to_get[0].publisher])
                 for nf in pkgs_to_get:
                         tracker.republish_start_pkg(nf)
                         # Processing republish.
@@ -1629,13 +1639,28 @@
                                                 # added to the manifest.
                                                 continue
 
+                                        fname = None
+                                        fhash = None
                                         if a.has_payload:
+                                                fhash = a.hash
                                                 fname = os.path.join(pkgdir,
-                                                    a.hash)
+                                                    fhash)
 
                                                 a.data = lambda: open(fname,
                                                     "rb")
-                                        t.add(a)
+
+                                        if fhash in hashes and \
+                                            fhash not in uploads:
+                                                # If the payload will be
+                                                # transferred and has not
+                                                # been uploaded yet, upload
+                                                # it...
+                                                t.add(a, exact=True, path=fname)
+                                                uploads.add(fhash)
+                                        else:
+                                                # ...otherwise, just add the
+                                                # action to the transaction.
+                                                t.add(a, exact=True)
+
                                         if a.name == "signature" and \
                                             not do_mog:
                                                 # We always store content in the
@@ -1645,7 +1670,11 @@
                                                     least_preferred=True):
                                                         fname = os.path.join(
                                                             pkgdir, fp)
-                                                        t.add_file(fname)
+                                                        if keep_compressed:
+                                                                t.add_file(fname,
+                                                                    basename=fp)
+                                                        else:
+                                                                t.add_file(fname)
                                 # Always defer catalog update.
                                 t.close(add_to_catalog=False)
                         except trans.TransactionError as e:
--- a/src/tests/cli/t_pkgrecv.py	Fri Jun 24 10:34:56 2016 -0700
+++ b/src/tests/cli/t_pkgrecv.py	Fri Jun 24 18:04:04 2016 -0700
@@ -1496,6 +1496,18 @@
                 tact = list(tm.gen_actions_by_type('file'))[0]
                 self.assertEqual("42." + oelfhash, tact.attrs["elfhash"])
 
+        def test_16_recv_old_republish(self):
+                """Verify that older logic of republication in pkgrecv works."""
+
+                f = fmri.PkgFmri(self.published[3], None)
+
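+                # Disabling manifest/1 forces pkgrecv to fall back to the
+                # older, per-action publication path.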
+                self.dcs[2].stop()
+                self.dcs[2].set_disable_ops(["manifest/1"])
+                self.dcs[2].start()
+
+                self.pkgrecv(self.durl1, "-d {0} {1}".format(self.durl2, f))
+                self.dcs[2].unset_disable_ops()
+
 
 class TestPkgrecvHTTPS(pkg5unittest.HTTPSTestClass):
 
--- a/src/tests/cli/t_pkgsend.py	Fri Jun 24 10:34:56 2016 -0700
+++ b/src/tests/cli/t_pkgsend.py	Fri Jun 24 18:04:04 2016 -0700
@@ -1215,23 +1215,7 @@
                         # the expected name.
                         shutil.rmtree(rpath)
 
-        def test_22_publish(self):
-                """Verify that pkgsend publish works as expected."""
-
-                rootdir = self.test_root
-                dir_1 = os.path.join(rootdir, "dir_1")
-                dir_2 = os.path.join(rootdir, "dir_2")
-                os.mkdir(dir_1)
-                os.mkdir(dir_2)
-                open(os.path.join(dir_1, "A"), "w").close()
-                open(os.path.join(dir_2, "B"), "w").close()
-                mfpath = os.path.join(rootdir, "manifest_test")
-                with open(mfpath, "w") as mf:
-                        mf.write("""file NOHASH mode=0755 owner=root group=bin path=/A
-                            file NOHASH mode=0755 owner=root group=bin path=/B
-                            set name=pkg.fmri value=pkg:/testmultipledirs@1.0,5.10
-                            """)
-
+        def __test_publish(self, dir_1, dir_2, mfpath):
                 dhurl = self.dc.get_depot_url()
                 # -s may be specified either as a global option or as a local
                 # option for the publish subcommand.
@@ -1254,6 +1238,31 @@
                 self.pkg("verify")
                 self.image_destroy()
 
+        def test_22_publish(self):
+                """Verify that pkgsend publish works as expected."""
+
+                rootdir = self.test_root
+                dir_1 = os.path.join(rootdir, "dir_1")
+                dir_2 = os.path.join(rootdir, "dir_2")
+                os.mkdir(dir_1)
+                os.mkdir(dir_2)
+                open(os.path.join(dir_1, "A"), "w").close()
+                open(os.path.join(dir_2, "B"), "w").close()
+                mfpath = os.path.join(rootdir, "manifest_test")
+                with open(mfpath, "w") as mf:
+                        mf.write("""file NOHASH mode=0755 owner=root group=bin path=/A
+                            file NOHASH mode=0755 owner=root group=bin path=/B
+                            set name=pkg.fmri value=pkg:/testmultipledirs@1.0,5.10
+                            """)
+                self.__test_publish(dir_1, dir_2, mfpath)
+
+                # Verify that the older pkgsend publish logic still works.
+                self.dc.stop()
+                self.dc.set_disable_ops(["manifest/1"])
+                self.dc.start()
+                self.__test_publish(dir_1, dir_2, mfpath)
+                self.dc.unset_disable_ops()
+
         def test_23_pkgsend_no_version(self):
                 """Verify that FMRI without version cannot be specified."""
 
@@ -1330,7 +1339,8 @@
                 self.pkgsend("", "-s {0} publish {1}".format(furi, mfpath))
                 self.image_create(furi)
                 self.pkg("contents -rm multihash")
-                self.assertTrue("pkg.hash.{0}=spaghetti".format(hash_alg in self.output))
+                self.assertTrue("pkg.hash.{0}=spaghetti".format(
+                    hash_alg) in self.output)
 
                 self.pkgsend("", "-s {0} publish {1}".format(furi, mfpath),
                     debug_hash="sha1+{0}".format(hash_alg))
--- a/src/tests/cli/t_pkgsign.py	Fri Jun 24 10:34:56 2016 -0700
+++ b/src/tests/cli/t_pkgsign.py	Fri Jun 24 18:04:04 2016 -0700
@@ -657,7 +657,7 @@
 
                 self.pkg("install example_pkg", exit=1)
 
-        def test_sign_5(self):
+        def base_sign_5(self):
                 """Test that http repos work."""
 
                 self.dcs[1].start()
@@ -685,6 +685,16 @@
                 api_obj = self.get_img_api_obj()
                 self._api_install(api_obj, ["example_pkg"])
 
+        def test_sign_5(self):
+                """Test that http repos work."""
+
+                self.base_sign_5()
+
+                # Verify that the older publication API logic still works.
+                self.dcs[1].stop()
+                self.dcs[1].set_disable_ops(["manifest/1"])
+                self.base_sign_5()
+
         def test_length_two_chains(self):
                 """Check that chains of length two work correctly."""
 
@@ -2008,7 +2018,10 @@
 
                 plist = self.pkgsend_bulk(self.rurl1, self.example_pkg10)
 
-                self.dcs[1].set_disable_ops(["add"])
+                # The new publication model uses manifest/1 to upload the
+                # manifest as-is, avoiding add operations.  Disable
+                # manifest/1 to fall back to the older logic for testing.
+                self.dcs[1].set_disable_ops(["add", "manifest/1"])
                 self.dcs[1].start()
 
                 sign_args = "-k {key} -c {cert} {pkg}".format(
@@ -2023,7 +2036,11 @@
                 """Test that publishing to a depot which doesn't support file
                 fails as expected."""
 
-                self.dcs[1].set_disable_ops(["file"])
+                # The new publication model uses manifest/1, which in turn
+                # uses file/1, so disabling file ops rules it out.  Disable
+                # manifest/1 as well to fall back to the older logic for
+                # testing.
+                self.dcs[1].set_disable_ops(["file", "manifest/1"])
                 self.dcs[1].start()
 
                 plist = self.pkgsend_bulk(self.durl1, self.example_pkg10)
@@ -2842,12 +2859,13 @@
 
         def setUp(self):
                 pkg5unittest.ManyDepotTestCase.setUp(self,
-                    ["test", "test", "crl"])
+                    ["test", "test", "crl", "test"])
                 self.make_misc_files(self.misc_files)
                 self.durl1 = self.dcs[1].get_depot_url()
                 self.rurl1 = self.dcs[1].get_repo_url()
                 self.durl2 = self.dcs[2].get_depot_url()
                 self.rurl2 = self.dcs[2].get_repo_url()
+                self.durl4 = self.dcs[4].get_depot_url()
                 DebugValues["crl_host"] = self.dcs[3].get_depot_url()
                 self.ta_dir = None
 
@@ -3073,16 +3091,27 @@
                 are signed with the same certificates and intermediate
                 certificates are involved, pkgrecv continues to work."""
 
+                self.__test_sign_pkgrecv_cache_sign_interaction()
+                # Verify that the older publication API logic still works.
+                self.dcs[1].stop()
+                self.dcs[2].stop()
+                self.dcs[1].set_disable_ops(["manifest/1"])
+                self.dcs[2].set_disable_ops(["manifest/1"])
+                self.__test_sign_pkgrecv_cache_sign_interaction()
+
+        def __test_sign_pkgrecv_cache_sign_interaction(self):
+                self.dcs[1].start()
+                self.dcs[2].start()
                 manf = """
 open a@1,5.11-0
 close
 """
-                self.pkgsend_bulk(self.rurl2, manf)
+                self.pkgsend_bulk(self.durl2, manf)
                 manf = """
 open b@1,5.11-0
 close
 """
-                self.pkgsend_bulk(self.rurl2, manf)
+                self.pkgsend_bulk(self.durl2, manf)
 
                 ta_path = os.path.join(self.raw_trust_anchor_dir,
                     "ta2_cert.pem")
@@ -3093,11 +3122,11 @@
                       pkg="'*'"
                    )
 
-                self.pkgsign(self.rurl2, sign_args)
+                self.pkgsign(self.durl2, sign_args)
 
                 cache_dir = os.path.join(self.test_root, "cache")
-                self.pkgrecv(self.rurl2, "-c {0} -d {1} '*'".format(
-                    cache_dir, self.rurl1))
+                self.pkgrecv(self.durl2, "-c {0} -d {1} '*'".format(
+                    cache_dir, self.durl1))
 
         def test_sign_pkgrecv_a(self):
                 """Check that signed packages can be archived."""
@@ -3236,6 +3265,58 @@
                                         cnt += 1
                 self.assertEqual(cnt, 2)
 
+        def test_sign_pkgrecv_across_repositories(self):
+                """Check that signed packages can be pkgrecved to a new
+                repository that enables new hashes but the new hashes won't
+                be added to the packages so that the existing signatures won't
+                be invalidated"""
+
+                # We create an image simply so we can use "contents -g" to
+                # inspect the repository.
+                self.image_create()
+                self.dcs[1].start()
+                self.dcs[2].start()
+                plist = self.pkgsend_bulk(self.rurl2, self.example_pkg10)
+                ta_path = os.path.join(self.raw_trust_anchor_dir,
+                    "ta3_cert.pem")
+                sign_args = "-k {key} -c {cert} -i {ch1} -i {ta} " \
+                    "{name}".format(**{
+                        "name": plist[0],
+                        "key": os.path.join(self.keys_dir,
+                            "cs1_ch1_ta3_key.pem"),
+                        "cert": os.path.join(self.cs_dir,
+                            "cs1_ch1_ta3_cert.pem"),
+                        "ch1": os.path.join(self.chain_certs_dir,
+                            "ch1_ta3_cert.pem"),
+                        "ta": ta_path,
+                })
+
+                self.pkgsign(self.rurl2, sign_args)
+                self.pkgrecv(self.rurl2, "-d {0} example_pkg".format(self.durl1))
+                self.pkg("contents -g {0} -m example_pkg".format(self.durl1))
+                self.assertTrue("pkg.hash.sha256" not in self.output)
+                self.image_create(self.durl1)
+                self.seed_ta_dir("ta3")
+                self.pkg("set-property signature-policy verify")
+                self.pkg("install example_pkg")
+                self.image_destroy()
+
+                self.dcs[4].set_debug_feature("hash=sha1+sha256")
+                self.dcs[4].start()
+                self.image_create(self.durl4, destroy=True)
+                # pkgrecv to a new repository which enables SHA-2 hashes
+                self.pkgrecv(self.durl1, "-d {0} example_pkg".format(self.durl4))
+                self.pkg("contents -g {0} -m example_pkg".format(self.durl4))
+                # make sure that we do not get multiple hashes
+                self.assertTrue("pkg.hash.sha256" not in self.output)
+                self.seed_ta_dir("ta3")
+                self.pkg("set-property signature-policy verify")
+                # should not invalidate the signature
+                self.pkg("install example_pkg")
+
+                self.dcs[4].stop()
+                self.dcs[4].unset_debug_feature("hash=sha1+sha256")
+
 
 if __name__ == "__main__":
         unittest.main()
--- a/src/util/publish/pkgmerge.py	Fri Jun 24 10:34:56 2016 -0700
+++ b/src/util/publish/pkgmerge.py	Fri Jun 24 18:04:04 2016 -0700
@@ -412,13 +412,9 @@
                                     patterns=processdict[entry]))
                                 continue
 
-                # we're ready to merge
-                if not dry_run:
-                        target_pub = transport.setup_publisher(dest_repo,
-                            pub.prefix, dest_xport, dest_xport_cfg,
-                            remote_prefix=True)
-                else:
-                        target_pub = None
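+                # We're ready to merge.  A target publisher is now set up
+                # even for a dry run so that get_transfer_size() below can
+                # estimate the bytes to send.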
+                target_pub = transport.setup_publisher(dest_repo,
+                    pub.prefix, dest_xport, dest_xport_cfg,
+                    remote_prefix=True)
 
                 tracker.republish_set_goal(len(processdict), 0, 0)
                 # republish packages for this publisher. If we encounter any
@@ -504,10 +500,8 @@
                 # Determine total bytes to send for this package; this must be
                 # done using the manifest since retrievals are coalesced based
                 # on hash, but sends are not.
-                sendbytes = sum(
-                    int(a.attrs.get("pkg.size", 0))
-                    for a in man.gen_actions()
-                )
+                sendbytes = dest_xport.get_transfer_size(target_pub,
+                    man.gen_actions())
 
                 f = man.fmri