src/tests/cli/t_pkg_image_create.py
changeset 1352 5c92c9d342ef
parent 1143 71becfca5cfd
child 1353 b9535ddac1c6
--- a/src/tests/cli/t_pkg_image_create.py	Thu Sep 10 15:54:20 2009 -0700
+++ b/src/tests/cli/t_pkg_image_create.py	Fri Sep 11 05:01:08 2009 -0500
@@ -30,19 +30,26 @@
         testutils.setup_environment("../../../proto")
 
 import os
+import pkg.portable
+import pkg.catalog
 import shutil
 import unittest
 
 
-class TestPkgImageCreateBasics(testutils.SingleDepotTestCase):
-        # Only start/stop the depot once (instead of for every test)
+class TestPkgImageCreateBasics(testutils.ManyDepotTestCase):
+        # Only start/stop the depots once (instead of for every test)
         persistent_depot = True
 
+        def setUp(self):
+                testutils.ManyDepotTestCase.setUp(self, 2)
+
+                self.durl1 = self.dcs[1].get_depot_url()
+                self.durl2 = self.dcs[2].get_depot_url()
+
         def test_basic(self):
                 """ Create an image, verify it. """
 
-                durl = self.dc.get_depot_url()
-                self.image_create(durl)
+                self.image_create(self.durl1)
                 self.pkg("verify")
 
         def test_image_create_bad_opts(self):
@@ -52,47 +59,64 @@
                 self.pkg("image-create --bozo", exit=2)
                 self.pkg("image-create", exit=2)
 
+        def __add_install_file(self, imgdir, fmri):
+                """Take an image path and fmri. Write a file to disk that
+                indicates that the package named by the fmri has been
+                installed.  Assumes package was installed from a preferred
+                publisher."""
+
+                def install_file(fmri):
+                        return "%s/pkg/%s/installed" % (imgdir,
+                            fmri.get_dir_path())
+
+                f = file(install_file(fmri), "w")
+                f.writelines(["VERSION_1\n_PRE_", fmri.publisher])
+                f.close()
+
+                fi = file("%s/state/installed/%s" % (imgdir,
+                    fmri.get_link_path()), "w")
+                fi.close()
+
         def test_766(self):
                 """Bug 766: image-create without publisher prefix specified."""
 
-                durl = self.dc.get_depot_url()
-
                 pkgsend_data = """
                 open [email protected]
                 close
                 """
-                self.pkgsend_bulk(durl, pkgsend_data)
+                self.pkgsend_bulk(self.durl1, pkgsend_data)
 
                 self.assertRaises(testutils.UnexpectedExitCodeException,
-                    self.image_create, durl, "")
+                    self.image_create, self.durl1, "")
 
         def test_3588(self):
-                """Bug 3588: Make sure we can't create an image where one
-                already exists"""
-                durl = self.dc.get_depot_url()
-                self.pkg("image-create -p mydepot=%s %s/3588_image" % (
-                    durl, self.get_img_path()))
-                self.pkg("image-create -p mydepot=%s %s/3588_image" %
-                                  (durl, self.get_img_path()), exit=1)
+                """Ensure that image creation works as expected when an image
+                already exists."""
 
-        def test_3588_1(self):
-                """Bug 3588: Make sure we can create an image where one
-                already exists with the -f (force) flag"""
-                durl = self.dc.get_depot_url()
+                # These tests are interdependent.
+                #
+                # Bug 3588: Make sure we can't create an image where one
+                # already exists
+                self.pkg("image-create -p mydepot=%s %s/3588_image" % (
+                    self.durl1, self.get_img_path()))
+                self.pkg("image-create -p mydepot=%s %s/3588_image" % (
+                    self.durl1, self.get_img_path()), exit=1)
+
+                # Make sure we can create an image where one
+                # already exists with the -f (force) flag
                 self.pkg("image-create -p mydepot=%s %s/3588_image_1" % (
-                    durl, self.get_img_path()))
+                    self.durl1, self.get_img_path()))
                 self.pkg("image-create -f -p mydepot=%s %s/3588_image_1" %
-                         (durl, self.get_img_path()))
+                         (self.durl1, self.get_img_path()))
 
-        def test_3588_2(self):
-                """Bug 3588: Make sure we can't create an image where a
-                non-empty directory exists"""
-                durl = self.dc.get_depot_url()
+                # Bug 3588: Make sure we can't create an image where a
+                # non-empty directory exists
                 p = os.path.join(self.get_img_path(), "3588_2_image")
                 os.mkdir(p)
                 os.system("touch %s/%s" % (p, "somefile"))
-                self.pkg("image-create -p mydepot=%s %s" % (durl, p), exit=1)
-                self.pkg("image-create -f -p mydepot=%s %s" % (durl, p))
+                self.pkg("image-create -p mydepot=%s %s" % (self.durl1, p),
+                    exit=1)
+                self.pkg("image-create -f -p mydepot=%s %s" % (self.durl1, p))
 
         def test_4_options(self):
                 """Verify that all of the options for specifying publisher
@@ -100,9 +124,8 @@
 
                 img_path = os.path.join(self.get_test_prefix(), "test_4_img")
                 for opt in ("-a", "-p", "--publisher"):
-                        durl = self.dc.get_depot_url()
-                        self.pkg("image-create %s mydepot=%s %s" % (opt, durl,
-                            img_path))
+                        self.pkg("image-create %s mydepot=%s %s" % (opt,
+                            self.durl1, img_path))
                         shutil.rmtree(img_path)
 
         def test_5_bad_values_no_image(self):
@@ -119,7 +142,6 @@
                 created correctly."""
 
                 pwd = os.getcwd()
-                durl = self.dc.get_depot_url()
                 img_path = "test_6_image"
                 abs_img_path = os.path.join(self.get_test_prefix(), img_path)
 
@@ -127,43 +149,118 @@
                 # specified image root if the specified root doesn't already
                 # exist.
                 os.chdir(self.get_test_prefix())
-                self.pkg("image-create -p mydepot=%s %s" % (durl, img_path))
+                self.pkg("image-create -p mydepot=%s %s" % (self.durl1,
+                    img_path))
                 os.chdir(pwd)
-                self.assertFalse(os.path.exists(os.path.join(abs_img_path, img_path)))
+                self.assertFalse(os.path.exists(os.path.join(abs_img_path,
+                    img_path)))
                 shutil.rmtree(abs_img_path)
 
                 # Now verify that the image root isn't duplicated within the
                 # specified image root if the specified root already exists.
                 os.chdir(self.get_test_prefix())
                 os.mkdir(img_path)
-                self.pkg("image-create -p mydepot=%s %s" % (durl, img_path))
+                self.pkg("image-create -p mydepot=%s %s" % (self.durl1,
+                    img_path))
                 os.chdir(pwd)
-                self.assertFalse(os.path.exists(os.path.join(abs_img_path, img_path)))
+                self.assertFalse(os.path.exists(os.path.join(abs_img_path,
+                    img_path)))
                 shutil.rmtree(abs_img_path)
 
         def test_7_image_create_no_refresh(self):
                 """Verify that image-create --no-refresh works as expected.
                 See bug 8777 for related issue."""
 
-                durl = self.dc.get_depot_url()
-
                 pkgsend_data = """
                 open [email protected]
                 close
                 """
-                self.pkgsend_bulk(durl, pkgsend_data)
+                self.pkgsend_bulk(self.durl1, pkgsend_data)
 
                 # First, check to be certain that an image-create --no-refresh
                 # will succeed.
-                self.image_create(durl, prefix="norefresh",
+                self.image_create(self.durl2, prefix="norefresh",
                     additional_args="--no-refresh")
                 self.pkg("list --no-refresh -a | grep baz", exit=1)
 
                 # Finally, verify that using set-publisher will cause a refresh
-                # which in turn should cause 'baz' to be listed.
-                self.pkg("set-publisher -O %s norefresh" % durl)
+                # which in turn should cause 'baz' to be listed *if* the origin
+                # has changed (setting it to the same value again won't work).
+                self.pkg("set-publisher -O %s norefresh" % self.durl1)
                 self.pkg("list --no-refresh -a | grep baz")
 
+        def test_8_image_upgrade(self):
+                """Verify that a version 0 image can be used by a client that
+                normally creates version 1 images, and that it will be upgraded
+                correctly when a privileged user uses it."""
+
+                # Publish some sample packages.
+                pkgsend_data = """
+                open [email protected]
+                close
+                open [email protected]
+                close
+                """
+                self.pkgsend_bulk(self.durl1, pkgsend_data)
+
+                # First, create a new image.
+                self.image_create(self.durl1)
+
+                # Next, install the packages.
+                self.pkg("install quux corge")
+
+                # Next, convert the v1 image to a v0 image.
+                img_root = os.path.join(self.get_img_path(), "var", "pkg")
+                cat_path = os.path.join(img_root, "catalog")
+                pub_path = os.path.join(img_root, "publisher")
+                v1_cat_path = os.path.join(pub_path, "test", "catalog")
+                v0_cat_path = os.path.join(cat_path, "test")
+
+                # For conversion, the v0 catalogs need to be moved to the
+                # v0 location.
+                os.makedirs(v0_cat_path)
+                for fname in ("catalog", "attrs"):
+                        src = os.path.join(v1_cat_path, fname)
+                        dest = os.path.join(v0_cat_path, fname)
+                        pkg.portable.rename(src, dest)
+
+                # The existing installed state has to be converted to v0.
+                state_dir = os.path.join(img_root, "state")
+                inst_state_dir = os.path.join(state_dir, "installed")
+                cat = pkg.catalog.Catalog(meta_root=inst_state_dir)
+                for f in cat.fmris():
+                        self.__add_install_file(img_root, f)
+                cat = None
+
+                # Now dump the new publisher directory, the 'known' state
+                # directory, and any catalog files in the 'installed'
+                # directory.
+                known_state_dir = os.path.join(state_dir, "known")
+                for path in (pub_path, known_state_dir):
+                        shutil.rmtree(path)
+
+                for fname in sorted(os.listdir(inst_state_dir)):
+                        if fname.startswith("catalog"):
+                                pkg.portable.remove(os.path.join(inst_state_dir,
+                                    fname))
+
+                # Next, verify that the new client can read v0 images as an
+                # an unprivileged user.
+                self.pkg("info pkg://test/quux corge", su_wrap=True)
+
+                # Next, verify that the new client can upgrade v0 images to
+                # v1 images.
+                self.pkg("info quux pkg://test/corge")
+
+                # Finally, verify that the old structures and state information
+                # are gone.
+                self.assertFalse(os.path.exists(cat_path))
+                self.assertTrue(os.path.exists(pub_path))
+
+                for pl in sorted(os.listdir(inst_state_dir)):
+                        # If there any other files but catalog files here, then
+                        # the old state information didn't get properly removed.
+                        assert pl.startswith("catalog.")
 
 class TestImageCreateNoDepot(testutils.CliTestCase):
         persistent_depot = True