explore deployment-task workflow

What happens when the Deploy Changes button is clicked in fuel-web-ui:

Deploy Changes Button (fuel-ui)
---> PUT /api/clusters/{cluster-id}/changes (nailgun)
    ---> nailgun/api/v1/handlers/cluster.py: ClusterChangesHandler.PUT
        ---> nailgun/task/manager.py: ApplyChangesTaskManager.execute
        ---> _execute_sync
        ---> _execute_async_content
            ---> provision_message -> ProvisionTask.message
            ---> deployment_message -> tasks.ClusterTransaction -> DeploymentTask.message
                ---> rpc.cast('naily', task_messages)
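
For reference, the same chain can be triggered without the UI by calling the
nailgun REST API directly. A minimal sketch, assuming nailgun listens on
localhost:8000 and a valid keystone token is at hand (both are assumptions;
adjust for a real environment):

# Hypothetical sketch: trigger "Deploy Changes" for cluster 1 via HTTP.
import requests

TOKEN = 'REPLACE_WITH_KEYSTONE_TOKEN'
resp = requests.put(
    'http://localhost:8000/api/clusters/1/changes',
    headers={'X-Auth-Token': TOKEN, 'Content-Type': 'application/json'},
)
# nailgun replies with the created supertask, whose sub-tasks carry the
# provision/deployment messages described below
print(resp.status_code, resp.json())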

class ProvisionTask(object):

    @classmethod
    def _get_provision_method(cls, cluster):
        """Get provision method name based on cluster attributes

        :param cluster: Cluster db object
        :returns: string - an Astute callable
        """
        cluster_attrs = objects.Attributes.merged_attrs_values(
            cluster.attributes)
        provision_method = cluster_attrs.get('provision', {}).get(
            'method', consts.PROVISION_METHODS.cobbler)

        # NOTE(kozhukalov):
        #
        # Map provisioning method to Astute callable.
        if provision_method == consts.PROVISION_METHODS.cobbler:
            return 'native_provision'
        return 'image_provision'
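
    # Example (method names assumed from consts.PROVISION_METHODS): cluster
    # attributes {'provision': {'method': 'image'}} map to the Astute
    # callable 'image_provision'; a missing 'provision' section falls back
    # to cobbler and maps to 'native_provision'.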

    @classmethod
    def message(cls, task, nodes_to_provisioning):
        logger.debug("ProvisionTask.message(task=%s)" % task.uuid)
        task = objects.Task.get_by_uid(
            task.id,
            fail_if_not_found=True,
            lock_for_update=True
        )
        objects.NodeCollection.lock_nodes(nodes_to_provisioning)
        serialized_cluster = provisioning_serializers.serialize(
            task.cluster, nodes_to_provisioning)

        for node in nodes_to_provisioning:
            if settings.FAKE_TASKS or settings.FAKE_TASKS_AMQP:
                continue
            logs_utils.prepare_syslog_dir(node)

        rpc_message = make_astute_message(
            task,
            cls._get_provision_method(task.cluster),  # native_provision | image_provision
            'provision_resp',
            {
                'provisioning_info': serialized_cluster
            }
        )
        db().commit()
        return rpc_message

def make_astute_message(task, method, respond_to, args):
    message = {
        'api_version': settings.VERSION['api'],
        'method': method,
        'respond_to': respond_to,
        'args': args
    }
    message['args']['task_uuid'] = task.uuid
    task.cache = message
    return message
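
Every message Astute receives has this same envelope. A hedged sketch of
what a provision message might look like on the wire (the keys come from
make_astute_message above; the concrete values are illustrative, not taken
from a real log):

# Illustrative only: envelope fields from make_astute_message; values made up.
provision_message = {
    'api_version': '1',
    'method': 'image_provision',     # or 'native_provision', see above
    'respond_to': 'provision_resp',  # nailgun RPC receiver callback
    'args': {
        'task_uuid': '<task.uuid>',  # set by make_astute_message
        'provisioning_info': {},     # serialized cluster (omitted here)
    },
}
# and finally, as in the flow at the top:
# rpc.cast('naily', [provision_message, deployment_message])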


class DeploymentTask(BaseDeploymentTask):
    """Task for applying changes to cluster

    LOGIC
    Use cases:
    1. Cluster exists, node(s) added
      If we add one node to an existing OpenStack cluster, other nodes may
      require updates (redeployment), but they don't require full system
      reinstallation.
      How to: run deployment for all nodes whose system type is 'target'.
      Run provisioning first and then deployment for nodes whose system
      type is 'discover'.
      Q: Should we care about node status (provisioning, error, deploying)?
      A: offline - the node doesn't respond (agent doesn't run; not
                   implemented); let's say the user should remove this
                   node from the cluster before deployment.
         ready - target OS is loaded and the node is OK; we redeploy
                 ready nodes only if the cluster has pending changes, i.e.
                 network or cluster attrs were changed.
         discover - in discovery mode, provisioning is required.
         provisioning - at the time of task execution there should be no
                        such case. If there is, the previous provisioning
                        has failed; a possible solution would be to try
                        provisioning again.
         deploying - the same as provisioning, but stuck in a previous
                     deploy; solution - try to deploy. May lose some data
                     if reprovisioned.
         error - a recognized error in deployment or provisioning... We
                 have to know where the error occurred. If in deployment,
                 reprovisioning may not be a solution (can lose data).
                 If in provisioning, provisioning & deployment can be done
                 again.
    2. New cluster, just added nodes
      Provision first, and run deploy second.
    3. Remove some nodes and add others
      The deletion task runs first and actually removes nodes, including
      removal from the DB; however, removal from the DB happens when
      remove_nodes_resp is run. It means we have to filter nodes and not
      run deployment on those which are prepared for removal.
    """

    @classmethod
    def message(cls, task, nodes, affected_nodes=None, deployment_tasks=None,
                reexecutable_filter=None, graph_type=None,
                force=False, dry_run=False, **kwargs):
        """Builds RPC message for deployment task.

        :param task: the database task object instance
        :param nodes: the nodes for deployment
        :param affected_nodes: the list of nodes affected by the deployment
        :param deployment_tasks: the list of tasks_ids to execute,
                                 if None, all tasks will be executed
        :param reexecutable_filter: the list of events to find subscribed tasks
        :param force: force
        :param dry_run: dry run
        :param graph_type: deployment graph type
        """
        logger.debug("DeploymentTask.message(task=%s)" % task.uuid)

        task_ids = deployment_tasks or []

        objects.NodeCollection.lock_nodes(nodes)

        for n in nodes:
            if n.pending_roles:
                n.roles = n.roles + n.pending_roles
                n.pending_roles = []

                # If receiver for some reasons didn't update
                # node's status to provisioned when deployment
                # started, we should do it in nailgun
                if n.status in (consts.NODE_STATUSES.deploying,):
                    n.status = consts.NODE_STATUSES.provisioned
                n.progress = 0

        # database commit is required to release nodes lock before
        # serialization started otherwise concurrent nailgun API queries will
        # be locked at database level all the time it is running.
        db().commit()

        deployment_tasks = objects.Cluster.get_deployment_tasks(
            task.cluster, graph_type
        )
        task.graph_type = graph_type or consts.DEFAULT_DEPLOYMENT_GRAPH_TYPE
        objects.Transaction.attach_tasks_snapshot(
            task,
            deployment_tasks
        )

        deployment_mode, message = cls.call_deployment_method(
            task, tasks=deployment_tasks, nodes=nodes,
            affected_nodes=affected_nodes, selected_task_ids=task_ids,
            events=reexecutable_filter, force=force,
            dry_run=dry_run, **kwargs
        )

        # After serialization set pending_addition to False
        for node in nodes:
            node.pending_addition = False

        objects.Transaction.attach_cluster_settings(
            task,
            {
                'editable': objects.Cluster.get_editable_attributes(
                    task.cluster, all_plugins_versions=True)
            }
        )
        objects.Transaction.attach_network_settings(
            task,
            objects.Cluster.get_network_attributes(task.cluster)
        )

        # get puppet_debug attribute
        cluster_settings = objects.Cluster.get_editable_attributes(
            task.cluster)
        if cluster_settings['common'].get('puppet_debug', {}).get('value'):
            message['debug'] = True

        rpc_message = make_astute_message(
            task,
            deployment_mode,
            'deploy_resp',
            message
        )
        db().flush()
        return rpc_message

    @staticmethod
    def _extend_tasks_list(dst, src):
        """Append tasks from src to dst with joining same ones.

        Append tasks from the list 'src' to the list 'dst' and
        join tasks with the same id (concatenate lists of
        node uids).

        :param dst: list of serialized tasks
        :param src: list of serialized tasks
        :return: None
        """
        src_dict = {t['id']: t for t in src if 'id' in t}

        for t in dst:
            if 'id' not in t or t['id'] not in src_dict:
                continue

            t['uids'].extend(src_dict[t['id']]['uids'])
            src_dict.pop(t['id'])

        dst.extend(src_dict.values())
        dst.extend(t for t in src if 'id' not in t)
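
    # Worked example (hypothetical task data): merging by id concatenates
    # the 'uids' of tasks present in both lists and appends the rest:
    #   dst = [{'id': 'upload_repos', 'uids': ['1', '2']}]
    #   src = [{'id': 'upload_repos', 'uids': ['3']},
    #          {'id': 'sync_time', 'uids': ['3']}]
    #   _extend_tasks_list(dst, src)
    #   dst == [{'id': 'upload_repos', 'uids': ['1', '2', '3']},
    #           {'id': 'sync_time', 'uids': ['3']}]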

    @classmethod
    def granular_deploy(cls, transaction, tasks, nodes,
                        affected_nodes, selected_task_ids, events,
                        dry_run=False, **kwargs):
        """Builds parameters for granular deployment.

        :param transaction: the transaction object
        :param tasks: the list of deployment tasks to execute
        :param nodes: the nodes for deployment
        :param affected_nodes: the list of nodes affected by the deployment
        :param selected_task_ids: the list of tasks_ids to execute,
                                   if None, all tasks will be executed
        :param events: the list of events to find subscribed tasks
        :param dry_run: dry run
        :return: the arguments for RPC message
        """
        if dry_run:
            raise errors.DryRunSupportedOnlyByLCM()

        graph = orchestrator_graph.AstuteGraph(transaction.cluster, tasks)
        graph.check()
        graph.only_tasks(selected_task_ids)

        # NOTE(dshulyak) At this point parts of the orchestration can be empty,
        # it should not cause any issues with deployment/progress and was
        # done by design
        resolver = TagResolver(nodes)
        serialized_cluster = deployment_serializers.serialize(
            graph, transaction.cluster, nodes)

        cls._save_deployment_info(transaction, serialized_cluster)
        serialized_cluster = deployment_info_to_legacy(serialized_cluster)

        pre_deployment = stages.pre_deployment_serialize(
            graph, transaction.cluster, nodes,
            resolver=resolver)
        post_deployment = stages.post_deployment_serialize(
            graph, transaction.cluster, nodes,
            resolver=resolver)

        if affected_nodes:
            graph.reexecutable_tasks(events)
            serialized_affected_nodes = deployment_serializers.serialize(
                graph, transaction.cluster, affected_nodes
            )
            serialized_cluster.extend(deployment_info_to_legacy(
                serialized_affected_nodes
            ))

            pre_deployment_affected = stages.pre_deployment_serialize(
                graph, transaction.cluster, affected_nodes,
                resolver=resolver)
            post_deployment_affected = stages.post_deployment_serialize(
                graph, transaction.cluster, affected_nodes,
                resolver=resolver)

            cls._extend_tasks_list(pre_deployment, pre_deployment_affected)
            cls._extend_tasks_list(post_deployment, post_deployment_affected)

        return {
            'deployment_info': serialized_cluster,
            'pre_deployment': pre_deployment,
            'post_deployment': post_deployment
        }

    deploy = granular_deploy

    @classmethod
    def task_deploy(cls, transaction, tasks, nodes, affected_nodes,
                    selected_task_ids, events, dry_run=False,
                    **kwargs):
        """Builds parameters for task based deployment.

        :param transaction: the transaction object
        :param tasks: the list of deployment tasks to execute
        :param nodes: the nodes for deployment
        :param affected_nodes: the list of nodes affected by the deployment
        :param selected_task_ids: the list of tasks_ids to execute,
                                  if None, all tasks will be executed
        :param events: the list of events to find subscribed tasks
        :param dry_run: dry run
        :return: the arguments for RPC message
        """

        if dry_run:
            raise errors.DryRunSupportedOnlyByLCM()

        logger.info("cluster serialization is started.")
        serialized_cluster = deployment_serializers.serialize(
            None, transaction.cluster, nodes
        )
        cls._save_deployment_info(transaction, serialized_cluster)
        serialized_cluster = deployment_info_to_legacy(serialized_cluster)

        logger.info("cluster serialization is finished.")
        tasks_events = events and \
            task_based_deployment.TaskEvents('reexecute_on', events)

        logger.debug("tasks serialization is started.")
        directory, graph = task_based_deployment.TasksSerializer.serialize(
            transaction.cluster, nodes, tasks, affected_nodes,
            selected_task_ids, tasks_events
        )
        logger.info("tasks serialization is finished.")

        return {
            "deployment_info": serialized_cluster,
            "tasks_directory": directory,
            "tasks_graph": graph
        }
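
The two workflows hand Astute differently shaped argument dicts:
granular_deploy returns deployment_info plus flat pre/post task lists, while
task_deploy returns deployment_info plus a tasks directory and a dependency
graph. A sketch of the two shapes (keys come from the return statements
above; values and comments are illustrative):

# Keys taken from granular_deploy/task_deploy above; values are placeholders.
granular_args = {
    'deployment_info': [],  # per-node legacy deployment data
    'pre_deployment': [],   # serialized tasks to run before deployment
    'post_deployment': [],  # serialized tasks to run after deployment
}
task_based_args = {
    'deployment_info': [],  # same serialized cluster data
    'tasks_directory': {},  # task id -> task definition (assumed layout)
    'tasks_graph': {},      # node uid -> ordered tasks (assumed layout)
}
# Either dict is wrapped by make_astute_message(task, deployment_mode,
# 'deploy_resp', args) in DeploymentTask.message before being cast to Astute.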

provision astute message

# see log

class diagram

$ grep TaskManager nailgun/task/manager.py

class TaskManager(object):
class BaseDeploymentTaskManager(TaskManager):
class ApplyChangesTaskManager(BaseDeploymentTaskManager):
class SpawnVMsTaskManager(ApplyChangesTaskManager):
class ProvisioningTaskManager(TaskManager):
class DeploymentTaskManager(BaseDeploymentTaskManager):
class StopDeploymentTaskManager(TaskManager):
class ClearTaskHistory(TaskManager):
class ResetEnvironmentTaskManager(ClearTaskHistory):
class CheckNetworksTaskManager(TaskManager):
class VerifyNetworksTaskManager(TaskManager):
class DumpTaskManager(TaskManager):
class GenerateCapacityLogTaskManager(TaskManager):
class NodeDeletionTaskManager(BaseDeploymentTaskManager):
class BaseStatsUserTaskManager(TaskManager):
class CreateStatsUserTaskManager(BaseStatsUserTaskManager):
class RemoveStatsUserTaskManager(BaseStatsUserTaskManager):
class UpdateDnsmasqTaskManager(TaskManager):
class OpenstackConfigTaskManager(TaskManager):

$ grep class nailgun/task/task.py | grep "Task"

class BaseDeploymentTask(object):
class DeploymentTask(BaseDeploymentTask):
class ClusterTransaction(DeploymentTask):
class UpdateNodesInfoTask(object):
class ProvisionTask(object):
class DeletionTask(object):
class DeleteIBPImagesTask(object):
class StopDeploymentTask(object):
class ResetEnvironmentTask(object):
class ClusterDeletionTask(object):
class VerifyNetworksTask(VerifyNetworksForTemplateMixin,
class CheckDhcpTask(VerifyNetworksForTemplateMixin,
class MulticastVerificationTask(BaseNetworkVerification):
class CheckNetworksTask(object):
class CheckBeforeDeploymentTask(object):
class DumpTask(object):
class GenerateCapacityLogTask(object):
class CreateStatsUserTask(object):
class RemoveStatsUserTask(object):
class UpdateDnsmasqTask(object):
class UpdateOpenstackConfigTask(BaseDeploymentTask):
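
The grep output above flattens the hierarchy. As an alternative, a short
standard-library script can print each class together with its bases (the
file path argument assumes a nailgun checkout):

import ast
import sys

def list_classes(path):
    """Print 'class Name(Base, ...)' for every class defined in a file."""
    with open(path) as f:
        tree = ast.parse(f.read(), filename=path)
    for node in ast.walk(tree):
        if isinstance(node, ast.ClassDef):
            # plain names have .id, dotted bases (pkg.Base) have .attr
            bases = ', '.join(
                getattr(b, 'id', getattr(b, 'attr', '?')) for b in node.bases
            )
            print('class %s(%s):' % (node.name, bases))

if __name__ == '__main__':
    list_classes(sys.argv[1])  # e.g. nailgun/task/manager.py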

Base TaskManager

class TaskManager(object):

    def __init__(self, cluster_id=None):
        if cluster_id:
            self.cluster = objects.Cluster.get_by_uid(cluster_id)

    def _call_silently(self, task, instance, *args, **kwargs):
        # create action_log for task
        al = TaskHelper.create_action_log(task)

        method = getattr(instance, kwargs.pop('method_name', 'execute'))
        if task.status == consts.TASK_STATUSES.error:
            TaskHelper.update_action_log(task, al)
            return
        try:
            to_return = method(task, *args, **kwargs)

            # update action_log instance for task;
            # for asynchronous tasks this is not the final update,
            # as they are also updated in the rpc receiver
            TaskHelper.update_action_log(task, al)

            return to_return
        except errors.NoChanges as e:
            self._finish_task(task, al, consts.TASK_STATUSES.ready, str(e))
        except Exception as exc:
            if any([
                not hasattr(exc, "log_traceback"),
                hasattr(exc, "log_traceback") and exc.log_traceback
            ]):
                logger.error(traceback.format_exc())
            self._finish_task(task, al, consts.TASK_STATUSES.error, str(exc))

    def _finish_task(self, task, log_item, status, message):
        data = {'status': status, 'progress': 100, 'message': message}
        # update task entity with given data
        objects.Task.update(task, data)
        # NOTE(romcheg): Flushing the data is required to unlock
        # tasks in order to temporarily fix issues with
        # the deadlock detection query in tests and let the tests pass.
        # TODO(akislitsky): Get rid of this flush as soon as
        # task locking issues are resolved.
        db().flush()
        TaskHelper.update_action_log(task, log_item)

        db().commit()

    def check_running_task(self, task_names=None, delete_obsolete=None):
        """Checks running tasks and delete obsolete tasks.

        If there is no cluster, task_names should be specified.

        NOTE: Also this method removes already finished task if
        method delete_obsolete is not False

        :param task_names: the name of tasks to filter
               if there is no cluster and task_names is not specified
               the Exception will be raised
        :param delete_obsolete: callable of False, which will be called
                                to delete obsolete tasks.
                                by default Task.delete will be used.
        :raises: errors.TaskAlreadyRunning
        """
        if isinstance(task_names, six.string_types):
            task_names = (task_names,)

        if delete_obsolete is None:
            delete_obsolete = objects.Task.delete

        all_tasks = objects.TaskCollection.all_not_deleted()

        if hasattr(self, 'cluster'):
            cluster = objects.Cluster.get_by_uid(
                self.cluster.id, lock_for_update=True, fail_if_not_found=True
            )
            all_tasks = objects.TaskCollection.filter_by(
                all_tasks, cluster_id=cluster.id
            )
        elif not task_names:
            # TODO(bgaifullin) there should not be tasks which are not
            # linked to a cluster
            raise ValueError(
                "Either cluster or task_names should be specified."
            )

        if task_names:
            all_tasks = objects.TaskCollection.filter_by_list(
                all_tasks, 'name', task_names
            )

        all_tasks = objects.TaskCollection.order_by(all_tasks, 'id')

        in_progress_status = (
            consts.TASK_STATUSES.running, consts.TASK_STATUSES.pending
        )
        for task in all_tasks:
            if task.status in in_progress_status:
                raise errors.TaskAlreadyRunning()
            elif delete_obsolete:
                delete_obsolete(task)

    def serialize_network_cfg(self, cluster):
        serializer = {'nova_network': NovaNetworkConfigurationSerializer,
                      'neutron': NeutronNetworkConfigurationSerializer}
        return serializer[cluster.net_provider].serialize_for_cluster(
            cluster,
            allocate_vips=True
        )
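
To tie the base class back to the flow at the top: _call_silently resolves a
method on the task class by name (default 'execute'; the managers that build
RPC messages pass method_name='message') and invokes it with the task object.
A toy, self-contained illustration of that dispatch pattern, with the
ActionLog bookkeeping and error handling stripped out (all names below are
stand-ins, not Fuel code):

class FakeTask(object):
    uuid = 'abc'

class FakeProvisionTask(object):
    @classmethod
    def message(cls, task, nodes):
        return {'method': 'provision', 'task_uuid': task.uuid, 'nodes': nodes}

def call_silently(task, instance, *args, **kwargs):
    # same getattr-based dispatch as TaskManager._call_silently above
    method = getattr(instance, kwargs.pop('method_name', 'execute'))
    return method(task, *args, **kwargs)

print(call_silently(FakeTask(), FakeProvisionTask, ['1', '2'],
                    method_name='message'))
# -> {'method': 'provision', 'task_uuid': 'abc', 'nodes': ['1', '2']}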
