7004000 Install engine enhancement to save original args/kwargs list before calling checkpoint's execute
7011388 resuming from failed checkpoints does not work
--- a/usr/src/lib/install_engine/__init__.py Thu Jul 14 19:43:46 2011 -0700
+++ b/usr/src/lib/install_engine/__init__.py Fri Jul 15 00:22:29 2011 -0700
@@ -29,6 +29,7 @@
'''
import decimal
+import glob
import imp
import inspect
import logging
@@ -149,12 +150,9 @@
CACHE_FILE_NAME_PREFIX = ".data_cache."
CACHE_FILE_NAME = CACHE_FILE_NAME_PREFIX + "%(checkpoint)s"
SNAPSHOT_NAME = ".step_%(checkpoint)s"
+ CP_COMPLETED_SUFFIX = "-completed"
ENGINE_DOC_ROOT = "Engine-DOC-Root-Node"
- #_LAST is the the engine internal checkpoint for keeping state
- _LAST = None
- _LAST_NAME = "latest"
-
NUM_CALLBACK_ARGS = 2
_instance = None
@@ -213,16 +211,6 @@
# because data object cache might need to make logging calls.
self._init_logging(loglevel)
- # The "latest" checkpoint is not really a "regular" checkpoint that
- # gets executed. It's an internal mechanism that the engine uses
- # to keep track of the "latest" successful execution. It is created
- # as a checkpoint so all the DOC and ZFS snapshot operations
- # for regular checkpoints can be done with this too.
- InstallEngine._LAST = CheckpointData(InstallEngine._LAST_NAME,
- InstallEngine._LAST_NAME, None,
- InstallEngine._LAST_NAME, 0,
- None, None)
-
# initialize the data object cache
self.data_object_cache = DataObjectCache()
@@ -404,10 +392,6 @@
LOGGER.debug("log_level: " + str(loglevel))
LOGGER.debug("=============================")
- if checkpoint_name == InstallEngine._LAST.name:
- raise ChkptRegistrationError("Checkpoint name '%s' is reserved" %
- InstallEngine._LAST.name)
-
# Go through list of existing checkpoints, and make sure the name
# has not already been used, and insert_before value, if defined,
# has not been executed.
@@ -607,8 +591,7 @@
if start_from not in resumable:
raise UsageError("'%s' is not a resumable checkpoint" % start_from)
- use_latest = (start_from == resumable[-1])
- self._rollback(start_from, use_latest)
+ self._rollback(start_from, resumable_cp_list=resumable)
self.__resume_exec_cp_ok = False
return self.execute_checkpoints(start_from=start_from, dry_run=dry_run,
@@ -640,10 +623,19 @@
raise FileNotFoundError("Specified dataset %s does not exist" %
self.dataset)
- doc_path = self.get_cache_filename(InstallEngine._LAST.name)
+ doc_list = self._get_doc_snapshots(self.dataset.get("mountpoint"))
+ if not doc_list:
+ # can't find any DOC snapshots, user will have to start from
+ # first registered checkpoint.
+ if self._checkpoints:
+ resumable_cp.append(self._checkpoints[0].name)
+ return tuple(resumable_cp)
- # rollback data object cache to latest snapshot. Latest snapshot
- # should be first file on the list
+ doc_path = doc_list[0]
+
+ LOGGER.debug("Path of last DOC snapshot taken: %s", doc_path)
+
+ # load a temporary instance of DOC cache based on the latest snapshot
if os.path.exists(doc_path):
LOGGER.debug("Creating temp DOC based off snapshot at: %s",
doc_path)
@@ -809,7 +801,7 @@
# This snapshot, which is associated with the checkpoint's
# name, is for resuming at the named checkpoint.
if status is InstallEngine.EXEC_SUCCESS:
- self.snapshot(cp_data)
+ self.snapshot(cp_data=cp_data)
try:
LOGGER.debug("Executing %s checkpoint", checkpoint.name)
@@ -848,10 +840,10 @@
# keep track of completed percentage
self.__current_completed += cp_data.prog_est_ratio * 100
- # The "latest" snapshot is
- # taken to capture the latest successful state.
+ # Take a snapshot of the state after executing the checkpoint,
+ # but only if the execution was successful.
if status is InstallEngine.EXEC_SUCCESS:
- self.snapshot(InstallEngine._LAST)
+ self.snapshot(self._get_completed_name(cp_data.name))
except BaseException as exception:
# Fatal error in InstallEngine - abort regardless of issue
@@ -871,21 +863,29 @@
callback(status, failed_checkpoint_list)
- def snapshot(self, cp_data):
+ def snapshot(self, snapname=None, cp_data=None):
'''Snapshots the current DOC state (and ZFS dataset, if it exists)'''
- filename = self.get_cache_filename(cp_data.name)
+ if cp_data is not None:
+ snapshot_name = cp_data.name
+ else:
+ snapshot_name = snapname
+
+ filename = self.get_cache_filename(snapshot_name)
LOGGER.debug("Snapshotting DOC to %s", filename)
self.data_object_cache.take_snapshot(filename)
- cp_data.data_cache_path = filename
+ if cp_data is not None:
+ cp_data.data_cache_path = filename
if self.dataset is not None and self.dataset.exists:
- snap_name = self.get_zfs_snapshot_name(cp_data.name)
+ snap_name = self.get_zfs_snapshot_name(snapshot_name)
LOGGER.debug("Taking zfs snapshot: %s", snap_name)
self.dataset.snapshot(snap_name, overwrite=True)
- cp_data.zfs_snap = snap_name
+ if cp_data is not None:
+ cp_data.zfs_snap = snap_name
self.zfs_snapshots_modifed = True
else:
- cp_data.zfs_snap = None
+ if cp_data is not None:
+ cp_data.zfs_snap = None
def _load_checkpoints(self, checkpoint_data_list):
'''Load checkpoint modules to get the executable checkpoints
@@ -1024,9 +1024,6 @@
def get_cp_data(self, name):
'''Helper method for retrieving a CheckpointData object by name'''
- if name == InstallEngine._LAST.name:
- return InstallEngine._LAST
-
for cp in self._checkpoints:
if cp.name == name:
return cp
@@ -1095,7 +1092,7 @@
self.doc.persistent.insert_children(node)
return node
- def _rollback(self, before_cp, use_latest=False):
+ def _rollback(self, before_cp, resumable_cp_list=None):
'''Revert the engine to the given Checkpoint (by name):
* If the checkpoint was snapshotted via ZFS, rollback to that
* Additionally, rollback the DOC
@@ -1104,17 +1101,18 @@
RollbackError: If the engine doesn't have a valid ZFS dataset
to do an out-of-process rollback from
'''
- if use_latest:
- cp_data = InstallEngine._LAST
- else:
- cp_data = self.get_cp_data(before_cp)
+
+ cp_data = self.get_cp_data(before_cp)
+
+ LOGGER.debug("Going to rollback to %s", before_cp)
if cp_data.completed:
# In-process resume
cache_file = cp_data.data_cache_path
snap_name = cp_data.zfs_snap
if cache_file is None:
- raise RollbackError(before_cp, "Checkpoint has no data cache")
+ raise RollbackError(before_cp,
+ "Checkpoint has no data cache")
else:
# Out of process resume
if self.dataset is None:
@@ -1127,8 +1125,30 @@
long_snap_name = self.dataset.snapname(snap_name)
if long_snap_name not in self.dataset.snapshot_list:
- raise RollbackError(before_cp, "Missing required ZFS snapshot"
- " [%s]" % long_snap_name)
+ if resumable_cp_list and (before_cp == resumable_cp_list[-1]):
+ # Take care of the case where the checkpoint to resume
+ # from has not been executed previously. Use the "completed"
+ # snapshot from the previous checkpoint.
+ index = resumable_cp_list.index(before_cp)
+ if index == 0:
+ LOGGER.debug("First checkpoint is different than "
+ "previous first checkpoint, "
+ "no rollback necessary.")
+ return
+ completed_name = self._get_completed_name(
+ resumable_cp_list[index - 1])
+ LOGGER.debug("Going to use snapshot %s", completed_name)
+ cache_file = self.get_cache_filename(completed_name)
+ snap_name = self.get_zfs_snapshot_name(completed_name)
+ long_snap_name = self.dataset.snapname(snap_name)
+ if long_snap_name not in self.dataset.snapshot_list:
+ raise RollbackError(before_cp,
+ "Missing required ZFS snapshot "
+ "[%s]" % long_snap_name)
+ else:
+ raise RollbackError(before_cp,
+ "Missing required ZFS snapshot "
+ "[%s]" % long_snap_name)
if snap_name is not None:
self.dataset.rollback(snap_name, recursive=True)
@@ -1185,19 +1205,21 @@
# all checkpoints, including those that is not completed
# in this process. If a checkpoint exists, destroy it.
# If it doesn't exist, continue on.
+
+ # first delete the "completed" snapshot for a given
+ # checkpoint, then, delete the snapshot taken prior to
+ # executing the checkpoint.
+ completed_cp_name = self._get_completed_name(cp_data.name)
+ snap_path = self.get_zfs_snapshot_fullpath(completed_cp_name)
+ if snap_path in zfs_snapshots:
+ snap_name = self.get_zfs_snapshot_name(completed_cp_name)
+ self.dataset.destroy(dry_run=False, snapshot=snap_name)
+
snap_path = self.get_zfs_snapshot_fullpath(cp_data.name)
if snap_path in zfs_snapshots:
snap_name = self.get_zfs_snapshot_name(cp_data.name)
self.dataset.destroy(dry_run=False, snapshot=snap_name)
- # Destroy the special engine-internal "last" dataset
- snap_path = \
- self.get_zfs_snapshot_fullpath(InstallEngine._LAST.name)
- if snap_path in zfs_snapshots:
- snap_name = \
- self.get_zfs_snapshot_name(InstallEngine._LAST.name)
- self.dataset.destroy(snap_name)
-
if self._tmp_cache_path is not None:
shutil.rmtree(self._tmp_cache_path)
self._tmp_cache_path = None
@@ -1213,3 +1235,33 @@
return ((self.checkpoint_thread is not None) and
(self.checkpoint_thread.is_alive()))
+
+ def _get_doc_snapshots(self, root_dir):
+ ''' Find all DOC cache snapshot files, and return them as a tuple
+ sorted by modification time, most recently modified file first.
+
+ args:
+ root_dir: The directory in which to find the snapshot files
+ '''
+
+ if root_dir is None:
+ raise EngineError("Provided root directory to search for DOC "
+ "snapshots is None.")
+
+ file_list_with_time = list()
+ doc_snapshot_wildcard = InstallEngine.CACHE_FILE_NAME_PREFIX + "*"
+ for doc_snap in glob.glob(os.path.join(root_dir,
+ doc_snapshot_wildcard)):
+ stat = os.stat(doc_snap)
+ time_and_file_tuple = stat.st_mtime, doc_snap
+ file_list_with_time.append(time_and_file_tuple)
+
+ file_list_with_time.sort()
+
+ result = [doc_file[1] for doc_file in file_list_with_time]
+
+ return tuple(reversed(result))
+
+ def _get_completed_name(self, cp_name):
+ '''Return the name to use for checkpoint completed snapshot '''
+ return (cp_name + InstallEngine.CP_COMPLETED_SUFFIX)
--- a/usr/src/lib/install_engine/checkpoint_data.py Thu Jul 14 19:43:46 2011 -0700
+++ b/usr/src/lib/install_engine/checkpoint_data.py Fri Jul 15 00:22:29 2011 -0700
@@ -35,6 +35,7 @@
import sys
import warnings
+from copy import deepcopy
from itertools import izip
from osol_install.install_utils import get_argspec
@@ -57,7 +58,7 @@
spec = get_argspec(func)
num_spec_args = len(spec.args)
- arg_value = {}
+ arg_value = dict()
# fill with expected arguments
arg_value.update(izip(spec.args, args))
@@ -116,14 +117,14 @@
self.loglevel = loglevel
if args is None:
- self.args = ()
+ self.reg_args = list()
else:
- self.args = args
+ self.reg_args = deepcopy(args)
if kwargs is None:
- self.kwargs = {}
+ self.reg_kwargs = dict()
else:
- self.kwargs = kwargs
+ self.reg_kwargs = deepcopy(kwargs)
DataObject.__init__(self, name)
@@ -132,8 +133,8 @@
self.mod_name == other.mod_name and \
self.module_path == other.module_path and \
self.checkpoint_class_name == other.checkpoint_class_name and \
- self.args == other.args and \
- self.kwargs == other.kwargs)
+ self.reg_args == other.reg_args and \
+ self.reg_kwargs == other.reg_kwargs)
def __ne__(self, other):
return not self.__eq__(other)
@@ -174,6 +175,15 @@
self.prog_est = decimal.Decimal('0')
self.prog_est_ratio = decimal.Decimal('0')
self.prog_reported = decimal.Decimal('0')
+ if args:
+ self.args = args
+ else:
+ self.args = list()
+
+ if kwargs:
+ self.kwargs = kwargs
+ else:
+ self.kwargs = dict()
def validate_checkpoint_info(self):
''' Validate the information provided for the checkpoint '''
@@ -201,10 +211,10 @@
# add them explicitly.
args_chk = ["self", "checkpoint_name"]
- args_chk.extend(self.cp_info.args)
+ args_chk.extend(self.args)
validate_function_args(self.checkpoint_class, args_chk,
- self.cp_info.kwargs)
+ self.kwargs)
@property
def name(self):
@@ -266,8 +276,8 @@
def load_checkpoint(self):
''' Instantiate the checkpoint '''
checkpoint = self.checkpoint_class(self.cp_info.name,
- *self.cp_info.args,
- **self.cp_info.kwargs)
+ *self.args,
+ **self.kwargs)
return checkpoint
def __str__(self):
--- a/usr/src/lib/install_engine/test/test_engine.py Thu Jul 14 19:43:46 2011 -0700
+++ b/usr/src/lib/install_engine/test/test_engine.py Fri Jul 15 00:22:29 2011 -0700
@@ -283,7 +283,7 @@
cp_data = MockCheckpointData()
- self.engine.snapshot(cp_data)
+ self.engine.snapshot(cp_data=cp_data)
self.assertEqual(cp_data.zfs_snap, None)
self.assertEqual(self.engine.doc.snapshotted,
@@ -298,7 +298,7 @@
cp_data = MockCheckpointData()
- self.engine.snapshot(cp_data)
+ self.engine.snapshot(cp_data=cp_data)
self.assertEqual(cp_data.zfs_snap, None)
self.assertEqual(self.engine.doc.snapshotted,
@@ -313,7 +313,7 @@
cp_data = MockCheckpointData()
- self.engine.snapshot(cp_data)
+ self.engine.snapshot(cp_data=cp_data)
self.assertEqual(cp_data.zfs_snap, ds.snapped[0])
self.assertEqual(self.engine.doc.snapshotted,
@@ -377,13 +377,6 @@
self.engine.register_checkpoint,
self.name_list[1], *self.cp_data_args)
- def test_register_reserved_checkpoint(self):
- '''Assert cannot register checkpoint with reserved name'''
- self.assertRaises(engine.ChkptRegistrationError,
- self.engine.register_checkpoint,
- engine.InstallEngine._LAST.name,
- *self.cp_data_args)
-
def test_get_full_checkpoint_list(self):
'''InstallEngine.get_exec_list - Default args (full list)'''
@@ -1544,7 +1537,7 @@
# Change the parameter of the checkpoint that would wait for the
# cancel, to not do the waiting
cp_data = self.engine.get_cp_data(EngineCancelTests.CANCEL_CP_NAME)
- cp_data.cp_info.kwargs["wait_for_cancel"] = False
+ cp_data.cp_info.reg_kwargs["wait_for_cancel"] = False
# continue execution. Make sure rest of checkpoints are executed.
status, failed_cp = self.engine.execute_checkpoints(dry_run=True)
--- a/usr/src/lib/install_engine/test/test_engine_complex.py Thu Jul 14 19:43:46 2011 -0700
+++ b/usr/src/lib/install_engine/test/test_engine_complex.py Fri Jul 15 00:22:29 2011 -0700
@@ -142,7 +142,7 @@
cp_data = self.engine.get_cp_data(self.name_list[0])
snap = Filesystem(dataset.snapname(".step_" + cp_data.name))
- self.engine.snapshot(cp_data)
+ self.engine.snapshot(cp_data=cp_data)
self.assertTrue(os.path.exists(cp_data.data_cache_path),
"Path doesn't exist: " + cp_data.data_cache_path)
self.assertTrue(snap.exists)
@@ -157,7 +157,7 @@
snap = Filesystem(dataset.snapname(snapname))
- self.engine.snapshot(cp_data)
+ self.engine.snapshot(cp_data=cp_data)
self.assertTrue(os.path.exists(cp_data.data_cache_path),
"Path doesn't exist: " + cp_data.data_cache_path)
self.assertTrue(snap.exists)
@@ -261,7 +261,6 @@
self.engine.execute_checkpoints()
# manually snapshots
- dataset.destroy(dry_run=False, snapshot=self.engine.get_zfs_snapshot_name(self.engine._LAST.name))
dataset.destroy(dry_run=False, snapshot=self.engine.get_zfs_snapshot_name(self.name_list[-1]))
dataset.destroy(dry_run=False, snapshot=self.engine.get_zfs_snapshot_name(self.name_list[-2]))
@@ -279,8 +278,6 @@
# manually remove a snapshot in middle of checkpoint list
snap_path = self.engine.get_zfs_snapshot_name("three")
dataset.destroy(dry_run=False, snapshot=snap_path)
- snap_path = self.engine.get_zfs_snapshot_name(self.engine._LAST.name)
- dataset.destroy(dry_run=False, snapshot=snap_path)
cp_list = self.engine.get_resumable_checkpoints()
self.verify_resumable_cp_result(["one", "two", "three"], cp_list)
@@ -349,20 +346,6 @@
cp_list = self.engine.get_resumable_checkpoints()
self.verify_resumable_cp_result(expected_result, cp_list)
- def test_get_resumable_cp_missing_last_DOC_snapshot(self):
- '''Verify get_resumable_checkpoints() with last DOC snapshot missing'''
- dataset = self.get_dataset()
- self.engine.dataset = dataset
-
- self.engine.execute_checkpoints()
-
- # Remove DOC snapshot of last executed checkpoint
- snapshot_name = self.engine.get_cache_filename(self.engine._LAST)
- os.remove(snapshot_name)
-
- cp_list = self.engine.get_resumable_checkpoints()
- self.verify_resumable_cp_result(["one"], cp_list)
-
def test_get_resumable_cp_no_cp_registered(self):
'''Verify get_resumable_checkpoints() with no checkpoint registered in subsequent run'''