7004000 Install engine enhancement to save original args/kwargs list before calling checkpoint's execute
authorKaren Tung <Karen.Tung@oracle.com>
Fri, 15 Jul 2011 00:22:29 -0700
changeset 1290 cfaa7349f221
parent 1289 2f13bdfdf70e
child 1291 af77d1e476b6
7004000 Install engine enhancement to save original args/kwargs list before calling checkpoint's execute 7011388 resuming from failed checkpoints does not work
usr/src/lib/install_engine/__init__.py
usr/src/lib/install_engine/checkpoint_data.py
usr/src/lib/install_engine/test/test_engine.py
usr/src/lib/install_engine/test/test_engine_complex.py
--- a/usr/src/lib/install_engine/__init__.py	Thu Jul 14 19:43:46 2011 -0700
+++ b/usr/src/lib/install_engine/__init__.py	Fri Jul 15 00:22:29 2011 -0700
@@ -29,6 +29,7 @@
 '''
 
 import decimal
+import glob
 import imp
 import inspect
 import logging
@@ -149,12 +150,9 @@
     CACHE_FILE_NAME_PREFIX = ".data_cache."
     CACHE_FILE_NAME = CACHE_FILE_NAME_PREFIX + "%(checkpoint)s"
     SNAPSHOT_NAME = ".step_%(checkpoint)s"
+    CP_COMPLETED_SUFFIX = "-completed"
     ENGINE_DOC_ROOT = "Engine-DOC-Root-Node"
 
-    #_LAST is the the engine internal checkpoint for keeping state
-    _LAST = None
-    _LAST_NAME = "latest"
-
     NUM_CALLBACK_ARGS = 2
 
     _instance = None
@@ -213,16 +211,6 @@
         # because data object cache might need to make logging calls.
         self._init_logging(loglevel)
 
-        # The "latest" checkpoint is not really a "regular" checkpoint that
-        # gets executed.  It's an internal mechanism that the engine uses
-        # to keep track of the "latest" successful execution.  It is created
-        # as a checkpoint so all the DOC and ZFS snapshot operations
-        # for regular checkpoints can be done with this too.
-        InstallEngine._LAST = CheckpointData(InstallEngine._LAST_NAME,
-                                             InstallEngine._LAST_NAME, None,
-                                             InstallEngine._LAST_NAME, 0,
-                                             None, None)
-
         # initialize the data object cache
         self.data_object_cache = DataObjectCache()
 
@@ -404,10 +392,6 @@
         LOGGER.debug("log_level: " + str(loglevel))
         LOGGER.debug("=============================")
 
-        if checkpoint_name == InstallEngine._LAST.name:
-            raise ChkptRegistrationError("Checkpoint name '%s' is reserved" %
-                                         InstallEngine._LAST.name)
-
         # Go through list of existing checkpoints, and make sure the name
         # has not already been used, and insert_before value, if defined,
         # has not been executed.
@@ -607,8 +591,7 @@
         if start_from not in resumable:
             raise UsageError("'%s' is not a resumable checkpoint" % start_from)
 
-        use_latest = (start_from == resumable[-1])
-        self._rollback(start_from, use_latest)
+        self._rollback(start_from, resumable_cp_list=resumable)
 
         self.__resume_exec_cp_ok = False
         return self.execute_checkpoints(start_from=start_from, dry_run=dry_run,
@@ -640,10 +623,19 @@
             raise FileNotFoundError("Specified dataset %s does not exist" %
                                     self.dataset)
 
-        doc_path = self.get_cache_filename(InstallEngine._LAST.name)
+        doc_list = self._get_doc_snapshots(self.dataset.get("mountpoint"))
+        if not doc_list:
+            # can't find any DOC snapshots, user will have to start from
+            # first registered checkpoint.
+            if self._checkpoints:
+                resumable_cp.append(self._checkpoints[0].name)
+            return tuple(resumable_cp)
 
-        # rollback data object cache to latest snapshot.  Latest snapshot
-        # should be first file on the list
+        doc_path = doc_list[0]
+
+        LOGGER.debug("Path of last DOC snapshot taken: %s", doc_path)
+
+        # load a temporary instance of DOC cache based on the lastest snapshot
         if os.path.exists(doc_path):
             LOGGER.debug("Creating temp DOC based off snapshot at: %s",
                          doc_path)
@@ -809,7 +801,7 @@
                 # This snapshot, which is associated with the checkpoint's 
                 # name, is for resuming at the named checkpoint.  
                 if status is InstallEngine.EXEC_SUCCESS:
-                    self.snapshot(cp_data)
+                    self.snapshot(cp_data=cp_data)
 
                 try:
                     LOGGER.debug("Executing %s checkpoint", checkpoint.name)
@@ -848,10 +840,10 @@
                 # keep track of completed percentage
                 self.__current_completed += cp_data.prog_est_ratio * 100
 
-                # The "latest" snapshot is
-                # taken to capture the latest successful state.  
+                # Take a snapshot of the state after executing the checkpoint.
+                # if it is successful.  
                 if status is InstallEngine.EXEC_SUCCESS:
-                    self.snapshot(InstallEngine._LAST)
+                    self.snapshot(self._get_completed_name(cp_data.name))
 
         except BaseException as exception:
             # Fatal error in InstallEngine - abort regardless of issue
@@ -871,21 +863,29 @@
 
         callback(status, failed_checkpoint_list)
 
-    def snapshot(self, cp_data):
+    def snapshot(self, snapname=None, cp_data=None):
         '''Snapshots the current DOC state (and ZFS dataset, if it exists)'''
-        filename = self.get_cache_filename(cp_data.name)
+        if cp_data is not None:
+            snapshot_name = cp_data.name
+        else:
+            snapshot_name = snapname
+
+        filename = self.get_cache_filename(snapshot_name)
         LOGGER.debug("Snapshotting DOC to %s", filename)
         self.data_object_cache.take_snapshot(filename)
-        cp_data.data_cache_path = filename
+        if cp_data is not None:
+            cp_data.data_cache_path = filename
 
         if self.dataset is not None and self.dataset.exists:
-            snap_name = self.get_zfs_snapshot_name(cp_data.name)
+            snap_name = self.get_zfs_snapshot_name(snapshot_name)
             LOGGER.debug("Taking zfs snapshot: %s", snap_name)
             self.dataset.snapshot(snap_name, overwrite=True)
-            cp_data.zfs_snap = snap_name
+            if cp_data is not None:
+                cp_data.zfs_snap = snap_name
             self.zfs_snapshots_modifed = True
         else:
-            cp_data.zfs_snap = None
+            if cp_data is not None:
+                cp_data.zfs_snap = None
 
     def _load_checkpoints(self, checkpoint_data_list):
         '''Load checkpoint modules to get the executable checkpoints
@@ -1024,9 +1024,6 @@
     def get_cp_data(self, name):
         '''Helper method for retrieving a CheckpointData object by name'''
 
-        if name == InstallEngine._LAST.name:
-            return InstallEngine._LAST
-
         for cp in self._checkpoints:
             if cp.name == name:
                 return cp
@@ -1095,7 +1092,7 @@
             self.doc.persistent.insert_children(node)
         return node
 
-    def _rollback(self, before_cp, use_latest=False):
+    def _rollback(self, before_cp, resumable_cp_list=None):
         '''Revert the engine to the given Checkpoint (by name):
             * If the checkpoint was snapshotted via ZFS, rollback to that
             * Additionally, rollback the DOC
@@ -1104,17 +1101,18 @@
             RollbackError: If the engine doesn't have a valid ZFS dataset
                            to do an out-of-process rollback from
         '''
-        if use_latest:
-            cp_data = InstallEngine._LAST
-        else:
-            cp_data = self.get_cp_data(before_cp)
+
+        cp_data = self.get_cp_data(before_cp)
+
+        LOGGER.debug("Going to rollback to %s", before_cp)
 
         if cp_data.completed:
             # In-process resume
             cache_file = cp_data.data_cache_path
             snap_name = cp_data.zfs_snap
             if cache_file is None:
-                raise RollbackError(before_cp, "Checkpoint has no data cache")
+                raise RollbackError(rollback_cp,
+                                    "Checkpoint has no data cache")
         else:
             # Out of process resume
             if self.dataset is None:
@@ -1127,8 +1125,30 @@
             long_snap_name = self.dataset.snapname(snap_name)
 
             if long_snap_name not in self.dataset.snapshot_list:
-                raise RollbackError(before_cp, "Missing required ZFS snapshot"
-                                    " [%s]" % long_snap_name)
+                if resumable_cp_list and (before_cp == resumable_cp_list[-1]):
+                    # Take care of the case where the checkpoint to resume
+                    # from is not previously executed.  Use the "completed"
+                    # snapshot from the previous checkpoint.
+                    index = resumable_cp_list.index(before_cp)
+                    if index == 0:
+                        LOGGER.debug("First checkpoint is different than"
+                                     "previous first checkpoint, "
+                                     "no rollback necessary.")
+                        return
+                    completed_name = self._get_completed_name(\
+                        resumable_cp_list[index - 1])
+                    LOGGER.debug("Going to use snapshot %s", completed_name)
+                    cache_file = self.get_cache_filename(completed_name)
+                    snap_name = self.get_zfs_snapshot_name(completed_name)
+                    long_snap_name = self.dataset.snapname(snap_name)
+                    if long_snap_name not in self.dataset.snapshot_list:
+                        raise RollbackError(before_cp,
+                                            "Missing required ZFS snapshot "
+                                            "[%s]" % long_snap_name)
+                else:
+                    raise RollbackError(before_cp,
+                                        "Missing required ZFS snapshot "
+                                        "[%s]" % long_snap_name)
         if snap_name is not None:
             self.dataset.rollback(snap_name, recursive=True)
 
@@ -1185,19 +1205,21 @@
                 # all checkpoints, including those that is not completed
                 # in this process.  If a checkpoint exists, destroy it.
                 # If it doesn't exist, continue on.
+
+                # first delete the "completed" snapshot for a given
+                # checkpoint, then, delete the snapshot taken prior to
+                # executing the checkpoint.
+                completed_cp_name = self._get_completed_name(cp_data.name)
+                snap_path = self.get_zfs_snapshot_fullpath(completed_cp_name)
+                if snap_path in zfs_snapshots:
+                    snap_name = self.get_zfs_snapshot_name(completed_cp_name)
+                    self.dataset.destroy(dry_run=False, snapshot=snap_name)
+
                 snap_path = self.get_zfs_snapshot_fullpath(cp_data.name)
                 if snap_path in zfs_snapshots:
                     snap_name = self.get_zfs_snapshot_name(cp_data.name)
                     self.dataset.destroy(dry_run=False, snapshot=snap_name)
 
-            # Destroy the special engine-internal "last" dataset
-            snap_path = \
-                self.get_zfs_snapshot_fullpath(InstallEngine._LAST.name)
-            if snap_path in zfs_snapshots:
-                snap_name = \
-                    self.get_zfs_snapshot_name(InstallEngine._LAST.name)
-                self.dataset.destroy(snap_name)
-
         if self._tmp_cache_path is not None:
             shutil.rmtree(self._tmp_cache_path)
             self._tmp_cache_path = None
@@ -1213,3 +1235,33 @@
 
         return ((self.checkpoint_thread is not None) and
                 (self.checkpoint_thread.is_alive()))
+
+    def _get_doc_snapshots(self, root_dir):
+        ''' Find all DOC cache snapshot files, and return them in a sorted
+            list with the last created file first.
+
+            args:
+                root_dir: The directory in which to find the snapshot files
+        '''
+
+        if root_dir is None:
+            raise EngineError("Provided root directory to search for DOC "
+                              "snapshots is None.")
+
+        file_list_with_time = list()
+        doc_snapshot_wildcard = InstallEngine.CACHE_FILE_NAME_PREFIX + "*"
+        for doc_snap in glob.glob(os.path.join(root_dir,
+                                               doc_snapshot_wildcard)):
+            stat = os.stat(doc_snap)
+            time_and_file_tuple = stat.st_mtime, doc_snap
+            file_list_with_time.append(time_and_file_tuple)
+
+        file_list_with_time.sort()
+
+        result = [doc_file[1] for doc_file in file_list_with_time]
+
+        return tuple(reversed(result))
+
+    def _get_completed_name(self, cp_name):
+        '''Return the name to use for checkpoint completed snapshot '''
+        return (cp_name + InstallEngine.CP_COMPLETED_SUFFIX)
--- a/usr/src/lib/install_engine/checkpoint_data.py	Thu Jul 14 19:43:46 2011 -0700
+++ b/usr/src/lib/install_engine/checkpoint_data.py	Fri Jul 15 00:22:29 2011 -0700
@@ -35,6 +35,7 @@
 import sys
 import warnings
 
+from copy import deepcopy
 from itertools import izip
 
 from osol_install.install_utils import get_argspec
@@ -57,7 +58,7 @@
     spec = get_argspec(func)
 
     num_spec_args = len(spec.args)
-    arg_value = {}
+    arg_value = dict()
 
     # fill with expected arguments
     arg_value.update(izip(spec.args, args))
@@ -116,14 +117,14 @@
         self.loglevel = loglevel
 
         if args is None:
-            self.args = ()
+            self.reg_args = list()
         else:
-            self.args = args
+            self.reg_args = deepcopy(args)
 
         if kwargs is None:
-            self.kwargs = {}
+            self.reg_kwargs = dict()
         else:
-            self.kwargs = kwargs
+            self.reg_kwargs = deepcopy(kwargs)
 
         DataObject.__init__(self, name)
 
@@ -132,8 +133,8 @@
                self.mod_name == other.mod_name and \
                self.module_path == other.module_path and \
                self.checkpoint_class_name == other.checkpoint_class_name and \
-               self.args == other.args and \
-               self.kwargs == other.kwargs)
+               self.reg_args == other.reg_args and \
+               self.reg_kwargs == other.reg_kwargs)
 
     def __ne__(self, other):
         return not self.__eq__(other)
@@ -174,6 +175,15 @@
         self.prog_est = decimal.Decimal('0')
         self.prog_est_ratio = decimal.Decimal('0')
         self.prog_reported = decimal.Decimal('0')
+        if args:
+            self.args = args
+        else:
+            self.args = list()
+
+        if kwargs:
+            self.kwargs = kwargs
+        else:
+            self.kwargs = dict()
 
     def validate_checkpoint_info(self):
         ''' Validate the information provided for the checkpoint '''
@@ -201,10 +211,10 @@
         # add them explicitly.
 
         args_chk = ["self", "checkpoint_name"]
-        args_chk.extend(self.cp_info.args)
+        args_chk.extend(self.args)
 
         validate_function_args(self.checkpoint_class, args_chk,
-                               self.cp_info.kwargs)
+                               self.kwargs)
 
     @property
     def name(self):
@@ -266,8 +276,8 @@
     def load_checkpoint(self):
         ''' Instantiate the checkpoint '''
         checkpoint = self.checkpoint_class(self.cp_info.name,
-                                           *self.cp_info.args,
-                                           **self.cp_info.kwargs)
+                                           *self.args,
+                                           **self.kwargs)
         return checkpoint
 
     def __str__(self):
--- a/usr/src/lib/install_engine/test/test_engine.py	Thu Jul 14 19:43:46 2011 -0700
+++ b/usr/src/lib/install_engine/test/test_engine.py	Fri Jul 15 00:22:29 2011 -0700
@@ -283,7 +283,7 @@
         
         cp_data = MockCheckpointData()
         
-        self.engine.snapshot(cp_data)
+        self.engine.snapshot(cp_data=cp_data)
         
         self.assertEqual(cp_data.zfs_snap, None)
         self.assertEqual(self.engine.doc.snapshotted,
@@ -298,7 +298,7 @@
         
         cp_data = MockCheckpointData()
         
-        self.engine.snapshot(cp_data)
+        self.engine.snapshot(cp_data=cp_data)
         
         self.assertEqual(cp_data.zfs_snap, None)
         self.assertEqual(self.engine.doc.snapshotted,
@@ -313,7 +313,7 @@
         
         cp_data = MockCheckpointData()
         
-        self.engine.snapshot(cp_data)
+        self.engine.snapshot(cp_data=cp_data)
         
         self.assertEqual(cp_data.zfs_snap, ds.snapped[0])
         self.assertEqual(self.engine.doc.snapshotted,
@@ -377,13 +377,6 @@
                           self.engine.register_checkpoint,
                           self.name_list[1], *self.cp_data_args)
     
-    def test_register_reserved_checkpoint(self):
-        '''Assert cannot register checkpoint with reserved name'''
-        self.assertRaises(engine.ChkptRegistrationError,
-                          self.engine.register_checkpoint,
-                          engine.InstallEngine._LAST.name,
-                          *self.cp_data_args)
-    
     def test_get_full_checkpoint_list(self):
         '''InstallEngine.get_exec_list - Default args (full list)'''
         
@@ -1544,7 +1537,7 @@
         # Change the parameter of the checkpoint that would wait for the
         # cancel, to not do the waiting
         cp_data = self.engine.get_cp_data(EngineCancelTests.CANCEL_CP_NAME)
-        cp_data.cp_info.kwargs["wait_for_cancel"] = False
+        cp_data.cp_info.reg_kwargs["wait_for_cancel"] = False
 
         # continue execution.  Make sure rest of checkpoints are executed.
         status, failed_cp = self.engine.execute_checkpoints(dry_run=True)
--- a/usr/src/lib/install_engine/test/test_engine_complex.py	Thu Jul 14 19:43:46 2011 -0700
+++ b/usr/src/lib/install_engine/test/test_engine_complex.py	Fri Jul 15 00:22:29 2011 -0700
@@ -142,7 +142,7 @@
         cp_data = self.engine.get_cp_data(self.name_list[0])
         snap = Filesystem(dataset.snapname(".step_" + cp_data.name))
         
-        self.engine.snapshot(cp_data)
+        self.engine.snapshot(cp_data=cp_data)
         self.assertTrue(os.path.exists(cp_data.data_cache_path),
                         "Path doesn't exist: " + cp_data.data_cache_path)
         self.assertTrue(snap.exists)
@@ -157,7 +157,7 @@
         
         snap = Filesystem(dataset.snapname(snapname))
         
-        self.engine.snapshot(cp_data)
+        self.engine.snapshot(cp_data=cp_data)
         self.assertTrue(os.path.exists(cp_data.data_cache_path),
                         "Path doesn't exist: " + cp_data.data_cache_path)
         self.assertTrue(snap.exists)
@@ -261,7 +261,6 @@
         self.engine.execute_checkpoints()
 
         # manually snapshots
-        dataset.destroy(dry_run=False, snapshot=self.engine.get_zfs_snapshot_name(self.engine._LAST.name))
         dataset.destroy(dry_run=False, snapshot=self.engine.get_zfs_snapshot_name(self.name_list[-1]))
         dataset.destroy(dry_run=False, snapshot=self.engine.get_zfs_snapshot_name(self.name_list[-2]))
         
@@ -279,8 +278,6 @@
         # manually remove a snapshot in middle of checkpoint list
         snap_path = self.engine.get_zfs_snapshot_name("three")
         dataset.destroy(dry_run=False, snapshot=snap_path)
-        snap_path = self.engine.get_zfs_snapshot_name(self.engine._LAST.name)
-        dataset.destroy(dry_run=False, snapshot=snap_path)
         
         cp_list = self.engine.get_resumable_checkpoints()
         self.verify_resumable_cp_result(["one", "two", "three"], cp_list)
@@ -349,20 +346,6 @@
         cp_list = self.engine.get_resumable_checkpoints()
         self.verify_resumable_cp_result(expected_result, cp_list)
 
-    def test_get_resumable_cp_missing_last_DOC_snapshot(self):
-        '''Verify get_resumable_checkpoints() with last DOC snapshot missing'''
-        dataset = self.get_dataset()
-        self.engine.dataset = dataset
-        
-        self.engine.execute_checkpoints()
-        
-        # Remove DOC snapshot of last executed checkpoint
-        snapshot_name = self.engine.get_cache_filename(self.engine._LAST)
-        os.remove(snapshot_name)
-
-        cp_list = self.engine.get_resumable_checkpoints()
-        self.verify_resumable_cp_result(["one"], cp_list)
-        
     def test_get_resumable_cp_no_cp_registered(self):
         '''Verify get_resumable_checkpoints() with no checkpoint registered in subsequent run'''