src/tests/cli/t_pkg_publisher.py
author Bart Smaalders <Bart.Smaalders@Oracle.COM>
Thu, 30 Mar 2017 17:05:02 -0700
changeset 3537 03bba058e598
parent 3400 77d785eb851b
permissions -rw-r--r--
20973899 Installation of zones in parallel may fail with: [Errno 17] File exists

#!/usr/bin/python
#
# CDDL HEADER START
#
# The contents of this file are subject to the terms of the
# Common Development and Distribution License (the "License").
# You may not use this file except in compliance with the License.
#
# You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
# or http://www.opensolaris.org/os/licensing.
# See the License for the specific language governing permissions
# and limitations under the License.
#
# When distributing Covered Code, include this CDDL HEADER in each
# file and include the License file at usr/src/OPENSOLARIS.LICENSE.
# If applicable, add the following below this CDDL HEADER, with the
# fields enclosed by brackets "[]" replaced with your own identifying
# information: Portions Copyright [yyyy] [name of copyright owner]
#
# CDDL HEADER END
#

#
# Copyright (c) 2008, 2016, Oracle and/or its affiliates. All rights reserved.
#

from . import testutils
if __name__ == "__main__":
        testutils.setup_environment("../../../proto")
import pkg5unittest

import hashlib
import os
import pkg.client.image as image
import pkg.misc
import shutil
import six
import tempfile
import unittest


class TestPkgPublisherBasics(pkg5unittest.SingleDepotTestCase):
        """Tests of the pkg(1) publisher, set-publisher and unset-publisher
        subcommands, run against a single depot."""

        # Only start/stop the depot once (instead of for every test)
        persistent_setup = True
        # Tests in this suite use the read only data directory.
        need_ro_data = True

        # Dummy package with no actions; setUp() publishes it to the depot.
        foo1 = """
            open foo@1,5.11-0
            close """

        def setUp(self):
                """Start the depot and publish the dummy package to it."""

                # The tests in this class talk to a running depot, so ask
                # the base class to start one.
                pkg5unittest.SingleDepotTestCase.setUp(self, start_depot=True)

                depot = self.dcs[1]
                self.durl1 = depot.get_depot_url()
                self.pkgsend_bulk(self.durl1, self.foo1)

        def test_pkg_publisher_bogus_opts(self):
                """Bogus publisher options must make pkg exit with status 2."""

                self.image_create(self.rurl)

                # Unknown options, options given without their argument, and
                # unset-publisher without a publisher name.
                bogus_cmds = (
                    "set-publisher -@ test3",
                    "publisher -@ test5",
                    "set-publisher -k",
                    "set-publisher -c",
                    "set-publisher -O",
                    "unset-publisher",
                    "unset-publisher -k",
                )
                for cmdline in bogus_cmds:
                        self.pkg(cmdline, exit=2)

        def test_publisher_add_remove(self):
                """Add publishers with set-publisher and remove them with
                unset-publisher, checking the exit status of every step."""

                self.image_create(self.rurl)
                bogus = self.bogus_url

                # Expected to fail (exit 1); the --no-refresh variant below
                # is expected to succeed.
                self.pkg("set-publisher -O http://{0}1 test1".format(bogus),
                    exit=1)

                # A publisher can be added in the disabled state (-d).
                self.pkg("set-publisher -d --no-refresh -O http://{0}1 test1".format(
                    bogus))
                self.pkg("publisher | grep test")

                # Same pair of checks for a publisher added with -P.
                self.pkg("set-publisher -P -O http://{0}2 test2".format(bogus),
                    exit=1)
                self.pkg("set-publisher -P --no-refresh -O http://{0}2 test2".format(
                    bogus))
                self.pkg("publisher | grep test2")
                self.pkg("unset-publisher test1")
                self.pkg("publisher | grep test1", exit=1)

                # Commands reused below to (re-)create test3 and test4.
                add_test3 = "set-publisher --no-refresh -O http://{0}2 test3".format(
                    bogus)
                add_test4 = "set-publisher --no-refresh -O http://{0}2 test4".format(
                    bogus)

                # Removing several publishers at once must yield partial
                # success (3) if only some of them were removed and
                # complete failure (1) if none were.

                # One of the two names is unknown.
                self.pkg(add_test3)
                self.pkg("unset-publisher test3 test4", exit=3)

                # All names are unknown.
                self.pkg("unset-publisher test3 test4", exit=1)
                self.pkg("unset-publisher test3", exit=1)

                # Removal must succeed when every name given is known.

                # A single publisher that is not preferred.
                self.pkg(add_test3)
                self.pkg("unset-publisher test3")

                # Two publishers, neither of them preferred.
                self.pkg(add_test3)
                self.pkg(add_test4)
                self.pkg("unset-publisher test3 test4")

                # Get some package manifests cached for the publisher.
                self.pkg("info -r '//test/*'")

                # unset-publisher must still succeed once the publisher's
                # private directory has been removed from $IMG_DIR/pkg.
                img_api = self.get_img_api_obj()
                test_pub = img_api.get_publisher(prefix="test")
                self.assertTrue(os.path.exists(test_pub.meta_root))
                test_pub.remove_meta_root()
                self.assertTrue(not os.path.exists(test_pub.meta_root))

                # Remove the publisher's cache directory as well; the
                # listing of its parent is the failure message.
                cache_root = os.path.join(self.img_path(), "var", "pkg",
                    "cache", "publisher")
                pub_cache_path = os.path.join(cache_root, "test")
                self.assertTrue(os.path.exists(pub_cache_path),
                    os.listdir(cache_root))
                shutil.rmtree(pub_cache_path)
                self.assertTrue(not os.path.exists(pub_cache_path))

                self.pkg("unset-publisher test")

        def test_publisher_uuid(self):
                """verify uuid is set manually and automatically for a
                publisher"""

                self.image_create(self.rurl)

                # Manual: --reset-uuid while adding test1, then again on the
                # existing publisher.
                self.pkg("set-publisher -O http://{0}1 --no-refresh --reset-uuid test1".format(
                    self.bogus_url))
                self.pkg("set-publisher --no-refresh --reset-uuid test1")

                # Automatic: test2 is added without --reset-uuid but must
                # still report a client UUID.
                self.pkg("set-publisher -O http://{0}1 --no-refresh test2".format(
                    self.bogus_url))
                self.pkg("publisher test2 | grep 'Client UUID: '")
                # The UUID must not be None.  "grep -v" would succeed as
                # soon as any other line is printed, so instead require that
                # a grep for the None value finds nothing (exit 1).
                self.pkg("publisher test2 | grep 'Client UUID: None'", exit=1)

        def test_publisher_bad_opts(self):
                """pkg: more insidious option abuse for set-publisher

                Covers conflicting origin/publisher pairs, SSL key and cert
                handling (including key/cert files that have disappeared
                from the image), malformed property options, unusable CA
                certificates and unknown search-order targets."""

                self.image_create(self.rurl)

                key_path = os.path.join(self.keys_dir, "cs1_ch1_ta3_key.pem")
                cert_path = os.path.join(self.cs_dir, "cs1_ch1_ta3_cert.pem")

                # Two origin/publisher pairs on one command line.
                self.pkg(
                    "set-publisher -O http://{0}1 test1 -O http://{1}2 test2".format(
                    self.bogus_url, self.bogus_url), exit=2)

                self.pkg("set-publisher -O http://{0}1 test1".format(self.bogus_url),
                    exit=1)
                self.pkg("set-publisher -O http://{0}2 test2".format(self.bogus_url),
                    exit=1)
                self.pkg("set-publisher --no-refresh -O https://{0}1 test1".format(
                    self.bogus_url))
                self.pkg("set-publisher --no-refresh -O http://{0}2 test2".format(
                    self.bogus_url))

                # Set key for test1.
                self.pkg("set-publisher --no-refresh -k {0} test1".format(key_path))

                # This should fail since test2 doesn't have any SSL origins or
                # mirrors.
                self.pkg("set-publisher --no-refresh -k {0} test2".format(key_path),
                    exit=2)

                # Listing publishers should succeed even if key file is gone.
                # This test relies on using the same implementation used in
                # image.py __store_publisher_ssl() which sets the paths to the
                # SSL keys/certs.
                img_key_path = os.path.join(self.img_path(), "var", "pkg",
                    "ssl", pkg.misc.get_data_digest(key_path,
                    hash_func=hashlib.sha1)[0])
                os.unlink(img_key_path)
                self.pkg("publisher test1")

                # This should fail since key has been removed even though test2
                # has an https origin.
                self.pkg("set-publisher --no-refresh -O https://{0}2 test2".format(
                    self.bogus_url))
                self.pkg("set-publisher --no-refresh -k {0} test2".format(
                    img_key_path), exit=1)

                # Reset for next test.
                self.pkg("set-publisher --no-refresh -k '' test1")
                self.pkg("set-publisher --no-refresh -O http://{0}2 test2".format(
                    self.bogus_url))

                # Set cert for test1.
                self.pkg("set-publisher --no-refresh -c {0} test1".format(cert_path))

                # This should fail since test2 doesn't have any SSL origins or
                # mirrors.
                self.pkg("set-publisher --no-refresh -c {0} test2".format(cert_path),
                    exit=2)

                # Listing publishers is still possible if the cert file is
                # gone, but is reported as a partial failure (3).
                img_cert_path = os.path.join(self.img_path(), "var", "pkg",
                    "ssl", pkg.misc.get_data_digest(cert_path,
                    hash_func=hashlib.sha1)[0])
                os.unlink(img_cert_path)
                self.pkg("publisher test1", exit=3)

                # This should fail since cert has been removed even though test2
                # has an https origin.
                self.pkg("set-publisher --no-refresh -O https://{0}2 test2".format(
                    self.bogus_url))
                self.pkg("set-publisher --no-refresh -c {0} test2".format(
                    img_cert_path), exit=1)

                # Reset for next test.
                self.pkg("set-publisher --no-refresh -O http://{0}2 test2".format(
                    self.bogus_url))

                # Expect partial failure since cert file is gone for test1.
                self.pkg("publisher test1", exit=3)
                self.pkg("publisher test3", exit=1)
                self.pkg("publisher -H | grep URI", exit=1)

                # Now verify that setting ssl_cert or ssl_key to "" works.
                self.pkg('set-publisher --no-refresh -c "" test1')
                self.pkg('publisher -H test1 | grep "SSL Cert: None"')

                self.pkg('set-publisher --no-refresh -k "" test1')
                self.pkg('publisher -H test1 | grep "SSL Key: None"')

                # Malformed or conflicting property options.
                self.pkg("set-publisher --set-property foo test1", exit=2)
                self.pkg("set-publisher --set-property foo=bar --set-property "
                    "foo=baz test1", exit=2)
                self.pkg("set-publisher --add-property-value foo test1", exit=2)
                self.pkg("set-publisher --remove-property-value foo test1",
                    exit=2)
                self.pkg("set-publisher --approve-ca-cert /shouldnotexist/foo "
                    "test1", exit=1)

                # An existing CA cert file must also be rejected when the
                # command runs through su_wrap.  mkstemp() hands back an
                # open descriptor; close it immediately so it is not leaked,
                # and remove the file even if the check below fails.
                tmp_fd, tmp_path = tempfile.mkstemp()
                os.close(tmp_fd)
                try:
                        self.pkg("set-publisher --approve-ca-cert {0} test1".format(
                            tmp_path), exit=1, su_wrap=True)
                finally:
                        os.unlink(tmp_path)

                # Valid property changes are shown by 'pkg publisher'; failed
                # changes leave the existing values alone.
                self.pkg("set-publisher --no-refresh --set-property "
                    "signature-policy=ignore test1")
                self.pkg("publisher test1")
                self.assertTrue("signature-policy = ignore" in self.output)
                self.pkg("set-publisher --no-refresh --set-property foo=bar "
                    "test1")
                self.pkg("publisher test1")
                self.assertTrue("foo = bar" in self.output)
                self.pkg("set-publisher --no-refresh  --remove-property-value "
                    "foo=baz test1", exit=1)
                self.pkg("publisher test1")
                self.assertTrue("foo = bar" in self.output)
                self.pkg("set-publisher --no-refresh  --set-property "
                    "signature-policy=require-names test1", exit=1)
                self.pkg("set-publisher --no-refresh --remove-property-value "
                    "bar=baz test1", exit=1)

                # Search order relative to the unknown publisher "foo".
                self.pkg("publisher")
                self.pkg("set-publisher --no-refresh --search-after foo test1",
                    exit=1)
                self.pkg("set-publisher --no-refresh --search-before foo test1",
                    exit=1)

        def test_publisher_validation(self):
                """Poorly formed publisher prefixes and origin URLs must be
                rejected, while single-character host names are accepted."""

                self.image_create(self.rurl, prefix="test")
                bogus = self.bogus_url

                self.pkg("set-publisher -O http://{0}1 test1".format(bogus),
                    exit=1)
                self.pkg("set-publisher --no-refresh -O http://{0}1 test1".format(
                    bogus))

                # Badly formed publisher prefixes.
                set_origin2 = "set-publisher -O http://{0}2 ".format(bogus)
                for bad_prefix in ("$%^8", "8^$%"):
                        self.pkg(set_origin2 + bad_prefix, exit=1)

                # Badly formed origins: junk host, non-numeric port and an
                # ftp scheme.
                self.pkg("set-publisher -O http://*^5$% test2", exit=1)
                self.pkg("set-publisher -O http://{0}1:abcde test2".format(bogus),
                    exit=1)
                self.pkg("set-publisher -O ftp://{0}2 test2".format(bogus),
                    exit=1)

                # A host name with a single-character label is valid, and
                # so is the matching publisher name.
                self.pkg("set-publisher --no-refresh -g http://a/ a")
                self.pkg("set-publisher --no-refresh -g http://a.example.com "
                    "a.example.com")

        def test_missing_perms(self):
                """Publisher changes attempted through su_wrap must fail
                cleanly, and unreadable or corrupt SSL cert/key files must
                only cause a partial failure when a publisher is listed."""

                self.image_create(self.rurl, prefix="test")
                bogus = self.bogus_url

                # Adding and removing publishers: exit 1 with su_wrap,
                # success without it.
                self.pkg("set-publisher --no-refresh -O http://{0}1 test1".format(
                    bogus), su_wrap=True, exit=1)
                self.pkg("set-publisher --no-refresh -O http://{0}1 foo".format(
                    bogus))
                self.pkg("publisher | grep foo")
                self.pkg("set-publisher -P --no-refresh -O http://{0}2 test2".format(
                    bogus), su_wrap=True, exit=1)
                self.pkg("unset-publisher foo", su_wrap=True, exit=1)
                self.pkg("unset-publisher foo")

                # Adding a mirror (-m).
                self.pkg("set-publisher -m http://{0}1 test".format(bogus),
                    su_wrap=True, exit=1)
                self.pkg("set-publisher -m http://{0}2 test".format(bogus))

                # Removing a mirror (-M).
                remove_mirror = "set-publisher -M http://{0}2 test".format(bogus)
                self.pkg(remove_mirror, su_wrap=True, exit=1)
                self.pkg(remove_mirror)

                # Give test1 an https origin together with a client cert
                # and key so that certificate failures can be provoked.
                key_path = os.path.join(self.keys_dir, "cs1_ch1_ta3_key.pem")
                cert_path = os.path.join(self.cs_dir, "cs1_ch1_ta3_cert.pem")

                self.pkg("set-publisher --no-refresh -O https://{0}1 test1".format(
                    bogus))
                self.pkg("set-publisher --no-refresh -c {0} test1".format(cert_path))
                self.pkg("set-publisher --no-refresh -k {0} test1".format(key_path))

                # Locate the image's copies of the key and cert.  This
                # relies on using the same implementation used in image.py
                # __store_publisher_ssl(), which sets the paths to the SSL
                # keys/certs.
                ssl_dir = os.path.join(self.img_path(), "var", "pkg", "ssl")
                key_digest = pkg.misc.get_data_digest(key_path,
                    hash_func=hashlib.sha1)[0]
                img_key_path = os.path.join(ssl_dir, key_digest)
                cert_digest = pkg.misc.get_data_digest(cert_path,
                    hash_func=hashlib.sha1)[0]
                img_cert_path = os.path.join(ssl_dir, cert_digest)
                stored_files = (img_key_path, img_cert_path)

                # Strip all permissions so unprivileged users cannot read
                # the files.
                for ssl_file in stored_files:
                        os.chmod(ssl_file, 0)

                # An unreadable certificate gives a partial failure when
                # publisher information is displayed.
                self.pkg("publisher test1", su_wrap=True, exit=3)

                # Truncate both files; an invalid cert/key gives a partial
                # failure as well.
                for ssl_file in stored_files:
                        open(ssl_file, "wb").close()
                self.pkg("publisher test1", exit=3)

        def test_publisher_tsv_format(self):
                """Check the output of 'pkg publisher -F tsv', both with and
                without the header row."""

                self.image_create(self.rurl)
                bogus = self.bogus_url

                self.pkg("set-publisher --no-refresh -O https://{0}1 test1".format(
                    bogus))
                self.pkg("set-publisher --no-refresh -O http://{0}2 test2".format(
                    bogus))

                header = ("PUBLISHER\tSTICKY\tSYSPUB\tENABLED"
                    "\tTYPE\tSTATUS\tURI\tPROXY\n")
                # One row per publisher: the image's own "test" publisher
                # followed by the two added above.
                rows = ("test\ttrue\tfalse\ttrue\torigin\tonline\t"
                    "{0}/\t-\n"
                    "test1\ttrue\tfalse\ttrue\torigin\tonline\t"
                    "https://{1}1/\t-\n"
                    "test2\ttrue\tfalse\ttrue\torigin\tonline\t"
                    "http://{2}2/\t-\n").format(self.rurl, bogus, bogus)

                # (command, expected output): first with headers, then
                # without (-H).
                checks = (
                    ("publisher -F tsv", header + rows),
                    ("publisher -HF tsv", rows),
                )
                for cmdline, expected in checks:
                        self.pkg(cmdline)
                        actual = self.reduceSpaces(self.output)
                        self.assertEqualDiff(expected, actual)

        def test_old_publisher_ca_certs(self):
                """Check that approving and revoking CA certs is reflected in
                the output of pkg publisher and that setting the CA certs when
                setting an existing publisher works correctly."""

                cert_dir = os.path.join(self.ro_data_root,
                    "signing_certs", "produced", "chain_certs")

                app1 = os.path.join(cert_dir, "ch1_ta1_cert.pem")
                app2 = os.path.join(cert_dir, "ch1_ta3_cert.pem")
                rev1 = os.path.join(cert_dir, "ch1_ta4_cert.pem")
                rev2 = os.path.join(cert_dir, "ch1_ta5_cert.pem")
                app1_h = self.calc_pem_hash(app1)
                app2_h = self.calc_pem_hash(app2)
                rev1_h = self.calc_pem_hash(rev1)
                rev2_h = self.calc_pem_hash(rev2)
                self.image_create(self.rurl)
                # Certs to approve are passed as files, certs to revoke as
                # hashes.
                self.pkg("set-publisher "
                    "--approve-ca-cert {0} "
                    "--approve-ca-cert {1} --revoke-ca-cert {2} "
                    "--revoke-ca-cert {3} test ".format(
                    app1, app2, rev1_h, rev2_h))
                self.pkg("publisher test")
                # Expected line layouts: heading lines for the first hash
                # (r1, r3) and the continuation line for the second (r2).
                r1 = "         Approved CAs: {0}"
                r2 = "                     : {0}"
                r3 = "          Revoked CAs: {0}"
                ls = self.output.splitlines()

                def lists_pair(idx, heading, hash_a, hash_b):
                        """Return True if output lines idx and idx + 1 list
                        hash_a and hash_b, in either order, under
                        'heading'.  A heading on the very last line has no
                        continuation line and so never matches."""
                        if idx + 1 >= len(ls):
                                return False
                        shown = (ls[idx], ls[idx + 1])
                        return shown in (
                            (heading.format(hash_a), r2.format(hash_b)),
                            (heading.format(hash_b), r2.format(hash_a)))

                found_approved = False
                found_revoked = False
                for i, line in enumerate(ls):
                        if "Approved CAs" in line:
                                found_approved = True
                                if not lists_pair(i, r1, app1_h, app2_h):
                                        raise RuntimeError("Expected to see "
                                            "{0} and {1} as approved certs. "
                                            "Output was:\n{2}".format(app1_h,
                                            app2_h, self.output))
                        elif "Revoked CAs" in line:
                                found_revoked = True
                                if not lists_pair(i, r3, rev1_h, rev2_h):
                                        raise RuntimeError("Expected to see "
                                            "{0} and {1} as revoked certs. "
                                            "Output was:\n{2}".format(rev1_h,
                                            rev2_h, self.output))

                # Without these checks the test would pass even if neither
                # section were printed at all.
                self.assertTrue(found_approved,
                    "No 'Approved CAs' line in output:\n{0}".format(
                    self.output))
                self.assertTrue(found_revoked,
                    "No 'Revoked CAs' line in output:\n{0}".format(
                    self.output))

        def test_publisher_properties(self):
                """Properties set on a publisher must be displayed in the
                output of pkg publisher <publisher name>."""

                def listing():
                        # Run the listing again and hand back its output.
                        self.pkg("publisher test")
                        return self.output

                self.image_create(self.rurl)

                # Nothing has been set yet, so there is no property section.
                self.assertTrue("Properties:" not in listing())

                # A plain property.
                self.pkg("set-publisher --set-property foo=bar test")
                shown = listing()
                self.assertTrue("Properties:" in shown)
                self.assertTrue("foo = bar" in shown)
                # --add-property-value on foo is expected to fail.
                self.pkg("set-publisher --add-property-value foo=bar1 test",
                    exit=1)

                # A signature policy together with its first required name.
                self.pkg("set-publisher --set-property "
                    "signature-policy=require-names --add-property-value "
                    "signature-required-names=n1 test")
                shown = listing()
                self.assertTrue("signature-policy = require-names" in shown)
                self.assertTrue("signature-required-names = n1" in shown)

                # A second required name is appended to the first.
                self.pkg("set-publisher --add-property-value "
                    "signature-required-names=n2 test")
                shown = listing()
                self.assertTrue("signature-required-names = n1, n2" in shown)

        def test_bug_7141684(self):
                """The pkg(1) client must not traceback when asked for the
                preferred publisher while no publisher is configured."""

                show_preferred = "publisher -P"

                # A freshly created image without any publisher.
                self.image_create()
                self.pkg(show_preferred, exit=0)

                # Add a publisher, install from it, then remove it again
                # and repeat the query.
                add_test = "set-publisher -g {0} test".format(self.durl1)
                for cmdline in (add_test, "install foo",
                    "unset-publisher test"):
                        self.pkg(cmdline)
                self.pkg(show_preferred, exit=0)

        def test_publisher_proxies(self):
                """Tests that set-publisher can add and remove proxy values
                per origin (-g/-G) and per mirror (-m/-M)."""

                self.image_create(self.rurl)
                self.pkg("publisher test")
                self.assertTrue("Proxy:" not in self.output)

                # How the proxied http depot shows up in the URI and PROXY
                # columns of 'pkg publisher -F tsv'.  The text has to be
                # formatted first and only then looked up in the output;
                # formatting the result of the "in" test instead always
                # yields a non-empty (true) string.
                proxied_durl = "{0}/\thttp://foo".format(self.durl)

                # check origin and mirror addition/removal when proxies are used
                for add, remove in [("-g", "-G"), ("-m", "-M")]:
                        self.image_create(self.rurl)
                        # we can't proxy file repositories
                        self.pkg("set-publisher --no-refresh {add} {url} "
                            "--proxy http://foo test".format(
                            add=add, url=self.rurl), exit=1)
                        self.pkg("publisher test")
                        self.assertTrue("Proxy:" not in self.output)

                        # we can set the proxy for http repos
                        self.pkg("set-publisher --no-refresh {add} {url} "
                            "--proxy http://foo test".format(
                            add=add, url=self.durl))

                        self.pkg("publisher test")
                        self.assertTrue("Proxy: http://foo" in self.output)

                        # remove the file-based repository and ensure we still
                        # have a proxied http-based publisher
                        self.pkg("set-publisher --no-refresh -G {0} test".format(
                            self.rurl))
                        self.pkg("publisher -F tsv")
                        self.assertTrue(proxied_durl in self.output)
                        self.assertTrue(self.rurl not in self.output)

                        # ensure we can't add duplicate proxied or unproxied
                        # repositories
                        self.pkg("set-publisher --no-refresh {add} {url} "
                            "--proxy http://foo test".format(
                            add=add, url=self.durl), exit=1)
                        self.pkg("set-publisher --no-refresh {add} {url} "
                            "test".format(add=add, url=self.durl), exit=1)

                        # we should have 1 proxied occurrence of our http url
                        self.pkg("publisher -F tsv")
                        self.assertTrue(proxied_durl in self.output)
                        self.assertTrue("\t\t{0}".format(self.durl)
                            not in self.output)

                        # when removing a proxied url, then adding the same url
                        # unproxied, the proxy configuration does get removed
                        self.pkg("set-publisher --no-refresh {remove} {url}"
                            " test".format(remove=remove, url=self.durl),
                            exit=0)
                        self.pkg("set-publisher --no-refresh {add} {url} "
                            "test".format(add=add, url=self.durl), exit=0)
                        self.pkg("publisher -F tsv")
                        self.assertTrue(proxied_durl not in self.output)
                        self.pkg("set-publisher --no-refresh {remove} "
                            "{url} test".format(
                            remove=remove, url=self.durl))

                        # when we add multiple urls, and they all get the same
                        # proxy value, leaving an existing non-proxied url
                        # as non-proxied.
                        self.pkg("set-publisher --no-refresh {add} http://a "
                            "test".format(add=add))
                        self.pkg("set-publisher --no-refresh {add} http://b "
                            "{add} http://c --proxy http://foo test".format(
                            add=add))
                        self.pkg("publisher -F tsv")
                        self.assertTrue("http://a/\t-" in self.output)
                        self.assertTrue("http://b/\thttp://foo" in self.output)
                        self.assertTrue("http://c/\thttp://foo" in self.output)

        def test_publisher_special_repo_name(self):
                """Tests that set-publisher can use the repository name with
                special characters."""

                self.image_create()

                # (repository directory name, publisher prefix,
                # set-publisher option the repository is passed to)
                cases = (
                    # "%" is a special character in SafeConfigParser, but
                    # needs to be supported anyway.
                    ("repotest%symbol", "test1", "-p"),
                    # "+" will be converted into "%2B" by URL quoting
                    # routines.  "%" is a special character in
                    # SafeConfigParser, but we need to support it here.
                    ("repotest+symbol", "test2", "-g"),
                    # "%()" is the syntax of expansion language in
                    # SafeConfigParser but needs to be raw characters here.
                    ("%(junkrepo)s", "test3", "-g"),
                )

                for dirname, prefix, opt in cases:
                        repo_dir = os.path.join(self.test_root, dirname)
                        self.create_repo(repo_dir, properties={ "publisher": {
                            "prefix": prefix } })
                        # Quote the path: other tests pipe pkg output into
                        # grep, so commands evidently pass through a shell,
                        # where "(" and ")" are metacharacters.
                        self.pkg("set-publisher {0} '{1}' {2}".format(opt,
                            repo_dir, prefix))
                        shutil.rmtree(repo_dir)

        def test_remove_unused_cert_key(self):
                """Ensure that unused client certificate files are removed,
                and that they are kept for as long as any publisher still
                uses them."""

                self.image_create(self.rurl)

                key_path = os.path.join(self.keys_dir, "cs1_ch1_ta3_key.pem")
                cert_path = os.path.join(self.cs_dir, "cs1_ch1_ta3_cert.pem")

                # Where the image is expected to store its copies of the
                # key and cert.
                img_key_path = os.path.join(self.img_path(), "var", "pkg",
                    "ssl", pkg.misc.get_data_digest(key_path,
                    hash_func=hashlib.sha1)[0])
                img_cert_path = os.path.join(self.img_path(), "var", "pkg",
                    "ssl", pkg.misc.get_data_digest(cert_path,
                    hash_func=hashlib.sha1)[0])

                # The commands are assembled from adjacent string literals.
                # A backslash continuation inside a literal would embed the
                # source indentation in the command line.

                def use_https(pub):
                        """Give 'pub' an https origin with the cert and key."""
                        self.pkg("set-publisher --no-refresh -O https://{0} "
                            "-c {1} -k {2} {3}".format(self.bogus_url,
                            cert_path, key_path, pub))

                def use_http(pub):
                        """Switch 'pub' to a plain http origin."""
                        self.pkg("set-publisher --no-refresh -O http://{0} "
                            "{1}".format(self.bogus_url, pub))

                def check_stored(expected):
                        """Assert whether the image's key and cert exist."""
                        self.assertEqual(expected,
                            os.path.exists(img_key_path))
                        self.assertEqual(expected,
                            os.path.exists(img_cert_path))

                # Set the first publisher to a https URL; cert and key
                # should exist.
                use_https("test1")
                check_stored(True)

                # Now change test1 to http URL to check whether
                # certificate and key are removed.
                use_http("test1")
                check_stored(False)

                # While the cert and key are still in use by another
                # publisher they should not be removed.
                use_https("test1")
                use_https("foo")
                check_stored(True)

                # Remove ssl for foo; test1 still uses the cert and key, so
                # they should still exist.
                use_http("foo")
                check_stored(True)

                # Test unset-publisher.
                use_https("foo")
                check_stored(True)

                # test1 still uses them after foo is gone.
                self.pkg("unset-publisher foo")
                check_stored(True)

                # No user is left once test1 is gone as well.
                self.pkg("unset-publisher test1")
                check_stored(False)


class TestPkgPublisherMany(pkg5unittest.ManyDepotTestCase):
        """Publisher tests that need several depots running at once."""

        # Only start/stop the depot once (instead of for every test)
        persistent_setup = True
        # Tests in this suite use the read only data directory.
        need_ro_data = True

        # Empty dummy packages; setUp publishes foo, bar and baz to the
        # first, second and third depot respectively.
        foo1 = """
            open foo@1,5.11-0
            close """

        bar1 = """
            open bar@1,5.11-0
            close """

        baz1 = """
            open baz@1,5.11-0
            close """

        # Packages published by test_origins_enable_disable.  origin1
        # delivers a single file whose payload is tmp/cat (see misc_files).
        origin_1 = """
            open origin1@1,5.11-0
            add file tmp/cat mode=0444 owner=root group=bin path=etc/cat
            close """

        origin_2 = """
            open origin2@1,5.11-0
            close """

        # Publisher and repository properties for the test3 publisher, keyed
        # by section name.  test_set_auto uses this as the basis for the
        # configuration it stores in a repository and then expects to find
        # in the image.
        test3_pub_cfg = {
            "publisher": {
                "alias": "t3",
                "prefix": "test3",
            },
            "repository": {
                "collection_type": "supplemental",
                "description": "This repository serves packages for test3.",
                "legal_uris": [
                    "http://www.opensolaris.org/os/copyrights",
                    "http://www.opensolaris.org/os/tou",
                    "http://www.opensolaris.org/os/trademark"
                ],
                "name": "The Test3 Repository",
                "refresh_seconds": 86400,
                "registration_uri": "",
                "related_uris": [
                    "http://pkg.opensolaris.org/contrib",
                    "http://jucr.opensolaris.org/pending",
                    "http://jucr.opensolaris.org/contrib"
                ],
            },
        }

        # Files handed to make_misc_files() by setUp.
        misc_files = [ "tmp/cat" ]

        def setUp(self):
                """Start nine depots, publish one dummy package to each of the
                first three and create an image with publishers test1, test2
                and test3 configured against them."""

                # This test suite needs actual depots.
                depot_pubs = ["test1", "test2", "test3", "test1", "test1",
                    "test3", "test4", "test5", "test4"]
                pkg5unittest.ManyDepotTestCase.setUp(self, depot_pubs,
                    start_depots=True)
                self.make_misc_files(self.misc_files)

                # Remember the URL of depot N as self.durlN and publish that
                # depot's package to it.
                manifests = (self.foo1, self.bar1, self.baz1)
                for idx, manifest in enumerate(manifests, 1):
                        depot_url = self.dcs[idx].get_depot_url()
                        setattr(self, "durl{0}".format(idx), depot_url)
                        self.pkgsend_bulk(depot_url, manifest)

                self.image_create(self.durl1, prefix="test1")
                self.pkg("set-publisher -O " + self.durl2 + " test2")
                self.pkg("set-publisher -O " + self.durl3 + " test3")

        def __test_mirror_origin(self, etype, add_opt, remove_opt):
                """Private helper method to exercise adding and removing
                mirrors or origins of publisher test1 using set-publisher.

                'etype' is either "mirror" or "origin"; it is the text
                searched for in 'pkg publisher' output and it enables the
                checks that only apply to origins.

                'add_opt' and 'remove_opt' are the set-publisher options used
                to add and to remove a URI of that type (e.g. "-m", "-M")."""

                durl1 = self.dcs[1].get_depot_url()
                durl4 = self.dcs[4].get_depot_url()
                durl5 = self.dcs[5].get_depot_url()

                # Test single add; --no-refresh must be used here since the URI
                # being added is for a non-existent repository.
                self.pkg("set-publisher --no-refresh {0} http://{1}1 test1".format(
                    add_opt, self.bogus_url))
                self.pkg("set-publisher --no-refresh {0} http://{1}2 test1".format(
                    add_opt, self.bogus_url))
                # No publisher name follows the option's argument.
                self.pkg("set-publisher --no-refresh {0} http://{1}5".format(add_opt,
                    self.bogus_url), exit=2)
                self.pkg("set-publisher {0} test1".format(add_opt), exit=2)
                # Same URI as the first add above.
                self.pkg("set-publisher --no-refresh {0} http://{1}1 test1".format(
                    add_opt, self.bogus_url), exit=1)
                self.pkg("set-publisher {0} http://{1}5 test11".format(add_opt,
                    self.bogus_url), exit=1)
                if etype == "origin":
                        # URI given without a scheme.
                        self.pkg("set-publisher {0} {1}7 test1".format(
                            add_opt, self.bogus_url), exit=1)

                # Test single remove.
                self.pkg("set-publisher --no-refresh {0} http://{1}1 test1".format(
                    remove_opt, self.bogus_url))
                self.pkg("set-publisher --no-refresh {0} http://{1}2 test1".format(
                    remove_opt, self.bogus_url))
                # URIs to remove not specified using options, so they are seen
                # as publisher names -- only one publisher name may be
                # specified at a time.
                self.pkg("set-publisher {0} test11 http://{1}2 http://{2}4".format(
                    remove_opt, self.bogus_url, self.bogus_url), exit=2)
                self.pkg("set-publisher {0} http://{1}5".format(remove_opt,
                    self.bogus_url), exit=2)
                # publisher name specified to remove as URI.
                self.pkg("set-publisher {0} test1".format(remove_opt), exit=2)
                # URI already removed or never existed.
                self.pkg("set-publisher {0} http://{1}5 test11".format(remove_opt,
                    self.bogus_url), exit=1)
                self.pkg("set-publisher {0} http://{1}6 test1".format(remove_opt,
                    self.bogus_url), exit=1)
                self.pkg("set-publisher {0} {1}7 test1".format(remove_opt,
                    self.bogus_url), exit=1)

                # Test a combined add and remove.
                self.pkg("set-publisher {0} {1} test1".format(add_opt, durl4))
                self.pkg("set-publisher {0} {1} {2} {3} test1".format(add_opt, durl5,
                    remove_opt, durl4))
                self.pkg("publisher | grep {0}.*{1}".format(etype, durl5))
                self.pkg("publisher | grep {0}.*{1}".format(etype, durl4), exit=1)
                self.pkg("set-publisher {0} {1} test1".format(remove_opt, durl5))
                # The backslash is passed through to the command line so that
                # the shell does not expand the '*' wildcard.
                self.pkg("set-publisher {0} {1} {2} {3} {4} \\* test1".format(add_opt,
                    durl4, add_opt, durl5, remove_opt))
                self.pkg("publisher | grep {0}.*{1}".format(etype, durl4))
                self.pkg("publisher | grep {0}.*{1}".format(etype, durl5))
                self.pkg("set-publisher {0} \\* test1".format(remove_opt))
                if etype == "origin":
                        # Every origin was just removed; put the original one
                        # back.
                        self.pkg("set-publisher {0} {1} test1".format(add_opt, durl1))
                self.pkg("publisher | grep {0}.*{1}".format(etype, durl4), exit=1)
                self.pkg("publisher | grep {0}.*{1}".format(etype, durl5), exit=1)

                # Verify that if one of multiple URLs is not a valid URL, pkg
                # will exit with an error, and does not add the valid one.
                self.pkg("set-publisher {0} {1} {2} http://b^^^/ogus test1".format(
                    add_opt, durl4, add_opt), exit=1)
                self.pkg("publisher | grep {0}.*{1}".format(etype, durl4), exit=1)

                # Verify that multiple can be added at one time.
                self.pkg("set-publisher {0} {1} {2} {3} test1".format(add_opt, durl4,
                    add_opt, durl5))
                self.pkg("publisher | grep {0}.*{1}".format(etype, durl4))
                self.pkg("publisher | grep {0}.*{1}".format(etype, durl5))

                # Verify that multiple can be removed at one time.
                self.pkg("set-publisher {0} {1} {2} {3} test1".format(remove_opt, durl4,
                    remove_opt, durl5))
                self.pkg("publisher | grep {0}.*{1}".format(etype, durl4), exit=1)
                self.pkg("publisher | grep {0}.*{1}".format(etype, durl5), exit=1)

        def __verify_pub_cfg(self, prefix, pub_cfg):
                """Private helper method that asserts the image's configuration
                of publisher 'prefix' matches 'pub_cfg'.

                'pub_cfg' maps a section name to a dictionary of property
                names and expected values.  Properties in the "publisher"
                section are read from the publisher object; those in any other
                section are read from its repository object."""

                uri_props = ("legal_uris", "mirrors", "origins",
                    "related_uris")

                def without_trailing_slash(uri):
                        # Only a single trailing slash is dropped.
                        if uri.endswith("/"):
                                return uri[:-1]
                        return uri

                # pretend like the Image object is being allocated from
                # a pkg command run from within the target image.
                cmdpath = os.path.join(self.get_img_path(), "pkg")
                img = image.Image(self.get_img_path(), should_exist=True,
                    user_provided_dir=True, cmdpath=cmdpath)
                pub = img.get_publisher(prefix=prefix)

                for section in pub_cfg:
                        for prop, expected in six.iteritems(pub_cfg[section]):
                                holder = pub if section == "publisher" \
                                    else pub.repository
                                actual = getattr(holder, prop)

                                if prop in uri_props:
                                        # The publisher holds these as lists
                                        # of URI objects, so reduce both
                                        # sides to sets of strings before
                                        # comparing them.
                                        expected = set(expected or ())
                                        actual = set(
                                            without_trailing_slash(u.uri)
                                            for u in actual)
                                self.assertEqual(expected, actual)

        def __update_repo_pub_cfg(self, dc, pubcfg):
                """Private helper method that writes the publisher
                configuration described by the 'pubcfg' dictionary into the
                repository of depot controller 'dc' using pkgrepo set.

                The publisher prefix itself is not set as a property; it is
                used to select the publisher being configured."""

                rpath = dc.get_repodir()

                # Each fragment is a "section/property='value' " argument for
                # pkgrepo set, including its trailing separator.
                fragments = []
                for sname in pubcfg:
                        for pname, pval in six.iteritems(pubcfg[sname]):
                                if (sname, pname) == ("publisher", "prefix"):
                                        continue
                                if isinstance(pval, list):
                                        # List values use the parenthesized
                                        # form.
                                        template = "{0}/{1}='({2})' "
                                        pval = " ".join(pval)
                                else:
                                        template = "{0}/{1}='{2}' "
                                fragments.append(template.format(sname,
                                    pname.replace("_", "-"), pval))
                props = "".join(fragments)

                pfx = pubcfg["publisher"]["prefix"]
                self.pkgrepo("set -s {0} -p {1} {2}".format(rpath, pfx, props))
                self.pkgrepo("get -p all -s {0}".format(rpath))

        def test_set_auto(self):
                """Verify that set-publisher -p works as expected."""

                # Imported here so that this test does not rely on the
                # module-level imports providing it.
                import copy

                # Test the single add/update case first.
                durl1 = self.dcs[1].get_depot_url()
                durl3 = self.dcs[3].get_depot_url()
                durl4 = self.dcs[4].get_depot_url()
                self.image_create(durl1, prefix="test1")

                # Should fail because test3 publisher does not exist.
                self.pkg("publisher test3", exit=1)

                # Should fail because repository is for test3 not test2.
                self.pkg("set-publisher -p {0} test2".format(durl3), exit=1)

                # Verify that a publisher can be configured even if the
                # repository's publisher configuration does not
                # include origin information.  In this case, the client
                # will assume that the provided repository URI for
                # auto-configuration is also the origin to use for
                # all configured publishers.
                t3cfg = {
                    "publisher": {
                        "prefix": "test3",
                    },
                    "repository": {
                        "origins": [durl3],
                    },
                }
                self.pkg("set-publisher -p {0}".format(durl3))

                # Load image configuration to verify publisher was configured
                # as expected.
                self.__verify_pub_cfg("test3", t3cfg)

                # Update configuration of just this depot with more information
                # for comparison basis.
                self.dcs[3].stop()

                # Origin and mirror info wasn't known until this point, so add
                # it to the test configuration.  A deep copy is required: the
                # nested "repository" dictionary is modified below and the
                # class-level test3_pub_cfg is shared by every test in this
                # class.
                t3cfg = copy.deepcopy(self.test3_pub_cfg)
                t3cfg["repository"]["origins"] = [durl3]
                t3cfg["repository"]["mirrors"] = [durl1, durl3, durl4]
                self.__update_repo_pub_cfg(self.dcs[3], t3cfg)
                self.dcs[3].start()

                # Should succeed and configure test3 publisher.
                self.pkg("set-publisher -p {0}".format(durl3))

                # Load image configuration to verify publisher was configured
                # as expected.
                self.__verify_pub_cfg("test3", t3cfg)

                # Now test the update case.  This verifies that the existing,
                # configured origins and mirrors will not be lost (only added
                # to) and that new data will be accepted.
                durl6 = self.dcs[6].get_depot_url()
                self.dcs[6].stop()
                t6cfg = {}
                for section in t3cfg:
                        t6cfg[section] = {}
                        for prop in t3cfg[section]:
                                val = t3cfg[section][prop]
                                if prop == "refresh_seconds":
                                        val = 1800
                                elif prop == "collection_type":
                                        val = "core"
                                elif prop not in ("alias", "prefix"):
                                        # Clear all other props.
                                        val = ""
                                t6cfg[section][prop] = val
                t6cfg["repository"]["origins"] = [durl3, durl6]
                t6cfg["repository"]["mirrors"] = [durl1, durl3, durl4, durl6]
                self.__update_repo_pub_cfg(self.dcs[6], t6cfg)
                self.dcs[6].start()

                self.pkg("set-publisher -p {0}".format(durl6))

                # Load image configuration to verify publisher was configured
                # as expected.
                self.__verify_pub_cfg("test3", t6cfg)

                # Test multi-publisher add case.
                self.pkgrepo("set -s {0} -p test2 publisher/alias=''".format(
                    self.dcs[6].get_repodir()))
                self.pkg("unset-publisher test3")
                self.dcs[6].refresh()
                self.pkg("set-publisher -P -p {0}".format(durl6))

                # Determine publisher order from output and then verify it
                # matches expected.
                def get_pubs():
                        """Return the publisher names in the order they first
                        appear in 'pkg publisher -HF tsv' output."""
                        self.pkg("publisher -HF tsv")
                        pubs = []
                        for l in self.output.splitlines():
                                # The publisher name is the first column.
                                pub, ignored = l.split("\t", 1)
                                if pub not in pubs:
                                        pubs.append(pub)
                        return pubs

                # Since -P was used, new publishers should be set first in
                # search order alphabetically.
                self.assertEqual(get_pubs(), ["test2", "test3", "test1"])

                # Now change search order and verify that using -P and -p again
                # won't change it since publishers already exist.
                self.pkg("set-publisher --search-after=test1 test2")
                self.pkg("set-publisher --search-after=test2 test3")
                self.assertEqual(get_pubs(), ["test1", "test2", "test3"])
                self.pkg("set-publisher -P -p {0}".format(durl6))
                self.assertEqual(get_pubs(), ["test1", "test2", "test3"])

                # Check that --proxy arguments are set on all auto-configured
                # publishers.  We use $no_proxy='*' in the environment so that
                # we can persist a dummy --proxy value to the image
                # configuration, yet still reach the test depot to obtain the
                # publisher/ response.
                self.pkg("unset-publisher test3")
                self.pkg("unset-publisher test2")
                self.pkg("set-publisher -P --proxy http://myproxy -p {0}".format(
                    durl6), env_arg={"no_proxy": "*"})
                self.assertEqual(get_pubs(), ["test2", "test3", "test1"])

                # Verify that only test2 and test3 have proxies set, since
                # test1 already existed, it should not use a proxy. The proxy
                # column is the last one printed on each line.
                self.pkg("publisher -HF tsv")
                for l in self.output.splitlines():
                        if l.startswith("test2") or l.startswith("test3"):
                                self.assertEqual("http://myproxy",
                                    l.split()[-1])
                        else:
                                self.assertEqual("-", l.split()[-1])

        def test_set_mirrors_origins(self):
                """Test set-publisher functionality for mirrors and origins."""

                durl1 = self.dcs[1].get_depot_url()
                rurl1 = self.dcs[1].get_repo_url()
                self.image_create(durl1, prefix="test1")

                # Verify that https origins can be mixed with other types
                # of origins.
                self.pkg("set-publisher -g {0} test1".format(rurl1))
                self.pkg("set-publisher --no-refresh -g https://test.invalid1 "
                    "test1")

                # Verify that a cert and key can be set even when non-https
                # origins are present.
                key_path = os.path.join(self.keys_dir, "cs1_ch1_ta3_key.pem")
                cert_path = os.path.join(self.cs_dir, "cs1_ch1_ta3_cert.pem")

                self.pkg("set-publisher --no-refresh -k {0} -c {1} test1".format(
                    key_path, cert_path))
                self.pkg("publisher test1")

                # This test relies on using the same implementation used in
                # image.py __store_publisher_ssl() which sets the paths to the
                # SSL keys/certs.
                img_key_path = os.path.join(self.img_path(), "var", "pkg",
                    "ssl", pkg.misc.get_data_digest(key_path,
                    hash_func=hashlib.sha1)[0])
                img_cert_path = os.path.join(self.img_path(), "var", "pkg",
                    "ssl", pkg.misc.get_data_digest(cert_path,
                    hash_func=hashlib.sha1)[0])
                self.assertTrue(img_key_path in self.output)
                self.assertTrue(img_cert_path in self.output)

                # Verify that removing all SSL origins does not leave key
                # and cert information intact.
                self.pkg("set-publisher -G '*' -g {0} test1".format(durl1))
                self.pkg("publisher test1")
                self.assertTrue(img_key_path not in self.output)
                self.assertTrue(img_cert_path not in self.output)

                # Verify that https mirrors can be mixed with other types of
                # mirrors.
                self.pkg("set-publisher -m {0} test1".format(rurl1))
                self.pkg("set-publisher --no-refresh -m https://test.invalid1 "
                    "test1")
                self.pkg("set-publisher --no-refresh -k {0} -c {1} test1".format(
                    key_path, cert_path))

                # Verify that removing all SSL mirrors does not leave key
                # and cert information intact.
                self.pkg("set-publisher -M '*' -m {0} test1".format(durl1))
                self.pkg("publisher test1")
                self.assertTrue(img_key_path not in self.output)
                self.assertTrue(img_cert_path not in self.output)

                # Test short options for mirrors.
                self.__test_mirror_origin("mirror", "-m", "-M")

                # Test long options for mirrors.
                self.__test_mirror_origin("mirror", "--add-mirror",
                    "--remove-mirror")

                # Test short options for origins.
                self.__test_mirror_origin("origin", "-g", "-G")

                # Test long options for origins.
                self.__test_mirror_origin("origin", "--add-origin",
                    "--remove-origin")

                # Verify that if multiple origins are present that -O will
                # discard all others.
                durl4 = self.dcs[4].get_depot_url()
                durl5 = self.dcs[5].get_depot_url()
                self.pkg("set-publisher -g {0} -g {1} test1".format(durl4, durl5))
                self.pkg("set-publisher -O {0} test1".format(durl4))
                self.pkg("publisher | grep origin.*{0}".format(durl1), exit=1)
                self.pkg("publisher | grep origin.*{0}".format(durl5), exit=1)

                # Verify that if a publisher is set to use a file repository
                # that removing that repository will not prevent the pkg(1)
                # command from operating or the set-publisher commands
                # from working.
                repo_path = os.path.join(self.test_root, "badrepo")
                repo_uri = "file:{0}".format(repo_path)
                self.create_repo(repo_path, properties={ "publisher": {
                    "prefix": "test1" } })
                self.pkg("set-publisher -O {0} test1".format(repo_uri))
                shutil.rmtree(repo_path)

                self.pkg("publisher")
                self.pkg("set-publisher -O {0} test1".format(
                    self.dcs[1].get_repo_url()))

                # Now verify that publishers using origins or mirrors that have
                # IPv6 addresses can be added and removed.
                self.pkg("set-publisher -g http://[::1] "
                    "-m http://[::FFFF:129.144.52.38]:80 "
                    "-m http://[2010:836B:4179::836B:4179] "
                    "-g http://[FEDC:BA98:7654:3210:FEDC:BA98:7654:3210]:80 "
                    "--no-refresh testipv6")
                # The brackets are escaped for grep, which would otherwise
                # treat them as a bracket expression.
                self.pkg("publisher | "
                    "grep 'http://\\[::FFFF:129.144.52.38\\]:80/'")
                self.pkg("set-publisher -G http://[::1] "
                    "-M http://[::FFFF:129.144.52.38]:80 "
                    "-M http://[2010:836B:4179::836B:4179] "
                    "-G http://[FEDC:BA98:7654:3210:FEDC:BA98:7654:3210]:80 "
                    "-g http://[::192.9.5.5]/dev "
                    "--no-refresh testipv6")
                self.pkg("publisher | "
                    "grep 'http://\\[::FFFF:129.144.52.38\\]:80/'", exit=1)
                self.pkg("unset-publisher testipv6")

        def test_enable_disable(self):
                """Test enable and disable."""

                # Keyword arguments for self.pkg(): commands that must succeed
                # and commands that must exit with status 1.
                ok = {}
                fails = {"exit": 1}

                steps = (
                    ("publisher", ok),
                    ("publisher | grep test1", ok),
                    ("publisher | grep test2", ok),

                    # Short options.
                    ("set-publisher -d test2", ok),
                    # always show
                    ("publisher | grep test2", ok),
                    # unless -n
                    ("publisher -n | grep test2", fails),

                    ("list -a bar", fails),
                    ("set-publisher -P test2", ok),
                    ("publisher test2", ok),
                    ("set-publisher -e test2", ok),
                    ("publisher -n | grep test2", ok),
                    ("list -a bar", ok),

                    # Long options.
                    ("set-publisher --disable test2", ok),
                    ("publisher | grep test2", ok),
                    ("publisher -n | grep test2", fails),
                    ("list -a bar", fails),
                    ("set-publisher --enable test2", ok),
                    ("publisher -n | grep test2", ok),
                    ("list -a bar", ok),
                )
                for command, kwargs in steps:
                        self.pkg(command, **kwargs)

        def test_origins_enable_disable(self):
                """Test enable and disable origins."""

                self.durl7 = self.dcs[7].get_depot_url()
                self.durl8 = self.dcs[8].get_depot_url()
                self.durl9 = self.dcs[9].get_depot_url()
                self.pkgsend_bulk(self.durl7, self.origin_1)
                self.pkgsend_bulk(self.durl8, self.origin_2)
                self.pkgsend_bulk(self.durl9, self.origin_2)

                # Test invalid usages.
                self.pkg("set-publisher -g '*' test4", exit=2)
                # Test adding an enabled unknown origin.
                self.pkg("set-publisher -g {0} --enable test1".format(
                    "http://unknown"), exit=1)
                # Test adding a disabled origin. Since we do not try to
                # contact the repo, it should succeed.
                self.pkg("set-publisher -g {0} --disable test1".format(
                    "http://unknown"))
                # Try to enable it will fail.
                self.pkg("set-publisher -g {0} --enable test1".format(
                    "http://unknown"), exit=1)
                self.pkg("publisher -F tsv | grep unknown")
                output = self.output.split()
                self.assertTrue(output[3] == "false")

                # Test adding an enabled origin.
                self.pkg("set-publisher --enable -g " + self.durl7 + " test4")
                self.pkg("publisher -F tsv | grep test4")
                output = self.output.split()
                self.assertTrue(output[3] == "true")
                # List and info will succeed.
                self.pkg("list -af origin1")
                self.pkg("info -r origin1")
                # Install will succeed.
                self.pkg("install origin1")
                self.pkg("uninstall origin1")

                # Disable that origin.
                self.pkg("set-publisher --disable -g " + self.durl7 + " test4")
                self.pkg("publisher -F tsv | grep test4")
                output = self.output.split()
                self.assertTrue(output[3] == "false")
                self.pkg("publisher | grep test4")
                self.assertTrue("(disabled)" not in self.output)
                self.pkg("list -af origin1", exit=1)
                self.pkg("install origin1", exit=1)

                # Enable it again.
                self.pkg("set-publisher --enable -g " + self.durl7 + " test4")
                self.pkg("list -af origin1")

                # Disable the entire publisher.
                self.pkg("set-publisher --disable test4")
                self.pkg("publisher | grep test4")
                self.assertTrue("(disabled)" in self.output)

                # The status of the origin should still be enabled.
                self.pkg("publisher -F tsv | grep test4")
                output = self.output.split()
                self.assertTrue(output[3] == "true")

                self.pkg("list -af origin1", exit=1)
                self.pkg("info origin1", exit=1)
                self.pkg("install origin1", exit=1)
                self.pkg("unset-publisher test4")

                # Test adding a disabled origin.
                self.pkg("set-publisher --disable -g " + self.durl8 + " test5")
                self.pkg("publisher -F tsv | grep test5")
                output = self.output.split()
                self.assertTrue(output[3] == "false")
                self.pkg("publisher | grep test5")
                self.assertTrue("(disabled)" not in self.output)
                self.pkg("list -af origin2", exit=1)
                self.pkg("info -r origin2", exit=1)
                # Install will fail.
                self.pkg("install origin2", exit=1)

                # Remove the origin but do not remove the publisher.
                self.pkg("set-publisher -G " + self.durl8 + " test5")
                # Then add the origin back and disable it.
                self.pkg("set-publisher --disable -g " + self.durl8 + " test5")
                self.pkg("publisher -F tsv | grep test5")
                output = self.output.split()
                self.assertTrue(output[3] == "false")
                self.pkg("publisher | grep test5")
                self.assertTrue("(disabled)" not in self.output)
                self.pkg("list -af origin2", exit=1)
                self.pkg("info -r origin2", exit=1)
                # Install will fail.
                self.pkg("install origin2", exit=1)

                # Test wildcard.
                self.pkg("set-publisher --enable -g '*' test5")
                self.pkg("publisher -F tsv | grep test5")
                output = self.output.split()
                self.assertTrue(output[3] == "true")
                self.pkg("publisher | grep test5")
                self.assertTrue("(disabled)" not in self.output)
                self.pkg("list -af origin2")
                self.pkg("info -r origin2")
                self.pkg("install -nv origin2")

                self.pkg("set-publisher --disable -g '*' test5")
                self.pkg("publisher -F tsv | grep test5")
                output = self.output.split()
                self.assertTrue(output[3] == "false")
                self.pkg("publisher | grep test5")
                self.assertTrue("(disabled)" not in self.output)
                self.pkg("list -af origin2", exit=1)
                self.pkg("info -r origin2", exit=1)
                # Install will fail.
                self.pkg("install origin2", exit=1)

                # Test two origins case for one publisher.
                self.pkg("set-publisher -g {0} test4".format(
                    self.durl9))
                # Disable one of origins for test4.
                self.pkg("set-publisher -g {0} --disable test4".format(
                    self.durl7))
                self.pkg("publisher -HF tsv | grep test4")
                outputs = self.output.split("\n")
                output = outputs[0].split("\t")
                self.assertTrue(output[0] == "test4" and output[3] == "false")
                output = outputs[1].split("\t")
                self.assertTrue(output[0] == "test4" and output[3] == "true")
                # (disabled) indicating publisher level disable should not be
                # in the output.
                self.pkg("publisher | grep test4")
                self.assertTrue("(disabled)" not in self.output)

                # Test installing package from a disabled origin should fail.
                self.pkg("install origin1", exit=1)
                self.pkg("list -af origin1", exit=1)
                self.pkg("info -r origin1", exit=1)
                # Even after refresh, contents for disabled origin should still
                # remain unavailable.
                self.pkg("refresh --full")
                self.pkg("install origin1", exit=1)
                self.pkg("list -af origin1", exit=1)
                self.pkg("info -r origin1", exit=1)

                # Test installing package from the other enabled origin should
                # succeed.
                self.pkg("install origin2")
                self.pkg("uninstall origin2")

                # Disable the other origin.
                self.pkg("set-publisher -g {0} --disable test4".format(
                    self.durl9))
                self.pkg("publisher | grep test4")
                self.assertTrue("(disabled)" not in self.output)
                self.pkg("publisher -HF tsv | grep test4")
                outputs = self.output.split("\n")
                output = outputs[0].split("\t")
                self.assertTrue(output[0] == "test4" and output[3] == "false")
                output = outputs[1].split("\t")
                self.assertTrue(output[0] == "test4" and output[3] == "false")
                self.pkg("publisher -Hn | grep test4")
                self.assertTrue("test4" in self.output)
                # Install origin2 will fail.
                self.pkg("install origin2", exit=1)
                # List and info will also fail.
                self.pkg("list -af origin2", exit=1)
                self.pkg("info -r origin2", exit=1)

                # Enable both origins.
                self.pkg("set-publisher -g '*' --enable test4")
                self.pkg("publisher | grep test4")
                self.assertTrue("(disabled)" not in self.output)
                self.pkg("publisher -HF tsv | grep test4")
                outputs = self.output.split("\n")
                output = outputs[0].split("\t")
                self.assertTrue(output[0] == "test4" and output[3] == "true")
                output = outputs[1].split("\t")
                self.assertTrue(output[0] == "test4" and output[3] == "true")
                self.pkg("publisher -Hn | grep test4")
                self.pkg("install -nv origin1")
                self.pkg("install -nv origin2")
                self.pkg("list -af origin1")
                self.pkg("info -r origin1")
                self.pkg("list -af origin2")
                self.pkg("info -r origin2")

                # Installing package from an enabled unreachable origin will
                # still fail.
                self.dcs[7].stop()
                self.pkg("install origin1", exit=1)
                self.dcs[7].start()

                # Disable both origins.
                self.pkg("set-publisher -g '*' --disable test4")
                self.pkg("publisher -HF tsv | grep test4")
                outputs = self.output.split("\n")
                output = outputs[0].split("\t")
                self.assertTrue(output[0] == "test4" and output[3] == "false")
                output = outputs[1].split("\t")
                self.assertTrue(output[0] == "test4" and output[3] == "false")
                self.pkg("install origin1", exit=1)
                self.pkg("list -af origin1", exit=1)
                self.pkg("info -r origin1", exit=1)
                self.pkg("install origin2", exit=1)
                self.pkg("list -af origin2", exit=1)
                self.pkg("info -r origin2", exit=1)
                # Even after refresh, contents for disabled origin should still
                # remain unavailable.
                self.pkg("refresh --full")
                self.pkg("install origin1", exit=1)
                self.pkg("list -af origin1", exit=1)
                self.pkg("info -r origin1", exit=1)
                self.pkg("install origin2", exit=1)
                self.pkg("list -af origin2", exit=1)
                self.pkg("info -r origin2", exit=1)

                # Test -g, -G and --disable.
                self.pkg("set-publisher -G " + self.durl9  + " -g " + self.durl7
                    + " --disable test4")
                self.pkg("publisher -HF tsv | grep test4")
                outputs = self.output.split("\n")
                output = outputs[0].split("\t")
                self.assertTrue(output[0] == "test4" and output[3] == "false")
                self.assertTrue(self.durl7 in self.output)
                self.assertTrue(self.durl9 not in self.output)

                # Removing a non-existent origin fails the operation.
                self.pkg("set-publisher -G http://unknown  -g " + self.durl7
                    + " --enable test4", exit=1)
                self.pkg("publisher -HF tsv | grep test4")
                outputs = self.output.split("\n")
                output = outputs[0].split("\t")
                self.assertTrue(output[0] == "test4" and output[3] == "false")
                self.assertTrue(self.durl7 in self.output)
                self.assertTrue(self.durl9 not in self.output)

                # Remove all origins and add a new one.
                self.pkg("set-publisher -G '*'  -g " + self.durl9
                    + " --disable test4")
                self.pkg("publisher -HF tsv | grep test4")
                outputs = self.output.split("\n")
                output = outputs[0].split("\t")
                self.assertTrue(output[0] == "test4" and output[3] == "false")
                self.assertTrue(self.durl9 in self.output)
                self.assertTrue(self.durl7 not in self.output)
                self.pkg("publisher | grep test4")
                self.assertTrue("(disabled)" not in self.output)

                # Removing an unknown origin for a publisher not set yet will
                # fail.
                self.pkg("unset-publisher test4")
                self.pkg("set-publisher -G " + self.durl7 + " -g " +
                    self.durl9 + " --disable " + "test4", exit=1)

                # Add a new publisher with a disabled origin, plus removing any
                # possible origins (actually no origin exists).
                self.pkg("unset-publisher test5")
                self.pkg("set-publisher -G '*' -g " + self.durl8 + " --disable "
                    + "test5")
                self.pkg("publisher -HF tsv | grep test5")
                outputs = self.output.split("\n")
                output = outputs[0].split("\t")
                self.assertTrue(output[0] == "test5" and output[3] == "false")
                self.assertTrue(self.durl8 in self.output)
                self.pkg("publisher | grep test5")
                self.assertTrue("(disabled)" not in self.output)

        def test_search_order(self):
                """Verify that the publisher search order can be rearranged
                with -P, --search-before and --search-after, both for
                existing publishers and while adding new ones."""

                def run_all(*cmds):
                        # Issue each pkg command in turn; every one of them
                        # is expected to exit successfully.
                        for cmd in cmds:
                                self.pkg(cmd)

                # Establish the baseline order test1, test2, test3 with all
                # publishers enabled and sticky, then confirm it.
                run_all(
                    "set-publisher -e -P test1",
                    "set-publisher -e --search-after test1 test2",
                    "set-publisher -e --search-after test2 test3",
                    "publisher", # ease debugging
                    "publisher -H | head -1 | egrep test1",
                    "publisher -H | head -2 | egrep test2",
                    "publisher -H | head -3 | egrep test3")

                # Disabling test2 must not disturb the order; re-enable it
                # once that has been checked.
                run_all(
                    "set-publisher --disable test2",
                    "publisher", # ease debugging
                    "publisher -H | head -1 | egrep test1",
                    "publisher -H | head -2 | egrep test2",
                    "publisher -H | head -3 | egrep test3",
                    "set-publisher --enable test2")

                # Making test3 preferred moves it to the front.
                run_all(
                    "set-publisher -P test3",
                    "publisher", # ease debugging
                    "publisher -H | head -1 | egrep test3",
                    "publisher -H | head -2 | egrep test1",
                    "publisher -H | head -3 | egrep test2")

                # Push test3 back so that it follows test1.
                run_all(
                    "set-publisher --search-after=test1 test3",
                    "publisher", # ease debugging
                    "publisher -H | head -1 | egrep test1",
                    "publisher -H | head -2 | egrep test3",
                    "publisher -H | head -3 | egrep test2")

                # Pull test2 in front of test3.
                run_all(
                    "set-publisher --search-before=test3 test2",
                    "publisher", # ease debugging
                    "publisher -H | head -1 | egrep test1",
                    "publisher -H | head -2 | egrep test2",
                    "publisher -H | head -3 | egrep test3")

                # A publisher can be placed neither before nor after itself.
                for bad_cmd in (
                    "set-publisher --search-before=test3 test3",
                    "set-publisher --search-after=test3 test3"):
                        self.pkg(bad_cmd, exit=1)

                # Search order options must also be honoured when the
                # publisher is being added rather than modified.
                run_all(
                    "unset-publisher test2",
                    "unset-publisher test3",
                    "set-publisher --search-before=test1 test2",
                    "set-publisher --search-after=test2 test3",
                    "publisher", # ease debugging
                    "publisher -H | head -1 | egrep test2",
                    "publisher -H | head -2 | egrep test3",
                    "publisher -H | head -3 | egrep test1")

        def test_publishers_only_from_installed_packages(self):
                """Exercise get_highest_rank_publisher with packages installed
                but every publisher unset, i.e. when the only publishers
                known are those of installed packages."""

                self.pkg("install foo bar baz")

                # Drop every configured publisher, then list what is left.
                for cmd in (
                    "unset-publisher test1",
                    "unset-publisher test2",
                    "unset-publisher test3",
                    "publisher"):
                        self.pkg(cmd)

                # Put the publishers back so the configuration matches what
                # is expected afterwards.
                for depot_url in (self.durl1, self.durl2, self.durl3):
                        self.pkg("set-publisher -p {0}".format(depot_url))

        def test_bug_18283(self):
                """Adding a publisher with the -P option must keep working
                while an unset publisher still has packages installed."""

                # The same "add as preferred" command is issued in both
                # scenarios below.
                preferred_from_durl2 = "set-publisher -P -p {0}".format(
                    self.durl2)

                # Scenario 1: another publisher is still configured.
                for cmd in (
                    "unset-publisher test2",
                    "install foo",
                    "unset-publisher test1",
                    preferred_from_durl2):
                        self.pkg(cmd)

                # Scenario 2: no publishers are configured at all.
                for cmd in (
                    "unset-publisher test2",
                    "unset-publisher test3",
                    preferred_from_durl2):
                        self.pkg(cmd)

                # Put the publishers back into the expected configuration.
                restore_cmds = (
                    "set-publisher -P -p {0}".format(self.durl1),
                    "set-publisher -p {0}".format(self.durl3),
                )
                for cmd in restore_cmds:
                        self.pkg(cmd)


class TestMultiPublisherRepo(pkg5unittest.ManyDepotTestCase):
        """set-publisher -p tests against a repository that holds packages
        for more than one publisher."""

        # Manifests for two packages that name no publisher ...
        foo1 = """
            open foo@1,5.11-0
            close """

        bar1 = """
            open bar@1,5.11-0
            close """

        # ... and one that is explicitly published as 'another-pub'.
        baz1 = """
            open pkg://another-pub/baz@1,5.11-0
            close """


        def setUp(self):
                """Create two depots; the first receives foo and baz, the
                second receives bar."""

                # This test suite needs actual depots.
                pkg5unittest.ManyDepotTestCase.setUp(self, ["test1", "test2"])

                first_repo = self.dcs[1].get_repo_url()
                self.rurl1 = first_repo
                self.pkgsend_bulk(first_repo, self.foo1 + self.baz1)

                second_repo = self.dcs[2].get_repo_url()
                self.rurl2 = second_repo
                self.pkgsend_bulk(second_repo, self.bar1)

        def test_multi_publisher_search_before(self):
                """Check that --search-before combined with -p works when
                the repository given to -p is a multipublisher one."""

                self.image_create(self.rurl2)
                add_cmd = "set-publisher --search-before test2 -p {0}".format(
                    self.rurl1)
                self.pkg(add_cmd)

                self.pkg("publisher -HF tsv")
                rows = self.output.splitlines()
                # The first field of the first three rows must name the
                # publishers in exactly this order.
                for pos, pub in enumerate(("another-pub", "test1", "test2")):
                        self.assertEqual(rows[pos].split()[0], pub)

        def test_multiple_p_option(self):
                """Check that passing -p more than once, each time with a
                repository, is rejected."""

                self.image_create()
                two_repo_cmd = "set-publisher -p {0} -p {1}".format(
                    self.rurl1, self.rurl2)
                self.pkg(two_repo_cmd, exit=2)


class TestPkgPublisherCACerts(pkg5unittest.ManyDepotTestCase):
        """Check that CA certificates approved or revoked while adding a
        publisher are reported by 'pkg publisher'."""

        # Tests in this suite use the read only data directory.
        need_ro_data = True

        def setUp(self):
                # This test suite needs actual depots.
                pkg5unittest.ManyDepotTestCase.setUp(self, ["test1", "test2"])

                self.rurl1 = self.dcs[1].get_repo_url()
                self.rurl2 = self.dcs[2].get_repo_url()
                self.image_create(self.rurl1, prefix="test1")

        def _check_ca_pair(self, lines, idx, head_fmt, kind, hash_a, hash_b):
                """Verify the two-line CA listing that starts at lines[idx].

                'head_fmt' is the format of the heading line, 'kind' is the
                word used in the failure message ("approved" or "revoked"),
                and 'hash_a' / 'hash_b' are the two expected certificate
                hashes, which may be listed in either order.

                Raises RuntimeError if the listing does not match."""

                cont_fmt = "                     : {0}"
                # Slicing (rather than indexing idx + 1) means a heading on
                # the very last output line produces the descriptive error
                # below instead of an IndexError.
                actual = lines[idx:idx + 2]
                acceptable = (
                    [head_fmt.format(hash_a), cont_fmt.format(hash_b)],
                    [head_fmt.format(hash_b), cont_fmt.format(hash_a)],
                )
                if actual not in acceptable:
                        raise RuntimeError("Expected to see "
                            "{0} and {1} as {2} certs. "
                            "Output was:\n{3}".format(hash_a, hash_b, kind,
                            self.output))

        def _add_pub_and_check_ca_certs(self, app1_name, refresh):
                """Add publisher test2 with two approved and two revoked CA
                certs, then verify that 'pkg publisher test2' lists them.

                'app1_name' is the file name of the first approved cert
                inside the chain_certs directory; 'refresh' selects whether
                set-publisher runs normally (True) or with --no-refresh
                (False)."""

                cert_dir = os.path.join(self.ro_data_root,
                    "signing_certs", "produced", "chain_certs")

                app1 = os.path.join(cert_dir, app1_name)
                app2 = os.path.join(cert_dir, "ch1_ta3_cert.pem")
                rev1 = os.path.join(cert_dir, "ch1_ta4_cert.pem")
                rev2 = os.path.join(cert_dir, "ch1_ta5_cert.pem")
                app1_h = self.calc_pem_hash(app1)
                app2_h = self.calc_pem_hash(app2)
                rev1_h = self.calc_pem_hash(rev1)
                rev2_h = self.calc_pem_hash(rev2)

                # Certs are approved by file path but revoked by hash.
                self.pkg("set-publisher -O {0} {1}"
                    "--approve-ca-cert {2} "
                    "--approve-ca-cert {3} --revoke-ca-cert {4} "
                    "--revoke-ca-cert {5} test2 ".format(self.rurl2,
                    "" if refresh else "--no-refresh ",
                    app1, app2, rev1_h, rev2_h))
                self.pkg("publisher test2")

                lines = self.output.splitlines()
                found_approved = False
                found_revoked = False
                for idx, line in enumerate(lines):
                        if "Approved CAs" in line:
                                found_approved = True
                                self._check_ca_pair(lines, idx,
                                    "         Approved CAs: {0}", "approved",
                                    app1_h, app2_h)
                        elif "Revoked CAs" in line:
                                found_revoked = True
                                self._check_ca_pair(lines, idx,
                                    "          Revoked CAs: {0}", "revoked",
                                    rev1_h, rev2_h)

                # Without these checks the test would pass vacuously when
                # the publisher output contains no CA sections at all.
                self.assertTrue(found_approved,
                    "No approved CAs were listed. Output was:\n{0}".format(
                    self.output))
                self.assertTrue(found_revoked,
                    "No revoked CAs were listed. Output was:\n{0}".format(
                    self.output))

        def test_new_publisher_ca_certs_with_refresh(self):
                """Check that approving and revoking CA certs is reflected in
                the output of pkg publisher and that setting the CA certs when
                setting a new publisher works correctly."""

                self._add_pub_and_check_ca_certs("ch4_ta1_cert.pem",
                    refresh=True)

        def test_new_publisher_ca_certs_no_refresh(self):
                """Check that approving and revoking CA certs is reflected in
                the output of pkg publisher and that setting the CA certs when
                setting a new publisher with --no-refresh works correctly."""

                self._add_pub_and_check_ca_certs("ch3_ta1_cert.pem",
                    refresh=False)


# Run the tests in this module when the file is executed directly as a
# script.
if __name__ == "__main__":
        unittest.main()