551 IPS should handle socket errors
authorjohansen <johansen@sun.com>
Thu, 23 Oct 2008 18:34:12 -0700
changeset 621 6c144915eed1
parent 620 9e0f5eeaf2e0
child 622 f3ab1c2e6855
551 IPS should handle socket errors 2894 URLError handlers should watch for retryable socket errors 3441 httplib raises ValueError instead of IncompleteRead 3480 errors while decompressing partial file 3724 manifest truncated during download 3768 Data truncation upon unexpected stream closure 3878 IncompleteRead during network I/O 3917 os.stat results should be accessed as object 4105 Greater rigor required when handling unsolicited content
.hgignore
src/client.py
src/modules/actions/file.py
src/modules/catalog.py
src/modules/client/filelist.py
src/modules/client/image.py
src/modules/client/retrieve.py
src/modules/misc.py
src/modules/search_storage.py
src/modules/server/transaction.py
src/modules/updatelog.py
--- a/.hgignore	Thu Oct 23 16:42:10 2008 -0700
+++ b/.hgignore	Thu Oct 23 18:34:12 2008 -0700
@@ -36,4 +36,5 @@
 ^src/util/distro-import/proto/
 ^src/util/distro-import/redist_cluster$
 ^src/util/distro-import/slim_cluster$
+^src/util/misc/extract_hostid$
 ^webrev
--- a/src/client.py	Thu Oct 23 16:42:10 2008 -0700
+++ b/src/client.py	Thu Oct 23 18:34:12 2008 -0700
@@ -82,6 +82,7 @@
 from pkg.client.history import RESULT_FAILED_STORAGE
 from pkg.client.retrieve import ManifestRetrievalError
 from pkg.client.retrieve import DatastreamRetrievalError
+from pkg.client.filelist import FileListRetrievalError
 
 CLIENT_API_VERSION = 2
 PKG_CLIENT_NAME = "pkg"
@@ -2137,7 +2138,7 @@
                       (global_settings.PKG_TIMEOUT_MAX, __e))
                 __ret = 1
         except (ManifestRetrievalError,
-            DatastreamRetrievalError), __e:
+            DatastreamRetrievalError, FileListRetrievalError), __e:
                 if __img:
                         __img.history.abort(RESULT_FAILED_TRANSPORT)
                 error(_("An error was encountered while attempting to retrieve"
--- a/src/modules/actions/file.py	Thu Oct 23 16:42:10 2008 -0700
+++ b/src/modules/actions/file.py	Thu Oct 23 18:34:12 2008 -0700
@@ -1,4 +1,4 @@
-#!/usr/bin/python2.4
+#!/usr/bin/python2.dir=pat4
 #
 # CDDL HEADER START
 #
@@ -34,7 +34,7 @@
 import errno
 import sha
 import tempfile
-from stat import *
+import stat
 import generic
 import pkg.misc as misc
 import pkg.portable as portable
@@ -200,7 +200,7 @@
                 errors = []
 
                 try:
-                        stat = os.lstat(path)
+                        fs = os.lstat(path)
                 except OSError, e:
                         if e.errno == errno.ENOENT:
                                 self.replace_required = True
@@ -219,26 +219,26 @@
                         errors.append("Warning: package may contain bobcat!  "
                             "(http://xkcd.com/325/)")
 
-                if not S_ISREG(stat[ST_MODE]):
+                if not stat.S_ISREG(fs.st_mode):
                         errors.append("%s is not a regular file" % self.attrs["path"])
                         self.replace_required = True
 
-                if stat[ST_UID] != owner:
+                if fs.st_uid != owner:
                         errors.append("Owner: '%s' should be '%s'" % \
-                            (img.get_name_by_uid(stat[ST_UID], True),
+                            (img.get_name_by_uid(fs.st_uid, True),
                              img.get_name_by_uid(owner, True)))
-                if stat[ST_GID] != group:
+                if fs.st_gid != group:
                         errors.append("Group: '%s' should be '%s'" % \
-                            (img.get_name_by_gid(stat[ST_GID], True),
+                            (img.get_name_by_gid(fs.st_gid, True),
                              img.get_name_by_gid(group, True)))
-                if S_IMODE(stat[ST_MODE]) != mode:
+                if stat.S_IMODE(fs.st_mode) != mode:
                         errors.append("Mode: 0%.3o should be 0%.3o" % \
-                            (S_IMODE(stat[ST_MODE]), mode))
+                            (stat.S_IMODE(fs.st_mode), mode))
 
-                if "timestamp" in self.attrs and stat[ST_MTIME] != \
+                if "timestamp" in self.attrs and fs.st_mtime != \
                     misc.timestamp_to_time(self.attrs["timestamp"]):
                         errors.append("Timestamp: %s should be %s" %
-                            (misc.time_to_timestamp(stat[ST_MTIME]), 
+                            (misc.time_to_timestamp(fs.st_mtime), 
                             self.attrs["timestamp"]))
                              
                 # avoid checking pkg.size if elfhash present;
@@ -246,9 +246,9 @@
                 if "preserve" not in self.attrs and \
                     "pkg.size" in self.attrs and    \
                     "elfhash" not in self.attrs and \
-                    stat[ST_SIZE] != int(self.attrs["pkg.size"]):
+                    fs.st_size != int(self.attrs["pkg.size"]):
                         errors.append("Size: %d bytes should be %d" % \
-                            (stat[ST_SIZE], int(self.attrs["pkg.size"])))
+                            (fs.st_size, int(self.attrs["pkg.size"])))
 
                 if "preserve" in self.attrs:
                         return errors
@@ -330,7 +330,7 @@
 
                 try:
                         # Make file writable so it can be deleted
-                        os.chmod(path, S_IWRITE|S_IREAD)
+                        os.chmod(path, stat.S_IWRITE|stat.S_IREAD)
                         portable.remove(path)
                 except OSError,e:
                         if e.errno != errno.ENOENT:
--- a/src/modules/catalog.py	Thu Oct 23 16:42:10 2008 -0700
+++ b/src/modules/catalog.py	Thu Oct 23 18:34:12 2008 -0700
@@ -38,6 +38,7 @@
 import pkg.misc as misc
 import pkg.version as version
 import pkg.portable as portable
+from pkg.misc import TruncatedTransferException
 
 class CatalogException(Exception):
         def __init__(self, args=None):
@@ -111,6 +112,7 @@
                 self.renamed = None
                 self.pkg_root = pkg_root
                 self.read_only = read_only
+                self.__size = -1
 
                 assert not (read_only and rebuild)
 
@@ -183,6 +185,10 @@
 
                 portable.rename(tmpfile, pathstr)
 
+                # Catalog size has changed, force recalculation on
+                # next send()
+                self.__size = -1
+
                 self.catalog_lock.release()
 
                 self.attrs["npkgs"] += 1
@@ -501,21 +507,31 @@
                 return self.attrs.get("origin", None)
 
         @staticmethod
-        def recv(filep, path, auth=None):
+        def recv(filep, path, auth=None, content_size=-1):
                 """A static method that takes a file-like object and
                 a path.  This is the other half of catalog.send().  It
                 reads a stream as an incoming catalog and lays it down
-                on disk."""
+                on disk. Content_size is the size in bytes, if known,
+                of the transfer that is being received.  The default
+                value of -1 means that the size is not known."""
+
+                size = 0
 
                 if not os.path.exists(path):
                         os.makedirs(path)
 
-                attrf = file(os.path.normpath(
-                    os.path.join(path, "attrs")), "w+")
-                catf = file(os.path.normpath(
-                    os.path.join(path, "catalog")), "w+")
+                afd, attrpath = tempfile.mkstemp(dir=path)
+                cfd, catpath = tempfile.mkstemp(dir=path)
+
+                attrf = os.fdopen(afd, "w")
+                catf = os.fdopen(cfd, "w")
+
+                attrpath_final = os.path.normpath(os.path.join(path, "attrs"))
+                catpath_final = os.path.normpath(os.path.join(path, "catalog"))
 
                 for s in filep:
+                        size += len(s)
+
                         if not s[1].isspace():
                                 continue
                         elif not s[0] in known_prefixes:
@@ -531,6 +547,19 @@
                                 catf.write("%s %s %s %s\n" %
                                     (s[0], "pkg", f.pkg_name, f.version))
 
+                # Check that content was properly received before
+                # modifying any files.
+                if content_size > -1 and size != content_size:
+                        url = None
+                        if hasattr(filep, "geturl") and callable(filep.geturl):
+                                url = filep.geturl()
+                        attrf.close()
+                        catf.close()
+                        os.remove(attrpath)
+                        os.remove(catpath)
+                        raise TruncatedTransferException(url, size,
+                            content_size)
+
                 # Write the authority's origin into our attributes
                 if auth:
                         origstr = "S origin: %s\n" % auth["origin"]
@@ -539,6 +568,9 @@
                 attrf.close()
                 catf.close()
 
+                os.rename(attrpath, attrpath_final)
+                os.rename(catpath, catpath_final)
+
         def rename_package(self, srcname, srcvers, destname, destvers):
                 """Record that the name of package oldname has been changed
                 to newname as of version vers.  Returns a timestamp
@@ -586,6 +618,9 @@
                 pfile.write("%s\n" % rr)
                 pfile.close()
 
+                # Recalculate size on next send()
+                self.__size = -1
+
                 self.renamed.append(rr)
 
                 ts = datetime.datetime.now()
@@ -685,12 +720,18 @@
                         s = "S %s: %s\n" % (a, self.attrs[a])
                         afile.write(s)
 
+                # Recalculate size on next send()
+                self.__size = -1
+
                 afile.close()
 
-        def send(self, filep):
+        def send(self, filep, rspobj=None):
                 """Send the contents of this catalog out to the filep
                 specified as an argument."""
 
+                if rspobj is not None:
+                        rspobj.headers['Content-Length'] = str(self.size())
+
                 def output():
                         # Send attributes first.
                         for line in self.attrs_as_lines():
@@ -732,6 +773,34 @@
 
                 self.save_attrs()
 
+
+        def size(self):
+                """Return the size in bytes of the catalog and attributes."""
+
+                if self.__size < 0:
+                        try:
+                                attr_stat = os.stat(os.path.normpath(
+                                    os.path.join(self.catalog_root, "attrs")))
+                                attr_sz = attr_stat.st_size
+                        except OSError, e:
+                                if e.errno == errno.ENOENT:
+                                        attr_sz = 0
+                                else:
+                                        raise
+                        try:
+                                cat_stat =  os.stat(os.path.normpath(
+                                    os.path.join(self.catalog_root, "catalog")))
+                                cat_sz = cat_stat.st_size
+                        except OSError, e:
+                                if e.errno == errno.ENOENT:
+                                        cat_sz = 0
+                                else:
+                                        raise
+
+                        self.__size = attr_sz + cat_sz
+
+                return self.__size
+
         def valid_new_fmri(self, pfmri):
                 """Check that the fmri supplied as an argument would be
                 valid to add to the catalog.  This checks to make sure that
--- a/src/modules/client/filelist.py	Thu Oct 23 16:42:10 2008 -0700
+++ b/src/modules/client/filelist.py	Thu Oct 23 18:34:12 2008 -0700
@@ -32,12 +32,14 @@
 import socket
 import time
 import sha
+import zlib
 from tarfile import ReadError
 
 import pkg.pkgtarfile as ptf
 import pkg.portable as portable
 import pkg.fmri
 import pkg.client.api_errors as api_errors
+import pkg.misc as misc
 from pkg.client import global_settings
 from pkg.misc import versioned_urlopen
 from pkg.misc import hash_file_name
@@ -47,7 +49,9 @@
 from pkg.misc import TransferTimedOutException
 from pkg.misc import TransferContentException
 from pkg.misc import InvalidContentException
+from pkg.misc import TruncatedTransferException
 from pkg.misc import retryable_http_errors
+from pkg.misc import retryable_socket_errors
 
 class FileList(object):
         """A FileList maintains mappings between files and Actions.
@@ -278,27 +282,31 @@
 
                 # Now that the file has been successfully extracted, move
                 # it to the cached content directory.
+                dl_path = os.path.join(download_dir, hashval)
                 final_path = os.path.normpath(os.path.join(completed_dir,
                     hash_file_name(hashval)))
 
-                if not os.path.exists(os.path.dirname(final_path)):
-                        os.makedirs(os.path.dirname(final_path))
-
-                portable.rename(os.path.join(download_dir, hashval), final_path)
-
-                # assign opener to actions in the list
+                # Check that hashval is in the list of files we requested
                 try:
                         l = self.fhash[hashval]
                 except KeyError:
                         # If the key isn't in the dictionary, the server sent us
                         # a file we didn't ask for.  In this case, we can't
-                        # create an opener for it, nor should we leave it in the
-                        # cache.
-                        os.remove(final_path)
+                        # create an opener for it, nor should we hold onto it.
+                        os.remove(dl_path)
                         return
 
-                self._verify_content(l[0], final_path)
+                # Verify downloaded content
+                self._verify_content(l[0], dl_path)
+
+                if not os.path.exists(os.path.dirname(final_path)):
+                        os.makedirs(os.path.dirname(final_path))
 
+                # Content has been verified and was requested from server.
+                # Move into content cache
+                portable.rename(dl_path, final_path)
+
+                # assign opener to actions in the list
                 for action in l:
                         action.data = self._make_opener(final_path)
 
@@ -349,19 +357,52 @@
                             data=req_str, ssl_creds=self.ssl_tuple,
                             imgtype=self.image.type, uuid=self.uuid)
                 except RuntimeError:
-                        raise FileListException, "No server-side support" 
+                        raise FileListRetrievalError, "No server-side support" 
                 except urllib2.HTTPError, e:
                         # Must check for HTTPError before URLError
+                        self.image.cleanup_downloads()
                         if e.code in retryable_http_errors:
                                 raise TransferTimedOutException(url_prefix,
-                                    str(e.code))
-                        raise
+                                    "%d - %s" % (e.code, e.msg))
+
+                        raise FileListRetrievalError("Could not retrieve"
+                            " filelist from '%s'\nHTTPError code: %d - %s" % 
+                            (url_prefix, e.code, e.msg))
                 except urllib2.URLError, e:
-                        if len(e.args) == 1 and \
-                            isinstance(e.args[0], socket.timeout):
-                                self.image.cleanup_downloads()
-                                raise TransferTimedOutException(url_prefix)
+                        self.image.cleanup_downloads()
+                        if isinstance(e.args[0], socket.timeout):
+                                raise TransferTimedOutException(url_prefix,
+                                    e.reason)
+                        elif isinstance(e.args[0], socket.error):
+                                sockerr = e.args[0]
+                                if isinstance(sockerr.args, tuple) and \
+                                    sockerr.args[0] in retryable_socket_errors:
+                                        raise TransferContentException(
+                                            url_prefix,
+                                            "Retryable socket error: %s" %
+                                            e.reason)
+                                else:
+                                        raise FileListRetrievalError(
+                                            "Could not retrieve filelist from"
+                                            " '%s'\nURLError, reason: %s" %
+                                            (url_prefix, e.reason))
+
+                        raise FileListRetrievalError("Could not retrieve"
+                            " filelist from '%s'\nURLError reason: %d" % 
+                            (url_prefix, e.reason))
+                except (ValueError, httplib.IncompleteRead):
+                        self.image.cleanup_downloads()
+                        raise TransferContentException(url_prefix,
+                            "Incomplete Read from remote host")
+                except KeyboardInterrupt:
+                        self.image.cleanup_downloads()
                         raise
+                except Exception, e:
+                        self.image.cleanup_downloads()
+                        raise FileListRetrievalError("Could not retrieve"
+                            " filelist from '%s'\nException: str:%s repr:%s" %
+                            (url_prefix, e, repr(e)))
+
 
                 # Exception handling here is a bit complicated.  The finally
                 # block makes sure we always close our file objects.  If we get
@@ -379,10 +420,32 @@
                         except socket.timeout, e:
                                 self.image.cleanup_downloads()
                                 raise TransferTimedOutException(url_prefix)
+                        except socket.error, e:
+                                self.image.cleanup_downloads()
+                                if isinstance(e.args, tuple) and \
+                                    e.args[0] in retryable_socket_errors:
+                                        raise TransferContentException(
+                                            url_prefix,
+                                            "Retryable socket error: %s" % e)
+                                else:
+                                        raise FileListRetrievalError(
+                                            "Could not retrieve filelist from"
+                                            " '%s'\nsocket error, reason: %s" %
+                                            (url_prefix, e))
+                        except (ValueError, httplib.IncompleteRead):
+                                self.image.cleanup_downloads()
+                                raise TransferContentException(url_prefix,
+                                    "Incomplete Read from remote host")
                         except ReadError:
+                                self.image.cleanup_downloads()
                                 raise TransferContentException(url_prefix,
                                     "Read error on tar stream")
-
+                        except EnvironmentError, e:
+                                self.image.cleanup_downloads()
+                                raise FileListRetrievalError(
+                                    "Could not retrieve filelist from '%s'\n"
+                                    "Exception: str:%s repr:%s" %
+                                    (url_prefix, e, repr(e)))
                 finally:
                         if tar_stream:
                                 tar_stream.close()
@@ -439,7 +502,29 @@
                 remove the file and raise an InvalidContentException."""
 
                 chash = action.attrs.get("chash", None)
+                path = action.attrs.get("path", None)
                 if not chash:
+                        # Compressed hash doesn't exist.  Decompress and
+                        # generate hash of uncompressed content.
+                        ifile = open(filepath, "rb")
+                        ofile = open(os.devnull, "wb")
+
+                        try:
+                                hash = misc.gunzip_from_stream(ifile, ofile)
+                        except zlib.error, e:
+                                os.remove(filepath)
+                                raise InvalidContentException(path,
+                                    "zlib.error:%s" %
+                                    (" ".join([str(a) for a in e.args])))
+
+                        ifile.close()
+                        ofile.close()
+
+                        if action.hash != hash:
+                                os.remove(filepath)
+                                raise InvalidContentException(action.path,
+                                    "hash failure:  expected: %s"
+                                    "computed: %s" % (action.hash, hash))
                         return
 
                 cfile = open(filepath, "rb")
@@ -451,12 +536,24 @@
 
                 if chash != newhash:
                        os.remove(filepath)
-                       raise InvalidContentException(action, newhash)
+                       raise InvalidContentException(path,
+                           "chash failure: expected: %s computed: %s" %
+                           (chash, newhash))
 
 
 class FileListException(Exception):
         def __init__(self, args=None):
+                Exception.__init__(self)
                 self.args = args
 
 class FileListFullException(FileListException):
         pass
+
+class FileListRetrievalError(FileListException):
+        """Used when filelist retrieval fails"""
+        def __init__(self, data):
+                FileListException.__init__(self)
+                self.data = data
+
+        def __str__(self):
+                return str(self.data)
--- a/src/modules/client/image.py	Thu Oct 23 16:42:10 2008 -0700
+++ b/src/modules/client/image.py	Thu Oct 23 18:34:12 2008 -0700
@@ -1275,6 +1275,25 @@
                                 dependents.extend(self.__req_dependents[f])
                 return dependents
 
+        def _do_get_catalog(self, auth, hdr, ts):
+                """An internal method that is a wrapper around get_catalog.
+                This handles retryable exceptions and timeouts."""
+
+                retry_count = global_settings.PKG_TIMEOUT_MAX
+                failures = TransportFailures()
+                success = False
+
+                while not success:
+                        try:
+                                success = retrieve.get_catalog(self, auth,
+                                    hdr, ts)
+                        except TransportException, e:
+                                retry_count -= 1
+                                failures.append(e)
+
+                                if retry_count <= 0:
+                                        raise failures
+
         def retrieve_catalogs(self, full_refresh = False,
             auths = None):
                 failed = []
@@ -1308,38 +1327,11 @@
                         else:
                                 hdr = {}
 
-                        ssl_tuple = self.get_ssl_credentials(auth["prefix"])
-
-                        # XXX Mirror selection and retrieval policy?
                         try:
-                                c, v = versioned_urlopen(auth["origin"],
-                                    "catalog", [0], ssl_creds=ssl_tuple,
-                                    headers=hdr, imgtype=self.type,
-                                    uuid=self.get_uuid(auth["prefix"]))
-                        except urllib2.HTTPError, e:
-                                # Server returns NOT_MODIFIED if catalog is up
-                                # to date
-                                if e.code == httplib.NOT_MODIFIED:
-                                        succeeded += 1
-                                else:
-                                        failed.append((auth, e))
-                                continue
-
-                        except urllib2.URLError, e:
+                                self._do_get_catalog(auth, hdr, ts)
+                        except retrieve.CatalogRetrievalError, e:
                                 failed.append((auth, e))
-                                continue
-                        except ValueError, e:
-                                failed.append((auth, e))
-                                continue
-
-                        # root for this catalog
-                        croot = "%s/catalog/%s" % (self.imgdir, auth["prefix"])
-
-                        try:
-                                updatelog.recv(c, croot, ts, auth)
-                        except IOError, e:
-                                failed.append((auth, e))
-                        except socket.timeout, e:
+                        except TransportFailures, e:
                                 failed.append((auth, e))
                         else:
                                 succeeded += 1
--- a/src/modules/client/retrieve.py	Thu Oct 23 16:42:10 2008 -0700
+++ b/src/modules/client/retrieve.py	Thu Oct 23 18:34:12 2008 -0700
@@ -24,13 +24,27 @@
 #
 
 import socket
+import httplib
 import urllib2
 
 import pkg.fmri
 import pkg.client.imagestate as imagestate
+import pkg.updatelog as updatelog
 from pkg.misc import versioned_urlopen
 from pkg.misc import TransferTimedOutException
+from pkg.misc import TruncatedTransferException
+from pkg.misc import TransferContentException
 from pkg.misc import retryable_http_errors
+from pkg.misc import retryable_socket_errors
+
+class CatalogRetrievalError(Exception):
+        """Used when catalog retrieval fails"""
+        def __init__(self, data):
+                Exception.__init__(self)
+                self.data = data
+
+        def __str__(self):
+                return str(self.data)
 
 class ManifestRetrievalError(Exception):
         """Used when manifest retrieval fails"""
@@ -52,6 +66,89 @@
 
 # client/retrieve.py - collected methods for retrieval of pkg components
 # from repositories
+
+def get_catalog(img, auth, hdr, ts):
+        """Get a catalog from a remote host.  Img is the image object
+        that we're updating.  Auth is the authority from which the
+        catalog will be retrieved.  Additional headers are contained
+        in hdr.  Ts is the timestamp if we're performing an incremental
+        catalog operation."""
+
+        prefix = auth["prefix"]
+        ssl_tuple = img.get_ssl_credentials(prefix)
+
+        try:
+                c, v = versioned_urlopen(auth["origin"],
+                    "catalog", [0], ssl_creds=ssl_tuple,
+                    headers=hdr, imgtype=img.type,
+                    uuid=img.get_uuid(prefix))
+        except urllib2.HTTPError, e:
+                # Server returns NOT_MODIFIED if catalog is up
+                # to date
+                if e.code == httplib.NOT_MODIFIED:
+                        # success
+                        return True
+                elif e.code in retryable_http_errors:
+                        raise TransferTimedOutException(prefix, "%d - %s" %
+                            (e.code, e.msg))
+
+                raise CatalogRetrievalError("Could not retrieve catalog from"
+                    " '%s'\nHTTPError code: %d - %s" % (prefix, e.code, e.msg))
+        except urllib2.URLError, e:
+                if isinstance(e.args[0], socket.sslerror):
+                        raise RuntimeError, e
+                elif isinstance(e.args[0], socket.timeout):
+                        raise TransferTimedOutException(prefix, e.reason)
+                elif isinstance(e.args[0], socket.error):
+                        sockerr = e.args[0]
+                        if isinstance(sockerr.args, tuple) and \
+                            sockerr.args[0] in retryable_socket_errors:
+                                raise TransferContentException(prefix,
+                                    "Retryable socket error: %s" % e.reason)
+                        else:
+                                raise CatalogRetrievalError(
+                                    "Could not retrieve catalog from"
+                                    " '%s'\nURLError, reason: %s" %
+                                    (prefix, e.reason))
+
+                raise CatalogRetrievalError("Could not retrieve catalog from"
+                    " '%s'\nURLError, reason: %s" % (prefix, e.reason))
+        except (ValueError, httplib.IncompleteRead):
+                raise TransferContentException(prefix,
+                    "Incomplete Read from remote host")
+        except KeyboardInterrupt:
+                raise
+        except Exception, e:
+                raise CatalogRetrievalError("Could not retrieve catalog "
+                    "from '%s'\nException: str:%s repr:%r" % (prefix,
+                    e, e))
+
+        # root for this catalog
+        croot = "%s/catalog/%s" % (img.imgdir, prefix)
+
+        try:
+                updatelog.recv(c, croot, ts, auth)
+        except (ValueError, httplib.IncompleteRead):
+                raise TransferContentException(prefix,
+                    "Incomplete Read from remote host")
+        except socket.timeout, e:
+                raise TransferTimedOutException(prefix)
+        except socket.error, e:
+                if isinstance(e.args, tuple) \
+                     and e.args[0] in retryable_socket_errors:
+                        raise TransferContentException(prefix,
+                            "Retryable socket error: %s" % e)
+                else:
+                        raise CatalogRetrievalError("Could not retrieve catalog"
+                            " from '%s'\nsocket error, reason: %s" %
+                            (prefix, e))
+        except EnvironmentError, e:
+                raise CatalogRetrievalError("Could not retrieve catalog "
+                    "from '%s'\nException: str:%s repr:%r" % (prefix,
+                    e, e))
+ 
+        return True
+
 def __get_intent_str(img, fmri):
         """Returns a string representing the intent of the client in retrieving
         information based on the operation information provided by the image
@@ -173,21 +270,21 @@
                     ssl_creds=ssl_tuple, imgtype=img.type, uuid=uuid)[0]
         except urllib2.HTTPError, e:
                 raise DatastreamRetrievalError("Could not retrieve file '%s'\n"
-                    "from '%s'\nHTTPError, code:%s" %
-                    (hash, url_prefix, e.code))
+                    "from '%s'\nHTTPError, code: %d" %
+                    (fhash, url_prefix, e.code))
         except urllib2.URLError, e:
-                if len(e.args) == 1 and isinstance(e.args[0], socket.sslerror):
+                if isinstance(e.args[0], socket.sslerror):
                         raise RuntimeError, e
 
                 raise DatastreamRetrievalError("Could not retrieve file '%s'\n"
-                    "from '%s'\nURLError args:%s" % (hash, url_prefix,
+                    "from '%s'\nURLError args:%s" % (fhash, url_prefix,
                     " ".join([str(a) for a in e.args])))
         except KeyboardInterrupt:
                 raise
         except Exception, e:
                 raise DatastreamRetrievalError("Could not retrieve file '%s'\n"
-                    "from '%s'\nException: str:%s repr:%s" %
-                    (fmri.get_url_path(), url_prefix, e, repr(e)))
+                    "from '%s'\nException: str:%s repr:%r" %
+                    (fmri.get_url_path(), url_prefix, e, e))
 
         return f
 
@@ -224,34 +321,70 @@
                 m = __get_manifest(img, fmri, "GET")
         except urllib2.HTTPError, e:
                 if e.code in retryable_http_errors:
-                        raise TransferTimedOutException(url_prefix, str(e.code))
+                        raise TransferTimedOutException(url_prefix, "%d - %s" %
+                            (e.code, e.msg))
 
-                raise ManifestRetrievalError("Could not retrieve manifest '%s'\n"
-                    "from '%s'\nHTTPError code:%s" %
-                    (fmri.get_url_path(), url_prefix, e.code))
+                raise ManifestRetrievalError("Could not retrieve manifest"
+                    " '%s' from '%s'\nHTTPError code: %d - %s" % 
+                    (fmri.get_url_path(), url_prefix, e.code, e.msg))
         except urllib2.URLError, e:
-                if len(e.args) == 1 and isinstance(e.args[0], socket.sslerror):
+                if isinstance(e.args[0], socket.sslerror):
                         raise RuntimeError, e
-                elif len(e.args) == 1 and isinstance(e.args[0], socket.timeout):
+                elif isinstance(e.args[0], socket.timeout):
                         raise TransferTimedOutException(url_prefix, e.reason)
+                elif isinstance(e.args[0], socket.error):
+                        sockerr = e.args[0]
+                        if isinstance(sockerr.args, tuple) and \
+                            sockerr.args[0] in retryable_socket_errors:
+                                raise TransferContentException(url_prefix,
+                                    "Retryable socket error: %s" % e.reason)
+                        else:
+                                raise ManifestRetrievalError(
+                                    "Could not retrieve manifest from"
+                                    " '%s'\nURLError, reason: %s" %
+                                    (url_prefix, e.reason))
 
-                raise ManifestRetrievalError("Could not retrieve manifest '%s'\n"
-                    "from '%s'\nURLError, args:%s" % (fmri.get_url_path(),
-                    url_prefix, " ".join([str(a) for a in e.args])))
+                raise ManifestRetrievalError("Could not retrieve manifest"
+                    " '%s' from '%s'\nURLError, reason: %s" %
+                    (fmri.get_url_path(), url_prefix, e.reason))
+        except (ValueError, httplib.IncompleteRead):
+                raise TransferContentException(url_prefix,
+                    "Incomplete Read from remote host")
         except KeyboardInterrupt:
                 raise
         except Exception, e:
-                raise ManifestRetrievalError("Could not retrieve manifest '%s'\n"
-                    "from '%s'\nException: str:%s repr:%s" %
-                    (fmri.get_url_path(), url_prefix, e, repr(e)))
+                raise ManifestRetrievalError("Could not retrieve manifest"
+                    " '%s' from '%s'\nException: str:%s repr:%r" %
+                    (fmri.get_url_path(), url_prefix, e, e))
+
+        cl_size = int(m.info().getheader("Content-Length", "-1"))
 
         try:
-                return m.read()
+                mfst = m.read()
+                mfst_len = len(mfst)
         except socket.timeout, e:
-                raise TransferTimedOutException(url_prefix, str(e))
+                raise TransferTimedOutException(url_prefix)
+        except socket.error, e:
+                if isinstance(e.args, tuple) \
+                     and e.args[0] in retryable_socket_errors:
+                        raise TransferContentException(url_prefix,
+                            "Retryable socket error: %s" % e)
+                else:
+                        raise ManifestRetrievalError("Could not retrieve"
+                            " manifest from '%s'\nsocket error, reason: %s" %
+                            (url_prefix, e))
+        except (ValueError, httplib.IncompleteRead):
+                raise TransferContentException(url_prefix,
+                    "Incomplete Read from remote host")
         except EnvironmentError, e:
-                raise TransferContentException(url_prefix,
-                    "Read error retrieving manifest: %s" % e)
+                raise ManifestRetrievalError("Could not retrieve manifest"
+                    " '%s' from '%s'\nException: str:%s repr:%r" %
+                    (fmri.get_url_path(), url_prefix, e, e))
+
+        if cl_size > -1 and mfst_len != cl_size:
+                raise TruncatedTransferException(m.geturl(), mfst_len, cl_size)
+
+        return mfst
 
 def touch_manifest(img, fmri):
         """Perform a HEAD operation on the manifest for the given fmri.
--- a/src/modules/misc.py	Thu Oct 23 16:42:10 2008 -0700
+++ b/src/modules/misc.py	Thu Oct 23 18:34:12 2008 -0700
@@ -224,10 +224,10 @@
         # Read the header
         magic = gz.read(2)
         if magic != "\037\213":
-                raise IOError, "Not a gzipped file"
+                raise zlib.error, "Not a gzipped file"
         method = ord(gz.read(1))
         if method != 8:
-                raise IOError, "Unknown compression method"
+                raise zlib.error, "Unknown compression method"
         flag = ord(gz.read(1))
         gz.read(6) # Discard modtime, extraflag, os
 
@@ -461,6 +461,8 @@
 
 
 class TransferTimedOutException(TransportException):
+        """Raised when the transfer times out, or is terminated with a
+        retryable error."""
         def __init__(self, url, reason=None):
                 TransportException.__init__(self)
                 self.url = url
@@ -489,9 +491,12 @@
 # XXX consider moving to pkg.client module
 retryable_http_errors = set((httplib.REQUEST_TIMEOUT, httplib.BAD_GATEWAY,
         httplib.GATEWAY_TIMEOUT))
+retryable_socket_errors = set((errno.ECONNABORTED, errno.ECONNRESET,
+        errno.ECONNREFUSED))
 
 
 class TransferContentException(TransportException):
+        """Raised when there are problems downloading the requested content."""
         def __init__(self, url, reason=None):
                 TransportException.__init__(self)
                 self.url = url
@@ -512,23 +517,56 @@
                         return r
                 return cmp(self.reason, other.reason)
 
+class TruncatedTransferException(TransportException):
+        """Raised when the transfer that was received doesn't match the
+        expected length."""
+        def __init__(self, url, recd=-1, expected=-1):
+                TransportException.__init__(self)
+                self.url = url
+                self.recd = recd
+                self.expected = expected
+
+        def __str__(self):
+                s = "Transfer from '%s' unexpectedly terminated" % self.url
+                if self.recd > -1 and self.expected > -1:
+                        s += ": received %d of %d bytes" % (self.recd,
+                            self.expected)
+                s += "."
+                return s
+
+        def __cmp__(self, other):
+                if not isinstance(other, TruncatedTransferException):
+                        return -1        
+                r = cmp(self.url, other.url)
+                if r != 0:
+                        return r
+                r = cmp(self.expected, other.expected)
+                if r != 0:
+                        return r
+                return cmp(self.recd, other.recd)
+
 
 class InvalidContentException(TransportException):
-        def __init__(self, action, hashval):
+        """Raised when the content's hash/chash doesn't verify, or the
+        content is received in an unreadable format."""
+        def __init__(self, path, data):
                 TransportException.__init__(self)
-                self.action = action
-                self.hashval = hashval
+                self.path = path
+                self.data = data
 
         def __str__(self):
-                s = "Action with path %s should have hash %s. Computed " \
-                    "hash %s instead." % (self.action.attrs["path"],
-                    self.action.attrs["chash"], self.hashval)
+                s = "Invalid content for action with path %s" % self.path
+                if self.data:
+                        s += " %s." % self.data
                 return s
 
         def __cmp__(self, other):
                 if not isinstance(other, InvalidContentException):
                         return -1        
-                return cmp(self.hashval, other.hashval)
+                r = cmp(self.path, other.path)
+                if r != 0:
+                        return r
+                return cmp(self.data, other.data)
 
 
 # Default maximum memory useage during indexing
--- a/src/modules/search_storage.py	Thu Oct 23 16:42:10 2008 -0700
+++ b/src/modules/search_storage.py	Thu Oct 23 18:34:12 2008 -0700
@@ -28,7 +28,6 @@
 import os
 import errno
 import time
-import stat
 import sha
 import urllib
 
@@ -186,8 +185,8 @@
                 stat_info = os.stat(self._file_path)
                 if self._mtime != stat_info.st_mtime or \
                     self._size != stat_info.st_size:
-                        self._mtime = stat_info[stat.ST_MTIME]
-                        self._size = stat_info[stat.ST_SIZE]
+                        self._mtime = stat_info.st_mtime
+                        self._size = stat_info.st_size
                         return True
                 return False
 
--- a/src/modules/server/transaction.py	Thu Oct 23 16:42:10 2008 -0700
+++ b/src/modules/server/transaction.py	Thu Oct 23 18:34:12 2008 -0700
@@ -33,7 +33,6 @@
 import datetime
 import calendar
 import urllib
-import stat
 
 import pkg.actions
 import pkg.fmri as fmri
@@ -325,7 +324,7 @@
                         # size and store that as an attribute in the manifest
                         # for the file.
                         fs = os.stat(opath)
-                        action.attrs["pkg.csize"] = str(fs[stat.ST_SIZE])
+                        action.attrs["pkg.csize"] = str(fs.st_size)
 
                         # Compute the SHA hash of the compressed file.
                         # Store this as the chash attribute of the file's
--- a/src/modules/updatelog.py	Thu Oct 23 16:42:10 2008 -0700
+++ b/src/modules/updatelog.py	Thu Oct 23 18:34:12 2008 -0700
@@ -31,6 +31,7 @@
 
 import pkg.fmri as fmri
 import pkg.catalog as catalog
+from pkg.misc import TruncatedTransferException
 
 class UpdateLogException(Exception):
         def __init__(self, args=None):
@@ -70,6 +71,7 @@
 
                 self.rootdir = update_root
                 self.logfd = None
+                self.log_filename = None
                 self.maxfiles = maxfiles
                 self.catalog = cat
                 self.close_time = None
@@ -77,6 +79,7 @@
                 self.last_update = None
                 self.curfiles = 0
                 self.logfiles = []
+                self.logfile_size = {}
 
                 if not os.path.exists(update_root):
                         os.makedirs(update_root)
@@ -120,6 +123,9 @@
                 self.logfd.write(logstr)
                 self.logfd.flush()
 
+                if self.log_filename in self.logfile_size:
+                        del self.logfile_size[self.log_filename]
+
                 self.last_update = ts
 
                 return ts
@@ -146,6 +152,9 @@
                 self.logfd.write(logstr)
                 self.logfd.flush()
 
+                if self.log_filename in self.logfile_size:
+                        del self.logfile_size[self.log_filename]
+
                 self.last_update = ts
 
                 return ts
@@ -162,6 +171,7 @@
 
                 self.close_time = ftime + delta
 
+                self.log_filename = filenm
                 self.logfd = file(os.path.join(self.rootdir, filenm), "a")
 
                 if filenm not in self.logfiles:
@@ -179,6 +189,7 @@
                 if self.logfd and self.close_time < datetime.datetime.now():
                         self.logfd.close()
                         self.logfd = None
+                        self.log_filename = None
                         self.close_time = 0
 
                 if self.curfiles < self.maxfiles:
@@ -192,6 +203,8 @@
                         filepath = os.path.join(self.rootdir, "%s" % r)
                         os.unlink(filepath)
                         self.curfiles -= 1
+                        if r in self.logfile_size:
+                                del self.logfile_size[r]
 
                 del self.logfiles[0:excess]
 
@@ -224,14 +237,16 @@
 
                 update_type = c.info().getheader("X-Catalog-Type", "full")
 
+                cl_size = int(c.info().getheader("Content-Length", "-1"))
+
                 if update_type == 'incremental':
-                        UpdateLog._recv_updates(c, path, ts)
+                        UpdateLog._recv_updates(c, path, ts, cl_size)
                 else:
-                        catalog.recv(c, path, auth)
+                        catalog.recv(c, path, auth, cl_size)
 
 
         @staticmethod
-        def _recv_updates(filep, path, cts):
+        def _recv_updates(filep, path, cts, content_size=-1):
                 """A static method that takes a file-like object,
                 a path, and a timestamp.  This is the other half of
                 send_updates().  It reads a stream as an incoming updatelog and
@@ -250,8 +265,12 @@
                 add_lines = []
                 unknown_lines = []
                 attrs = {}
+                size = 0
 
                 for s in filep:
+
+                        size += len(s)
+
                         l = s.split(None, 3)
                         if len(l) < 4:
                                 continue
@@ -297,6 +316,15 @@
                                                     (l[2], sf, sv, rf, rv)
                                                 add_lines.append(line)
 
+                # Check that content was properly received before
+                # modifying any files.
+                if content_size > -1 and size != content_size:
+                        url = None
+                        if hasattr(filep, "geturl") and callable(filep.geturl):
+                                url = filep.geturl()
+                        raise TruncatedTransferException(url, size,
+                            content_size)
+
                 # Verify that they aren't already in the catalog
                 catf = file(os.path.normpath(
                     os.path.join(path, "catalog")), "a+")
@@ -371,11 +399,11 @@
                         response.headers['Last-Modified'] = \
                             self.catalog.last_modified()
                         response.headers['X-Catalog-Type'] = 'incremental'
-                        return self._send_updates(ts, None)
+                        return self._send_updates(ts, None, response)
                 else:
                         # Not enough history, or full catalog requested
                         response.headers['X-Catalog-Type'] = 'full'
-                        return self.catalog.send(None)
+                        return self.catalog.send(None, response)
 
         def _gen_updates(self, ts):
                 """Look through the logs for updates that have occurred after
@@ -393,13 +421,12 @@
                 # to download full catalog.
                 #
                 # 3. Timestamp falls within a range for which update records
-                # exist.  If the timestamp is in the middle of a log-file, open
-                # that file, send updates newer than timestamp, and then send
-                # all newer files.  Otherwise, just send updates from the newer
-                # log files.
+                # exist.  If the timestamp is in the middle of a log-file, send
+                # the entire file -- the client will pick what entries to add.
+                # Then send all newer files.
 
                 rts = datetime.datetime(ts.year, ts.month, ts.day, ts.hour)
-                assert rts < ts
+                assert rts <= ts
 
                 # send data from logfiles newer or equal to rts
                 for lf in self.logfiles:
@@ -430,7 +457,7 @@
 
                         yield ret
 
-        def _send_updates(self, ts, filep):
+        def _send_updates(self, ts, filep, rspobj=None):
                 """Look through the logs for updates that have occurred after
                 timestamp.  Write these changes to the file-like object
                 supplied in filep."""
@@ -438,6 +465,9 @@
                 if self.up_to_date(ts) or not self.enough_history(ts):
                         return
 
+                if rspobj is not None:
+                        rspobj.headers['Content-Length'] = str(self.size(ts))
+
                 if filep:
                         for line in self._gen_updates(ts):
                                 filep.write(line)
@@ -481,6 +511,31 @@
                 self.first_update = datetime.datetime(
                     *time.strptime(self.logfiles[0], "%Y%m%d%H")[0:6])
 
+        def size(self, ts):
+                """Return the size in bytes of an update starting at time
+                ts."""
+
+                size = 0
+                rts = datetime.datetime(ts.year, ts.month, ts.day, ts.hour)
+                assert rts <= ts
+
+                for lf in self.logfiles:
+
+                        lf_time = datetime.datetime(
+                            *time.strptime(lf, "%Y%m%d%H")[0:6])
+
+                        if lf_time >= rts:
+
+                                if lf in self.logfile_size:
+                                        size += self.logfile_size[lf]
+                                else:
+                                        stat_logf = os.stat(os.path.join(
+                                            self.rootdir, lf))
+                                        entsz = stat_logf.st_size
+                                        self.logfile_size[lf] = entsz
+                                        size += entsz
+                return size
+
         def up_to_date(self, ts):
                 """Returns true if the timestamp is up to date."""