16818 pkgrecv -d should publish per-package, not in one lump
9197 pkgrecv could show progress indicator for republishing
9190 pkgrecv dry-run option desired
12701 pkgsend should take a relative path
16817 pkgrecv -d too subtle in functionality
17172 pkgrecv should defer catalog update until operation completion
17241 multi-publisher repository operations busted for http
17242 pkgsend should accept simple paths for repository location
17243 pkgrecv should accept simple paths for repository location
17280 pkgrecv should support multi-publisher repositories
--- a/src/gui/modules/progress.py Wed Oct 20 15:32:12 2010 -0700
+++ b/src/gui/modules/progress.py Thu Oct 21 14:28:40 2010 -0700
@@ -121,11 +121,11 @@
def dl_output(self):
self.display_download_info()
- if self.prev_pkg != self.dl_cur_pkg:
- self.prev_pkg = self.dl_cur_pkg
+ if self.prev_pkg != self.cur_pkg:
+ self.prev_pkg = self.cur_pkg
self.update_details_text(
_("Package %d of %d: %s\n") % (self.dl_cur_npkgs+1,
- self.dl_goal_npkgs, self.dl_cur_pkg), "level1")
+ self.dl_goal_npkgs, self.cur_pkg), "level1")
def dl_output_done(self):
self.update_details_text("\n")
--- a/src/man/pkgrecv.1.txt Wed Oct 20 15:32:12 2010 -0700
+++ b/src/man/pkgrecv.1.txt Thu Oct 21 14:28:40 2010 -0700
@@ -6,42 +6,35 @@
SYNOPSIS
/usr/bin/pkgrecv [-s src_uri] [-d (path|dest_uri)] [-c cache_dir]
- [-kr] [-m match] (fmri|pattern) ...
- /usr/bin/pkgrecv [-s src_uri] -n
+ [-kr] [-m match] [-n] [--raw] (fmri|pattern) ...
+ /usr/bin/pkgrecv [-s src_uri] --newest
DESCRIPTION
pkgrecv allows the user to retrieve the contents of a package
from a pkg(5) repository. With the default options, the contents
- are retrieved in a format that can easily be input to pkgsend(1)
- when used with the 'include' subcommand. As a result, pkgrecv(1)
- and pkgsend(1) can be used to conveniently modify and republish
- packages, perhaps by correcting file contents or providing
- additional package tags.
+ are retrieved in a repository format suitable for use with
+ pkg.depotd(1M).
OPTIONS
The following options are supported:
-c cache_dir The path to a directory that will be used to cache
- downloaded content. If one is not supplied, the
- client will automatically pick a cache directory.
- In the case where a download is interrupted, and a
- cache directory was automatically chosen, use this
- option to resume the download.
+ downloaded content. If one is not supplied, the
+ client will automatically pick a cache directory.
+ In the case where a download is interrupted, and a
+ cache directory was automatically chosen, use this
+ option to resume the download.
- -d path_or_uri The path of a directory to save the retrieved package
- to, or the URI of a repository to republish it to. If
- not provided, the default value is the current working
- directory. If a directory path is provided, then
- package content will only be retrieved if it does not
- already exist in the target directory. If a repository
- URI is provided, a temporary directory will be created
- and all of the package data retrieved before attempting
- to republish it.
+ -d path_or_uri The filesystem path or URI of the target repository to
+ republish packages to. If not provided, the default
+ value is the current working directory. The target
+ must already exist. New repositories can be created
+ using pkgrepo(1).
-h Display usage message.
-k Keep the retrieved package content compressed, ignored
- when republishing. Should not be used with pkgsend.
+ when republishing. Should not be used with pkgsend(1).
-m match Controls matching behavior using the following values:
all-timestamps
@@ -50,9 +43,7 @@
all-versions
includes all matching versions, not just latest
- -n List the most recent versions of the packages available
- from the specified repository and exit. (All other
- options except -s will be ignored.)
+ -n Perform a trial run with no changes made.
-r Recursively retrieves all dependencies for the provided
list of packages.
@@ -60,20 +51,34 @@
-s src_repo_uri A URI representing the location of a pkg(5) repository
from which to receive package data.
+ --newest List the most recent versions of the packages available
+ from the specified repository and exit. (All other
+ options except -s will be ignored.)
+
+ --raw Retrieve and store the raw package data in a set of
+ directory structures by stem and version at the location
+ specified by -d. May only be used with filesystem-
+ based destinations. This can be used with pkgsend(1)
+ include to conveniently modify and republish packages,
+ perhaps by correcting file contents or providing
+ additional package metadata.
+
EXAMPLES
Example 1: List newest packages available from the repository on
the system named 'test'.
- $ pkgrecv -s http://test -n
+ $ pkgrecv -s http://test --newest
pkg:/[email protected],5.11-0.79:20080221T125720Z
pkg:/[email protected],5.11-0.79:20080221T123955Z
pkg:/[email protected],5.11-0.79:20080221T125728Z
pkg:/[email protected],5.11-0.79:20080221T125730Z
Example 2: Receive the SUNWlibC, SUNWfreetype, and SUNWlibm
- packages from example 1.
+ packages from example 1 in a format suitable for use with
+ pkgsend(1) include.
- $ pkgrecv -s http://test [email protected],5.11-0.79:20080221T125720Z
+ $ pkgrecv -s http://test [email protected],5.11-0.79:20080221T125720Z \
+ --raw
[email protected],5.11-0.79:20080221T123955Z
[email protected],5.11-0.79:20080221T125728Z
$ ls -d SUNW*
@@ -92,7 +97,7 @@
Example 5: Receive all versions of the package "SUNWvim" from the
system named 'test' and republish it to a local repository.
- $ pkgrecv -s http://test -d file:///local/repo SUNWvim
+ $ pkgrecv -s http://test -d /local/repo SUNWvim
Example 6: Receive all versions of the package "SUNWzlib" from the
system named 'test' and republish it to a remote repository on the
@@ -103,7 +108,12 @@
Example 7: Receive the package "SUNWemacs" and all of its dependencies
from the repository located at '/var/pkg/repo'.
- $ pkgrecv -s file:///var/pkg/repo -r SUNWemacs
+ $ pkgrecv -s /var/pkg/repo -r SUNWemacs
+
+ Example 8: Receive all packages that do not already exist from the
+ repository located at 'http://example.com:10000':
+
+ $ pkgrecv -s http://example.com:10000 -d /my/pkg/repo '*'
ENVIRONMENT VARIABLES
The following environment variables are supported:
@@ -119,13 +129,14 @@
EXIT STATUS
The following exit values are returned:
- 0 Everything worked.
+ 0 Command succeeded.
- 1 Something bad happened.
+ 1 An error occurred.
2 Invalid command line options were specified.
-FILES
+ 3 Multiple operations were requested, but only some of them
+ succeeded.
ATTRIBUTES
See attributes(5) for descriptions of the following attributes:
@@ -136,7 +147,7 @@
|_____________________________|_____________________________|
SEE ALSO
- pkgsend(1), attributes(5), pkg(5)
+ pkgrepo(1), pkgsend(1), attributes(5), pkg(5)
NOTES
The image packaging system is an under-development feature.
--- a/src/modules/client/progress.py Wed Oct 20 15:32:12 2010 -0700
+++ b/src/modules/client/progress.py Thu Oct 21 14:28:40 2010 -0700
@@ -21,8 +21,7 @@
#
#
-# Copyright 2010 Sun Microsystems, Inc. All rights reserved.
-# Use is subject to license terms.
+# Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
#
import errno
@@ -62,13 +61,14 @@
self.reset()
def reset_download(self):
+ self.dl_started = False
self.dl_goal_nfiles = 0
self.dl_cur_nfiles = 0
self.dl_goal_nbytes = 0
self.dl_cur_nbytes = 0
self.dl_goal_npkgs = 0
self.dl_cur_npkgs = 0
- self.dl_cur_pkg = "None"
+ self.cur_pkg = "None"
def reset(self):
self.cat_cur_catalog = None
@@ -102,6 +102,10 @@
self.item_phase = "None"
self.item_phase_last = "None"
+ self.send_cur_nbytes = 0
+ self.send_goal_nbytes = 0
+ self.republish_started = False
+
# The tracker sets this to True whenever it has emitted
# output, but not yet written a newline. ProgressTracker
# users should call flush() when wanting to interrupt
@@ -183,7 +187,7 @@
self.dl_goal_nbytes = nbytes
def download_start_pkg(self, pkgname):
- self.dl_cur_pkg = pkgname
+ self.cur_pkg = pkgname
if self.dl_goal_nbytes != 0:
self.dl_output()
@@ -197,8 +201,12 @@
self.dl_cur_nbytes += nbytes
self.dl_cur_nfiles += nfiles
- if self.dl_goal_nbytes != 0:
- self.dl_output()
+ if self.dl_started:
+ if self.dl_goal_nbytes != 0:
+ self.dl_output()
+ elif self.republish_started:
+ if self.dl_goal_nbytes != 0:
+ self.republish_output()
def download_done(self):
""" Call when all downloading is finished """
@@ -274,6 +282,36 @@
self.item_output_done()
assert self.item_goal_nitems == self.item_cur_nitems
+ def republish_set_goal(self, npkgs, ngetbytes, nsendbytes):
+ self.item_cur_nitems = 0
+ self.item_goal_nitems = npkgs
+ self.dl_cur_nbytes = 0
+ self.dl_goal_nbytes = ngetbytes
+ self.send_cur_nbytes = 0
+ self.send_goal_nbytes = nsendbytes
+
+ def republish_start_pkg(self, pkgname):
+ self.cur_pkg = pkgname
+ if self.dl_goal_nbytes != 0:
+ self.republish_output()
+
+ def republish_end_pkg(self):
+ self.item_cur_nitems += 1
+ if self.dl_goal_nbytes != 0:
+ self.republish_output()
+
+ def upload_add_progress(self, nbytes):
+ """ Call to provide news that the upload has made progress """
+
+ self.send_cur_nbytes += nbytes
+ if self.send_goal_nbytes != 0:
+ self.republish_output()
+
+ def republish_done(self):
+ """ Call when all downloading is finished """
+ if self.dl_goal_nbytes != 0:
+ self.republish_output_done()
+
#
# This set of methods should be regarded as abstract *and* protected.
# If you aren't in this class hierarchy, these should not be
@@ -376,6 +414,14 @@
raise NotImplementedError("item_output_done() not implemented "
"in superclass")
+ def republish_output(self):
+ raise NotImplementedError("republish_output() not implemented "
+ "in superclass")
+
+ def republish_output_done(self):
+ raise NotImplementedError("republish_output_done() not "
+ "implemented in superclass")
+
def flush(self):
raise NotImplementedError("flush() not implemented in "
"superclass")
@@ -464,9 +510,16 @@
def item_output_done(self):
return
+ def republish_output(self):
+ return
+
+ def republish_output_done(self):
+ return
+
def flush(self):
return
+
class NullProgressTracker(QuietProgressTracker):
""" This ProgressTracker is a subclass of QuietProgressTracker
because that's convenient for now. It is semantically intended to
@@ -487,7 +540,7 @@
def __init__(self):
ProgressTracker.__init__(self)
- self.dl_last_printed_pkg = None
+ self.last_printed_pkg = None
def cat_output_start(self):
return
@@ -531,14 +584,14 @@
def ver_output_done(self):
return
- def dl_output(self):
+ def __generic_pkg_output(self, pkg_line):
try:
# The first time, emit header.
- if self.dl_cur_pkg != self.dl_last_printed_pkg:
- if self.dl_last_printed_pkg != None:
+ if self.cur_pkg != self.last_printed_pkg:
+ if self.last_printed_pkg != None:
print _("Done")
- print _("Download: %s ... ") % (self.dl_cur_pkg),
- self.dl_last_printed_pkg = self.dl_cur_pkg
+ print pkg_line % self.cur_pkg,
+ self.last_printed_pkg = self.cur_pkg
self.needs_cr = True
sys.stdout.flush()
except IOError, e:
@@ -546,6 +599,12 @@
raise PipeError, e
raise
+ def dl_output(self):
+ self.__generic_pkg_output(_("Download: %s ... "))
+
+ def republish_output(self):
+ self.__generic_pkg_output(_("Republish : %s ... "))
+
def __generic_done(self):
try:
print _("Done")
@@ -559,6 +618,9 @@
def dl_output_done(self):
self.__generic_done()
+ def republish_output_done(self):
+ self.__generic_done()
+
def __generic_output(self, phase_attr, last_phase_attr, force=False):
pattr = getattr(self, phase_attr)
last_pattr = getattr(self, last_phase_attr)
@@ -644,7 +706,6 @@
self.cr = '\r'
else:
raise ProgressTrackerException()
- self.dl_started = False
self.spinner = 0
self.spinner_chars = "/-\|"
self.curstrlen = 0
@@ -837,9 +898,9 @@
print self.cr,
self.needs_cr = True
- pkg_name = self.dl_cur_pkg
+ pkg_name = self.cur_pkg
if len(pkg_name) > 38:
- pkg_name = pkg_name[:34] + "..."
+ pkg_name = "..." + pkg_name[-34:]
s = "%-38.38s %7s %11s %12s" % \
(pkg_name,
@@ -857,7 +918,7 @@
raise
def dl_output_done(self):
- self.dl_cur_pkg = "Completed"
+ self.cur_pkg = "Completed"
self.dl_output(force=True)
# Reset.
@@ -875,6 +936,64 @@
raise PipeError, e
raise
+ def republish_output(self, force=False):
+ if self.republish_started and not force and \
+ (time.time() - self.last_print_time) < self.TERM_DELAY:
+ return
+
+ self.last_print_time = time.time()
+
+ try:
+ # The first time, emit header.
+ if not self.republish_started:
+ self.republish_started = True
+ print "%-40s %12s %12s %12s" % (_("PROCESS"),
+ _("ITEMS"), _("GET (MB)"), _("SEND (MB)"))
+ else:
+ print self.cr,
+ self.needs_cr = True
+
+ pkg_name = self.cur_pkg
+ if len(pkg_name) > 40:
+ pkg_name = "..." + pkg_name[-37:]
+
+ s = "%-40.40s %12s %12s %12s" % \
+ (pkg_name,
+ "%d/%d" % (self.item_cur_nitems,
+ self.item_goal_nitems),
+ "%.1f/%.1f" % \
+ ((self.dl_cur_nbytes / 1024.0 / 1024.0),
+ (self.dl_goal_nbytes / 1024.0 / 1024.0)),
+ "%.1f/%.1f" % \
+ ((self.send_cur_nbytes / 1024.0 / 1024.0),
+ (self.send_goal_nbytes / 1024.0 / 1024.0)))
+ sys.stdout.write(s + self.clear_eol)
+ self.needs_cr = True
+ sys.stdout.flush()
+ except IOError, e:
+ if e.errno == errno.EPIPE:
+ raise PipeError, e
+ raise
+
+ def republish_output_done(self):
+ self.cur_pkg = "Completed"
+ self.republish_output(force=True)
+
+ # Reset.
+ self.republish_started = False
+ self.spinner = 0
+ self.curstrlen = 0
+
+ try:
+ print
+ print
+ self.needs_cr = False
+ sys.stdout.flush()
+ except IOError, e:
+ if e.errno == errno.EPIPE:
+ raise PipeError, e
+ raise
+
def __generic_simple_done(self):
try:
print
--- a/src/modules/client/transport/engine.py Wed Oct 20 15:32:12 2010 -0700
+++ b/src/modules/client/transport/engine.py Thu Oct 21 14:28:40 2010 -0700
@@ -676,7 +676,8 @@
def send_data(self, url, data=None, header=None, sslcert=None,
sslkey=None, repourl=None, ccancel=None,
- data_fobj=None, data_fp=None, failonerror=True):
+ data_fobj=None, data_fp=None, failonerror=True,
+ progclass=None, progtrack=None):
"""Invoke the engine to retrieve a single URL.
This routine sends the data in data, and returns the
server's response.
@@ -690,7 +691,7 @@
fobj = fileobj.StreamingFileObj(url, self, ccancel=ccancel)
progfunc = None
- if ccancel:
+ if ccancel and not progtrack and not progclass:
progfunc = fobj.get_progress_func()
t = TransportRequest(url, writefunc=fobj.get_write_func(),
@@ -698,7 +699,8 @@
httpmethod="POST", sslcert=sslcert, sslkey=sslkey,
repourl=repourl, progfunc=progfunc, uuid=fobj.uuid,
read_fobj=data_fobj, read_filepath=data_fp,
- failonerror=failonerror)
+ failonerror=failonerror, progclass=progclass,
+ progtrack=progtrack)
self.__req_q.appendleft(t)
--- a/src/modules/client/transport/repo.py Wed Oct 20 15:32:12 2010 -0700
+++ b/src/modules/client/transport/repo.py Thu Oct 21 14:28:40 2010 -0700
@@ -129,7 +129,8 @@
raise NotImplementedError
- def publish_add(self, action, header=None, trans_id=None):
+ def publish_add(self, action, header=None, progtrack=None,
+ trans_id=None):
"""The publish operation that adds content to a repository.
The action must be populated with a data property.
Callers may supply a header, and should supply a transaction
@@ -321,11 +322,13 @@
failonerror=failonerror)
def _post_url(self, url, data=None, header=None, ccancel=None,
- data_fobj=None, data_fp=None, failonerror=True):
+ data_fobj=None, data_fp=None, failonerror=True, progclass=None,
+ progtrack=None):
return self._engine.send_data(url, data=data, header=header,
repourl=self._url, ccancel=ccancel,
data_fobj=data_fobj, data_fp=data_fp,
- failonerror=failonerror)
+ failonerror=failonerror, progclass=progclass,
+ progtrack=progtrack)
def __check_response_body(self, fobj):
"""Parse the response body found accessible using the provided
@@ -371,9 +374,9 @@
pub_prefix = getattr(pub, "prefix", None)
if pub_prefix and not methodstr.startswith("open/") and \
not base.endswith("/%s/" % pub_prefix) and \
- self.supports_version("publisher", [1]):
+ self.supports_version("publisher", [1]) > -1:
# Append the publisher prefix to the repository URL.
- base = urlparse.urljoin(base, pub_prefix)
+ base = urlparse.urljoin(base, pub_prefix) + "/"
uri = urlparse.urljoin(base, methodstr)
if not query:
@@ -650,7 +653,8 @@
return self._verdata is not None
- def publish_add(self, action, header=None, trans_id=None):
+ def publish_add(self, action, header=None, progtrack=None,
+ trans_id=None):
"""The publish operation that adds content to a repository.
The action must be populated with a data property.
Callers may supply a header, and should supply a transaction
@@ -659,6 +663,10 @@
attrs = action.attrs
data_fobj = None
data = None
+ progclass = None
+
+ if progtrack:
+ progclass = FileProgress
baseurl = self.__get_request_url("add/0/")
request_str = "%s/%s" % (trans_id, action.name)
@@ -678,7 +686,8 @@
headers.update(header)
fobj = self._post_url(requesturl, header=headers,
- data_fobj=data_fobj, data=data, failonerror=False)
+ data_fobj=data_fobj, data=data, failonerror=False,
+ progclass=progclass, progtrack=progtrack)
self.__check_response_body(fobj)
def publish_add_file(self, pth, header=None, trans_id=None):
@@ -880,7 +889,7 @@
def publish_refresh_indexes(self, header=None, pub=None):
"""Attempt to refresh the search data in the repository."""
- if self.supports_version("admin", [0]):
+ if self.supports_version("admin", [0]) > -1:
requesturl = self.__get_request_url("admin/0", query={
"cmd": "refresh-indexes" }, pub=pub)
else:
@@ -1444,7 +1453,8 @@
return self._verdata is not None
- def publish_add(self, action, header=None, trans_id=None):
+ def publish_add(self, action, header=None, progtrack=None,
+ trans_id=None):
"""The publish operation that adds an action and its
payload (if applicable) to an existing transaction in a
repository. The action must be populated with a data property.
@@ -1454,10 +1464,21 @@
# Calling any publication operation sets read_only to False.
self._frepo.read_only = False
+ progclass = None
+ if progtrack:
+ progclass = FileProgress
+ progtrack = progclass(progtrack)
+
try:
self._frepo.add(trans_id, action)
except svr_repo.RepositoryError, e:
+ if progtrack:
+ progtrack.abort()
raise tx.TransportOperationError(str(e))
+ else:
+ if progtrack:
+ sz = int(action.attrs.get("pkg.size", 0))
+ progtrack.progress_callback(0, 0, sz, sz)
def publish_add_file(self, pth, header=None, trans_id=None):
"""The publish operation that adds a file to an existing
@@ -1678,6 +1699,8 @@
ProgressCallback.__init__(self, progtrack)
self.dltotal = 0
self.dlcurrent = 0
+ self.ultotal = 0
+ self.ulcurrent = 0
self.completed = False
def abort(self):
@@ -1685,6 +1708,7 @@
by this file from the ProgressTracker."""
self.progtrack.download_add_progress(0, -self.dlcurrent)
+ self.progtrack.upload_add_progress(-self.ulcurrent)
self.completed = True
def commit(self, size):
@@ -1719,6 +1743,14 @@
self.dlcurrent += new_progress
self.progtrack.download_add_progress(0, new_progress)
+ if self.ultotal != ultot:
+ self.ultotal = ultot
+
+ new_progress = int(ulcur - self.ulcurrent)
+ if new_progress > 0:
+ self.ulcurrent += new_progress
+ self.progtrack.upload_add_progress(new_progress)
+
return 0
--- a/src/modules/client/transport/transport.py Wed Oct 20 15:32:12 2010 -0700
+++ b/src/modules/client/transport/transport.py Thu Oct 21 14:28:40 2010 -0700
@@ -2115,7 +2115,8 @@
(chash, newhash), size=s.st_size)
@LockedTransport()
- def publish_add(self, pub, action=None, trans_id=None):
+ def publish_add(self, pub, action=None, ccancel=None, progtrack=None,
+ trans_id=None):
"""Perform the 'add' publication operation to the publisher
supplied in pub. The transaction-id is passed in trans_id."""
@@ -2123,11 +2124,14 @@
retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
header = self.__build_header(uuid=self.__get_uuid(pub))
+ if progtrack and ccancel:
+ progtrack.check_cancelation = ccancel
+
for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
single_repository=True, operation="add", versions=[0]):
try:
d.publish_add(action, header=header,
- trans_id=trans_id)
+ progtrack=progtrack, trans_id=trans_id)
return
except tx.ExcessiveTransientFailure, ex:
# If an endpoint experienced so many failures
--- a/src/modules/publish/transaction.py Wed Oct 20 15:32:12 2010 -0700
+++ b/src/modules/publish/transaction.py Thu Oct 21 14:28:40 2010 -0700
@@ -131,10 +131,12 @@
purposes."""
def __init__(self, origin_url, create_repo=False, pkg_name=None,
- repo_props=EmptyDict, trans_id=None, xport=None, pub=None):
+ repo_props=EmptyDict, trans_id=None, xport=None, pub=None,
+ progtrack=None):
self.create_repo = create_repo
self.origin_url = origin_url
self.pkg_name = pkg_name
+ self.progtrack = progtrack
self.trans_id = trans_id
def add(self, action):
@@ -195,7 +197,8 @@
"""Provides a publishing interface that uses client transport."""
def __init__(self, origin_url, create_repo=False, pkg_name=None,
- repo_props=EmptyDict, trans_id=None, xport=None, pub=None):
+ repo_props=EmptyDict, trans_id=None, xport=None, pub=None,
+ progtrack=None):
scheme, netloc, path, params, query, fragment = \
urlparse.urlparse(origin_url, "http", allow_fragments=0)
@@ -204,6 +207,7 @@
self.trans_id = trans_id
self.scheme = scheme
self.path = path
+ self.progtrack = progtrack
self.transport = xport
self.publisher = pub
@@ -267,7 +271,8 @@
try:
self.transport.publish_add(self.publisher,
- action=action, trans_id=self.trans_id)
+ action=action, trans_id=self.trans_id,
+ progtrack=self.progtrack)
except apx.TransportError, e:
msg = str(e)
raise TransactionOperationError("add",
@@ -421,7 +426,7 @@
def __new__(cls, origin_url, create_repo=False, pkg_name=None,
repo_props=EmptyDict, trans_id=None, noexecute=False, xport=None,
- pub=None):
+ pub=None, progtrack=None):
scheme, netloc, path, params, query, fragment = \
urlparse.urlparse(origin_url, "http", allow_fragments=0)
@@ -459,4 +464,4 @@
return cls.__schemes[scheme](origin_url,
create_repo=create_repo, pkg_name=pkg_name,
repo_props=repo_props, trans_id=trans_id, xport=xport,
- pub=pub)
+ pub=pub, progtrack=progtrack)
--- a/src/modules/server/depot.py Wed Oct 20 15:32:12 2010 -0700
+++ b/src/modules/server/depot.py Thu Oct 21 14:28:40 2010 -0700
@@ -417,6 +417,7 @@
def refresh(self):
"""Catch SIGUSR1 and reload the depot information."""
+ old_pubs = self.repo.publishers
self.repo.reload()
if type(self.cfg) == cfg.SMFConfig:
# For all other cases, reloading depot configuration
@@ -426,6 +427,9 @@
# Handles the BUI (Browser User Interface).
face.init(self)
+ # Map new publishers into operation space.
+ map(self.__map_pub_ops, self.repo.publishers - old_pubs)
+
def __map_pub_ops(self, pub_prefix):
# Map the publisher into the depot's operation namespace if
# needed.
@@ -2107,21 +2111,21 @@
class BackgroundTaskPlugin(SimplePlugin):
"""This class allows background task execution for the depot server. It
- is designed in such a way as to only allow a single task to be queued
+ is designed in such a way as to only allow a few tasks to be queued
for execution at a time.
"""
def __init__(self, bus):
# Setup the background task queue.
SimplePlugin.__init__(self, bus)
- self.__q = Queue.Queue(1)
+ self.__q = Queue.Queue(10)
self.__thread = None
def put(self, task, *args, **kwargs):
- """Schedule the given task for background execution if one
- is not already.
+ """Schedule the given task for background execution if queue
+ isn't full.
"""
- if self.__q.unfinished_tasks:
+ if self.__q.unfinished_tasks > 9:
raise Queue.Full()
self.__q.put_nowait((task, args, kwargs))
--- a/src/publish.py Wed Oct 20 15:32:12 2010 -0700
+++ b/src/publish.py Thu Oct 21 14:28:40 2010 -0700
@@ -50,6 +50,7 @@
import pkg.client.api_errors as apx
import pkg.fmri
import pkg.manifest
+import pkg.misc as misc
import pkg.publish.transaction as trans
import pkg.client.transport.transport as transport
import pkg.client.publisher as publisher
@@ -124,7 +125,7 @@
repo_props.setdefault(p_sec, {})
repo_props[p_sec][p_name] = p_value
- xport, pub = setup_transport_and_pubs(repo_uri)
+ xport, pub = setup_transport_and_pubs(repo_uri, remote=False)
try:
trans.Transaction(repo_uri, create_repo=True,
@@ -581,19 +582,16 @@
return 1
return 0
-def setup_transport_and_pubs(repo_uri):
+def setup_transport_and_pubs(repo_uri, remote=True):
+
+ if repo_uri.startswith("null:"):
+ return None, None
- try:
- repo = publisher.Repository(origins=[repo_uri])
- pub = publisher.Publisher(prefix="default", repositories=[repo])
- xport = transport.Transport(transport.GenericTransportCfg(
- publishers=[pub]))
- except apx.UnsupportedRepositoryURI:
- if repo_uri.startswith("null:"):
- return None, None
- raise
+ xport, xport_cfg = transport.setup_transport()
+ targ_pub = transport.setup_publisher(repo_uri, "default",
+ xport, xport_cfg, remote_prefix=remote)
- return xport, pub
+ return xport, targ_pub
def main_func():
gettext.install("pkg", "/usr/share/locale")
@@ -615,6 +613,9 @@
except getopt.GetoptError, e:
usage(_("pkgsend: illegal global option -- %s") % e.opt)
+ if repo_uri and not repo_uri.startswith("null:"):
+ repo_uri = misc.parse_uri(repo_uri)
+
subcommand = None
if pargs:
subcommand = pargs.pop(0)
--- a/src/pull.py Wed Oct 20 15:32:12 2010 -0700
+++ b/src/pull.py Thu Oct 21 14:28:40 2010 -0700
@@ -56,11 +56,12 @@
cache_dir = None
complete_catalog = None
download_start = False
-repo_cache = {}
tmpdirs = []
temp_root = None
xport = None
xport_cfg = None
+dest_xport = None
+targ_pub = None
def error(text):
"""Emit an error message prefixed by the command name """
@@ -87,9 +88,9 @@
msg(_("""\
Usage:
- pkgrecv [-s src_repo_uri] [-d (path|dest_uri)] [-kr] [-m match]
- (fmri|pattern) ...
- pkgrecv [-s src_repo_uri] -n
+ pkgrecv [-s src_repo_uri] [-d (path|dest_uri)] [-kr] [-m match] [-n]
+ [--raw] (fmri|pattern) ...
+ pkgrecv [-s src_repo_uri] --newest
Options:
-c cache_dir The path to a directory that will be used to cache
@@ -99,32 +100,43 @@
cache directory was automatically chosen, use this
option to resume the download.
- -d path_or_uri The path of a directory to save the retrieved package
- to, or the URI of a repository to republish it to. If
- not provided, the default value is the current working
- directory. If a directory path is provided, then
- package content will only be retrieved if it does not
- already exist in the target directory. If a repository
- URI is provided, a temporary directory will be created
- and all of the package data retrieved before attempting
- to republish it.
+ -d path_or_uri The filesystem path or URI of the target repository to
+ republish packages to. If not provided, the default
+ value is the current working directory. The target
+ must already exist. New repositories can be created
+ using pkgrepo(1).
-h Display this usage message.
+
-k Keep the retrieved package content compressed, ignored
when republishing. Should not be used with pkgsend.
+
-m match Controls matching behaviour using the following values:
all-timestamps
includes all matching timestamps, not just
latest (implies all-versions)
all-versions
includes all matching versions, not just latest
- -n List the most recent versions of the packages available
+
+ -n Perform a trial run with no changes made.
+
+ -r Recursively evaluates all dependencies for the provided
+ list of packages and adds them to the list.
+
+ -s src_repo_uri A URI representing the location of a pkg(5)
+ repository to retrieve package data from.
+
+ --newest List the most recent versions of the packages available
from the specified repository and exit. (All other
options except -s will be ignored.)
- -r Recursively evaluates all dependencies for the provided
- list of packages and adds them to the list.
- -s src_repo_uri A URI representing the location of a pkg(5)
- repository to retrieve package data from.
+
+ --raw Retrieve and store the raw package data in a set of
+ directory structures by stem and version at the location
+ specified by -d. May only be used with filesystem-
+ based destinations. This can be used with pkgsend(1)
+ include to conveniently modify and republish packages,
+ perhaps by correcting file contents or providing
+ additional package metadata.
Environment:
PKG_DEST Destination directory or repository URI
@@ -145,6 +157,14 @@
continue
shutil.rmtree(d, True)
+ if caller_error and dest_xport and targ_pub:
+ try:
+ dest_xport.publish_refresh_packages(targ_pub)
+ except apx.TransportError:
+ # If this fails, ignore it as this was a last ditch
+ # attempt anyway.
+ pass
+
def abort(err=None, retcode=1):
"""To be called when a fatal error is encountered."""
@@ -153,7 +173,7 @@
msg("")
error(err)
- cleanup()
+ cleanup(caller_error=True)
sys.exit(retcode)
def get_tracker(quiet=False):
@@ -188,32 +208,6 @@
return m.tostr_unsorted()
return m
-def get_repo(uri):
- if uri in repo_cache:
- return repo_cache[uri]
-
- parts = urlparse.urlparse(uri, "file", allow_fragments=0)
- path = urllib.url2pathname(parts[2])
-
- try:
- repo = sr.Repository(read_only=True, root=path)
- except EnvironmentError, _e:
- error("an error occurred while trying to " \
- "initialize the repository directory " \
- "structures:\n%s" % _e)
- sys.exit(1)
- except sr.RepositoryError, _e:
- error(_e)
- sys.exit(1)
- except cfg.ConfigError, _e:
- error("repository configuration error: %s" % _e)
- sys.exit(1)
- except (search_errors.IndexingException, apx.PermissionsException), _e:
- emsg(str(_e), "INDEX")
- sys.exit(1)
- repo_cache[uri] = repo
- return repo
-
def expand_fmri(pfmri, constraint=version.CONSTRAINT_AUTO):
"""Find matching fmri using CONSTRAINT_AUTO cache for performance.
Returns None if no matching fmri is found."""
@@ -239,30 +233,10 @@
except pkg.fmri.FmriError, e:
abort(err=e)
- # XXX publisher prefixes have to be stripped for catalog matching
- # for now; awaits v1 client support, etc.
- pattern_pubs = {}
- for f in patterns:
- if f.publisher:
- pattern_pubs[f.get_fmri(anarchy=True)] = f.publisher
- f.publisher = None
-
- matches, unmatched = catalog.extract_matching_fmris(fmri_list,
+ return catalog.extract_matching_fmris(fmri_list,
patterns=patterns, constraint=version.CONSTRAINT_AUTO,
matcher=pkg.fmri.glob_match)
- if unmatched:
- match_err = apx.InventoryException(**unmatched)
- emsg(match_err)
- abort()
-
- # XXX restore stripped publisher information.
- for m in matches:
- pub = pattern_pubs.pop(str(m), None)
- if pub:
- m.publisher = pub
- return matches
-
def get_dependencies(src_uri, fmri_list, basedir, tracker):
old_limit = sys.getrecursionlimit()
@@ -294,18 +268,18 @@
def add_hashes_to_multi(mfst, multi):
"""Takes a manifest and a multi object. Adds the hashes to the
- multi object, returns (nfiles, nbytes) tuple."""
+ multi object, returns (get_bytes, send_bytes) tuple."""
- nf = 0
- nb = 0
+ getb = 0
+ sendb = 0
for atype in ("file", "license"):
for a in mfst.gen_actions_by_type(atype):
if a.needsdata(None, None):
multi.add_action(a)
- nf += 1
- nb += get_pkg_otw_size(a)
- return nf, nb
+ getb += get_pkg_otw_size(a)
+ sendb += int(a.attrs.get("pkg.size", 0))
+ return getb, sendb
def prune(fmri_list, all_versions, all_timestamps):
"""Returns a filtered version of fmri_list based on the provided
@@ -345,7 +319,7 @@
fm_list.append(l[0])
for e in fm_list:
- msg(e.get_fmri(anarchy=True))
+ msg(e.get_fmri())
def fetch_catalog(src_pub, tracker, txport):
"""Fetch the catalog from src_uri."""
@@ -356,12 +330,20 @@
if not src_pub.meta_root:
# Create a temporary directory for catalog.
- cat_dir = tempfile.mkdtemp(dir=temp_root)
+ cat_dir = tempfile.mkdtemp(dir=temp_root,
+ prefix=global_settings.client_name + "-")
tmpdirs.append(cat_dir)
src_pub.meta_root = cat_dir
src_pub.transport = txport
- src_pub.refresh(True, True)
+ try:
+ src_pub.refresh(True, True)
+ except apx.TransportError, e:
+ # Assume that a catalog doesn't exist for the target publisher,
+ # and drive on. If there was an actual failure due to a
+ # transport issue, let the failure happen whenever some other
+ # operation is attempted later.
+ return []
cat = src_pub.catalog
@@ -378,9 +360,10 @@
return fmri_list
def main_func():
- global cache_dir, download_start, xport, xport_cfg
+ global cache_dir, download_start, xport, xport_cfg, dest_xport, targ_pub
all_timestamps = False
all_versions = False
+ dry_run = False
keep_compressed = False
list_newest = False
recursive = False
@@ -388,7 +371,7 @@
target = None
incoming_dir = None
src_pub = None
- targ_pub = None
+ raw = False
temp_root = misc.config_temp_root()
@@ -399,7 +382,8 @@
src_uri = os.environ.get("PKG_SRC", None)
try:
- opts, pargs = getopt.getopt(sys.argv[1:], "c:d:hkm:nrs:")
+ opts, pargs = getopt.getopt(sys.argv[1:], "c:d:hkm:nrs:",
+ ["newest", "raw"])
except getopt.GetoptError, e:
usage(_("Illegal option -- %s") % e.opt)
@@ -412,12 +396,6 @@
usage(retcode=0)
elif opt == "-k":
keep_compressed = True
- elif opt == "-n":
- list_newest = True
- elif opt == "-r":
- recursive = True
- elif opt == "-s":
- src_uri = arg
elif opt == "-m":
if arg == "all-timestamps":
all_timestamps = True
@@ -425,17 +403,31 @@
all_versions = True
else:
usage(_("Illegal option value -- %s") % arg)
+ elif opt == "-n":
+ dry_run = True
+ elif opt == "-r":
+ recursive = True
+ elif opt == "-s":
+ src_uri = arg
+ elif opt == "--newest":
+ list_newest = True
+ elif opt == "--raw":
+ raw = True
if not src_uri:
usage(_("a source repository must be provided"))
+ else:
+ src_uri = misc.parse_uri(src_uri)
if not cache_dir:
- cache_dir = tempfile.mkdtemp(dir=temp_root)
+ cache_dir = tempfile.mkdtemp(dir=temp_root,
+ prefix=global_settings.client_name + "-")
# Only clean-up cache dir if implicitly created by pkgrecv.
# User's cache-dirs should be preserved
tmpdirs.append(cache_dir)
- incoming_dir = tempfile.mkdtemp(dir=temp_root)
+ incoming_dir = tempfile.mkdtemp(dir=temp_root,
+ prefix=global_settings.client_name + "-")
tmpdirs.append(incoming_dir)
# Create transport and transport config
@@ -451,200 +443,286 @@
dest_xport_cfg.add_cache(cache_dir, readonly=False)
dest_xport_cfg.incoming_root = incoming_dir
- # Configure src publisher
- src_pub = transport.setup_publisher(src_uri, "source", xport, xport_cfg,
+ # Configure src publisher(s).
+ transport.setup_publisher(src_uri, "source", xport, xport_cfg,
remote_prefix=True)
- tracker = get_tracker()
- if list_newest:
- if pargs or len(pargs) > 0:
- usage(_("-n takes no options"))
+ any_unmatched = []
+ total_processed = 0
+ for src_pub in xport_cfg.gen_publishers():
+ tracker = get_tracker()
+ if list_newest:
+ if pargs or len(pargs) > 0:
+ usage(_("-n takes no options"))
+
+ fmri_list = fetch_catalog(src_pub, tracker, xport)
+ list_newest_fmris(fmri_list)
+ continue
+
+ msg(_("Processing packages for publisher %s ...") %
+ src_pub.prefix)
+ if pargs == None or len(pargs) == 0:
+ usage(_("must specify at least one pkgfmri"))
- fmri_list = fetch_catalog(src_pub, tracker, xport)
- list_newest_fmris(fmri_list)
- return 0
+ republish = False
- if pargs == None or len(pargs) == 0:
- usage(_("must specify at least one pkgfmri"))
+ if not target:
+ target = basedir = os.getcwd()
+ elif target and not raw:
+ basedir = tempfile.mkdtemp(dir=temp_root,
+ prefix=global_settings.client_name + "-")
+ tmpdirs.append(basedir)
+ republish = True
+
+ # Turn target into a valid URI.
+ target = misc.parse_uri(target)
+
+ # Setup target for transport.
+ targ_pub = transport.setup_publisher(target,
+ src_pub.prefix, dest_xport, dest_xport_cfg)
- republish = False
+ # Files have to be decompressed for republishing.
+ keep_compressed = False
+ if target.startswith("file://"):
+ # Check to see if the repository exists first.
+ try:
+ t = trans.Transaction(target,
+ xport=dest_xport, pub=targ_pub)
+ except trans.TransactionRepositoryInvalidError, e:
+ txt = str(e) + "\n\n"
+ txt += _("To create a repository, use "
+ "the pkgsend command.")
+ abort(err=txt)
+ except trans.TransactionRepositoryConfigError, e:
+ txt = str(e) + "\n\n"
+ txt += _("The repository configuration "
+ "for the repository located at "
+ "'%s' is not valid or the "
+ "specified path does not exist. "
+ "Please correct the configuration "
+ "of the repository or create a new "
+ "one.") % target
+ abort(err=txt)
+ except trans.TransactionError, e:
+ abort(err=e)
+ else:
+ basedir = target
+ if not os.path.exists(basedir):
+ try:
+ os.makedirs(basedir, misc.PKG_DIR_MODE)
+ except Exception, e:
+ error(_("Unable to create basedir "
+ "'%s': %s") % (basedir, e))
+ abort()
- if not target:
- target = basedir = os.getcwd()
- elif target.find("://") != -1:
- basedir = tempfile.mkdtemp(dir=temp_root)
- tmpdirs.append(basedir)
- republish = True
-
- targ_pub = transport.setup_publisher(target, "target",
- dest_xport, dest_xport_cfg, remote_prefix=True)
+ xport_cfg.pkg_root = basedir
+ dest_xport_cfg.pkg_root = basedir
- # Files have to be decompressed for republishing.
- keep_compressed = False
- if target.startswith("file://"):
- # Check to see if the repository exists first.
- try:
- t = trans.Transaction(target, xport=dest_xport,
- pub=targ_pub)
- except trans.TransactionRepositoryInvalidError, e:
- txt = str(e) + "\n\n"
- txt += _("To create a repository, use the "
- "pkgsend command.")
- abort(err=txt)
- except trans.TransactionRepositoryConfigError, e:
- txt = str(e) + "\n\n"
- txt += _("The repository configuration for "
- "the repository located at '%s' is not "
- "valid or the specified path does not "
- "exist. Please correct the configuration "
- "of the repository or create a new "
- "one.") % target
- abort(err=txt)
- except trans.TransactionError, e:
- abort(err=e)
- else:
- basedir = target
- if not os.path.exists(basedir):
- try:
- os.makedirs(basedir, misc.PKG_DIR_MODE)
- except:
- error(_("Unable to create basedir '%s'.") % \
- basedir)
- return 1
+ if republish:
+ targ_fmris = fetch_catalog(targ_pub, tracker, dest_xport)
+
+ all_fmris = fetch_catalog(src_pub, tracker, xport)
+ fmri_arguments = pargs
+ matches, unmatched = expand_matching_fmris(all_fmris,
+ fmri_arguments)
+
+ # Track anything that failed to match.
+ any_unmatched.append(unmatched)
+ if not matches:
+ # No matches at all; nothing to do for this publisher.
+ continue
+
+ fmri_list = prune(list(set(matches)), all_versions,
+ all_timestamps)
+
+ if recursive:
+ msg(_("Retrieving manifests for dependency "
+ "evaluation ..."))
+ tracker.evaluate_start()
+ fmri_list = prune(get_dependencies(src_uri, fmri_list,
+ basedir, tracker), all_versions, all_timestamps)
+ tracker.evaluate_done()
+
+ def get_basename(pfmri):
+ open_time = pfmri.get_timestamp()
+ return "%d_%s" % \
+ (calendar.timegm(open_time.utctimetuple()),
+ urllib.quote(str(pfmri), ""))
- xport_cfg.pkg_root = basedir
- dest_xport_cfg.pkg_root = basedir
+ # First, retrieve the manifests and calculate package transfer
+ # sizes.
+ npkgs = len(fmri_list)
+ get_bytes = 0
+ send_bytes = 0
- if republish:
- targ_fmris = fetch_catalog(targ_pub, tracker, dest_xport)
+ if not recursive:
+ msg(_("Retrieving and evaluating %d package(s)...") %
+ npkgs)
+
+ tracker.evaluate_start(npkgs=npkgs)
+ skipped = False
+ retrieve_list = []
+ while fmri_list:
+ f = fmri_list.pop()
- all_fmris = fetch_catalog(src_pub, tracker, xport)
- fmri_arguments = pargs
- fmri_list = prune(list(set(expand_matching_fmris(all_fmris,
- fmri_arguments))), all_versions, all_timestamps)
+ if republish and f in targ_fmris:
+ if not skipped:
+ # Ensure a new line is output so message
+ # is on separate line from spinner.
+ msg("")
+ msg(_("Skipping %s: already present "
+ "at destination") % f)
+ skipped = True
+ continue
- if recursive:
- msg(_("Retrieving manifests for dependency evaluation ..."))
- tracker.evaluate_start()
- fmri_list = prune(get_dependencies(src_uri, fmri_list, basedir,
- tracker), all_versions, all_timestamps)
+ m = get_manifest(f, basedir)
+ pkgdir = xport_cfg.get_pkg_dir(f)
+ mfile = xport.multi_file_ni(src_pub, pkgdir,
+ not keep_compressed, tracker)
+
+ getb, sendb = add_hashes_to_multi(m, mfile)
+ get_bytes += getb
+ if republish:
+ send_bytes += sendb
+
+ retrieve_list.append((f, mfile))
+
+ tracker.evaluate_progress(fmri=f)
tracker.evaluate_done()
- def get_basename(pfmri):
- open_time = pfmri.get_timestamp()
- return "%d_%s" % \
- (calendar.timegm(open_time.utctimetuple()),
- urllib.quote(str(pfmri), ""))
-
- # First, retrieve the manifests and calculate package transfer sizes.
- npkgs = len(fmri_list)
- nfiles = 0
- nbytes = 0
+ # Next, retrieve and store the content for each package.
+ tracker.republish_set_goal(len(retrieve_list), get_bytes,
+ send_bytes)
- if not recursive:
- msg(_("Retrieving manifests for package evaluation ..."))
-
- tracker.evaluate_start(npkgs=npkgs)
- retrieve_list = []
- while fmri_list:
- f = fmri_list.pop()
-
- if republish and f in targ_fmris:
- msg(_("Skipping %s: already present "
- "at destination") % f)
+ if dry_run:
+ tracker.republish_done()
+ cleanup()
continue
- m = get_manifest(f, basedir)
- pkgdir = xport_cfg.get_pkg_dir(f)
- mfile = xport.multi_file_ni(src_pub, pkgdir,
- not keep_compressed, tracker)
-
- nf, nb = add_hashes_to_multi(m, mfile)
- nfiles += nf
- nbytes += nb
+ processed = 0
+ while retrieve_list:
+ f, mfile = retrieve_list.pop()
+ tracker.republish_start_pkg(f.pkg_name)
+
+ if mfile:
+ download_start = True
+ mfile.wait_files()
- retrieve_list.append((f, mfile))
+ if not republish:
+ # Nothing more to do for this package.
+ tracker.republish_end_pkg()
+ continue
+
+ m = get_manifest(f, basedir)
- tracker.evaluate_progress(fmri=f)
- tracker.evaluate_done()
+ # Get first line of original manifest so that inclusion
+ # of the scheme can be determined.
+ use_scheme = True
+ contents = get_manifest(f, basedir, contents=True)
+ if contents.splitlines()[0].find("pkg:/") == -1:
+ use_scheme = False
- # Next, retrieve and store the content for each package.
- msg(_("Retrieving package content ..."))
- tracker.download_set_goal(len(retrieve_list), nfiles, nbytes)
+ pkg_name = f.get_fmri(include_scheme=use_scheme)
+ pkgdir = xport_cfg.get_pkg_dir(f)
- publish_list = []
- while retrieve_list:
- f, mfile = retrieve_list.pop()
- tracker.download_start_pkg(f.get_fmri(include_scheme=False))
+ # This is needed so any previous failures for a package
+ # can be aborted.
+ trans_id = get_basename(f)
- if mfile:
- mfile.wait_files()
- if not download_start:
- download_start = True
+ if not targ_pub:
+ targ_pub = transport.setup_publisher(target,
+ src_pub.prefix, dest_xport, dest_xport_cfg,
+ remote_prefix=True)
+
+ try:
+ t = trans.Transaction(target, pkg_name=pkg_name,
+ trans_id=trans_id, xport=dest_xport,
+ pub=targ_pub, progtrack=tracker)
- if republish:
- publish_list.append(f)
- tracker.download_end_pkg()
- tracker.download_done()
- tracker.reset()
+ # Remove any previous failed attempt to
+ # to republish this package.
+ try:
+ t.close(abandon=True)
+ except:
+ # It might not exist already.
+ pass
- # Finally, republish the packages if needed.
- while publish_list:
- f = publish_list.pop()
- msg(_("Republishing %s ...") % f)
-
- m = get_manifest(f, basedir)
+ t.open()
+ for a in m.gen_actions():
+ if a.name == "set" and \
+ a.attrs.get("name", "") in ("fmri",
+ "pkg.fmri"):
+ # To be consistent with the
+ # server, the fmri can't be
+ # added to the manifest.
+ continue
- # Get first line of original manifest so that inclusion of the
- # scheme can be determined.
- use_scheme = True
- contents = get_manifest(f, basedir, contents=True)
- if contents.splitlines()[0].find("pkg:/") == -1:
- use_scheme = False
-
- pkg_name = f.get_fmri(include_scheme=use_scheme)
- pkgdir = xport_cfg.get_pkg_dir(f)
+ if hasattr(a, "hash"):
+ fname = os.path.join(pkgdir,
+ a.hash)
+ a.data = lambda: open(fname,
+ "rb")
+ t.add(a)
+ # Always defer catalog update.
+ t.close(add_to_catalog=False)
+ except trans.TransactionError, e:
+ abort(err=e)
- # This is needed so any previous failures for a package
- # can be aborted.
- trans_id = get_basename(f)
-
- if not targ_pub:
- targ_pub = transport.setup_publisher(target, "target",
- dest_xport, dest_xport_cfg, remote_prefix=True)
-
- try:
- t = trans.Transaction(target, pkg_name=pkg_name,
- trans_id=trans_id, xport=dest_xport, pub=targ_pub)
-
- # Remove any previous failed attempt to
- # to republish this package.
+ # Dump data retrieved so far after each successful
+ # republish to conserve space.
try:
- t.close(abandon=True)
- except:
- # It might not exist already.
- pass
+ shutil.rmtree(dest_xport_cfg.incoming_root)
+ except EnvironmentError, e:
+ raise apx._convert_error(e)
+ misc.makedirs(dest_xport_cfg.incoming_root)
+
+ processed += 1
+ tracker.republish_end_pkg()
+
+ tracker.republish_done()
+ tracker.reset()
+
+ if processed > 0:
+ # If any packages were published, trigger an update of
+ # the catalog.
+ total_processed += processed
+ dest_xport.publish_refresh_packages(targ_pub)
+
+ # Prevent further use.
+ targ_pub = None
- t.open()
- for a in m.gen_actions():
- if a.name == "set" and \
- a.attrs.get("name", "") in ("fmri",
- "pkg.fmri"):
- # To be consistent with the server,
- # the fmri can't be added to the
- # manifest.
- continue
+ # Find the intersection of patterns that failed to match.
+ unmatched = {}
+ for pub_unmatched in any_unmatched:
+ if not pub_unmatched:
+ # If any publisher matched all patterns, then treat
+ # the operation as successful.
+ unmatched = {}
+ break
- if hasattr(a, "hash"):
- fname = os.path.join(pkgdir,
- a.hash)
- a.data = lambda: open(fname,
- "rb")
- t.add(a)
- t.close()
- except trans.TransactionError, e:
- abort(err=e)
- return 1
+ # Otherwise, find the intersection of unmatched patterns so far.
+ for k in pub_unmatched:
+ try:
+ src = set(unmatched[k])
+ unmatched[k] = \
+ src.intersection(pub_unmatched[k])
+ except KeyError:
+ # Nothing to intersect with; assign instead.
+ unmatched[k] = pub_unmatched[k]
+
+ # Prune types of matching that didn't have any match failures.
+ for k, v in unmatched.items():
+ if not v:
+ del unmatched[k]
+
+ if unmatched:
+ # If any match failures remain, abort with an error.
+ match_err = apx.InventoryException(**unmatched)
+ emsg(match_err)
+ if total_processed > 0:
+ # Partial failure.
+ abort(retcode=3)
+ abort()
# Dump all temporary data.
cleanup()
--- a/src/tests/cli/t_pkg_image_create.py Wed Oct 20 15:32:12 2010 -0700
+++ b/src/tests/cli/t_pkg_image_create.py Thu Oct 21 14:28:40 2010 -0700
@@ -56,7 +56,7 @@
# The fifth depot is purposefully one with the publisher
# operation disabled.
- self.dcs[5].set_disable_ops(["publisher/0"])
+ self.dcs[5].set_disable_ops(["publisher/0", "publisher/1"])
self.dcs[5].start()
def test_basic(self):
--- a/src/tests/cli/t_pkg_refresh.py Wed Oct 20 15:32:12 2010 -0700
+++ b/src/tests/cli/t_pkg_refresh.py Thu Oct 21 14:28:40 2010 -0700
@@ -94,7 +94,7 @@
order."""
# 127.0.0.1 - - [15/Oct/2009:00:15:38]
- # "GET /catalog/1/catalog.base.C HTTP/1.1" 200 189 ""
+ # "GET [/<pub>]/catalog/1/catalog.base.C HTTP/1.1" 200 189 ""
# "pkg/b1f63b112bff+ (sunos i86pc; 5.11 snv_122; none; pkg)"
entry_comps = [
r"(?P<host>\S+)",
@@ -125,6 +125,9 @@
if req_method != method:
continue
+ # Strip publisher from URI for this part.
+ uri = uri.replace("/test1", "")
+ uri = uri.replace("/test2", "")
req_parts = uri.strip("/").split("/", 3)
if req_parts[0] != op:
continue
--- a/src/tests/cli/t_pkgrecv.py Wed Oct 20 15:32:12 2010 -0700
+++ b/src/tests/cli/t_pkgrecv.py Thu Oct 21 14:28:40 2010 -0700
@@ -109,11 +109,12 @@
""" Start two depots.
depot 1 gets foo and moo, depot 2 gets foo and bar
depot1 is mapped to publisher test1 (preferred)
- depot2 is mapped to publisher test1 (alternate) """
+ depot2 is mapped to publisher test1 (alternate)
+ depot3 and depot4 are scratch depots"""
# This test suite needs actual depots.
- pkg5unittest.ManyDepotTestCase.setUp(self, ["test1", "test1"],
- start_depots=True)
+ pkg5unittest.ManyDepotTestCase.setUp(self, ["test1", "test1",
+ "test2", "test2"], start_depots=True)
self.make_misc_files(self.misc_files)
@@ -133,6 +134,9 @@
self.durl2 = self.dcs[2].get_depot_url()
self.tempdir = tempfile.mkdtemp(dir=self.test_root)
+ self.durl3 = self.dcs[3].get_depot_url()
+ self.durl4 = self.dcs[4].get_depot_url()
+
@staticmethod
def get_repo(uri):
parts = urlparse.urlparse(uri, "file", allow_fragments=0)
@@ -150,7 +154,7 @@
invalid options or option values return expected exit code."""
# Test that bad options return expected exit code.
- self.pkgrecv(command="-n", exit=2)
+ self.pkgrecv(command="--newest", exit=2)
self.pkgrecv(self.durl1, "-!", exit=2)
self.pkgrecv(self.durl1, "-p foo", exit=2)
self.pkgrecv(self.durl1, "-d %s [email protected]" % self.tempdir,
@@ -165,15 +169,15 @@
self.pkgrecv(self.durl1, "-d file://%s foo" % npath, exit=1)
# Test list newest.
- self.pkgrecv(self.durl1, "-n")
+ self.pkgrecv(self.durl1, "--newest")
output = self.reduceSpaces(self.output)
# The latest version of amber and bronze should be listed
# (sans publisher prefix currently).
- amber = self.published[1].replace("pkg://test1/", "pkg:/")
- scheme = self.published[6].replace("pkg://test1/", "pkg:/")
- bronze = self.published[4].replace("pkg://test1/", "pkg:/")
- tree = self.published[5].replace("pkg://test1/", "pkg:/")
+ amber = self.published[1]
+ scheme = self.published[6]
+ bronze = self.published[4]
+ tree = self.published[5]
expected = "\n".join((amber, scheme, tree, bronze)) + "\n"
self.assertEqualDiff(expected, output)
@@ -183,7 +187,7 @@
f = fmri.PkgFmri(self.published[3], None)
# First, retrieve the package.
- self.pkgrecv(self.durl1, "-d %s %s" % (self.tempdir, f))
+ self.pkgrecv(self.durl1, "--raw -d %s %s" % (self.tempdir, f))
# Next, load the manifest.
basedir = os.path.join(self.tempdir, f.get_dir_path())
@@ -225,7 +229,8 @@
# First, pkgrecv the pkg to a directory. The files are
# kept compressed so they can be compared directly to the
# repository's internal copy.
- self.pkgrecv(self.durl1, "-k -d %s %s" % (self.tempdir, f))
+ self.pkgrecv(self.durl1, "--raw -k -d %s %s" % (self.tempdir,
+ f))
# Next, compare the manifests.
orepo = self.get_repo(self.dpath1)
@@ -320,6 +325,7 @@
# Fourth, create an image and verify that the sent package is
# seen by the client.
+ self.wait_repo(self.dpath2)
self.image_create(self.durl2, prefix="test1")
self.pkg("info -r [email protected]")
@@ -350,7 +356,7 @@
# Retrieve bronze recursively to a directory, this should
# also retrieve its dependency: amber, and amber's dependency:
# tree.
- self.pkgrecv(self.durl1, "-r -k -d %s %s" % (self.tempdir,
+ self.pkgrecv(self.durl1, "--raw -r -k -d %s %s" % (self.tempdir,
bronze))
amber = fmri.PkgFmri(self.published[1], None)
@@ -371,8 +377,8 @@
# Retrieve bronze using -m all-timestamps and a version pattern.
# This should only retrieve bronze20_1 and bronze20_2.
- self.pkgrecv(self.durl1, "-m all-timestamps -r -k -d %s %s" % (
- self.tempdir, "[email protected]"))
+ self.pkgrecv(self.durl1, "--raw -m all-timestamps -r -k "
+ "-d %s %s" % (self.tempdir, "[email protected]"))
# Verify that only expected packages were retrieved.
expected = [
@@ -392,8 +398,8 @@
# Retrieve bronze using -m all-timestamps and a package stem.
# This should retrieve bronze10, bronze20_1, and bronze20_2.
- self.pkgrecv(self.durl1, "-m all-timestamps -r -k -d %s %s" % (
- self.tempdir, "bronze"))
+ self.pkgrecv(self.durl1, "--raw -m all-timestamps -r -k "
+ "-d %s %s" % (self.tempdir, "bronze"))
# Verify that only expected packages were retrieved.
expected = [
@@ -414,8 +420,8 @@
# Retrieve bronze using -m all-versions, this should only
# retrieve bronze10 and bronze20_2.
- self.pkgrecv(self.durl1, "-m all-versions -r -k -d %s %s" % (
- self.tempdir, "bronze"))
+ self.pkgrecv(self.durl1, "--raw -m all-versions -r -k "
+ "-d %s %s" % (self.tempdir, "bronze"))
# Verify that only expected packages were retrieved.
expected = [
@@ -439,7 +445,7 @@
os.environ["PKG_DEST"] = self.tempdir
# First, retrieve the package.
- self.pkgrecv(command="%s" % f)
+ self.pkgrecv(command="--raw %s" % f)
# Next, load the manifest.
basedir = os.path.join(self.tempdir, f.get_dir_path())
@@ -482,6 +488,36 @@
# This would fail before behavior fixed to skip existing pkgs.
self.pkgrecv(self.durl1, "-r -d file://%s %s" % (npath, f2))
+ def test_7_recv_multipublisher(self):
+ """Verify that pkgrecv handles multi-publisher repositories as
+ expected."""
+
+ # Setup a repository with packages from multiple publishers.
+ amber = self.amber10.replace("open ", "open pkg://test2/")
+ self.pkgsend_bulk(self.durl3, amber)
+ self.pkgrecv(self.durl1, "-d %s [email protected] [email protected]" %
+ self.durl3)
+
+ # Now attempt to receive from a repository with packages from
+ # multiple publishers and verify entry exists only for test1.
+ self.pkgrecv(self.durl3, "-d %s bronze" % self.durl4)
+ self.pkgrecv(self.durl3, "--newest")
+ self.assertNotEqual(self.output.find("test1/bronze"), -1)
+ self.assertEqual(self.output.find("test2/bronze"), -1)
+
+ # Now retrieve amber, and verify entries exist for both pubs.
+ self.wait_repo(self.dcs[4].get_repodir())
+ self.wait_repo(self.dcs[3].get_repodir())
+ self.pkgrecv(self.durl3, "-d %s amber" % self.durl4)
+ self.pkgrecv(self.durl4, "--newest")
+ self.assertNotEqual(self.output.find("test1/amber"), -1)
+ self.assertNotEqual(self.output.find("test2/amber"), -1)
+
+ # Verify attempting to retrieve a non-existent package fails
+ # for a multi-publisher repository.
+ self.pkgrecv(self.durl3, "-d %s nosuchpackage" % self.durl4,
+ exit=1)
+
if __name__ == "__main__":
unittest.main()