src/pull.py
changeset 1118 f2dd69165173
parent 1063 80e49e35ce17
child 1132 1abf9cf8f69c
--- a/src/pull.py	Mon May 18 23:54:59 2009 -0500
+++ b/src/pull.py	Tue May 19 00:26:10 2009 -0500
@@ -25,182 +25,464 @@
 # Use is subject to license terms.
 #
 
-import sys
+import calendar
+import getopt
+import gettext
 import os
+import shutil
+import sys
+import tempfile
 import traceback
-import getopt
 import urllib
-import tempfile
-import gettext
-import shutil
+import urlparse
 
+import pkg.catalog as catalog
+import pkg.client.progress as progress
 import pkg.fmri
+import pkg.manifest as manifest
 import pkg.pkgtarfile as ptf
-import pkg.catalog as catalog
-import pkg.actions as actions
-from pkg.misc import versioned_urlopen, gunzip_from_stream, msg, emsg, PipeError
+import pkg.portable as portable
+import pkg.publish.transaction as trans
+import pkg.server.config as config
+import pkg.server.repository as repo
+import pkg.server.repositoryconfig as rc
+import pkg.version as version
+
 from pkg.client import global_settings
+from pkg.misc import (emsg, get_pkg_otw_size, gunzip_from_stream, msg,
+    versioned_urlopen, PipeError)
 
-def usage(usage_error = None):
-        """ Emit a usage message and optionally prefix it with a more
-            specific error message.  Causes program to exit. """
+# Globals
+complete_catalog = None
+repo_cache = {}
+tmpdirs = []
+
+def error(text):
+        """Emit an error message prefixed by the command name """
+
+        # If we get passed something like an Exception, we can convert
+        # it down to a string.
+        text = str(text)
+
+        # If the message starts with whitespace, assume that it should come
+        # *before* the command-name prefix.
+        text_nows = text.lstrip()
+        ws = text[:len(text) - len(text_nows)]
+
+        # This has to be a constant value as we can't reliably get our actual
+        # program name on all platforms.
+        emsg(ws + "pkgrecv: " + text_nows)
+
+def usage(usage_error=None, retcode=2):
+        """Emit a usage message and optionally prefix it with a more specific
+        error message.  Causes program to exit."""
 
         if usage_error:
                 error(usage_error)
 
-        emsg(_("""\
+        msg(_("""\
 Usage:
-        pkgrecv -s server [-d dir] pkgfmri ...
-        pkgrecv -s server -n"""))
+        pkgrecv [-s src_repo_uri] [-d (path|dest_uri)] [-k] [-m] [-n] [-r]
+            (fmri|pattern) ...
+        pkgrecv [-s src_repo_uri] -n
+
+Options:
+        -d path_or_uri  The path of a directory to save the retrieved package
+                        to, or the URI of a repository to republish it to.  If
+                        not provided, the default value is the current working
+                        directory.  If a directory path is provided, then
+                        package content will only be retrieved if it does not
+                        already exist in the target directory.  If a repository
+                        URI is provided, a temporary directory will be created
+                        and all of the package data retrieved before attempting
+                        to republish it.
 
-        sys.exit(2)
+        -h              Display this usage message.
+        -k              Keep the retrieved package content compressed, ignored
+                        when republishing.  Should not be used with pkgsend.
+        -m match        Controls matching behaviour using the following values:
+                            all-timestamps
+                                includes all matching timestamps, not just
+                                latest (implies all-versions)
+                            all-versions
+                                includes all matching versions, not just latest
+        -n              List the most recent versions of the packages available
+                        from the specified repository and exit (all other
+                        options except -s will be ignored).
+        -r              Recursively evaluates all dependencies for the provided
+                        list of packages and adds them to the list.
+        -s src_repo_uri A URI representing the location of a pkg(5)
+                        repository to retrieve package data from.
 
+Environment:
+        PKG_DEST        Destination directory or repository URI
+        PKG_SRC         Source repository URI"""))
+        sys.exit(retcode)
+
+def cleanup():
+        """To be called at program finish."""
+        for d in tmpdirs:
+                shutil.rmtree(d, True)
+
+def abort(err=None, retcode=1):
+        """To be called when a fatal error is encountered."""
+
+        if err:
+                # Clear any possible output first.
+                msg("")
+                error(err)
+
+        cleanup()
+        sys.exit(retcode)
 
-def error(error):
-        """ Emit an error message prefixed by the command name """
+def get_tracker(quiet=False):
+        if quiet:
+                progresstracker = progress.QuietProgressTracker()
+        else:
+                try:
+                        progresstracker = \
+                            progress.FancyUNIXProgressTracker()
+                except progress.ProgressTrackerException:
+                        progresstracker = progress.CommandLineProgressTracker()
+        return progresstracker
+
+def get_manifest(src_uri, pfmri, basedir):
+
+        m = None
+        pkgdir = os.path.join(basedir, pfmri.get_dir_path())
+        mpath = os.path.join(pkgdir, "manifest")
+
+        raw = None
+        overwrite = False
+        if not os.path.exists(mpath):
+                raw = fetch_manifest(src_uri, pfmri)
+                overwrite = True
+        else:
+                try:
+                        raw = file(mpath, "rb").read()
+                except:
+                        abort(err=_("Unable to load manifest '%s' for package "
+                            " '%s'.") % (mpath, pfmri))
 
-        # The prgram name has to be a constant value as we can't reliably 
-        # get our actual program name on all platforms.
-        emsg("pkgrecv: " + error)
+        try:
+                m = manifest.CachedManifest(pfmri, basedir, None,
+                    contents=raw)
+        except:
+                abort(err=_("Unable to parse manifest '%s' for package "
+                    "'%s'") % (mpath, pfmri))
+
+        if overwrite:
+                # Overwrite the manifest file so that the on-disk version will
+                # be consistent with the server (due to fmri addition).
+                try:
+                        f = open(mpath, "wb")
+                        f.write(raw)
+                        f.close()
+                except:
+                        abort(err=_("Unable to write manifest '%s' for package "
+                           " '%s'.") % (mpath, pfmri))
+        return m
 
-def hashes_from_mfst(manifest):
-        """Given a path to a manifest, open the file and read through the
-        actions.  Return a set of all content hashes found in the manifest."""
+def get_repo(uri):
+        if uri in repo_cache:
+                return repo_cache[uri]
+
+        parts = urlparse.urlparse(uri, "file", allow_fragments=0)
+        path = urllib.url2pathname(parts[2])
 
-        hashes = set()
+        scfg = config.SvrConfig(path, None, None)
+        scfg.set_read_only()
+        try:
+                scfg.init_dirs()
+        except (config.SvrConfigError, EnvironmentError), e:
+                raise repo.RepositoryError(_("An error occurred while "
+                    "trying to initialize the repository directory "
+                    "structures:\n%s") % e)
+
+        scfg.acquire_in_flight()
+
+        try:
+                scfg.acquire_catalog()
+        except catalog.CatalogPermissionsException, e:
+                raise repo.RepositoryError(str(e))
 
         try:
-                f = file(manifest, "r")
+                repo_cache[uri] = repo.Repository(scfg)
+        except rc.InvalidAttributeValueError, e:
+                raise repo.RepositoryError(_("The specified repository's "
+                    "configuration data is not valid:\n%s") % e)
+
+        return repo_cache[uri]
+
+def fetch_manifest(src_uri, pfmri):
+        """Return the manifest data for package-fmri 'fmri' from the repository
+        at 'src_uri'."""
+
+        if src_uri.startswith("file://"):
+                try:
+                        r = get_repo(src_uri)
+                        m = file(r.manifest(pfmri), "rb")
+                except (EnvironmentError, repo.RepositoryError), e:
+                        abort(err=e)
+        else:
+                # Request manifest from repository.
+                try:
+                        m = versioned_urlopen(src_uri, "manifest", [0],
+                            pfmri.get_url_path())[0]
+                except Exception, e:
+                        abort(err=_("Unable to retrieve manifest %s from "
+                            "%s: %s") % (pfmri.get_url_path(), src_uri, e))
+                except:
+                        abort()
+
+        # Read from repository, return to caller.
+        try:
+                mfst_str = m.read()
         except:
-                error(_("Unable to open manifest: %s") % manifest)
-                sys.exit(1)
+                abort(err=_("Error occurred while reading from: %s") % src_uri)
+
+        if hasattr(m, "close"):
+                m.close()
+
+        return mfst_str
+
+def expand_fmri(pfmri, constraint=version.CONSTRAINT_AUTO):
+        """Find matching fmri using CONSTRAINT_AUTO cache for performance.
+        Returns None if no matching fmri is found."""
+        if isinstance(pfmri, str):
+                pfmri = pkg.fmri.PkgFmri(pfmri, "5.11")
+
+        for f in complete_catalog.get(pfmri.pkg_name, []):
+                if not pfmri.version or \
+                    f.version.is_successor(pfmri.version, constraint):
+                        return f
+        return
+
+def expand_matching_fmris(fmri_list, pfmri_strings):
+        """find matching fmris using pattern matching and
+        constraint auto."""
+        counthash = {}
+
+        try:
+                patterns = [
+                    pkg.fmri.MatchingPkgFmri(s, "5.11")
+                    for s in pfmri_strings
+                ]
+        except pkg.fmri.FmriError, e:
+                abort(err=e)
+
+        matches = catalog.extract_matching_fmris(fmri_list,
+            patterns=patterns, constraint=version.CONSTRAINT_AUTO,
+            counthash=counthash, matcher=pkg.fmri.glob_match)
+
+        bail = False
+
+        for f in patterns:
+                if f not in counthash:
+                        emsg(_("No match found for %s") % f.pkg_name)
+                        bail = True
+
+        if bail:
+                abort()
+
+        return matches
+
+def get_dependencies(src_uri, fmri_list, basedir, tracker):
+
+        old_limit = sys.getrecursionlimit()
+        # The user may be recursing 'entire' or 'redistributable'.
+        sys.setrecursionlimit(3000)
+
+        s = set()
+        for f in fmri_list:
+                pfmri = expand_fmri(f)
+                _get_dependencies(src_uri, s, pfmri, basedir, tracker)
+
+        # Restore the previous default.
+        sys.setrecursionlimit(old_limit)
+
+        return list(s)
+
+def _get_dependencies(src_uri, s, pfmri, basedir, tracker):
+        """Expand all dependencies."""
+        tracker.evaluate_progress(fmri=pfmri)
+        s.add(pfmri)
+
+        m = get_manifest(src_uri, pfmri, basedir)
+        for a in m.gen_actions_by_type("depend"):
+                new_fmri = expand_fmri(a.attrs["fmri"])
+                if new_fmri and new_fmri not in s:
+                        _get_dependencies(src_uri, s, new_fmri, basedir,
+                            tracker)
+        return s
+
+def get_hashes_and_sizes(m):
+        """Returns a dict of hashes and transfer sizes of actions with content
+        in a manifest."""
+
+        seen_hashes = set()
+        def repeated(a):
+                if a in seen_hashes:
+                        return True
+                seen_hashes.add(a)
+                return False
 
-        for line in f:
-                line = line.lstrip()
-                if not line or line[0] == "#":
+        cshashes = {}
+        for atype in ("file", "license"):
+                for a in m.gen_actions_by_type(atype):
+                        if hasattr(a, "hash") and not repeated(a.hash):
+                                sz = int(a.attrs.get("pkg.size", 0))
+                                csize = int(a.attrs.get("pkg.csize", 0))
+                                otw_sz = get_pkg_otw_size(a)
+                                cshashes[a.hash] = (sz, csize, otw_sz)
+        return cshashes
+
+def prune(fmri_list, all_versions, all_timestamps):
+        """Returns a filtered version of fmri_list based on the provided
+        parameters."""
+
+        if all_timestamps:
+                pass
+        elif all_versions:
+                dedup = {}
+                for f in fmri_list:
+                        dedup.setdefault(f.get_short_fmri(), []).append(f)
+                fmri_list = [sorted(dedup[f], reverse=True)[0] for f in dedup]
+        else:
+                dedup = {}
+                for f in fmri_list:
+                        dedup.setdefault(f.pkg_name, []).append(f)
+                fmri_list = [sorted(dedup[f], reverse=True)[0] for f in dedup]
+        return fmri_list
+
+def fetch_files_byhash(src_uri, cshashes, destdir, keep_compressed, tracker):
+        """Given a list of tuples containing content hash, and size and download
+        the content from src_uri into destdir."""
+
+        def valid_file(h):
+                # XXX this should check data digest
+                fname = os.path.join(destdir, h)
+                if os.path.exists(fname):
+                        if keep_compressed:
+                                sz = cshashes[h][1]
+                        else:
+                                sz = cshashes[h][0]
+
+                        if sz == 0:
+                                return True
+
+                        try:
+                                fs = os.stat(fname)
+                        except:
+                                pass
+                        else:
+                                if fs.st_size == sz:
+                                        return True
+                return False
+
+        if src_uri.startswith("file://"):
+                try:
+                        r = get_repo(src_uri)
+                except repo.RepositoryError, e:
+                        abort(err=e)
+
+                for h in cshashes.keys():
+                        dest = os.path.join(destdir, h)
+
+                        # Check to see if the file already exists first, so the
+                        # user can continue interrupted pkgrecv operations.
+                        retrieve = not valid_file(h)
+
+                        try:
+                                if retrieve and keep_compressed:
+                                        src = r.file(h)
+                                        shutil.copy(src, dest)
+                                elif retrieve:
+                                        src = file(r.file(h), "rb")
+                                        outfile = open(dest, "wb")
+                                        gunzip_from_stream(src, outfile)
+                                        outfile.close()
+                        except (EnvironmentError,
+                            repo.RepositoryError), e:
+                                try:
+                                        portable.remove(dest)
+                                except:
+                                        pass
+                                abort(err=e)
+
+                        tracker.download_add_progress(1, cshashes[h][2])
+                return
+
+        req_dict = {}
+        for i, k in enumerate(cshashes.keys()):
+                # Check to see if the file already exists first, so the user can
+                # continue interrupted pkgrecv operations.
+                if valid_file(k):
+                        tracker.download_add_progress(1, cshashes[k][2])
                         continue
 
-                try:
-                        action = actions.fromstr(line)
-                except:
-                        continue
-
-                if hasattr(action, "hash"):
-                        hashes.add(action.hash)
-
-        f.close()
-
-        return hashes
-
-def fetch_files_byhash(server_url, hashes, pkgdir, keep_compressed):
-        """Given a list of files named by content hash, download from
-        server_url into pkgdir."""
-
-        req_dict = { }
-
-        for i, k in enumerate(hashes):
-                str = "File-Name-%s" % i
-                req_dict[str] = k
+                entry = "File-Name-%s" % i
+                req_dict[entry] = k
 
         req_str = urllib.urlencode(req_dict)
+        if not req_str:
+                # Nothing to retrieve.
+                return
+
+        tmpdir = tempfile.mkdtemp()
+        tmpdirs.append(tmpdir)
 
         try:
-                f, v = versioned_urlopen(server_url, "filelist", [0],
-                    data = req_str)
+                f = versioned_urlopen(src_uri, "filelist", [0],
+                    data=req_str)[0]
         except:
-                error(_("Unable to download files from: %s") % server_url)
-                sys.exit(1)
+                abort(err=_("Unable to retrieve content from: %s") % src_uri)
 
         tar_stream = ptf.PkgTarFile.open(mode = "r|", fileobj = f)
 
-        if not os.path.exists(pkgdir):
-                try:
-                        os.makedirs(pkgdir)
-                except:
-                        error(_("Unable to create directory: %s") % pkgdir)
-                        sys.exit(1)
-
         for info in tar_stream:
                 gzfobj = None
                 try:
                         if not keep_compressed:
                                 # Uncompress as we retrieve the files
                                 gzfobj = tar_stream.extractfile(info)
-                                fpath = os.path.join(pkgdir, info.name)
+                                fpath = os.path.join(tmpdir,
+                                    info.name)
                                 outfile = open(fpath, "wb")
                                 gunzip_from_stream(gzfobj, outfile)
                                 outfile.close()
                                 gzfobj.close()
                         else:
-                                # We want to keep the files compressed on disk
-                                tar_stream.extract_to(info, pkgdir, info.name)
+                                # We want to keep the files compressed
+                                # on disk.
+                                tar_stream.extract_to(info, tmpdir,
+                                    info.name)
+
+                        # Copy the file into place (rename can cause a cross-
+                        # link device failure) and then remove the original.
+                        src = os.path.join(tmpdir, info.name)
+                        shutil.copy(src, os.path.join(destdir, info.name))
+                        portable.remove(src)
+
+                        tracker.download_add_progress(1, cshashes[info.name][2])
+                except KeyboardInterrupt:
+                        raise
                 except:
-                        error(_("Unable to extract file: %s") % info.name)
-                        sys.exit(1)
+                        abort(err=_("Unable to extract file: %s") % info.name)
+
+        shutil.rmtree(tmpdirs.pop(), True)
 
         tar_stream.close()
         f.close()
 
-def fetch_manifest(server_url, fmri, basedir):
-        """Fetch the manifest for package-fmri 'fmri' from the server
-        in 'server_url'. Put manifest in a directory named by package stem"""
-
-        # Request manifest from server
-        try:
-                m, v = versioned_urlopen(server_url, "manifest", [0],
-                    fmri.get_url_path())
-        except:
-                error(_("Unable to download manifest %(name)s from %(url)s") %
-                    { "name": fmri.get_url_path(), "url": server_url} )
-                sys.exit(1)
-
-        # join pkgname onto basedir.  Manifest goes here
-        opath = os.path.join(basedir, urllib.quote(fmri.pkg_name, ""))
-
-        # Create directories if they don't exist
-        if not os.path.exists(opath):
-                try:
-                        os.makedirs(opath)
-                except:
-                        error(_("Unable to create directory: %s") % opath)
-                        sys.exit(1)
+def list_newest_fmris(fmri_list):
+        """List the provided fmris."""
 
-        # Open manifest
-        opath = os.path.join(opath, "manifest")
-        try:
-                ofile = file(opath, "w")
-        except:
-                error(_("Unable to open file: %s") % opath)
-                sys.exit(1)
-
-        # Read from server, write to file
-        try:
-                mfst = m.read()
-        except:
-                error(_("Error occurred while reading from: %s") % server_url)
-                sys.exit(1)
-
-        try:
-                ofile.write(mfst)
-        except:
-                error(_("Error occurred while writing to: %s") % opath)
-                sys.exit(1)
-
-        # Close it up
-        ofile.close()
-        m.close()
-
-        return opath
-
-def list_newest_fmris(cat):
-        """Look through the catalog 'cat' and return the newest version
-        of a fmri found for a given package."""
-
-        fm_hash = { }
-        fm_list = [ ]
+        fm_hash = {}
+        fm_list = []
 
         # Order all fmris by package name
-        for f in cat.fmris():
+        for f in sorted(fmri_list):
                 if f.pkg_name in fm_hash:
                         fm_hash[f.pkg_name].append(f)
                 else:
@@ -213,119 +495,292 @@
                 fm_list.append(l[0])
 
         for e in fm_list:
-                print e
+                msg(e)
 
-def fetch_catalog(server_url):
-        """Fetch the catalog from the server_url."""
+def fetch_catalog(src_uri, tracker):
+        """Fetch the catalog from src_uri."""
+        global complete_catalog
+
+        tracker.catalog_start(src_uri)
 
-        # open connection for catalog
-        try:
-                c, v = versioned_urlopen(server_url, "catalog", [0])
-        except:
-                error(_("Unable to download catalog from: %s") % server_url)
-                sys.exit(1)
-
-        # make a tempdir for catalog
-        dl_dir = tempfile.mkdtemp()
+        if src_uri.startswith("file://"):
+                try:
+                        r = get_repo(src_uri)
+                        c = r.catalog()
+                except repo.RepositoryError, e:
+                        error(e)
+                        abort()
+        else:
+                # open connection for catalog
+                try:
+                        c = versioned_urlopen(src_uri, "catalog", [0])[0]
+                except:
+                        abort(err=_("Unable to download catalog from: %s") % \
+                            src_uri)
 
-        # call catalog.recv to pull down catalog
+        # Create a temporary directory for catalog.
+        cat_dir = tempfile.mkdtemp()
+        tmpdirs.append(cat_dir)
+
+        # Call catalog.recv to retrieve catalog.
         try:
-                catalog.recv(c, dl_dir)
+                catalog.recv(c, cat_dir)
         except: 
-                error(_("Error while reading from: %s") % server_url)
-                sys.exit(1)
+                abort(err=_("Error while reading from: %s") % src_uri)
+
+        if hasattr(c, "close"):
+                c.close()
 
-        # close connection to server
-        c.close()
+        cat = catalog.Catalog(cat_dir)
 
-        # instantiate catalog object
-        cat = catalog.Catalog(dl_dir)
-        
-        # return (catalog, tmpdir path)
-        return cat, dl_dir
+        d = {}
+        fmri_list = []
+        for f in cat.fmris():
+                fmri_list.append(f)
+                d.setdefault(f.pkg_name, [f]).append(f)
+        for k in d.keys():
+                d[k].sort(reverse=True)
+
+        complete_catalog = d
+        tracker.catalog_done()
+        return fmri_list
 
 def main_func():
-
-        server = None
-        basedir = None
-        newfmri = False
+        all_timestamps = False
+        all_versions = False
         keep_compressed = False
+        list_newest = False
+        recursive = False
+        src_uri = None
+        target = None
 
         # XXX /usr/lib/locale is OpenSolaris-specific.
         gettext.install("pkgrecv", "/usr/lib/locale")
 
         global_settings.client_name = "pkgrecv"
+        target = os.environ.get("PKG_DEST", None)
+        src_uri = os.environ.get("PKG_SRC", None)
 
         try:
-               opts, pargs = getopt.getopt(sys.argv[1:], "s:d:nk")
+                opts, pargs = getopt.getopt(sys.argv[1:], "d:hkm:nrs:")
         except getopt.GetoptError, e:
-                usage(_("Illegal option -- %s") % e.opt) 
+                usage(_("Illegal option -- %s") % e.opt)
 
         for opt, arg in opts:
-                if opt == "-s":
-                        server = arg
                 if opt == "-d":
-                        basedir = arg
-                if opt == "-n":
-                        newfmri = True
-                if opt == "-k":
+                        target = arg
+                elif opt == "-h":
+                        usage(retcode=0)
+                elif opt == "-k":
                         keep_compressed = True
-
-        if not server:
-                usage(_("must specify a server"))
+                elif opt == "-n":
+                        list_newest = True
+                elif opt == "-r":
+                        recursive = True
+                elif opt == "-s":
+                        src_uri = arg
+                elif opt == "-m":
+                        if arg == "all-timestamps":
+                                all_timestamps = True
+                        elif arg == "all-versions":
+                                all_versions = True
+                        else:
+                                usage(_("Illegal option value -- %s") % arg)
 
-        if not server.startswith("http://"):
-                server = "http://%s" % server
+        if not src_uri:
+                usage(_("a source repository must be provided"))
 
-        if newfmri:
+        tracker = get_tracker()
+        if list_newest:
                 if pargs or len(pargs) > 0:
                         usage(_("-n takes no options"))
 
-                cat, dir = fetch_catalog(server)
-                list_newest_fmris(cat)
-                shutil.rmtree(dir)
-                
-        else:
-                if pargs == None or len(pargs) == 0:
-                        usage(_("must specify at least one pkgfmri"))
+                fmri_list = fetch_catalog(src_uri, tracker)
+                list_newest_fmris(fmri_list)
+                return 0
+
+        if pargs == None or len(pargs) == 0:
+                usage(_("must specify at least one pkgfmri"))
 
-                if not basedir:
-                        basedir = os.getcwd()
+        all_fmris = fetch_catalog(src_uri, tracker)
+        fmri_arguments = pargs
+        fmri_list = prune(list(set(expand_matching_fmris(all_fmris,
+            fmri_arguments))), all_versions, all_timestamps)
+
+        create_repo = False
+        defer_refresh = False
+        republish = False
 
-                for pkgfmri in pargs:
-                        if not pkgfmri.startswith("pkg:/"):
-                                pkgfmri = "pkg:/%s" % pkgfmri
+        if not target:
+                target = basedir = os.getcwd()
+        elif target.find("://") != -1:
+                basedir = tempfile.mkdtemp()
+                tmpdirs.append(basedir)
+                republish = True
+
+                # Files have to be decompressed for republishing.
+                keep_compressed = False
 
+                # Automatically create repository at target location if it
+                # doesn't exist.
+                if target.startswith("file://"):
+                        create_repo = True
+                        # For efficiency, and publishing speed, don't update
+                        # indexes until all file publishing is finished.
+                        defer_refresh = True
+        else:
+                basedir = target
+                if not os.path.exists(basedir):
                         try:
-                                fmri = pkg.fmri.PkgFmri(pkgfmri)
-                        except pkg.fmri.IllegalFmri, e:
-                                error(_("%(fmri)s is an illegal fmri: "
-                                    "%(error)s") %
-                                    { "fmri": pkgfmri, "error": e })
+                                os.makedirs(basedir, 0755)
+                        except:
+                                error(_("Unable to create basedir '%s'.") % \
+                                    basedir)
                                 return 1
 
-                        mfstpath = fetch_manifest(server, fmri, basedir)
-                        content_hashes = hashes_from_mfst(mfstpath)
+        if recursive:
+                msg(_("Retrieving manifests for dependency evaluation ..."))
+                tracker.evaluate_start()
+                fmri_list = prune(get_dependencies(src_uri, fmri_list, basedir,
+                    tracker), all_versions, all_timestamps)
+                tracker.evaluate_done()
+
+        def get_basename(pfmri):
+                open_time = pfmri.get_timestamp()
+                return "%d_%s" % \
+                    (calendar.timegm(open_time.utctimetuple()),
+                    urllib.quote(str(pfmri), ""))
+
+        # First, retrieve the manifests and calculate package transfer sizes.
+        npkgs = len(fmri_list)
+        nfiles = 0
+        nbytes = 0
+
+        if not recursive:
+                msg(_("Retrieving manifests for package evaluation ..."))
+
+        tracker.evaluate_start(npkgs=npkgs)
+        retrieve_list = []
+        while fmri_list:
+                f = fmri_list.pop()
+                m = get_manifest(src_uri, f, basedir)
+                cshashes = get_hashes_and_sizes(m)
+
+                for entry in cshashes.itervalues():
+                        nfiles += 1
+                        nbytes += entry[2]
+
+                retrieve_list.append((f, cshashes))
+
+                tracker.evaluate_progress(fmri=f)
+        tracker.evaluate_done()
+        tracker.reset()
+
+        # Next, retrieve and store the content for each package.
+        msg(_("Retrieving package content ..."))
+        tracker.download_set_goal(len(retrieve_list), nfiles, nbytes)
+
+        publish_list = []
+        while retrieve_list:
+                f, cshashes = retrieve_list.pop()
+                tracker.download_start_pkg(f.get_fmri(include_scheme=False))
+
+                if len(cshashes) > 0:
+                        pkgdir = os.path.join(basedir, f.get_dir_path())
+                        fetch_files_byhash(src_uri, cshashes, pkgdir,
+                            keep_compressed, tracker)
+
+                if republish:
+                        publish_list.append(f)
+                tracker.download_end_pkg()
+        tracker.download_done()
+        tracker.reset()
 
-                        if len(content_hashes) > 0:
-                                fetch_files_byhash(server, content_hashes,
-                                        os.path.dirname(mfstpath),
-                                        keep_compressed)
-                        else:
-                                msg(_("No files to retrieve."))
+        # Finally, republish the packages if needed.
+        while publish_list:
+                f = publish_list.pop()
+                msg(_("Republishing %s ...") % f)
+
+                m = get_manifest(src_uri, f, basedir)
+                pkgdir = os.path.join(basedir, f.get_dir_path())
+
+                # Ensure pkg:/ prefix is not included.
+                pkg_name = f.get_fmri(include_scheme=False)
+
+                # This is needed so any previous failures for a package
+                # can be aborted.
+                trans_id = get_basename(f)
+
+                try:
+                        t = trans.Transaction(target, create_repo=create_repo,
+                            pkg_name=pkg_name, trans_id=trans_id)
+
+                        # Remove any previous failed attempt to
+                        # to republish this package.
+                        try:
+                                t.close(abandon=True)
+                        except:
+                                # It might not exist already.
+                                pass
 
+                        t.open()
+                        for a in m.gen_actions():
+                                if a.name == "set" and \
+                                    a.attrs.get("name", "") == "fmri":
+                                        # To be consistent with the server,
+                                        # the fmri can't be added to the
+                                        # manifest.
+                                        continue
+
+                                if hasattr(a, "hash"):
+                                        fname = os.path.join(pkgdir,
+                                            a.hash)
+                                        a.data = lambda: open(fname,
+                                            "rb")
+                                t.add(a)
+                        t.close(refresh_index=not defer_refresh)
+                except trans.TransactionError, e:
+                        abort(err=e)
+                        return 1
+
+        # Dump all temporary data.
+        cleanup()
+
+        if republish:
+                if defer_refresh:
+                        msg(_("Refreshing repository search indices ..."))
+                        try:
+                                t = trans.Transaction(target)
+                                t.refresh_index()
+                        except trans.TransactionError, e:
+                                error(e)
+                                return 1
         return 0
 
 if __name__ == "__main__":
         try:
-                ret = main_func()
-        except SystemExit, e:
-                raise e
+                __ret = main_func()
+        except (pkg.actions.ActionError, trans.TransactionError,
+            RuntimeError), _e:
+                error(_e)
+                cleanup()
+                __ret = 1
         except (PipeError, KeyboardInterrupt):
                 # We don't want to display any messages here to prevent
                 # possible further broken pipe (EPIPE) errors.
-                sys.exit(1)
+                cleanup()
+                __ret = 1
+        except SystemExit, _e:
+                cleanup()
+                raise _e
         except:
+                cleanup()
                 traceback.print_exc()
-                sys.exit(99)
-        sys.exit(ret)
+                error(
+                    _("\n\nThis is an internal error.  Please let the "
+                    "developers know about this\nproblem by filing a bug at "
+                    "http://defect.opensolaris.org and including the\nabove "
+                    "traceback and this message.  The version of pkg(5) is "
+                    "'%s'.") % pkg.VERSION)
+                __ret = 99
+        sys.exit(__ret)