--- a/src/tests/cli/t_pkg_image_create.py Thu Sep 10 15:54:20 2009 -0700
+++ b/src/tests/cli/t_pkg_image_create.py Fri Sep 11 05:01:08 2009 -0500
@@ -30,19 +30,26 @@
testutils.setup_environment("../../../proto")
import os
+import pkg.portable
+import pkg.catalog
import shutil
import unittest
-class TestPkgImageCreateBasics(testutils.SingleDepotTestCase):
- # Only start/stop the depot once (instead of for every test)
+class TestPkgImageCreateBasics(testutils.ManyDepotTestCase):
+ # Only start/stop the depots once (instead of for every test)
persistent_depot = True
+ def setUp(self):
+ testutils.ManyDepotTestCase.setUp(self, 2)
+
+ self.durl1 = self.dcs[1].get_depot_url()
+ self.durl2 = self.dcs[2].get_depot_url()
+
def test_basic(self):
""" Create an image, verify it. """
- durl = self.dc.get_depot_url()
- self.image_create(durl)
+ self.image_create(self.durl1)
self.pkg("verify")
def test_image_create_bad_opts(self):
@@ -52,47 +59,64 @@
self.pkg("image-create --bozo", exit=2)
self.pkg("image-create", exit=2)
+ def __add_install_file(self, imgdir, fmri):
+ """Take an image path and fmri. Write a file to disk that
+ indicates that the package named by the fmri has been
+ installed. Assumes package was installed from a preferred
+ publisher."""
+
+ def install_file(fmri):
+ return "%s/pkg/%s/installed" % (imgdir,
+ fmri.get_dir_path())
+
+ f = file(install_file(fmri), "w")
+ f.writelines(["VERSION_1\n_PRE_", fmri.publisher])
+ f.close()
+
+ fi = file("%s/state/installed/%s" % (imgdir,
+ fmri.get_link_path()), "w")
+ fi.close()
+
def test_766(self):
"""Bug 766: image-create without publisher prefix specified."""
- durl = self.dc.get_depot_url()
-
pkgsend_data = """
open [email protected]
close
"""
- self.pkgsend_bulk(durl, pkgsend_data)
+ self.pkgsend_bulk(self.durl1, pkgsend_data)
self.assertRaises(testutils.UnexpectedExitCodeException,
- self.image_create, durl, "")
+ self.image_create, self.durl1, "")
def test_3588(self):
- """Bug 3588: Make sure we can't create an image where one
- already exists"""
- durl = self.dc.get_depot_url()
- self.pkg("image-create -p mydepot=%s %s/3588_image" % (
- durl, self.get_img_path()))
- self.pkg("image-create -p mydepot=%s %s/3588_image" %
- (durl, self.get_img_path()), exit=1)
+ """Ensure that image creation works as expected when an image
+ already exists."""
- def test_3588_1(self):
- """Bug 3588: Make sure we can create an image where one
- already exists with the -f (force) flag"""
- durl = self.dc.get_depot_url()
+ # These tests are interdependent.
+ #
+ # Bug 3588: Make sure we can't create an image where one
+ # already exists
+ self.pkg("image-create -p mydepot=%s %s/3588_image" % (
+ self.durl1, self.get_img_path()))
+ self.pkg("image-create -p mydepot=%s %s/3588_image" % (
+ self.durl1, self.get_img_path()), exit=1)
+
+ # Make sure we can create an image where one
+ # already exists with the -f (force) flag
self.pkg("image-create -p mydepot=%s %s/3588_image_1" % (
- durl, self.get_img_path()))
+ self.durl1, self.get_img_path()))
self.pkg("image-create -f -p mydepot=%s %s/3588_image_1" %
- (durl, self.get_img_path()))
+ (self.durl1, self.get_img_path()))
- def test_3588_2(self):
- """Bug 3588: Make sure we can't create an image where a
- non-empty directory exists"""
- durl = self.dc.get_depot_url()
+ # Bug 3588: Make sure we can't create an image where a
+ # non-empty directory exists
p = os.path.join(self.get_img_path(), "3588_2_image")
os.mkdir(p)
os.system("touch %s/%s" % (p, "somefile"))
- self.pkg("image-create -p mydepot=%s %s" % (durl, p), exit=1)
- self.pkg("image-create -f -p mydepot=%s %s" % (durl, p))
+ self.pkg("image-create -p mydepot=%s %s" % (self.durl1, p),
+ exit=1)
+ self.pkg("image-create -f -p mydepot=%s %s" % (self.durl1, p))
def test_4_options(self):
"""Verify that all of the options for specifying publisher
@@ -100,9 +124,8 @@
img_path = os.path.join(self.get_test_prefix(), "test_4_img")
for opt in ("-a", "-p", "--publisher"):
- durl = self.dc.get_depot_url()
- self.pkg("image-create %s mydepot=%s %s" % (opt, durl,
- img_path))
+ self.pkg("image-create %s mydepot=%s %s" % (opt,
+ self.durl1, img_path))
shutil.rmtree(img_path)
def test_5_bad_values_no_image(self):
@@ -119,7 +142,6 @@
created correctly."""
pwd = os.getcwd()
- durl = self.dc.get_depot_url()
img_path = "test_6_image"
abs_img_path = os.path.join(self.get_test_prefix(), img_path)
@@ -127,43 +149,118 @@
# specified image root if the specified root doesn't already
# exist.
os.chdir(self.get_test_prefix())
- self.pkg("image-create -p mydepot=%s %s" % (durl, img_path))
+ self.pkg("image-create -p mydepot=%s %s" % (self.durl1,
+ img_path))
os.chdir(pwd)
- self.assertFalse(os.path.exists(os.path.join(abs_img_path, img_path)))
+ self.assertFalse(os.path.exists(os.path.join(abs_img_path,
+ img_path)))
shutil.rmtree(abs_img_path)
# Now verify that the image root isn't duplicated within the
# specified image root if the specified root already exists.
os.chdir(self.get_test_prefix())
os.mkdir(img_path)
- self.pkg("image-create -p mydepot=%s %s" % (durl, img_path))
+ self.pkg("image-create -p mydepot=%s %s" % (self.durl1,
+ img_path))
os.chdir(pwd)
- self.assertFalse(os.path.exists(os.path.join(abs_img_path, img_path)))
+ self.assertFalse(os.path.exists(os.path.join(abs_img_path,
+ img_path)))
shutil.rmtree(abs_img_path)
def test_7_image_create_no_refresh(self):
"""Verify that image-create --no-refresh works as expected.
See bug 8777 for related issue."""
- durl = self.dc.get_depot_url()
-
pkgsend_data = """
open [email protected]
close
"""
- self.pkgsend_bulk(durl, pkgsend_data)
+ self.pkgsend_bulk(self.durl1, pkgsend_data)
# First, check to be certain that an image-create --no-refresh
# will succeed.
- self.image_create(durl, prefix="norefresh",
+ self.image_create(self.durl2, prefix="norefresh",
additional_args="--no-refresh")
self.pkg("list --no-refresh -a | grep baz", exit=1)
# Finally, verify that using set-publisher will cause a refresh
- # which in turn should cause 'baz' to be listed.
- self.pkg("set-publisher -O %s norefresh" % durl)
+ # which in turn should cause 'baz' to be listed *if* the origin
+ # has changed (setting it to the same value again won't work).
+ self.pkg("set-publisher -O %s norefresh" % self.durl1)
self.pkg("list --no-refresh -a | grep baz")
+ def test_8_image_upgrade(self):
+ """Verify that a version 0 image can be used by a client that
+ normally creates version 1 images, and that it will be upgraded
+ correctly when a privileged user uses it."""
+
+ # Publish some sample packages.
+ pkgsend_data = """
+ open [email protected]
+ close
+ open [email protected]
+ close
+ """
+ self.pkgsend_bulk(self.durl1, pkgsend_data)
+
+ # First, create a new image.
+ self.image_create(self.durl1)
+
+ # Next, install the packages.
+ self.pkg("install quux corge")
+
+ # Next, convert the v1 image to a v0 image.
+ img_root = os.path.join(self.get_img_path(), "var", "pkg")
+ cat_path = os.path.join(img_root, "catalog")
+ pub_path = os.path.join(img_root, "publisher")
+ v1_cat_path = os.path.join(pub_path, "test", "catalog")
+ v0_cat_path = os.path.join(cat_path, "test")
+
+ # For conversion, the v0 catalogs need to be moved to the
+ # v0 location.
+ os.makedirs(v0_cat_path)
+ for fname in ("catalog", "attrs"):
+ src = os.path.join(v1_cat_path, fname)
+ dest = os.path.join(v0_cat_path, fname)
+ pkg.portable.rename(src, dest)
+
+ # The existing installed state has to be converted to v0.
+ state_dir = os.path.join(img_root, "state")
+ inst_state_dir = os.path.join(state_dir, "installed")
+ cat = pkg.catalog.Catalog(meta_root=inst_state_dir)
+ for f in cat.fmris():
+ self.__add_install_file(img_root, f)
+ cat = None
+
+ # Now dump the new publisher directory, the 'known' state
+ # directory, and any catalog files in the 'installed'
+ # directory.
+ known_state_dir = os.path.join(state_dir, "known")
+ for path in (pub_path, known_state_dir):
+ shutil.rmtree(path)
+
+ for fname in sorted(os.listdir(inst_state_dir)):
+ if fname.startswith("catalog"):
+ pkg.portable.remove(os.path.join(inst_state_dir,
+ fname))
+
+ # Next, verify that the new client can read v0 images as an
+ # an unprivileged user.
+ self.pkg("info pkg://test/quux corge", su_wrap=True)
+
+ # Next, verify that the new client can upgrade v0 images to
+ # v1 images.
+ self.pkg("info quux pkg://test/corge")
+
+ # Finally, verify that the old structures and state information
+ # are gone.
+ self.assertFalse(os.path.exists(cat_path))
+ self.assertTrue(os.path.exists(pub_path))
+
+ for pl in sorted(os.listdir(inst_state_dir)):
+ # If there any other files but catalog files here, then
+ # the old state information didn't get properly removed.
+ assert pl.startswith("catalog.")
class TestImageCreateNoDepot(testutils.CliTestCase):
persistent_depot = True