9203 pkgrecv should check for valid destination before beginning download
author johansen <johansen@opensolaris.org>
date Thu, 15 Jul 2010 15:40:39 -0700
changeset 1974 6af683c5bde5
parent 1973 6da2ece05476
child 1975 f9c11db6b94c
9203 pkgrecv should check for valid destination before beginning download
9872 publication api should use modern transport subsystem
9873 pkgrecv operation failure case could be improved
9910 pkgrecv doesn't limit POST sizes on filelist requests
10444 pkgrecv should use new client transport subsystem
12519 merge.py should support file:// URLs
12646 pkgrecv should print error text instead of function address
13941 download progress assertion with pkgrecv
15320 pkgsend will traceback if unable to parse server error response
16534 importer should use modern transport
16535 solaris.py is defunct and should be removed
src/man/pkgrecv.1.txt
src/modules/actions/license.py
src/modules/client/image.py
src/modules/client/transport/__init__.py
src/modules/client/transport/engine.py
src/modules/client/transport/fileobj.py
src/modules/client/transport/repo.py
src/modules/client/transport/transport.py
src/modules/depotcontroller.py
src/modules/manifest.py
src/modules/misc.py
src/modules/publish/transaction.py
src/modules/server/depot.py
src/modules/urlhelpers.py
src/pkgdefs/SUNWipkg/prototype
src/publish.py
src/pull.py
src/tests/cli/t_pkg_depotd.py
src/tests/cli/t_publish_api.py
src/util/distro-import/importer.py
src/util/distro-import/solaris.py
src/util/publish/merge.py
--- a/src/man/pkgrecv.1.txt	Tue Jul 13 18:17:07 2010 -0700
+++ b/src/man/pkgrecv.1.txt	Thu Jul 15 15:40:39 2010 -0700
@@ -5,8 +5,8 @@
      pkgrecv - image packaging system content retrieval utility
 
 SYNOPSIS
-     /usr/bin/pkgrecv [-s src_uri] [-d (path|dest_uri)] [-kr] [-m match]
-         (fmri|pattern) ...
+     /usr/bin/pkgrecv [-s src_uri] [-d (path|dest_uri)] [-c cache_dir]
+         [-kr] [-m match] (fmri|pattern) ...
      /usr/bin/pkgrecv [-s src_uri] -n
 
 DESCRIPTION
@@ -21,6 +21,13 @@
 OPTIONS
      The following options are supported:
 
+     -c cache_dir    The path to a directory that will be used to cache
+		     downloaded content.  If one is not supplied, the
+		     client will automatically pick a cache directory.
+		     In the case where a download is interrupted, and a
+		     cache directory was automatically chosen, use this
+		     option to resume the download.
+
      -d path_or_uri  The path of a directory to save the retrieved package
                      to, or the URI of a repository to republish it to.  If
                      not provided, the default value is the current working
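
An illustrative invocation of the new option (the server URI and paths
are placeholders):

     /usr/bin/pkgrecv -s http://pkg.example.org/ \
         -c /var/tmp/pkgrecv-cache -d /export/repo SUNWipkg

If the transfer is interrupted, re-running the same command with the
same -c directory resumes from the cached content instead of
downloading everything again.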
--- a/src/modules/actions/license.py	Tue Jul 13 18:17:07 2010 -0700
+++ b/src/modules/actions/license.py	Thu Jul 15 15:40:39 2010 -0700
@@ -168,7 +168,8 @@
                         return opener().read()
 
                 try:
-                        return img.transport.get_content(fmri, self.hash)
+                        pub = img.get_publisher(fmri.get_publisher())
+                        return img.transport.get_content(pub, self.hash)
                 finally:
                         img.cleanup_downloads()
 
--- a/src/modules/client/image.py	Tue Jul 13 18:17:07 2010 -0700
+++ b/src/modules/client/image.py	Thu Jul 15 15:40:39 2010 -0700
@@ -49,7 +49,7 @@
 import pkg.client.pkgplan               as pkgplan
 import pkg.client.progress              as progress
 import pkg.client.publisher             as publisher
-import pkg.client.transport.transport   as transport
+import pkg.client.transport             as transport
 import pkg.fmri
 import pkg.manifest                     as manifest
 import pkg.misc                         as misc
@@ -211,7 +211,8 @@
                 self.__lockf = None
 
                 # Transport operations for this image
-                self.transport = transport.Transport(self)
+                self.transport = transport.Transport(
+                    transport.ImageTransportCfg(self))
 
                 if should_exist:
                         self.find_root(self.root, user_provided_dir,
@@ -1158,7 +1159,6 @@
 
                 try:
                         ret = manifest.CachedManifest(fmri, self.pkgdir,
-                            self.cfg_cache.preferred_publisher,
                             excludes)
                         # if we have a intent string, let depot
                         # know for what we're using the cached manifest
--- a/src/modules/client/transport/__init__.py	Tue Jul 13 18:17:07 2010 -0700
+++ b/src/modules/client/transport/__init__.py	Thu Jul 15 15:40:39 2010 -0700
@@ -21,8 +21,99 @@
 #
 
 #
-# Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
-# Use is subject to license terms.
+# Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
 #
 
 __all__ = [ "transport" ]
+
+import pkg.client.api_errors as apx
+import pkg.misc as misc
+import pkg.client.publisher as publisher
+
+def Transport(tcfg):
+        """Returns a transport object"""
+
+        from pkg.client.transport.transport import Transport
+        return Transport(tcfg)
+
+def ImageTransportCfg(image):
+        """Returns an ImageTransportCfg object"""
+
+        from pkg.client.transport.transport import ImageTransportCfg
+        return ImageTransportCfg(image)
+
+def GenericTransportCfg(publishers=misc.EmptyI, c_download=None,
+            i_download=None, pkgdir=None, policy_map=misc.ImmutableDict()):
+        """Returns GenericTransportCfg object"""
+
+        from pkg.client.transport.transport import GenericTransportCfg
+        return GenericTransportCfg(publishers=publishers,
+            c_download=c_download, i_download=i_download, pkgdir=pkgdir,
+            policy_map=policy_map)
+
+# The following two functions are to be used by clients without an Image
+# that need to configure a transport and/or publishers.
+
+def setup_publisher(repo_uri, prefix, xport, xport_cfg,
+    remote_prefix=False, remote_publishers=False):
+        """Given transport 'xport' and publisher configuration 'xport_cfg'
+        take the string that identifies a repository by uri in 'repo_uri'
+        and create a publisher object.  The caller must specify the prefix.
+
+        If remote_prefix is True, the caller will contact the remote host
+        and use its publisher info to determine the publisher's actual prefix.
+
+        If remote_publishers is True, the caller will obtain the prefix and
+        repository information from the repo's publisher info."""
+
+        if isinstance(repo_uri, list):
+                repo = publisher.Repository(origins=repo_uri)
+                repouri_list = repo_uri
+        else:
+                repouri_list = [publisher.RepositoryURI(repo_uri)]
+                repo = publisher.Repository(origins=repouri_list)
+
+        pub = publisher.Publisher(prefix=prefix, repositories=[repo])
+
+        if not remote_prefix and not remote_publishers:
+                xport_cfg.add_publisher(pub)
+                return pub
+
+        try:
+                newpubs = xport.get_publisherdata(pub) 
+        except apx.UnsupportedRepositoryOperation:
+                newpubs = None
+
+        if not newpubs:
+                xport_cfg.add_publisher(pub)
+                return pub
+
+        for p in newpubs:
+                psr = p.selected_repository
+
+                if not psr:
+                        p.add_repository(repo)
+                elif remote_publishers:
+                        if not psr.origins:
+                                for r in repouri_list:
+                                        psr.add_origin(r)
+                        elif repo not in psr.origins:
+                                for i, r in enumerate(repouri_list):
+                                        psr.origins.insert(i, r)
+                else:
+                        psr.origins = repouri_list
+
+                xport_cfg.add_publisher(p)
+
+        # Return first publisher in list
+        return newpubs[0]
+
+def setup_transport():
+        """Initialize the transport and transport configuration. The caller
+        must manipulate the transport configuration and add publishers
+        once it receives control of the objects."""
+
+        xport_cfg = GenericTransportCfg()
+        xport = Transport(xport_cfg)
+
+        return xport, xport_cfg
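
Taken together, these helpers let a client without an Image stand up a
working transport in a few lines.  A minimal sketch of the intended call
sequence (the repository URI, prefix, and scratch directories below are
placeholders, not values from this changeset):

        import tempfile

        import pkg.client.transport as transport

        # Build an unconfigured transport plus its GenericTransportCfg.
        xport, xport_cfg = transport.setup_transport()

        # The caller owns the download locations; temporary directories
        # stand in here for whatever the client would actually choose.
        xport_cfg.cached_download_dir = tempfile.mkdtemp()
        xport_cfg.incoming_download_dir = tempfile.mkdtemp()

        # Register a publisher for the repository.  With remote_prefix
        # set, the transport contacts the repository and uses its
        # publisher data to determine the publisher's real prefix.
        pub = transport.setup_publisher("http://pkg.example.org/",
            "example.org", xport, xport_cfg, remote_prefix=True)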
--- a/src/modules/client/transport/engine.py	Tue Jul 13 18:17:07 2010 -0700
+++ b/src/modules/client/transport/engine.py	Thu Jul 15 15:40:39 2010 -0700
@@ -92,6 +92,7 @@
                         eh.url = None
                         eh.repourl = None
                         eh.fobj = None
+                        eh.r_fobj = None
                         eh.filepath = None
                         eh.success = False
                         eh.fileprog = None
@@ -672,8 +673,9 @@
                 self.__success = []
                 self.__orphans = set()
 
-        def send_data(self, url, data, header=None, sslcert=None, sslkey=None,
-            repourl=None, ccancel=None, sock_path=None):
+        def send_data(self, url, data=None, header=None, sslcert=None,
+            sslkey=None, repourl=None, ccancel=None, sock_path=None,
+            data_fobj=None):
                 """Invoke the engine to retrieve a single URL.  
                 This routine sends the data in data, and returns the
                 server's response.  
@@ -694,7 +696,7 @@
                     hdrfunc=fobj.get_header_func(), header=header, data=data,
                     httpmethod="POST", sslcert=sslcert, sslkey=sslkey,
                     repourl=repourl, progfunc=progfunc, uuid=fobj.uuid,
-                    sock_path=None)
+                    sock_path=None, read_fobj=data_fobj)
 
                 self.__req_q.appendleft(t)
 
@@ -840,20 +842,62 @@
                 if not proto in ("http", "https"):
                         return
 
+                if treq.read_filepath:
+                        try:
+                                hdl.r_fobj = open(treq.read_filepath, "rb",
+                                    self.__file_bufsz)
+                        except EnvironmentError, e:
+                                if e.errno == errno.EACCES:
+                                        raise api_errors.PermissionsException(
+                                # Raise TransportOperationError for any
+                                # error other than EACCES.
+                                # or EROFS.
+                                raise tx.TransportOperationError(
+                                    "Unable to open file: %s" % e)
+
                 if treq.compressible:
                         hdl.setopt(pycurl.ENCODING, "")
 
                 if treq.hdrfunc:
                         hdl.setopt(pycurl.HEADERFUNCTION, treq.hdrfunc)
 
-                if treq.httpmethod == "HEAD":
+                if treq.httpmethod == "GET":
+                        hdl.setopt(pycurl.HTTPGET, True)
+                elif treq.httpmethod == "HEAD":
                         hdl.setopt(pycurl.NOBODY, True)
                 elif treq.httpmethod == "POST":
                         hdl.setopt(pycurl.POST, True)
-                        hdl.setopt(pycurl.POSTFIELDS, treq.data)
+                        if treq.data is not None:
+                                hdl.setopt(pycurl.POSTFIELDS, treq.data)
+                        elif hdl.r_fobj or treq.read_fobj:
+                                if not hdl.r_fobj:
+                                        hdl.r_fobj = treq.read_fobj
+                                hdl.setopt(pycurl.READDATA, hdl.r_fobj)
+                                hdl.setopt(pycurl.POSTFIELDSIZE,
+                                    os.fstat(hdl.r_fobj.fileno()).st_size)
+                        else:
+                                raise tx.TransportOperationError("Transport "
+                                    "operation for POST URL %s did not "
+                                    "supply data or read_fobj.  At least one "
+                                    "is required." % treq.url)
+                elif treq.httpmethod == "PUT":
+                        hdl.setopt(pycurl.UPLOAD, True)
+                        if hdl.r_fobj or treq.read_fobj:
+                                if not hdl.r_fobj:
+                                        hdl.r_fobj = treq.read_fobj
+                                hdl.setopt(pycurl.READDATA, hdl.r_fobj)
+                                hdl.setopt(pycurl.INFILESIZE,
+                                    os.fstat(hdl.r_fobj.fileno()).st_size)
+                        else:
+                                raise tx.TransportOperationError("Transport "
+                                    "operation for PUT URL %s did not "
+                                    "supply a read_fobj.  One is required."
+                                     % treq.url)
+                elif treq.httpmethod == "DELETE":
+                        hdl.setopt(pycurl.CUSTOMREQUEST, "DELETE")
                 else:
-                        # Default to GET
-                        hdl.setopt(pycurl.HTTPGET, True)
+                        raise tx.TransportOperationError("Invalid http method "
+                            "'%s' specified." % treq.httpmethod)
 
                 # Set up SSL options
                 if treq.sslcert:
@@ -924,6 +968,9 @@
                                         ft = hdl.filetime
                                         os.utime(hdl.filepath, (ft, ft))
 
+                if hdl.r_fobj:
+                        hdl.r_fobj.close()
+                        hdl.r_fobj = None
 
                 hdl.url = None
                 hdl.repourl = None
@@ -944,7 +991,7 @@
             hdrfunc=None, header=None, data=None, httpmethod="GET",
             progclass=None, progtrack=None, sslcert=None, sslkey=None,
             repourl=None, compressible=False, progfunc=None, uuid=None,
-            sock_path=None):
+            sock_path=None, read_fobj=None, read_filepath=None):
                 """Create a TransportRequest with the following parameters:
 
                 url - The url that the transport engine should retrieve
@@ -988,6 +1035,14 @@
                 light-weight implementations may use progfunc instead,
                 especially if they don't need per-file updates.
 
+                read_filepath - If the request is sending a file, include
+                the path here, as this is the most efficient way to send
+                the data.
+
+                read_fobj - If the request is sending a large payload,
+                this points to a fileobject from which the data may be
+                read.
+
                 repourl - This is the URL stem that identifies the repo.
                 It's a subset of url.  It's also used by the stats system.
 
@@ -1019,3 +1074,5 @@
                 self.compressible = compressible
                 self.uuid = uuid
                 self.socket_path = sock_path
+                self.read_fobj = read_fobj
+                self.read_filepath = read_filepath
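
With these changes the engine accepts a request body three ways: an
in-memory string in 'data', an open file object in 'read_fobj' (passed
as 'data_fobj' through send_data), or a path in 'read_filepath' that
the engine opens itself.  A sketch of the two forms reachable through
send_data, assuming 'eng' is an already-configured CurlTransportEngine
and the URL is a placeholder:

        # Small payloads travel in memory via pycurl.POSTFIELDS.
        resp = eng.send_data("http://pkg.example.org/add/0/tx/file1",
            data="payload-bytes")

        # Large payloads stream from a file object via pycurl.READDATA,
        # with POSTFIELDSIZE taken from fstat(), so the body is never
        # buffered in memory.
        payload = open("/var/tmp/payload", "rb")
        resp = eng.send_data("http://pkg.example.org/add/0/tx/file1",
            data_fobj=payload)

Supplying neither for a POST raises TransportOperationError, as the
dispatch code above shows.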
--- a/src/modules/client/transport/fileobj.py	Tue Jul 13 18:17:07 2010 -0700
+++ b/src/modules/client/transport/fileobj.py	Thu Jul 15 15:40:39 2010 -0700
@@ -247,7 +247,7 @@
                 if not self.__headers_arrived:
                         self.__fill_headers()
 
-                return self.__headers.get(hdr, default)
+                return self.__headers.get(hdr.lower(), default)
 
         def _prime(self):
                 """Used by the underlying transport before handing this
@@ -374,7 +374,7 @@
                 elif data.find(":") > -1:
                         k, v = data.split(":", 1)
                         if v:
-                                self.__headers[k] = v.strip()
+                                self.__headers[k.lower()] = v.strip()
 
 class DummyLock(object):
         """This has the same external interface as threading.Lock,
--- a/src/modules/client/transport/repo.py	Tue Jul 13 18:17:07 2010 -0700
+++ b/src/modules/client/transport/repo.py	Thu Jul 15 15:40:39 2010 -0700
@@ -118,6 +118,45 @@
 
                 raise NotImplementedError
 
+        def publish_add(self, action, header=None, trans_id=None):
+                """The publish operation that adds content to a repository.
+                The action must be populated with a data property.
+                Callers may supply a header, and should supply a transaction
+                id in trans_id."""
+
+                raise NotImplementedError
+
+        def publish_abandon(self, header=None, trans_id=None):
+                """The 'abandon' publication operation, that tells a
+                Repository to abort the current transaction.  The caller
+                must specify the transaction id in trans_id. Returns
+                a (publish-state, fmri) tuple."""
+
+                raise NotImplementedError
+
+        def publish_close(self, header=None, trans_id=None, refresh_index=False,
+            add_to_catalog=False):
+                """The close operation tells the Repository to commit
+                the transaction identified by trans_id.  The caller may
+                specify refresh_index and add_to_catalog, if needed.
+                This method returns a (publish-state, fmri) tuple."""
+
+                raise NotImplementedError
+
+        def publish_open(self, header=None, client_release=None, pkg_name=None):
+                """Begin a publication operation by calling 'open'.
+                The caller must specify the client's OS release in
+                client_release, and the package's name in pkg_name.
+                Returns a transaction-ID."""
+
+                raise NotImplementedError
+
+        def publish_refresh_index(self, header=None):
+                """If the Repo points to a Repository that has a refresh-able
+                index, refresh the index."""
+
+                raise NotImplementedError
+
         def touch_manifest(self, fmri, header=None, ccancel=None):
                 """Send data about operation intent without actually
                 downloading a manifest."""
@@ -210,10 +249,11 @@
                     repourl=self._url, ccancel=ccancel,
                     sock_path=self._sock_path)
 
-        def _post_url(self, url, data, header=None, ccancel=None):
-                return self._engine.send_data(url, data, header,
+        def _post_url(self, url, data=None, header=None, ccancel=None,
+            data_fobj=None):
+                return self._engine.send_data(url, data=data, header=header,
                     repourl=self._url, ccancel=ccancel,
-                    sock_path=self._sock_path)
+                    sock_path=self._sock_path, data_fobj=data_fobj)
 
         def add_version_data(self, verdict):
                 """Cache the information about what versions a repository
@@ -491,6 +531,122 @@
 
                 return self._verdata is not None
 
+        def publish_add(self, action, header=None, trans_id=None):
+                """The publish operation that adds content to a repository.
+                The action must be populated with a data property.
+                Callers may supply a header, and should supply a transaction
+                id in trans_id."""
+
+                attrs = action.attrs
+                data_fobj = None
+                data = None
+                methodstr = "add/0/"
+
+                baseurl = urlparse.urljoin(self._repouri.uri, methodstr)
+                request_str = "%s/%s" % (trans_id, action.name)
+                requesturl = urlparse.urljoin(baseurl, request_str)
+
+                if action.data:
+                        data_fobj = action.data()
+                else:
+                        data = ""
+
+                headers = dict(
+                    ("X-IPkg-SetAttr%s" % i, "%s=%s" % (k, attrs[k]))
+                    for i, k in enumerate(attrs)
+                )
+
+                if header:
+                        headers.update(header)
+
+                fobj = self._post_url(requesturl, header=headers,
+                    data_fobj=data_fobj, data=data)
+
+                # Discard response body
+                fobj.read()
+
+        def publish_abandon(self, header=None, trans_id=None):
+                """The 'abandon' publication operation, that tells a
+                Repository to abort the current transaction.  The caller
+                must specify the transaction id in trans_id. Returns
+                a (publish-state, fmri) tuple."""
+
+                methodstr = "abandon/0/"
+
+                baseurl = urlparse.urljoin(self._repouri.uri, methodstr)
+                request_str = trans_id
+                requesturl = urlparse.urljoin(baseurl, request_str)
+
+                fobj = self._fetch_url(requesturl, header=header)
+
+                # Discard response body
+                fobj.read()
+
+                return fobj.getheader("State", None), \
+                     fobj.getheader("Package-FMRI", None)
+
+        def publish_close(self, header=None, trans_id=None, refresh_index=False,
+            add_to_catalog=False):
+                """The close operation tells the Repository to commit
+                the transaction identified by trans_id.  The caller may
+                specify refresh_index and add_to_catalog, if needed.
+                This method returns a (publish-state, fmri) tuple."""
+
+                methodstr = "close/0/"
+                headers = {}
+                if not refresh_index:
+                        headers["X-IPkg-Refresh-Index"] = 0
+                if not add_to_catalog:
+                        headers["X-IPkg-Add-To-Catalog"] = 0
+                if header:
+                        headers.update(header)
+
+                baseurl = urlparse.urljoin(self._repouri.uri, methodstr)
+                request_str = trans_id
+                requesturl = urlparse.urljoin(baseurl, request_str)
+
+                fobj = self._fetch_url(requesturl, header=headers)
+
+                # Discard response body
+                fobj.read()
+
+                return fobj.getheader("State", None), \
+                     fobj.getheader("Package-FMRI", None)
+
+        def publish_open(self, header=None, client_release=None, pkg_name=None):
+                """Begin a publication operation by calling 'open'.
+                The caller must specify the client's OS release in
+                client_release, and the package's name in pkg_name.
+                Returns a transaction-ID."""
+
+                methodstr = "open/0/"
+                baseurl = urlparse.urljoin(self._repouri.uri, methodstr)
+                request_str = urllib.quote(pkg_name, "")
+                requesturl = urlparse.urljoin(baseurl, request_str)
+
+                headers = {"Client-Release": client_release}
+                if header:
+                        headers.update(header)
+
+                fobj = self._fetch_url(requesturl, header=headers)
+
+                # Discard response body
+                fobj.read()
+
+                return fobj.getheader("Transaction-ID", None)
+
+        def publish_refresh_index(self, header=None):
+                """If the Repo points to a Repository that has a refresh-able
+                index, refresh the index."""
+
+                methodstr = "index/0/refresh/"
+                requesturl = urlparse.urljoin(self._repouri.uri, methodstr)
+
+                fobj = self._fetch_url(requesturl, header=header)
+
+                # Discard response body
+                fobj.read()
+
         def supports_version(self, op, ver):
                 """Returns true if operation named in string 'op'
                 supports integer version in 'ver' argument."""
@@ -549,30 +705,38 @@
                     sslkey=self._repouri.ssl_key, repourl=self._url,
                     ccancel=ccancel, sock_path=self._sock_path)
 
-        def _post_url(self, url, data, header=None, ccancel=None):
-                return self._engine.send_data(url, data, header=header,
+        def _post_url(self, url, data=None, header=None, ccancel=None,
+            data_fobj=None):
+                return self._engine.send_data(url, data=data, header=header,
                     sslcert=self._repouri.ssl_cert,
                     sslkey=self._repouri.ssl_key, repourl=self._url,
-                    ccancel=ccancel, sock_path=self._sock_path)
+                    ccancel=ccancel, sock_path=self._sock_path,
+                    data_fobj=data_fobj)
 
 
 class FileRepo(TransportRepo):
 
-        def __init__(self, repostats, repouri, engine):
+        def __init__(self, repostats, repouri, engine, frepo=None):
                 """Create a file repo.  Repostats is a RepoStats object.
                 Repouri is a RepositoryURI object.  Engine is a transport
-                engine object.
+                engine object.  If the caller wants to pass a Repository
+                object instead of having FileRepo create one, it should
+                pass the object in the frepo argument.
 
                 The convenience function new_repo() can be used to create
                 the correct repo."""
 
-                self._frepo = None
+                self._frepo = frepo
                 self._url = repostats.url
                 self._repouri = repouri
                 self._engine = engine
                 self._verdata = None
                 self.__stats = repostats
 
+                # If caller supplied a Repository object, we're done. Return.
+                if self._frepo:
+                        return
+
                 try:
                         scheme, netloc, path, params, query, fragment = \
                             urlparse.urlparse(self._repouri.uri, "file",
@@ -989,6 +1153,69 @@
 
                 return self._verdata is not None
 
+        def publish_add(self, action, header=None, trans_id=None):
+                """The publish operation that adds an action and its
+                payload (if applicable) to an existing transaction in a
+                repository.  The action must be populated with a data property.
+                Callers may supply a header, and should supply a transaction
+                id in trans_id."""
+
+                try:
+                        self._frepo.add(trans_id, action)
+                except svr_repo.RepositoryError, e:
+                        raise tx.TransportOperationError(str(e))
+
+        def publish_abandon(self, header=None, trans_id=None):
+                """The abandon operation, that tells a Repository to abort
+                the current transaction.  The caller must specify the
+                transaction id in trans_id. Returns a (publish-state, fmri)
+                tuple."""
+
+                try:
+                        pkg_state = self._frepo.abandon(trans_id)
+                except svr_repo.RepositoryError, e:
+                        raise tx.TransportOperationError(str(e))
+
+                return None, pkg_state
+
+        def publish_close(self, header=None, trans_id=None, refresh_index=False,
+            add_to_catalog=False):
+                """The close operation tells the Repository to commit
+                the transaction identified by trans_id.  The caller may
+                specify refresh_index and add_to_catalog, if needed.
+                This method returns a (publish-state, fmri) tuple."""
+
+                try:
+                        pkg_fmri, pkg_state = self._frepo.close(trans_id,
+                            refresh_index=refresh_index,
+                            add_to_catalog=add_to_catalog)
+                except svr_repo.RepositoryError, e:
+                        raise tx.TransportOperationError(str(e))
+
+                return pkg_fmri, pkg_state
+
+        def publish_open(self, header=None, client_release=None, pkg_name=None):
+                """Begin a publication operation by calling 'open'.
+                The caller must specify the client's OS release in
+                client_release, and the package's name in pkg_name.
+                Returns a transaction-ID string."""
+
+                try:
+                        trans_id = self._frepo.open(client_release, pkg_name)
+                except svr_repo.RepositoryError, e:
+                        raise tx.TransportOperationError(str(e))
+
+                return trans_id
+
+        def publish_refresh_index(self, header=None):
+                """If the Repo points to a Repository that has a refresh-able
+                index, refresh the index."""
+
+                try:
+                        self._frepo.refresh_index()
+                except svr_repo.RepositoryError, e:
+                        raise tx.TransportOperationError(str(e))
+
         def supports_version(self, op, ver):
                 """Returns true if operation named in string 'op'
                 supports integer version in 'ver' argument."""
@@ -1110,12 +1337,19 @@
             "https": HTTPSRepo,
         }
 
+        update_schemes = {
+            "file": FileRepo
+        }
+
         def __init__(self, engine):
                 """Caller must include a TransportEngine."""
 
                 self.__engine = engine
                 self.__cache = {}
 
+        def __contains__(self, url):
+                return url in self.__cache
+
         def clear_cache(self):
                 """Flush the contents of the cache."""
 
@@ -1142,6 +1376,28 @@
 
                 return repo
 
+        def update_repo(self, repostats, repouri, repository):
+                """For the FileRepo, some callers need to update its
+                Repository object.  They should use this method to do so.
+                If the Repo isn't in the cache, it's created and added."""
+
+                origin_url = repostats.url
+                urltuple = urlparse.urlparse(origin_url)
+                scheme = urltuple[0]
+
+                if scheme not in RepoCache.update_schemes:
+                        return
+
+                if origin_url in self.__cache:
+                        repo = self.__cache[origin_url]
+                        repo._frepo = repository
+                        return
+
+                repo = RepoCache.update_schemes[scheme](repostats, repouri,
+                    self.__engine, frepo=repository)
+
+                self.__cache[origin_url] = repo
+
         def remove_repo(self, repo=None, url=None):
                 """Remove a repo from the cache.  Caller must supply
                 either a RepositoryURI object or a URL."""
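
With both repo types implementing the same five methods, a publication
client can drive one transaction lifecycle regardless of scheme.  A
sketch of the expected sequence, assuming 'repo' is a TransportRepo
obtained from new_repo() and using a placeholder package name:

        import pkg.actions
        import pkg.client.transport.exception as tx

        act = pkg.actions.fromstr("set name=pkg.summary value=example")

        trans_id = repo.publish_open(client_release="5.11",
            pkg_name="example@1.0,5.11-0")
        try:
                repo.publish_add(act, trans_id=trans_id)
                state, fmri = repo.publish_close(trans_id=trans_id,
                    add_to_catalog=True)
        except tx.TransportOperationError:
                # Don't leave the transaction open on failure.
                repo.publish_abandon(trans_id=trans_id)
                raise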
--- a/src/modules/client/transport/transport.py	Tue Jul 13 18:17:07 2010 -0700
+++ b/src/modules/client/transport/transport.py	Thu Jul 15 15:40:39 2010 -0700
@@ -29,6 +29,7 @@
 import httplib
 import os
 import statvfs
+import tempfile
 import urllib
 import urlparse
 import zlib
@@ -55,18 +56,141 @@
 from pkg.client import global_settings
 logger = global_settings.logger
 
+class TransportCfg(object):
+        """Contains configuration needed by the transport for proper
+        operation.  Clients must create one of these objects, and then pass
+        it to a transport instance when it is initialized.  This is the base
+        class."""
+
+        def gen_publishers(self):
+                raise NotImplementedError
+
+        def get_policy(self, policy_name):
+                raise NotImplementedError
+
+        def get_publisher(self, publisher_name):
+                raise NotImplementedError
+
+        cached_download_dir = property(None, None, None, 
+            "Absolute pathname to directory of cached, completed downloads.")
+
+        incoming_download_dir = property(None, None, None,
+            "Absolute pathname to directory of in-progress downloads.")
+
+        pkgdir = property(None, None, None,
+            "Absolute pathname to pkgdir, where the manifest files live.")
+
+        user_agent = property(None, None, None,
+            "A string that identifies the user agent for the transport.")
+
+
+class ImageTransportCfg(TransportCfg):
+        """A subclass of TransportCfg that gets its configuration information
+        from an Image object."""
+
+        def __init__(self, image):
+                self.__img = image
+        
+        def gen_publishers(self):
+                return self.__img.gen_publishers()
+
+        def get_policy(self, policy_name):
+                if not self.__img.cfg_cache:
+                        return False
+                return self.__img.cfg_cache.get_policy(policy_name)
+
+        def get_publisher(self, publisher_name):
+                return self.__img.get_publisher(publisher_name)
+
+        def __get_user_agent(self):
+                return misc.user_agent_str(self.__img,
+                    global_settings.client_name)
+
+        cached_download_dir = property(
+            lambda self: self.__img.cached_download_dir(),
+            None, None, "Absolute pathname to directory of cached, "
+            "completed downloads.")
+
+        incoming_download_dir = property(
+            lambda self: self.__img.incoming_download_dir(),
+            None, None, "Absolute pathname to directory of in-progress "
+            "downloads.")
+
+        pkgdir = property(lambda self: self.__img.pkgdir, None, None,
+            "Absolute pathname to pkgdir, where the manifest files live.")
+
+        user_agent = property(__get_user_agent, None, None,
+            "A string that identifies the user agent for the transport.")
+
+
+class GenericTransportCfg(TransportCfg):
+        """A subclass of TransportCfg for use by transport clients that
+        do not have an image."""
+
+        def __init__(self, publishers=misc.EmptyI, c_download=None,
+            i_download=None, pkgdir=None, policy_map=misc.EmptyDict):
+
+                self.__publishers = {}
+                self.__cached_download_dir = c_download
+                self.__incoming_download_dir = i_download
+                self.__pkgdir = pkgdir
+                self.__policy_map = policy_map
+
+                for p in publishers:
+                        self.__publishers[p.prefix] = p
+
+        def add_publisher(self, pub):
+                self.__publishers[pub.prefix] = pub
+
+        def gen_publishers(self):
+                return (p for p in self.__publishers.values())
+
+        def get_policy(self, policy_name):
+                return self.__policy_map.get(policy_name, False)
+
+        def get_publisher(self, publisher_name):
+                return self.__publishers.get(publisher_name)
+
+        def remove_publisher(self, publisher_name):
+                return self.__publishers.pop(publisher_name, None)
+
+        def __get_user_agent(self):
+                return misc.user_agent_str(None, global_settings.client_name)
+
+        def __set_c_dl_dir(self, dl_dir):
+                self.__cached_download_dir = dl_dir
+
+        def __set_i_dl_dir(self, dl_dir):
+                self.__incoming_download_dir = dl_dir
+
+        def __set_pkgdir(self, pkgdir):
+                self.__pkgdir = pkgdir
+
+        cached_download_dir = property(
+            lambda self: self.__cached_download_dir, __set_c_dl_dir, None,
+            "Absolute pathname to directory of cached, completed downloads.")
+
+        incoming_download_dir = property(
+            lambda self: self.__incoming_download_dir, __set_i_dl_dir, None,
+            "Absolute pathname to directory of in-progress downloads.")
+
+        pkgdir = property(lambda self: self.__pkgdir, __set_pkgdir, None,
+            "Absolute pathname to pkgdir, where the manifest files live.")
+
+        user_agent = property(__get_user_agent, None, None,
+            "A string that identifies the user agent for the transport.")
+
 
 class Transport(object):
         """The generic transport wrapper object.  Its public methods should
         be used by all client code that wishes to perform file/network
         packaging operations."""
 
-        def __init__(self, img):
-                """Initialize the Transport object.  If an Image object
-                is provided in img, use that to determine some of the
-                destination locations for transport operations."""
-
-                self.__img = img
+        def __init__(self, tcfg):
+                """Initialize the Transport object. Caller must supply
+                a TransportCfg object."""
+
+                self.__tcfg = tcfg
                 self.__engine = None
                 self.__cadir = None
                 self.__portal_test_executed = False
@@ -79,15 +203,16 @@
         def __setup(self):
                 self.__engine = engine.CurlTransportEngine(self)
 
-                # Configure engine's user agent based upon img configuration
-                ua = misc.user_agent_str(self.__img,
-                    global_settings.client_name)
-                self.__engine.set_user_agent(ua)
+                # Configure engine's user agent
+                self.__engine.set_user_agent(self.__tcfg.user_agent)
 
                 self.__repo_cache = trepo.RepoCache(self.__engine)
 
-                cc = self.__img.cfg_cache
-                if cc and cc.get_policy(imageconfig.MIRROR_DISCOVERY):
+                if not self._caches and self.__tcfg.cached_download_dir:
+                        self.add_cache(self.__tcfg.cached_download_dir,
+                            readonly=False)
+
+                if self.__tcfg.get_policy(imageconfig.MIRROR_DISCOVERY):
                         self.__dynamic_mirrors = mdetect.MirrorDetector()
                         try:
                                 self.__dynamic_mirrors.locate()
@@ -100,12 +225,13 @@
                 # For now, transport write caches all publisher data in one
                 # place regardless of publisher source.
                 self._caches = {}
-                self.add_cache(self.__img.cached_download_dir(),
-                    readonly=False)
+                if self.__tcfg.cached_download_dir:
+                        self.add_cache(self.__tcfg.cached_download_dir,
+                            readonly=False)
 
                 # Automatically add any publisher repository origins
                 # or mirrors that are filesystem-based as read-only caches.
-                for pub in self.__img.gen_publishers():
+                for pub in self.__tcfg.gen_publishers():
                         repo = pub.selected_repository
                         if not repo:
                                 continue
@@ -194,6 +320,9 @@
                 if not pub:
                         pub = '__all'
 
+                if self._caches is None:
+                        self._caches = {}
+
                 pub_caches = self._caches.setdefault(pub, [])
 
                 write_caches = [
@@ -249,6 +378,9 @@
                 elif not pub or not isinstance(pub, basestring):
                         pub = None
 
+                if self._caches is None:
+                        self._caches = {}
+
                 caches = [
                     cache
                     for cache in self._caches.get(pub, [])
@@ -524,7 +656,7 @@
                         completed_dir = path
                 else:
                         completed_dir = pub.catalog_root
-                download_dir = self.__img.incoming_download_dir()
+                download_dir = self.__tcfg.incoming_download_dir
 
                 # Call setup if the transport isn't configured or was shutdown.
                 if not self.__engine:
@@ -725,7 +857,7 @@
                             "publisher/0")
                 raise failures
 
-        def get_content(self, fmri, fhash, ccancel=None):
+        def get_content(self, pub, fhash, ccancel=None):
                 """Given a fmri and fhash, return the uncompressed content
                 from the remote object.  This is similar to get_datstream,
                 except that the transport handles retrieving and decompressing
@@ -733,22 +865,20 @@
                
                 self.__lock.acquire()
                 try:
-                        content = self._get_content(fmri, fhash,
+                        content = self._get_content(pub, fhash,
                             ccancel=ccancel)
                 finally:
                         self.__lock.release()
 
                 return content
 
-        def _get_content(self, fmri, fhash, ccancel=None):
+        def _get_content(self, pub, fhash, ccancel=None):
                 """This is the function that implements get_content.
                 The other function is a wrapper for this one, which handles
                 the transport locking correctly."""
 
                 retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
                 failures = tx.TransportFailures()
-                pub_prefix = fmri.get_publisher()
-                pub = self.__img.get_publisher(pub_prefix)
                 header = self.__build_header(uuid=self.__get_uuid(pub))
 
                 # Call setup if the transport isn't configured or was shutdown.
@@ -811,7 +941,7 @@
 
                 failures = tx.TransportFailures()
                 pub_prefix = fmri.get_publisher()
-                pub = self.__img.get_publisher(pub_prefix)
+                pub = self.__tcfg.get_publisher(pub_prefix)
                 mfst = fmri.get_url_path()
                 retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
                 header = self.__build_header(intent=intent,
@@ -845,32 +975,37 @@
                 raise failures
 
         def get_manifest(self, fmri, excludes=misc.EmptyI, intent=None,
-            ccancel=None):
+            ccancel=None, pub=None, content_only=False):
                 """Given a fmri, and optional excludes, return a manifest
                 object."""
 
                 self.__lock.acquire()
                 try:
                         m = self._get_manifest(fmri, excludes, intent,
-                            ccancel=ccancel)
+                            ccancel=ccancel, pub=pub, content_only=content_only)
                 finally:
                         self.__lock.release()
 
                 return m
 
         def _get_manifest(self, fmri, excludes=misc.EmptyI, intent=None,
-            ccancel=None):
+            ccancel=None, pub=None, content_only=False):
                 """This is the implementation of get_manifest.  The
                 get_manifest function wraps this."""
 
                 retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
                 failures = tx.TransportFailures()
                 pub_prefix = fmri.get_publisher()
-                pub = self.__img.get_publisher(pub_prefix)
-                download_dir = self.__img.incoming_download_dir()
+                download_dir = self.__tcfg.incoming_download_dir
                 mcontent = None
-                header = self.__build_header(intent=intent,
-                    uuid=self.__get_uuid(pub))
+                header = None
+
+                if not pub:
+                        pub = self.__tcfg.get_publisher(pub_prefix)
+
+                if isinstance(pub, publisher.Publisher):
+                        header = self.__build_header(intent=intent,
+                            uuid=self.__get_uuid(pub))
 
                 # Call setup if the transport isn't configured or was shutdown.
                 if not self.__engine:
@@ -896,11 +1031,12 @@
                                 verified = self._verify_manifest(fmri,
                                     content=mcontent)
 
+                                if content_only:
+                                        return mcontent
+
                                 m = manifest.CachedManifest(fmri,
-                                    self.__img.pkgdir,
-                                    self.__img.cfg_cache.preferred_publisher,
-                                    excludes,
-                                    mcontent)
+                                    self.__tcfg.pkgdir, excludes, mcontent)
+
                                 return m
 
                         except tx.ExcessiveTransientFailure, ex:
@@ -959,7 +1095,7 @@
                 """This is the implementation of prefetch_manifests.
                 The other function is a wrapper for this one."""
 
-                download_dir = self.__img.incoming_download_dir()
+                download_dir = self.__tcfg.incoming_download_dir
 
                 # Call setup if the transport isn't configured or was shutdown.
                 if not self.__engine:
@@ -996,7 +1132,7 @@
                 mx_pub = {}
                 for fmri, intent in fetchlist:
                         pub_prefix = fmri.get_publisher()
-                        pub = self.__img.get_publisher(pub_prefix)
+                        pub = self.__tcfg.get_publisher(pub_prefix)
                         header = self.__build_header(intent=intent,
                             uuid=self.__get_uuid(pub))
                         if pub_prefix not in mx_pub:
@@ -1036,7 +1172,7 @@
                 progtrack = mxfr.get_progtrack()
 
                 # download_dir is temporary download path.
-                download_dir = self.__img.incoming_download_dir()
+                download_dir = self.__tcfg.incoming_download_dir
 
                 for d in self.__gen_origins(pub, retry_count):
 
@@ -1104,15 +1240,12 @@
                                         failedreqs.append(s)
                                         continue
 
-                                pref_pub = \
-                                    self.__img.cfg_cache.preferred_publisher
-
                                 try:
                                         mf = file(dl_path)
                                         mcontent = mf.read()
                                         mf.close()
                                         manifest.CachedManifest(fmri,
-                                            self.__img.pkgdir, pref_pub,
+                                            self.__tcfg.pkgdir,
                                             excludes, mcontent)
                                 except (apx.InvalidPackageErrors,
                                     ActionError), e:
@@ -1162,7 +1295,11 @@
                 must be used."""
 
                 # Get publisher information from FMRI.
-                pub = self.__img.get_publisher(fmri.get_publisher())
+                pub = self.__tcfg.get_publisher(fmri.get_publisher())
+
+                if not pub:
+                        return False
+
                 # Use the publisher to get the catalog and its signature info.
                 try:
                         sigs = dict(pub.catalog.get_entry_signatures(fmri))
@@ -1219,7 +1356,7 @@
                 return header
 
         def __get_uuid(self, pub):
-                if not self.__img.cfg_cache.get_policy(imageconfig.SEND_UUID):
+                if not self.__tcfg.get_policy(imageconfig.SEND_UUID):
                         return None
 
                 try:
@@ -1257,12 +1394,20 @@
                 filelist = flist
                 pub = mfile.get_publisher()
                 progtrack = mfile.get_progtrack()
-                header = self.__build_header(uuid=self.__get_uuid(pub))
+                header = None
+
+                if isinstance(pub, publisher.Publisher):
+                        header = self.__build_header(uuid=self.__get_uuid(pub))
 
                 # download_dir is temporary download path.
-                download_dir = self.__img.incoming_download_dir()
-
-                cache = self._get_caches(pub, readonly=False)[0]
+                download_dir = self.__tcfg.incoming_download_dir
+
+                cache = self._get_caches(pub, readonly=False)
+                if cache:
+                        # For now, pick first cache in list, if any are
+                        # present.
+                        cache = cache[0]
+
                 for d in self.__gen_repos(pub, retry_count):
 
                         failedreqs = []
@@ -1334,9 +1479,11 @@
                                                 filelist = failedreqs
                                         continue
 
-                                cpath = cache.insert(s, dl_path)
-                                mfile.make_openers(s, cpath)
-                                mfile.del_hash(s)
+                                if cache:
+                                        cpath = cache.insert(s, dl_path)
+                                        mfile.file_done(s, cpath)
+                                else:
+                                        mfile.file_done(s, dl_path)
 
                         # Return if everything was successful
                         if not filelist and not errlist:
@@ -1357,7 +1504,7 @@
                 function is a wrapper around this function, mainly for
                 locking purposes."""
 
-                download_dir = self.__img.incoming_download_dir()
+                download_dir = self.__tcfg.incoming_download_dir
                 pub = mfile.get_publisher()
 
                 # Call setup if the transport isn't configured or was shutdown.
@@ -1554,6 +1701,34 @@
                         for rs, ruri in rslist:
                                 yield self.__repo_cache.new_repo(rs, ruri)
 
+        def __gen_publication_origin(self, pub, count):
+                """The pub argument may either be a Publisher or a
+                RepositoryURI object.  This function is specific to
+                publication operations because it ensures that clients
+                are using properly configured Publisher objects."""
+
+                # Call setup if the transport isn't configured or was shutdown.
+                if not self.__engine:
+                        self.__setup()
+
+                # This is specifically to retry against a single origin
+                # in the case that network failures have occurred.  If
+                # the caller supplied a Publisher argument, make sure it
+                # has only one origin.
+                if isinstance(pub, publisher.Publisher):
+                        origins = pub.selected_repository.origins
+                        assert len(origins) == 1 
+                else:
+                        # If the caller passed a RepositoryURI object (e.g. a
+                        # repo given with -s) instead of a Publisher, use it
+                        # directly as the sole origin.
+                        origins = [pub]
+
+                for i in xrange(count):
+                        rslist = self.stats.get_repostats(origins, origins)
+                        for rs, ruri in rslist:
+                                yield self.__repo_cache.new_repo(rs, ruri)
+
         def __gen_origins_byversion(self, pub, count, operation, version,
             ccancel=None):
                 """Return origin repos for publisher pub, that support
@@ -1597,13 +1772,20 @@
                 if not self.__engine:
                         self.__setup()
 
-                for i in xrange(count):
+                if isinstance(pub, publisher.Publisher):
                         repo = pub.selected_repository
                         repolist = repo.mirrors[:]
                         repolist.extend(repo.origins)
                         repolist.extend(self.__dynamic_mirrors)
-                        rslist = self.stats.get_repostats(repolist,
-                            repo.origins)
+                        origins = repo.origins
+                else:
+                        # If the caller passed a RepositoryURI object as the
+                        # pub argument, it is the only repo in repolist.
+                        repolist = [pub]
+                        origins = repolist
+
+                for i in xrange(count):
+                        rslist = self.stats.get_repostats(repolist, origins)
                         for rs, ruri in rslist:
                                 yield self.__repo_cache.new_repo(rs, ruri)
 
@@ -1620,12 +1802,15 @@
                 if not self.__engine:
                         self.__setup()
 
-                repo = pub.selected_repository
-                if origin_only:
+                if isinstance(pub, publisher.Publisher):
+                        repo = pub.selected_repository
                         repolist = repo.origins[:]
+                        if not origin_only:
+                                repolist.extend(repo.mirrors)
                 else:
-                        repolist = repo.mirrors[:]
-                        repolist.extend(repo.origins)
+                        # If the caller passed a RepositoryURI object as the
+                        # pub argument, it is the only repo in repolist.
+                        repolist = [pub]
 
                 n = len(repolist)
                 m = self.stats.get_num_visited(repolist)
@@ -1691,7 +1876,7 @@
                 self.__portal_test_executed = True
                 vd = None
 
-                for pub in self.__img.gen_publishers():
+                for pub in self.__tcfg.gen_publishers():
                         try:
                                 vd = self._get_versions(pub, ccancel=ccancel)
                         except tx.TransportException, ex:
@@ -1776,11 +1961,29 @@
                 if not self.__engine:
                         self.__setup()
                 
-                publisher = self.__img.get_publisher(fmri.get_publisher())
+                publisher = self.__tcfg.get_publisher(fmri.get_publisher())
                 mfile = MultiFile(publisher, self, progtrack, ccancel)
 
                 return mfile
 
+        def multi_file_ni(self, publisher, final_dir, decompress=False,
+            progtrack=None, ccancel=None):
+                """Creates a MultiFileNI object for this transport.
+                The caller may add actions to the multifile object
+                and wait for the download to complete.
+
+                This is used by callers who want to download files,
+                but not install them through actions."""
+
+                # Call setup if the transport isn't configured or was shutdown.
+                if not self.__engine:
+                        self.__setup()
+                
+                mfile = MultiFileNI(publisher, self, final_dir,
+                    decompress=decompress, progtrack=progtrack, ccancel=ccancel)
+
+                return mfile
+
         def _action_cached(self, action, pub):
                 """If a file with the name action.hash is cached,
                 and if it has the same content hash as action.chash,
@@ -1847,6 +2050,261 @@
                             "chash failure: expected: %s computed: %s" % \
                             (chash, newhash), size=s.st_size)
 
+        def publish_add(self, pub, action=None, trans_id=None):
+                """Perform the 'add' publication operation to the publisher
+                supplied in pub.  The caller should include the action in the
+                action argument. The transaction-id is passed in trans_id."""
+
+                self.__lock.acquire()
+                try:
+                        self._publish_add(pub, action=action, trans_id=trans_id)
+                finally:
+                        self.__lock.release()
+
+        def _publish_add(self, pub, action=None, trans_id=None):
+                """Implementation of publish_add.  The current publish_add
+                function is a locking wrapper for the transport."""
+
+                failures = tx.TransportFailures()
+                retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
+                header = self.__build_header(uuid=self.__get_uuid(pub))
+
+                # Call setup if the transport isn't configured or was shutdown.
+                if not self.__engine:
+                        self.__setup()
+
+                for d in self.__gen_publication_origin(pub, retry_count):
+                        try:
+                                d.publish_add(action, header=header,
+                                    trans_id=trans_id)
+                                return
+                        except tx.ExcessiveTransientFailure, ex:
+                                # If an endpoint experienced so many failures
+                                # that we just gave up, grab the list of
+                                # failures that it contains
+                                failures.extend(ex.failures)
+                        except tx.TransportException, e:
+                                if e.retryable:
+                                        failures.append(e)
+                                else:
+                                        raise
+
+                raise failures
+
+        def publish_abandon(self, pub, trans_id=None):
+                """Perform an 'abandon' publication operation to the
+                publisher supplied in the pub argument.  The caller should
+                also include the transaction id in trans_id."""
+
+                self.__lock.acquire()
+                try:
+                        state, fmri = self._publish_abandon(pub,
+                            trans_id=trans_id)
+                finally:
+                        self.__lock.release()
+
+                return state, fmri
+
+        def _publish_abandon(self, pub, trans_id=None):
+                """Implementation of publish_abandon.  The current
+                publish_abandon function is a locking wrapper for the
+                transport."""
+
+                failures = tx.TransportFailures()
+                retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
+                header = self.__build_header(uuid=self.__get_uuid(pub))
+
+                # Call setup if transport isn't configured or was shut down.
+                if not self.__engine:
+                        self.__setup()
+
+                for d in self.__gen_publication_origin(pub, retry_count):
+                        try:
+                                state, fmri = d.publish_abandon(header=header,
+                                    trans_id=trans_id)
+                                return state, fmri
+                        except tx.ExcessiveTransientFailure, ex:
+                                # If an endpoint experienced so many failures
+                                # that we just gave up, grab the list of
+                                # failures that it contains.
+                                failures.extend(ex.failures)
+                        except tx.TransportException, e:
+                                if e.retryable:
+                                        failures.append(e)
+                                else:
+                                        raise
+
+                raise failures
+
+        def publish_close(self, pub, trans_id=None, refresh_index=False,
+            add_to_catalog=False):
+                """Perform a 'close' publication operation to the
+                publisher supplied in the pub argument.  The caller should
+                also include the transaction id in trans_id.  If
+                the refresh_index argument is true, the repository
+                will be told to refresh its index.  If add_to_catalog
+                is true, the package will be added to the catalog once
+                the transaction closes.  Not all transport methods
+                recognize this parameter."""
+
+                self.__lock.acquire()
+                try:
+                        state, fmri = self._publish_close(pub,
+                            trans_id=trans_id, refresh_index=refresh_index,
+                            add_to_catalog=add_to_catalog)
+                finally:
+                        self.__lock.release()
+
+                return state, fmri
+
+        def _publish_close(self, pub, trans_id=None, refresh_index=False,
+            add_to_catalog=False):
+                """Implementation of publish_close.  The current
+                publish_close function is a locking wrapper for the
+                transport."""
+
+                failures = tx.TransportFailures()
+                retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
+                header = self.__build_header(uuid=self.__get_uuid(pub))
+
+                # Call setup if transport isn't configured or was shut down.
+                if not self.__engine:
+                        self.__setup()
+
+                for d in self.__gen_publication_origin(pub, retry_count):
+                        try:
+                                state, fmri = d.publish_close(header=header,
+                                    trans_id=trans_id,
+                                    refresh_index=refresh_index,
+                                    add_to_catalog=add_to_catalog)
+                                return state, fmri
+                        except tx.ExcessiveTransientFailure, ex:
+                                # If an endpoint experienced so many failures
+                                # that we just gave up, grab the list of
+                                # failures that it contains.
+                                failures.extend(ex.failures)
+                        except tx.TransportException, e:
+                                if e.retryable:
+                                        failures.append(e)
+                                else:
+                                        raise
+
+                raise failures
+
+        def publish_open(self, pub, client_release=None, pkg_name=None):
+                """Perform an 'open' transaction to start a publication
+                transaction to the publisher named in pub.  The caller should
+                supply the client's OS release in client_release, and the
+                package's name in pkg_name."""
+
+                self.__lock.acquire()
+                try:
+                        trans_id = self._publish_open(pub,
+                            client_release=client_release, pkg_name=pkg_name)
+                finally:
+                        self.__lock.release()
+
+                return trans_id
+
+        def _publish_open(self, pub, client_release=None, pkg_name=None):
+                """Implementation of publish_open.  The current publish_open
+                function is a locking wrapper for the transport."""
+
+                failures = tx.TransportFailures()
+                retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
+                header = self.__build_header(uuid=self.__get_uuid(pub))
+
+                # Call setup if transport isn't configured or was shut down.
+                if not self.__engine:
+                        self.__setup()
+
+                for d in self.__gen_publication_origin(pub, retry_count):
+                        try:
+                                trans_id = d.publish_open(header=header,
+                                    client_release=client_release,
+                                    pkg_name=pkg_name)
+                                return trans_id
+                        except tx.ExcessiveTransientFailure, ex:
+                                # If an endpoint experienced so many failures
+                                # that we just gave up, grab the list of
+                                # failures that it contains
+                                failures.extend(ex.failures)
+                        except tx.TransportException, e:
+                                if e.retryable:
+                                        failures.append(e)
+                                else:
+                                        raise
+
+                raise failures
+
+        def publish_refresh_index(self, pub):
+                """Instructs the repositories named by Publisher pub
+                to refresh their index."""
+
+                self.__lock.acquire()
+                try:
+                        self._publish_refresh_index(pub)
+                finally:
+                        self.__lock.release()
+
+        def _publish_refresh_index(self, pub):
+                """Implmentation of publish_refresh_index.  The current
+                publish_refresh_index function is a locking wrapper for
+                the transport."""
+
+                failures = tx.TransportFailures()
+                retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
+                header = self.__build_header(uuid=self.__get_uuid(pub))
+
+                # Call setup if transport isn't configured or was shut down.
+                if not self.__engine:
+                        self.__setup()
+
+                for d in self.__gen_publication_origin(pub, retry_count):
+                        try:
+                                d.publish_refresh_index(header=header)
+                                return
+                        except tx.ExcessiveTransientFailure, ex:
+                                # If an endpoint experienced so many failures
+                                # that we just gave up, grab the list of
+                                # failures that it contains.
+                                failures.extend(ex.failures)
+                        except tx.TransportException, e:
+                                if e.retryable:
+                                        failures.append(e)
+                                else:
+                                        raise
+
+                raise failures
+
+        def publish_cache_repository(self, pub, repo):
+                """If the caller needs to override the underlying Repository
+                object kept by the transport, it should use this method
+                to replace the cached Repository object."""
+
+                assert(isinstance(pub, publisher.Publisher))
+
+                if not self.__engine:
+                        self.__setup()
+
+                origins = [pub.selected_repository.origins[0]]
+                rslist = self.stats.get_repostats(origins, origins)
+                rs, ruri = rslist[0]
+
+                self.__repo_cache.update_repo(rs, ruri, repo)
+
+        def publish_cache_contains(self, pub):
+                """Returns true if the publisher's origin is cached
+                in the repo cache."""
+
+                if not self.__engine:
+                        self.__setup()
+
+                originuri = pub.selected_repository.origins[0].uri
+                return originuri in self.__repo_cache
+
+
+
 class MultiXfr(object):
         """A transport object for performing multiple simultaneous
         requests.  This object matches publisher to list of requests, and
@@ -1930,16 +2388,17 @@
 
         def add_action(self, action):
                 """The multiple file retrieval operation is asynchronous.
-                Add files to retrieve with this function.  Supply the
-                publisher in pub and the list of files in filelist.
-                Wait for the operation by calling waitFiles."""
+                Add files to retrieve with this function.  The caller
+                should pass the action, which causes its file to
+                be added to an internal retrieval list."""
 
                 cpath = self._transport._action_cached(action,
                     self.get_publisher())
                 if cpath:
                         action.data = self._make_opener(cpath)
-                        filesz = int(misc.get_pkg_otw_size(action))
-                        self._progtrack.download_add_progress(1, filesz)
+                        if self._progtrack:
+                                filesz = int(misc.get_pkg_otw_size(action))
+                                self._progtrack.download_add_progress(1, filesz)
                         return
 
                 hashval = action.hash
@@ -1959,7 +2418,13 @@
                         return f
                 return opener
 
-        def make_openers(self, hashval, cache_path):
+        def file_done(self, hashval, current_path):
+                """Tell MFile that the transfer completed successfully."""
+
+                self._make_openers(hashval, current_path)
+                self.del_hash(hashval)
+
+        def _make_openers(self, hashval, cache_path):
                 """Find each action associated with the hash value hashval.
                 Create an opener that points to the cache file for the
                 action's data method."""
@@ -1984,7 +2449,9 @@
                 # by the difference between what we have and the total we should
                 # have received.
                 nbytes = int(totalsz - filesz)
-                self._progtrack.download_add_progress((nactions - 1), nbytes)
+                if self._progtrack:
+                        self._progtrack.download_add_progress((nactions - 1),
+                            nbytes)
 
         def subtract_progress(self, size):
                 """Subtract the progress accumulated by the download of
@@ -1993,6 +2460,9 @@
                 it has happened before make_openers, so it's only necessary
                 to adjust the progress for a single file."""
 
+                if not self._progtrack:
+                        return
+
                 self._progtrack.download_add_progress(-1, int(-size))
 
         def wait_files(self):
@@ -2002,3 +2472,126 @@
                 if self._hash:
                         self._transport._get_files(self)
 
+class MultiFileNI(MultiFile):
+        """A transport object for performing multi-file requests
+        using pkg actions.  This takes care of matching the publisher
+        with the actions, and performs the download and content
+        verification necessary to ensure correct content installation.
+
+        This subclass is used when the actions won't be installed, but
+        are used to identify and verify the content.  Additional parameters
+        define what happens when download finishes successfully."""
+
+        def __init__(self, pub, xport, final_dir, decompress=False,
+            progtrack=None, ccancel=None):
+                """Supply the destination publisher in the pub argument.
+                The transport object should be passed in xport."""
+
+                MultiFile.__init__(self, pub, xport, progtrack=progtrack,
+                    ccancel=ccancel)
+
+                self._final_dir = final_dir
+                self._decompress = decompress
+
+        def add_action(self, action):
+                """The multiple file retrieval operation is asynchronous.
+                Add files to retrieve with this function.  The caller
+                should pass the action, which causes its file to
+                be added to an internal retrieval list."""
+
+                cpath = self._transport._action_cached(action,
+                    self.get_publisher())
+                hashval = action.hash
+
+                if cpath:
+                        self._final_copy(hashval, cpath)
+                        if self._progtrack:
+                                filesz = int(misc.get_pkg_otw_size(action))
+                                self._progtrack.download_add_progress(1, filesz)
+                        return
+
+                self.add_hash(hashval, action)
+
+        def file_done(self, hashval, current_path):
+                """Tell MFile that the transfer completed successfully."""
+
+                totalsz = 0
+                nactions = 0
+
+                filesz = os.stat(current_path).st_size
+                for action in self._hash[hashval]:
+                        nactions += 1
+                        totalsz += misc.get_pkg_otw_size(action)
+
+                # The progress tracker accounts for the sizes of all actions
+                # even if we only have to perform one download to satisfy
+                # multiple actions with the same hashval.  Since we know
+                # the size of the file we downloaded, but not necessarily
+                # the size of the action responsible for the download,
+                # generate the total size and subtract the size that was
+                # downloaded.  The downloaded size was already accounted for in
+                # the engine's progress tracking.  Adjust the progress tracker
+                # by the difference between what we have and the total we should
+                # have received.
+                nbytes = int(totalsz - filesz)
+                if self._progtrack:
+                        self._progtrack.download_add_progress((nactions - 1),
+                            nbytes)
+
+                self._final_copy(hashval, current_path)
+                self.del_hash(hashval)
+
+        def _final_copy(self, hashval, current_path):
+                """Copy the file named by hashval from current_path
+                to the final destination, decompressing, if necessary."""
+
+                dest = os.path.join(self._final_dir, hashval)
+                tmp_prefix = "%s." % hashval
+
+                try:
+                        os.makedirs(self._final_dir, mode=misc.PKG_DIR_MODE)
+                except EnvironmentError, e:
+                        if e.errno == errno.EACCES:
+                                raise apx.PermissionsException(e.filename)
+                        if e.errno == errno.EROFS:
+                                raise apx.ReadOnlyFileSystemException(
+                                    e.filename)
+                        if e.errno != errno.EEXIST:
+                                raise
+
+                try:
+                        fd, fn = tempfile.mkstemp(dir=self._final_dir,
+                            prefix=tmp_prefix)
+                except EnvironmentError, e:
+                        if e.errno == errno.EACCES:
+                                raise apx.PermissionsException(
+                                    e.filename)
+                        if e.errno == errno.EROFS:
+                                raise apx.ReadOnlyFileSystemException(
+                                    e.filename)
+                        raise
+
+                src = file(current_path, "rb")
+                outfile = os.fdopen(fd, "wb")
+                if self._decompress:
+                        misc.gunzip_from_stream(src, outfile)
+                else:
+                        while True:
+                                buf = src.read(64 * 1024)
+                                if buf == "":
+                                        break
+                                outfile.write(buf)
+                outfile.close()
+                src.close()
+
+                try:
+                        os.chmod(fn, misc.PKG_FILE_MODE)
+                        portable.rename(fn, dest)
+                except EnvironmentError, e:
+                        if e.errno == errno.EACCES:
+                                raise apx.PermissionsException(e.filename)
+                        if e.errno == errno.EROFS:
+                                raise apx.ReadOnlyFileSystemException(
+                                    e.filename)
+                        raise
+
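The new MultiFileNI class and the multi_file_ni() entry point above give
non-installing callers (republishers and mirroring tools) a way to fetch and
verify package content without executing the actions.  A minimal sketch of the
intended calling pattern, assuming xport, pub, and mfst stand in for an
already configured transport, publisher, and retrieved manifest (none of
which are constructed here):

    def download_content(xport, pub, mfst, dest_dir):
            """Fetch the payload of every action in 'mfst' into
            'dest_dir', named by hash, without installing anything."""

            mfile = xport.multi_file_ni(pub, dest_dir, decompress=True)

            for action in mfst.gen_actions():
                    # Only actions that carry a payload have a hash to fetch.
                    if not getattr(action, "hash", None):
                            continue
                    # Cached content is copied into place immediately;
                    # everything else is queued for asynchronous download.
                    mfile.add_action(action)

            # Block until each file has been verified and placed by
            # file_done()/_final_copy().
            mfile.wait_files()

Since progtrack defaults to None and the hunks above guard every progress
call, such callers need not supply a progress tracker at all.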
--- a/src/modules/depotcontroller.py	Tue Jul 13 18:17:07 2010 -0700
+++ b/src/modules/depotcontroller.py	Thu Jul 15 15:40:39 2010 -0700
@@ -34,8 +34,6 @@
 import urllib2
 import urlparse
 
-from pkg.misc import versioned_urlopen
-
 class DepotStateException(Exception):
 
         def __init__(self, reason):
@@ -241,8 +239,9 @@
                         return self.__ping_unix_socket()
 
                 try:
-                        versioned_urlopen(self.get_depot_url(),
-                            "versions", [0])
+                        repourl = urlparse.urljoin(self.get_depot_url(),
+                            "versions/0")
+                        urllib2.urlopen(repourl)
                 except urllib2.HTTPError, e:
                         # Server returns NOT_MODIFIED if catalog is up
                         # to date
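With versioned_urlopen gone, the controller's liveness probe reduces to a
plain HTTP GET of the depot's versions/0 entry point, roughly as sketched
below (the depot_url value is illustrative):

    import urllib2
    import urlparse

    def depot_is_alive(depot_url):
            """Probe a depot's versions/0 entry point; any HTTP response,
            even an error status, means the server process is up."""
            try:
                    repourl = urlparse.urljoin(depot_url, "versions/0")
                    urllib2.urlopen(repourl)
            except urllib2.HTTPError:
                    # The server answered, so it is running.
                    return True
            except urllib2.URLError:
                    return False
            return True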
--- a/src/modules/manifest.py	Tue Jul 13 18:17:07 2010 -0700
+++ b/src/modules/manifest.py	Thu Jul 15 15:40:39 2010 -0700
@@ -667,8 +667,7 @@
                 return os.path.join(self.__pkgdir,
                     self.fmri.get_dir_path())
 
-        def __init__(self, fmri, pkgdir, preferred_pub, excludes=EmptyI,
-            contents=None):
+        def __init__(self, fmri, pkgdir, excludes=EmptyI, contents=None):
                 """Raises KeyError exception if cached manifest
                 is not present and contents are None; delays
                 reading of manifest until required if cache file
@@ -676,7 +675,6 @@
 
                 Manifest.__init__(self)
                 self.__pkgdir = pkgdir
-                self.__pub    = preferred_pub
                 self.loaded   = False
                 self.set_fmri(None, fmri)
                 self.excludes = excludes
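Callers of this cached-manifest constructor drop the publisher argument to
match; assuming, for illustration, that the subclass is
manifest.CachedManifest and that fmri, pkgdir, and excludes are in scope:

    # Before: CachedManifest(fmri, pkgdir, preferred_pub, excludes=excludes)
    # After:
    m = manifest.CachedManifest(fmri, pkgdir, excludes=excludes)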
--- a/src/modules/misc.py	Tue Jul 13 18:17:07 2010 -0700
+++ b/src/modules/misc.py	Thu Jul 15 15:40:39 2010 -0700
@@ -27,29 +27,24 @@
 import datetime
 import errno
 import hashlib
-import httplib
 import locale
 import OpenSSL.crypto as osc
 import operator
 import os
 import pkg.client.api_errors as api_errors
 import pkg.portable as portable
-import pkg.urlhelpers as urlhelpers
 import platform
 import re
 import shutil
-import socket
 import stat
 import struct
 import sys
 import time
 import urllib
-import urllib2
 import urlparse
 import zlib
 
 from pkg.client.imagetypes import img_type_names, IMG_NONE
-from pkg.client import global_settings
 from pkg import VERSION
 
 # Minimum number of days to issue warning before a certificate expires
@@ -117,90 +112,6 @@
 
         return useragent
 
-def versioned_urlopen(base_uri, operation, versions = None, tail = None,
-    data = None, headers = None, ssl_creds = None, imgtype = IMG_NONE,
-    method = "GET", uuid = None):
-        """Open the best URI for an operation given a set of versions.
-
-        Both the client and the server may support multiple versions of
-        the protocol of a particular operation.  The client will pass
-        this method an ordered array of versions it understands, along
-        with the base URI and the operation it wants.  This method will
-        open the URL corresponding to the best version both the client
-        and the server understand, returning a tuple of the open URL and
-        the version used on success, and throwing an exception if no
-        matching version can be found.
-        """
-        # Ignore http_proxy for localhost case, by overriding
-        # default proxy behaviour of urlopen().
-        netloc = urlparse.urlparse(base_uri)[1]
-
-        if not netloc:
-                raise ValueError, "Malformed URL: %s" % base_uri
-
-        if urllib.splitport(netloc)[0] == "localhost":
-                # XXX cache this opener?
-                proxy_handler = urllib2.ProxyHandler({})
-                opener_dir = urllib2.build_opener(proxy_handler)
-                url_opener = opener_dir.open
-        elif ssl_creds and ssl_creds != (None, None):
-                cert_handler = urlhelpers.HTTPSCertHandler(
-                    key_file = ssl_creds[0], cert_file = ssl_creds[1])
-                opener_dir = urllib2.build_opener(
-                    urlhelpers.HTTPSProxyHandler, cert_handler)
-                url_opener = opener_dir.open
-        else:
-                url_opener = urllib2.urlopen
-
-        if not versions:
-                versions = []
-
-        if not headers:
-                headers = {}
-
-        for i, version in enumerate(versions):
-                if base_uri[-1] != '/':
-                        base_uri += '/'
-
-                if tail:
-                        tail_str = tail
-                        if isinstance(tail, list):
-                                tail_str = tail[i]
-                        uri = urlparse.urljoin(base_uri, "%s/%s/%s" % \
-                            (operation, version, tail_str))
-                else:
-                        uri = urlparse.urljoin(base_uri, "%s/%s" % \
-                            (operation, version))
-
-                headers["User-Agent"] = \
-                    _client_version % (img_type_names[imgtype],
-                        global_settings.client_name)
-                if uuid:
-                        headers["X-IPkg-UUID"] = uuid
-                req = urllib2.Request(url = uri, headers = headers)
-                if method == "HEAD":
-                        # Must override urllib2's get_method since it doesn't
-                        # natively support this operation.
-                        req.get_method = lambda: "HEAD"
-                elif data is not None:
-                        req.add_data(data)
-
-                try:
-                        c = url_opener(req)
-                except urllib2.HTTPError, e:
-                        if e.code != httplib.NOT_FOUND:
-                                raise
-                        continue
-                # XXX catch BadStatusLine and convert to INTERNAL_SERVER_ERROR?
-
-                return c, version
-        else:
-                # Couldn't find a version that we liked.
-                raise RuntimeError, \
-                    "%s doesn't speak a known version of %s operation" % \
-                    (base_uri, operation)
-
-
 _hostname_re = re.compile("^[a-zA-Z0-9](?:[a-zA-Z0-9\-]*[a-zA-Z0-9]+\.?)*$")
 _invalid_host_chars = re.compile(".*[^a-zA-Z0-9\-\.]+")
 _valid_proto = ["file", "http", "https"]
@@ -432,8 +343,10 @@
         if the content should be discarded during processing."""
 
         bufsz = 128 * 1024
+        closefobj = False
         if isinstance(data, basestring):
                 f = file(data, "rb", bufsz)
+                closefobj = True
         else:
                 f = data
 
@@ -455,7 +368,8 @@
                         break
                 length -= l
         content.reset()
-        f.close()
+        if closefobj:
+                f.close()
 
         return fhash.hexdigest(), content.read()
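The closefobj change lets this digest helper accept either a pathname or an
already open file object without stealing ownership of the caller's handle.
Reduced to its essentials (the hash algorithm and buffer size here are
illustrative, and the content-buffering logic is omitted):

    import hashlib

    def digest_source(data, bufsz=128 * 1024):
            """Compute a digest over 'data', which may be a pathname or an
            open file object; close the file only if we opened it."""
            closefobj = False
            if isinstance(data, basestring):
                    f = open(data, "rb", bufsz)
                    closefobj = True
            else:
                    f = data

            try:
                    fhash = hashlib.sha1()
                    while True:
                            buf = f.read(bufsz)
                            if not buf:
                                    break
                            fhash.update(buf)
            finally:
                    if closefobj:
                            f.close()

            return fhash.hexdigest()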
 
--- a/src/modules/publish/transaction.py	Tue Jul 13 18:17:07 2010 -0700
+++ b/src/modules/publish/transaction.py	Thu Jul 15 15:40:39 2010 -0700
@@ -28,18 +28,15 @@
 repository.  Note that only the Transaction class should be used directly,
 though the other classes can be referred to for documentation purposes."""
 
-import httplib
-import os
-import StringIO
 import urllib
-import urllib2
 import urlparse
 
-from pkg.misc import versioned_urlopen, EmptyDict
+from pkg.misc import EmptyDict
 import pkg.actions as actions
 import pkg.config as cfg
 import pkg.portable.util as os_util
 import pkg.server.repository as sr
+import pkg.client.api_errors as apx
 
 class TransactionError(Exception):
         """Base exception class for all Transaction exceptions."""
@@ -128,352 +125,13 @@
                     "type": self._args.get("type", "") }
 
 
-class FileTransaction(object):
-        """Provides a publishing interface for file-based repositories."""
-
-        # Used to avoid the overhead of initializing the repository for
-        # successive transactions.
-        __repo_cache = {}
-
-        def __init__(self, origin_url, create_repo=False, pkg_name=None,
-            repo_props=EmptyDict, trans_id=None, refresh_index=True):
-                scheme, netloc, path, params, query, fragment = \
-                    urlparse.urlparse(origin_url, "file", allow_fragments=0)
-                path = urllib.url2pathname(path)
-
-                repo_cache = self.__class__.__repo_cache
-
-                if not os.path.isabs(path):
-                        raise TransactionRepositoryURLError(origin_url,
-                            msg=_("Not an absolute path."))
-
-                if origin_url not in repo_cache:
-                        try:
-                                repo = sr.Repository(auto_create=create_repo,
-                                    properties=repo_props, repo_root=path, 
-                                    refresh_index=refresh_index)
-                        except EnvironmentError, e:
-                                raise TransactionOperationError(None, msg=_(
-                                    "An error occurred while trying to "
-                                    "initialize the repository directory "
-                                    "structures:\n%s") % e)
-                        except cfg.ConfigError, e:
-                                raise TransactionRepositoryConfigError(str(e))
-                        except sr.RepositoryInvalidError, e:
-                                raise TransactionRepositoryInvalidError(str(e))
-                        except sr.RepositoryError, e:
-                                raise TransactionOperationError(None,
-                                    msg=str(e))
-                        repo_cache[origin_url] = repo
-
-                self.__repo = repo_cache[origin_url]
-                self.origin_url = origin_url
-                self.pkg_name = pkg_name
-                self.trans_id = trans_id
-
-        def add(self, action):
-                """Adds an action and its related content to an in-flight
-                transaction.  Returns nothing."""
-
-                try:
-                        # Perform additional publication-time validation of
-                        # actions before further processing is done.
-                        action.validate()
-
-                        # Now add to the repository.
-                        self.__repo.add(self.trans_id, action)
-                except (actions.ActionError, sr.RepositoryError), e:
-                        raise TransactionOperationError("add",
-                            trans_id=self.trans_id, msg=str(e))
-
-        def close(self, abandon=False, refresh_index=True, add_to_catalog=True):
-                """Ends an in-flight transaction.  Returns a tuple containing
-                a package fmri (if applicable) and the final state of the
-                related package.
-
-                If 'abandon' is omitted or False, the package will be published;
-                otherwise the server will discard the current transaction and
-                its related data.
-
-                If 'refresh_index' is True, the repository will be instructed
-                to update its search indices after publishing.  Has no effect
-                if 'abandon' is True."""
-
-                if abandon:
-                        try:
-                                pkg_fmri = None
-                                pkg_state = self.__repo.abandon(self.trans_id)
-                        except sr.RepositoryError, e:
-                                raise TransactionOperationError("abandon",
-                                    trans_id=self.trans_id, msg=str(e))
-                else:
-                        try:
-                                pkg_fmri, pkg_state = self.__repo.close(
-                                    self.trans_id, 
-                                    refresh_index=refresh_index,
-                                    add_to_catalog=add_to_catalog)
-                        except sr.RepositoryError, e:
-                                raise TransactionOperationError("close",
-                                    trans_id=self.trans_id, msg=str(e))
-                return pkg_fmri, pkg_state
-
-        def open(self):
-                """Starts an in-flight transaction. Returns a URL-encoded
-                transaction ID on success."""
-
-                try:
-                        self.trans_id = self.__repo.open(
-                            os_util.get_os_release(), self.pkg_name)
-                except sr.RepositoryError, e:
-                        raise TransactionOperationError("open",
-                            trans_id=self.trans_id, msg=str(e))
-                return self.trans_id
-
-        def refresh_index(self):
-                """Instructs the repository to refresh its search indices.
-                Returns nothing."""
-
-                try:
-                        self.__repo.refresh_index()
-                except sr.RepositoryError, e:
-                        raise TransactionOperationError("refresh_index",
-                            msg=str(e))
-
-
-class HTTPTransaction(object):
-        """Provides a publishing interface for HTTP(S)-based repositories."""
-
-        def __init__(self, origin_url, create_repo=False, pkg_name=None,
-            repo_props=EmptyDict, trans_id=None, refresh_index=True):
-
-                if create_repo:
-                        scheme, netloc, path, params, query, fragment = \
-                            urlparse.urlparse(origin_url, "http",
-                            allow_fragments=0)
-                        raise UnsupportedRepoTypeOperationError("create_repo",
-                            type=scheme)
-
-                self.origin_url = origin_url
-                self.pkg_name = pkg_name
-                self.trans_id = trans_id
-
-        @staticmethod
-        def __get_urllib_error(e):
-                """Analyzes the server error response and returns a tuple of
-                status (server response code), message (the textual response
-                from the server if available)."""
-
-                status = httplib.INTERNAL_SERVER_ERROR
-                msg = None
-
-
-                if not e:
-                        return status, msg
-
-                if hasattr(e, "code"):
-                        status = e.code
-
-                if hasattr(e, "read") and callable(e.read):
-                        # Extract the message from the server output.
-                        msg = ""
-                        from xml.dom.minidom import Document, parse
-                        output = e.read()
-                        dom = parse(StringIO.StringIO(output))
-
-                        paragraphs = []
-                        if not isinstance(dom, Document):
-                                # Assume the output was the message.
-                                msg = output
-                        else:
-                                paragraphs = dom.getElementsByTagName("p")
-
-                        # XXX this is specific to the depot server's current
-                        # error output style.
-                        for p in paragraphs:
-                                for c in p.childNodes:
-                                        if c.nodeType == c.TEXT_NODE:
-                                                value = c.nodeValue
-                                                if value is not None:
-                                                        msg += ("\n%s" % value)
-
-                if not msg and status == httplib.NOT_FOUND:
-                        msg = _("Unsupported or temporarily unavailable "
-                            "operation requested.")
-                elif not msg:
-                        msg = str(e)
-
-                return status, msg
-
-        def add(self, action):
-                """Adds an action and its related content to an in-flight
-                transaction.  Returns nothing."""
-
-                try:
-                        # Perform additional publication-time validation of
-                        # actions before further processing is done.
-                        action.validate()
-                except actions.ActionError, e:
-                        raise TransactionOperationError("add",
-                            trans_id=self.trans_id, msg=str(e))
-
-                attrs = action.attrs
-                if action.data != None:
-                        datastream = action.data()
-                        # XXX Need to handle large files better;
-                        # versioned_urlopen requires the whole file to be in
-                        # memory because of the underlying request library.
-                        data = datastream.read()
-                        sz = int(attrs["pkg.size"])
-                else:
-                        data = ""
-                        sz = 0
-
-                headers = dict(
-                    ("X-IPkg-SetAttr%s" % i, "%s=%s" % (k, attrs[k]))
-                    for i, k in enumerate(attrs)
-                )
-                headers["Content-Length"] = sz
-
-                try:
-                        c, v = versioned_urlopen(self.origin_url, "add",
-                            [0], "%s/%s" % (self.trans_id, action.name),
-                            data=data, headers=headers)
-                except (httplib.BadStatusLine, RuntimeError), e:
-                        status = httplib.INTERNAL_SERVER_ERROR
-                        msg = str(e)
-                except (urllib2.HTTPError, urllib2.URLError), e:
-                        status, msg = self.__get_urllib_error(e)
-                else:
-                        msg = None
-                        status = c.code
-
-                if status / 100 == 4 or status / 100 == 5:
-                        raise TransactionOperationError("add",
-                            trans_id=self.trans_id, status=status, msg=msg)
-
-        def close(self, abandon=False, refresh_index=True, add_to_catalog=False):
-                """Ends an in-flight transaction.  Returns a tuple containing
-                a package fmri (if applicable) and the final state of the
-                related package.
-
-                If 'abandon' is omitted or False, the package will be published;
-                otherwise the server will discard the current transaction and
-                its related data.
-
-                If 'refresh_index' is True, the repository will be instructed
-                to update its search indices after publishing.  Has no effect
-                if 'abandon' is True.
-                
-                'add_to_catalog' isn't supported w/ http transport, but ignoring
-                it will affect performance only.                
-                """
-
-                
-                op = "close"
-                if abandon:
-                        op = "abandon"
-
-                headers = {}
-                if not refresh_index:
-                        # The default is to do so, so only send this if false.
-                        headers["X-IPkg-Refresh-Index"] = 0
-
-                try:
-                        c, v = versioned_urlopen(self.origin_url, op, [0],
-                            self.trans_id, headers=headers)
-                except (httplib.BadStatusLine, RuntimeError), e:
-                        status = httplib.INTERNAL_SERVER_ERROR
-                        msg = str(e)
-                except (urllib2.HTTPError, urllib2.URLError), e:
-                        status, msg = self.__get_urllib_error(e)
-                except RuntimeError, e:
-                        # Assume the server didn't find the transaction or
-                        # can't perform the operation.
-                        status = httplib.NOT_FOUND
-                        msg = str(e)
-                else:
-                        msg = None
-                        status = c.code
-
-                if status / 100 == 4 or status / 100 == 5:
-                        raise TransactionOperationError(op,
-                            trans_id=self.trans_id, status=status, msg=msg)
-
-                # Return only the headers the client should care about.
-                hdrs = c.info()
-                return hdrs.get("State", None), hdrs.get("Package-FMRI", None)
-
-        def open(self):
-                """Starts an in-flight transaction. Returns a URL-encoded
-                transaction ID on success."""
-
-                # XXX This opens a Transaction, but who manages the server
-                # connection?  If we want a pipelined HTTP session (multiple
-                # operations -- even if it's only one Transaction -- over a
-                # single connection), then we can't call HTTPConnection.close()
-                # here, and we shouldn't reopen the connection in add(),
-                # close(), etc.
-                try:
-                        headers = {"Client-Release": os_util.get_os_release()}
-                        c, v = versioned_urlopen(self.origin_url, "open",
-                            [0], urllib.quote(self.pkg_name, ""),
-                            headers=headers)
-                        self.trans_id = c.headers.get("Transaction-ID", None)
-                except (httplib.BadStatusLine, RuntimeError), e:
-                        status = httplib.INTERNAL_SERVER_ERROR
-                        msg = str(e)
-                except (urllib2.HTTPError, urllib2.URLError), e:
-                        status, msg = self.__get_urllib_error(e)
-                else:
-                        msg = None
-                        status = c.code
-
-                if status / 100 == 4 or status / 100 == 5:
-                        raise TransactionOperationError("open",
-                            trans_id=self.trans_id, status=status, msg=msg)
-                elif self.trans_id is None:
-                        raise TransactionOperationError("open",
-                            status=status, msg=_("Unknown failure; no "
-                            "transaction ID provided in response: %s") % msg)
-
-                return self.trans_id
-
-        def refresh_index(self):
-                """Instructs the repository to refresh its search indices.
-                Returns nothing."""
-
-                op = "index"
-                subop = "refresh"
-
-                headers = {}
-
-                try:
-                        c, v = versioned_urlopen(self.origin_url, op, [0],
-                            subop, headers=headers)
-                except (httplib.BadStatusLine, RuntimeError), e:
-                        status = httplib.INTERNAL_SERVER_ERROR
-                        msg = str(e)
-                except (urllib2.HTTPError, urllib2.URLError), e:
-                        status, msg = self.__get_urllib_error(e)
-                except RuntimeError, e:
-                        # Assume the server can't perform the operation.
-                        status = httplib.NOT_FOUND
-                        msg = str(e)
-                else:
-                        msg = None
-                        status = c.code
-
-                if status / 100 == 4 or status / 100 == 5:
-                        raise TransactionOperationError(op,
-                            trans_id=self.trans_id, status=status, msg=msg)
-
-
 class NullTransaction(object):
         """Provides a simulated publishing interface suitable for testing
         purposes."""
 
         def __init__(self, origin_url, create_repo=False, pkg_name=None,
-            repo_props=EmptyDict, trans_id=None, refresh_index=True):
+            repo_props=EmptyDict, trans_id=None, refresh_index=True,
+            xport=None, pub=None):
                 self.create_repo = create_repo
                 self.origin_url = origin_url
                 self.pkg_name = pkg_name
@@ -516,6 +174,162 @@
                 Returns nothing."""
                 pass
 
+class TransportTransaction(object):
+        """Provides a publishing interface that uses client transport."""
+
+        def __init__(self, origin_url, create_repo=False, pkg_name=None,
+            repo_props=EmptyDict, trans_id=None, refresh_index=True,
+            xport=None, pub=None):
+
+                scheme, netloc, path, params, query, fragment = \
+                    urlparse.urlparse(origin_url, "http", allow_fragments=0)
+
+                self.pkg_name = pkg_name
+                self.trans_id = trans_id
+                self.scheme = scheme
+                self.path = path
+                self.transport = xport
+                self.publisher = pub
+
+                if scheme == "file":
+                        self.create_file_repo(origin_url, repo_props=repo_props,
+                            create_repo=create_repo,
+                            refresh_index=refresh_index)
+                elif scheme != "file" and create_repo:
+                        raise UnsupportedRepoTypeOperationError("create_repo",
+                            type=scheme)
+
+
+        def create_file_repo(self, origin_url, repo_props=EmptyDict,
+            create_repo=False, refresh_index=True):
+
+                if self.transport.publish_cache_contains(self.publisher):
+                        return
+        
+                try:
+                        repo = sr.Repository(auto_create=create_repo,
+                            properties=repo_props, repo_root=self.path, 
+                            refresh_index=refresh_index)
+                except EnvironmentError, e:
+                        raise TransactionOperationError(None, msg=_(
+                            "An error occurred while trying to "
+                            "initialize the repository directory "
+                            "structures:\n%s") % e)
+                except cfg.ConfigError, e:
+                        raise TransactionRepositoryConfigError(str(e))
+                except sr.RepositoryInvalidError, e:
+                        raise TransactionRepositoryInvalidError(str(e))
+                except sr.RepositoryError, e:
+                        raise TransactionOperationError(None,
+                            msg=str(e))
+
+                self.transport.publish_cache_repository(self.publisher, repo)
+
+
+        def add(self, action):
+                """Adds an action and its related content to an in-flight
+                transaction.  Returns nothing."""
+
+                try:
+                        # Perform additional publication-time validation of
+                        # actions before further processing is done.
+                        action.validate()
+                except actions.ActionError, e:
+                        raise TransactionOperationError("add",
+                            trans_id=self.trans_id, msg=str(e))
+
+                try:
+                        self.transport.publish_add(self.publisher,
+                            action=action, trans_id=self.trans_id)
+                except apx.TransportError, e:
+                        msg = str(e)
+                        raise TransactionOperationError("add",
+                            trans_id=self.trans_id, msg=msg)
+
+        def close(self, abandon=False, refresh_index=True, add_to_catalog=None):
+                """Ends an in-flight transaction.  Returns a tuple containing
+                a package fmri (if applicable) and the final state of the
+                related package.
+
+                If 'abandon' is omitted or False, the package will be published;
+                otherwise the server will discard the current transaction and
+                its related data.
+
+                If 'refresh_index' is True, the repository will be instructed
+                to update its search indices after publishing.  Has no effect
+                if 'abandon' is True.
+
+                If 'add_to_catalog' is True, the repository will be told to
+                add the package to its catalog; if it is None, an appropriate
+                default is chosen based upon the transport scheme.
+                """
+
+                if abandon:
+                        try:
+                                state, fmri = self.transport.publish_abandon(
+                                    self.publisher, trans_id=self.trans_id)
+                        except apx.TransportError, e:
+                                msg = str(e)
+                                raise TransactionOperationError("abandon",
+                                    trans_id=self.trans_id, msg=msg)
+                else:
+
+                        # If caller hasn't supplied add_to_catalog, pick an
+                        # appropriate default, based upon the transport.
+                        if add_to_catalog is None:
+                                if self.scheme == "file":
+                                        add_to_catalog = True
+                                else:
+                                        add_to_catalog = False
+                        
+                        try:
+                                state, fmri = self.transport.publish_close(
+                                    self.publisher, trans_id=self.trans_id,
+                                    refresh_index=refresh_index,
+                                    add_to_catalog=add_to_catalog)
+                        except apx.TransportError, e:
+                                msg = str(e)
+                                raise TransactionOperationError("close",
+                                    trans_id=self.trans_id, msg=msg)
+
+                return state, fmri
+
+        def open(self):
+                """Starts an in-flight transaction. Returns a URL-encoded
+                transaction ID on success."""
+
+                trans_id = None
+
+                try:
+                        trans_id = self.transport.publish_open(self.publisher,
+                            client_release=os_util.get_os_release(),
+                            pkg_name=self.pkg_name)
+                except apx.TransportError, e:
+                        msg = str(e)
+                        raise TransactionOperationError("open",
+                            trans_id=self.trans_id, msg=msg)
+
+                self.trans_id = trans_id
+
+                if self.trans_id is None:
+                        raise TransactionOperationError("open",
+                            msg=_("Unknown failure; no transaction ID provided"
+                            " in response."))
+
+                return self.trans_id
+
+        def refresh_index(self):
+                """Instructs the repository to refresh its search indices.
+                Returns nothing."""
+
+                op = "index"
+
+                try:
+                        self.transport.publish_refresh_index(self.publisher)
+                except apx.TransportError, e:
+                        msg = str(e)
+                        raise TransactionOperationError(op,
+                            trans_id=self.trans_id, msg=msg)
+
 
 class Transaction(object):
         """Returns an object representing a publishing "transaction" interface
@@ -543,20 +357,25 @@
         """
 
         __schemes = {
-            "file": FileTransaction,
-            "http": HTTPTransaction,
-            "https": HTTPTransaction,
+            "file": TransportTransaction,
+            "http": TransportTransaction,
+            "https": TransportTransaction,
             "null": NullTransaction,
         }
 
-        def __new__(cls, origin_url, add_to_catalog=True, create_repo=False, pkg_name=None,
-            repo_props=EmptyDict, trans_id=None, noexecute=False, refresh_index=True):
+        def __new__(cls, origin_url, add_to_catalog=True, create_repo=False,
+            pkg_name=None, repo_props=EmptyDict, trans_id=None,
+            noexecute=False, refresh_index=True, xport=None, pub=None):
+
                 scheme, netloc, path, params, query, fragment = \
                     urlparse.urlparse(origin_url, "http", allow_fragments=0)
                 scheme = scheme.lower()
 
                 if noexecute:
                         scheme = "null"
+                if scheme != "null" and (not xport or not pub):
+                        raise TransactionError("Caller must supply transport "
+                            "and publisher.")
                 if scheme not in cls.__schemes:
                         raise TransactionRepositoryURLError(origin_url,
                             scheme=scheme)
@@ -566,10 +385,12 @@
                 if scheme.startswith("file"):
                         if netloc:
                                 raise TransactionRepositoryURLError(origin_url,
-                                    msg="'%s' contains host information, which is not "
-                                        "supported for filesystem operations." % netloc)
-                        # as we're urlunparsing below, we need to ensure that the path
-                        # starts with only one '/' character, if any are present
+                                    msg="'%s' contains host information, which "
+                                    "is not supported for filesystem "
+                                    "operations." % netloc)
+                        # as we're urlunparsing below, we need to ensure that
+                        # the path starts with only one '/' character, if any
+                        # are present
                         if path.startswith("/"):
                                 path = "/" + path.lstrip("/")
 
@@ -578,5 +399,6 @@
                     query, fragment))
 
                 return cls.__schemes[scheme](origin_url,
-                    create_repo=create_repo, pkg_name=pkg_name, refresh_index=refresh_index,
-                    repo_props=repo_props, trans_id=trans_id)
+                    create_repo=create_repo, pkg_name=pkg_name,
+                    refresh_index=refresh_index, repo_props=repo_props,
+                    trans_id=trans_id, xport=xport, pub=pub)
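End to end, publication now flows through the client transport: callers build
a transport and publisher first (publish.py does this via its
setup_transport_and_pubs() helper) and hand both to Transaction.  A hedged
sketch, with repo_uri, pkg_name, and the action as placeholders:

    import pkg.publish.transaction as trans

    def publish_one(repo_uri, xport, pub, pkg_name, action):
            """Open a transaction, publish a single action, and close it."""
            t = trans.Transaction(repo_uri, pkg_name=pkg_name, xport=xport,
                pub=pub)
            t.open()
            t.add(action)
            # close() returns the final package state and FMRI.
            state, fmri = t.close()
            return state, fmri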
--- a/src/modules/server/depot.py	Tue Jul 13 18:17:07 2010 -0700
+++ b/src/modules/server/depot.py	Thu Jul 15 15:40:39 2010 -0700
@@ -771,8 +771,23 @@
                             "X-IPkg-Refresh-Index: %s" % e)
 
                 try:
+                        # Assume "True" for backwards compatibility.
+                        add_to_catalog = int(request.headers.get(
+                            "X-IPkg-Add-To-Catalog", 1))
+
+                        # Force a boolean value.
+                        if add_to_catalog:
+                                add_to_catalog = True
+                        else:
+                                add_to_catalog = False
+                except ValueError, e:
+                        raise cherrypy.HTTPError(httplib.BAD_REQUEST,
+                            "X-IPkg-Add-To-Catalog" % e)
+
+                try:
                         pfmri, pstate = self.repo.close(trans_id,
-                            refresh_index=refresh_index)
+                            refresh_index=refresh_index,
+                            add_to_catalog=add_to_catalog)
                 except repo.RepositoryError, e:
                         # Assume a bad request was made.  A 404 can't be
                         # returned here as misc.versioned_urlopen will interpret
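For clients that speak to the depot directly, the new header rides on the
existing close/0/&lt;trans_id&gt; request; a hypothetical example of suppressing
the catalog update:

    import urllib2

    def close_without_catalog(depot_url, trans_id):
            """Close a publication transaction, asking the depot not to
            add the package to its catalog (URL layout per the depot's
            close/0/<trans_id> convention)."""
            req = urllib2.Request(
                url="%s/close/0/%s" % (depot_url.rstrip("/"), trans_id),
                headers={"X-IPkg-Add-To-Catalog": "0"})
            return urllib2.urlopen(req)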
--- a/src/modules/urlhelpers.py	Tue Jul 13 18:17:07 2010 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,115 +0,0 @@
-#!/usr/bin/python
-# Additional classes to supplement functionality in urllib2
-
-import urllib2
-import urllib
-from urlparse import urlparse
-import httplib
-import socket
-import base64
-
-class HTTPSCertHandler(urllib2.HTTPSHandler):
-
-        def __init__(self, key_file=None, cert_file=None, strict=None):
-                self.key = key_file
-                self.cert = cert_file
-                self.strict = strict
-
-                urllib2.AbstractHTTPHandler.__init__(self)
-
-        def https_open(self, req):
-                if hasattr(req, 'connection'):
-                        # have the connection from the proxy, make it ssl
-                        h = req.connection
-                        ssl = socket.ssl(h.sock, self.key, self.cert)
-                        h.sock = httplib.FakeSocket(h.sock, ssl)
-                        h.strict = self.strict
-                else:
-                        host = req.get_host()
-                        if not host:
-                                raise urllib2.URLError('no host given')
-
-                        h = httplib.HTTPSConnection(host, key_file=self.key,
-                            cert_file=self.cert, strict=self.strict)
-                        h.set_debuglevel(self._debuglevel)
-                        self.connection = h
-
-                headers = dict(req.headers)
-                headers.update(req.unredirected_hdrs)
-                headers["Connection"] = "close"
-                try:
-                        h.request(req.get_method(), req.get_selector(), req.data, headers)
-                        r = h.getresponse()
-                except socket.error, err:
-                        raise urllib2.URLError(err)
-
-                r.recv = r.read
-                fp = socket._fileobject(r)
-
-                resp = urllib.addinfourl(fp, r.msg, req.get_full_url())
-                resp.code = r.status
-                resp.msg = r.reason
-                return resp
-
-
-        https_request = urllib2.AbstractHTTPHandler.do_request_
-
-class HTTPSProxyHandler(urllib2.ProxyHandler):
-        # Proxies must be in front
-        handler_order = 100
-
-        def __init__(self, proxies=None):
-                if proxies is None:
-                        proxies = urllib2.getproxies()
-                assert isinstance(proxies, dict)
-                # only handle https proxy
-                self.proxy = None
-                if 'https' in proxies:
-                        self.proxy = proxies['https']
-
-        def https_open(self, req):
-                # do nothing if no proxy is defined
-                if not self.proxy:
-                        return None
-
-                realurl = urlparse(req.get_full_url())
-                assert(realurl[0] == 'https')
-                real_host, real_port = urllib.splitport(realurl[1])
-                if real_port is None:
-                        real_port = 443
-
-                proxyurl = urlparse(self.proxy)
-                phost = proxyurl[1]
-                pw_hdr = ''
-                if '@' in phost:
-                        user_pass, phost = host.split('@', 1)
-                        if ':' in user_pass:
-                                user, password = user_pass.split(':', 1)
-                                user_pass = base64.encodestring(
-                                    '%s:%s' % (unquote(user),
-                                    unquote(password))).strip()
-                        pw_hdr = 'Proxy-authorization: Basic %s\r\n' % user_pass
-                phost = urllib.unquote(phost)
-                req.set_proxy(phost, proxyurl[0])
-
-                h = httplib.HTTPConnection(phost)
-                h.connect()
-                # send proxy CONNECT request
-                h.send("CONNECT %s:%d HTTP/1.0%s\r\n\r\n" % \
-                    (real_host, real_port, pw_hdr))
-                # expect a HTTP/1.0 200 Connection established
-                response = h.response_class(h.sock, strict=h.strict, 
-                    method=h._method)
-                response.begin()
-                if response.status != httplib.OK:
-                        # proxy returned an error: abort connection, 
-                        # and raise exception
-                        h.close()
-                        raise urllib2.HTTPError, \
-                            (self.proxy, response.status,
-                            "proxy connection failed: %s" % response.reason,
-                            None, None)
-
-                # make the connection available for HTTPSCertHandler
-                req.connection = h
-                return None        
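The deleted HTTPSProxyHandler built its TLS-over-proxy support by hand:
issue an HTTP CONNECT to the proxy, expect a 200, then wrap the spliced
socket in SSL.  The modern transport subsystem takes this over.  For
reference, a standalone sketch of the same handshake using only the
Python 2 standard library; the proxy and repository host names are
placeholders:

    import httplib
    import socket

    def open_connect_tunnel(proxy_host, proxy_port, real_host, real_port):
            """Splice a raw TCP connection through an HTTP proxy, then
            wrap it in SSL -- the handshake the removed handler performed."""
            h = httplib.HTTPConnection(proxy_host, proxy_port)
            h.connect()
            # Ask the proxy to splice a raw connection to the real host.
            h.send("CONNECT %s:%d HTTP/1.0\r\n\r\n" % (real_host, real_port))
            # Expect "HTTP/1.0 200 Connection established" in reply.
            resp = h.response_class(h.sock, method="CONNECT")
            resp.begin()
            if resp.status != httplib.OK:
                    h.close()
                    raise IOError("proxy refused tunnel: %d %s" %
                        (resp.status, resp.reason))
            # The socket now carries bytes straight to real_host.
            return socket.ssl(h.sock)

    # ssl_sock = open_connect_tunnel("proxy.example.com", 8080,
    #     "repo.example.com", 443)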
--- a/src/pkgdefs/SUNWipkg/prototype	Tue Jul 13 18:17:07 2010 -0700
+++ b/src/pkgdefs/SUNWipkg/prototype	Thu Jul 15 15:40:39 2010 -0700
@@ -248,8 +248,6 @@
 d none usr/lib/python2.6/vendor-packages/pkg/um 755 root bin
 f none usr/lib/python2.6/vendor-packages/pkg/updatelog.py 444 root bin
 f none usr/lib/python2.6/vendor-packages/pkg/updatelog.pyc 444 root bin
-f none usr/lib/python2.6/vendor-packages/pkg/urlhelpers.py 444 root bin
-f none usr/lib/python2.6/vendor-packages/pkg/urlhelpers.pyc 444 root bin
 f none usr/lib/python2.6/vendor-packages/pkg/variant.py 444 root bin
 f none usr/lib/python2.6/vendor-packages/pkg/variant.pyc 444 root bin
 f none usr/lib/python2.6/vendor-packages/pkg/version.py 444 root bin
--- a/src/publish.py	Tue Jul 13 18:17:07 2010 -0700
+++ b/src/publish.py	Thu Jul 15 15:40:39 2010 -0700
@@ -51,7 +51,10 @@
 import pkg.fmri
 import pkg.manifest
 import pkg.publish.transaction as trans
+import pkg.client.transport as transport
+import pkg.client.publisher as publisher
 from pkg.misc import msg, emsg, PipeError
+from pkg.client import global_settings
 
 nopub_actions = [ "unknown" ]
 
@@ -121,9 +124,11 @@
                         repo_props.setdefault(p_sec, {})
                         repo_props[p_sec][p_name] = p_value
 
+        xport, pub = setup_transport_and_pubs(repo_uri)
+
         try:
                 trans.Transaction(repo_uri, create_repo=True,
-                    repo_props=repo_props)
+                    repo_props=repo_props, xport=xport, pub=pub)
         except trans.TransactionRepositoryConfigError, e:
                 error(e, cmd="create-repository")
                 emsg(_("Invalid repository configuration values were "
@@ -154,7 +159,9 @@
         if len(pargs) != 1:
                 usage(_("open requires one package name"), cmd="open")
 
-        t = trans.Transaction(repo_uri, pkg_name=pargs[0])
+        xport, pub = setup_transport_and_pubs(repo_uri)
+
+        t = trans.Transaction(repo_uri, pkg_name=pargs[0], xport=xport, pub=pub)
         if eval_form:
                 msg("export PKG_TRANS_ID=%s" % t.open())
         else:
@@ -186,8 +193,9 @@
                         usage(_("No transaction ID specified using -t or in "
                             "$PKG_TRANS_ID."), cmd="close")
 
+        xport, pub = setup_transport_and_pubs(repo_uri)
         t = trans.Transaction(repo_uri, trans_id=trans_id,
-            add_to_catalog=add_to_catalog)
+            add_to_catalog=add_to_catalog, xport=xport, pub=pub)
         pkg_state, pkg_fmri = t.close(abandon, refresh_index)
         for val in (pkg_state, pkg_fmri):
                 if val is not None:
@@ -210,7 +218,9 @@
                 error(_("invalid action for publication: %s") % action, cmd="add")
                 return 1
 
-        t = trans.Transaction(repo_uri, trans_id=trans_id)
+        xport, pub = setup_transport_and_pubs(repo_uri)
+        t = trans.Transaction(repo_uri, trans_id=trans_id, xport=xport,
+            pub=pub)
         t.add(action)
         return 0
 
@@ -275,7 +285,7 @@
                         if lineno > tup[0] and lineno <= tup[1]:
                                 filename = filelist[i][0]
                                 lineno -= tup[0]
-                                break;
+                                break
                 else:
                         filename = "???"
                         lineno = "???"
@@ -291,7 +301,9 @@
                         return 1
                 pkg_name = pkg.fmri.PkgFmri(m["pkg.fmri"]).get_short_fmri()
 
-        t = trans.Transaction(repo_uri, pkg_name=pkg_name, refresh_index=refresh_index)
+        xport, pub = setup_transport_and_pubs(repo_uri)
+        t = trans.Transaction(repo_uri, pkg_name=pkg_name,
+            refresh_index=refresh_index, xport=xport, pub=pub)
         t.open()
 
         for a in m.gen_actions():
@@ -333,7 +345,9 @@
                 except KeyError:
                         usage(_("No transaction ID specified in $PKG_TRANS_ID"),
                             cmd="include")
-                t = trans.Transaction(repo_uri, trans_id=trans_id)
+                xport, pub = setup_transport_and_pubs(repo_uri)
+                t = trans.Transaction(repo_uri, trans_id=trans_id, xport=xport,
+                    pub=pub)
         else:
                 t = transaction
 
@@ -371,7 +385,7 @@
                         if lineno > tup[0] and lineno <= tup[1]:
                                 filename = filelist[i][0]
                                 lineno -= tup[0]
-                                break;
+                                break
                 else:
                         filename = "???"
                         lineno = "???"
@@ -437,7 +451,9 @@
         if not args:
                 usage(_("No arguments specified for subcommand."),
                     cmd="import")
-        t = trans.Transaction(repo_uri, trans_id=trans_id)
+
+        xport, pub = setup_transport_and_pubs(repo_uri)
+        t = trans.Transaction(repo_uri, trans_id=trans_id, xport=xport, pub=pub)
 
         try:
                 for action, err in gen_actions(pargs, timestamp_files,
@@ -479,8 +495,8 @@
                     cmd="generate")
 
         try:
-                 for action, err in gen_actions(pargs, timestamp_files,
-                     target_files):
+                for action, err in gen_actions(pargs, timestamp_files,
+                    target_files):
                         if "path" in action.attrs and hasattr(action, "hash") \
                             and action.hash == "NOHASH":
                                 action.hash = action.attrs["path"]
@@ -505,13 +521,28 @@
                 usage(_("command does not take operands"),
                     cmd="refresh-index")
 
+        xport, pub = setup_transport_and_pubs(repo_uri)
         try:
-                t = trans.Transaction(repo_uri).refresh_index()
+                t = trans.Transaction(repo_uri, xport=xport, pub=pub).refresh_index()
         except trans.TransactionError, e:
                 error(e, cmd="refresh-index")
                 return 1
         return 0
 
+def setup_transport_and_pubs(repo_uri):
+
+        try:
+                repo = publisher.Repository(origins=[repo_uri])
+                pub = publisher.Publisher(prefix="default", repositories=[repo])
+                xport = transport.Transport(transport.GenericTransportCfg(
+                    publishers=[pub]))
+        except apx.UnsupportedRepositoryURI:
+                if repo_uri.startswith("null:"):
+                        return None, None
+                raise
+
+        return xport, pub
+
 def main_func():
         gettext.install("pkg", "/usr/share/locale")
 
@@ -521,6 +552,7 @@
                 repo_uri = "http://localhost:10000"
 
         show_usage = False
+        global_settings.client_name = "pkgsend"
         try:
                 opts, pargs = getopt.getopt(sys.argv[1:], "s:?", ["help"])
                 for opt, arg in opts:
@@ -580,8 +612,9 @@
 
         try:
                 __ret = main_func()
-        except (pkg.actions.ActionError, apx.InvalidPackageErrors,
-            trans.TransactionError, RuntimeError, pkg.fmri.IllegalFmri), _e:
+        except (pkg.actions.ActionError, trans.TransactionError,
+            RuntimeError, pkg.fmri.IllegalFmri, apx.BadRepositoryURI,
+            apx.UnsupportedRepositoryURI, apx.InvalidPackageErrors), _e:
                 print >> sys.stderr, "pkgsend: %s" % _e
                 __ret = 1
         except (PipeError, KeyboardInterrupt):
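With these changes every pkgsend subcommand routes publication through a
transport/publisher pair built by setup_transport_and_pubs().  A minimal
sketch of the resulting pattern, using the classes exactly as they appear
above; the repository URI and package name are placeholders:

    import pkg.client.publisher as publisher
    import pkg.client.transport as transport
    import pkg.publish.transaction as trans

    repo_uri = "http://localhost:10000"     # placeholder repository
    pkg_name = "example@1.0,5.11-0"         # placeholder package name

    repo = publisher.Repository(origins=[repo_uri])
    pub = publisher.Publisher(prefix="default", repositories=[repo])
    xport = transport.Transport(transport.GenericTransportCfg(
        publishers=[pub]))

    # Every transaction now carries the transport and publisher explicitly.
    t = trans.Transaction(repo_uri, pkg_name=pkg_name, xport=xport, pub=pub)
    print "export PKG_TRANS_ID=%s" % t.open()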
--- a/src/pull.py	Tue Jul 13 18:17:07 2010 -0700
+++ b/src/pull.py	Thu Jul 15 15:40:39 2010 -0700
@@ -42,23 +42,26 @@
 import pkg.fmri
 import pkg.manifest as manifest
 import pkg.client.api_errors as api_errors
+import pkg.client.publisher as publisher
+import pkg.client.transport as transport
 import pkg.misc as misc
-import pkg.pkgtarfile as ptf
-import pkg.portable as portable
 import pkg.publish.transaction as trans
 import pkg.search_errors as search_errors
-import pkg.server.catalog as sc
 import pkg.server.repository as sr
 import pkg.version as version
 
 from pkg.client import global_settings
-from pkg.misc import (emsg, get_pkg_otw_size, gunzip_from_stream, msg,
-    versioned_urlopen, PipeError)
+from pkg.misc import (emsg, get_pkg_otw_size, msg, PipeError)
 
 # Globals
+cache_dir = None
 complete_catalog = None
+download_start = False
 repo_cache = {}
 tmpdirs = []
+temp_root = None
+xport = None
+xport_cfg = None
 
 def error(text):
         """Emit an error message prefixed by the command name """
@@ -90,6 +93,13 @@
         pkgrecv [-s src_repo_uri] -n
 
 Options:
+        -c cache_dir    The path to a directory that will be used to cache
+                        downloaded content.  If one is not supplied, the
+                        client will automatically pick a cache directory.
+                        In the case where a download is interrupted, and a
+                        cache directory was automatically chosen, use this
+                        option to resume the download.
+
         -d path_or_uri  The path of a directory to save the retrieved package
                         to, or the URI of a repository to republish it to.  If
                         not provided, the default value is the current working
@@ -122,9 +132,18 @@
         PKG_SRC         Source repository URI"""))
         sys.exit(retcode)
 
-def cleanup():
+def cleanup(caller_error=False):
         """To be called at program finish."""
+
         for d in tmpdirs:
+                # If the cache_dir is in the list of directories that should
+                # be cleaned up, but we're exiting with an error, then preserve
+                # the directory so downloads may be resumed.
+                if d == cache_dir and caller_error and download_start:
+                        error(_("\n\nCached files were preserved in the "
+                            "following directory:\n\t%s\nUse pkgrecv -c "
+                            "to resume the interrupted download.") % cache_dir)
+                        continue
                 shutil.rmtree(d, True)
 
 def abort(err=None, retcode=1):
@@ -149,44 +168,24 @@
                         progresstracker = progress.CommandLineProgressTracker()
         return progresstracker
 
-def get_manifest(src_uri, pfmri, basedir, contents=False):
+def get_manifest(pfmri, basedir, contents=False):
 
         m = None
         pkgdir = os.path.join(basedir, pfmri.get_dir_path())
         mpath = os.path.join(pkgdir, "manifest")
 
-        raw = None
-        overwrite = False
         if not os.path.exists(mpath):
-                raw = fetch_manifest(src_uri, pfmri)
-                overwrite = True
+                m = xport.get_manifest(pfmri)
         else:
                 try:
-                        raw = file(mpath, "rb").read()
+                        m = manifest.CachedManifest(pfmri, basedir)
                 except:
-                        abort(err=_("Unable to load manifest '%(mpath)s' for "
-                            "package '%(pfmri)s'.") % locals())
+                        abort(err=_("Unable to parse manifest '%(mpath)s' for "
+                            "package '%(pfmri)s'") % locals())
 
         if contents:
-                return raw
-
-        try:
-                m = manifest.CachedManifest(pfmri, basedir, None,
-                    contents=raw)
-        except:
-                abort(err=_("Unable to parse manifest '%(mpath)s' for package "
-                    "'%(pfmri)s'") % locals())
+                return m.tostr_unsorted()
 
-        if overwrite:
-                # Overwrite the manifest file so that the on-disk version will
-                # be consistent with the server (due to fmri addition).
-                try:
-                        f = open(mpath, "wb")
-                        f.write(raw)
-                        f.close()
-                except:
-                        abort(err=_("Unable to write manifest '%(mpath)s' for "
-                            "package '%(pfmri)s'.") % locals())
         return m
 
 def get_repo(uri):
@@ -216,38 +215,6 @@
         repo_cache[uri] = repo
         return repo
 
-def fetch_manifest(src_uri, pfmri):
-        """Return the manifest data for package-fmri 'fmri' from the repository
-        at 'src_uri'."""
-
-        if src_uri.startswith("file://"):
-                try:
-                        r = get_repo(src_uri)
-                        m = file(r.manifest(pfmri), "rb")
-                except (EnvironmentError, sr.RepositoryError), e:
-                        abort(err=e)
-        else:
-                # Request manifest from repository.
-                try:
-                        m = versioned_urlopen(src_uri, "manifest", [0],
-                            pfmri.get_url_path())[0]
-                except Exception, e:
-                        abort(err=_("Unable to retrieve manifest %s from "
-                            "%s: %s") % (pfmri.get_url_path(), src_uri, e))
-                except:
-                        abort()
-
-        # Read from repository, return to caller.
-        try:
-                mfst_str = m.read()
-        except:
-                abort(err=_("Error occurred while reading from: %s") % src_uri)
-
-        if hasattr(m, "close"):
-                m.close()
-
-        return mfst_str
-
 def expand_fmri(pfmri, constraint=version.CONSTRAINT_AUTO):
         """Find matching fmri using CONSTRAINT_AUTO cache for performance.
         Returns None if no matching fmri is found."""
@@ -318,7 +285,7 @@
         tracker.evaluate_progress(fmri=pfmri)
         s.add(pfmri)
 
-        m = get_manifest(src_uri, pfmri, basedir)
+        m = get_manifest(pfmri, basedir)
         for a in m.gen_actions_by_type("depend"):
                 new_fmri = expand_fmri(a.attrs["fmri"])
                 if new_fmri and new_fmri not in s:
@@ -326,26 +293,20 @@
                             tracker)
         return s
 
-def get_hashes_and_sizes(m):
-        """Returns a dict of hashes and transfer sizes of actions with content
-        in a manifest."""
+def add_hashes_to_multi(mfst, multi):
+        """Takes a manifest and a multi object.  Adds the hashes to the
+        multi object and returns an (nfiles, nbytes) tuple."""
+
+        nf = 0
+        nb = 0
 
-        seen_hashes = set()
-        def repeated(a):
-                if a in seen_hashes:
-                        return True
-                seen_hashes.add(a)
-                return False
-
-        cshashes = {}
         for atype in ("file", "license"):
-                for a in m.gen_actions_by_type(atype):
-                        if hasattr(a, "hash") and not repeated(a.hash):
-                                sz = int(a.attrs.get("pkg.size", 0))
-                                csize = int(a.attrs.get("pkg.csize", 0))
-                                otw_sz = get_pkg_otw_size(a)
-                                cshashes[a.hash] = (sz, csize, otw_sz)
-        return cshashes
+                for a in mfst.gen_actions_by_type(atype):
+                        if a.needsdata(None, None):
+                                multi.add_action(a)
+                                nf += 1
+                                nb += get_pkg_otw_size(a)
+        return nf, nb
 
 def prune(fmri_list, all_versions, all_timestamps):
         """Returns a filtered version of fmri_list based on the provided
@@ -365,125 +326,6 @@
                 fmri_list = [sorted(dedup[f], reverse=True)[0] for f in dedup]
         return fmri_list
 
-def fetch_files_byhash(src_uri, cshashes, destdir, keep_compressed, tracker):
-        """Given a list of tuples containing content hash, and size and download
-        the content from src_uri into destdir."""
-
-        def valid_file(h):
-                # XXX this should check data digest
-                fname = os.path.join(destdir, h)
-                if os.path.exists(fname):
-                        if keep_compressed:
-                                sz = cshashes[h][1]
-                        else:
-                                sz = cshashes[h][0]
-
-                        if sz == 0:
-                                return True
-
-                        try:
-                                fs = os.stat(fname)
-                        except:
-                                pass
-                        else:
-                                if fs.st_size == sz:
-                                        return True
-                return False
-
-        if src_uri.startswith("file://"):
-                try:
-                        r = get_repo(src_uri)
-                except sr.RepositoryError, e:
-                        abort(err=e)
-
-                for h in cshashes.keys():
-                        dest = os.path.join(destdir, h)
-
-                        # Check to see if the file already exists first, so the
-                        # user can continue interrupted pkgrecv operations.
-                        retrieve = not valid_file(h)
-
-                        try:
-                                if retrieve and keep_compressed:
-                                        src = r.file(h)
-                                        shutil.copy(src, dest)
-                                elif retrieve:
-                                        src = file(r.file(h), "rb")
-                                        outfile = open(dest, "wb")
-                                        gunzip_from_stream(src, outfile)
-                                        outfile.close()
-                        except (EnvironmentError, sr.RepositoryError), e:
-                                try:
-                                        portable.remove(dest)
-                                except:
-                                        pass
-                                abort(err=e)
-
-                        tracker.download_add_progress(1, cshashes[h][2])
-                return
-
-        req_dict = {}
-        for i, k in enumerate(cshashes.keys()):
-                # Check to see if the file already exists first, so the user can
-                # continue interrupted pkgrecv operations.
-                if valid_file(k):
-                        tracker.download_add_progress(1, cshashes[k][2])
-                        continue
-
-                entry = "File-Name-%s" % i
-                req_dict[entry] = k
-
-        req_str = urllib.urlencode(req_dict)
-        if not req_str:
-                # Nothing to retrieve.
-                return
-
-        tmpdir = tempfile.mkdtemp()
-        tmpdirs.append(tmpdir)
-
-        try:
-                f = versioned_urlopen(src_uri, "filelist", [0],
-                    data=req_str)[0]
-        except:
-                abort(err=_("Unable to retrieve content from: %s") % src_uri)
-
-        tar_stream = ptf.PkgTarFile.open(mode = "r|", fileobj = f)
-
-        for info in tar_stream:
-                gzfobj = None
-                try:
-                        if not keep_compressed:
-                                # Uncompress as we retrieve the files
-                                gzfobj = tar_stream.extractfile(info)
-                                fpath = os.path.join(tmpdir,
-                                    info.name)
-                                outfile = open(fpath, "wb")
-                                gunzip_from_stream(gzfobj, outfile)
-                                outfile.close()
-                                gzfobj.close()
-                        else:
-                                # We want to keep the files compressed
-                                # on disk.
-                                tar_stream.extract_to(info, tmpdir,
-                                    info.name)
-
-                        # Copy the file into place (rename can cause a cross-
-                        # link device failure) and then remove the original.
-                        src = os.path.join(tmpdir, info.name)
-                        shutil.copy(src, os.path.join(destdir, info.name))
-                        portable.remove(src)
-
-                        tracker.download_add_progress(1, cshashes[info.name][2])
-                except KeyboardInterrupt:
-                        raise
-                except:
-                        abort(err=_("Unable to extract file: %s") % info.name)
-
-        shutil.rmtree(tmpdirs.pop(), True)
-
-        tar_stream.close()
-        f.close()
-
 def list_newest_fmris(fmri_list):
         """List the provided fmris."""
 
@@ -504,43 +346,25 @@
                 fm_list.append(l[0])
 
         for e in fm_list:
-                msg(e)
+                msg(e.get_fmri(anarchy=True))
 
-def fetch_catalog(src_uri, tracker):
+def fetch_catalog(src_pub, tracker):
         """Fetch the catalog from src_uri."""
         global complete_catalog
 
+        src_uri = src_pub.selected_repository.origins[0].uri
         tracker.catalog_start(src_uri)
 
-        if src_uri.startswith("file://"):
-                try:
-                        r = get_repo(src_uri)
-                        c = r.catalog_0()
-                except sr.RepositoryError, e:
-                        error(e)
-                        abort()
-        else:
-                # open connection for catalog
-                try:
-                        c = versioned_urlopen(src_uri, "catalog", [0])[0]
-                except:
-                        abort(err=_("Unable to download catalog from: %s") % \
-                            src_uri)
+        if not src_pub.meta_root:
+                # Create a temporary directory for catalog.
+                cat_dir = tempfile.mkdtemp(dir=temp_root)
+                tmpdirs.append(cat_dir)
+                src_pub.meta_root = cat_dir
 
-        # Create a temporary directory for catalog.
-        cat_dir = tempfile.mkdtemp()
-        tmpdirs.append(cat_dir)
+        src_pub.transport = xport
+        src_pub.refresh(True, True)
 
-        # Call catalog.recv to retrieve catalog.
-        try:
-                sc.ServerCatalog.recv(c, cat_dir)
-        except Exception, e:
-                abort(err=_("Error: %s while reading from: %s") % (e, src_uri))
-
-        if hasattr(c, "close"):
-                c.close()
-
-        cat = sc.ServerCatalog(cat_dir, read_only=True)
+        cat = src_pub.catalog
 
         d = {}
         fmri_list = []
@@ -554,7 +378,28 @@
         tracker.catalog_done()
         return fmri_list
 
+def config_temp_root():
+        """Examine the environment.  If the environment has set TMPDIR, TEMP,
+        or TMP, return None.  This tells tempfile to use the environment
+        settings when creating temporary files/directories.  Otherwise,
+        return a path that the caller should pass to tempfile instead."""
+
+        default_root = "/var/tmp"
+
+        # In Python's tempfile module, the default temp directory
+        # includes some paths that are suboptimal for holding large numbers
+        # of files.  If the user hasn't set TMPDIR, TEMP, or TMP in the
+        # environment, override the default directory for creating a tempfile.
+        tmp_envs = [ "TMPDIR", "TEMP", "TMP" ]
+        for ev in tmp_envs:
+                env_val = os.getenv(ev)
+                if env_val:
+                        return None
+
+        return default_root
+
 def main_func():
+        global cache_dir, download_start, temp_root, xport, xport_cfg
         all_timestamps = False
         all_versions = False
         keep_compressed = False
@@ -562,6 +407,11 @@
         recursive = False
         src_uri = None
         target = None
+        incoming_dir = None
+        src_pub = None
+        targ_pub = None
+
+        temp_root = config_temp_root()
 
         gettext.install("pkg", "/usr/share/locale")
 
@@ -570,12 +420,14 @@
         src_uri = os.environ.get("PKG_SRC", None)
 
         try:
-                opts, pargs = getopt.getopt(sys.argv[1:], "d:hkm:nrs:")
+                opts, pargs = getopt.getopt(sys.argv[1:], "c:d:hkm:nrs:")
         except getopt.GetoptError, e:
                 usage(_("Illegal option -- %s") % e.opt)
 
         for opt, arg in opts:
-                if opt == "-d":
+                if opt == "-c":
+                        cache_dir = arg
+                elif opt == "-d":
                         target = arg
                 elif opt == "-h":
                         usage(retcode=0)
@@ -598,12 +450,30 @@
         if not src_uri:
                 usage(_("a source repository must be provided"))
 
+        if not cache_dir:
+                cache_dir = tempfile.mkdtemp(dir=temp_root)
+                # Only clean up the cache dir if it was implicitly created
+                # by pkgrecv; a user-supplied cache dir should be preserved.
+                tmpdirs.append(cache_dir)
+
+        incoming_dir = tempfile.mkdtemp(dir=temp_root)
+        tmpdirs.append(incoming_dir)
+
+        # Create transport and transport config
+        xport, xport_cfg = transport.setup_transport()
+        xport_cfg.cached_download_dir = cache_dir
+        xport_cfg.incoming_download_dir = incoming_dir
+
+        # Configure src publisher
+        src_pub = transport.setup_publisher(src_uri, "source", xport, xport_cfg,
+            remote_publishers=True)
+
         tracker = get_tracker()
         if list_newest:
                 if pargs or len(pargs) > 0:
                         usage(_("-n takes no options"))
 
-                fmri_list = fetch_catalog(src_uri, tracker)
+                fmri_list = fetch_catalog(src_pub, tracker)
                 list_newest_fmris(fmri_list)
                 return 0
 
@@ -616,10 +486,13 @@
         if not target:
                 target = basedir = os.getcwd()
         elif target.find("://") != -1:
-                basedir = tempfile.mkdtemp()
+                basedir = tempfile.mkdtemp(dir=temp_root)
                 tmpdirs.append(basedir)
                 republish = True
 
+                targ_pub = transport.setup_publisher(target, "target",
+                    xport, xport_cfg)
+
                 # Files have to be decompressed for republishing.
                 keep_compressed = False
                 if target.startswith("file://"):
@@ -629,12 +502,13 @@
 
                         # Check to see if the repository exists first.
                         try:
-                                t = trans.Transaction(target)
+                                t = trans.Transaction(target, xport=xport,
+                                    pub=targ_pub)
                         except trans.TransactionRepositoryInvalidError, e:
                                 txt = str(e) + "\n\n"
                                 txt += _("To create a repository, use the "
                                     "pkgsend command.")
-                                abort(err=msg)
+                                abort(err=txt)
                         except trans.TransactionRepositoryConfigError, e:
                                 txt = str(e) + "\n\n"
                                 txt += _("The repository configuration for "
@@ -656,7 +530,9 @@
                                     basedir)
                                 return 1
 
-        all_fmris = fetch_catalog(src_uri, tracker)
+        xport_cfg.pkgdir = basedir
+
+        all_fmris = fetch_catalog(src_pub, tracker)
         fmri_arguments = pargs
         fmri_list = prune(list(set(expand_matching_fmris(all_fmris,
             fmri_arguments))), all_versions, all_timestamps)
@@ -686,18 +562,19 @@
         retrieve_list = []
         while fmri_list:
                 f = fmri_list.pop()
-                m = get_manifest(src_uri, f, basedir)
-                cshashes = get_hashes_and_sizes(m)
+                m = get_manifest(f, basedir)
+                pkgdir = os.path.join(basedir, f.get_dir_path())
+                mfile = xport.multi_file_ni(src_pub, pkgdir,
+                    not keep_compressed, tracker)
+
+                nf, nb = add_hashes_to_multi(m, mfile)
+                nfiles += nf
+                nbytes += nb
 
-                for entry in cshashes.itervalues():
-                        nfiles += 1
-                        nbytes += entry[2]
-
-                retrieve_list.append((f, cshashes))
+                retrieve_list.append((f, mfile))
 
                 tracker.evaluate_progress(fmri=f)
         tracker.evaluate_done()
-        tracker.reset()
 
         # Next, retrieve and store the content for each package.
         msg(_("Retrieving package content ..."))
@@ -705,13 +582,13 @@
 
         publish_list = []
         while retrieve_list:
-                f, cshashes = retrieve_list.pop()
+                f, mfile = retrieve_list.pop()
                 tracker.download_start_pkg(f.get_fmri(include_scheme=False))
 
-                if len(cshashes) > 0:
-                        pkgdir = os.path.join(basedir, f.get_dir_path())
-                        fetch_files_byhash(src_uri, cshashes, pkgdir,
-                            keep_compressed, tracker)
+                if mfile:
+                        mfile.wait_files()
+                        if not download_start:
+                                download_start = True
 
                 if republish:
                         publish_list.append(f)
@@ -724,12 +601,12 @@
                 f = publish_list.pop()
                 msg(_("Republishing %s ...") % f)
 
-                m = get_manifest(src_uri, f, basedir)
+                m = get_manifest(f, basedir)
 
                 # Get first line of original manifest so that inclusion of the
                 # scheme can be determined.
                 use_scheme = True
-                contents = get_manifest(src_uri, f, basedir, contents=True)
+                contents = get_manifest(f, basedir, contents=True)
                 if contents.splitlines()[0].find("pkg:/") == -1:
                         use_scheme = False
 
@@ -740,9 +617,14 @@
                 # can be aborted.
                 trans_id = get_basename(f)
 
+                if not targ_pub:
+                        targ_pub = transport.setup_publisher(target, "target",
+                            xport, xport_cfg)
+
                 try:
                         t = trans.Transaction(target, pkg_name=pkg_name,
-                            trans_id=trans_id)
+                            trans_id=trans_id, refresh_index=not defer_refresh,
+                            xport=xport, pub=targ_pub)
 
                         # Remove any previous failed attempt to
                         # to republish this package.
@@ -780,7 +662,8 @@
                 if defer_refresh:
                         msg(_("Refreshing repository search indices ..."))
                         try:
-                                t = trans.Transaction(target)
+                                t = trans.Transaction(target, xport=xport,
+                                    pub=targ_pub)
                                 t.refresh_index()
                         except trans.TransactionError, e:
                                 error(e)
@@ -795,20 +678,25 @@
         try:
                 __ret = main_func()
         except (pkg.actions.ActionError, trans.TransactionError,
-            RuntimeError), _e:
+            RuntimeError, api_errors.TransportError,
+            api_errors.BadRepositoryURI,
+            api_errors.UnsupportedRepositoryURI), _e:
                 error(_e)
-                cleanup()
+                cleanup(True)
                 __ret = 1
-        except (PipeError, KeyboardInterrupt):
+        except PipeError:
                 # We don't want to display any messages here to prevent
                 # possible further broken pipe (EPIPE) errors.
-                cleanup()
+                cleanup(False)
+                __ret = 1
+        except (KeyboardInterrupt, api_errors.CanceledException):
+                cleanup(True)
                 __ret = 1
         except SystemExit, _e:
-                cleanup()
+                cleanup(False)
                 raise _e
         except:
-                cleanup()
+                cleanup(True)
                 traceback.print_exc()
                 error(
                     _("\n\nThis is an internal error.  Please let the "
--- a/src/tests/cli/t_pkg_depotd.py	Tue Jul 13 18:17:07 2010 -0700
+++ b/src/tests/cli/t_pkg_depotd.py	Thu Jul 15 15:40:39 2010 -0700
@@ -182,7 +182,8 @@
                 operation doesn't fail."""
                 depot_url = self.dc.get_depot_url()
                 plist = self.pkgsend_bulk(depot_url, self.info10)
-                misc.versioned_urlopen(depot_url, "info", [0], plist[0])
+                repourl = urlparse.urljoin(depot_url, "info/0/%s" % plist[0])
+                urllib2.urlopen(repourl)
 
         def test_bug_3739(self):
                 """Verify that a depot will return a 400 (Bad Request) error
@@ -207,13 +208,18 @@
                 depot_url = self.dc.get_depot_url()
                 plist = self.pkgsend_bulk(depot_url, self.system10)
                 # First, try it un-encoded.
-                misc.versioned_urlopen(depot_url, "info", [0], plist[0])
-                misc.versioned_urlopen(depot_url, "manifest", [0], plist[0])
+                repourl = urlparse.urljoin(depot_url, "info/0/%s" % plist[0])
+                urllib2.urlopen(repourl)
+                repourl = urlparse.urljoin(depot_url, "manifest/0/%s" %
+                    plist[0])
+                urllib2.urlopen(repourl)
                 # Second, try it encoded.
-                misc.versioned_urlopen(depot_url, "info", [0],
+                repourl = urlparse.urljoin(depot_url, "info/0/%s" %
                     urllib.quote(plist[0]))
-                misc.versioned_urlopen(depot_url, "manifest", [0],
+                urllib2.urlopen(repourl)
+                repourl = urlparse.urljoin(depot_url, "manifest/0/%s" %
                     urllib.quote(plist[0]))
+                urllib2.urlopen(repourl)
 
         def test_bug_5707(self):
                 """Testing depotcontroller.refresh()."""
--- a/src/tests/cli/t_publish_api.py	Tue Jul 13 18:17:07 2010 -0700
+++ b/src/tests/cli/t_publish_api.py	Thu Jul 15 15:40:39 2010 -0700
@@ -30,6 +30,8 @@
 import pkg5unittest
 
 import os
+import pkg.client.publisher as publisher
+import pkg.client.transport as transport
 import pkg.fmri as fmri
 import pkg.publish.transaction as trans
 import urlparse
@@ -50,12 +52,19 @@
                 publication can handle it."""
 
                 durl = self.dc.get_depot_url()
+                repouriobj = publisher.RepositoryURI(durl)
+                repo = publisher.Repository(origins=[repouriobj])
+                pub = publisher.Publisher(prefix="repo1", repositories=[repo])
+                xport_cfg = transport.GenericTransportCfg()
+                xport_cfg.add_publisher(pub)
+                xport = transport.Transport(xport_cfg)
 
                 # Each version number must be unique since multiple packages
                 # will be published within the same second.
                 for i in range(100):
                         pf = fmri.PkgFmri("foo@%d.0" % i, "5.11")
-                        t = trans.Transaction(durl, pkg_name=str(pf))
+                        t = trans.Transaction(durl, pkg_name=str(pf),
+                            xport=xport, pub=pub)
                         t.open()
                         pkg_fmri, pkg_state = t.close(refresh_index=True)
                         self.debug("%s: %s" % (pkg_fmri, pkg_state))
@@ -69,11 +78,19 @@
                 location = urlparse.urlunparse(("file", "",
                     urllib.pathname2url(location), "", "", ""))
 
+                repouriobj = publisher.RepositoryURI(location)
+                repo = publisher.Repository(origins=[repouriobj])
+                pub = publisher.Publisher(prefix="repo1", repositories=[repo])
+                xport_cfg = transport.GenericTransportCfg()
+                xport_cfg.add_publisher(pub)
+                xport = transport.Transport(xport_cfg)
+
                 # Each version number must be unique since multiple packages
                 # will be published within the same second.
                 for i in range(100):
                         pf = fmri.PkgFmri("foo@%d.0" % i, "5.11")
-                        t = trans.Transaction(location, pkg_name=str(pf))
+                        t = trans.Transaction(location, pkg_name=str(pf),
+                            xport=xport, pub=pub)
                         t.open()
                         pkg_fmri, pkg_state = t.close(refresh_index=True)
                         self.debug("%s: %s" % (pkg_fmri, pkg_state))
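The file-repository test above derives its file:// URI from a local path
with urllib.pathname2url before wiring up the publisher and transport.  A
minimal sketch of that conversion; the path is a placeholder:

    import urllib
    import urlparse

    location = "/var/tmp/repo"              # placeholder path
    repo_uri = urlparse.urlunparse(("file", "",
        urllib.pathname2url(location), "", "", ""))
    print repo_uri                          # file:///var/tmp/repo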
--- a/src/util/distro-import/importer.py	Tue Jul 13 18:17:07 2010 -0700
+++ b/src/util/distro-import/importer.py	Thu Jul 15 15:40:39 2010 -0700
@@ -20,7 +20,7 @@
 # CDDL HEADER END
 #
 #
-# Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
+# Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
 #
 
 import fnmatch
@@ -29,7 +29,9 @@
 import os
 import pkg
 import pkg.client
-import pkg.client.publisher
+import pkg.client.publisher    as publisher
+import pkg.client.transport    as transport
+import pkg.client.api_errors   as apx
 import pkg.client.api
 import pkg.client.progress
 import pkg.flavor.smf_manifest as smf_manifest
@@ -38,8 +40,6 @@
 import pkg.misc
 import pkg.pkgsubprocess       as subprocess
 import pkg.publish.transaction as trans
-import pkg.server.catalog      as catalog
-import pkg.server.repository   as repository
 import pkg.variant             as variant
 import pkg.version             as version
 import platform
@@ -50,12 +50,11 @@
 import tempfile
 import time
 import urllib
-import urlparse
 
 from datetime import datetime
 from pkg      import actions, elf
 from pkg.bundle.SolarisPackageDirBundle import SolarisPackageDirBundle
-from pkg.misc import versioned_urlopen, emsg, gunzip_from_stream
+from pkg.misc import emsg
 from pkg.portable import PD_LOCAL_PATH, PD_PROTO_DIR, PD_PROTO_DIR_LIST
 
 CLIENT_API_VERSION = 40
@@ -78,6 +77,7 @@
 file_repo = False    #
 curpkg = None        # which IPS package we're currently importing
 def_branch = ""      # default branch
+def_pub = None
 def_repo = "http://localhost:10000"
 def_vers = "0.5.11"  # default package version
 # default search path
@@ -104,6 +104,7 @@
     " - / filesystem", ",root(/)"] # remove from summaries
 svr4pkgsseen = {}    #svr4 pkgs seen - pkgs indexed by name
 timestamp_files = [] # patterns of files that retain timestamps from svr4 pkgs
+tmpdirs = []
 wos_path = []        # list of search pathes for svr4 packages
 
 local_smf_manifests = tempfile.mkdtemp(prefix="pkg_smf.") # where we store our SMF manifests
@@ -573,8 +574,9 @@
         # won't happen unless same pkg imported more than once into same ips pkg
         assert len(svr4_traversal_dict) == len(svr4_traversal_list)
 
-        t = trans.Transaction(def_repo, create_repo=file_repo, refresh_index=False,
-            pkg_name=pkg.fmristr(), noexecute=nopublish)
+        t = trans.Transaction(def_repo, create_repo=file_repo,
+            refresh_index=False, pkg_name=pkg.fmristr(), noexecute=nopublish,
+            xport=xport, pub=def_pub)
         transaction_id = t.open()
 
         # publish easy actions
@@ -756,8 +758,9 @@
                 if instance_mf:
                         return instance_mf.keys()
 
-def fetch_file(action, proto_dir, server_url=None):
+def fetch_file(action, proto_dir, server_pub=None):
         """ Save the file action contents to proto_dir """
+
         basename = os.path.basename(action.attrs["path"])
         dirname = os.path.dirname(action.attrs["path"])
         tmppath = os.path.join(proto_dir, dirname)
@@ -768,7 +771,20 @@
                         raise
         f = os.path.join(tmppath, basename)
 
-        if not server_url and action.data() is not None:
+        if server_pub:
+                try:
+                        file_content = xport.get_content(server_pub,
+                            action.hash)
+                except apx.TransportError, e:
+                        print >> sys.stderr, e
+                        cleanup()
+                        sys.exit(1)
+
+                ofile = file(f, "w")
+                ofile.write(file_content)
+                ofile.close()
+                file_content = None
+        elif action.data() is not None:
                 ao = action.data()
                 bufsz = 256 * 1024
                 sz = int(action.attrs["pkg.size"])
@@ -778,34 +794,10 @@
                         os.write(fd, d)
                         sz -= len(d)
                 d = None
-        elif server_url.startswith("http://"):
-                ofile = file(f, "w")
-                ifile, version = versioned_urlopen(server_url, "file", [0], action.hash)
-                gunzip_from_stream(ifile, ofile)
-
-        elif server_url.startswith("file://"):
-                try:
-                        repo = get_repo(server_url)
-                        ifile = file(repo.file(action.hash), "rb")
-                        ofile = open(f, "wb")
-                        gunzip_from_stream(ifile, ofile)
-                        ifile.close()
-                        ofile.close()
-                except repository.RepositoryError, e:
-                        print "Unable to download %s from %s" % (action.attrs["path"],
-                            server_url)
-                        raise
         else:
                 raise RuntimeError("Unable to save file %s - no URL provided."
                     % action.attrs["path"])
 
-def get_repo(server_url):
-        """Return a Repository object from a given file URL"""
-        parts = urlparse.urlparse(server_url, "file", allow_fragments=0)
-        path = urllib.url2pathname(parts[2])
-
-        return repository.Repository(read_only=True, repo_root=path)
-
 def gen_hardlink_depend_actions(action):
         """ generate dependency action for hardlinks; action is the
         hardlink action we're analyzing"""
@@ -985,40 +977,25 @@
         # program name on all platforms.
         emsg(ws + pkg_cmd + text_nows)
 
-def get_manifest(server_url, fmri):
+def get_manifest(server_pub, fmri):
         if not fmri: # no matching fmri
                 return null_manifest
 
-        return manifest_cache.setdefault((server_url, fmri), 
-            fetch_manifest(server_url, fmri))
+        return manifest_cache.setdefault((server_pub, fmri), 
+            fetch_manifest(server_pub, fmri))
 
-def fetch_manifest(server_url, fmri):
+def fetch_manifest(server_pub, fmri):
         """Fetch the manifest for package-fmri 'fmri' from the server
         in 'server_url'... return as Manifest object.... needs
         exact fmri"""
+
         # Request manifest from server
-
         try:
-                if server_url.startswith("http://"):
-                        m, v = versioned_urlopen(server_url, "manifest", [0],
-                        fmri.get_url_path())
-
-                elif server_url.startswith("file://"):
-                        repo = get_repo(server_url)
-                        m = file(repo.manifest(fmri), "rb")
-                else:
-                        raise RuntimeError("Repo url %s has an unknown scheme." %
-                            server_url)
-        except:
-                error(_("Unable to download manifest %s from %s") %
-                    (fmri.get_url_path(), server_url))
-                sys.exit(1)
-
-        # Read from server, write to file
-        try:
-                mfst_str = m.read()
-        except:
-                error(_("Error occurred while reading from: %s") % server_url)
+                mfst_str = xport.get_manifest(fmri, pub=server_pub,
+                    content_only=True)
+        except apx.TransportError, e:
+                print >> sys.stderr, e
+                cleanup()
                 sys.exit(1)
 
         m = manifest.Manifest()
@@ -1028,46 +1005,28 @@
 
 catalog_cache = {}
 
-def get_catalog(server_url):
-        return catalog_cache.get(server_url, fetch_catalog(server_url))[0] 
+def get_catalog(server_pub):
+        return catalog_cache.setdefault(server_pub, fetch_catalog(server_pub))
 
-def cleanup_catalogs():
-        for c, d in catalog_cache.values():
-                shutil.rmtree(d)
-        catalog_cache.clear()
-
-def fetch_catalog(server_url):
+def fetch_catalog(server_pub):
         """Fetch the catalog from the server_url."""
 
-        # open connection for catalog
-        try:
-                c, v = versioned_urlopen(server_url, "catalog", [0])
-        except:
-                error(_("Unable to download catalog from: %s") % server_url)
-                sys.exit(1)
-
-        # make a tempdir for catalog
-        dl_dir = tempfile.mkdtemp()
+        if not server_pub.meta_root:
+                # Create a temporary directory for catalog.
+                cat_dir = tempfile.mkdtemp()
+                tmpdirs.append(cat_dir)
+                server_pub.meta_root = cat_dir
 
-        # call catalog.recv to pull down catalog
-        try:
-                catalog.ServerCatalog.recv(c, dl_dir)
-        except: 
-                error(_("Error while reading from: %s") % server_url)
-                sys.exit(1)
+        server_pub.transport = xport
+        server_pub.refresh(True, True)
 
-        # close connection to server
-        c.close()
+        cat = server_pub.catalog
 
-        # instantiate catalog object
-        cat = catalog.ServerCatalog(dl_dir, read_only=True)
-        
-        # return (catalog, tmpdir path)
-        return cat, dl_dir
+        return cat
 
 catalog_dict = {}
-def load_catalog(server_url):
-        c = get_catalog(server_url)
+def load_catalog(server_pub):
+        c = get_catalog(server_pub)
         d = {}
         for f in c.fmris():
                 d.setdefault(f.pkg_name, []).append(f)
@@ -1075,37 +1034,37 @@
         for k in d:
                 d[k].sort(reverse=True)
 
-        catalog_dict[server_url] = d        
+        catalog_dict[server_pub] = d        
 
-def expand_fmri(server_url, fmri_string, constraint=version.CONSTRAINT_AUTO):
+def expand_fmri(server_pub, fmri_string, constraint=version.CONSTRAINT_AUTO):
         """ from specified server, find matching fmri using CONSTRAINT_AUTO
         cache for performance.  Returns None if no matching fmri is found """
-        if server_url not in catalog_dict:
-                load_catalog(server_url)
+        if server_pub not in catalog_dict:
+                load_catalog(server_pub)
 
         fmri = pkg.fmri.PkgFmri(fmri_string, "5.11")        
 
-        for f in catalog_dict[server_url].get(fmri.pkg_name, []):
+        for f in catalog_dict[server_pub].get(fmri.pkg_name, []):
                 if not fmri.version or f.version.is_successor(fmri.version, constraint):
                         return f
         return None
 
 
-def get_dependencies(server_url, fmri_list):
+def get_dependencies(server_pub, fmri_list):
         s = set()
         for f in fmri_list:
-                fmri = expand_fmri(server_url, f)
-                _get_dependencies(s, server_url, fmri)
+                fmri = expand_fmri(server_pub, f)
+                _get_dependencies(s, server_pub, fmri)
         return s
 
-def _get_dependencies(s, server_url, fmri):
+def _get_dependencies(s, server_pub, fmri):
         """ recursive incorp expansion"""
         s.add(fmri)
-        for a in get_manifest(server_url, fmri).gen_actions_by_type("depend"):
+        for a in get_manifest(server_pub, fmri).gen_actions_by_type("depend"):
                 if a.attrs["type"] == "incorporate":
-                        new_fmri = expand_fmri(server_url, a.attrs["fmri"]) 
+                        new_fmri = expand_fmri(server_pub, a.attrs["fmri"]) 
                         if new_fmri and new_fmri not in s: # ignore missing, already planned
-                                _get_dependencies(s, server_url, new_fmri)
+                                _get_dependencies(s, server_pub, new_fmri)
 
 def get_smf_packages(server_url, manifest_locations, filter):
         """ Performs a search against server_url looking for packages which contain
@@ -1433,21 +1392,17 @@
                             "(%s:%s)" % (token, lexer.infile, lexer.lineno))
 def repo_add_content(path_to_repo, path_to_proto):
         """Fire up depo to add content and rebuild search index"""
-        pythonpath = os.environ.get("PYTHONPATH", "")
 
-        args = (os.path.join(path_to_proto, "usr/lib/pkg.depotd"), 
-                "--add-content",
-                "--exit-ready", 
-                "-d", path_to_repo
-                )
+        cmdname = os.path.join(path_to_proto, "usr/bin/pkgrepo") 
+        argstr = "%s -s %s refresh" % (cmdname, path_to_repo)
 
         print "Adding content & rebuilding search indicies synchronously...."
-        print "%s" % str(args)
+        print "%s" % str(argstr)
         try:
-                proc = subprocess.Popen(args, env=os.environ)
+                proc = subprocess.Popen(argstr, shell=True)
                 ret = proc.wait()
         except OSError, e:
-                print "cannot execute %s: %s" % (args, e)
+                print "cannot execute %s: %s" % (argstr, e)
                 return 1
         if ret:
                 print "exited w/ status %d" % ret
@@ -1455,10 +1410,15 @@
         print "done"
         return 0
 
+def cleanup():
+        """To be called at program finish."""
+        for d in tmpdirs:
+                shutil.rmtree(d, True)
 
 def main_func():
         global file_repo
         global def_branch
+        global def_pub
         global def_repo
         global def_vers
         global extra_entire_contents
@@ -1472,6 +1432,8 @@
         global wos_path
         global not_these_consolidations
         global curpkg
+        global xport
+        global xport_cfg
 
         
         try:
@@ -1568,6 +1530,16 @@
                 sys.exit(0)
 
         start_time = time.clock()
+        incoming_dir = tempfile.mkdtemp()
+
+        tmpdirs.append(incoming_dir)
+        tmpdirs.append(local_smf_manifests)
+
+        xport, xport_cfg = transport.setup_transport()
+        xport_cfg.incoming_download_dir = incoming_dir
+
+        def_pub = transport.setup_publisher(def_repo, "default", xport,
+            xport_cfg)
 
         print "Seeding local SMF manifest database from %s" % def_repo
 
@@ -1580,14 +1552,14 @@
                 pfmri_str = "%s@%s" % (pfmri.get_name(), pfmri.get_version())
                 manifest = None
                 try:
-                        manifest = get_manifest(def_repo, pfmri)
+                        manifest = get_manifest(def_pub, pfmri)
                 except:
                         print "No manifest found for %s" % str(pfmri)
                         raise
                 for action in manifest.gen_actions_by_type("file"):
                         if smf_manifest.has_smf_manifest_dir(action.attrs["path"]):
                                 fetch_file(action, local_smf_manifests,
-                                    server_url=def_repo)
+                                    server_pub=def_pub)
 
         print "First pass: initial import", datetime.now()
 
@@ -1684,6 +1656,7 @@
         if errors:
                 for e in errors:
                         print "Fail: %s" % e
+                cleanup()
                 sys.exit(1)
         # check for require dependencies on obsolete or renamed pkgs
 
@@ -1712,6 +1685,7 @@
         if errors:
                 for e in errors:
                         print "Fail: %s" % e
+                cleanup()
                 sys.exit(1)
 
 
@@ -1721,14 +1695,16 @@
                 excludes = [variant.Variants({"variant.arch": get_arch()}).allow_action]
                 for uri in reference_uris:
                         server, fmri_string = uri.split("@", 1)
-                        for pfmri in get_dependencies(server, [fmri_string]):
+                        server_pub = transport.setup_publisher(server,
+                            "reference", xport, xport_cfg, remote_prefix=True)
+                        for pfmri in get_dependencies(server_pub, [fmri_string]):
                                 if pfmri is None:
                                         continue
                                 if pfmri.get_name() in pkgdict:
                                         continue # ignore pkgs already seen
                                 pfmri_str = "%s@%s" % (pfmri.get_name(), pfmri.get_version())
                                 fmridict[pfmri.get_name()] = pfmri_str
-                                for action in get_manifest(server, pfmri).gen_actions(excludes):
+                                for action in get_manifest(server_pub, pfmri).gen_actions(excludes):
                                         if "path" not in action.attrs:
                                                 continue
                                         if action.name == "unknown":
@@ -1752,6 +1728,7 @@
                 if errors:
                         for e in errors:
                                 print "Fail: %s" % e
+                        cleanup()
                         sys.exit(1)
                 print "external packages checked for conflicts"
 
@@ -1896,6 +1873,7 @@
         if error_count:
                 print "%d/%d packages has errors; %.2f%% FAILED" % (error_count, total,
                     error_count * 100.0 / total)
+                cleanup()
                 sys.exit(1)
 
         print "%d/%d packages processed; %.2f%% complete" % (processed, total,
@@ -1904,12 +1882,13 @@
         if file_repo:
                 code = repo_add_content(def_repo[7:], g_proto_area)
                 if code:
+                        cleanup()
                         sys.exit(code)
 
-        shutil.rmtree(local_smf_manifests, True)
         print "Done:", datetime.now()
         elapsed = time.clock() - start_time 
         print "publication took %d:%.2d" % (elapsed/60, elapsed % 60)
+        cleanup()
         sys.exit(0)
         
 if __name__ == "__main__":
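get_dependencies() and _get_dependencies() above perform a recursive
expansion of incorporate dependencies against the catalog.  A toy model of
the same traversal over a hand-built dependency map, independent of the
transport plumbing; the package names are invented for illustration:

    # Toy model of the recursive incorporation walk in _get_dependencies(),
    # over a hand-built map instead of real manifests.
    incorporates = {
        "entire": ["osnet-incorporation", "gnome-incorporation"],
        "osnet-incorporation": ["SUNWcs"],
        "gnome-incorporation": [],
        "SUNWcs": [],
    }

    def get_dependencies(fmri_list):
            s = set()
            for f in fmri_list:
                    _get_dependencies(s, f)
            return s

    def _get_dependencies(s, fmri):
            s.add(fmri)
            for dep in incorporates.get(fmri, []):
                    if dep not in s:        # ignore already-planned packages
                            _get_dependencies(s, dep)

    if __name__ == "__main__":
            print sorted(get_dependencies(["entire"]))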
--- a/src/util/distro-import/solaris.py	Tue Jul 13 18:17:07 2010 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,1489 +0,0 @@
-#!/usr/bin/python2.6
-#
-# CDDL HEADER START
-#
-# The contents of this file are subject to the terms of the
-# Common Development and Distribution License (the "License").
-# You may not use this file except in compliance with the License.
-#
-# You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
-# or http://www.opensolaris.org/os/licensing.
-# See the License for the specific language governing permissions
-# and limitations under the License.
-#
-# When distributing Covered Code, include this CDDL HEADER in each
-# file and include the License file at usr/src/OPENSOLARIS.LICENSE.
-# If applicable, add the following below this CDDL HEADER, with the
-# fields enclosed by brackets "[]" replaced with your own identifying
-# information: Portions Copyright [yyyy] [name of copyright owner]
-#
-# CDDL HEADER END
-#
-# Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
-# Use is subject to license terms.
-
-
-import fnmatch
-import getopt
-import gettext
-import os
-import pkg.depotcontroller as depotcontroller
-import pkg.publish.transaction as trans
-import re
-import shlex
-import sys
-import urllib
-import urlparse
-
-from datetime import datetime
-from itertools import groupby
-from pkg import actions, elf
-from pkg.bundle.SolarisPackageDirBundle import SolarisPackageDirBundle
-from pkg.sysvpkg import SolarisPackage
-from tempfile import mkstemp
-
-gettext.install("import", "/usr/lib/locale")
-
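-# Accumulates the files, dependencies, and metadata for a single IPS
-# package as it is assembled from one or more SVR4 packages.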
-class package(object):
-        def __init__(self, name):
-                self.name = name
-                self.files = []
-                self.depend = []
-                self.file_depend = []
-                self.idepend = []     #svr4 pkg deps, if any
-                self.undepend = []
-                self.extra = []
-                self.dropped_licenses = []
-                self.nonhollow_dirs = {}
-                self.srcpkgs = []
-                self.classification = []
-                self.desc = ""
-                self.summary = ""
-                self.version = ""
-                self.imppkg = None
-                pkgdict[name] = self
-
-        def import_pkg(self, imppkg, line):
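-                """Import the contents of the SVR4 package 'imppkg' into
-                this package, skipping any pathnames listed in 'line'."""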
-                try:
-                        p = SolarisPackage(pkg_path(imppkg))
-                except:
-                        raise RuntimeError("No such package: '%s'" % imppkg)
-
-                self.imppkg = p
-
-                svr4pkgpaths[p.pkginfo["PKG.PLAT"]] = pkg_path(imppkg)
-
-                # filename NOT always same as pkgname
-                imppkg = p.pkginfo["PKG.PLAT"]
-                svr4pkgsseen[imppkg] = p
-
-                if "SUNW_PKG_HOLLOW" in p.pkginfo and \
-                    p.pkginfo["SUNW_PKG_HOLLOW"].lower() == "true":
-                        hollow_pkgs[imppkg] = True
-
-                excludes = dict((f, True) for f in line.split())
-
-                # XXX This isn't thread-safe.  We want a dict method that adds
-                # the key/value pair, but throws an exception if the key is
-                # already present.
-                for o in p.manifest:
-                        if o.pathname in excludes:
-                                print "excluding %s from %s" % \
-                                    (o.pathname, imppkg)
-                                continue
-
-                        if o.pathname in elided_files:
-                                print "ignoring %s in %s" % (o.pathname, imppkg)
-                                continue
-
-                        if o.type == "e":
-                                if o.pathname not in editable_files:
-                                        editable_files[o.pathname] = \
-                                            [(imppkg, self)]
-                                else:
-                                        editable_files[o.pathname].append(
-                                            (imppkg, self))
-
-                        # XXX This decidedly ignores "e"-type files.
-
-                        if o.type in "fv" and o.pathname in usedlist:
-                                s = reuse_err % (
-                                        o.pathname,
-                                        self.name,
-                                        imppkg,
-                                        svr4pkgpaths[imppkg],
-                                        usedlist[o.pathname][1].name,
-                                        usedlist[o.pathname][0],
-                                        svr4pkgpaths[usedlist[o.pathname][0]])
-                                print s
-                                raise RuntimeError(s)
-                        elif o.type == "i" and o.pathname == "copyright":
-                                # Fake up a unique path for each license.
-                                o.pathname = "//license/%s" % imppkg
-                                usedlist[o.pathname] = (imppkg, self)
-                                self.files.append(o)
-                        elif o.type != "i":
-                                if o.type in "dx" and imppkg not in hollow_pkgs:
-                                        self.nonhollow_dirs[o.pathname] = True
-
-                                usedlist[o.pathname] = (imppkg, self)
-                                self.check_perms(o)
-                                self.files.append(o)
-
-                if not self.version:
-                        self.version = "%s-%s" % (def_vers,
-                            get_branch(self.name))
-                if not self.desc:
-                        try:
-                                self.desc = zap_strings(p.pkginfo["DESC"],
-                                    summary_detritus)
-                        except KeyError:
-                                self.desc = None
-                if not self.summary:
-                        self.summary = zap_strings(p.pkginfo["NAME"],
-                            summary_detritus)
-
-                # This is how we'd import dependencies, but we'll use
-                # file-specific dependencies only, since these tend to be
-                # broken.
-                # self.depend.extend(
-                #     d.req_pkg_fmri
-                #     for d in p.deps
-                # )
-
-                self.add_svr4_src(imppkg)
-
-        def add_svr4_src(self, imppkg):
-                if imppkg in destpkgs:
-                        destpkgs[imppkg].append(self.name)
-                else:
-                        destpkgs[imppkg] = [self.name]
-                self.srcpkgs.append(imppkg)
-
-        def import_file(self, fname, line):
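-                """Import the single file 'fname' from the current SVR4
-                package, applying any owner/group/mode overrides given in
-                'line'."""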
-                imppkgname = self.imppkg.pkginfo["PKG.PLAT"]
-
-                if "SUNW_PKG_HOLLOW" in self.imppkg.pkginfo and \
-                    self.imppkg.pkginfo["SUNW_PKG_HOLLOW"].lower() == "true":
-                        hollow_pkgs[imppkgname] = True
-
-                if fname in usedlist:
-                        t = [
-                            f for f in usedlist[fname][1].files
-                            if f.pathname == fname
-                        ][0].type
-                        if t in "fv":
-                                assert imppkgname == usedlist[fname][0]
-                                raise RuntimeError(reuse_err % (
-                                        fname,
-                                        self.name,
-                                        imppkgname,
-                                        svr4pkgpaths[imppkgname],
-                                        usedlist[fname][1].name,
-                                        usedlist[fname][0],
-                                        svr4pkgpaths[usedlist[fname][0]]))
-
-                usedlist[fname] = (imppkgname, self)
-                o = [
-                    o
-                    for o in self.imppkg.manifest
-                    if o.pathname == fname 
-                ]
-                # There should be only one file with a given pathname in a
-                # single package.
-                if len(o) != 1:
-                        print "ERROR: %s %s" % (imppkgname, fname)
-                        assert len(o) == 1
-
-                if line:
-                        a = actions.fromstr(
-                            "%s path=%s %s" % \
-                                    (
-                                        self.convert_type(o[0].type),
-                                        o[0].pathname,
-                                        line
-                                        )
-                            )
-                        for attr in a.attrs:
-                                if attr == "owner":
-                                        o[0].owner = a.attrs[attr]
-                                elif attr == "group":
-                                        o[0].group = a.attrs[attr]
-                                elif attr == "mode":
-                                        o[0].mode = a.attrs[attr]
-                self.check_perms(o[0])
-                self.files.extend(o)
-
-        def convert_type(self, svrtype):
-                """ given sv4r type, return IPS type"""
-                return {
-                        "f": "file", "e": "file", "v": "file",
-                        "d": "dir", "x": "dir",
-                        "s": "link",
-                        "l": "hardlink"
-                        }[svrtype]
-
-        def type_convert(self, ipstype):
-                """ given IPS type, return svr4 type(s)"""
-                return {
-                        "file": "fev", "dir": "dx", "link": "s",
-                        "hardlink": "l"
-                        }[ipstype]
-
-        def file_to_action(self, f):
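-                """Convert an SVR4 manifest entry into the corresponding
-                IPS action."""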
-
-                if f.type in "dx":
-                        action = actions.directory.DirectoryAction(
-                            None, mode = f.mode, owner = f.owner,
-                            group = f.group, path = f.pathname)
-                elif f.type in "efv":
-                        action = actions.file.FileAction(
-                            None, mode = f.mode, owner = f.owner,
-                            group = f.group, path = f.pathname)
-                elif f.type == "s":
-                        action = actions.link.LinkAction(None,
-                            target = f.target, path = f.pathname)
-                elif f.type == "l":
-                        action = actions.hardlink.HardLinkAction(None,
-                            target = f.target, path = f.pathname)
-                else:
-                        raise RuntimeError("unknown type %s - path %s" %
-                            (f.type, f.pathname))
-
-                return action
-
-        def check_perms(self, manifest):
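-                """Map unknown ('?') owner, group, and mode values to
-                sensible defaults, reporting each substitution."""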
-                if manifest.type not in "fevdxbc":
-                        return
-
-                if manifest.owner == "?":
-                        manifest.owner = "root"
-                        print "File %s in pkg %s owned by '?': mapping to %s" \
-                            % (manifest.pathname, self.name, manifest.owner)
-
-                if manifest.group == "?":
-                        manifest.group = "bin"
-                        print "File %s in pkg %s of group '?': mapping to %s" \
-                            % (manifest.pathname, self.name, manifest.group)
-                if manifest.mode == "?":
-                        if manifest.type in "dx":
-                                manifest.mode = "0755"
-                        else:
-                                manifest.mode = "0444"
-                        print "File %s in pkg %s mode '?': mapping to %s" % \
-                            (manifest.pathname, self.name, manifest.mode)
-
-
-        def chattr(self, fname, line):
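-                """Modify or drop attributes of the existing file 'fname'
-                according to 'line'."""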
-                o = [f for f in self.files if f.pathname == fname]
-                if not o:
-                        raise RuntimeError("No file '%s' in package '%s'" % \
-                            (fname, self.name))
-
-                line = line.rstrip()
-
-                # is this a deletion?
-                if line.startswith("drop"):
-                        for f in o:
-                                # deletion of existing attribute
-                                if not hasattr(f, "deleted_attrs"):
-                                        f.deleted_attrs = []
-                                print "Adding drop on %s of %s" % \
-                                    (fname, line.split()[1:])
-                                f.deleted_attrs.extend(line.split()[1:])
-                        return
-
-                # handle insertion/modification case
-                for f in o:
-                        # create attribute dictionary from line
-                        new_type = self.convert_type(f.type)
-                        new_attrs = actions._fromstr("%s %s" % 
-                            (new_type, line.rstrip()))[2]
-                        # get path if we're not changing it 
-                        if "path" not in new_attrs:
-                                new_attrs["path"] = fname
-                        a = actions.types[new_type](**new_attrs)
-                        if show_debug:
-                                print "Updating attributes on " + \
-                                    "'%s' in '%s' with '%s'" % \
-                                    (f.pathname, curpkg.name, a)
-                        orig_action = self.file_to_action(f)
-
-                        if not hasattr(f, "changed_attrs"):
-                                f.changed_attrs = {}
-
-                        # each chattr produces a dictionary of actions
-                        # including a path=xxxx and whatever modifications
-                        # are made.  Note that the path value may be a list in
-                        # the case of modifications to the path... since
-                        # each chattr produces another path= entry and results
-                        # from applying the changes to the original file spec,
-                        # we need to ignore path if it hasn't changed... for
-                        # generality, we ignore all unchanged attributes in
-                        # the code below, adding into changed_attrs only those
-                        # that are different from the original... this also
-                        # insulates us from the possibility of actions.fromstr
-                        # adding additional attributes in the constructor...
-
-                        for key in a.attrs.keys():
-                                if key not in orig_action.attrs or \
-                                    orig_action.attrs[key] != a.attrs[key]:
-                                        if key in f.changed_attrs:
-                                                print "Warning: overwriting " \
-                                                    "changed attr %s on %s " \
-                                                    "from %s to %s" % \
-                                                    (key, f.pathname,
-                                                     f.changed_attrs[key],
-                                                     a.attrs[key])
-                                        f.changed_attrs[key] = a.attrs[key]
-
-
-        # apply a chattr to wildcarded files/dirs
-        # also allows package specification, wildcarding, regexp edit
-
-        def chattr_glob(self, glob, line):
-                args = line.split()
-                if args[0] == "from":
-                        args.pop(0)
-                        pkgglob = args.pop(0)
-                        line = " ".join(args)
-                else:
-                        pkgglob = "*"
-
-                if args[0] == "type": # we care about type
-                        args.pop(0)
-                        types = self.type_convert(args.pop(0))
-                        line = " ".join(args)
-                else:
-                        types = "dfevslx"
-
-                if args[0] == "edit": # we're doing regexp edit of attr
-                        edit = True
-                        args.pop(0)
-                        target = args.pop(0)
-                        regexp = re.compile(args.pop(0))
-                        replace = args.pop(0)
-                        line = " ".join(args)
-                else:
-                        edit = False
-
-                o = [
-                        f
-                        for f in self.files
-                        if fnmatch.fnmatchcase(f.pathname, glob) and
-                            fnmatch.fnmatchcase(
-                                usedlist[f.pathname][0], pkgglob) and
-                            f.type in types
-                     ]
-
-                chattr_line = line
-
-                for f in o:
-                        fname = f.pathname
-                        orig_action = self.file_to_action(f)
-                        if edit:
-                                if target in orig_action.attrs:
-                                        old_value = orig_action.attrs[target]
-                                        new_value = regexp.sub(replace, \
-                                            old_value)
-                                        if old_value == new_value:
-                                                continue
-                                        chattr_line = "%s=%s %s" % \
-                                            (target, new_value, line)
-                                else:
-                                        continue
-                        chattr_line = chattr_line.rstrip()
-                        if show_debug:
-                                print "Updating attributes on " + \
-                                    "'%s' in '%s' with '%s'" % \
-                                    (fname, curpkg.name, chattr_line)
-
-                        # create attribute dictionary from line
-                        new_type = self.convert_type(f.type)
-                        new_attrs = actions._fromstr("%s %s" % 
-                            (new_type, chattr_line.rstrip()))[2]
-                        # get path if we're not changing it 
-                        if "path" not in new_attrs:
-                                new_attrs["path"] = fname
-                        a = actions.types[new_type](**new_attrs)
-                        # each chattr produces a dictionary of actions
-                        # including a path=xxxx and whatever modifications
-                        # are made.  Note that the path value may be a list in
-                        # the case of modifications to the path... since
-                        # each chattr produces another path= entry and results
-                        # from applying the changes to the original file spec,
-                        # we need to ignore path if it hasn't changed... for
-                        # generality, we ignore all unchanged attributes in
-                        # the code below, adding into changed_attrs only those
-                        # that are different from the original... this also
-                        # insulates us from the possibility of actions.fromstr
-                        # adding additional attributes in the constructor...
-
-                        if not hasattr(f, "changed_attrs"):
-                                f.changed_attrs = {}
-                        for key in a.attrs.keys():
-                                if key not in orig_action.attrs or \
-                                    orig_action.attrs[key] != a.attrs[key]:
-                                        if key in f.changed_attrs:
-                                                print "Warning: overwriting " \
-                                                    "changed attr %s on %s " \
-                                                    "from %s to %s" % \
-                                                    (key, f.pathname,
-                                                     f.changed_attrs[key],
-                                                     a.attrs[key])
-                                        f.changed_attrs[key] = a.attrs[key]
-
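-# Cache of package paths already resolved, keyed by package basename.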
-pkgpaths = {}
-
-def pkg_path(pkgname):
-        name = os.path.basename(pkgname)
-        if name in pkgpaths:
-                return pkgpaths[name]
-        if "/" in pkgname:
-                pkgpaths[name] = os.path.realpath(pkgname)
-                return pkgpaths[name]
-        else:
-                for each_path in wos_path:
-                        if os.path.exists(each_path + "/" + pkgname):
-                                pkgpaths[name] = each_path + "/" + pkgname
-                                return pkgpaths[name]
-
-                raise RuntimeError("package %s not found" % pkgname)
-
-
-def start_package(pkgname):
-        set_macro("PKGNAME", urllib.quote(pkgname, ""))
-        return package(pkgname)
-
-def end_package(pkg):
-        pkg_branch = get_branch(pkg.name)
-        if not pkg.version:
-                pkg.version = "%s-%s" % (def_vers, pkg_branch)
-        elif "-" not in pkg.version:
-                pkg.version += "-%s" % pkg_branch
-
-        clear_macro("PKGNAME")
-        print "Package '%s'" % pkg.name
-        print "  Version:", pkg.version
-        print "  Description:", pkg.desc
-        print "  Summary:", pkg.summary
-        print "  Classification: ", ",".join(pkg.classification)
-
-def publish_pkg(pkg):
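-        """Publish 'pkg' to the target repository as a single transaction:
-        non-file actions first, then file content grouped by originating
-        SVR4 package, then dependencies and package-level attributes."""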
-
-        new_pkg_name = "%s@%s" % (pkg.name, pkg.version)
-        t = trans.Transaction(def_repo, create_repo=create_repo,
-            pkg_name=new_pkg_name, noexecute=nopublish)
-
-        print "    open %s" % new_pkg_name
-        transaction_id = t.open()
-
-        # Publish non-file objects first: they're easy.
-        for f in pkg.files:
-                if f.type in "dx":
-                        action = actions.directory.DirectoryAction(
-                            None, mode = f.mode, owner = f.owner,
-                            group = f.group, path = f.pathname)
-                        if hasattr(f, "changed_attrs"):
-                                action.attrs.update(f.changed_attrs)
-                                # chattr may have produced two path values
-                                action.attrs["path"] = \
-                                    action.attrlist("path")[-1]
-                        print "    %s add dir %s %s %s %s" % (
-                                pkg.name,
-                                action.attrs["mode"],
-                                action.attrs["owner"],
-                                action.attrs["group"],
-                                action.attrs["path"]
-                                )
-                elif f.type == "s":
-                        action = actions.link.LinkAction(None,
-                            target = f.target, path = f.pathname)
-                        if hasattr(f, "changed_attrs"):
-                                action.attrs.update(f.changed_attrs)
-                                # chattr may have produced two path values
-                                action.attrs["path"] = \
-                                    action.attrlist("path")[-1]
-                        print "    %s add link %s %s" % (
-                                pkg.name,
-                                action.attrs["path"],
-                                action.attrs["target"]
-                                )
-                elif f.type == "l":
-                        action = actions.hardlink.HardLinkAction(None,
-                            target = f.target, path = f.pathname)
-                        if hasattr(f, "changed_attrs"):
-                                action.attrs.update(f.changed_attrs)
-                                # chattr may have produced two path values
-                                action.attrs["path"] = \
-                                    action.attrlist("path")[-1]
-                        pkg.depend += process_link_dependencies(
-                            action.attrs["path"], action.attrs["target"])
-                        print "    %s add hardlink %s %s" % (
-                                pkg.name,
-                                action.attrs["path"],
-                                action.attrs["target"]
-                                )
-                else:
-                        continue
-
-                #
-                # If the originating package was hollow, tag this file
-                # as being global zone only.
-                #
-
-                if f.type not in "dx" and f.pathname in usedlist and \
-                    usedlist[f.pathname][0] in hollow_pkgs:
-                        action.attrs["opensolaris.zone"] = "global"
-                        action.attrs["variant.opensolaris.zone"] = "global"
-
-                if f.type in "dx" and f.pathname in usedlist and \
-                    usedlist[f.pathname][0] in hollow_pkgs and \
-                    f.pathname not in pkg.nonhollow_dirs:
-                        action.attrs["opensolaris.zone"] = "global"
-                        action.attrs["variant.opensolaris.zone"] = "global"
-
-                # handle attribute deletion
-                if hasattr(f, "deleted_attrs"):
-                        for d in f.deleted_attrs:
-                                if d in action.attrs:
-                                        del action.attrs[d]
-
-                t.add(action)
-
-        # Group the files in a (new) package based on what (old) package they
-        # came from, so that we can iterate through all files in a single (old)
-        # package (and, therefore, in a single bzip2 archive) before moving on
-        # to the next.  Because groupby() needs its input pre-sorted by group
-        # and we want to maintain the order that the files come out of the cpio
-        # archives, we coalesce the groups with the groups dictionary.
-        def fn(key):
-                return usedlist[key.pathname][0]
-        groups = {}
-        for k, g in groupby((f for f in pkg.files if f.type in "fevi"), fn):
-                if k in groups:
-                        groups[k].extend(g)
-                else:
-                        groups[k] = list(g)
-
-        def otherattrs(action):
-                s = " ".join(
-                    "%s=%s" % (a, action.attrs[a])
-                    for a in action.attrs
-                    if a not in ("owner", "group", "mode", "path")
-                )
-                if s:
-                        return " " + s
-                else:
-                        return ""
-
-        # Maps class names to preserve attribute values.
-        preserve_dict = {
-            "renameold": "renameold",
-            "renamenew": "renamenew",
-            "preserve": "true",
-            "svmpreserve": "true"
-        }
-
-        undeps = set()
-        for g in groups.values():
-                pkgname = usedlist[g[0].pathname][0]
-                print "pulling files from archive in package", pkgname
-                bundle = SolarisPackageDirBundle(svr4pkgpaths[pkgname])
-                pathdict = dict((f.pathname, f) for f in g)
-                for f in bundle:
-                        if f.name == "license":
-                                if f.attrs["license"] in pkg.dropped_licenses:
-                                        continue
-                                # add transaction id so that every version
-                                # of a pkg will have a unique license to prevent
-                                # license from disappearing on upgrade
-                                f.attrs["transaction_id"] = transaction_id
-                                # The "path" attribute is confusing and
-                                # unnecessary for licenses.
-                                del f.attrs["path"]
-                                print "    %s add license %s" % \
-                                    (pkg.name, f.attrs["license"])
-                                t.add(f)
-                        elif f.attrs["path"] in pathdict:
-                                if pkgname in hollow_pkgs:
-                                        f.attrs["opensolaris.zone"] = "global"
-                                        f.attrs["variant.opensolaris.zone"] = \
-                                            "global"
-                                path = f.attrs["path"]
-                                if pathdict[path].type in "ev":
-                                        f.attrs["preserve"] = "true"
-                                f.attrs["owner"] = pathdict[path].owner
-                                f.attrs["group"] = pathdict[path].group
-                                f.attrs["mode"] = pathdict[path].mode
-
-                                # is this a file for which we need a timestamp?
-                                basename = os.path.basename(path)
-                                for file_pattern in timestamp_files:
-                                        if fnmatch.fnmatch(basename,
-                                            file_pattern):
-                                                break
-                                else:
-                                        del f.attrs["timestamp"]
-                                if pathdict[path].klass in preserve_dict:
-                                        f.attrs["preserve"] = \
-                                            preserve_dict[pathdict[path].klass]
-                                if hasattr(pathdict[path], "changed_attrs"):
-                                        f.attrs.update(
-                                            pathdict[path].changed_attrs)
-                                        # chattr may have produced two values
-                                        f.attrs["path"] = f.attrlist("path")[-1]
-
-                                print "    %s add file %s %s %s %s%s" % \
-                                    (pkg.name, f.attrs["mode"],
-                                        f.attrs["owner"], f.attrs["group"],
-                                        f.attrs["path"], otherattrs(f))
-
-                                # handle attribute deletion
-                                if hasattr(pathdict[path], "deleted_attrs"):
-                                        for d in pathdict[path].deleted_attrs:
-                                                if d in f.attrs:
-                                                        print "removed %s from %s in pkg %s" % (d, path, new_pkg_name)
-                                                        del f.attrs[d]
-
-                                # Read the file in chunks to avoid a memory
-                                # footprint blowout.
-                                fo = f.data()
-                                bufsz = 256 * 1024
-                                sz = int(f.attrs["pkg.size"])
-                                fd, tmp = mkstemp(prefix="pkg.")
-                                while sz > 0:
-                                        d = fo.read(min(bufsz, sz))
-                                        os.write(fd, d)
-                                        sz -= len(d)
-                                d = None
-                                os.close(fd)
-
-                                # Fool the action into pulling from a
-                                # temporary file so that both add() and
-                                # process_dependencies() can read() the
-                                # data.
-                                f.data = lambda: open(tmp, "rb")
-                                t.add(f)
-
-                                # Look for dependencies
-                                deps, u = process_dependencies(tmp, path)
-                                pkg.depend += deps
-                                if u:
-                                        print \
-                                            "%s has missing dependencies: %s" \
-                                            % (path, u)
-                                undeps |= set(u)
-                                os.unlink(tmp)
-
-        # process any dependencies on files
-        for f in pkg.file_depend:
-                f = f.lstrip("/") # remove any leading /                
-                if f in usedlist:
-                        pkg.depend += [ "%s@%s" %
-                            (usedlist[f][1].name,
-                             usedlist[f][1].version) 
-                        ]
-                else:
-                        print "Warning: pkg %s: depend_path %s not satisfied" \
-                            % (pkg.name, f)
-                        undeps.add(f)
-        # Publish dependencies
-
-        missing_cnt = 0
-
-        for p in set(pkg.idepend): # over set of svr4 deps, append ipkgs
-                if p in destpkgs:
-                        pkg.depend.extend(destpkgs[p])
-                else:
-                        print "pkg %s: SVR4 package %s not seen" % \
-                            (pkg.name, p)
-                        missing_cnt += 1
-        if missing_cnt > 0:
-                raise RuntimeError("missing packages!")
-
-        for p in set(pkg.depend) - set(pkg.undepend):
-                # Don't make a package depend on itself.
-                if p.split("@")[0] == pkg.name:
-                        continue
-                # enhance unqualified dependencies to include current
-                # pkg version
-                if "@" not in p and p in pkgdict:
-                        p = "%s@%s" % (p, pkgdict[p].version)
-
-                print "    %s add depend require %s" % (pkg.name, p)
-                action = actions.depend.DependencyAction(None,
-                    type = "require", fmri = p)
-                t.add(action)
-
-        for a in pkg.extra:
-                print "    %s add %s" % (pkg.name, a)
-                action = actions.fromstr(a)
-                if hasattr(action, "hash"):
-                        fname, fd = sourcehook(action.hash)
-                        fd.close()
-                        action.data = lambda: file(fname, "rb")
-                        action.attrs["pkg.size"] = str(os.stat(fname).st_size)
-                        if action.name == "license":
-                                action.attrs["transaction_id"] = transaction_id
-                        elif "path" in action.attrs:
-                                path = action.attrs["path"]
-                                deps, u = process_dependencies(fname, path)
-                                pkg.depend += deps
-                                if u:
-                                        print "%s has missing dependencies: " \
-                                            "%s" % (path, u)
-                                        undeps |= set(u)
-                #
-                # fmris may not be completely specified; enhance them to current
-                # version if this is the case
-                #
-                for attr in action.attrs:
-                        if attr == "fmri" and \
-                            "@" not in action.attrs[attr] and \
-                            action.attrs[attr][5:] in pkgdict:
-                                action.attrs[attr] += "@%s" % \
-                                    pkgdict[action.attrs[attr][5:]].version
-                t.add(action)
-
-        if pkg.desc:
-                print "    %s add set pkg.description=%s" % (pkg.name, pkg.desc)
-                attrs = dict(name="pkg.description", value=pkg.desc)
-                action = actions.attribute.AttributeAction(None, **attrs)
-                t.add(action)
-
-        if pkg.summary:
-                print "    %s add set pkg.summary=%s" % (pkg.name, pkg.summary)
-                attrs = dict(name="pkg.summary", value=pkg.summary)
-                action = actions.attribute.AttributeAction(None, **attrs)
-                t.add(action)
-
-                # Retain the description entry in the package manifest for 
-                # now for backward compatibility.
-                print "    %s add set description=%s" % (pkg.name, pkg.summary)
-                action = actions.attribute.AttributeAction(None,
-                    description = pkg.summary)
-                t.add(action)
-
-        if pkg.classification:
-                print "    %s add set info.classification=%s" % \
-                    (pkg.name, pkg.classification)
-                attrs = dict(name="info.classification",
-                             value=pkg.classification)
-                action = actions.attribute.AttributeAction(None, **attrs)
-                t.add(action)
-
-        if pkg.name != "SUNWipkg":
-                for p in pkg.srcpkgs:
-                        try:
-                                sp = svr4pkgsseen[p]
-                        except KeyError:
-                                continue
-
-                        wanted_attrs = (
-                                "PKG", "NAME", "ARCH", "VERSION", "CATEGORY",
-                                "VENDOR", "DESC", "HOTLINE"
-                                )
-                        attrs = dict(
-                                (k.lower(), v)
-                                for k, v in sp.pkginfo.iteritems()
-                                if k in wanted_attrs
-                                )
-                        attrs["pkg"] = sp.pkginfo["PKG.PLAT"]
-
-                        action = actions.legacy.LegacyAction(None, **attrs)
-
-                        print "    %s add %s" % (pkg.name, action)
-                        t.add(action)
-
-        if undeps:
-                print "Missing dependencies:", list(undeps)
-
-        print "    close"
-        pkg_fmri, pkg_state = t.close(refresh_index=not defer_refresh)
-        print "%s: %s\n" % (pkg_fmri, pkg_state)
-
-def process_link_dependencies(path, target):
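-        """Resolve a hardlink target against usedlist; return a list with
-        the fmri of the package delivering the target, or an empty list
-        if the target is unknown."""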
-        orig_target = target
-        if target[0] != "/":
-                target = os.path.normpath(
-                    os.path.join(os.path.split(path)[0], target))
-
-        if target in usedlist:
-                if show_debug:
-                        print "hardlink %s -> %s makes %s depend on %s" % \
-                            (
-                                path, orig_target,
-                                usedlist[path][1].name,
-                                usedlist[target][1].name
-                                )
-                return ["%s@%s" % (usedlist[target][1].name,
-                    usedlist[target][1].version)]
-        else:
-                return []
-
-def process_dependencies(fname, path):
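-        """Determine the dependencies of the file at 'fname' (installed
-        at 'path'): for ELF objects, resolve each dynamic dependency
-        against the runpath; otherwise fall back to '#!' interpreter
-        detection.  Returns a (dep_pkgs, undeps) tuple."""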
-        if not elf.is_elf_object(fname):
-                return process_non_elf_dependencies(fname, path)
-
-        ei = elf.get_info(fname)
-        try:
-                ed = elf.get_dynamic(fname)
-        except elf.ElfError:
-                deps = []
-                rp = []
-        else:
-                deps = [
-                    d[0]
-                    for d in ed.get("deps", [])
-                ]
-                rp = ed.get("runpath", "").split(":")
-                if len(rp) == 1 and rp[0] == "":
-                        rp = []
-
-        rp = [
-            os.path.normpath(p.replace("$ORIGIN", "/" + os.path.dirname(path)))
-            for p in rp
-        ]
-
-        kernel64 = None
-
-        # For kernel modules, default path resolution is /platform/<platform>,
-        # /kernel, /usr/kernel.  But how do we know what <platform> would be for
-        # a given module?  Does it do fallbacks to, say, sun4u?
-        if path.startswith("kernel") or path.startswith("usr/kernel") or \
-            (path.startswith("platform") and path.split("/")[2] == "kernel"):
-                if rp:
-                        print "RUNPATH set for kernel module (%s): %s" % \
-                            (path, rp)
-                # Default kernel search path
-                rp.extend(("/kernel", "/usr/kernel"))
-                # What subdirectory should we look in for 64-bit kernel modules?
-                if ei["bits"] == 64:
-                        if ei["arch"] == "i386":
-                                kernel64 = "amd64"
-                        elif ei["arch"] == "sparc":
-                                kernel64 = "sparcv9"
-                        else:
-                                print ei["arch"]
-        else:
-                if "/lib" not in rp:
-                        rp.append("/lib")
-                if "/usr/lib" not in rp:
-                        rp.append("/usr/lib")
-
-        # XXX Do we need to handle anything other than $ORIGIN?  x86 images have
-        # a couple of $PLATFORM and $ISALIST instances.
-        for p in rp:
-                if "$" in p:
-                        tok = p[p.find("$"):]
-                        if "/" in tok:
-                                tok = tok[:tok.find("/")]
-                        print "%s has dynamic token %s in rpath" % (path, tok)
-
-        dep_pkgs = []
-        undeps = []
-        depend_list = []
-        for d in deps:
-                for p in rp:
-                        # The instances of "[1:]" below are because usedlist
-                        # stores paths without leading slash
-                        if kernel64:
-                                # Find 64-bit modules the way krtld does.
-                                # XXX We don't resolve dependencies found in
-                                # /platform, since we don't know where under
-                                # /platform to look.
-                                head, tail = os.path.split(d)
-                                deppath = os.path.join(p,
-                                                       head,
-                                                       kernel64,
-                                                       tail)[1:]
-                        else:
-                                # This is a hack for when a runpath uses the 64
-                                # symlink to the actual 64-bit directory.
-                                # Better would be to see if the runpath was a
-                                # link, and if so, use its resolution, but
-                                # extracting that information from used list is
-                                # a pain, especially because you potentially
-                                # have to resolve symlinks at all levels of the
-                                # path.
-                                if p.endswith("/64"):
-                                        if ei["arch"] == "i386":
-                                                p = p[:-2] + "amd64"
-                                        elif ei["arch"] == "sparc":
-                                                p = p[:-2] + "sparcv9"
-                                deppath = os.path.join(p, d)[1:]
-                        if deppath in usedlist:
-                                dep_pkgs += [ "%s@%s" %
-                                    (usedlist[deppath][1].name,
-                                    usedlist[deppath][1].version) ]
-                                depend_list.append(
-                                        (
-                                                deppath,
-                                                usedlist[deppath][1].name
-                                                )
-                                        )
-                                break
-                else:
-                        undeps += [ d ]
-
-        if show_debug:
-                print "%s makes %s depend on %s" % \
-                    (path, usedlist[path][1].name, depend_list)
-
-        return dep_pkgs, undeps
-
-def process_non_elf_dependencies(localpath, path):
-        # localpath is path to actual file
-        # path is path in installed image
-        # take 1
-        dep_pkgs = []
-        undeps = []
-
-        f = file(localpath)
-        l = f.readline()
-        f.close()
-
-        # add #!/ dependency
-        if l.startswith("#!/"):
-                # usedlist omits leading /
-                p = l[2:].split()[0]  # first token is the interpreter path; options dropped
-                # we don't handle dependencies through links, so fix up the common one
-                if p.startswith("/bin"):
-                        p = "/usr" + p
-                if p[1:] in usedlist:
-                        dep_pkgs += [ "%s@%s" % (
-                            usedlist[p[1:]][1].name,
-                            usedlist[p[1:]][1].version) 
-                        ]    
-                        print "Added dependency on %s because of %s" % (usedlist[p[1:]][1].name, p)
-                else:
-                        undeps = [ p ]
-                        
-        return dep_pkgs, undeps
-
-def zap_strings(instr, strings):
-        """takes an input string and a list of strings to be removed, ignoring
-        case"""
-        for s in strings:
-                ls = s.lower()
-                while True:
-                        li = instr.lower()
-                        i = li.find(ls)
-                        if i < 0:
-                                break
-                        instr = instr[0:i] + instr[i + len(ls):]
-        return instr 
-
-def get_branch(name):
-        return branch_dict.get(name, def_branch)
-
-def_vers = "0.5.11"
-def_branch = ""
-def_wos_path = ["/net/netinstall.eng/export/nv/x/latest/Solaris_11/Product"]
-create_repo = False
-nopublish = False
-show_debug = False
-print_pkg_names = False
-def_repo = "http://localhost:10000"
-wos_path = []
-include_path = []
-branch_dict = {}
-timestamp_files = []
-
-#
-# files (by path) we always delete for bulk imports
-# note that we ignore these if specifically included.
-#
-elided_files = {}
-#
-# if user uses -j, just_these_pkgs becomes list of pkgs to process
-# allowing other arguments to be read in as files...
-#
-just_these_pkgs = []
-#
-# strings to rip out of summaries (case insensitive)
-#
-summary_detritus = [", (usr)", ", (root)", " (usr)", " (root)",
-" (/usr)", " - / filesystem", ",root(/)"]
-#
-# list of global includes to add to every package
-#
-global_includes = []
-# list of macro substitutions
-macro_definitions = {}
-
-def set_macro(key, value):
-        macro_definitions.update([("$(%s)" % key, value)])
-
-def clear_macro(key):
-        del macro_definitions["$(%s)" % key]
-
-try:
-        _opts, _args = getopt.getopt(sys.argv[1:], "B:D:I:G:NT:b:dj:m:ns:v:w:p:")
-except getopt.GetoptError, _e:
-        print "unknown option", _e.opt
-        sys.exit(1)
-
-g_proto_area = os.environ.get("ROOT", "")
-
-for opt, arg in _opts:
-        if opt == "-b":
-                def_branch = arg.rstrip("abcdefghijklmnopqrstuvwxyz")
-        elif opt == "-d":
-                show_debug = True
-        elif opt == "-j": # means we're using the new argument form...
-                just_these_pkgs.append(arg)
-        elif opt == "-m":
-                _a = arg.split("=", 1)
-                set_macro(_a[0], _a[1])
-        elif opt == "-n":
-                nopublish = True
-        elif opt == "-p":
-                if not os.path.exists(arg):
-                        raise RuntimeError("Invalid prototype area specified.")
-                # Clean up relative ../../, etc. out of path to proto
-                g_proto_area = os.path.realpath(arg)
-        elif  opt == "-s":
-                def_repo = arg
-                if def_repo.startswith("file://"):
-                        # When publishing to file:// repositories, automatically
-                        # create the target repository if needed.
-                        create_repo = True
-        elif opt == "-v":
-                def_vers = arg
-        elif opt == "-w":
-                wos_path.append(arg)
-        elif opt == "-D":
-                elided_files[arg] = True
-        elif opt == "-I":
-                include_path.extend(arg.split(":"))
-        elif opt == "-B":
-                branch_file = file(arg)
-                for _line in branch_file:
-                        if not _line.startswith("#"):
-                                bfargs = _line.split()
-                                if len(bfargs) == 2:
-                                        branch_dict[bfargs[0]] = bfargs[1]
-                branch_file.close()
-        elif opt == "-G": #another file of global includes
-                global_includes.append(arg)
-        elif opt == "-N":
-                print_pkg_names = True
-        elif opt == "-T":
-                timestamp_files.append(arg)
-
-if not def_branch:
-        print "need a branch id (build number)"
-        sys.exit(1)
-elif "." not in def_branch:
-        print "branch id needs to be of the form 'x.y'"
-        sys.exit(1)
-
-if not _args:
-        print "need argument!"
-        sys.exit(1)
-
-if not wos_path:
-        wos_path = def_wos_path
-
-if just_these_pkgs:
-        filelist = _args
-else:
-        filelist = _args[0:1]
-        just_these_pkgs = _args[1:]
-
-
-in_multiline_import = False
-
-# This maps what files we've seen to a tuple of what packages they came from and
-# what packages they went into, so we can prevent more than one package from
-# grabbing the same file.
-usedlist = {}
-
-#
-# pkgdict contains ipkgs by name
-#
-pkgdict = {}
-
-#
-# destpkgs contains the list of ipkgs generated from each svr4 pkg
-# this is needed to generate metaclusters
-#
-destpkgs = {}
-
-#
-#svr4 pkgs seen - pkgs indexed by name
-#
-svr4pkgsseen = {}
-
-#
-#paths where we found the packages we need
-#
-svr4pkgpaths = {}
-
-#
-# editable files and where they're found
-#
-editable_files = {}
-
-#
-# hollow svr4 packages processed
-#
-hollow_pkgs = {}
-
-
-reuse_err = \
-    "Conflict in path %s: IPS %s SVR4 %s from %s with IPS %s SVR4 %s from %s"
-
-
-# First pass: don't actually publish anything, because we're not collecting
-# dependencies here.
-def read_full_line(lexer, continuation='\\'):
-        """Read a complete line, allowing for the possibility of it being
-        continued over multiple lines.  Returns a single joined line, with
-        continuation characters and leading and trailing spaces removed.
-        """
-
-        lines = []
-        while True:
-                line = lexer.instream.readline().strip()
-                lexer.lineno = lexer.lineno + 1
-                if line and line[-1] in continuation:
-                        lines.append(line[:-1])
-                else:
-                        lines.append(line)
-                        break
-
-        return apply_macros(' '.join(lines))
-
-def apply_macros(s):
-        """Apply macro subs defined on command line... keep applying
-        macros until no translations are found.  If macro translates
-        to a comment, replace entire token text."""
-        while s and "$(" in s:
-                for key in macro_definitions.keys():
-                        if key in s:
-                                value = macro_definitions[key]
-                                if value == "#": # comment character
-                                        s = "#"  # affects whole token
-                                        break
-                                s = s.replace(key, value)
-                                break # look for more substitutions
-                else:
-                        break # no more substitutable tokens
-        return s
-
-def sourcehook(filename):
-        """ implement include hierarchy """
-        for i in include_path:
-                f = os.path.join(i, filename)
-                if os.path.exists(f):
-                        return (f, open(f))
-
-        return filename, open(filename)
-
-class tokenlexer(shlex.shlex):
-        def read_token(self):
-                """ simple replacement of $(ARCH) with a non-special
-                value defined on the command line is trivial.  Since
-                shlex's read_token routine also strips comments and
-                white space, this read_token cannot return either 
-                one so any macros that translate to either spaces or
-                # (comment) need to be removed from the token stream."""
-
-                while True:
-                        s = apply_macros(shlex.shlex.read_token(self))
-                        if s == "#": # discard line if comment; try again
-                                self.instream.readline()
-                                self.lineno = self.lineno + 1
-                        # bail on EOF or not space; loop on space
-                        elif s is None or (s != "" and not s.isspace()):
-                                break
-                return s
-
-curpkg = None
-def SolarisParse(mf):
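-        """Parse the package description file 'mf', dispatching on the
-        keyword that begins each directive ("package", "import",
-        "chattr", and so on)."""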
-        global curpkg
-        global in_multiline_import
-
-        lexer = tokenlexer(file(mf), mf, True)
-        lexer.whitespace_split = True
-        lexer.source = "include"
-        lexer.sourcehook = sourcehook
-
-        while True:
-                token = lexer.get_token()
-
-                if not token:
-                        break
-
-                if token == "package":
-                        curpkg = start_package(lexer.get_token())
-
-                        if print_pkg_names:
-                                print "-j %s" % curpkg.name
-
-                elif token == "end":
-                        endarg = lexer.get_token()
-                        if endarg == "package":
-                                if print_pkg_names:
-                                        curpkg = None
-                                        continue
-
-                                for filename in global_includes:
-                                        for i in include_path:
-                                                f = os.path.join(i, filename)
-                                                if os.path.exists(f):
-                                                        SolarisParse(f)
-                                                        break
-                                        else:
-                                                raise RuntimeError("File not "
-                                                    "found: %s" % filename)
-                                try:
-                                        end_package(curpkg)
-                                except Exception, e:
-                                        print "ERROR(end_pkg):", e
-
-                                curpkg = None
-                        if endarg == "import":
-                                in_multiline_import = False
-                                curpkg.imppkg = None
-
-                elif token == "version":
-                        curpkg.version = lexer.get_token()
-
-                elif token == "import":
-                        package_name = lexer.get_token()
-                        next = lexer.get_token()
-                        if next != "exclude":
-                                line = ""
-                                lexer.push_token(next)
-                        else:
-                                line = read_full_line(lexer)
-
-                        if not print_pkg_names:
-                                curpkg.import_pkg(package_name, line)
-
-                elif token == "from":
-                        pkgspec = lexer.get_token()
-                        if not print_pkg_names:
-                                p = SolarisPackage(pkg_path(pkgspec))
-                                curpkg.imppkg = p
-                                spkgname = p.pkginfo["PKG.PLAT"]
-                                svr4pkgpaths[spkgname] = pkg_path(pkgspec)
-                                svr4pkgsseen[spkgname] = p
-                                curpkg.add_svr4_src(spkgname)
-
-                        junk = lexer.get_token()
-                        assert junk == "import"
-                        in_multiline_import = True
-
-                elif token == "classification":
-                        cat_subcat = lexer.get_token()
-                        curpkg.classification.append(
-                            "org.opensolaris.category.2008:%s" % cat_subcat)
-
-                elif token == "description":
-                        curpkg.desc = lexer.get_token()
-
-                elif token == "summary":
-                        curpkg.summary = lexer.get_token()
-
-                elif token == "depend":
-                        curpkg.depend.append(lexer.get_token())
-
-                elif token == "depend_path":
-                        curpkg.file_depend.append(lexer.get_token())
-
-                elif token == "cluster":
-                        curpkg.add_svr4_src(lexer.get_token())
-
-                elif token == "idepend":
-                        curpkg.idepend.append(lexer.get_token())
-
-                elif token == "undepend":
-                        curpkg.undepend.append(lexer.get_token())
-
-                elif token == "add":
-                        curpkg.extra.append(read_full_line(lexer))
-
-                elif token == "drop":
-                        f = lexer.get_token()
-                        if print_pkg_names:
-                                continue
-                        l = [o for o in curpkg.files if o.pathname == f]
-                        if not l:
-                                print "Cannot drop '%s' from '%s': not " \
-                                    "found" % (f, curpkg.name)
-                        else:
-                                del curpkg.files[curpkg.files.index(l[0])]
-                                # XXX The problem here is that if we do this on
-                                # a shared file (directory, etc), then it's
-                                # missing from usedlist entirely, since we don't
-                                # keep around *all* packages delivering a shared
-                                # file, just the last seen.  This probably
-                                # doesn't matter much.
-                                del usedlist[f]
-
-                elif token == "drop_license":
-                        curpkg.dropped_licenses.append(lexer.get_token())
-
-                elif token == "chattr":
-                        fname = lexer.get_token()
-                        line = read_full_line(lexer)
-                        if print_pkg_names:
-                                continue
-                        try:
-                                curpkg.chattr(fname, line)
-                        except Exception, e:
-                                print "Can't change attributes on " + \
-                                    "'%s': not in the package" % fname, e
-                                raise
-
-                elif token == "chattr_glob":
-                        glob = lexer.get_token()
-                        line = read_full_line(lexer)
-                        if print_pkg_names:
-                                continue
-                        try:
-                                curpkg.chattr_glob(glob, line)
-                        except Exception, e:
-                                print "Can't change attributes on " + \
-                                    "'%s': no matches in the package" % \
-                                    glob, e
-                                raise
-
-                elif in_multiline_import:
-                        next = lexer.get_token()
-                        if next == "with":
-                                # I can't imagine this is supported, but there's
-                                # no other way to read the rest of the line
-                                # without a whole lot more pain.
-                                line = read_full_line(lexer)
-                        else:
-                                lexer.push_token(next)
-                                line = ""
-
-                        try:
-                                curpkg.import_file(token, line)
-                        except Exception, e:
-                                print "ERROR(import_file):", e
-                                raise
-                else:
-                        raise RuntimeError("Error: unknown token '%s' "
-                            "(%s:%s)" % (token, lexer.infile, lexer.lineno))
-
-if print_pkg_names:
-        for _mf in filelist:
-                SolarisParse(_mf)
-        sys.exit(0)
-
-
-print "First pass:", datetime.now()
-
-for _mf in filelist:
-        SolarisParse(_mf)
-
-seenpkgs = set(i[0] for i in usedlist.values())
-
-print "Files you seem to have forgotten:\n  " + "\n  ".join(
-    "%s %s" % (f.type, f.pathname)
-    for pkg in seenpkgs
-    for f in svr4pkgsseen[pkg].manifest
-    if f.type != "i" and f.pathname not in usedlist)
-
-print "\n\nDuplicate Editables files list:\n"
-
-if editable_files:
-        length = 2 + max(len(p) for p in editable_files)
-        for paths in editable_files:
-                if len(editable_files[paths]) > 1:
-                        print ("%s:" % paths).ljust(length - 1) + \
-                            ("\n".ljust(length)).join("%s (from %s)" % \
-                            (l[1].name, l[0]) for l in editable_files[paths])
-
-
-# Second pass: iterate over the existing package objects, gathering dependencies
-# and publish!
-
-print "Second pass:", datetime.now()
-
-print "New packages:\n"
-# XXX Sort these.  Preferably topologically, if possible, alphabetically
-# otherwise (for a rough progress gauge).
-if just_these_pkgs:
-        newpkgs = set(pkgdict[name]
-                      for name in pkgdict.keys()
-                      if name in just_these_pkgs
-                      )
-else:
-        newpkgs = set(pkgdict.values())
-
-# Indicates whether search indices refresh will be deferred until the end.
-defer_refresh = False
-# Indicates whether local publishing is active.
-local_publish = False
-if def_repo.startswith("file:"):
-        # If publishing to disk, the search indices should be refreshed at
-        # the end of the publishing process and the feed cache will have to be
-        # generated by starting the depot server using the provided path and
-        # then accessing it.
-        defer_refresh = True
-        local_publish = True
-
-processed = 0
-total = len(newpkgs)
-for _p in sorted(newpkgs):
-        print "Package '%s'" % _p.name
-        print "  Version:", _p.version
-        print "  Description:", _p.desc
-        print "  Summary:", _p.summary
-        print "  Classification:", ",".join(_p.classification)
-        try:
-                publish_pkg(_p)
-        except trans.TransactionError, _e:
-                print "%s: FAILED: %s\n" % (_p.name, _e)
-        processed += 1
-        print "%d/%d packages processed; %.2f%% complete" % (processed, total,
-            processed * 100.0 / total)
-
-if not nopublish and defer_refresh:
-        # This has to be done at the end for some publishing modes.
-        print "Updating search indices..."
-        _t = trans.Transaction(def_repo)
-        _t.refresh_index()
-
-# Ensure that the feed is updated and cached to reflect changes.
-if not nopublish:
-        print "Caching RSS/Atom feed..."
-        dc = None
-        durl = def_repo
-        if local_publish:
-                # The depot server isn't already running, so will have to be
-                # temporarily started to allow proper feed cache generation.
-                dc = depotcontroller.DepotController()
-                dc.set_depotd_path(g_proto_area + "/usr/lib/pkg.depotd")
-                dc.set_depotd_content_root(g_proto_area + "/usr/share/lib/pkg")
-
-                _scheme, _netloc, _path, _params, _query, _fragment = \
-                    urlparse.urlparse(def_repo, "file", allow_fragments=0)
-
-                dc.set_repodir(_path)
-
-                # XXX There must be a better way...
-                dc.set_port(29083)
-
-                # Start the depot
-                dc.start()
-
-                durl = "http://localhost:29083"
-
-        _f = urllib.urlopen("%s/feed" % durl)
-        _f.close()
-
-        if dc:
-                dc.stop()
-                dc = None
-
-print "Done:", datetime.now()
--- a/src/util/publish/merge.py	Tue Jul 13 18:17:07 2010 -0700
+++ b/src/util/publish/merge.py	Thu Jul 15 15:40:39 2010 -0700
@@ -35,15 +35,21 @@
 import warnings
 
 import pkg.fmri
-import pkg.pkgtarfile as ptf
+import pkg.client.api_errors as apx
+import pkg.client.publisher as publisher
+import pkg.client.transport as transport
 import pkg.actions as actions
 import pkg.manifest as manifest
-import pkg.server.catalog as catalog
 import pkg.version as version
 
-from pkg.misc import versioned_urlopen, gunzip_from_stream, msg, PipeError
+from pkg.misc import PipeError
 from pkg.client import global_settings
 
+pub = None
+tmpdirs = []
+xport = None
+xport_cfg = None
+
 def pname():
         return os.path.basename(sys.argv[0])
 
@@ -68,133 +74,60 @@
 
         print >> sys.stderr, pname() + ": " + error
 
-def fetch_files_byhash(server_url, hashes, pkgdir):
+def fetch_files_byaction(repouri, actions, pkgdir):
         """Given a list of files named by content hash, download from
-        server_url into pkgdir."""
-
-        req_dict = { }
-
-        for i, k in enumerate(hashes):
-                str = "File-Name-%s" % i
-                req_dict[str] = k
+        """Given a list of actions, download the files they name
+        from repouri into pkgdir."""
 
-        req_str = urllib.urlencode(req_dict)
-
-        try:
-                f, v = versioned_urlopen(server_url, "filelist", [0],
-                    data = req_str)
-        except:
-                error(_("Unable to download files from: %s") % server_url)
-                sys.exit(1)
-
-        tar_stream = ptf.PkgTarFile.open(mode = "r|", fileobj = f)
+        mfile = xport.multi_file_ni(repouri, pkgdir, decompress=True)
 
-        if not os.path.exists(pkgdir):
-                try:
-                        os.makedirs(pkgdir)
-                except:
-                        error(_("Unable to create directory: %s") % pkgdir)
-                        sys.exit(1)
+        for a in actions:
+                mfile.add_action(a)
 
-        for info in tar_stream:
-                gzfobj = None
-                try:
-                        # Uncompress as we retrieve the files
-                        gzfobj = tar_stream.extractfile(info)
-                        fpath = os.path.join(pkgdir, info.name)
-                        outfile = open(fpath, "wb")
-                        gunzip_from_stream(gzfobj, outfile)
-                        outfile.close()
-                        gzfobj.close()
-                except:
-                        error(_("Unable to extract file: %s") % info.name)
-                        sys.exit(1)
+        mfile.wait_files()
 
-        tar_stream.close()
-        f.close()
-
-manifest_cache={}
+manifest_cache = {}
 null_manifest = manifest.Manifest()
 
-def get_manifest(server_url, fmri):
+def get_manifest(repouri, fmri):
         if not fmri: # no matching fmri
                 return null_manifest
 
-        key = "%s->%s" % (server_url, fmri)
+        key = "%s->%s" % (repouri.uri, fmri)
         if key not in manifest_cache:
-                manifest_cache[key] = fetch_manifest(server_url, fmri)
+                manifest_cache[key] = fetch_manifest(repouri, fmri)
         return manifest_cache[key]
 
-def fetch_manifest(server_url, fmri):
+def fetch_manifest(repouri, fmri):
         """Fetch the manifest for package-fmri 'fmri' from the server
         in 'server_url'... return as Manifest object."""
-        # Request manifest from server
 
-        try:
-                m, v = versioned_urlopen(server_url, "manifest", [0],
-                    fmri.get_url_path())
-        except:
-                error(_("Unable to download manifest %s from %s") %
-                    (fmri.get_url_path(), server_url))
-                sys.exit(1)
-
-        # Read from server, write to file
-        try:
-                mfst_str = m.read()
-        except:
-                error(_("Error occurred while reading from: %s") % server_url)
-                sys.exit(1)
-
+        mfst_str = xport.get_manifest(fmri, pub=repouri, content_only=True)
         m = manifest.Manifest()
         m.set_content(mfst_str)
 
         return m
 
-catalog_cache = {}
-
-def get_catalog(server_url):
-        if server_url not in catalog_cache:
-                catalog_cache[server_url] = fetch_catalog(server_url)
-        return catalog_cache[server_url][0]
-
-def cleanup_catalogs():
-        global catalog_cache
-        for c, d in catalog_cache.values():
-                shutil.rmtree(d)
-        catalog_cache = {}
-
-def fetch_catalog(server_url):
+def fetch_catalog(repouri):
         """Fetch the catalog from the server_url."""
 
-        # open connection for catalog
-        try:
-                c, v = versioned_urlopen(server_url, "catalog", [0])
-        except:
-                error(_("Unable to download catalog from: %s") % server_url)
-                sys.exit(1)
-
-        # make a tempdir for catalog
-        dl_dir = tempfile.mkdtemp()
+        if not pub.meta_root:
+                # Create a temporary directory for catalog.
+                cat_dir = tempfile.mkdtemp()
+                tmpdirs.append(cat_dir)
+                pub.meta_root = cat_dir
 
-        # call catalog.recv to pull down catalog
-        try:
-                catalog.ServerCatalog.recv(c, dl_dir)
-        except:
-                error(_("Error while reading from: %s") % server_url)
-                sys.exit(1)
+        pub.transport = xport
+        # Pull the catalog only from this repository.
+        pub.selected_repository.origins = [repouri]
+        pub.refresh(True, True)
 
-        # close connection to server
-        c.close()
+        cat = pub.catalog
 
-        # instantiate catalog object
-        cat = catalog.ServerCatalog(dl_dir, read_only=True)
-
-        # return (catalog, tmpdir path)
-        return cat, dl_dir
+        return cat
 
 catalog_dict = {}
-def load_catalog(server_url):
-        c = get_catalog(server_url)
+def load_catalog(repouri):
+        c = fetch_catalog(repouri)
         d = {}
         for f in c.fmris():
                 if f.pkg_name in d:
@@ -203,56 +136,63 @@
                         d[f.pkg_name] = [f]
                 for k in d.keys():
                         d[k].sort(reverse = True)
-        catalog_dict[server_url] = d
+        catalog_dict[repouri.uri] = d
 
-def expand_fmri(server_url, fmri_string, constraint=version.CONSTRAINT_AUTO):
+def expand_fmri(repouri, fmri_string, constraint=version.CONSTRAINT_AUTO):
         """ from specified server, find matching fmri using CONSTRAINT_AUTO
         cache for performance.  Returns None if no matching fmri is found """
-        if server_url not in catalog_dict:
-                load_catalog(server_url)
+        if repouri.uri not in catalog_dict:
+                load_catalog(repouri)
 
         fmri = pkg.fmri.PkgFmri(fmri_string, "5.11")
 
-        for f in catalog_dict[server_url].get(fmri.pkg_name, []):
+        for f in catalog_dict[repouri.uri].get(fmri.pkg_name, []):
                 if not fmri.version or f.version.is_successor(fmri.version, constraint):
                         return f
         return None
 
-def get_all_pkg_names(server_url):
+def get_all_pkg_names(repouri):
         """ return all the pkg_names in this catalog """
-        if server_url not in catalog_dict:
-                load_catalog(server_url)
-        return catalog_dict[server_url].keys()
+        if repouri.uri not in catalog_dict:
+                load_catalog(repouri)
+        return catalog_dict[repouri.uri].keys()
 
-def get_dependencies(server_url, fmri_list):
+def get_dependencies(repouri, fmri_list):
         s = set()
         for f in fmri_list:
-                fmri = expand_fmri(server_url, f)
-                _get_dependencies(s, server_url, fmri)
+                fmri = expand_fmri(repouri, f)
+                _get_dependencies(s, repouri, fmri)
         return s
 
-def _get_dependencies(s, server_url, fmri):
+def _get_dependencies(s, repouri, fmri):
         """ recursive incorp expansion"""
         s.add(fmri)
-        for a in get_manifest(server_url, fmri).gen_actions_by_type("depend"):
+        for a in get_manifest(repouri, fmri).gen_actions_by_type("depend"):
                 if a.attrs["type"] == "incorporate":
-                        new_fmri = expand_fmri(server_url, a.attrs["fmri"])
+                        new_fmri = expand_fmri(repouri, a.attrs["fmri"])
                         if new_fmri and new_fmri not in s:
-                                _get_dependencies(s, server_url, new_fmri)
+                                _get_dependencies(s, repouri, new_fmri)
         return s
 
+def cleanup():
+        """To be called at program finish."""
+
+        for d in tmpdirs:
+                shutil.rmtree(d, True)
 
 def main_func():
 
+        global pub, xport, xport_cfg
         basedir = None
         newfmri = False
+        incomingdir = None
 
         gettext.install("pkg", "/usr/share/locale")
 
         global_settings.client_name = "pkgmerge"
 
         try:
-               opts, pargs = getopt.getopt(sys.argv[1:], "d:nrv:")
+                opts, pargs = getopt.getopt(sys.argv[1:], "d:nrv:")
         except getopt.GetoptError, e:
                 usage(_("Illegal option -- %s") % e.opt)
 
@@ -276,12 +216,22 @@
 
         if not basedir:
                 basedir = os.getcwd()
+
+        incomingdir = os.path.normpath(os.path.join(basedir,
+            "incoming-%d" % os.getpid()))
+        os.makedirs(incomingdir)
+        tmpdirs.append(incomingdir)
 
         server_list = [
-            v.split(",", 1)[1]
+            publisher.RepositoryURI(v.split(",", 1)[1])
             for v in varlist
         ]
 
+        xport, xport_cfg = transport.setup_transport()
+        xport_cfg.incoming_download_dir = incomingdir
+        pub = transport.setup_publisher(server_list, "merge", xport, xport_cfg,
+            remote_prefix=True)
+
         if len(pargs) == 1:
                 recursive = False
                 overall_set = set()
@@ -346,7 +296,7 @@
                         continue
 
                 merge_fmris(server_list, fmri_list, variant_list, variant, basedir, basename, get_files)
-        cleanup_catalogs()
+        cleanup()
 
         return 0
 
@@ -396,7 +346,7 @@
                         a.attrs[variant] = v
 
         # combine actions into single list
-        allactions = reduce(lambda a,b:a + b, action_lists)
+        allactions = reduce(lambda a, b: a + b, action_lists)
 
         # figure out which variants are actually there for this pkg
         actual_variant_list = [
@@ -444,10 +394,10 @@
                         d[a] = 1
                         return False
 
-                hash_sets = [
+                action_sets = [
                         set(
                                 [
-                                 a.hash
+                                 a
                                  for a in action_list
                                  if hasattr(a, "hash") and not \
                                  repeated(a.hash, already_seen)
@@ -457,9 +407,11 @@
                         ]
                 # remove duplicate files (save time)
 
-                for server, hash_set in zip(server_list + [server_list[0]], hash_sets):
-                        if len(hash_set) > 0:
-                                fetch_files_byhash(server, hash_set, basedir)
+                for server, action_set in zip(server_list + [server_list[0]],
+                    action_sets):
+                        if len(action_set) > 0:
+                                fetch_files_byaction(server, action_set,
+                                    basedir)
 
         return 0
 
@@ -471,14 +423,22 @@
 
         try:
                 ret = main_func()
+        except (apx.InvalidDepotResponseException, apx.TransportError,
+            apx.BadRepositoryURI, apx.UnsupportedRepositoryURI), e:
+                cleanup()
+                print >> sys.stderr, e
+                sys.exit(1)
         except SystemExit, e:
+                cleanup()
                 raise e
         except (PipeError, KeyboardInterrupt):
                 # We don't want to display any messages here to prevent
                 # possible further broken pipe (EPIPE) errors.
+                cleanup()
                 sys.exit(1)
         except:
                 traceback.print_exc()
+                cleanup()
                 sys.exit(99)
         sys.exit(ret)
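
For reference, the non-image multi-file download pattern wrapped by
fetch_files_byaction() above, as a hedged sketch; xport, repouri,
acts, and destdir stand in for values the caller already holds:

    # Queue every action's payload on a multi-file request, then block
    # until the transport has fetched and decompressed all of them.
    mfile = xport.multi_file_ni(repouri, destdir, decompress=True)
    for a in acts:
            mfile.add_action(a)
    mfile.wait_files()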