--- a/src/modules/client/transport/transport.py Thu Aug 12 09:48:48 2010 -0700
+++ b/src/modules/client/transport/transport.py Mon Aug 16 16:48:50 2010 -0700
@@ -890,12 +890,13 @@
if not self.__engine:
self.__setup()
- for d in self.__gen_repo(pub, retry_count):
+ for d, v in self.__gen_repo(pub, retry_count, operation="file",
+ versions=[0, 1]):
url = d.get_url()
try:
- resp = d.get_datastream(fhash, header,
+ resp = d.get_datastream(fhash, v, header,
ccancel=ccancel)
s = cStringIO.StringIO()
hash_val = misc.gunzip_from_stream(resp, s)
@@ -1385,7 +1386,8 @@
# present.
cache = cache[0]
- for d in self.__gen_repo(pub, retry_count):
+ for d, v in self.__gen_repo(pub, retry_count, operation="file",
+ versions=[0, 1]):
failedreqs = []
repostats = self.stats[d.get_url()]
@@ -1397,7 +1399,7 @@
# unless we want to supress a permanant failure.
try:
errlist = d.get_files(filelist, download_dir,
- progtrack, header)
+ progtrack, v, header)
except tx.ExcessiveTransientFailure, ex:
# If an endpoint experienced so many failures
# that we just gave up, record this for later
@@ -2001,8 +2003,7 @@
@LockedTransport()
def publish_add(self, pub, action=None, trans_id=None):
"""Perform the 'add' publication operation to the publisher
- supplied in pub. The caller should include the action in the
- action argument. The transaction-id is passed in trans_id."""
+ supplied in pub. The transaction-id is passed in trans_id."""
failures = tx.TransportFailures()
retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
@@ -2032,6 +2033,44 @@
raise failures
@LockedTransport()
+ def publish_add_file(self, pub, pth, trans_id=None):
+ """Perform the 'add_file' publication operation to the publisher
+ supplied in pub. The caller should include the action in the
+ action argument. The transaction-id is passed in trans_id."""
+
+ failures = tx.TransportFailures()
+ retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
+ header = self.__build_header(uuid=self.__get_uuid(pub))
+
+ # Call setup if the transport isn't configured or was shutdown.
+ if not self.__engine:
+ self.__setup()
+
+ repo_found = False
+ for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
+ single_repository=True, operation="file", versions=[1]):
+ repo_found = True
+ try:
+ d.publish_add_file(pth, header=header,
+ trans_id=trans_id)
+ return
+ except tx.ExcessiveTransientFailure, ex:
+ # If an endpoint experienced so many failures
+ # that we just gave up, grab the list of
+ # failures that it contains
+ failures.extend(ex.failures)
+ except tx.TransportException, e:
+ if e.retryable:
+ failures.append(e)
+ else:
+ raise
+ if not repo_found:
+ raise apx.UnsupportedRepositoryOperation(pub,
+ "file/1")
+
+ raise failures
+
+ @LockedTransport()
def publish_abandon(self, pub, trans_id=None):
"""Perform an 'abandon' publication operation to the
publisher supplied in the pub argument. The caller should
@@ -2141,6 +2180,46 @@
raise failures
@LockedTransport()
+ def publish_append(self, pub, client_release=None, pkg_name=None):
+ """Perform an 'append' transaction to start a publication
+ transaction to the publisher named in pub. The caller should
+ supply the client's OS release in client_release, and the
+ package's name in pkg_name."""
+
+ failures = tx.TransportFailures()
+ retry_count = global_settings.PKG_CLIENT_MAX_TIMEOUT
+ header = self.__build_header(uuid=self.__get_uuid(pub))
+
+ # Call setup if transport isn't configured, or was shutdown.
+ if not self.__engine:
+ self.__setup()
+
+ repo_found = False
+ for d, v in self.__gen_repo(pub, retry_count, origin_only=True,
+ single_repository=True, operation="append", versions=[0]):
+ repo_found = True
+ try:
+ trans_id = d.publish_append(header=header,
+ client_release=client_release,
+ pkg_name=pkg_name)
+ return trans_id
+ except tx.ExcessiveTransientFailure, ex:
+ # If an endpoint experienced so many failures
+ # that we just gave up, grab the list of
+ # failures that it contains
+ failures.extend(ex.failures)
+ except tx.TransportException, e:
+ if e.retryable:
+ failures.append(e)
+ else:
+ raise
+ if not repo_found:
+ raise apx.UnsupportedRepositoryOperation(pub,
+ "append/0")
+
+ raise failures
+
+ @LockedTransport()
def publish_refresh_index(self, pub):
"""Instructs the repositories named by Publisher pub
to refresh their index."""