Skip to content

Instantly share code, notes, and snippets.

@mpelos
Last active August 29, 2015 14:21
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mpelos/94e2167e12c1de3382ae to your computer and use it in GitHub Desktop.
Save mpelos/94e2167e12c1de3382ae to your computer and use it in GitHub Desktop.
class Cloudy::CloudWorker
class CurrentStepNotFound < StandardError; end
class StepClassNotFound < StandardError; end
class CanNotAsyncExecuteFirstStep < StandardError; end
# @!attribute customer_repository
# @return [CustomerRepository] the customer repository
attr_accessor :customer_repository
# @!attribute pipeline_manager
# @return [Cloudy::PipelineManager] the pipeline manager
attr_accessor :pipeline_manager
# @!attribute pipeline
# @return [Pipeline] the pipeline
attr_accessor :pipeline
# @!attribute current_step_id
# @return [Integer] the current step id
attr_accessor :current_step_id
# @!attribute [r] customer_id
# @return [Integer] the customer id
attr_reader :customer_id
# @!attribute pipeline_consumer
# @return [#perform_async] the pipeline consumer class
attr_accessor :pipeline_consumer
# @param customer_id [Integer]
# @param customer_repository [CustomerRepository]
# @param pipeline_manager [Cloudy::PipelineManager]
# @param pipeline_consumer [#perform_async] the pipeline consumer
# @param pipeline [Pipeline]
# @param current_step_id [Integer] the current step id if any
def initialize(customer_id:,
customer_repository: CustomerRepository.new,
pipeline_manager: Cloudy::PipelineManager.new,
pipeline_consumer: Job::PipelineConsumer,
pipeline: nil,
current_step_id: nil)
@customer_repository = customer_repository
@customer_id = customer_id
@current_step_id = current_step_id
@pipeline_manager = pipeline_manager
@pipeline = pipeline
@pipeline_consumer = pipeline_consumer
@pipeline_manager.pipeline = pipeline
end
# @return [Customer]
def customer
@customer ||= customer_repository.find_by_id(customer_id)
end
# Starts the creation
# @return [Pipeline] the pipeline that will handle the creation
# @note this method must schedule the first step execution
def start
fail NotImplementedError
end
# Returns the pipeline type
# @return [String]
def pipeline_type
self.class.to_s.split("::").last.underscore
end
# Executes the current step
# @raise [CurrentStepNotFound] if the current step is not found
def execute_current_step
fail CurrentStepNotFound unless current_step
current_runner.run
current_step.async_result = current_runner.async_result
pipeline_manager.save_step current_step
end
# Returns the current runner
#
# @note the default implementation tries to find a method with the
# current_step#step_class name append the "_step" string. eg.
#
# ```
# current_step.step_class = "Cloudy::AWS::Runner::KeyPairCreator"
# ```
#
# The method in self that will be called is:
#
# ```
# #cloudy_aws_runner_key_pair_creator_step
# ```
#
# @return [Cloudy::Runner] the current runner
#
# @raise [StepClassNotFound] if the `current_step#step_class` is unknown
def current_runner
return unless current_step
return @current_runner if @current_runner
method_name = current_step.step_class.underscore.gsub("/", "_") + "_step"
fail StepClassNotFound, method_name unless respond_to? method_name, true
@current_runner = send(method_name)
end
# Returns whether or not the current step is done
# @return [Boolean] if the current step is done
def current_step_done?
current_runner.try :done?
end
# Returns whether or not the pipeline is done
# @return [Boolean] whether or not the pipeline is done
def pipeline_done?
pipeline_manager.pipeline_done?
end
# Marks the current step as done
def current_step_done!
current_step.done = true
current_step.output = current_runner.output
pipeline_manager.save_step current_step
pipeline_manager.pipeline_done! if pipeline_manager.pipeline_done?
end
# Returns the current step if any
# @return [PipelineStep, nil] the current step or nil
def current_step
@current_step ||= pipeline_manager.step_by_id(current_step_id) if current_step_id
end
# Marks the current step as a failure
def current_step_failed!
return unless current_step
current_step.failed = true
begin
current_step.output = current_runner.try(:output)
rescue
# Imagine current_runner implementation has a bug, so the only
# way to avoid exploding here again is to rescue any errors this
# may throw.
end
pipeline_manager.save_step current_step
end
protected
# Schedules the first pipeline step for execution
# @raise [CanNotAsyncExecuteFirstStep] if there are not first step
def async_execute_first_step
step = pipeline_manager.first_step
fail CanNotAsyncExecuteFirstStep unless step
async_execute_step step
end
# Schedules a step async execution
# @param step [PipelineStep]
def async_execute_step(step)
pipeline_consumer.perform_async step.id
end
end
class Cloudy::AWS::AutoScalingFormationUpdaterWorker < Cloudy::CloudWorker
# @!attribute [r] topology
# @return [Topology]
attr_reader :topology
# @!attribute [r] min_instances
# @return [Integer, nil]
attr_reader :min_instances
# @!attribute [r] max_instances
# @return [Integer, nil]
attr_reader :max_instances
# @!attribute [r] profile_repository
# @return [String]
attr_reader :profile_repository
# @!attribute [r] cloud_formation_stack_name
# @return [String]
attr_reader :cloud_formation_stack_name
# @param topology_id [Integer]
# @param min_instances [Integer, nil]
# @param max_instances [Integer, nil]
# @param topology_repository [TopologyRepository]
# @param profile_repository [ProfileRepository]
# @param auto_scaling_formation_stack_name_class [Class]
def initialize(
topology_id:,
min_instances: nil,
max_instances: nil,
topology_repository: TopologyRepository.new,
profile_repository: ProfileRepository.new,
auto_scaling_formation_stack_name_class: Cloudy::AWS::AutoScalingFormationStackName,
**kwargs
)
@topology = topology_repository.find_by_id(topology_id)
super(customer_id: topology.customer_id, **kwargs)
@provider = topology.provider
@min_instances = min_instances
@max_instances = max_instances
@profile_repository = profile_repository
@cloud_formation_stack_name = auto_scaling_formation_stack_name_class.new(topology: topology).to_s
end
def cloudy_aws_runner_auto_scaling_formation_updater_step
current_step.step_class.constantize.new(
stack_name: cloud_formation_stack_name,
product_name: topology.product_name,
min_instances: min_instances,
max_instances: max_instances,
access_key: profile.user,
secret_key: profile.password,
region: topology.region,
async_result: current_step.async_result
)
end
def current_runner
return unless current_step
return @current_runner if @current_runner
@current_runner =
case current_step.step_class
when Cloudy::AWS::Runner::AutoScalingFormationUpdater.name
then cloudy_aws_runner_auto_scaling_formation_updater_step
else
fail Cloudy::CloudWorker::StepClassNotFound
end
rescue => e
mark_the_current_step_as_failed
if e.is_a? Cloudy::CloudWorker::StepClassNotFound
raise e
else
raise Cloudy::CloudWorker::RunnerInitializationFailure, e.message, e.backtrace
end
end
def current_step_done?
if super
current_step.done = true
current_step.output = current_runner.output
pipeline_manager.save_step current_step
pipeline_manager.pipeline_done! if pipeline_manager.pipeline_done?
true
end
end
def profile
@profile ||= profile_repository.find_by_customer_id_and_provider_id topology.customer_id, topology.provider_id
end
def responsible_class_args
@responsible_class_args ||= {
topology_id: topology.id,
min_instances: min_instances,
max_instances: max_instances
}
end
def start
@pipeline = pipeline_manager.create_pipeline(
customer: customer,
provider: topology.provider,
pipeline_type: pipeline_type,
responsible_class: self.class.name,
responsible_class_args: responsible_class_args
)
pipeline_manager.create_step step_class: Cloudy::AWS::Runner::AutoScalingFormationUpdater.name, priority: 0
async_execute_first_step
pipeline
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment