Create a gist now

Instantly share code, notes, and snippets.

Example of using piece_pipe to generate a health summary of multiple nuke plants based on reactor leakage.
PiecePipe::Pipeline.new.
source([{region: region}]).
step(FetchPowerPlantsByRegion).
step(FindWorstReactor).
step(DetermineStatusClass).
step(BuildPlantHealthSummary).
step(SortByRadiationLevelsDescending).
collect(:plant_health_summary).
to_enum
class NuclearPowerPlantHealthSummaryGenerator
def generate(region)
PiecePipe::Pipeline.new.
source([{region: region}]).
step(FetchPowerPlantsByRegion).
step(FindWorstReactor).
step(DetermineStatusClass).
step(BuildPlantHealthSummary).
step(SortByRadiationLevelsDescending).
collect(:plant_health_summary).
to_enum
end
# Custom Step, overriding #generate_sequence to produce a sequence of power plants for a given region.
# Each power plant is "produced" as a Hash with one key (so far) heading down the pipeline.
class FetchPowerPlantsByRegion < PiecePipe::Step
def generate_sequence(inputs)
# The expected interface of any Step's soure is that it supports #to_enum.
source.to_enum.each do |inputs|
inputs[:region].power_plants.those_still_open.each do |power_plant|
produce power_plant: power_plant # Each call to #produce sends another object down the pipe
end
end
enb
end
# For any given power plant, determine the worst reactor.
# Implemented as an AssemblyStep that analyzes inputs[:power_plant] from the prior Step,
# and installs a new key/val pair for :worst_reactor.
class FindWorstReactor < PiecePipe::AssemblyStep
def receive(inputs)
# Figure out which reactor has the highest radiation levels.
# "install" works a lot like "produce", but rather than take responsibility for the totality
# of the produced object, we're just saying "add :worst_reactor to whatever's there and pass it on".
install worst_reactor: inputs[:power_plant].reactors.reject(&:offline?).max_by(:&radiation)
end
end
# Figure out which CSS class corresonds to the radiation from the worst reactor.
# (At this point, the inputs Hash has keys :region, :power_plant, and :worst_reactor.)
class DetermineStatusClass < PiecePipe::AssemblyStep
def receive(inputs)
install highlight_css_class: StatusFormatters.determine_css_class(inputs[:worst_reactor].radiation)
end
end
# Composite our details into a line-item structure for our report.
# Even though we consume most of the interesting values that arrive in the inputs Hash,
# we're letting them ride as we simply install one more key, :plant_health_summary.
# (This comes in handy, as we intend to sort these structures in a later step, using values
# that are present in our transient input Hash, but NOT actually available in the
# report structure.)
class BuildPlantHealthSummary < PiecePipe::AssemblyStep
def receive(inputs)
power_plant = inputs[:power_plant]
worst_reactor = inputs[:worst_reactor]
install plant_health_summary: PlantHealthSummary.new(
power_plant_id: power_plant.id,
supervisor: power_plant.supervisor,
reactor_name: worst_reactor.name,
radiation: StatusFormatters.format_radiation(worst_reactor.radiation),
css_class: inputs[:highlight_css_class]
)
end
end
# Sort all the values that come through the pipe based on the radiation of the worst reactor
# in each power_plant.
# Notice this is not an AssemblyStep, and we're overriding #generate_sequence again, this time
# because we're implementing a sink. The resulting downstream objects have the same structure
# they arrived with.
class SortByRadiationLevelsDescending < PiecePipe::Step
def generate_sequence
source.to_enum.sort_by do |inputs|
inputs[:worst_reactor].radiation
end.each do |inputs|
produce inputs
end
end
end
#... and that's it. Recall that the pipeline terminates with .collect(:plant_health_summary), which
# is shorthand for a special Step that accepts Hashes and uses #produce to spit out only the specified
# objects. Downstream of our #collect, only the PlantHealthSummary remains.
end
# For any given power plant, determine the worst reactor.
# Implemented as an AssemblyStep that analyzes inputs[:power_plant] from the prior Step,
# and installs a new key/val pair for :worst_reactor.
class FindWorstReactor < PiecePipe::AssemblyStep
def receive(inputs)
# Figure out which reactor has the highest radiation levels.
# "install" works a lot like "produce", but rather than take responsibility for the totality
# of the produced object, we're just saying "add :worst_reactor to whatever's there and pass it on".
install worst_reactor: inputs[:power_plant].reactors.reject(&:offline?).max_by(:&radiation)
end
end
# For any given Person, continue processing that Person plus their family members
class IncludeFamilyMembers < PiecePipe::Step
def process(person)
produce person
person.family_members.each do |fam_member|
produce fam_member
end
end
end
# Do not proceed with the processing of a Person unless they're local
class KeepLocals < PiecePipe::Step
def process(person)
produce person if person.lives_in_our_neighborhood?
end
end
# Given an id, load and produce a Person instance
class LoadPersonById < PiecePipe::Step
def process(person_id)
produce Person.find(id)
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment