--- a/src/modules/actions/signature.py Fri Jun 24 10:34:56 2016 -0700
+++ b/src/modules/actions/signature.py Fri Jun 24 18:04:04 2016 -0700
@@ -155,8 +155,7 @@
chain_csizes.append(csize)
for attr in chashes:
- chain_chshes[attr].append(
- chashes[attr].hexdigest())
+ chain_chshes[attr].append(chashes[attr])
# Remove any unused hash attributes.
for cattrs in (chain_hshes, chain_chshes):
@@ -269,7 +268,7 @@
shutil.rmtree(tmp_dir)
tmp_a.attrs["pkg.csize"] = csize
for attr in chashes:
- tmp_a.attrs[attr] = chashes[attr].hexdigest()
+ tmp_a.attrs[attr] = chashes[attr]
elif self.hash:
tmp_a.hash = self.hash
for attr in digest.DEFAULT_HASH_ATTRS:
@@ -303,8 +302,7 @@
shutil.rmtree(tmp_dir)
csizes.append(csize)
for attr in chashes:
- chain_chashes[attr].append(
- chashes[attr].hexdigest())
+ chain_chashes[attr].append(chashes[attr])
if chain_hashes:
for attr in digest.DEFAULT_CHAIN_ATTRS:
--- a/src/modules/client/transport/fileobj.py Fri Jun 24 10:34:56 2016 -0700
+++ b/src/modules/client/transport/fileobj.py Fri Jun 24 18:04:04 2016 -0700
@@ -240,6 +240,12 @@
# Header and message methods
+ @property
+ def headers(self):
+ if not self.__headers_arrived:
+ self.__fill_headers()
+ return self.__headers
+
def get_http_message(self):
"""Return the status message that may be included
with a numerical HTTP response code. Not all HTTP
--- a/src/modules/client/transport/repo.py Fri Jun 24 10:34:56 2016 -0700
+++ b/src/modules/client/transport/repo.py Fri Jun 24 18:04:04 2016 -0700
@@ -49,7 +49,7 @@
import pkg.server.repository as svr_repo
import pkg.server.query_parser as sqp
-from pkg.misc import N_, force_str
+from pkg.misc import N_, compute_compressed_attrs, EmptyDict
class TransportRepo(object):
"""The TransportRepo class handles transport requests.
@@ -142,7 +142,11 @@
raise NotImplementedError
- def publish_add_file(self, action, header=None, trans_id=None):
+ def publish_add_file(self, pth, header=None, trans_id=None,
+ basename=None, progtrack=None):
+ raise NotImplementedError
+
+ def publish_add_manifest(self, pth, header=None, trans_id=None):
raise NotImplementedError
def publish_abandon(self, header=None, trans_id=None):
@@ -208,6 +212,18 @@
raise NotImplementedError
+ def get_compressed_attrs(self, fhash, header=None, pub=None,
+ trans_id=None, hashes=True):
+ """Given a fhash, returns a tuple of (csize, chashes) where
+ 'csize' is the size of the file in the repository and 'chashes'
+ is a dictionary containing any hashes of the compressed data
+ known by the repository. If the repository cannot provide the
+ hash information or 'hashes' is False, chashes will be an empty
+ dictionary. If the repository does not have the file, a tuple
+ of (None, None) will be returned instead."""
+
+ raise NotImplementedError
+
def build_refetch_header(self, header):
"""Based on existing header contents, build a header that
should be used for a subsequent retry when fetching content
@@ -814,13 +830,21 @@
progclass=progclass, progtrack=progtrack)
self.__check_response_body(fobj)
- def publish_add_file(self, pth, header=None, trans_id=None):
+ def publish_add_file(self, pth, header=None, trans_id=None,
+ basename=None, progtrack=None):
"""The publish operation that adds content to a repository.
- The action must be populated with a data property.
Callers may supply a header, and should supply a transaction
id in trans_id."""
attrs = {}
+ progclass = None
+
+ if progtrack:
+ progclass = FileProgress
+
+ if basename:
+ attrs["basename"] = basename
+
baseurl = self.__get_request_url("file/1/")
requesturl = urljoin(baseurl, trans_id)
@@ -832,7 +856,33 @@
if header:
headers.update(header)
- fobj = self._post_url(requesturl, header=headers, data_fp=pth)
+ fobj = self._post_url(requesturl, header=headers, data_fp=pth,
+ progclass=progclass, progtrack=progtrack)
+ self.__check_response_body(fobj)
+
+ def publish_add_manifest(self, pth, header=None, trans_id=None):
+ """The publish operation that adds a manifest to a repository.
+ Callers may supply a header, and should supply a transaction
+ id in trans_id."""
+
+ baseurl = self.__get_request_url("manifest/1/")
+ requesturl = urljoin(baseurl, trans_id)
+ # Compress the manifest for the HTTPRepo case.
+ size = int(os.path.getsize(pth))
+ with open(pth, "rb") as f:
+ data = f.read()
+ basename = os.path.basename(pth) + ".gz"
+ dirname = os.path.dirname(pth)
+ pathname = os.path.join(dirname, basename)
+ compute_compressed_attrs(basename,
+ data=data, size=size, compress_dir=dirname)
+
+ headers = {}
+ if header:
+ headers.update(header)
+
+ fobj = self._post_url(requesturl, header=headers,
+ data_fp=pathname)
self.__check_response_body(fobj)
def publish_abandon(self, header=None, trans_id=None):
@@ -1065,6 +1115,51 @@
return True
+ def get_compressed_attrs(self, fhash, header=None, pub=None,
+ trans_id=None, hashes=True):
+ """Given a fhash, returns a tuple of (csize, chashes) where
+ 'csize' is the size of the file in the repository and 'chashes'
+ is a dictionary containing any hashes of the compressed data
+ known by the repository. If the repository cannot provide the
+ hash information or 'hashes' is False, chashes will be an empty
+ dictionary. If the repository does not have the file, a tuple
+ of (None, None) will be returned instead."""
+
+ # If the publisher's prefix isn't contained in trans_id,
+ # assume the server doesn't have the file.
+ pfx = getattr(pub, "prefix", None)
+ if (pfx and trans_id and
+ quote("pkg://{0}/".format(pfx), safe='') not in trans_id):
+ return (None, None)
+
+ # If caller requests hashes and server supports providing them
+ # (v2 of file operation), then attempt to retrieve size and
+ # hashes. Otherwise, fallback to the v0 file operation which
+ # only returns size (so is faster).
+ if hashes and self.supports_version("file", [2]) > -1:
+ version = 2
+ else:
+ version = 0
+
+ baseurl = self.__get_request_url("file/{0}/".format(version),
+ pub=pub)
+ requesturl = urljoin(baseurl, fhash)
+
+ try:
+ # see if repository has file
+ resp = self._fetch_url_header(requesturl, header)
+ resp.read()
+ csize = resp.getheader("Content-Length", None)
+ chashes = dict(
+ val.split("=", 1)
+ for hdr, val in six.iteritems(resp.headers)
+ if hdr.lower().startswith("x-ipkg-attr")
+ )
+ return (csize, chashes)
+ except Exception:
+ # repository transport issue or does not have file
+ return (None, None)
+
def build_refetch_header(self, header):
"""For HTTP requests that have failed due to corrupt content,
if that request didn't specify 'Cache-control: no-cache' in
@@ -1593,7 +1688,7 @@
"catalog": ["1"],
"close": ["0"],
"file": ["0", "1"],
- "manifest": ["0"],
+ "manifest": ["0", "1"],
"open": ["0"],
"publisher": ["0", "1"],
"search": ["1"],
@@ -1642,12 +1737,33 @@
sz = int(action.attrs.get("pkg.size", 0))
progtrack.progress_callback(0, 0, sz, sz)
- def publish_add_file(self, pth, header=None, trans_id=None):
+ def publish_add_file(self, pth, header=None, trans_id=None,
+ basename=None, progtrack=None):
"""The publish operation that adds a file to an existing
transaction."""
+ progclass = None
+ if progtrack:
+ progclass = FileProgress
+ progtrack = progclass(progtrack)
+
+ try:
+ self._frepo.add_file(trans_id, pth, basename)
+ except svr_repo.RepositoryError as e:
+ if progtrack:
+ progtrack.abort()
+ raise tx.TransportOperationError(str(e))
+ else:
+ if progtrack:
+ sz = int(os.path.getsize(pth))
+ progtrack.progress_callback(0, 0, sz, sz)
+
+ def publish_add_manifest(self, pth, header=None, trans_id=None):
+ """The publish operation that adds a manifest to an existing
+ transaction."""
+
try:
- self._frepo.add_file(trans_id, pth)
+ self._frepo.add_manifest(trans_id, pth)
except svr_repo.RepositoryError as e:
raise tx.TransportOperationError(str(e))
@@ -1814,6 +1930,39 @@
return True
+ def get_compressed_attrs(self, fhash, header=None, pub=None,
+ trans_id=None, hashes=True):
+ """Given a fhash, returns a tuple of (csize, chashes) where
+ 'csize' is the size of the file in the repository and 'chashes'
+ is a dictionary containing any hashes of the compressed data
+ known by the repository. If the repository cannot provide the
+ hash information or 'hashes' is False, chashes will be an empty
+ dictionary. If the repository does not have the file, a tuple
+ of (None, None) will be returned instead."""
+
+ # If the publisher's prefix isn't contained in trans_id,
+ # assume the server doesn't have the file.
+ pfx = getattr(pub, "prefix", None)
+ if (pfx and trans_id and
+ quote("pkg://{0}/".format(pfx), safe='') not in trans_id):
+ return (None, None)
+
+ try:
+ # see if repository has file
+ fpath = self._frepo.file(fhash, pub=pfx)
+ if hashes:
+ csize, chashes = compute_compressed_attrs(fhash,
+ file_path=fpath)
+ else:
+ csize = os.stat(fpath).st_size
+ chashes = EmptyDict
+ return (csize, chashes)
+ except (EnvironmentError,
+ svr_repo.RepositoryError,
+ svr_repo.RepositoryFileNotFoundError):
+ # repository transport issue or does not have file
+ return (None, None)
+
def build_refetch_header(self, header):
"""Pointless to attempt refetch of corrupt content for
this protocol."""
--- a/src/modules/client/transport/transport.py Fri Jun 24 10:34:56 2016 -0700
+++ b/src/modules/client/transport/transport.py Fri Jun 24 18:04:04 2016 -0700
@@ -84,6 +84,9 @@
self.pkg_pub_map = None
self.alt_pubs = None
+ # An integer that indicates the maximum number of times to
+ # check if a file needs to be uploaded for the transport.
+ self.max_transfer_checks = 20
def add_cache(self, path, layout=None, pub=None, readonly=True):
"""Adds the directory specified by 'path' as a location to read
@@ -511,7 +514,6 @@
user_agent = property(__get_user_agent,
doc="A string that identifies the user agent for the transport.")
-
class LockedTransport(object):
"""A decorator class that wraps transport functions, calling
their lock and unlock methods. Due to implementation differences
@@ -566,6 +568,9 @@
self.stats = tstats.RepoChooser()
self.repo_status = {}
self.__tmp_crls = {}
+ # Used to record those actions that will have their payload
+ # transferred.
+ self.__hashes = set()
# Used to record those CRLs which are unreachable during the
# current operation.
self.__bad_crls = set()
@@ -1245,6 +1250,29 @@
raise failures
@LockedTransport()
+ def get_compressed_attrs(self, fhash, pub=None, trans_id=None,
+ hashes=True):
+ """Given a fhash, returns a tuple of (csize, chashes) where
+ 'csize' is the size of the file in the repository and 'chashes'
+ is a dictionary containing any hashes of the compressed data
+ known by the repository. If the repository cannot provide the
+ hash information or 'hashes' is False, chashes will be an empty
+ dictionary. If the repository does not have the file, a tuple
+ of (None, None) will be returned instead."""
+
+ failures = tx.TransportFailures()
+ # If the operation fails, it doesn't matter as it won't cause a
+ # correctness issue, and it could be the repository simply
+ # doesn't have the file, so don't try more than once.
+ retry_count = 1
+ header = self.__build_header(uuid=self.__get_uuid(pub))
+
+ for d, retries in self.__gen_repo(pub, retry_count,
+ origin_only=True, single_repository=True):
+ return d.get_compressed_attrs(fhash, header,
+ pub=pub, trans_id=trans_id, hashes=hashes)
+
+ @LockedTransport()
def get_manifest(self, fmri, excludes=misc.EmptyI, intent=None,
ccancel=None, pub=None, content_only=False, alt_repo=None):
"""Given a fmri, and optional excludes, return a manifest
@@ -2770,10 +2798,11 @@
raise failures
@LockedTransport()
- def publish_add_file(self, pub, pth, trans_id=None):
+ def publish_add_file(self, pub, pth, trans_id=None, basename=None,
+ progtrack=None):
"""Perform the 'add_file' publication operation to the publisher
- supplied in pub. The caller should include the action in the
- action argument. The transaction-id is passed in trans_id."""
+ supplied in pub. The caller should include the path in the
+ pth argument. The transaction-id is passed in trans_id."""
failures = tx.TransportFailures()
retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
@@ -2788,6 +2817,41 @@
versions=[1]):
try:
d.publish_add_file(pth, header=header,
+ trans_id=trans_id, basename=basename,
+ progtrack=progtrack)
+ return
+ except tx.ExcessiveTransientFailure as ex:
+ # If an endpoint experienced so many failures
+ # that we just gave up, grab the list of
+ # failures that it contains
+ failures.extend(ex.failures)
+ except tx.TransportException as e:
+ if e.retryable:
+ failures.append(e)
+ else:
+ raise
+
+ raise failures
+
+ @LockedTransport()
+ def publish_add_manifest(self, pub, pth, trans_id=None):
+ """Perform the 'add_manifest' publication operation to the publisher
+ supplied in pub. The caller should include the path in the
+ pth argument. The transaction-id is passed in trans_id."""
+
+ failures = tx.TransportFailures()
+ retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
+ header = self.__build_header(uuid=self.__get_uuid(pub))
+
+ # Call setup if the transport isn't configured or was shutdown.
+ if not self.__engine:
+ self.__setup()
+
+ for d, retries, v in self.__gen_repo(pub, retry_count,
+ origin_only=True, single_repository=True,
+ operation="manifest", versions=[1]):
+ try:
+ d.publish_add_manifest(pth, header=header,
trans_id=trans_id)
return
except tx.ExcessiveTransientFailure as ex:
@@ -3140,6 +3204,73 @@
return False
return True
+ def supports_version(self, pub, op, verlist):
+ """Returns version-id of highest supported version.
+ If the version is not supported, or no data is available,
+ -1 is returned instead."""
+
+ retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
+
+ # Call setup if transport isn't configured, or was shutdown.
+ if not self.__engine:
+ self.__setup()
+
+ # For backward compatibility, we pass version 0 to __gen_repo
+ # so that unsupported operation exception won't be raised if
+ # higher version is not supported, such as manifest/1.
+ for d, retries, v in self.__gen_repo(pub, retry_count,
+ origin_only=True, single_repository=True,
+ operation=op, versions=[0]):
+ return d.supports_version(op, verlist)
+
+ def get_transfer_info(self, pub):
+ """Return a tuple of (compressed, hashes) where 'compressed'
+ indicates whether files can be transferred compressed and
+ 'hashes', the set of hashes of those actions that will have
+ their payload transferred."""
+
+ compressed = self.supports_version(pub, 'manifest', [1]) > -1
+ return compressed, self.__hashes
+
+ def get_transfer_size(self, pub, actions):
+ """Return estimated transfer size given a list of actions that
+ will have their payload transferred."""
+
+ for d, retries in self.__gen_repo(pub, 1,
+ origin_only=True, single_repository=True):
+ scheme, netloc, path, params, query, fragment = \
+ urlparse(d._url, "http", allow_fragments=0)
+ break
+
+ local = scheme == "file"
+ sendb = 0
+ uploaded = 0
+ support = self.supports_version(pub, "manifest", [1]) > -1
+ for a in actions:
+ if not a.has_payload:
+ continue
+ if not support:
+ sendb += int(a.attrs.get("pkg.size", 0))
+ continue
+ if a.hash not in self.__hashes:
+ if (local or uploaded <
+ self.cfg.max_transfer_checks):
+ # If the repository is local
+ # (filesystem-based) or less than
+ # max_transfer_checks, call
+ # get_compressed_attrs()...
+ has_file, dummy = \
+ self.get_compressed_attrs(
+ a.hash, pub=pub, hashes=False)
+ if has_file:
+ continue
+ # If server doesn't have file, assume it will be
+ # uploaded.
+ sendb += int(a.attrs.get("pkg.csize", 0))
+ self.__hashes.add(a.hash)
+ uploaded += 1
+ return sendb
+
class MultiXfr(object):
"""A transport object for performing multiple simultaneous
--- a/src/modules/digest.py Fri Jun 24 10:34:56 2016 -0700
+++ b/src/modules/digest.py Fri Jun 24 18:04:04 2016 -0700
@@ -75,6 +75,13 @@
# using the "most preferred" hash. See get_preferred_hash(..),
# get_least_preferred_hash(..) and get_common_preferred_hash(..)
#
+
+LEGACY_HASH_ATTRS = ["hash"]
+LEGACY_CHASH_ATTRS = ["chash"]
+LEGACY_CONTENT_HASH_ATTRS = ["elfhash"]
+LEGACY_CHAIN_ATTRS = ["chain"]
+LEGACY_CHAIN_CHASH_ATTRS = ["chain.chashes"]
+
if DebugValues["hash"] == "sha1+sha512_256" and sha512_supported:
# Simulate pkg(7) where SHA-1 and SHA-512/256 are used for publication
DEFAULT_HASH_ATTRS = ["hash", "pkg.hash.sha512_256"]
--- a/src/modules/misc.py Fri Jun 24 10:34:56 2016 -0700
+++ b/src/modules/misc.py Fri Jun 24 18:04:04 2016 -0700
@@ -642,11 +642,55 @@
hash_results[attr] = hash_results[attr].hexdigest()
return hash_results, content.read()
-def compute_compressed_attrs(fname, file_path, data, size, compress_dir,
- bufsz=64*1024, chash_attrs=None, chash_algs=None):
+
+class _GZWriteWrapper(object):
+ """Used by compute_compressed_attrs to calculate data size and compute
+ hashes as the data is written instead of having to read the written data
+ again later."""
+
+ def __init__(self, path, chashes):
+ """If path is None, the data will be discarded immediately after
+ computing size and hashes."""
+
+ if path:
+ self._ofile = open(path, "wb")
+ else:
+ self._ofile = None
+ self._chashes = chashes
+ self._size = 0
+
+ def close(self):
+ """Close the file."""
+ if self._ofile:
+ self._ofile.close()
+ self._ofile = None
+
+ def flush(self):
+ """Flush the file."""
+ if self._ofile:
+ self._ofile.flush()
+
+ @property
+ def size(self):
+ """Return the size of the file."""
+ return self._size
+
+ def write(self, data):
+ """Write data to the file and compute the hashes of the data."""
+ if self._ofile:
+ self._ofile.write(data)
+ self._size += len(data)
+ for chash_attr in self._chashes:
+ self._chashes[chash_attr].update(
+ data) # pylint: disable=E1101
+
+
+def compute_compressed_attrs(fname, file_path=None, data=None, size=None,
+ compress_dir=None, bufsz=64*1024, chash_attrs=None, chash_algs=None):
"""Returns the size and one or more hashes of the compressed data. If
the file located at file_path doesn't exist or isn't gzipped, it creates
- a file in compress_dir named fname.
+ a file in compress_dir named fname. If compress_dir is None, the
+ attributes are calculated but no data will be written.
'chash_attrs' is a list of the chash attributes we should compute, with
'chash_algs' being a dictionary that maps the attribute names to the
@@ -658,6 +702,10 @@
if chash_algs is None:
chash_algs = digest.CHASH_ALGS
+ chashes = {}
+ for chash_attr in chash_attrs:
+ chashes[chash_attr] = chash_algs[chash_attr]()
+
#
# This check prevents compressing a file which is already compressed.
# This takes CPU load off the depot on large imports of mostly-the-same
@@ -675,8 +723,13 @@
opath = file_path
if fileneeded:
- opath = os.path.join(compress_dir, fname)
- ofile = PkgGzipFile(opath, "wb")
+ if compress_dir:
+ opath = os.path.join(compress_dir, fname)
+ else:
+ opath = None
+
+ fobj = _GZWriteWrapper(opath, chashes)
+ ofile = PkgGzipFile(mode="wb", fileobj=fobj)
nbuf = size // bufsz
@@ -688,32 +741,33 @@
m = nbuf * bufsz
ofile.write(data[m:])
ofile.close()
-
- data = None
-
- # Now that the file has been compressed, determine its
- # size.
- fs = os.stat(opath)
- csize = str(fs.st_size)
+ fobj.close()
+ csize = str(fobj.size)
+ for attr in chashes:
+ chashes[attr] = chashes[attr].hexdigest()
+ return csize, chashes
# Compute the SHA hash of the compressed file. In order for this to
# work correctly, we have to use the PkgGzipFile class. It omits
# filename and timestamp information from the gzip header, allowing us
# to generate deterministic hashes for different files with identical
# content.
- cfile = open(opath, "rb")
- chashes = {}
- for chash_attr in chash_attrs:
- chashes[chash_attr] = chash_algs[chash_attr]()
- while True:
- cdata = cfile.read(bufsz)
- # cdata is bytes
- if cdata == b"":
- break
- for chash_attr in chashes:
- chashes[chash_attr].update(
- cdata) # pylint: disable=E1101
- cfile.close()
+ fs = os.stat(opath)
+ csize = str(fs.st_size)
+ with open(opath, "rb") as cfile:
+ while True:
+ cdata = cfile.read(bufsz)
+ # cdata is bytes
+ if cdata == b"":
+ break
+ for chash_attr in chashes:
+ chashes[chash_attr].update(
+ cdata) # pylint: disable=E1101
+
+ # The returned dictionary can now be populated with the hexdigests
+ # instead of the hash objects themselves.
+ for attr in chashes:
+ chashes[attr] = chashes[attr].hexdigest()
return csize, chashes
class ProcFS(object):
--- a/src/modules/publish/transaction.py Fri Jun 24 10:34:56 2016 -0700
+++ b/src/modules/publish/transaction.py Fri Jun 24 18:04:04 2016 -0700
@@ -29,12 +29,23 @@
though the other classes can be referred to for documentation purposes."""
import os
+import shutil
import six
from six.moves.urllib.parse import quote, unquote, urlparse, urlunparse
+import tempfile
from pkg.misc import EmptyDict
import pkg.actions as actions
import pkg.config as cfg
+import pkg.digest as digest
+
+# If elf module is supported, we will extract ELF information.
+try:
+ import pkg.elf as elf
+ haveelf = True
+except ImportError:
+ haveelf = False
+import pkg.misc as misc
import pkg.portable.util as os_util
import pkg.server.repository as sr
import pkg.client.api_errors as apx
@@ -133,7 +144,7 @@
self.progtrack = progtrack
self.trans_id = trans_id
- def add(self, action):
+ def add(self, action, exact=False, path=None):
"""Adds an action and its related content to an in-flight
transaction. Returns nothing."""
@@ -206,8 +217,16 @@
self.progtrack = progtrack
self.transport = xport
self.publisher = pub
+ self.__local = False
+ self.__uploaded = 0
+ self.__uploads = {}
+ self.__transactions = {}
+ self._tmpdir = None
+ self._append_mode = False
+ self._upload_mode = None
if scheme == "file":
+ self.__local = True
self.create_file_repo(repo_props=repo_props,
create_repo=create_repo)
elif scheme != "file" and create_repo:
@@ -252,7 +271,7 @@
self.transport.publish_cache_repository(self.publisher, repo)
- def add(self, action):
+ def add(self, action, exact=False, path=None):
"""Adds an action and its related content to an in-flight
transaction. Returns nothing."""
@@ -264,6 +283,22 @@
raise TransactionOperationError("add",
trans_id=self.trans_id, msg=str(e))
+ # If the server supports it, we'll upload the manifest as-is
+ # by accumulating the manifest contents in self.__transactions.
+ man = self.__transactions.get(self.trans_id)
+ if man is not None:
+ try:
+ self._process_action(action, exact=exact,
+ path=path)
+ except apx.TransportError as e:
+ msg = str(e)
+ raise TransactionOperationError("add",
+ trans_id=self.trans_id, msg=msg)
+ self.__transactions[self.trans_id] = man + \
+ str(action) + "\n"
+ return
+
+ # Fallback to older logic.
try:
self.transport.publish_add(self.publisher,
action=action, trans_id=self.trans_id,
@@ -273,7 +308,175 @@
raise TransactionOperationError("add",
trans_id=self.trans_id, msg=msg)
- def add_file(self, pth):
+ def __get_elf_attrs(self, action, fname, data):
+ """Helper function to get the ELF information."""
+
+ if not haveelf or data[:4] != b"\x7fELF" or (
+ "elfarch" in action.attrs and
+ "elfbits" in action.attrs and
+ "elfhash" in action.attrs):
+ return misc.EmptyDict
+
+ elf_name = os.path.join(self._tmpdir,
+ ".temp-{0}".format(fname))
+ with open(elf_name, "wb") as elf_file:
+ elf_file.write(data)
+
+ try:
+ elf_info = elf.get_info(elf_name)
+ except elf.ElfError as e:
+ raise TransactionError(e)
+
+ attrs = {}
+ try:
+ # Check which content checksums to
+ # compute and add to the action
+ elf256 = "pkg.content-type.sha256"
+ elf1 = "elfhash"
+
+ if elf256 in \
+ digest.DEFAULT_CONTENT_HASH_ATTRS:
+ get_sha256 = True
+ else:
+ get_sha256 = False
+
+ if elf1 in \
+ digest.DEFAULT_CONTENT_HASH_ATTRS:
+ get_sha1 = True
+ else:
+ get_sha1 = False
+
+ hashes = elf.get_hashes(elf_name,
+ sha1=get_sha1, sha256=get_sha256)
+
+ if get_sha1:
+ attrs[elf1] = hashes[elf1]
+
+ if get_sha256:
+ attrs[elf256] = \
+ hashes[elf256]
+
+ except elf.ElfError:
+ pass
+ attrs["elfbits"] = str(elf_info["bits"])
+ attrs["elfarch"] = elf_info["arch"]
+ os.unlink(elf_name)
+ return attrs
+
+ def __get_compressed_attrs(self, fhash, data, size):
+ """Given a fhash, data, and size of a file, returns a tuple
+ of (csize, chashes) where 'csize' is the size of the file
+ in the repository and 'chashes' is a dictionary containing
+ any hashes of the compressed data known by the repository."""
+
+ if self.__local or self.__uploaded < \
+ self.transport.cfg.max_transfer_checks:
+ # If the repository is local (filesystem-based) or
+ # number of files uploaded is less than
+ # max_transfer_checks, call get_compressed_attrs()...
+ csize, chashes = self.transport.get_compressed_attrs(
+ fhash, pub=self.publisher, trans_id=self.trans_id)
+ else:
+ # ...or the repository is not filesystem-based and
+ # enough files are missing that we want to avoid the
+ # overhead of calling get_compressed_attrs().
+ csize, chashes = None, None
+
+ if chashes:
+ # If any of the default content hash attributes we need
+ # is not available from the repository, they must be
+ # recomputed below.
+ for k in digest.DEFAULT_CHASH_ATTRS:
+ if k not in chashes:
+ chashes = None
+ break
+ return csize, chashes
+
+ def _process_action(self, action, exact=False, path=None):
+ """Adds all expected attributes to the provided action and
+ upload the file for the action if needed.
+
+ If 'exact' is True and 'path' is 'None', the action won't
+ be modified and no file will be uploaded.
+
+ If 'exact' is True and a 'path' is provided, the file of that
+ path will be uploaded as-is (it is assumed that the file is
+ already in repository format).
+ """
+
+ if self._append_mode and action.name != "signature":
+ raise TransactionOperationError(non_sig=True)
+
+ size = int(action.attrs.get("pkg.size", 0))
+
+ if action.has_payload and size <= 0:
+ # XXX hack for empty files
+ action.data = lambda: open(os.devnull, "rb")
+
+ if action.data is None:
+ return
+
+ if exact:
+ if path:
+ self.add_file(path, basename=action.hash,
+ progtrack=self.progtrack)
+ return
+
+ # Get all hashes for this action.
+ hashes, data = misc.get_data_digest(action.data(),
+ length=size, return_content=True,
+ hash_attrs=digest.DEFAULT_HASH_ATTRS,
+ hash_algs=digest.HASH_ALGS)
+ # Set the hash member for backwards compatibility and
+ # remove it from the dictionary.
+ action.hash = hashes.pop("hash", None)
+ action.attrs.update(hashes)
+
+ # Now set the hash value that will be used for storing the file
+ # in the repository.
+ hash_attr, hash_val, hash_func = \
+ digest.get_least_preferred_hash(action)
+ fname = hash_val
+
+ hdata = self.__uploads.get(fname)
+ if hdata is not None:
+ elf_attrs, csize, chashes = hdata
+ else:
+ # We haven't processed this file before, determine if
+ # it needs to be uploaded and what information the
+ # repository knows about it.
+ elf_attrs = self.__get_elf_attrs(action, fname, data)
+ csize, chashes = self.__get_compressed_attrs(fname,
+ data, size)
+
+ # 'csize' indicates whether the file needs to be uploaded.
+ fileneeded = csize is None
+ if fileneeded:
+ fpath = os.path.join(self._tmpdir, fname)
+ csize, chashes = misc.compute_compressed_attrs(
+ fname, data=data, size=size,
+ compress_dir=self._tmpdir)
+ # Upload the compressed file for each action.
+ self.add_file(fpath, basename=fname,
+ progtrack=self.progtrack)
+ os.unlink(fpath)
+ self.__uploaded += 1
+ elif not chashes:
+ # If not fileneeded, and repository can't
+ # provide desired hashes, call
+ # compute_compressed_attrs() in a way that
+ # avoids writing the file to get the attributes
+ # we need.
+ csize, chashes = misc.compute_compressed_attrs(
+ fname, data=data, size=size)
+
+ self.__uploads[fname] = (elf_attrs, csize, chashes)
+
+ action.attrs.update(elf_attrs)
+ action.attrs.update(chashes)
+ action.attrs["pkg.csize"] = csize
+
+ def add_file(self, pth, basename=None, progtrack=None):
"""Adds an additional file to the inflight transaction so that
it will be available for retrieval once the transaction is
closed."""
@@ -286,12 +489,39 @@
try:
self.transport.publish_add_file(self.publisher,
- pth=pth, trans_id=self.trans_id)
+ pth=pth, trans_id=self.trans_id, basename=basename,
+ progtrack=progtrack)
except apx.TransportError as e:
msg = str(e)
raise TransactionOperationError("add_file",
trans_id=self.trans_id, msg=msg)
+ def add_manifest(self, pth):
+ """Adds an additional manifest to the inflight transaction so
+ that it will be available for retrieval once the transaction is
+ closed."""
+
+ if not os.path.isfile(pth):
+ raise TransactionOperationError("add_manifest",
+ trans_id=self.trans_id, msg=str(_("The file to "
+ "be added is not a file. The path given was {0}.").format(
+ pth)))
+
+ try:
+ self.transport.publish_add_manifest(self.publisher,
+ pth=pth, trans_id=self.trans_id)
+ except apx.TransportError as e:
+ msg = str(e)
+ raise TransactionOperationError("add_manifest",
+ trans_id=self.trans_id, msg=msg)
+
+ def _cleanup_upload(self):
+ """Remove any temporary files generated in upload mode."""
+
+ if self._tmpdir:
+ # we don't care if this fails.
+ shutil.rmtree(self._tmpdir, ignore_errors=True)
+
def close(self, abandon=False, add_to_catalog=True):
"""Ends an in-flight transaction. Returns a tuple containing
a package fmri (if applicable) and the final state of the
@@ -306,6 +536,7 @@
"""
if abandon:
+ self.__transactions.pop(self.trans_id, None)
try:
state, fmri = self.transport.publish_abandon(
self.publisher, trans_id=self.trans_id)
@@ -313,7 +544,19 @@
msg = str(e)
raise TransactionOperationError("abandon",
trans_id=self.trans_id, msg=msg)
+ finally:
+ self._cleanup_upload()
+
else:
+ man = self.__transactions.get(self.trans_id)
+ if man is not None:
+ # upload manifest here
+ path = os.path.join(self._tmpdir, "manifest")
+ with open(path, "w") as f:
+ f.write(man)
+ self.add_manifest(path)
+ self.__transactions.pop(self.trans_id, None)
+
try:
state, fmri = self.transport.publish_close(
self.publisher, trans_id=self.trans_id,
@@ -322,9 +565,34 @@
msg = str(e)
raise TransactionOperationError("close",
trans_id=self.trans_id, msg=msg)
+ finally:
+ self._cleanup_upload()
return state, fmri
+ def _init_upload(self):
+ """Initialization for upload mode."""
+
+ if self._upload_mode is not None:
+ return
+
+ op = "init_upload"
+ try:
+ self._upload_mode = self.transport.supports_version(
+ self.publisher, "manifest", [1]) > -1
+ except apx.TransportError as e:
+ msg = str(e)
+ raise TransactionOperationError(op,
+ trans_id=self.trans_id, msg=msg)
+
+ if not self._upload_mode:
+ return
+
+ # Create temporary directory and initialize self.__transactions.
+ temp_root = misc.config_temp_root()
+ self._tmpdir = tempfile.mkdtemp(dir=temp_root)
+ self.__transactions.setdefault(self.trans_id, "")
+
def open(self):
"""Starts an in-flight transaction. Returns a URL-encoded
transaction ID on success."""
@@ -347,12 +615,15 @@
msg=_("Unknown failure; no transaction ID provided"
" in response."))
+ self._init_upload()
+
return self.trans_id
def append(self):
"""Starts an in-flight transaction to append to an existing
manifest. Returns a URL-encoded transaction ID on success."""
+ self._append_mode = True
trans_id = None
try:
@@ -371,6 +642,8 @@
msg=_("Unknown failure; no transaction ID provided"
" in response."))
+ self._init_upload()
+
return self.trans_id
def refresh_index(self):
--- a/src/modules/server/depot.py Fri Jun 24 10:34:56 2016 -0700
+++ b/src/modules/server/depot.py Fri Jun 24 18:04:04 2016 -0700
@@ -641,6 +641,28 @@
manifest_0._cp_config = { "response.stream": True }
+ def manifest_1(self, *tokens):
+ """Outputs the contents of the manifest or uploads the
+ manifest."""
+
+ method = cherrypy.request.method
+ if method == "GET":
+ return self.manifest_0(*tokens)
+ elif method in ("POST", "PUT"):
+ return self.__upload_manifest(*tokens)
+ raise cherrypy.HTTPError(http_client.METHOD_NOT_ALLOWED,
+ "{0} is not allowed".format(method))
+
+ # We need to prevent cherrypy from processing the request body so that
+ # manifest can parse the request body itself. In addition, we also need
+ # to set the timeout higher since the default is five minutes; not
+ # really enough for a slow connection to upload content.
+ manifest_1._cp_config = {
+ "request.process_request_body": False,
+ "response.timeout": 3600,
+ "response.stream": True
+ }
+
@staticmethod
def _tar_stream_close(**kwargs):
"""This is a special function to finish a tar_stream-based
@@ -715,6 +737,48 @@
"response.stream": True
}
+ def file_2(self, *tokens):
+ """Outputs the contents of the file, named by the SHA hash
+ name in the request path, directly to the client."""
+
+ method = cherrypy.request.method
+ if method == "HEAD":
+ try:
+ fhash = tokens[0]
+ except IndexError:
+ fhash = None
+
+ try:
+ fpath = self.repo.file(fhash,
+ pub=self._get_req_pub())
+ except srepo.RepositoryFileNotFoundError as e:
+ raise cherrypy.HTTPError(http_client.NOT_FOUND,
+ str(e))
+ except srepo.RepositoryError as e:
+ # Treat any remaining repository error as a 404,
+ # but log the error and include the real failure
+ # information.
+ cherrypy.log("Request failed: {0}".format(
+ str(e)))
+ raise cherrypy.HTTPError(http_client.NOT_FOUND,
+ str(e))
+
+ csize, chashes = misc.compute_compressed_attrs(fhash,
+ file_path=fpath)
+ response = cherrypy.response
+ for i, attr in enumerate(chashes):
+ response.headers["X-Ipkg-Attr-{0}".format(i)] = \
+ "{0}={1}".format(attr, chashes[attr])
+
+ # set expiration of response to one day
+ self.__set_response_expires("file", 86400, 86400)
+
+ return serve_file(fpath, "application/data")
+
+ return self.file_1(*tokens)
+
+ file_2._cp_config = { "response.stream": True }
+
@cherrypy.tools.response_headers(headers=[("Pragma", "no-cache"),
("Cache-Control", "no-cache, no-transform, must-revalidate"),
("Expires", 0)])
@@ -1045,9 +1109,46 @@
raise cherrypy.HTTPError(http_client.BAD_REQUEST,
_("file/1 must be sent a file."))
data = request.rfile
+ attrs = dict(
+ val.split("=", 1)
+ for hdr, val in request.headers.items()
+ if hdr.lower().startswith("x-ipkg-setattr")
+ )
+ basename = attrs.get("basename", None)
+ try:
+ self.repo.add_file(trans_id, data, basename, size)
+ except srepo.RepositoryError as e:
+ # Assume a bad request was made. A 404 can't be
+ # returned here as misc.versioned_urlopen will interpret
+ # that to mean that the server doesn't support this
+ # operation.
+ raise cherrypy.HTTPError(http_client.BAD_REQUEST, str(e))
+ response.headers["Content-Length"] = "0"
+ return response.body
+
+ def __upload_manifest(self, *tokens):
+ """Adds a manifest to an in-flight transaction for the Transaction
+ ID specified in the request path. The manifest content is expected
+ to be in the request body. Returns no output."""
try:
- self.repo.add_file(trans_id, data, size)
+ # cherrypy decoded it, but we actually need it encoded.
+ trans_id = quote(tokens[0], "")
+ except IndexError:
+ trans_id = None
+
+ request = cherrypy.request
+ response = cherrypy.response
+
+ size = int(request.headers.get("Content-Length", 0))
+ if size < 0:
+ raise cherrypy.HTTPError(http_client.BAD_REQUEST,
+ _("manifest/1 must be sent a file."))
+ data = request.rfile
+
+ try:
+ self.repo.add_manifest(trans_id, data)
except srepo.RepositoryError as e:
# Assume a bad request was made. A 404 can't be
# returned here as misc.versioned_urlopen will interpret
--- a/src/modules/server/repository.py Fri Jun 24 10:34:56 2016 -0700
+++ b/src/modules/server/repository.py Fri Jun 24 18:04:04 2016 -0700
@@ -1162,7 +1162,7 @@
finally:
self.__unlock_rstore()
- def add_file(self, trans_id, data, size=None):
+ def add_file(self, trans_id, data, basename=None, size=None):
"""Adds a file to an in-flight transaction.
'trans_id' is the identifier of a transaction that
@@ -1171,6 +1171,8 @@
'data' is the string object containing the payload of the
file to add.
+ 'basename' is the basename of the file.
+
'size' is an optional integer value indicating the size of
the provided payload.
"""
@@ -1184,7 +1186,31 @@
t = self.__get_transaction(trans_id)
try:
- t.add_file(data, size)
+ t.add_file(data, basename, size)
+ except trans.TransactionError as e:
+ raise RepositoryError(e)
+ return
+
+ def add_manifest(self, trans_id, data):
+ """Adds a manifest to an in-flight transaction.
+
+ 'trans_id' is the identifier of a transaction that
+ the manifest should be added to.
+
+ 'data' is the string object containing the payload of the
+ manifest to add.
+ """
+
+ if self.mirror:
+ raise RepositoryMirrorError()
+ if self.read_only:
+ raise RepositoryReadOnlyError()
+ if not self.trans_root:
+ raise RepositoryUnsupportedOperationError()
+
+ t = self.__get_transaction(trans_id)
+ try:
+ t.add_manifest(data)
except trans.TransactionError as e:
raise RepositoryError(e)
return
@@ -3512,12 +3538,20 @@
continue
rstore.add_content(refresh_index=refresh_index)
- def add_file(self, trans_id, data, size=None):
+ def add_file(self, trans_id, data, basename=None, size=None):
"""Adds a file to a transaction with the specified Transaction
ID."""
rstore = self.get_trans_rstore(trans_id)
- return rstore.add_file(trans_id, data=data, size=size)
+ return rstore.add_file(trans_id, data=data, basename=basename,
+ size=size)
+
+ def add_manifest(self, trans_id, data):
+ """Adds a manifest to a transaction with the specified
+ Transaction ID."""
+
+ rstore = self.get_trans_rstore(trans_id)
+ return rstore.add_manifest(trans_id, data=data)
def rebuild(self, build_catalog=True, build_index=False, pub=None):
"""Rebuilds the repository catalog and search indexes using the
--- a/src/modules/server/transaction.py Fri Jun 24 10:34:56 2016 -0700
+++ b/src/modules/server/transaction.py Fri Jun 24 18:04:04 2016 -0700
@@ -33,6 +33,7 @@
import shutil
import six
import time
+import zlib
from six.moves.urllib.parse import quote, unquote
import pkg.actions as actions
@@ -472,7 +473,7 @@
# get all hashes for this action
hashes, data = misc.get_data_digest(action.data(),
length=size, return_content=True,
- hash_attrs=digest.DEFAULT_HASH_ATTRS,
+ hash_attrs=digest.LEGACY_HASH_ATTRS,
hash_algs=digest.HASH_ALGS)
# set the hash member for backwards compatibility and
@@ -510,13 +511,13 @@
elf1 = "elfhash"
if elf256 in \
- digest.DEFAULT_CONTENT_HASH_ATTRS:
+ digest.LEGACY_CONTENT_HASH_ATTRS:
get_sha256 = True
else:
get_sha256 = False
if elf1 in \
- digest.DEFAULT_CONTENT_HASH_ATTRS:
+ digest.LEGACY_CONTENT_HASH_ATTRS:
get_sha1 = True
else:
get_sha1 = False
@@ -550,10 +551,8 @@
csize, chashes = misc.compute_compressed_attrs(
fname, dst_path, data, size, self.dir)
for attr in chashes:
- action.attrs[attr] = chashes[attr].hexdigest()
+ action.attrs[attr] = chashes[attr]
action.attrs["pkg.csize"] = csize
- chash = None
- data = None
self.remaining_payload_cnt = \
len(action.attrs.get("chain.sizes", "").split())
@@ -613,8 +612,39 @@
self.types_found.add(action.name)
- def add_file(self, f, size=None):
+ def add_file(self, f, basename=None, size=None):
"""Adds the file to the Transaction."""
+
+ # If basename provided, just store the file as-is with the
+ # basename.
+ if basename:
+ fileneeded = True
+ try:
+ dst_path = self.rstore.file(basename)
+ fileneeded = False
+ except Exception as e:
+ dst_path = os.path.join(self.dir, basename)
+
+ if not fileneeded:
+ return
+
+ if isinstance(f, six.string_types):
+ portable.copyfile(f, dst_path)
+ return
+
+ bufsz = 128 * 1024
+ if bufsz > size:
+ bufsz = size
+
+ with open(dst_path, "wb") as wf:
+ while True:
+ data = f.read(bufsz)
+ # data is bytes
+ if data == b"":
+ break
+ wf.write(data)
+ return
+
hashes, data = misc.get_data_digest(f, length=size,
return_content=True, hash_attrs=digest.DEFAULT_HASH_ATTRS,
hash_algs=digest.HASH_ALGS)
@@ -636,15 +666,82 @@
raise
dst_path = None
- csize, chashes = misc.compute_compressed_attrs(fname, dst_path,
+ misc.compute_compressed_attrs(fname, dst_path,
data, size, self.dir,
chash_attrs=digest.DEFAULT_CHASH_ATTRS,
chash_algs=digest.CHASH_ALGS)
- chashes = None
- data = None
self.remaining_payload_cnt -= 1
+ def add_manifest(self, f):
+ """Adds the manifest to the Transaction."""
+
+ if isinstance(f, six.string_types):
+ f = open(f, "rb")
+ # Store the manifest file.
+ fpath = os.path.join(self.dir, "manifest")
+ with open(fpath, "ab+") as wf:
+ try:
+ misc.gunzip_from_stream(f, wf, ignore_hash=True)
+ wf.seek(0)
+ content = wf.read()
+ except zlib.error:
+ # No need to decompress it if it's not a gzipped
+ # file.
+ f.seek(0)
+ content = f.read()
+ wf.write(content)
+ # Do some sanity checking on packages marked or being marked
+ # obsolete or renamed.
+ m = pkg.manifest.Manifest()
+ m.set_content(misc.force_str(content))
+ for action in m.gen_actions():
+ if action.name == "set" and \
+ action.attrs["name"] == "pkg.obsolete" and \
+ action.attrs["value"] == "true":
+ self.obsolete = True
+ if self.types_found.difference(
+ set(("set", "signature"))):
+ raise TransactionOperationError(_("An obsolete "
+ "package cannot contain actions other than "
+ "'set' and 'signature'."))
+ elif action.name == "set" and \
+ action.attrs["name"] == "pkg.renamed" and \
+ action.attrs["value"] == "true":
+ self.renamed = True
+ if self.types_found.difference(
+ set(("depend", "set", "signature"))):
+ raise TransactionOperationError(_("A renamed "
+ "package cannot contain actions other than "
+ "'set', 'depend', and 'signature'."))
+
+ if not self.has_reqdeps and action.name == "depend" and \
+ action.attrs["type"] == "require":
+ self.has_reqdeps = True
+
+ if self.obsolete and self.renamed:
+ # Reset either obsolete or renamed, depending on which
+ # action this was.
+ if action.attrs["name"] == "pkg.obsolete":
+ self.obsolete = False
+ else:
+ self.renamed = False
+ raise TransactionOperationError(_("A package may not "
+ "be marked for both obsoletion and renaming."))
+ elif self.obsolete and action.name not in ("set", "signature"):
+ raise TransactionOperationError(_("A '{type}' action "
+ "cannot be present in an obsolete package: "
+ "{action}").format(
+ type=action.name, action=action))
+ elif self.renamed and action.name not in \
+ ("depend", "set", "signature"):
+ raise TransactionOperationError(_("A '{type}' action "
+ "cannot be present in a renamed package: "
+ "{action}").format(
+ type=action.name, action=action))
+
+ self.types_found.add(action.name)
+
def accept_publish(self, add_to_catalog=True):
"""Transaction meets consistency criteria, and can be published.
Publish, making appropriate catalog entries."""
--- a/src/pull.py Fri Jun 24 10:34:56 2016 -0700
+++ b/src/pull.py Fri Jun 24 18:04:04 2016 -0700
@@ -321,8 +321,10 @@
sendb = 0
sendcb = 0
+ hashes = set()
for a in mfst.gen_actions():
- if a.has_payload:
+ if a.has_payload and a.hash not in hashes:
+ hashes.add(a.hash)
getb += get_pkg_otw_size(a)
getf += 1
sendb += int(a.attrs.get("pkg.size", 0))
@@ -336,9 +338,11 @@
"""Takes a manifest and a multi object and adds the hashes to the multi
object."""
+ hashes = set()
for a in mfst.gen_actions():
- if a.has_payload:
+ if a.has_payload and a.hash not in hashes:
multi.add_action(a)
+ hashes.add(a.hash)
def prune(fmri_list, all_versions, all_timestamps):
"""Returns a filtered version of fmri_list based on the provided
@@ -1518,12 +1522,11 @@
# mogrify is done.
nm = m
- getb, getf, sendb, sendcb = get_sizes(nm)
+ getb, getf = get_sizes(nm)[:2]
if republish:
- # For now, normal republication always uses
- # uncompressed data as already compressed data
- # is not supported for publication.
- send_bytes += sendb
+ send_bytes += dest_xport.get_transfer_size(
+ new_targ_pubs[nf.publisher],
+ nm.gen_actions())
# Store a mapping between new fmri and new manifest for
# future use.
@@ -1574,7 +1577,14 @@
continue
processed = 0
+ uploads = set()
pkgs_to_get = sorted(pkgs_to_get)
+ hashes = set()
+ if republish and pkgs_to_get:
+ # If files can be transferred compressed, keep them
+ # compressed in the source.
+ keep_compressed, hashes = dest_xport.get_transfer_info(
+ new_targ_pubs[pkgs_to_get[0].publisher])
for nf in pkgs_to_get:
tracker.republish_start_pkg(nf)
# Processing republish.
@@ -1629,13 +1639,28 @@
# added to the manifest.
continue
+ fname = None
+ fhash = None
if a.has_payload:
+ fhash = a.hash
fname = os.path.join(pkgdir,
- a.hash)
+ fhash)
a.data = lambda: open(fname,
"rb")
- t.add(a)
+
+ if fhash in hashes and \
+ fhash not in uploads:
+ # If the payload will be
+ # transferred and not have been
+ # uploaded, upload it...
+ t.add(a, exact=True, path=fname)
+ uploads.add(fhash)
+ else:
+ # ...otherwise, just add the
+ # action to the transaction.
+ t.add(a, exact=True)
+
if a.name == "signature" and \
not do_mog:
# We always store content in the
@@ -1645,7 +1670,11 @@
least_preferred=True):
fname = os.path.join(
pkgdir, fp)
- t.add_file(fname)
+ if keep_compressed:
+ t.add_file(fname,
+ basename=fp)
+ else:
+ t.add_file(fname)
# Always defer catalog update.
t.close(add_to_catalog=False)
except trans.TransactionError as e:
--- a/src/tests/cli/t_pkgrecv.py Fri Jun 24 10:34:56 2016 -0700
+++ b/src/tests/cli/t_pkgrecv.py Fri Jun 24 18:04:04 2016 -0700
@@ -1496,6 +1496,18 @@
tact = list(tm.gen_actions_by_type('file'))[0]
self.assertEqual("42." + oelfhash, tact.attrs["elfhash"])
+ def test_16_recv_old_republish(self):
+ """Verify that older logic of republication in pkgrecv works."""
+
+ f = fmri.PkgFmri(self.published[3], None)
+
+ self.dcs[2].stop()
+ self.dcs[2].set_disable_ops(["manifest/1"])
+ self.dcs[2].start()
+
+ self.pkgrecv(self.durl1, "-d {0} {1}".format(self.durl2, f))
+ self.dcs[2].unset_disable_ops()
+
class TestPkgrecvHTTPS(pkg5unittest.HTTPSTestClass):
--- a/src/tests/cli/t_pkgsend.py Fri Jun 24 10:34:56 2016 -0700
+++ b/src/tests/cli/t_pkgsend.py Fri Jun 24 18:04:04 2016 -0700
@@ -1215,23 +1215,7 @@
# the expected name.
shutil.rmtree(rpath)
- def test_22_publish(self):
- """Verify that pkgsend publish works as expected."""
-
- rootdir = self.test_root
- dir_1 = os.path.join(rootdir, "dir_1")
- dir_2 = os.path.join(rootdir, "dir_2")
- os.mkdir(dir_1)
- os.mkdir(dir_2)
- open(os.path.join(dir_1, "A"), "w").close()
- open(os.path.join(dir_2, "B"), "w").close()
- mfpath = os.path.join(rootdir, "manifest_test")
- with open(mfpath, "w") as mf:
- mf.write("""file NOHASH mode=0755 owner=root group=bin path=/A
- file NOHASH mode=0755 owner=root group=bin path=/B
- set name=pkg.fmri [email protected],5.10
- """)
-
+ def __test_publish(self, dir_1, dir_2, mfpath):
dhurl = self.dc.get_depot_url()
# -s may be specified either as a global option or as a local
# option for the publish subcommand.
@@ -1254,6 +1238,31 @@
self.pkg("verify")
self.image_destroy()
+ def test_22_publish(self):
+ """Verify that pkgsend publish works as expected."""
+
+ rootdir = self.test_root
+ dir_1 = os.path.join(rootdir, "dir_1")
+ dir_2 = os.path.join(rootdir, "dir_2")
+ os.mkdir(dir_1)
+ os.mkdir(dir_2)
+ open(os.path.join(dir_1, "A"), "w").close()
+ open(os.path.join(dir_2, "B"), "w").close()
+ mfpath = os.path.join(rootdir, "manifest_test")
+ with open(mfpath, "w") as mf:
+ mf.write("""file NOHASH mode=0755 owner=root group=bin path=/A
+ file NOHASH mode=0755 owner=root group=bin path=/B
+ set name=pkg.fmri [email protected],5.10
+ """)
+ self.__test_publish(dir_1, dir_2, mfpath)
+
+ # Verify that older logic for pkgsend publish works.
+ self.dc.stop()
+ self.dc.set_disable_ops(["manifest/1"])
+ self.dc.start()
+ self.__test_publish(dir_1, dir_2, mfpath)
+ self.dc.unset_disable_ops()
+
def test_23_pkgsend_no_version(self):
"""Verify that FMRI without version cannot be specified."""
@@ -1330,7 +1339,8 @@
self.pkgsend("", "-s {0} publish {1}".format(furi, mfpath))
self.image_create(furi)
self.pkg("contents -rm multihash")
- self.assertTrue("pkg.hash.{0}=spaghetti".format(hash_alg in self.output))
+ self.assertTrue("pkg.hash.{0}=spaghetti".format(
+ hash_alg) in self.output)
self.pkgsend("", "-s {0} publish {1}".format(furi, mfpath),
debug_hash="sha1+{0}".format(hash_alg))
--- a/src/tests/cli/t_pkgsign.py Fri Jun 24 10:34:56 2016 -0700
+++ b/src/tests/cli/t_pkgsign.py Fri Jun 24 18:04:04 2016 -0700
@@ -657,7 +657,7 @@
self.pkg("install example_pkg", exit=1)
- def test_sign_5(self):
+ def base_sign_5(self):
"""Test that http repos work."""
self.dcs[1].start()
@@ -685,6 +685,16 @@
api_obj = self.get_img_api_obj()
self._api_install(api_obj, ["example_pkg"])
+ def test_sign_5(self):
+ """Test that http repos work."""
+
+ self.base_sign_5()
+
+ # Verify that older logic of publication api works.
+ self.dcs[1].stop()
+ self.dcs[1].set_disable_ops(["manifest/1"])
+ self.base_sign_5()
+
def test_length_two_chains(self):
"""Check that chains of length two work correctly."""
@@ -2008,7 +2018,10 @@
plist = self.pkgsend_bulk(self.rurl1, self.example_pkg10)
- self.dcs[1].set_disable_ops(["add"])
+ # New publication uses manifest/1 to upload manifest as-is
+ # and avoid using add ops. Disable manifest/1 to fall back
+ # to older logic here for testing.
+ self.dcs[1].set_disable_ops(["add", "manifest/1"])
self.dcs[1].start()
sign_args = "-k {key} -c {cert} {pkg}".format(
@@ -2023,7 +2036,11 @@
"""Test that publishing to a depot which doesn't support file
fails as expected."""
- self.dcs[1].set_disable_ops(["file"])
+ # New publication uses manifest/1 which uses file/1, so if we
+ # disable file ops, we can't use the new publication model.
+ # Disable manifest/1 to fall back to older logic here for
+ # testing.
+ self.dcs[1].set_disable_ops(["file", "manifest/1"])
self.dcs[1].start()
plist = self.pkgsend_bulk(self.durl1, self.example_pkg10)
@@ -2842,12 +2859,13 @@
def setUp(self):
pkg5unittest.ManyDepotTestCase.setUp(self,
- ["test", "test", "crl"])
+ ["test", "test", "crl", "test"])
self.make_misc_files(self.misc_files)
self.durl1 = self.dcs[1].get_depot_url()
self.rurl1 = self.dcs[1].get_repo_url()
self.durl2 = self.dcs[2].get_depot_url()
self.rurl2 = self.dcs[2].get_repo_url()
+ self.durl4 = self.dcs[4].get_depot_url()
DebugValues["crl_host"] = self.dcs[3].get_depot_url()
self.ta_dir = None
@@ -3073,16 +3091,27 @@
are signed with the same certificates and intermediate
certificates are involved, pkgrecv continues to work."""
+ self.__test_sign_pkgrecv_cache_sign_interaction()
+ # Verify that older logic of publication api works.
+ self.dcs[1].stop()
+ self.dcs[2].stop()
+ self.dcs[1].set_disable_ops(["manifest/1"])
+ self.dcs[2].set_disable_ops(["manifest/1"])
+ self.__test_sign_pkgrecv_cache_sign_interaction()
+
+ def __test_sign_pkgrecv_cache_sign_interaction(self):
+ self.dcs[1].start()
+ self.dcs[2].start()
manf = """
open a@1,5.11-0
close
"""
- self.pkgsend_bulk(self.rurl2, manf)
+ self.pkgsend_bulk(self.durl2, manf)
manf = """
open b@1,5.11-0
close
"""
- self.pkgsend_bulk(self.rurl2, manf)
+ self.pkgsend_bulk(self.durl2, manf)
ta_path = os.path.join(self.raw_trust_anchor_dir,
"ta2_cert.pem")
@@ -3093,11 +3122,11 @@
pkg="'*'"
)
- self.pkgsign(self.rurl2, sign_args)
+ self.pkgsign(self.durl2, sign_args)
cache_dir = os.path.join(self.test_root, "cache")
- self.pkgrecv(self.rurl2, "-c {0} -d {1} '*'".format(
- cache_dir, self.rurl1))
+ self.pkgrecv(self.durl2, "-c {0} -d {1} '*'".format(
+ cache_dir, self.durl1))
def test_sign_pkgrecv_a(self):
"""Check that signed packages can be archived."""
@@ -3236,6 +3265,58 @@
cnt += 1
self.assertEqual(cnt, 2)
+ def test_sign_pkgrecv_across_repositories(self):
+ """Check that signed packages can be pkgrecved to a new
+ repository that enables new hashes but the new hashes won't
+ be added to the packages so that the existing signatures won't
+ be invalidated"""
+
+ # We create an image simply so we can use "contents -g" to
+ # inspect the repository.
+ self.image_create()
+ self.dcs[1].start()
+ self.dcs[2].start()
+ plist = self.pkgsend_bulk(self.rurl2, self.example_pkg10)
+ ta_path = os.path.join(self.raw_trust_anchor_dir,
+ "ta3_cert.pem")
+ sign_args = "-k {key} -c {cert} -i {ch1} -i {ta} " \
+ "{name}".format(**{
+ "name": plist[0],
+ "key": os.path.join(self.keys_dir,
+ "cs1_ch1_ta3_key.pem"),
+ "cert": os.path.join(self.cs_dir,
+ "cs1_ch1_ta3_cert.pem"),
+ "ch1": os.path.join(self.chain_certs_dir,
+ "ch1_ta3_cert.pem"),
+ "ta": ta_path,
+ })
+
+ self.pkgsign(self.rurl2, sign_args)
+ self.pkgrecv(self.rurl2, "-d {0} example_pkg".format(self.durl1))
+ self.pkg("contents -g {0} -m example_pkg".format(self.durl1))
+ self.assertTrue("pkg.hash.sha256" not in self.output)
+ self.image_create(self.durl1)
+ self.seed_ta_dir("ta3")
+ self.pkg("set-property signature-policy verify")
+ self.pkg("install example_pkg")
+ self.image_destroy()
+
+ self.dcs[4].set_debug_feature("hash=sha1+sha256")
+ self.dcs[4].start()
+ self.image_create(self.durl4, destroy=True)
+ # pkgrecv to a new repository which enables SHA-2 hashes
+ self.pkgrecv(self.durl1, "-d {0} example_pkg".format(self.durl4))
+ self.pkg("contents -g {0} -m example_pkg".format(self.durl4))
+ # make sure that we do not get multiple hashes
+ self.assertTrue("pkg.hash.sha256" not in self.output)
+ self.seed_ta_dir("ta3")
+ self.pkg("set-property signature-policy verify")
+ # should not invalidate the signature
+ self.pkg("install example_pkg")
+
+ self.dcs[4].stop()
+ self.dcs[4].unset_debug_feature("hash=sha1+sha256")
+
if __name__ == "__main__":
unittest.main()
--- a/src/util/publish/pkgmerge.py Fri Jun 24 10:34:56 2016 -0700
+++ b/src/util/publish/pkgmerge.py Fri Jun 24 18:04:04 2016 -0700
@@ -412,13 +412,9 @@
patterns=processdict[entry]))
continue
- # we're ready to merge
- if not dry_run:
- target_pub = transport.setup_publisher(dest_repo,
- pub.prefix, dest_xport, dest_xport_cfg,
- remote_prefix=True)
- else:
- target_pub = None
+ target_pub = transport.setup_publisher(dest_repo,
+ pub.prefix, dest_xport, dest_xport_cfg,
+ remote_prefix=True)
tracker.republish_set_goal(len(processdict), 0, 0)
# republish packages for this publisher. If we encounter any
@@ -504,10 +500,8 @@
# Determine total bytes to send for this package; this must be
# done using the manifest since retrievals are coalesced based
# on hash, but sends are not.
- sendbytes = sum(
- int(a.attrs.get("pkg.size", 0))
- for a in man.gen_actions()
- )
+ sendbytes = dest_xport.get_transfer_size(target_pub,
+ man.gen_actions())
f = man.fmri