16818 pkgrecv -d should publish per-package, not in one lump
author  Shawn Walker <shawn.walker@oracle.com>
date    Thu, 21 Oct 2010 14:28:40 -0700
changeset 2115 c6a812ab117a
parent 2114 96e055a673c9
child 2116 01cac360e383
16818 pkgrecv -d should publish per-package, not in one lump
9197 pkgrecv could show progress indicator for republishing
9190 pkgrecv dry-run option desired
12701 pkgsend should take a relative path
16817 pkgrecv -d too subtle in functionality
17172 pkgrecv should defer catalog update until operation completion
17241 multi-publisher repository operations busted for http
17242 pkgsend should accept simple paths for repository location
17243 pkgrecv should accept simple paths for repository location
17280 pkgrecv should support multi-publisher repositories
src/gui/modules/progress.py
src/man/pkgrecv.1.txt
src/modules/client/progress.py
src/modules/client/transport/engine.py
src/modules/client/transport/repo.py
src/modules/client/transport/transport.py
src/modules/publish/transaction.py
src/modules/server/depot.py
src/publish.py
src/pull.py
src/tests/cli/t_pkg_image_create.py
src/tests/cli/t_pkg_refresh.py
src/tests/cli/t_pkgrecv.py
--- a/src/gui/modules/progress.py	Wed Oct 20 15:32:12 2010 -0700
+++ b/src/gui/modules/progress.py	Thu Oct 21 14:28:40 2010 -0700
@@ -121,11 +121,11 @@
 
         def dl_output(self):
                 self.display_download_info()
-                if self.prev_pkg != self.dl_cur_pkg:
-                        self.prev_pkg = self.dl_cur_pkg
+                if self.prev_pkg != self.cur_pkg:
+                        self.prev_pkg = self.cur_pkg
                         self.update_details_text(
                             _("Package %d of %d: %s\n") % (self.dl_cur_npkgs+1, 
-                            self.dl_goal_npkgs, self.dl_cur_pkg), "level1")
+                            self.dl_goal_npkgs, self.cur_pkg), "level1")
 
         def dl_output_done(self):
                 self.update_details_text("\n")
--- a/src/man/pkgrecv.1.txt	Wed Oct 20 15:32:12 2010 -0700
+++ b/src/man/pkgrecv.1.txt	Thu Oct 21 14:28:40 2010 -0700
@@ -6,42 +6,35 @@
 
 SYNOPSIS
      /usr/bin/pkgrecv [-s src_uri] [-d (path|dest_uri)] [-c cache_dir]
-         [-kr] [-m match] (fmri|pattern) ...
-     /usr/bin/pkgrecv [-s src_uri] -n
+         [-kr] [-m match] [-n] [--raw] (fmri|pattern) ...
+     /usr/bin/pkgrecv [-s src_uri] --newest
 
 DESCRIPTION
      pkgrecv allows the user to retrieve the contents of a package
      from a pkg(5) repository.  With the default options, the contents
-     are retrieved in a format that can easily be input to pkgsend(1)
-     when used with the 'include' subcommand.  As a result, pkgrecv(1)
-     and pkgsend(1) can be used to conveniently modify and republish
-     packages, perhaps by correcting file contents or providing
-     additional package tags.
+     are retrieved in a repository format suitable for use with
+     pkg.depotd(1M).
 
 OPTIONS
      The following options are supported:
 
      -c cache_dir    The path to a directory that will be used to cache
-		     downloaded content.  If one is not supplied, the
-		     client will automatically pick a cache directory.
-		     In the case where a download is interrupted, and a
-		     cache directory was automatically chosen, use this
-		     option to resume the download.
+                     downloaded content.  If one is not supplied, the
+                     client will automatically pick a cache directory.
+                     In the case where a download is interrupted, and a
+                     cache directory was automatically chosen, use this
+                     option to resume the download.
 
-     -d path_or_uri  The path of a directory to save the retrieved package
-                     to, or the URI of a repository to republish it to.  If
-                     not provided, the default value is the current working
-                     directory.  If a directory path is provided, then
-                     package content will only be retrieved if it does not
-                     already exist in the target directory.  If a repository
-                     URI is provided, a temporary directory will be created
-                     and all of the package data retrieved before attempting
-                     to republish it.
+     -d path_or_uri  The filesystem path or URI of the target repository to
+                     republish packages to.  If not provided, the default
+                     value is the current working directory.  The target
+                     must already exist.  New repositories can be created
+                     using pkgrepo(1).
 
      -h              Display usage message.
 
      -k              Keep the retrieved package content compressed, ignored
-                     when republishing.  Should not be used with pkgsend.
+                     when republishing.  Should not be used with pkgsend(1).
 
      -m match        Controls matching behavior using the following values:
                          all-timestamps
@@ -50,9 +43,7 @@
                          all-versions
                              includes all matching versions, not just latest
 
-     -n              List the most recent versions of the packages available
-                     from the specified repository and exit.  (All other
-                     options except -s will be ignored.)
+     -n              Perform a trial run with no changes made.
 
      -r              Recursively retrieves all dependencies for the provided
                      list of packages.
@@ -60,20 +51,34 @@
      -s src_repo_uri A URI representing the location of a pkg(5) repository
                      from which to receive package data.
 
+     --newest        List the most recent versions of the packages available
+                     from the specified repository and exit.  (All other
+                     options except -s will be ignored.)
+
+     --raw           Retrieve and store the raw package data in a set of
+                     directory structures by stem and version at the location
+                     specified by -d.  May only be used with filesystem-
+                     based destinations.  This can be used with pkgsend(1)
+                     include to conveniently modify and republish packages,
+                     perhaps by correcting file contents or providing
+                     additional package metadata.
+
 EXAMPLES
      Example 1:  List newest packages available from the repository on
      the system named 'test'.
 
-     $ pkgrecv -s http://test -n
+     $ pkgrecv -s http://test --newest
      pkg:/SUNWlibC@5.9,5.11-0.79:20080221T125720Z
      pkg:/SUNWfreetype2@2.3.5,5.11-0.79:20080221T123955Z
      pkg:/SUNWlibm@5.11,5.11-0.79:20080221T125728Z
      pkg:/SUNWlibms@5.11,5.11-0.79:20080221T125730Z
 
      Example 2: Receive the SUNWlibC, SUNWfreetype, and SUNWlibm
-     packages from example 1.
+     packages from example 1 in a format suitable for use with
+     pkgsend(1) include.
 
-     $ pkgrecv -s http://test SUNWlibC@5.9,5.11-0.79:20080221T125720Z
+     $ pkgrecv -s http://test SUNWlibC@5.9,5.11-0.79:20080221T125720Z \
+         --raw
      SUNWfreetype2@2.3.5,5.11-0.79:20080221T123955Z
      SUNWlibm@5.11,5.11-0.79:20080221T125728Z
      $ ls -d SUNW*
@@ -92,7 +97,7 @@
      Example 5: Receive all versions of the package "SUNWvim" from the
      system named 'test' and republish it to a local repository.
 
-     $ pkgrecv -s http://test -d file:///local/repo SUNWvim
+     $ pkgrecv -s http://test -d /local/repo SUNWvim
 
      Example 6: Receive all versions of the package "SUNWzlib" from the
      system named 'test' and republish it to a remote repository on the
@@ -103,7 +108,12 @@
      Example 7: Receive the package "SUNWemacs" and all of its dependencies
      from the repository located at '/var/pkg/repo'.
 
-     $ pkgrecv -s file:///var/pkg/repo -r SUNWemacs
+     $ pkgrecv -s /var/pkg/repo -r SUNWemacs
+
+     Example 8: Receive all packages that do not already exist from the
+     repository located at 'http://example.com:10000':
+
+     $ pkgrecv -s http://example.com:10000 -d /my/pkg/repo '*'
 
 ENVIRONMENT VARIABLES
      The following environment variables are supported:
@@ -119,13 +129,14 @@
 EXIT STATUS
      The following exit values are returned:
 
-     0     Everything worked.
+     0     Command succeeded.
 
-     1     Something bad happened.
+     1     An error occurred.
 
      2     Invalid command line options were specified.
 
-FILES
+     3     Multiple operations were requested, but only some of them
+           succeeded.
 
 ATTRIBUTES
      See attributes(5) for descriptions of the following attributes:
@@ -136,7 +147,7 @@
     |_____________________________|_____________________________|
 
 SEE ALSO
-     pkgsend(1), attributes(5), pkg(5)
+     pkgrepo(1), pkgsend(1), attributes(5), pkg(5)
 
 NOTES
      The image packaging system is an under-development feature.
--- a/src/modules/client/progress.py	Wed Oct 20 15:32:12 2010 -0700
+++ b/src/modules/client/progress.py	Thu Oct 21 14:28:40 2010 -0700
@@ -21,8 +21,7 @@
 #
 
 #
-# Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
-# Use is subject to license terms.
+# Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 #
 
 import errno
@@ -62,13 +61,14 @@
                 self.reset()
 
         def reset_download(self):
+                self.dl_started = False
                 self.dl_goal_nfiles = 0
                 self.dl_cur_nfiles = 0
                 self.dl_goal_nbytes = 0
                 self.dl_cur_nbytes = 0
                 self.dl_goal_npkgs = 0
                 self.dl_cur_npkgs = 0
-                self.dl_cur_pkg = "None"
+                self.cur_pkg = "None"
 
         def reset(self):
                 self.cat_cur_catalog = None
@@ -102,6 +102,10 @@
                 self.item_phase = "None"
                 self.item_phase_last = "None"
 
+                self.send_cur_nbytes = 0
+                self.send_goal_nbytes = 0
+                self.republish_started = False
+
                 # The tracker sets this to True whenever it has emitted
                 # output, but not yet written a newline. ProgressTracker
                 # users should call flush() when wanting to interrupt
@@ -183,7 +187,7 @@
                 self.dl_goal_nbytes = nbytes
 
         def download_start_pkg(self, pkgname):
-                self.dl_cur_pkg = pkgname
+                self.cur_pkg = pkgname
                 if self.dl_goal_nbytes != 0:
                         self.dl_output()
 
@@ -197,8 +201,12 @@
 
                 self.dl_cur_nbytes += nbytes
                 self.dl_cur_nfiles += nfiles
-                if self.dl_goal_nbytes != 0:
-                        self.dl_output()
+                if self.dl_started:
+                        if self.dl_goal_nbytes != 0:
+                                self.dl_output()
+                elif self.republish_started:
+                        if self.dl_goal_nbytes != 0:
+                                self.republish_output()
 
         def download_done(self):
                 """ Call when all downloading is finished """
@@ -274,6 +282,36 @@
                         self.item_output_done()
                 assert self.item_goal_nitems == self.item_cur_nitems
 
+        def republish_set_goal(self, npkgs, ngetbytes, nsendbytes):
+                self.item_cur_nitems = 0
+                self.item_goal_nitems = npkgs
+                self.dl_cur_nbytes = 0
+                self.dl_goal_nbytes = ngetbytes
+                self.send_cur_nbytes = 0
+                self.send_goal_nbytes = nsendbytes
+
+        def republish_start_pkg(self, pkgname):
+                self.cur_pkg = pkgname
+                if self.dl_goal_nbytes != 0:
+                        self.republish_output()
+
+        def republish_end_pkg(self):
+                self.item_cur_nitems += 1
+                if self.dl_goal_nbytes != 0:
+                        self.republish_output()
+
+        def upload_add_progress(self, nbytes):
+                """ Call to provide news that the upload has made progress """
+
+                self.send_cur_nbytes += nbytes
+                if self.send_goal_nbytes != 0:
+                        self.republish_output()
+
+        def republish_done(self):
+                """ Call when all republishing is finished """
+                if self.dl_goal_nbytes != 0:
+                        self.republish_output_done()
+
         #
         # This set of methods should be regarded as abstract *and* protected.
         # If you aren't in this class hierarchy, these should not be
@@ -376,6 +414,14 @@
                 raise NotImplementedError("item_output_done() not implemented "
                     "in superclass")
 
+        def republish_output(self):
+                raise NotImplementedError("republish_output() not implemented "
+                    "in superclass")
+
+        def republish_output_done(self):
+                raise NotImplementedError("republish_output_done() not "
+                    "implemented in superclass")
+
         def flush(self):
                 raise NotImplementedError("flush() not implemented in "
                     "superclass")
@@ -464,9 +510,16 @@
         def item_output_done(self):
                 return
 
+        def republish_output(self):
+                return
+
+        def republish_output_done(self):
+                return
+
         def flush(self):
                 return
 
+
 class NullProgressTracker(QuietProgressTracker):
         """ This ProgressTracker is a subclass of QuietProgressTracker
             because that's convenient for now.  It is semantically intended to
@@ -487,7 +540,7 @@
 
         def __init__(self):
                 ProgressTracker.__init__(self)
-                self.dl_last_printed_pkg = None
+                self.last_printed_pkg = None
 
         def cat_output_start(self):
                 return
@@ -531,14 +584,14 @@
         def ver_output_done(self):
                 return
 
-        def dl_output(self):
+        def __generic_pkg_output(self, pkg_line):
                 try:
                         # The first time, emit header.
-                        if self.dl_cur_pkg != self.dl_last_printed_pkg:
-                                if self.dl_last_printed_pkg != None:
+                        if self.cur_pkg != self.last_printed_pkg:
+                                if self.last_printed_pkg != None:
                                         print _("Done")
-                                print _("Download: %s ... ") % (self.dl_cur_pkg),
-                                self.dl_last_printed_pkg = self.dl_cur_pkg
+                                print pkg_line % self.cur_pkg,
+                                self.last_printed_pkg = self.cur_pkg
                                 self.needs_cr = True
                         sys.stdout.flush()
                 except IOError, e:
@@ -546,6 +599,12 @@
                                 raise PipeError, e
                         raise
 
+        def dl_output(self):
+                self.__generic_pkg_output(_("Download: %s ... "))
+
+        def republish_output(self):
+                self.__generic_pkg_output(_("Republish: %s ... "))
+
         def __generic_done(self):
                 try:
                         print _("Done")
@@ -559,6 +618,9 @@
         def dl_output_done(self):
                 self.__generic_done()
 
+        def republish_output_done(self):
+                self.__generic_done()
+
         def __generic_output(self, phase_attr, last_phase_attr, force=False):
                 pattr = getattr(self, phase_attr)
                 last_pattr = getattr(self, last_phase_attr)
@@ -644,7 +706,6 @@
                                 self.cr = '\r'
                         else:
                                 raise ProgressTrackerException()
-                self.dl_started = False
                 self.spinner = 0
                 self.spinner_chars = "/-\|"
                 self.curstrlen = 0
@@ -837,9 +898,9 @@
                                 print self.cr,
                                 self.needs_cr = True
 
-                        pkg_name = self.dl_cur_pkg
+                        pkg_name = self.cur_pkg
                         if len(pkg_name) > 38:
-                                pkg_name = pkg_name[:34] + "..."
+                                pkg_name = "..." + pkg_name[-34:]
 
                         s = "%-38.38s %7s %11s %12s" % \
                             (pkg_name,
@@ -857,7 +918,7 @@
                         raise
 
         def dl_output_done(self):
-                self.dl_cur_pkg = "Completed"
+                self.cur_pkg = "Completed"
                 self.dl_output(force=True)
 
                 # Reset.
@@ -875,6 +936,64 @@
                                 raise PipeError, e
                         raise
 
+        def republish_output(self, force=False):
+                if self.republish_started and not force and \
+                    (time.time() - self.last_print_time) < self.TERM_DELAY:
+                        return
+
+                self.last_print_time = time.time()
+
+                try:
+                        # The first time, emit header.
+                        if not self.republish_started:
+                                self.republish_started = True
+                                print "%-40s %12s %12s %12s" % (_("PROCESS"),
+                                    _("ITEMS"), _("GET (MB)"), _("SEND (MB)"))
+                        else:
+                                print self.cr,
+                                self.needs_cr = True
+
+                        pkg_name = self.cur_pkg
+                        if len(pkg_name) > 40:
+                                pkg_name = "..." + pkg_name[-37:]
+
+                        s = "%-40.40s %12s %12s %12s" % \
+                            (pkg_name,
+                            "%d/%d" % (self.item_cur_nitems,
+                                self.item_goal_nitems),
+                            "%.1f/%.1f" % \
+                                ((self.dl_cur_nbytes / 1024.0 / 1024.0),
+                                (self.dl_goal_nbytes / 1024.0 / 1024.0)),
+                            "%.1f/%.1f" % \
+                                ((self.send_cur_nbytes / 1024.0 / 1024.0),
+                                (self.send_goal_nbytes / 1024.0 / 1024.0)))
+                        sys.stdout.write(s + self.clear_eol)
+                        self.needs_cr = True
+                        sys.stdout.flush()
+                except IOError, e:
+                        if e.errno == errno.EPIPE:
+                                raise PipeError, e
+                        raise
+
+        def republish_output_done(self):
+                self.cur_pkg = "Completed"
+                self.republish_output(force=True)
+
+                # Reset.
+                self.republish_started = False
+                self.spinner = 0
+                self.curstrlen = 0
+
+                try:
+                        print
+                        print
+                        self.needs_cr = False
+                        sys.stdout.flush()
+                except IOError, e:
+                        if e.errno == errno.EPIPE:
+                                raise PipeError, e
+                        raise
+
         def __generic_simple_done(self):
                 try:
                         print
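
[Usage sketch] The republish_* entry points added above deliberately mirror
the existing download API.  A minimal sketch (not part of this changeset,
and assuming the pkg(5) modules are importable) of how a caller such as
pkgrecv is expected to drive them; the package name and byte counts are
hypothetical:

    import pkg.client.progress as progress

    tracker = progress.CommandLineProgressTracker()

    # One package: 2 MB to retrieve (GET) and 4 MB to publish (SEND).
    tracker.republish_set_goal(1, 2097152, 4194304)
    tracker.republish_start_pkg("example/foo")
    tracker.download_add_progress(1, 2097152)  # retrieval side
    tracker.upload_add_progress(4194304)       # publication side
    tracker.republish_end_pkg()
    tracker.republish_done()
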
--- a/src/modules/client/transport/engine.py	Wed Oct 20 15:32:12 2010 -0700
+++ b/src/modules/client/transport/engine.py	Thu Oct 21 14:28:40 2010 -0700
@@ -676,7 +676,8 @@
 
         def send_data(self, url, data=None, header=None, sslcert=None,
             sslkey=None, repourl=None, ccancel=None,
-            data_fobj=None, data_fp=None, failonerror=True):
+            data_fobj=None, data_fp=None, failonerror=True,
+            progclass=None, progtrack=None):
                 """Invoke the engine to retrieve a single URL.  
                 This routine sends the data in data, and returns the
                 server's response.  
@@ -690,7 +691,7 @@
                 fobj = fileobj.StreamingFileObj(url, self, ccancel=ccancel)
                 progfunc = None
 
-                if ccancel:
+                if ccancel and not progtrack and not progclass:
                         progfunc = fobj.get_progress_func()
 
                 t = TransportRequest(url, writefunc=fobj.get_write_func(),
@@ -698,7 +699,8 @@
                     httpmethod="POST", sslcert=sslcert, sslkey=sslkey,
                     repourl=repourl, progfunc=progfunc, uuid=fobj.uuid,
                     read_fobj=data_fobj, read_filepath=data_fp,
-                    failonerror=failonerror)
+                    failonerror=failonerror, progclass=progclass,
+                    progtrack=progtrack)
 
                 self.__req_q.appendleft(t)
 
--- a/src/modules/client/transport/repo.py	Wed Oct 20 15:32:12 2010 -0700
+++ b/src/modules/client/transport/repo.py	Thu Oct 21 14:28:40 2010 -0700
@@ -129,7 +129,8 @@
 
                 raise NotImplementedError
 
-        def publish_add(self, action, header=None, trans_id=None):
+        def publish_add(self, action, header=None, progtrack=None,
+            trans_id=None):
                 """The publish operation that adds content to a repository.
                 The action must be populated with a data property.
                 Callers may supply a header, and should supply a transaction
@@ -321,11 +322,13 @@
                     failonerror=failonerror)
 
         def _post_url(self, url, data=None, header=None, ccancel=None,
-            data_fobj=None, data_fp=None, failonerror=True):
+            data_fobj=None, data_fp=None, failonerror=True, progclass=None,
+            progtrack=None):
                 return self._engine.send_data(url, data=data, header=header,
                     repourl=self._url, ccancel=ccancel,
                     data_fobj=data_fobj, data_fp=data_fp,
-                    failonerror=failonerror)
+                    failonerror=failonerror, progclass=progclass,
+                    progtrack=progtrack)
 
         def __check_response_body(self, fobj):
                 """Parse the response body found accessible using the provided
@@ -371,9 +374,9 @@
                 pub_prefix = getattr(pub, "prefix", None)
                 if pub_prefix and not methodstr.startswith("open/") and \
                     not base.endswith("/%s/" % pub_prefix) and \
-                    self.supports_version("publisher", [1]):
+                    self.supports_version("publisher", [1]) > -1:
                         # Append the publisher prefix to the repository URL.
-                        base = urlparse.urljoin(base, pub_prefix)
+                        base = urlparse.urljoin(base, pub_prefix) + "/"
 
                 uri = urlparse.urljoin(base, methodstr)
                 if not query:
@@ -650,7 +653,8 @@
 
                 return self._verdata is not None
 
-        def publish_add(self, action, header=None, trans_id=None):
+        def publish_add(self, action, header=None, progtrack=None,
+            trans_id=None):
                 """The publish operation that adds content to a repository.
                 The action must be populated with a data property.
                 Callers may supply a header, and should supply a transaction
@@ -659,6 +663,10 @@
                 attrs = action.attrs
                 data_fobj = None
                 data = None
+                progclass = None
+
+                if progtrack:
+                        progclass = FileProgress
 
                 baseurl = self.__get_request_url("add/0/")
                 request_str = "%s/%s" % (trans_id, action.name)
@@ -678,7 +686,8 @@
                         headers.update(header)
 
                 fobj = self._post_url(requesturl, header=headers,
-                    data_fobj=data_fobj, data=data, failonerror=False)
+                    data_fobj=data_fobj, data=data, failonerror=False,
+                    progclass=progclass, progtrack=progtrack)
                 self.__check_response_body(fobj)
 
         def publish_add_file(self, pth, header=None, trans_id=None):
@@ -880,7 +889,7 @@
         def publish_refresh_indexes(self, header=None, pub=None):
                 """Attempt to refresh the search data in the repository."""
 
-                if self.supports_version("admin", [0]):
+                if self.supports_version("admin", [0]) > -1:
                         requesturl = self.__get_request_url("admin/0", query={
                             "cmd": "refresh-indexes" }, pub=pub)
                 else:
@@ -1444,7 +1453,8 @@
 
                 return self._verdata is not None
 
-        def publish_add(self, action, header=None, trans_id=None):
+        def publish_add(self, action, header=None, progtrack=None,
+            trans_id=None):
                 """The publish operation that adds an action and its
                 payload (if applicable) to an existing transaction in a
                 repository.  The action must be populated with a data property.
@@ -1454,10 +1464,21 @@
                 # Calling any publication operation sets read_only to False.
                 self._frepo.read_only = False
 
+                progclass = None
+                if progtrack:
+                        progclass = FileProgress
+                        progtrack = progclass(progtrack)
+
                 try:
                         self._frepo.add(trans_id, action)
                 except svr_repo.RepositoryError, e:
+                        if progtrack:
+                                progtrack.abort()
                         raise tx.TransportOperationError(str(e))
+                else:
+                        if progtrack:
+                                sz = int(action.attrs.get("pkg.size", 0))
+                                progtrack.progress_callback(0, 0, sz, sz)
 
         def publish_add_file(self, pth, header=None, trans_id=None):
                 """The publish operation that adds a file to an existing
@@ -1678,6 +1699,8 @@
                 ProgressCallback.__init__(self, progtrack)
                 self.dltotal = 0
                 self.dlcurrent = 0
+                self.ultotal = 0
+                self.ulcurrent = 0
                 self.completed = False
 
         def abort(self):
@@ -1685,6 +1708,7 @@
                 by this file from the ProgressTracker."""
 
                 self.progtrack.download_add_progress(0, -self.dlcurrent)
+                self.progtrack.upload_add_progress(-self.ulcurrent)
                 self.completed = True
 
         def commit(self, size):
@@ -1719,6 +1743,14 @@
                         self.dlcurrent += new_progress
                         self.progtrack.download_add_progress(0, new_progress)
 
+                if self.ultotal != ultot:
+                        self.ultotal = ultot
+
+                new_progress = int(ulcur - self.ulcurrent)
+                if new_progress > 0:
+                        self.ulcurrent += new_progress
+                        self.progtrack.upload_add_progress(new_progress)
+
                 return 0
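
[Usage sketch] For filesystem repositories, publish_add() above has no
transport callback to hook, so it reports the whole payload through
FileProgress in one shot.  A sketch of that callback contract, assuming
the pkg(5) client modules are importable; the 1500-byte size is
hypothetical:

    import pkg.client.progress as progress
    import pkg.client.transport.repo as trepo

    tracker = progress.QuietProgressTracker()
    fp = trepo.FileProgress(tracker)

    # Arguments are (download total, download current, upload total,
    # upload current); the file case reports the full size at once.
    fp.progress_callback(0, 0, 1500, 1500)
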
 
 
--- a/src/modules/client/transport/transport.py	Wed Oct 20 15:32:12 2010 -0700
+++ b/src/modules/client/transport/transport.py	Thu Oct 21 14:28:40 2010 -0700
@@ -2115,7 +2115,8 @@
                             (chash, newhash), size=s.st_size)
 
         @LockedTransport()
-        def publish_add(self, pub, action=None, trans_id=None):
+        def publish_add(self, pub, action=None, ccancel=None, progtrack=None,
+            trans_id=None):
                 """Perform the 'add' publication operation to the publisher
                 supplied in pub.  The transaction-id is passed in trans_id."""
 
@@ -2123,11 +2124,14 @@
                 retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
                 header = self.__build_header(uuid=self.__get_uuid(pub))
 
+                if progtrack and ccancel:
+                        progtrack.check_cancelation = ccancel
+
                 for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
                     single_repository=True, operation="add", versions=[0]):
                         try:
                                 d.publish_add(action, header=header,
-                                    trans_id=trans_id)
+                                    progtrack=progtrack, trans_id=trans_id)
                                 return
                         except tx.ExcessiveTransientFailure, ex:
                                 # If an endpoint experienced so many failures
--- a/src/modules/publish/transaction.py	Wed Oct 20 15:32:12 2010 -0700
+++ b/src/modules/publish/transaction.py	Thu Oct 21 14:28:40 2010 -0700
@@ -131,10 +131,12 @@
         purposes."""
 
         def __init__(self, origin_url, create_repo=False, pkg_name=None,
-            repo_props=EmptyDict, trans_id=None, xport=None, pub=None):
+            repo_props=EmptyDict, trans_id=None, xport=None, pub=None,
+            progtrack=None):
                 self.create_repo = create_repo
                 self.origin_url = origin_url
                 self.pkg_name = pkg_name
+                self.progtrack = progtrack
                 self.trans_id = trans_id
 
         def add(self, action):
@@ -195,7 +197,8 @@
         """Provides a publishing interface that uses client transport."""
 
         def __init__(self, origin_url, create_repo=False, pkg_name=None,
-            repo_props=EmptyDict, trans_id=None, xport=None, pub=None):
+            repo_props=EmptyDict, trans_id=None, xport=None, pub=None,
+            progtrack=None):
 
                 scheme, netloc, path, params, query, fragment = \
                     urlparse.urlparse(origin_url, "http", allow_fragments=0)
@@ -204,6 +207,7 @@
                 self.trans_id = trans_id
                 self.scheme = scheme
                 self.path = path
+                self.progtrack = progtrack
                 self.transport = xport
                 self.publisher = pub
 
@@ -267,7 +271,8 @@
 
                 try:
                         self.transport.publish_add(self.publisher,
-                            action=action, trans_id=self.trans_id)
+                            action=action, trans_id=self.trans_id,
+                            progtrack=self.progtrack)
                 except apx.TransportError, e:
                         msg = str(e)
                         raise TransactionOperationError("add",
@@ -421,7 +426,7 @@
 
         def __new__(cls, origin_url, create_repo=False, pkg_name=None,
             repo_props=EmptyDict, trans_id=None, noexecute=False, xport=None,
-            pub=None):
+            pub=None, progtrack=None):
 
                 scheme, netloc, path, params, query, fragment = \
                     urlparse.urlparse(origin_url, "http", allow_fragments=0)
@@ -459,4 +464,4 @@
                 return cls.__schemes[scheme](origin_url,
                     create_repo=create_repo, pkg_name=pkg_name,
                     repo_props=repo_props, trans_id=trans_id, xport=xport,
-                    pub=pub)
+                    pub=pub, progtrack=progtrack)
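
[Usage sketch] Transaction() now threads an optional progress tracker
through to the transport.  A hedged sketch of opening a transaction with
one, set up the same way publish.py does below; the depot URI and package
name are hypothetical:

    import pkg.client.progress as progress
    import pkg.client.transport.transport as transport
    import pkg.publish.transaction as trans

    repo_uri = "http://localhost:10000"
    xport, xport_cfg = transport.setup_transport()
    pub = transport.setup_publisher(repo_uri, "default", xport, xport_cfg)

    tracker = progress.CommandLineProgressTracker()
    t = trans.Transaction(repo_uri, pkg_name="example/foo@1.0,5.11-0.1",
        xport=xport, pub=pub, progtrack=tracker)
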
--- a/src/modules/server/depot.py	Wed Oct 20 15:32:12 2010 -0700
+++ b/src/modules/server/depot.py	Thu Oct 21 14:28:40 2010 -0700
@@ -417,6 +417,7 @@
 
         def refresh(self):
                 """Catch SIGUSR1 and reload the depot information."""
+                old_pubs = self.repo.publishers
                 self.repo.reload()
                 if type(self.cfg) == cfg.SMFConfig:
                         # For all other cases, reloading depot configuration
@@ -426,6 +427,9 @@
                 # Handles the BUI (Browser User Interface).
                 face.init(self)
 
+                # Map new publishers into operation space.
+                map(self.__map_pub_ops, self.repo.publishers - old_pubs)
+
         def __map_pub_ops(self, pub_prefix):
                 # Map the publisher into the depot's operation namespace if
                 # needed.
@@ -2107,21 +2111,21 @@
 
 class BackgroundTaskPlugin(SimplePlugin):
         """This class allows background task execution for the depot server.  It
-        is designed in such a way as to only allow a single task to be queued
+        is designed in such a way as to only allow a few tasks to be queued
         for execution at a time.
         """
 
         def __init__(self, bus):
                 # Setup the background task queue.
                 SimplePlugin.__init__(self, bus)
-                self.__q = Queue.Queue(1)
+                self.__q = Queue.Queue(10)
                 self.__thread = None
 
         def put(self, task, *args, **kwargs):
-                """Schedule the given task for background execution if one
-                is not already.
+                """Schedule the given task for background execution if queue
+                isn't full.
                 """
-                if self.__q.unfinished_tasks:
+                if self.__q.unfinished_tasks > 9:
                         raise Queue.Full()
                 self.__q.put_nowait((task, args, kwargs))
 
--- a/src/publish.py	Wed Oct 20 15:32:12 2010 -0700
+++ b/src/publish.py	Thu Oct 21 14:28:40 2010 -0700
@@ -50,6 +50,7 @@
 import pkg.client.api_errors as apx
 import pkg.fmri
 import pkg.manifest
+import pkg.misc as misc
 import pkg.publish.transaction as trans
 import pkg.client.transport.transport as transport
 import pkg.client.publisher as publisher
@@ -124,7 +125,7 @@
                         repo_props.setdefault(p_sec, {})
                         repo_props[p_sec][p_name] = p_value
 
-        xport, pub = setup_transport_and_pubs(repo_uri)
+        xport, pub = setup_transport_and_pubs(repo_uri, remote=False)
 
         try:
                 trans.Transaction(repo_uri, create_repo=True,
@@ -581,19 +582,16 @@
                 return 1
         return 0
 
-def setup_transport_and_pubs(repo_uri):
+def setup_transport_and_pubs(repo_uri, remote=True):
+
+        if repo_uri.startswith("null:"):
+                return None, None
 
-        try:
-                repo = publisher.Repository(origins=[repo_uri])
-                pub = publisher.Publisher(prefix="default", repositories=[repo])
-                xport = transport.Transport(transport.GenericTransportCfg(
-                    publishers=[pub]))
-        except apx.UnsupportedRepositoryURI:
-                if repo_uri.startswith("null:"):
-                        return None, None
-                raise
+        xport, xport_cfg = transport.setup_transport()
+        targ_pub = transport.setup_publisher(repo_uri, "default",
+            xport, xport_cfg, remote_prefix=remote)
 
-        return xport, pub
+        return xport, targ_pub
 
 def main_func():
         gettext.install("pkg", "/usr/share/locale")
@@ -615,6 +613,9 @@
         except getopt.GetoptError, e:
                 usage(_("pkgsend: illegal global option -- %s") % e.opt)
 
+        if repo_uri and not repo_uri.startswith("null:"):
+                repo_uri = misc.parse_uri(repo_uri)
+
         subcommand = None
         if pargs:
                 subcommand = pargs.pop(0)
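
[Usage sketch] The misc.parse_uri() call above is what lets pkgsend (and
pkgrecv, below) accept simple and relative filesystem paths for the
repository location (bugs 12701, 17242, 17243).  A sketch of the expected
behaviour, assuming pkg.misc is importable; the output comments are
illustrative:

    import pkg.misc as misc

    # A bare filesystem path should be canonicalized to a file:// URI ...
    print misc.parse_uri("/var/pkg/repo")       # file:///var/pkg/repo
    # ... while an already well-formed URI passes through unchanged.
    print misc.parse_uri("http://example.com:10000")
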
--- a/src/pull.py	Wed Oct 20 15:32:12 2010 -0700
+++ b/src/pull.py	Thu Oct 21 14:28:40 2010 -0700
@@ -56,11 +56,12 @@
 cache_dir = None
 complete_catalog = None
 download_start = False
-repo_cache = {}
 tmpdirs = []
 temp_root = None
 xport = None
 xport_cfg = None
+dest_xport = None
+targ_pub = None
 
 def error(text):
         """Emit an error message prefixed by the command name """
@@ -87,9 +88,9 @@
 
         msg(_("""\
 Usage:
-        pkgrecv [-s src_repo_uri] [-d (path|dest_uri)] [-kr] [-m match]
-            (fmri|pattern) ...
-        pkgrecv [-s src_repo_uri] -n
+        pkgrecv [-s src_repo_uri] [-d (path|dest_uri)] [-kr] [-m match] [-n]
+            [--raw] (fmri|pattern) ...
+        pkgrecv [-s src_repo_uri] --newest
 
 Options:
         -c cache_dir    The path to a directory that will be used to cache
@@ -99,32 +100,43 @@
                         cache directory was automatically chosen, use this
                         option to resume the download.
 
-        -d path_or_uri  The path of a directory to save the retrieved package
-                        to, or the URI of a repository to republish it to.  If
-                        not provided, the default value is the current working
-                        directory.  If a directory path is provided, then
-                        package content will only be retrieved if it does not
-                        already exist in the target directory.  If a repository
-                        URI is provided, a temporary directory will be created
-                        and all of the package data retrieved before attempting
-                        to republish it.
+        -d path_or_uri  The filesystem path or URI of the target repository to
+                        republish packages to.  If not provided, the default
+                        value is the current working directory.  The target
+                        must already exist.  New repositories can be created
+                        using pkgrepo(1).
 
         -h              Display this usage message.
+
         -k              Keep the retrieved package content compressed, ignored
                         when republishing.  Should not be used with pkgsend.
+
         -m match        Controls matching behaviour using the following values:
                             all-timestamps
                                 includes all matching timestamps, not just
                                 latest (implies all-versions)
                             all-versions
                                 includes all matching versions, not just latest
-        -n              List the most recent versions of the packages available
+
+        -n              Perform a trial run with no changes made.
+
+        -r              Recursively evaluates all dependencies for the provided
+                        list of packages and adds them to the list.
+
+        -s src_repo_uri A URI representing the location of a pkg(5)
+                        repository to retrieve package data from.
+
+        --newest        List the most recent versions of the packages available
                         from the specified repository and exit.  (All other
                         options except -s will be ignored.)
-        -r              Recursively evaluates all dependencies for the provided
-                        list of packages and adds them to the list.
-        -s src_repo_uri A URI representing the location of a pkg(5)
-                        repository to retrieve package data from.
+
+        --raw           Retrieve and store the raw package data in a set of
+                        directory structures by stem and version at the location
+                        specified by -d.  May only be used with filesystem-
+                        based destinations.  This can be used with pkgsend(1)
+                        include to conveniently modify and republish packages,
+                        perhaps by correcting file contents or providing
+                        additional package metadata.
 
 Environment:
         PKG_DEST        Destination directory or repository URI
@@ -145,6 +157,14 @@
                         continue
                 shutil.rmtree(d, True)
 
+        if caller_error and dest_xport and targ_pub:
+                try:
+                        dest_xport.publish_refresh_packages(targ_pub)
+                except apx.TransportError:
+                        # If this fails, ignore it as this was a last ditch
+                        # attempt anyway.
+                        pass
+
 def abort(err=None, retcode=1):
         """To be called when a fatal error is encountered."""
 
@@ -153,7 +173,7 @@
                 msg("")
                 error(err)
 
-        cleanup()
+        cleanup(caller_error=True)
         sys.exit(retcode)
 
 def get_tracker(quiet=False):
@@ -188,32 +208,6 @@
                 return m.tostr_unsorted()
         return m
 
-def get_repo(uri):
-        if uri in repo_cache:
-                return repo_cache[uri]
-
-        parts = urlparse.urlparse(uri, "file", allow_fragments=0)
-        path = urllib.url2pathname(parts[2])
-
-        try:
-                repo = sr.Repository(read_only=True, root=path)
-        except EnvironmentError, _e:
-                error("an error occurred while trying to " \
-                    "initialize the repository directory " \
-                    "structures:\n%s" % _e)
-                sys.exit(1)
-        except sr.RepositoryError, _e:
-                error(_e)
-                sys.exit(1)
-        except cfg.ConfigError, _e:
-                error("repository configuration error: %s" % _e)
-                sys.exit(1)
-        except (search_errors.IndexingException, apx.PermissionsException), _e:
-                emsg(str(_e), "INDEX")
-                sys.exit(1)
-        repo_cache[uri] = repo
-        return repo
-
 def expand_fmri(pfmri, constraint=version.CONSTRAINT_AUTO):
         """Find matching fmri using CONSTRAINT_AUTO cache for performance.
         Returns None if no matching fmri is found."""
@@ -239,30 +233,10 @@
         except pkg.fmri.FmriError, e:
                 abort(err=e)
 
-        # XXX publisher prefixes have to be stripped for catalog matching
-        # for now; awaits v1 client support, etc.
-        pattern_pubs = {}
-        for f in patterns:
-                if f.publisher:
-                        pattern_pubs[f.get_fmri(anarchy=True)] = f.publisher
-                        f.publisher = None
-
-        matches, unmatched = catalog.extract_matching_fmris(fmri_list,
+        return catalog.extract_matching_fmris(fmri_list,
             patterns=patterns, constraint=version.CONSTRAINT_AUTO,
             matcher=pkg.fmri.glob_match)
 
-        if unmatched:
-                match_err = apx.InventoryException(**unmatched)
-                emsg(match_err)
-                abort()
-
-        # XXX restore stripped publisher information.
-        for m in matches:
-                pub = pattern_pubs.pop(str(m), None)
-                if pub:
-                        m.publisher = pub
-        return matches
-
 def get_dependencies(src_uri, fmri_list, basedir, tracker):
 
         old_limit = sys.getrecursionlimit()
@@ -294,18 +268,18 @@
 
 def add_hashes_to_multi(mfst, multi):
         """Takes a manifest and a multi object. Adds the hashes to the
-        multi object, returns (nfiles, nbytes) tuple."""
+        multi object, returns (get_bytes, send_bytes) tuple."""
 
-        nf = 0
-        nb = 0
+        getb = 0
+        sendb = 0
 
         for atype in ("file", "license"):
                 for a in mfst.gen_actions_by_type(atype):
                         if a.needsdata(None, None):
                                 multi.add_action(a)
-                                nf += 1
-                                nb += get_pkg_otw_size(a)
-        return nf, nb
+                                getb += get_pkg_otw_size(a)
+                                sendb += int(a.attrs.get("pkg.size", 0))
+        return getb, sendb
 
 def prune(fmri_list, all_versions, all_timestamps):
         """Returns a filtered version of fmri_list based on the provided
@@ -345,7 +319,7 @@
                 fm_list.append(l[0])
 
         for e in fm_list:
-                msg(e.get_fmri(anarchy=True))
+                msg(e.get_fmri())
 
 def fetch_catalog(src_pub, tracker, txport):
         """Fetch the catalog from src_uri."""
@@ -356,12 +330,20 @@
 
         if not src_pub.meta_root:
                 # Create a temporary directory for catalog.
-                cat_dir = tempfile.mkdtemp(dir=temp_root)
+                cat_dir = tempfile.mkdtemp(dir=temp_root,
+                    prefix=global_settings.client_name + "-")
                 tmpdirs.append(cat_dir)
                 src_pub.meta_root = cat_dir
 
         src_pub.transport = txport
-        src_pub.refresh(True, True)
+        try:
+                src_pub.refresh(True, True)
+        except apx.TransportError, e:
+                # Assume that a catalog doesn't exist for the target publisher,
+                # and drive on.  If there was an actual failure due to a
+                # transport issue, let the failure happen whenever some other
+                # operation is attempted later.
+                return []
 
         cat = src_pub.catalog
 
@@ -378,9 +360,10 @@
         return fmri_list
 
 def main_func():
-        global cache_dir, download_start, xport, xport_cfg
+        global cache_dir, download_start, xport, xport_cfg, dest_xport, targ_pub
         all_timestamps = False
         all_versions = False
+        dry_run = False
         keep_compressed = False
         list_newest = False
         recursive = False
@@ -388,7 +371,7 @@
         target = None
         incoming_dir = None
         src_pub = None
-        targ_pub = None
+        raw = False
 
         temp_root = misc.config_temp_root()
 
@@ -399,7 +382,8 @@
         src_uri = os.environ.get("PKG_SRC", None)
 
         try:
-                opts, pargs = getopt.getopt(sys.argv[1:], "c:d:hkm:nrs:")
+                opts, pargs = getopt.getopt(sys.argv[1:], "c:d:hkm:nrs:",
+                    ["newest", "raw"])
         except getopt.GetoptError, e:
                 usage(_("Illegal option -- %s") % e.opt)
 
@@ -412,12 +396,6 @@
                         usage(retcode=0)
                 elif opt == "-k":
                         keep_compressed = True
-                elif opt == "-n":
-                        list_newest = True
-                elif opt == "-r":
-                        recursive = True
-                elif opt == "-s":
-                        src_uri = arg
                 elif opt == "-m":
                         if arg == "all-timestamps":
                                 all_timestamps = True
@@ -425,17 +403,31 @@
                                 all_versions = True
                         else:
                                 usage(_("Illegal option value -- %s") % arg)
+                elif opt == "-n":
+                        dry_run = True
+                elif opt == "-r":
+                        recursive = True
+                elif opt == "-s":
+                        src_uri = arg
+                elif opt == "--newest":
+                        list_newest = True
+                elif opt == "--raw":
+                        raw = True
 
         if not src_uri:
                 usage(_("a source repository must be provided"))
+        else:
+                src_uri = misc.parse_uri(src_uri)
 
         if not cache_dir:
-                cache_dir = tempfile.mkdtemp(dir=temp_root)
+                cache_dir = tempfile.mkdtemp(dir=temp_root,
+                    prefix=global_settings.client_name + "-")
                 # Only clean-up cache dir if implicitly created by pkgrecv.
                 # User's cache-dirs should be preserved
                 tmpdirs.append(cache_dir)
 
-        incoming_dir = tempfile.mkdtemp(dir=temp_root)
+        incoming_dir = tempfile.mkdtemp(dir=temp_root,
+            prefix=global_settings.client_name + "-")
         tmpdirs.append(incoming_dir)
 
         # Create transport and transport config
@@ -451,200 +443,286 @@
         dest_xport_cfg.add_cache(cache_dir, readonly=False)
         dest_xport_cfg.incoming_root = incoming_dir
 
-        # Configure src publisher
-        src_pub = transport.setup_publisher(src_uri, "source", xport, xport_cfg,
+        # Configure src publisher(s).
+        transport.setup_publisher(src_uri, "source", xport, xport_cfg,
             remote_prefix=True)
 
-        tracker = get_tracker()
-        if list_newest:
-                if pargs or len(pargs) > 0:
-                        usage(_("-n takes no options"))
+        any_unmatched = []
+        total_processed = 0
+        for src_pub in xport_cfg.gen_publishers():
+                tracker = get_tracker()
+                if list_newest:
+                        if pargs or len(pargs) > 0:
+                                usage(_("--newest takes no options"))
+
+                        fmri_list = fetch_catalog(src_pub, tracker, xport)
+                        list_newest_fmris(fmri_list)
+                        continue
+
+                msg(_("Processing packages for publisher %s ...") %
+                    src_pub.prefix)
+                if pargs == None or len(pargs) == 0:
+                        usage(_("must specify at least one pkgfmri"))
 
-                fmri_list = fetch_catalog(src_pub, tracker, xport)
-                list_newest_fmris(fmri_list)
-                return 0
+                republish = False
 
-        if pargs == None or len(pargs) == 0:
-                usage(_("must specify at least one pkgfmri"))
+                if not target:
+                        target = basedir = os.getcwd()
+                elif target and not raw:
+                        basedir = tempfile.mkdtemp(dir=temp_root,
+                            prefix=global_settings.client_name + "-")
+                        tmpdirs.append(basedir)
+                        republish = True
+
+                        # Turn target into a valid URI.
+                        target = misc.parse_uri(target)
+
+                        # Setup target for transport.
+                        targ_pub = transport.setup_publisher(target,
+                            src_pub.prefix, dest_xport, dest_xport_cfg)
 
-        republish = False
+                        # Files have to be decompressed for republishing.
+                        keep_compressed = False
+                        if target.startswith("file://"):
+                                # Check to see if the repository exists first.
+                                try:
+                                        t = trans.Transaction(target,
+                                            xport=dest_xport, pub=targ_pub)
+                                except trans.TransactionRepositoryInvalidError, e:
+                                        txt = str(e) + "\n\n"
+                                        txt += _("To create a repository, use "
+                                            "the pkgsend command.")
+                                        abort(err=txt)
+                                except trans.TransactionRepositoryConfigError, e:
+                                        txt = str(e) + "\n\n"
+                                        txt += _("The repository configuration "
+                                            "for the repository located at "
+                                            "'%s' is not valid or the "
+                                            "specified path does not exist.  "
+                                            "Please correct the configuration "
+                                            "of the repository or create a new "
+                                            "one.") % target
+                                        abort(err=txt)
+                                except trans.TransactionError, e:
+                                        abort(err=e)
+                else:
+                        basedir = target
+                        if not os.path.exists(basedir):
+                                try:
+                                        os.makedirs(basedir, misc.PKG_DIR_MODE)
+                                except Exception, e:
+                                        error(_("Unable to create basedir "
+                                            "'%s': %s") % (basedir, e))
+                                        abort()
 
-        if not target:
-                target = basedir = os.getcwd()
-        elif target.find("://") != -1:
-                basedir = tempfile.mkdtemp(dir=temp_root)
-                tmpdirs.append(basedir)
-                republish = True
-
-                targ_pub = transport.setup_publisher(target, "target",
-                    dest_xport, dest_xport_cfg, remote_prefix=True)
+                xport_cfg.pkg_root = basedir
+                dest_xport_cfg.pkg_root = basedir
 
-                # Files have to be decompressed for republishing.
-                keep_compressed = False
-                if target.startswith("file://"):
-                        # Check to see if the repository exists first.
-                        try:
-                                t = trans.Transaction(target, xport=dest_xport,
-                                    pub=targ_pub)
-                        except trans.TransactionRepositoryInvalidError, e:
-                                txt = str(e) + "\n\n"
-                                txt += _("To create a repository, use the "
-                                    "pkgsend command.")
-                                abort(err=txt)
-                        except trans.TransactionRepositoryConfigError, e:
-                                txt = str(e) + "\n\n"
-                                txt += _("The repository configuration for "
-                                    "the repository located at '%s' is not "
-                                    "valid or the specified path does not "
-                                    "exist.  Please correct the configuration "
-                                    "of the repository or create a new "
-                                    "one.") % target
-                                abort(err=txt)
-                        except trans.TransactionError, e:
-                                abort(err=e)
-        else:
-                basedir = target
-                if not os.path.exists(basedir):
-                        try:
-                                os.makedirs(basedir, misc.PKG_DIR_MODE)
-                        except:
-                                error(_("Unable to create basedir '%s'.") % \
-                                    basedir)
-                                return 1
+                if republish:
+                        targ_fmris = fetch_catalog(targ_pub, tracker, dest_xport)
+
+                all_fmris = fetch_catalog(src_pub, tracker, xport)
+                fmri_arguments = pargs
+                matches, unmatched = expand_matching_fmris(all_fmris,
+                    fmri_arguments)
+
+                # Track anything that failed to match.
+                any_unmatched.append(unmatched)
+                if not matches:
+                        # No matches at all; nothing to do for this publisher.
+                        continue
+
+                fmri_list = prune(list(set(matches)), all_versions,
+                    all_timestamps)
+
+                if recursive:
+                        msg(_("Retrieving manifests for dependency "
+                            "evaluation ..."))
+                        tracker.evaluate_start()
+                        fmri_list = prune(get_dependencies(src_uri, fmri_list,
+                            basedir, tracker), all_versions, all_timestamps)
+                        tracker.evaluate_done()
+
+                def get_basename(pfmri):
+                        open_time = pfmri.get_timestamp()
+                        return "%d_%s" % \
+                            (calendar.timegm(open_time.utctimetuple()),
+                            urllib.quote(str(pfmri), ""))
 
-        xport_cfg.pkg_root = basedir
-        dest_xport_cfg.pkg_root = basedir
+                # First, retrieve the manifests and calculate package transfer
+                # sizes.
+                npkgs = len(fmri_list)
+                get_bytes = 0
+                send_bytes = 0
 
-        if republish:
-                targ_fmris = fetch_catalog(targ_pub, tracker, dest_xport)
+                if not recursive:
+                        msg(_("Retrieving and evaluating %d package(s)...") %
+                            npkgs)
+
+                tracker.evaluate_start(npkgs=npkgs)
+                skipped = False
+                retrieve_list = []
+                while fmri_list:
+                        f = fmri_list.pop()
 
-        all_fmris = fetch_catalog(src_pub, tracker, xport)
-        fmri_arguments = pargs
-        fmri_list = prune(list(set(expand_matching_fmris(all_fmris,
-            fmri_arguments))), all_versions, all_timestamps)
+                        if republish and f in targ_fmris:
+                                if not skipped:
+                                        # Output a newline so the message is
+                                        # on a separate line from the spinner.
+                                        msg("")
+                                msg(_("Skipping %s: already present "
+                                    "at destination") % f)
+                                skipped = True
+                                continue
 
-        if recursive:
-                msg(_("Retrieving manifests for dependency evaluation ..."))
-                tracker.evaluate_start()
-                fmri_list = prune(get_dependencies(src_uri, fmri_list, basedir,
-                    tracker), all_versions, all_timestamps)
+                        m = get_manifest(f, basedir)
+                        pkgdir = xport_cfg.get_pkg_dir(f)
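+                        # Batch the files named in the manifest for
+                        # retrieval into pkgdir; they are decompressed
+                        # unless -k was given.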
+                        mfile = xport.multi_file_ni(src_pub, pkgdir,
+                            not keep_compressed, tracker)
+
+                        getb, sendb = add_hashes_to_multi(m, mfile)
+                        get_bytes += getb
+                        if republish:
+                                send_bytes += sendb
+
+                        retrieve_list.append((f, mfile))
+
+                        tracker.evaluate_progress(fmri=f)
                 tracker.evaluate_done()
 
-        def get_basename(pfmri):
-                open_time = pfmri.get_timestamp()
-                return "%d_%s" % \
-                    (calendar.timegm(open_time.utctimetuple()),
-                    urllib.quote(str(pfmri), ""))
-
-        # First, retrieve the manifests and calculate package transfer sizes.
-        npkgs = len(fmri_list)
-        nfiles = 0
-        nbytes = 0
+                # Next, retrieve and store the content for each package.
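+                # The tracker goal counts packages plus the bytes to
+                # download and, when republishing, to upload.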
+                tracker.republish_set_goal(len(retrieve_list), get_bytes,
+                    send_bytes)
 
-        if not recursive:
-                msg(_("Retrieving manifests for package evaluation ..."))
-
-        tracker.evaluate_start(npkgs=npkgs)
-        retrieve_list = []
-        while fmri_list:
-                f = fmri_list.pop()
-
-                if republish and f in targ_fmris:
-                        msg(_("Skipping %s: already present "
-                            "at destination") % f)
+                if dry_run:
+                        tracker.republish_done()
+                        cleanup()
                         continue
 
-                m = get_manifest(f, basedir)
-                pkgdir = xport_cfg.get_pkg_dir(f)
-                mfile = xport.multi_file_ni(src_pub, pkgdir,
-                    not keep_compressed, tracker)
- 
-                nf, nb = add_hashes_to_multi(m, mfile)
-                nfiles += nf
-                nbytes += nb
+                processed = 0
+                while retrieve_list:
+                        f, mfile = retrieve_list.pop()
+                        tracker.republish_start_pkg(f.pkg_name)
+
+                        if mfile:
+                                download_start = True
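+                                # Block until all of this package's files
+                                # have been retrieved.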
+                                mfile.wait_files()
 
-                retrieve_list.append((f, mfile))
+                        if not republish:
+                                # Nothing more to do for this package.
+                                tracker.republish_end_pkg()
+                                continue
+
+                        m = get_manifest(f, basedir)
 
-                tracker.evaluate_progress(fmri=f)
-        tracker.evaluate_done()
+                        # Get first line of original manifest so that inclusion
+                        # of the scheme can be determined.
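+                        # For example, a first line containing "pkg:/foo@1.0"
+                        # includes the scheme, while "foo@1.0" does not.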
+                        use_scheme = True
+                        contents = get_manifest(f, basedir, contents=True)
+                        if contents.splitlines()[0].find("pkg:/") == -1:
+                                use_scheme = False
 
-        # Next, retrieve and store the content for each package.
-        msg(_("Retrieving package content ..."))
-        tracker.download_set_goal(len(retrieve_list), nfiles, nbytes)
+                        pkg_name = f.get_fmri(include_scheme=use_scheme)
+                        pkgdir = xport_cfg.get_pkg_dir(f)
 
-        publish_list = []
-        while retrieve_list:
-                f, mfile = retrieve_list.pop()
-                tracker.download_start_pkg(f.get_fmri(include_scheme=False))
+                        # The transaction id is derived from the FMRI so any
+                        # previous failed attempt to republish this package
+                        # can be found and aborted.
+                        trans_id = get_basename(f)
 
-                if mfile:
-                        mfile.wait_files()
-                        if not download_start:
-                                download_start = True
+                        if not targ_pub:
+                                targ_pub = transport.setup_publisher(target,
+                                    src_pub.prefix, dest_xport, dest_xport_cfg,
+                                    remote_prefix=True)
+
+                        try:
+                                t = trans.Transaction(target, pkg_name=pkg_name,
+                                    trans_id=trans_id, xport=dest_xport,
+                                    pub=targ_pub, progtrack=tracker)
 
-                if republish:
-                        publish_list.append(f)
-                tracker.download_end_pkg()
-        tracker.download_done()
-        tracker.reset()
+                                # Remove any previous failed attempt to
+                                # republish this package.
+                                try:
+                                        t.close(abandon=True)
+                                except:
+                                        # A previous attempt might not exist.
+                                        pass
 
-        # Finally, republish the packages if needed.
-        while publish_list:
-                f = publish_list.pop()
-                msg(_("Republishing %s ...") % f)
-
-                m = get_manifest(f, basedir)
+                                t.open()
+                                for a in m.gen_actions():
+                                        if a.name == "set" and \
+                                            a.attrs.get("name", "") in ("fmri",
+                                            "pkg.fmri"):
+                                                # To be consistent with the
+                                                # server, the fmri can't be
+                                                # added to the manifest.
+                                                continue
 
-                # Get first line of original manifest so that inclusion of the
-                # scheme can be determined.
-                use_scheme = True
-                contents = get_manifest(f, basedir, contents=True)
-                if contents.splitlines()[0].find("pkg:/") == -1:
-                        use_scheme = False
-
-                pkg_name = f.get_fmri(include_scheme=use_scheme)
-                pkgdir = xport_cfg.get_pkg_dir(f)
+                                        if hasattr(a, "hash"):
+                                                fname = os.path.join(pkgdir,
+                                                    a.hash)
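+                                                # The lambda binds fname by
+                                                # reference; this assumes
+                                                # t.add() reads a.data
+                                                # before fname is rebound.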
+                                                a.data = lambda: open(fname,
+                                                    "rb")
+                                        t.add(a)
+                                # Always defer catalog update.
+                                t.close(add_to_catalog=False)
+                        except trans.TransactionError, e:
+                                abort(err=e)
 
-                # This is needed so any previous failures for a package
-                # can be aborted.
-                trans_id = get_basename(f)
-
-                if not targ_pub:
-                        targ_pub = transport.setup_publisher(target, "target",
-                            dest_xport, dest_xport_cfg, remote_prefix=True)
-
-                try:
-                        t = trans.Transaction(target, pkg_name=pkg_name,
-                            trans_id=trans_id, xport=dest_xport, pub=targ_pub)
-
-                        # Remove any previous failed attempt to
-                        # to republish this package.
+                        # Dump data retrieved so far after each successful
+                        # republish to conserve space.
                         try:
-                                t.close(abandon=True)
-                        except:
-                                # It might not exist already.
-                                pass
+                                shutil.rmtree(dest_xport_cfg.incoming_root)
+                        except EnvironmentError, e:
+                                raise apx._convert_error(e)
+                        misc.makedirs(dest_xport_cfg.incoming_root)
+
+                        processed += 1
+                        tracker.republish_end_pkg()
+
+                tracker.republish_done()
+                tracker.reset()
+
+                if processed > 0:
+                        # If any packages were published, trigger an update of
+                        # the catalog.
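+                        # This applies the additions deferred by each
+                        # t.close(add_to_catalog=False) call above.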
+                        total_processed += processed
+                        dest_xport.publish_refresh_packages(targ_pub)
+
+                # Prevent reuse for the next publisher.
+                targ_pub = None
 
-                        t.open()
-                        for a in m.gen_actions():
-                                if a.name == "set" and \
-                                    a.attrs.get("name", "") in ("fmri",
-                                    "pkg.fmri"):
-                                        # To be consistent with the server,
-                                        # the fmri can't be added to the
-                                        # manifest.
-                                        continue
+        # Find the intersection of patterns that failed to match.
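+        # A pattern is reported as unmatched only if it failed to match
+        # for every publisher.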
+        unmatched = {}
+        for pub_unmatched in any_unmatched:
+                if not pub_unmatched:
+                        # If any publisher matched all patterns, then treat
+                        # the operation as successful.
+                        unmatched = {}
+                        break
 
-                                if hasattr(a, "hash"):
-                                        fname = os.path.join(pkgdir,
-                                            a.hash)
-                                        a.data = lambda: open(fname,
-                                            "rb")
-                                t.add(a)
-                        t.close()
-                except trans.TransactionError, e:
-                        abort(err=e)
-                        return 1
+                # Otherwise, find the intersection of unmatched patterns so far.
+                for k in pub_unmatched:
+                        try:
+                                src = set(unmatched[k])
+                                unmatched[k] = \
+                                    src.intersection(pub_unmatched[k])
+                        except KeyError:
+                                # Nothing to intersect with; assign instead.
+                                unmatched[k] = pub_unmatched[k]
+
+        # Prune match types that have no remaining failures.
+        for k, v in unmatched.items():
+                if not v:
+                        del unmatched[k]
+
+        if unmatched:
+                # If any match failures remain, abort with an error.
+                match_err = apx.InventoryException(**unmatched)
+                emsg(match_err)
+                if total_processed > 0:
+                        # Partial failure.
+                        abort(retcode=3)
+                abort()
 
         # Dump all temporary data.
         cleanup()
--- a/src/tests/cli/t_pkg_image_create.py	Wed Oct 20 15:32:12 2010 -0700
+++ b/src/tests/cli/t_pkg_image_create.py	Thu Oct 21 14:28:40 2010 -0700
@@ -56,7 +56,7 @@
 
                 # The fifth depot is purposefully one with the publisher
                 # operation disabled.
-                self.dcs[5].set_disable_ops(["publisher/0"])
+                self.dcs[5].set_disable_ops(["publisher/0", "publisher/1"])
                 self.dcs[5].start()
 
         def test_basic(self):
--- a/src/tests/cli/t_pkg_refresh.py	Wed Oct 20 15:32:12 2010 -0700
+++ b/src/tests/cli/t_pkg_refresh.py	Thu Oct 21 14:28:40 2010 -0700
@@ -94,7 +94,7 @@
                 order."""
 
                 # 127.0.0.1 - - [15/Oct/2009:00:15:38]
-                # "GET /catalog/1/catalog.base.C HTTP/1.1" 200 189 ""
+                # "GET [/<pub>]/catalog/1/catalog.base.C HTTP/1.1" 200 189 ""
                 # "pkg/b1f63b112bff+ (sunos i86pc; 5.11 snv_122; none; pkg)"
                 entry_comps = [
                     r"(?P<host>\S+)",
@@ -125,6 +125,9 @@
                         if req_method != method:
                                 continue
 
+                        # Strip the publisher prefix from the URI before
+                        # parsing it.
+                        uri = uri.replace("/test1", "")
+                        uri = uri.replace("/test2", "")
                         req_parts = uri.strip("/").split("/", 3)
                         if req_parts[0] != op:
                                 continue
--- a/src/tests/cli/t_pkgrecv.py	Wed Oct 20 15:32:12 2010 -0700
+++ b/src/tests/cli/t_pkgrecv.py	Thu Oct 21 14:28:40 2010 -0700
@@ -109,11 +109,12 @@
                 """ Start two depots.
                     depot 1 gets foo and moo, depot 2 gets foo and bar
                     depot1 is mapped to publisher test1 (preferred)
-                    depot2 is mapped to publisher test1 (alternate) """
+                    depot2 is mapped to publisher test1 (alternate)
+                    depot3 and depot4 are scratch depots"""
 
                 # This test suite needs actual depots.
-                pkg5unittest.ManyDepotTestCase.setUp(self, ["test1", "test1"],
-                    start_depots=True)
+                pkg5unittest.ManyDepotTestCase.setUp(self, ["test1", "test1",
+                    "test2", "test2"], start_depots=True)
 
                 self.make_misc_files(self.misc_files)
 
@@ -133,6 +134,9 @@
                 self.durl2 = self.dcs[2].get_depot_url()
                 self.tempdir = tempfile.mkdtemp(dir=self.test_root)
 
+                self.durl3 = self.dcs[3].get_depot_url()
+                self.durl4 = self.dcs[4].get_depot_url()
+
         @staticmethod
         def get_repo(uri):
                 parts = urlparse.urlparse(uri, "file", allow_fragments=0)
@@ -150,7 +154,7 @@
                 invalid options or option values return expected exit code."""
 
                 # Test that bad options return expected exit code.
-                self.pkgrecv(command="-n", exit=2)
+                self.pkgrecv(command="--newest", exit=2)
                 self.pkgrecv(self.durl1, "-!", exit=2)
                 self.pkgrecv(self.durl1, "-p foo", exit=2)
                 self.pkgrecv(self.durl1, "-d %s [email protected]" % self.tempdir,
@@ -165,15 +169,15 @@
                 self.pkgrecv(self.durl1, "-d file://%s foo" % npath,  exit=1)
 
                 # Test list newest.
-                self.pkgrecv(self.durl1, "-n")
+                self.pkgrecv(self.durl1, "--newest")
                 output = self.reduceSpaces(self.output)
 
                 # The latest version of amber and bronze should be listed
-                # (sans publisher prefix currently).
+                # (with the publisher prefix included).
-                amber = self.published[1].replace("pkg://test1/", "pkg:/")
-                scheme = self.published[6].replace("pkg://test1/", "pkg:/")
-                bronze = self.published[4].replace("pkg://test1/", "pkg:/")
-                tree = self.published[5].replace("pkg://test1/", "pkg:/")
+                amber = self.published[1]
+                scheme = self.published[6]
+                bronze = self.published[4]
+                tree = self.published[5]
                 expected = "\n".join((amber, scheme, tree, bronze)) + "\n"
                 self.assertEqualDiff(expected, output)
 
@@ -183,7 +187,7 @@
                 f = fmri.PkgFmri(self.published[3], None)
 
                 # First, retrieve the package.
-                self.pkgrecv(self.durl1, "-d %s %s" % (self.tempdir, f))
+                self.pkgrecv(self.durl1, "--raw -d %s %s" % (self.tempdir, f))
 
                 # Next, load the manifest.
                 basedir = os.path.join(self.tempdir, f.get_dir_path())
@@ -225,7 +229,8 @@
                 # First, pkgrecv the pkg to a directory.  The files are
                 # kept compressed so they can be compared directly to the
                 # repository's internal copy.
-                self.pkgrecv(self.durl1, "-k -d %s %s" % (self.tempdir, f))
+                self.pkgrecv(self.durl1, "--raw -k -d %s %s" % (self.tempdir,
+                    f))
 
                 # Next, compare the manifests.
                 orepo = self.get_repo(self.dpath1)
@@ -320,6 +325,7 @@
 
                 # Fourth, create an image and verify that the sent package is
                 # seen by the client.
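+                # Wait for the depot to reflect the republished package
+                # before creating the image.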
+                self.wait_repo(self.dpath2)
                 self.image_create(self.durl2, prefix="test1")
                 self.pkg("info -r [email protected]")
 
@@ -350,7 +356,7 @@
                 # Retrieve bronze recursively to a directory, this should
                 # also retrieve its dependency: amber, and amber's dependency:
                 # tree.
-                self.pkgrecv(self.durl1, "-r -k -d %s %s" % (self.tempdir,
+                self.pkgrecv(self.durl1, "--raw -r -k -d %s %s" % (self.tempdir,
                     bronze))
 
                 amber = fmri.PkgFmri(self.published[1], None)
@@ -371,8 +377,8 @@
 
                 # Retrieve bronze using -m all-timestamps and a version pattern.
                 # This should only retrieve bronze20_1 and bronze20_2.
-                self.pkgrecv(self.durl1, "-m all-timestamps -r -k -d %s %s" % (
-                    self.tempdir, "bronze@2.0"))
+                self.pkgrecv(self.durl1, "--raw -m all-timestamps -r -k "
+                    "-d %s %s" % (self.tempdir, "[email protected]"))
 
                 # Verify that only expected packages were retrieved.
                 expected = [
@@ -392,8 +398,8 @@
 
                 # Retrieve bronze using -m all-timestamps and a package stem.
                 # This should retrieve bronze10, bronze20_1, and bronze20_2.
-                self.pkgrecv(self.durl1, "-m all-timestamps -r -k -d %s %s" % (
-                    self.tempdir, "bronze"))
+                self.pkgrecv(self.durl1, "--raw -m all-timestamps -r -k "
+                    "-d %s %s" % (self.tempdir, "bronze"))
 
                 # Verify that only expected packages were retrieved.
                 expected = [
@@ -414,8 +420,8 @@
 
                 # Retrieve bronze using -m all-versions, this should only
                 # retrieve bronze10 and bronze20_2.
-                self.pkgrecv(self.durl1, "-m all-versions -r -k -d %s %s" % (
-                    self.tempdir, "bronze"))
+                self.pkgrecv(self.durl1, "--raw -m all-versions -r -k "
+                    "-d %s %s" % (self.tempdir, "bronze"))
 
                 # Verify that only expected packages were retrieved.
                 expected = [
@@ -439,7 +445,7 @@
                 os.environ["PKG_DEST"] = self.tempdir
 
                 # First, retrieve the package.
-                self.pkgrecv(command="%s" % f)
+                self.pkgrecv(command="--raw %s" % f)
 
                 # Next, load the manifest.
                 basedir = os.path.join(self.tempdir, f.get_dir_path())
@@ -482,6 +488,36 @@
                 # This would fail before behavior fixed to skip existing pkgs.
                 self.pkgrecv(self.durl1, "-r -d file://%s %s" % (npath, f2))
 
+        def test_7_recv_multipublisher(self):
+                """Verify that pkgrecv handles multi-publisher repositories as
+                expected."""
+
+                # Set up a repository with packages from multiple publishers.
+                amber = self.amber10.replace("open ", "open pkg://test2/")
+                self.pkgsend_bulk(self.durl3, amber)
+                self.pkgrecv(self.durl1, "-d %s [email protected] [email protected]" %
+                    self.durl3)
+
+                # Now attempt to receive from a repository with packages from
+                # multiple publishers and verify entry exists only for test1.
+                self.pkgrecv(self.durl3, "-d %s bronze" % self.durl4)
+                self.pkgrecv(self.durl3, "--newest")
+                self.assertNotEqual(self.output.find("test1/bronze"), -1)
+                self.assertEqual(self.output.find("test2/bronze"), -1)
+
+                # Now retrieve amber, and verify entries exist for both pubs.
+                self.wait_repo(self.dcs[4].get_repodir())
+                self.wait_repo(self.dcs[3].get_repodir())
+                self.pkgrecv(self.durl3, "-d %s amber" % self.durl4)
+                self.pkgrecv(self.durl4, "--newest")
+                self.assertNotEqual(self.output.find("test1/amber"), -1)
+                self.assertNotEqual(self.output.find("test2/amber"), -1)
+
+                # Verify attempting to retrieve a non-existent package fails
+                # for a multi-publisher repository.
+                self.pkgrecv(self.durl3, "-d %s nosuchpackage" % self.durl4,
+                    exit=1)
+
 
 if __name__ == "__main__":
         unittest.main()