usr/src/lib/install_target/test/test_shadow_list.py
author Drew Fisher <drew.fisher@oracle.com>
Tue, 16 Aug 2011 10:06:20 -0600
changeset 1390 b05ba57bf117
parent 1355 7e6507b2a970
permissions -rw-r--r--
7079464 Add math functions to the Size class

#!/usr/bin/python
#
# CDDL HEADER START
#
# The contents of this file are subject to the terms of the
# Common Development and Distribution License (the "License").
# You may not use this file except in compliance with the License.
#
# You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
# or http://www.opensolaris.org/os/licensing.
# See the License for the specific language governing permissions
# and limitations under the License.
#
# When distributing Covered Code, include this CDDL HEADER in each
# file and include the License file at usr/src/OPENSOLARIS.LICENSE.
# If applicable, add the following below this CDDL HEADER, with the
# fields enclosed by brackets "[]" replaced with your own identifying
# information: Portions Copyright [yyyy] [name of copyright owner]
#
# CDDL HEADER END
#

#
# Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
#

""" test_shadow_list.py - collection of unittests for testing target validation
through shadow list manipulation
"""
import copy
import platform
import unittest

from nose.plugins.skip import SkipTest

import osol_install.errsvc as errsvc

from osol_install.liberrsvc import ES_DATA_EXCEPTION

from solaris_install import Popen
from solaris_install.target import Target
from solaris_install.target.physical import Disk, DiskGeometry, DiskProp, \
    Slice, Partition
from solaris_install.target.libadm.const import MAX_EXT_PARTS, V_NUMPAR
from solaris_install.target.logical import BE, Logical, Vdev, Zpool
from solaris_install.target.shadow.physical import LOGICAL_ADJUSTMENT, \
    ShadowPhysical
from solaris_install.target.shadow.logical import ShadowLogical
from solaris_install.target.shadow.zpool import ShadowZpool
from solaris_install.target.size import Size
from solaris_install.target.vdevs import _get_vdev_mapping


GBSECTOR = long(1024 * 1024 * 1024 / 512)  # 1 GB of sectors
BLOCKSIZE = 512
CYLSIZE = 16065


class TestZpool(unittest.TestCase):
    """ test case to test manipulation of Zpool objects via the API and shadow
    list interfaces
    """
    def setUp(self):
        # create DOC objects
        self.target = Target("target")
        self.logical = Logical("logical")
        self.target.insert_children(self.logical)

        # reset the errsvc
        errsvc.clear_error_list()

    def tearDown(self):
        self.target.delete_children()
        self.target.delete()

        # reset the errsvc
        errsvc.clear_error_list()

    def test_add_simple_zpool(self):
        # create a DOC tree
        self.logical.add_zpool("test_zpool")
        self.assertFalse(errsvc._ERRORS)

    def test_add_duplicate_zpool_names(self):
        self.logical.add_zpool("test_zpool")
        self.logical.add_zpool("test_zpool")

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
                                   ShadowZpool.DuplicateZpoolNameError))

    def test_add_two_root_pools(self):
        self.logical.add_zpool("test_zpool", is_root=True)
        self.logical.add_zpool("test_zpool2", is_root=True)

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
                                   ShadowZpool.TooManyRootPoolsError))

    def test_add_overlapping_mountpoints(self):
        self.logical.add_zpool("test_zpool", mountpoint="/foo")
        self.logical.add_zpool("test_zpool2", mountpoint="/foo/bar")

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
                                   ShadowZpool.DuplicateMountpointError))

    def test_delete_zpool(self):
        zpool1 = self.logical.add_zpool("test_zpool")
        zpool2 = self.logical.add_zpool("test_zpool2")

        # verify there are 2 entries in the _children list
        self.assertEqual(len(self.logical._children), 2)

        # delete zpool2
        zpool2.delete()

        # verify there is only one entry in the _children list and the proper
        # zpool was removed
        self.assertEqual(len(self.logical._children), 1)
        self.assertEqual(self.logical._children[0].name, "test_zpool")

        # delete zpool1
        zpool1.delete()

        # verify there are no entries in the _children list
        self.assertFalse(self.logical._children)

    def test_delete_zpool_twice(self):
        zpool = self.logical.add_zpool("test_zpool")
        self.logical.delete_zpool(zpool)
        self.assertFalse(self.logical._children)

        # try to delete the zpool again.
        self.logical.delete_zpool(zpool)
        self.assertFalse(self.logical._children)

    def test_delete_zpool_from_logical(self):
        zpool1 = self.logical.add_zpool("test_zpool")
        zpool2 = self.logical.add_zpool("test_zpool2")

        # verify there are 2 entries in the _children list
        self.assertEqual(len(self.logical._children), 2)

        # delete zpool2
        self.logical.delete_zpool(zpool2)

        # verify there is only one entry in the _children list and the proper
        # zpool was removed
        self.assertEqual(len(self.logical._children), 1)
        self.assertEqual(self.logical._children[0].name, "test_zpool")

    def test_add_vdev(self):
        # create a zpool and a vdev
        zpool = self.logical.add_zpool("test_zpool")
        zpool.add_vdev(label="test_vdev", redundancy="mirror")
        self.assertTrue(zpool.get_descendants(class_type=Vdev))

    def test_delete_vdev(self):
        # create a zpool and two vdevs
        zpool = self.logical.add_zpool("test_zpool")
        vdev1 = zpool.add_vdev(label="test_vdev1", redundancy="mirror")
        vdev2 = zpool.add_vdev(label="test_vdev2", redundancy="raidz")

        # verify there are two vdevs
        self.assertEqual(len(zpool.get_descendants(class_type=Vdev)), 2)

        # delete vdev2
        zpool.delete_vdev(vdev2)

        # verify there is only one vdev
        self.assertEqual(len(zpool.get_descendants(class_type=Vdev)), 1)

        # delete vdev1
        zpool.delete_vdev(vdev1)

        # verify there are no vdevs
        self.assertFalse(zpool.get_descendants(class_type=Vdev))


class TestZFSFilesystem(unittest.TestCase):
    """ test case to test the manipulation of ZFS filesystems via the API and
    shadow list interface.
    """
    def setUp(self):
        self.zpool = Zpool("zpool")

        # reset the errsvc
        errsvc.clear_error_list()

    def tearDown(self):
        self.zpool.delete_children()

        # reset the errsvc
        errsvc.clear_error_list()

    def test_add_simple_filesystem(self):
        self.zpool.add_filesystem("test_filesystem")
        self.assertFalse(errsvc._ERRORS)

    def test_add_simple_filesystem_with_mountpoint(self):
        self.zpool.add_filesystem("test_filesystem", "/mountpoint")
        self.assertFalse(errsvc._ERRORS)

    def test_add_duplicate_filesystem_names(self):
        self.zpool.add_filesystem("test_filesystem")
        self.assertFalse(errsvc._ERRORS)

        self.zpool.add_filesystem("test_filesystem")
        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
            ShadowLogical.DuplicateDatasetNameError))

    def test_add_duplicate_mountpoints(self):
        self.zpool.add_filesystem("test_filesystem", "/a")
        self.assertFalse(errsvc._ERRORS)

        self.zpool.add_filesystem("test_filesystem_2", "/a")

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
            ShadowLogical.DuplicateMountpointError))

    def test_delete_filesystem(self):
        fs1 = self.zpool.add_filesystem("test_filesystem1")
        fs2 = self.zpool.add_filesystem("test_filesystem2")

        # verify there are 2 entries in _children
        self.assertEqual(len(self.zpool._children), 2)

        # delete fs2
        fs2.delete()

        # verify there is only one entry in the _children list and the proper
        # fs was removed
        self.assertEqual(len(self.zpool._children), 1)
        self.assertEqual(self.zpool._children[0].name, "test_filesystem1")

        # delete fs1
        fs1.delete()

        # verify there are no entries in _children
        self.assertFalse(self.zpool._children)

    def test_delete_filesystem_from_zpool(self):
        fs1 = self.zpool.add_filesystem("test_filesystem1")
        fs2 = self.zpool.add_filesystem("test_filesystem2")

        # verify there are 2 entries in the _children list
        self.assertEqual(len(self.zpool._children), 2)

        # delete zpool2
        self.zpool.delete_filesystem(fs2)

        # verify there is only one entry in the _children list and the proper
        # zpool was removed
        self.assertEqual(len(self.zpool._children), 1)
        self.assertEqual(self.zpool._children[0].name, "test_filesystem1")

        # delete zpool2 again
        self.zpool.delete_filesystem(fs2)

        # verify there is only one entry in the _children list and the proper
        # zpool was removed
        self.assertEqual(len(self.zpool._children), 1)
        self.assertEqual(self.zpool._children[0].name, "test_filesystem1")


class TestZvol(unittest.TestCase):
    """ test case to test the manipulation of Zvols via the API and
    shadow list interface.
    """
    def setUp(self):
        self.target = Target("target")

        # create a disk object
        self.disk = Disk("test disk")
        self.disk.in_zpool = "zpool"
        self.disk.ctd = "c12345t0d0"
        self.disk.geometry = DiskGeometry(BLOCKSIZE, CYLSIZE)

        # 250GB disk
        self.disk.disk_prop = DiskProp()
        self.disk.disk_prop.dev_size = Size(
            str(GBSECTOR * 250) + Size.sector_units)
        self.disk.disk_prop.blocksize = BLOCKSIZE

        self.logical = Logical("logical")
        self.zpool = Zpool("zpool")
        self.logical.insert_children(self.zpool)

        self.target.insert_children([self.disk, self.logical])

        # reset the errsvc
        errsvc.clear_error_list()

    def tearDown(self):
        self.logical.delete_children()
        self.logical.delete()

        # reset the errsvc
        errsvc.clear_error_list()

    def test_add_simple_zvol(self):
        self.zpool.add_zvol("test_zvol", "10")
        self.assertFalse(errsvc._ERRORS)

    def test_add_duplicate_zvol_names(self):
        self.zpool.add_zvol("test_zvol", "10")
        self.assertFalse(errsvc._ERRORS)

        self.zpool.add_zvol("test_zvol", "10")
        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
            ShadowLogical.DuplicateDatasetNameError))

    def test_delete_zvol(self):
        zvol1 = self.zpool.add_zvol("zvol1", "10")
        zvol2 = self.zpool.add_zvol("zvol2", "10")

        # verify there are 2 entries in the _children list
        self.assertEqual(len(self.zpool._children), 2)

        # delete zvol2
        zvol2.delete()

        # verify there is only one entry in the _children list and the proper
        # zvol was removed
        self.assertEqual(len(self.zpool._children), 1)
        self.assertEqual(self.zpool._children[0].name, "zvol1")

    def test_delete_zvol_from_zpool(self):
        zvol1 = self.zpool.add_zvol("zvol1", "10")
        zvol2 = self.zpool.add_zvol("zvol2", "10")

        # verify there are 2 entries in the _children list
        self.assertEqual(len(self.zpool._children), 2)

        # delete zvol2
        self.zpool.delete_zvol(zvol2)

        # verify there is only one entry in the _children list and the proper
        # zvol was removed
        self.assertEqual(len(self.zpool._children), 1)
        self.assertEqual(self.zpool._children[0].name, "zvol1")

    def test_resize_zvol(self):
        zvol = self.zpool.add_zvol("zvol1", "10")

        # verify the size is 10gb
        self.assertEqual(self.zpool._children[0].size, "10g")

        # resize it to 20gb
        zvol.resize("20", Size.gb_units)

        # verify the size is 20gb
        self.assertEqual(self.zpool._children[0].size, "20g")

    def test_swap_zvol_with_noswap_set(self):
        self.logical.noswap = True
        zvol = self.zpool.add_zvol("swap_zvol", "10", Size.gb_units, "swap")

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
            ShadowLogical.NoswapMismatchError))

    def test_dump_zvol_with_nodump_set(self):
        self.logical.nodump = True
        zvol = self.zpool.add_zvol("dump_zvol", "10", Size.gb_units, "dump")

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
            ShadowLogical.NodumpMismatchError))


class TestPartition(unittest.TestCase):
    def setUp(self):
        self.disk = Disk("test disk")
        self.disk.ctd = "c12345t0d0"
        self.disk.geometry = DiskGeometry(BLOCKSIZE, CYLSIZE)
        self.disk.label = "VTOC"

        # 100GB disk
        self.disk.disk_prop = DiskProp()
        self.disk.disk_prop.dev_size = Size(
            str(GBSECTOR * 100) + Size.sector_units)
        self.disk.disk_prop.blocksize = BLOCKSIZE

        # reset the errsvc
        errsvc.clear_error_list()

    def tearDown(self):
        self.disk.delete_children()
        self.disk.delete()

        # reset the errsvc
        errsvc.clear_error_list()

    def test_add_single_partition(self):
        self.disk.add_partition(1, 0, 1, Size.gb_units)
        self.assertFalse(errsvc._ERRORS)

    def test_duplicate_partition(self):
        # add 2 1GB partition with the same index but different starting sector
        self.disk.add_partition(1, 0, 1, Size.gb_units)

        # account for the first silce moving from a start_sector of 0 to
        # CYLSIZE
        self.disk.add_partition(1, CYLSIZE + GBSECTOR + 1, 1,
                                Size.gb_units)

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
            ShadowPhysical.DuplicatePartitionNameError))

    def test_overlapping_starting_sector(self):
        # add a 10 GB disk
        self.disk.add_partition(0, CYLSIZE, 10, Size.gb_units)
        # add an 11 GB disk
        self.disk.add_partition(1, CYLSIZE, 11, Size.gb_units)

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
                                   ShadowPhysical.OverlappingPartitionError))

    def test_overlapping_ending_sector(self):
        # add a 9 GB disk on the second cylinder boundary
        self.disk.add_partition(0, CYLSIZE * 2, 9, Size.gb_units)
        # add a 10 GB disk on the first cylinder boundary
        self.disk.add_partition(1, CYLSIZE, 10, Size.gb_units)

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
                                   ShadowPhysical.OverlappingPartitionError))

    def test_add_two_extended_partitions(self):
        # add 2 1GB partitions
        self.disk.add_partition(1, 0, 1, Size.gb_units,
            partition_type=Partition.name_to_num("WIN95 Extended(LBA)"))

        # account for the first silce moving from a start_sector of 0 to
        # CYLSIZE
        self.disk.add_partition(2, CYLSIZE + GBSECTOR + 1, 1, Size.gb_units,
            partition_type=Partition.name_to_num("WIN95 Extended(LBA)"))

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
            ShadowPhysical.TooManyExtPartitionsError))

    def test_delete_partition(self):
        p1 = self.disk.add_partition(1, 0, 1, Size.gb_units)
        p2 = self.disk.add_partition(2, GBSECTOR + 1, 1, Size.gb_units)

        # verify there are 2 entries in the _children
        self.assertEqual(len(self.disk._children), 2)

        # delete partition 2
        p2.delete()

        # verify there is only one entry in the _children and the proper
        # partition was removed.
        self.assertEqual(len(self.disk._children), 1)
        self.assertEqual(self.disk._children[0].name, 1)

    def test_delete_partition_with_slice_children(self):
        # add a 10gb partition
        p1 = self.disk.add_partition(1, 0, 10, Size.gb_units)

        # add two slices
        p1.add_slice(0, CYLSIZE, 1, Size.gb_units)
        p1.add_slice(1, GBSECTOR + CYLSIZE + 1, 1, Size.gb_units)

        # delete the partition
        p1.delete()

        # verify there are no errors
        self.assertFalse(errsvc._ERRORS)

        # verify the _children is empty
        self.assertFalse(self.disk._children)

    def test_delete_partition_from_disk(self):
        p1 = self.disk.add_partition(1, 0, 1, Size.gb_units)
        p2 = self.disk.add_partition(2, GBSECTOR + 1, 1, Size.gb_units)

        # verify there are 2 entries in the _children
        self.assertEqual(len(self.disk._children), 2)

        # delete partition 2
        self.disk.delete_partition(p2)

        # verify there is only one entry in the _children and the proper
        # partition was removed.
        self.assertEqual(len(self.disk._children), 1)
        self.assertEqual(self.disk._children[0].name, 1)

    def test_resize_partition(self):
        # create a 1 GB partition
        p1 = self.disk.add_partition(1, CYLSIZE, 1, Size.gb_units)
        self.disk.insert_children(p1)

        # verify the size of the slice is 1 GB of sectors, rounded for CYLSIZE
        self.assertEqual(p1.size.sectors, (GBSECTOR / CYLSIZE) * CYLSIZE)

        # resize it to 5 GB
        new_p1 = p1.resize(5, Size.gb_units)

        # verify the size of the partition is 5 GB of sectors
        self.assertEqual(new_p1.size.sectors,
            ((5 * GBSECTOR) / CYLSIZE * CYLSIZE))

    def test_in_zpool_conflict_with_parent(self):
        # set the in_zpool attribute in the disk
        self.disk.in_zpool = "tank"

        # create a 1 GB partition
        p1 = self.disk.add_partition(1, 0, 1, Size.gb_units, in_zpool="tank")

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
            ShadowPhysical.OverlappingPartitionZpoolError))

    def test_in_vdev_conflict_with_parent(self):
        # set the in_vdev attribute in the disk
        self.disk.in_vdev = "a label"

        # create a 1 GB partition
        self.disk.add_partition(1, 0, 1, Size.gb_units, in_vdev="a label")

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
            ShadowPhysical.OverlappingPartitionVdevError))

    def test_change_type(self):
        p = self.disk.add_partition(1, 0, 1, Size.gb_units)
        self.assertFalse(errsvc._ERRORS)
        self.assertEqual(p.part_type, Partition.name_to_num("Solaris2"))

        new_p = p.change_type(Partition.name_to_num("WIN95 Extended(LBA)"))
        self.assertFalse(errsvc._ERRORS)
        self.assertEqual(new_p.part_type, \
            Partition.name_to_num("WIN95 Extended(LBA)"))

    def test_extended_partition_too_small(self):
        self.disk.add_partition(1, CYLSIZE, 10, Size.sector_units,
            partition_type=Partition.name_to_num("WIN95 Extended(LBA)"))

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
                                   ShadowPhysical.ExtPartitionTooSmallError))

    def test_fat16_partition_too_large(self):
        self.disk.add_partition(1, 0, 10, Size.gb_units,
                                partition_type=Partition.name_to_num(
                                "FAT16 (Upto 32M)"))

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
                                   ShadowPhysical.FAT16PartitionTooLargeError))

    def test_whole_disk_is_true(self):
        self.disk.whole_disk = True
        self.disk.add_partition(1, 0, 10, Size.gb_units,
            partition_type=Partition.name_to_num("Solaris2"))

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
                                   ShadowPhysical.WholeDiskIsTrueError))

    def test_whole_disk_is_false(self):
        self.disk.whole_disk = False
        self.disk.add_partition(1, 0, 10, Size.gb_units,
            partition_type=Partition.name_to_num("Solaris2"))

        # verify that there are no errors
        self.assertFalse(errsvc._ERRORS)

    def test_holey_object_no_children(self):
        holey_list = self.disk.get_gaps()
        self.assertEqual(holey_list[0].size, self.disk.disk_prop.dev_size)

        # verify the logical list is empty as well
        self.assertFalse(self.disk.get_logical_partition_gaps())

    def test_holey_object_one_child_start_at_nonzero(self):
        # add a single partition, starting at start sector CYLSIZE
        p = self.disk.add_partition(1, CYLSIZE, 25, Size.gb_units)

        disksize = self.disk.disk_prop.dev_size.sectors

        holey_list = self.disk.get_gaps()

        # verify the holey list is correct
        self.assertEqual(len(holey_list), 1)

        # round the expected value up to the next cylinder boundary
        rounded_value = (((CYLSIZE + p.size.sectors + 1) / CYLSIZE) * \
            CYLSIZE) + CYLSIZE
        difference = rounded_value - (p.start_sector + p.size.sectors)

        rounded_size = \
            ((disksize - CYLSIZE - p.size.sectors - difference) / CYLSIZE) * \
                CYLSIZE

        self.assertEqual(holey_list[0].start_sector, rounded_value)
        self.assertEqual(holey_list[0].size.sectors, rounded_size)

        # verify the logical list is empty as well
        self.assertFalse(self.disk.get_logical_partition_gaps())

    def test_holey_object_logical_partitions(self):
        # add a single extended partition
        extended_part = self.disk.add_partition(1, CYLSIZE, 25, Size.gb_units,
            partition_type=15)

        # add a single logical partition
        logical_part = self.disk.add_partition(5, CYLSIZE, 1, Size.gb_units)
        self.assertFalse(errsvc._ERRORS)

        disksize = self.disk.disk_prop.dev_size.sectors

        logical_holey_list = self.disk.get_logical_partition_gaps()

        # verify the disk's holey lists are correct
        self.assertEqual(len(logical_holey_list), 1)

        self.assertEqual(logical_holey_list[0].start_sector, \
            extended_part.start_sector + LOGICAL_ADJUSTMENT + \
            logical_part.size.sectors + LOGICAL_ADJUSTMENT)
        self.assertEqual(logical_holey_list[0].size.sectors,
            extended_part.size.sectors - (2 * LOGICAL_ADJUSTMENT) - \
                logical_part.size.sectors - 1)

    def test_add_two_active_partitions(self):
        self.disk.add_partition(1, 0, 5, Size.gb_units,
                                bootid=Partition.ACTIVE)
        self.disk.add_partition(2, 10 * GBSECTOR, 1, Size.gb_units,
                                bootid=Partition.ACTIVE)

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
            ShadowPhysical.MultipleActivePartitionsError))

    def test_change_bootid(self):
        p = self.disk.add_partition(1, 0, 5, Size.gb_units)
        p.change_bootid(Partition.ACTIVE)
        self.assertFalse(errsvc._ERRORS)

        # change the bootid to something not allowed
        self.assertRaises(RuntimeError, p.change_bootid, "bad value")

    def test_disk_remaining_space(self):
        p = self.disk.add_partition(1, 0, 5, Size.gb_units)
        self.assertEqual(self.disk.remaining_space.sectors, \
            self.disk.disk_prop.dev_size.sectors - p.size.sectors)

    def test_defect_17624(self):
        """ test case specifically designed to validate defect 17624
        """
        # reset the disk size to 500gb
        self.disk.disk_prop.dev_size = Size(
            str(GBSECTOR * 500) + Size.sector_units)

        # primary partitions.  Extended partition is partition 3.  Set the
        # sizes explictly to sector units to ensure the math is exact
        self.disk.add_partition(1, 2048, 2054272, Size.sector_units,
            partition_type=Partition.name_to_num("IFS: NTFS"))
        self.disk.add_partition(2, 2056320, 65802240, Size.sector_units,
            partition_type=Partition.name_to_num("Solaris2"))
        self.disk.add_partition(3, 67858560, 908893440, Size.sector_units,
            partition_type=Partition.name_to_num("WIN95 Extended(LBA)"))
        self.disk.add_partition(4, 976752000, 16065, Size.sector_units,
            partition_type=Partition.name_to_num("OS/2 Boot/Coherent swap"),
            bootid=Partition.ACTIVE)

        # verify there are no errors with the primary/extended partitions
        self.assertFalse(errsvc._ERRORS)

        # logical partitions
        self.disk.add_partition(5, 67858623, 4016, Size.mb_units,
            partition_type=Partition.name_to_num("Linux native"))
        self.disk.add_partition(6, 76083903, 2008, Size.mb_units,
            partition_type=Partition.name_to_num("Solaris/Linux swap"))
        self.disk.add_partition(7, 80196543, 4016, Size.mb_units,
            partition_type=Partition.name_to_num("Linux native"))
        self.disk.add_partition(8, 88421823, 4016, Size.mb_units,
            partition_type=Partition.name_to_num("Linux native"))
        self.disk.add_partition(9, 96647103, 4016, Size.mb_units,
            partition_type=Partition.name_to_num("Linux native"))
        self.disk.add_partition(10, 104872383, 4016, Size.mb_units,
            partition_type=Partition.name_to_num("Linux native"))
        self.disk.add_partition(11, 113097663, 32129, Size.mb_units,
            partition_type=Partition.name_to_num("Linux native"))
        self.disk.add_partition(12, 178899903, 1004, Size.mb_units,
            partition_type=Partition.name_to_num("WIN95 FAT16(LBA)"))
        self.disk.add_partition(13, 180956223, 1004, Size.mb_units,
            partition_type=Partition.name_to_num("WIN95 FAT16(LBA)"))
        self.disk.add_partition(14, 183012543, 2008, Size.mb_units,
            partition_type=Partition.name_to_num("WIN95 FAT16(LBA)"))
        self.disk.add_partition(15, 187125183, 8032, Size.mb_units,
            partition_type=Partition.name_to_num("Linux native"))
        self.disk.add_partition(16, 203575743, 4016, Size.mb_units,
            partition_type=Partition.name_to_num("Solaris/Linux swap"))
        self.disk.add_partition(17, 211801023, 4016, Size.mb_units,
            partition_type=Partition.name_to_num("Solaris/Linux swap"))
        self.disk.add_partition(18, 220026303, 2008, Size.mb_units,
            partition_type=Partition.name_to_num("Solaris/Linux swap"))
        self.disk.add_partition(19, 224138943, 2008, Size.mb_units,
            partition_type=Partition.name_to_num("Solaris/Linux swap"))
        self.disk.add_partition(20, 228251583, 48194, Size.mb_units,
            partition_type=Partition.name_to_num("IFS: NTFS"))
        self.disk.add_partition(21, 326954943, 16064, Size.mb_units,
            partition_type=Partition.name_to_num("IFS: NTFS"))
        self.disk.add_partition(22, 956188863, 2008, Size.mb_units,
            partition_type=Partition.name_to_num("Solaris/Linux swap"))
        self.disk.add_partition(23, 960301503, 8032, Size.mb_units,
            partition_type=Partition.name_to_num("IFS: NTFS"))

        # verify there are no errors with the logical partitions
        self.assertFalse(errsvc._ERRORS)

    def test_invalid_start_sector(self):
        self.disk.add_partition(1, -16065, 1, Size.gb_units)

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
            ShadowPhysical.InvalidPartitionStartSectorError))

    def test_invalid_partition_id(self):
        # create a partition with an invalid partition id.  name_to_num() of an
        # invalid partition ID will return None
        self.disk.add_partition(1, 1, 1, Size.gb_units,
            partition_type=Partition.name_to_num("INVALID PARTITION ID"))

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
            ShadowPhysical.PartitionTypeMissingError))

    def test_is_primary(self):
        p = self.disk.add_partition(1, 0, 1, Size.gb_units)
        self.assertFalse(errsvc._ERRORS)
        self.assertTrue(p.is_primary)

    def test_is_extended(self):
        p = self.disk.add_partition(1, 0, 1, Size.gb_units, partition_type=5)
        self.assertFalse(errsvc._ERRORS)
        self.assertTrue(p.is_extended)

    def test_is_logical(self):
        # add a single extended partition
        extended_part = self.disk.add_partition(1, CYLSIZE, 25, Size.gb_units,
                                        partition_type=15)

        # add a single logical partition
        logical_part = self.disk.add_partition(5, CYLSIZE, 1, Size.gb_units)

        self.assertFalse(errsvc._ERRORS)
        self.assertTrue(logical_part.is_logical)

    def test_no_validate_children(self):
        self.disk.validate_children = False
        start_sector = 12345
        p = self.disk.add_partition(1, 12345, 5, Size.gb_units)
        self.assertFalse(errsvc._ERRORS)
        self.assertEqual(start_sector, p.start_sector)
        self.assertEqual(Size("5gb"), p.size)

    def test_get_next_partition_name(self):
        self.assertEqual(self.disk.get_next_partition_name(primary=True), "1")
        self.assertEqual(self.disk.get_next_partition_name(primary=False), "5")

        # add an extended partition
        extended_part = self.disk.add_partition(1, CYLSIZE, 10, Size.gb_units,
           partition_type=15)

        self.assertFalse(errsvc._ERRORS)
        self.assertTrue(extended_part.is_primary)
        self.assertTrue(extended_part.is_extended)

        # add a logical partition
        logical_part = self.disk.add_partition(5, CYLSIZE, 1, Size.gb_units)
        self.assertFalse(errsvc._ERRORS)
        self.assertTrue(logical_part.is_logical)

        self.assertEqual(len(self.disk.primary_partitions), 1)
        self.assertEqual(len(self.disk.logical_partitions), 1)

        self.assertEqual(self.disk.get_next_partition_name(primary=True), "2")
        self.assertEqual(self.disk.get_next_partition_name(primary=False), "6")

        # ignore any insertion errors for testing get_next_partition_name
        self.disk.add_partition(2, CYLSIZE, 1, Size.gb_units)
        self.disk.add_partition(3, CYLSIZE, 1, Size.gb_units)
        self.disk.add_partition(4, CYLSIZE, 1, Size.gb_units)

        # verify get_next_partition_name returns None if there are no available
        # primary partitions
        self.assertEqual(self.disk.get_next_partition_name(primary=True), None)

    def test_cr_7055489(self):
        self.disk.add_partition("0", CYLSIZE, 10, Size.gb_units)

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
                                   ShadowPhysical.InvalidPartitionNameError))

    def test_insertion_of_partition_over_deleted_partition_of_same_name(self):
        # add a 1GB partition at start_sector CYLSIZE and set the action to
        # delete
        p = self.disk.add_partition(1, CYLSIZE, 1, Size.gb_units)
        p.action = "delete"

        # insert another partition with the same name
        self.disk.add_partition(1, CYLSIZE, 2, Size.gb_units)
        self.assertFalse(errsvc._ERRORS)


class TestSliceInDisk(unittest.TestCase):
    def setUp(self):
        self.disk = Disk("test disk")
        self.disk.ctd = "c12345t0d0"
        self.disk.geometry = DiskGeometry(BLOCKSIZE, CYLSIZE)
        self.disk.label = "VTOC"

        # 100GB disk
        self.disk.disk_prop = DiskProp()
        self.disk.disk_prop.dev_size = Size(
            str(GBSECTOR * 100) + Size.sector_units)
        self.disk.disk_prop.blocksize = BLOCKSIZE

        # reset the errsvc
        errsvc.clear_error_list()

    def tearDown(self):
        self.disk.delete_children()
        self.disk.delete()

        # reset the errsvc
        errsvc.clear_error_list()

    def test_add_single_slice(self):
        # add a 1GB slice at start_sector CYLSIZE
        self.disk.add_slice(0, CYLSIZE, 1, Size.gb_units)
        self.assertFalse(errsvc._ERRORS)

    def test_cylinder_zero_boundary_adjustment(self):
        # add a 1GB slice at start_sector 0
        self.disk.add_slice(0, 0, 1, Size.gb_units)

        # test that it moved to the first cylinder boundary
        self.assertFalse(errsvc._ERRORS)
        self.assertEqual(self.disk._children[0].start_sector, CYLSIZE)

    def test_add_too_many_slices(self):
        for i in range(V_NUMPAR + 2):
            start_sector = i * GBSECTOR + i
            self.disk.add_slice(i, start_sector, 1)

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
                                   ShadowPhysical.TooManySlicesError))

    def test_duplicate_slice(self):
        # add 2 1GB slices with the same index but different starting sector
        self.disk.add_slice(0, 0, 1, Size.gb_units)

        # account for the first silce moving from a start_sector of 0 to
        # CYLSIZE
        self.disk.add_slice(0, CYLSIZE + GBSECTOR + 1, 1, Size.gb_units)

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
                                   ShadowPhysical.DuplicateSliceNameError))

    def test_overlapping_starting_sector(self):
        # add a 10 GB slice
        self.disk.add_slice(0, CYLSIZE, 10, Size.gb_units)
        # add an 11 GB slice
        self.disk.add_slice(1, CYLSIZE, 11, Size.gb_units)

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
                                   ShadowPhysical.OverlappingSliceError))

    def test_overlapping_ending_sector(self):
        # add a 9 GB slice on the second cylinder boundary
        self.disk.add_slice(0, CYLSIZE * 2, 9, Size.gb_units)
        # add a 10 GB slice on the first cylinder boundary
        self.disk.add_slice(1, CYLSIZE, 10, Size.gb_units)

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
                                   ShadowPhysical.OverlappingSliceError))

    def test_delete_slice(self):
        s1 = self.disk.add_slice(1, 0, 1, Size.gb_units)
        s2 = self.disk.add_slice(2, GBSECTOR + 1, 1, Size.gb_units)

        # verify there are 2 entries in _children
        self.assertEqual(len(self.disk._children), 2)

        # delete slice 2
        s2.delete()

        # verify there is only one entry in the _children and the proper
        # slice was removed.
        self.assertEqual(len(self.disk._children), 1)
        self.assertEqual(self.disk._children[0].name, 1)

    def test_delete_slice_in_disk(self):
        s1 = self.disk.add_slice(1, 0, 1, Size.gb_units)
        s2 = self.disk.add_slice(2, GBSECTOR + 1, 1, Size.gb_units)

        # verify there are 2 entries in the _children
        self.assertEqual(len(self.disk._children), 2)

        # delete slice 2
        self.disk.delete_slice(s2)

        # verify there is only one entry in the _children and the proper
        # slice was removed.
        self.assertEqual(len(self.disk._children), 1)
        self.assertEqual(self.disk._children[0].name, 1)

    def test_resize_slice(self):
        # create a 1 GB slice
        s1 = self.disk.add_slice(1, CYLSIZE, 1, Size.gb_units)

        # verify the size of the slice is 1 GB of sectors, rounded for CYLSIZE
        self.assertEqual(s1.size.sectors, (GBSECTOR / CYLSIZE) * CYLSIZE)

        # resize it to 5 GB
        new_s1 = s1.resize(5, Size.gb_units)

        # verify the size of the slice is 5 GB of sectors
        self.assertTrue(new_s1.size.sectors, 5 * GBSECTOR - 1)

    def test_in_zpool_confilict_with_parent(self):
        # set the in_zpool attribute in the disk
        self.disk.in_zpool = "tank"

        # create a 1 GB partition
        self.disk.add_slice(1, 0, 1, Size.gb_units, in_zpool="tank")

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
                                   ShadowPhysical.OverlappingSliceZpoolError))

    def test_in_vdev_confilict_with_parent(self):
        # set the in_vdev attribute in the disk
        self.disk.in_vdev = "a label"

        # create a 1 GB partition
        self.disk.add_slice(1, 0, 1, Size.gb_units, in_vdev="a label")

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
                                   ShadowPhysical.OverlappingSliceVdevError))

    def test_whole_disk_is_true(self):
        self.disk.whole_disk = True
        # add a 100 MB slice
        self.disk.add_slice(0, 0, 100, Size.mb_units)

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
                                   ShadowPhysical.WholeDiskIsTrueError))

    def test_whole_disk_is_false(self):
        self.disk.whole_disk = False
        # add a 100 MB slice
        self.disk.add_slice(0, 0, 100, Size.mb_units)

        # verify that there are no errors
        self.assertFalse(errsvc._ERRORS)

    def test_holey_object_no_children(self):
        holey_list = self.disk.get_gaps()
        self.assertTrue(holey_list[0].size, self.disk.disk_prop.dev_size)

    def test_holey_object_one_child_start_at_cylsize(self):
        # add a single slice, starting at start sector CLYSIZE
        s = self.disk.add_slice(1, CYLSIZE, 25, Size.gb_units)
        disksize = self.disk.disk_prop.dev_size.sectors

        holey_list = self.disk.get_gaps()

        # verify the holey list is correct
        self.assertEqual(len(holey_list), 1)

        # round the expected value up to the next cylinder boundary
        rounded_value = (((CYLSIZE + s.size.sectors + 1) / CYLSIZE) * \
            CYLSIZE) + CYLSIZE
        difference = rounded_value - (s.start_sector + s.size.sectors)

        rounded_size = \
            ((disksize - CYLSIZE - s.size.sectors - difference) / \
                CYLSIZE) * CYLSIZE

        self.assertEqual(holey_list[0].start_sector, rounded_value)
        self.assertEqual(holey_list[0].size.sectors, rounded_size)

    def test_holey_object_one_child_start_at_nonzero(self):
        # add a single slice, starting at start sector CYLSIZE * 10
        start = CYLSIZE * 10

        s = self.disk.add_slice(1, start, 25, Size.gb_units)
        disksize = self.disk.disk_prop.dev_size.sectors

        holey_list = self.disk.get_gaps()

        # verify the holey list is correct
        self.assertEqual(len(holey_list), 2)

        self.assertEqual(holey_list[0].start_sector, CYLSIZE)
        self.assertEqual(holey_list[0].size.sectors,
            (((start - CYLSIZE - 1) / CYLSIZE) * CYLSIZE))

        # round the expected value up to the next cylinder boundary
        rounded_value = (((CYLSIZE * 10 + s.size.sectors + 1) / CYLSIZE) * \
            CYLSIZE) + CYLSIZE
        difference = rounded_value - (s.start_sector + s.size.sectors)

        rounded_size = \
            ((disksize - (CYLSIZE * 10) - s.size.sectors - difference) / \
                CYLSIZE) * CYLSIZE

        self.assertEqual(holey_list[1].start_sector, rounded_value)
        self.assertEqual(holey_list[1].size.sectors, rounded_size)

    def test_invalid_start_sector(self):
        self.disk.add_slice(0, -16065, 1, Size.gb_units)
        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
            ShadowPhysical.InvalidSliceStartSectorError))

    def test_slice_entire_size_of_disk(self):
        disksize = self.disk.disk_prop.dev_size.sectors

        # add a non V_BACKUP slice the entire size of the disk
        s = self.disk.add_slice(0, CYLSIZE, disksize, Size.sector_units)

        # verify there are no errors
        self.assertFalse(errsvc._ERRORS)

        # verify the size of the slice, rounding for cylinder size and
        # maximum size limits
        if platform.processor() == "i386":
            self.assertEqual(((disksize / CYLSIZE) - 4) * CYLSIZE,
                             s.size.sectors)
        else:
            self.assertEqual(((disksize / CYLSIZE)) * CYLSIZE, s.size.sectors)

    def test_no_validate_children(self):
        self.disk.validate_children = False
        start_sector = 12345
        s = self.disk.add_slice(1, 12345, 5, Size.gb_units)
        self.assertFalse(errsvc._ERRORS)
        self.assertEqual(start_sector, s.start_sector)
        self.assertEqual(Size("5gb"), s.size)

    def test_insertion_of_slice_over_deleted_slice_of_same_name(self):
        # add a 1GB slice at start_sector CYLSIZE and set the action to delete
        s = self.disk.add_slice(0, CYLSIZE, 1, Size.gb_units)
        s.action = "delete"

        # insert another slice with the same name
        self.disk.add_slice(0, CYLSIZE, 2, Size.gb_units)
        self.assertFalse(errsvc._ERRORS)


class TestSliceInPartition(unittest.TestCase):
    def setUp(self):
        self.disk = Disk("test disk")
        self.disk.ctd = "c12345t0d0"
        self.disk.geometry = DiskGeometry(BLOCKSIZE, CYLSIZE)
        self.disk.label = "VTOC"

        # 100GB disk
        self.disk.disk_prop = DiskProp()
        self.disk.disk_prop.dev_size = Size(
            str(GBSECTOR * 100) + Size.sector_units)
        self.disk.disk_prop.blocksize = BLOCKSIZE

        # create a single 50 GB partition inside the disk
        self.partition = self.disk.add_partition(1, CYLSIZE, 50, Size.gb_units)

        # reset the errsvc
        errsvc.clear_error_list()

    def tearDown(self):
        self.disk.delete_children()
        self.disk.delete()

        # reset the errsvc
        errsvc.clear_error_list()

    def test_add_single_slice(self):
        # add a 1GB slice at start_sector CYLSIZE
        self.partition.add_slice(0, CYLSIZE, 1, Size.gb_units)
        self.assertFalse(errsvc._ERRORS)

    def test_cylinder_zero_boundary_adjustment(self):
        # add a 1GB slice at start_sector 0
        self.partition.add_slice(0, 0, 1, Size.gb_units)

        # test that it moved to the first cylinder boundary
        self.assertFalse(errsvc._ERRORS)
        self.assertEqual(self.partition._children[0].start_sector, CYLSIZE)

    def test_add_too_many_slices(self):
        for i in range(V_NUMPAR + 2):
            start_sector = i * GBSECTOR + i
            self.partition.add_slice(i, start_sector, 1)

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
                                   ShadowPhysical.TooManySlicesError))

    def test_duplicate_slice(self):
        # add 2 1GB slices with the same index but different starting sector
        self.partition.add_slice(0, 0, 1, Size.gb_units)

        # account for the first silce moving from a start_sector of 0 to
        # CYLSIZE
        self.partition.add_slice(0, CYLSIZE + GBSECTOR + 1, 1,
                                 Size.gb_units)

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
                                   ShadowPhysical.DuplicateSliceNameError))

    def test_overlapping_starting_sector(self):
        # add a 10 GB disk
        self.partition.add_slice(0, CYLSIZE, 10, Size.gb_units)
        # add an 11 GB disk
        self.partition.add_slice(1, CYLSIZE, 11, Size.gb_units)

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
                                   ShadowPhysical.OverlappingSliceError))

    def test_overlapping_ending_sector(self):
        # add a 9 GB disk on the second cylinder boundary
        self.partition.add_slice(0, CYLSIZE * 2, 9, Size.gb_units)
        # add a 10 GB disk on the first cylinder boundary
        self.partition.add_slice(1, CYLSIZE, 10, Size.gb_units)

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
                                   ShadowPhysical.OverlappingSliceError))

    def test_delete_slice(self):
        s1 = self.partition.add_slice(1, 0, 1, Size.gb_units)
        s2 = self.partition.add_slice(2, GBSECTOR + 1, 1, Size.gb_units)

        # verify there are 2 entries in the _children
        self.assertEqual(len(self.partition._children), 2)

        # delete slice 2
        s2.delete()

        # verify there is only one entry in the _children and the proper
        # slice was removed.
        self.assertEqual(len(self.partition._children), 1)
        self.assertEqual(self.partition._children[0].name, 1)

    def test_delete_slice_in_partition(self):
        s1 = self.partition.add_slice(1, 0, 1, Size.gb_units)
        s2 = self.partition.add_slice(2, GBSECTOR + 1, 1, Size.gb_units)

        # verify there are 2 entries in the _children
        self.assertEqual(len(self.partition._children), 2)

        # delete slice 2
        self.partition.delete_slice(s2)

        # verify there is only one entry in the _children and the proper
        # slice was removed.
        self.assertEqual(len(self.partition._children), 1)
        self.assertEqual(self.partition._children[0].name, 1)

    def test_resize_slice(self):
        # create a 1 GB slice
        s1 = self.partition.add_slice(0, CYLSIZE, 1, Size.gb_units)

        # verify the size of the slice is 1 GB of sectors, rounded for CYLSIZE
        self.assertEqual(s1.size.sectors, (GBSECTOR / CYLSIZE) * CYLSIZE)

        # resize it to 5 GB
        new_s1 = s1.resize(5, Size.gb_units)

        # verify the size of the slice is 5 GB of sectors
        self.assertTrue(new_s1.size.sectors, 5 * GBSECTOR)

    def test_a_in_zpool_confilict_with_disk(self):
        # set the in_zpool attribute in the disk
        self.disk.in_zpool = "tank"

        # create a 1 GB partition
        self.partition.add_slice(1, 0, 1, Size.gb_units, in_zpool="tank")

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
                                   ShadowPhysical.OverlappingSliceZpoolError))

    def test_in_vdev_confilict_with_disk(self):
        # set the in_vdev attribute in the disk
        self.disk.in_vdev = "a label"

        # create a 1 GB partition
        self.partition.add_slice(1, 0, 1, Size.gb_units, in_vdev="a label")

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
                                   ShadowPhysical.OverlappingSliceVdevError))

    def test_holey_object_no_children(self):
        holey_list = self.partition.get_gaps()
        self.assertEqual(holey_list[0].size, self.partition.size)

    def test_holey_object_one_child_start_at_nonzero(self):
        # add a single partition, starting at start sector CYLSIZE * 10
        start = CYLSIZE * 10
        s = self.partition.add_slice(1, start, 25, Size.gb_units)

        holey_list = self.partition.get_gaps()

        # verify the holey list is correct
        self.assertEqual(len(holey_list), 2)

        self.assertEqual(holey_list[0].start_sector, 0)
        self.assertEqual(holey_list[0].size.sectors, start - 1)
        self.assertEqual(holey_list[1].start_sector, \
            10 * CYLSIZE + s.size.sectors + 1)
        self.assertEqual(holey_list[1].size.sectors, \
            self.partition.size.sectors - s.size.sectors - 10 * CYLSIZE - 1)

    def test_partition_remaining_space(self):
        s = self.partition.add_slice(0, CYLSIZE, 5, Size.gb_units)
        self.assertEqual(self.partition.remaining_space.sectors,
                         self.partition.size.sectors - s.size.sectors)

    def test_invalid_start_sector(self):
        self.partition.add_slice(0, -16065, 1, Size.gb_units)
        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
            ShadowPhysical.InvalidSliceStartSectorError))

    def test_slice_entire_size_of_partition(self):
        # add a non V_BACKUP slice the entire size of the partition
        s = self.partition.add_slice(0, CYLSIZE, self.partition.size.sectors,
                                     Size.sector_units)

        # verify there are no errors
        self.assertFalse(errsvc._ERRORS)

        # verify the size of the slice
        self.assertEqual(self.partition.size.sectors, s.size.sectors)

    def test_no_validate_children(self):
        self.partition.validate_children = False
        start_sector = 12345
        s = self.partition.add_slice(1, 12345, 5, Size.gb_units)
        self.assertFalse(errsvc._ERRORS)
        self.assertEqual(start_sector, s.start_sector)
        self.assertEqual(Size("5gb"), s.size)

    def test_insertion_of_slice_over_deleted_slice_of_same_name(self):
        # add a 1GB slice at start_sector CYLSIZE and set the action to delete
        s = self.partition.add_slice(0, CYLSIZE, 1, Size.gb_units)
        s.action = "delete"

        # insert another slice with the same name
        self.partition.add_slice(0, CYLSIZE, 2, Size.gb_units)
        self.assertFalse(errsvc._ERRORS)


class TestLogicalPartition(unittest.TestCase):

    def setUp(self):
        self.disk = Disk("test disk")
        self.disk.ctd = "c12345t0d0"
        self.disk.geometry = DiskGeometry(BLOCKSIZE, CYLSIZE)
        self.disk.label = "VTOC"

        # 100GB disk
        self.disk.disk_prop = DiskProp()
        self.disk.disk_prop.dev_size = Size(
            str(GBSECTOR * 100) + Size.sector_units)
        self.disk.disk_prop.blocksize = BLOCKSIZE

        # reset the errsvc
        errsvc.clear_error_list()

    def tearDown(self):
        self.disk.delete_children()
        self.disk.delete()

        # reset the errsvc
        errsvc.clear_error_list()

    def test_add_single_logical_partition(self):
        # add a 10 GB extended partition (type is 0xf or "15")
        self.disk.add_partition(1, 0, 10, Size.gb_units,
                                Partition.name_to_num("WIN95 Extended(LBA)"))
        self.assertFalse(errsvc._ERRORS)

        # add a logical partition to the disk
        self.disk.add_partition(5, 0, 1, Size.gb_units)
        self.assertFalse(errsvc._ERRORS)

    def test_add_three_logical_partitions_in_order(self):
        # add a 10 GB extended partition (type is 0xf or "15")
        ep = self.disk.add_partition(1, CYLSIZE, 10, Size.gb_units,
            Partition.name_to_num("WIN95 Extended(LBA)"))
        self.assertFalse(errsvc._ERRORS)

        # add a logical partition to the disk
        l1 = self.disk.add_partition(5, CYLSIZE, 1, Size.gb_units)
        self.assertFalse(errsvc._ERRORS)
        self.assertEqual(l1.start_sector, ep.start_sector + LOGICAL_ADJUSTMENT)
        self.assertEqual(l1.size.sectors, GBSECTOR - LOGICAL_ADJUSTMENT)

        # add a second logical partition to the disk
        l2 = self.disk.add_partition(6, CYLSIZE + GBSECTOR + 64, 1,
                                     Size.gb_units)
        self.assertFalse(errsvc._ERRORS)
        self.assertEqual(l2.start_sector, \
            l1.start_sector + l1.size.sectors + LOGICAL_ADJUSTMENT + 1)
        self.assertEqual(l2.size.sectors, GBSECTOR)

        # add a third logical partition to the disk
        l3 = self.disk.add_partition(8, CYLSIZE + (GBSECTOR * 2) + 128, 1,
                                     Size.gb_units)
        self.assertFalse(errsvc._ERRORS)
        self.assertEqual(l3.start_sector, \
            l2.start_sector + l2.size.sectors + LOGICAL_ADJUSTMENT + 1)
        self.assertEqual(l3.size.sectors, GBSECTOR)

    def test_add_three_logical_partitions_out_of_order(self):
        # add a 10 GB extended partition (type is 0xf or "15")
        ep = self.disk.add_partition(1, CYLSIZE, 10, Size.gb_units,
            Partition.name_to_num("WIN95 Extended(LBA)"))
        self.assertFalse(errsvc._ERRORS)

        # add a logical partition to the disk (at the 'end')
        l1 = self.disk.add_partition(5, CYLSIZE + (GBSECTOR * 2) + 128, 1,
                                     Size.gb_units)
        self.assertFalse(errsvc._ERRORS)
        self.assertEqual(l1.start_sector, CYLSIZE + (GBSECTOR * 2) + 128)
        self.assertEqual(l1.size.sectors, GBSECTOR)

        # add a second logical partition to the disk (at the 'beginning')
        l2 = self.disk.add_partition(6, CYLSIZE, 1, Size.gb_units)
        self.assertFalse(errsvc._ERRORS)
        self.assertEqual(l2.start_sector, ep.start_sector + LOGICAL_ADJUSTMENT)
        self.assertEqual(l2.size.sectors, GBSECTOR - LOGICAL_ADJUSTMENT)

        # add a third logical partition to the disk (in the 'middle')
        l3 = self.disk.add_partition(8, CYLSIZE + GBSECTOR + 64, 1,
                                     Size.gb_units)
        self.assertFalse(errsvc._ERRORS)
        self.assertEqual(l3.start_sector, \
            l2.start_sector + l2.size.sectors + LOGICAL_ADJUSTMENT + 1)
        self.assertEqual(l3.size.sectors, GBSECTOR)

    def test_add_logical_without_extended(self):
        # add a logical partition to the disk
        self.disk.add_partition(5, 0, 1, Size.gb_units)

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
                                   ShadowPhysical.NoExtPartitionsError))

    def test_add_too_many_logical_partitions(self):
        # add a 50 GB extended partition (type is 0xf or "15")
        self.disk.add_partition(1, 0, 50, Size.gb_units,
                                Partition.name_to_num("WIN95 Extended(LBA)"))
        self.assertFalse(errsvc._ERRORS)

        # add MAX_EXT_PARTS
        for i in range(5, 5 + MAX_EXT_PARTS + 2):
            start_sector = i * GBSECTOR
            self.disk.add_partition(i, start_sector, 1, Size.gb_units)

        # verify there are two errors in the errsvc list.  One for an invalid
        # name and one for exceeding the number of logical partitions
        self.assertEqual(len(errsvc._ERRORS), 2)
        invalid_error = errsvc._ERRORS[0]
        toomany_error = errsvc._ERRORS[1]

        self.assertTrue(isinstance(invalid_error.error_data[ES_DATA_EXCEPTION],
            ShadowPhysical.InvalidPartitionNameError))
        self.assertTrue(isinstance(toomany_error.error_data[ES_DATA_EXCEPTION],
            ShadowPhysical.TooManyLogicalPartitionsError))

    def test_add_too_large_logical_partition(self):
        # add a 10 GB extended partition
        self.disk.add_partition(1, CYLSIZE, 10, Size.gb_units,
                                Partition.name_to_num("WIN95 Extended(LBA)"))
        self.assertFalse(errsvc._ERRORS)

        # add 20 GB logical partition starting at the same place
        self.disk.add_partition(5, CYLSIZE, 20, Size.gb_units)

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
            ShadowPhysical.LogicalPartitionOverlapError))


class TestFinalValidation(unittest.TestCase):

    def setUp(self):
        self.target = Target(Target.DESIRED)
        self.logical = Logical("logical")

        self.disk1 = Disk("disk")
        self.disk1.ctd = "c12345t0d0"
        self.disk1.geometry = DiskGeometry(BLOCKSIZE, CYLSIZE)
        self.disk1.label = "VTOC"

        # 100GB disk
        self.disk1.disk_prop = DiskProp()
        self.disk1.disk_prop.dev_size = Size(
            str(GBSECTOR * 100) + Size.sector_units)
        self.disk1.disk_prop.blocksize = BLOCKSIZE

        # reset the errsvc
        errsvc.clear_error_list()

    def tearDown(self):
        self.target.delete()

        # reset the errsvc
        errsvc.clear_error_list()

    def test_simple(self):
        # insert the smallest required objects:   one disk, one active solaris2
        # partition and one slice with in_zpool set to the root pool name, one
        # BE under the root pool

        # 10 gb partition, 1 gb slice
        p = self.disk1.add_partition(1, 0, 10, Size.gb_units,
                                     bootid=Partition.ACTIVE)
        p.add_slice(0, 0, 1, in_zpool="rpool")

        # "rpool" boot pool with one BE
        zpool = self.logical.add_zpool("rpool", is_root=True)
        zpool.insert_children(BE())

        self.target.insert_children([self.disk1, self.logical])

        self.assertTrue(self.target.final_validation())

    def test_simple_with_solaris2_on_logical(self):
        # insert the smallest required objects:   one disk, one active solaris2
        # logical partition and one slice with in_zpool set to the root pool
        # name, one BE under the root pool

        # 30 gb primary partition, 10 gb logical solaris2 partition, and 1 gb
        # slice
        primary_part = self.disk1.add_partition(1, 0, 30, Size.gb_units,
            Partition.name_to_num("Win95 Extended(LBA)"))
        logical_part = self.disk1.add_partition(5, 0, 10, Size.gb_units)
        logical_part.add_slice(0, 0, 1, in_zpool="rpool")

        # "rpool" boot pool with one BE
        zpool = self.logical.add_zpool("rpool", is_root=True)
        zpool.insert_children(BE())

        self.target.insert_children([self.disk1, self.logical])

        self.assertTrue(self.target.final_validation())

    def test_missing_active_solaris2_partition_x86(self):
        if platform.processor() != "i386":
            raise SkipTest("test not supported on sparc")

        # 10 gb partition defaults to INACTIVE, 1 gb slice
        p = self.disk1.add_partition(1, 0, 10, Size.gb_units)
        p.add_slice(0, 0, 1, in_zpool="rpool")

        # "rpool" boot pool with one BE
        zpool = self.logical.add_zpool("rpool", is_root=True)
        zpool.insert_children(BE())

        self.target.insert_children([self.disk1, self.logical])

        self.assertFalse(self.target.final_validation())

    def test_no_root_zpool(self):
        # 10 gb partition, 1 gb slice
        p = self.disk1.add_partition(1, 0, 10, Size.gb_units,
                                     bootid=Partition.ACTIVE)
        p.add_slice(0, 0, 1, in_zpool="rpool")

        # "rpool" boot pool with one BE
        zpool = self.logical.add_zpool("rpool", is_root=False)
        zpool.insert_children(BE())

        self.target.insert_children([self.disk1, self.logical])

        self.assertFalse(self.target.final_validation())

    def test_whole_disk_set(self):
        self.disk1.whole_disk = True

        # 10 gb partition, 1 gb slice
        p = self.disk1.add_partition(1, 0, 10, Size.gb_units,
                                     bootid=Partition.ACTIVE)
        p.add_slice(0, 0, 1)

        # "rpool" boot pool with one BE
        zpool = self.logical.add_zpool("rpool", is_root=False)
        zpool.insert_children(BE())

        self.target.insert_children([self.disk1, self.logical])

        self.assertFalse(self.target.final_validation())

    def test_no_BE(self):
        # 10 gb partition, 1 gb slice
        p = self.disk1.add_partition(1, 0, 10, Size.gb_units,
                                     bootid=Partition.ACTIVE)
        p.add_slice(0, 0, 1)

        # "rpool" boot pool with one BE
        zpool = self.logical.add_zpool("rpool", is_root=False)

        self.target.insert_children([self.disk1, self.logical])

        self.assertFalse(self.target.final_validation())

    def test_simple_failure_two_slices_under_partition(self):
        # create a single 10gb partition
        partition = self.disk1.add_partition("1", 0, 10, Size.gb_units,
                                             bootid=Partition.ACTIVE)

        # create two slices that overlap
        partition.add_slice("0", CYLSIZE, 1, Size.gb_units)
        partition.add_slice("1", CYLSIZE, 1, Size.gb_units)

        # "rpool" boot pool with one BE
        zpool = self.logical.add_zpool("rpool", is_root=False)
        zpool.insert_children(BE())

        self.target.insert_children([self.disk1, self.logical])
        self.assertFalse(self.target.final_validation())

    def test_invalid_parent_with_valid_child(self):
        # 10 gb logical partition (with no extended partition), 1 gb slice
        p = self.disk1.add_partition(5, 0, 10, Size.gb_units,
                                     bootid=Partition.ACTIVE)
        p.add_slice(0, 0, 1, in_zpool="rpool")

        # "rpool" boot pool with one BE
        zpool = self.logical.add_zpool("rpool", is_root=True)
        zpool.insert_children(BE())

        self.target.insert_children([self.disk1, self.logical])

        self.assertFalse(self.target.final_validation())


class TestInUse(unittest.TestCase):

    def setUp(self):
        # find the list of vdevs for the root pool
        cmd = ["/usr/sbin/zpool", "list", "-H", "-o", "name,bootfs"]
        p = Popen.check_call(cmd, stdout=Popen.STORE, stderr=Popen.STORE)
        for line in p.stdout.splitlines():
            (name, bootfs) = line.split()
            if bootfs != "-":
                root_pool = name
                break
        else:
            raise SkipTest("unable to find root pool name")

        # find the list of vdevs that compose the 'rpool' zpool
        rpool_map = _get_vdev_mapping(root_pool)

        # take the first key in the rpool_map and use the first slice that
        # makes up that key's mapping
        self.in_use_slice = rpool_map[rpool_map.keys()[0]][0]
        (self.ctd, _none, self.index) = \
            self.in_use_slice.split("/")[-1].partition("s")

        # construct a disk DOC object from the in_use_slice
        self.disk = Disk("test disk")
        self.disk.ctd = self.ctd
        self.disk.geometry = DiskGeometry(BLOCKSIZE, CYLSIZE)
        self.disk.label = "VTOC"

        # 100GB disk
        self.disk.disk_prop = DiskProp()
        self.disk.disk_prop.dev_size = Size(
            str(GBSECTOR * 100) + Size.sector_units)
        self.disk.disk_prop.blocksize = BLOCKSIZE

    def tearDown(self):
        self.disk.delete_children()
        self.disk.delete()

        # reset the errsvc
        errsvc.clear_error_list()

    def test_add_slice_in_use(self):
        self.disk.add_slice(self.index, 0, 1, Size.gb_units)

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
            ShadowPhysical.SliceInUseError))

    def test_delete_slice_in_use(self):
        # the add slice will set an error, but we want to test delete.
        s = self.disk.add_slice(self.index, 0, 1, Size.gb_units)
        errsvc.clear_error_list()
        s.delete()

        # verify there is only one error in the errsvc list and that it is the
        # proper error
        self.assertEqual(len(errsvc._ERRORS), 1)
        error = errsvc._ERRORS[0]
        self.assertTrue(isinstance(error.error_data[ES_DATA_EXCEPTION],
            ShadowPhysical.SliceInUseError))

    def test_force_delete_slice_in_use(self):
        # the add slice will set an error, but we want to test delete.
        s = self.disk.add_slice(self.index, 0, 1, Size.gb_units)
        s.force = True
        errsvc.clear_error_list()
        s.delete()
        self.assertFalse(errsvc._ERRORS)


class TestSize(unittest.TestCase):

    def test_add_size(self):
        size1 = Size("1024mb")
        size2 = Size("1024mb")
        self.assertEqual(size1 + size2, Size("2gb"))
        size1 += Size("3gb")
        self.assertEqual(size1, Size("4gb"))

    def test_sub_size(self):
        size1 = Size("4096mb")
        size2 = Size("2048mb")
        self.assertEqual(size1 - size2, Size("2gb"))