Example of using piece_pipe to generate a health summary of multiple nuke plants based on reactor leakage.

View nuke_plant.rb
PiecePipe::Pipeline.new.
  source([{region: region}]).
  step(FetchPowerPlantsByRegion).
  step(FindWorstReactor).
  step(DetermineStatusClass).
  step(BuildPlantHealthSummary).
  step(SortByRadiationLevelsDescending).
  collect(:plant_health_summary).
  to_enum
View nuke_plant.rb
class NuclearPowerPlantHealthSummaryGenerator
  def generate(region)
    PiecePipe::Pipeline.new.
      source([{region: region}]).
      step(FetchPowerPlantsByRegion).
      step(FindWorstReactor).
      step(DetermineStatusClass).
      step(BuildPlantHealthSummary).
      step(SortByRadiationLevelsDescending).
      collect(:plant_health_summary).
      to_enum
  end
 
  # Custom Step, overriding #generate_sequence to produce a sequence of power plants for a given region.
  # Each power plant is "produced" as a Hash with one key (so far) heading down the pipeline.
  class FetchPowerPlantsByRegion < PiecePipe::Step
    def generate_sequence
      # The expected interface of any Step's source is that it supports #to_enum.
      source.to_enum.each do |inputs|
        inputs[:region].power_plants.those_still_open.each do |power_plant|
          produce power_plant: power_plant # Each call to #produce sends another object down the pipe
        end
      end
    end
  end
 
  # For any given power plant, determine the worst reactor.
  # Implemented as an AssemblyStep that analyzes inputs[:power_plant] from the prior Step,
  # and installs a new key/val pair for :worst_reactor.
  class FindWorstReactor < PiecePipe::AssemblyStep
    def receive(inputs)
      # Figure out which online reactor has the highest radiation levels.
      # "install" works a lot like "produce", but rather than take responsibility for the totality
      # of the produced object, we're just saying "add :worst_reactor to whatever's there and pass it on".
      install worst_reactor: inputs[:power_plant].reactors.reject(&:offline?).max_by(&:radiation)
    end
  end
 
  # Figure out which CSS class corresponds to the radiation from the worst reactor.
  # (At this point, the inputs Hash has keys :region, :power_plant, and :worst_reactor.)
  class DetermineStatusClass < PiecePipe::AssemblyStep
    def receive(inputs)
      install highlight_css_class: StatusFormatters.determine_css_class(inputs[:worst_reactor].radiation)
    end
  end
 
  # Composite our details into a line-item structure for our report.
  # Even though we consume most of the interesting values that arrive in the inputs Hash,
  # we're letting them ride as we simply install one more key, :plant_health_summary.
  # (This comes in handy, as we intend to sort these structures in a later step, using values
  # that are present in our transient input Hash, but NOT actually available in the
  # report structure.)
  class BuildPlantHealthSummary < PiecePipe::AssemblyStep
    def receive(inputs)
      power_plant = inputs[:power_plant]
      worst_reactor = inputs[:worst_reactor]
      install plant_health_summary: PlantHealthSummary.new(
        power_plant_id: power_plant.id,
        supervisor: power_plant.supervisor,
        reactor_name: worst_reactor.name,
        radiation: StatusFormatters.format_radiation(worst_reactor.radiation),
        css_class: inputs[:highlight_css_class]
      )
    end
  end
 
  # Sort all the values that come through the pipe based on the radiation of the worst reactor
  # in each power_plant, highest first.
  # Notice this is not an AssemblyStep, and we're overriding #generate_sequence again, this time
  # because we're implementing a sink: every upstream object must arrive before any can be
  # produced in sorted order. The resulting downstream objects have the same structure
  # they arrived with.
  class SortByRadiationLevelsDescending < PiecePipe::Step
    def generate_sequence
      # Negate the sort key so the highest radiation comes first (assumes radiation is numeric).
      source.to_enum.sort_by do |inputs|
        -inputs[:worst_reactor].radiation
      end.each do |inputs|
        produce inputs
      end
    end
  end
 
  # ... and that's it. Recall that the pipeline terminates with .collect(:plant_health_summary), which
  # is shorthand for a special Step that accepts Hashes and uses #produce to spit out only the specified
  # objects. Downstream of our #collect, only the PlantHealthSummary remains.
 
end
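To make the difference between #produce and #install concrete, here is a minimal sketch in plain Ruby. It is not piece_pipe's source: the MiniPipe module, its Step and AssemblyStep classes, and the AddDouble and SortByDoubleDescending steps are hypothetical stand-ins that only mimic the behavior described in the comments above (a source that supports #to_enum, #produce sending one object downstream, #install merging a key into the incoming Hash).

```ruby
# Hypothetical stand-ins for illustration only; this is NOT piece_pipe's implementation.
module MiniPipe
  class Step
    attr_accessor :source

    # Expose this step's output as an Enumerator; each #produce call yields one item.
    def to_enum
      Enumerator.new do |yielder|
        @yielder = yielder
        generate_sequence
      end
    end

    # Default behavior: hand each upstream item to #process.
    def generate_sequence
      source.to_enum.each { |item| process(item) }
    end

    # Default #process passes the item through unchanged.
    def process(item)
      produce item
    end

    def produce(item)
      @yielder << item
    end
  end

  # Adds keys to the incoming Hash instead of replacing it.
  class AssemblyStep < Step
    def process(inputs)
      @inputs = inputs
      receive(inputs)
    end

    def install(extra)
      produce @inputs.merge(extra)
    end
  end
end

# An assembly-style step: installs one more key and lets the rest ride.
class AddDouble < MiniPipe::AssemblyStep
  def receive(inputs)
    install double: inputs[:n] * 2
  end
end

# A sink-style step: consumes everything upstream, then produces in sorted order.
class SortByDoubleDescending < MiniPipe::Step
  def generate_sequence
    source.to_enum.sort_by { |inputs| -inputs[:double] }.each { |inputs| produce inputs }
  end
end

def demo_pipeline
  add = AddDouble.new
  add.source = [{ n: 1 }, { n: 3 }, { n: 2 }] # an Array also supports #to_enum
  sort = SortByDoubleDescending.new
  sort.source = add
  sort.to_enum.to_a
end

p demo_pipeline.map { |inputs| inputs[:double] } # prints [6, 4, 2]
```

Each Hash that reaches the end still carries its original :n key alongside the installed :double, which is the "letting them ride" behavior the sort step relies on.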
View nuke_plant.rb
# For any given power plant, determine the worst reactor.
# Implemented as an AssemblyStep that analyzes inputs[:power_plant] from the prior Step,
# and installs a new key/val pair for :worst_reactor.
class FindWorstReactor < PiecePipe::AssemblyStep
  def receive(inputs)
    # Figure out which online reactor has the highest radiation levels.
    # "install" works a lot like "produce", but rather than take responsibility for the totality
    # of the produced object, we're just saying "add :worst_reactor to whatever's there and pass it on".
    install worst_reactor: inputs[:power_plant].reactors.reject(&:offline?).max_by(&:radiation)
  end
end
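The selection logic inside FindWorstReactor can be tried on its own, outside any pipeline. The sketch below assumes a hypothetical Reactor struct with name, radiation, and offline fields; the real reactor objects in this example are not shown in the gist, so these fields are illustrative only.

```ruby
# Hypothetical Reactor model for illustration; the gist does not show the real class.
Reactor = Struct.new(:name, :radiation, :offline) do
  def offline?
    offline
  end
end

# Drop offline reactors, then pick the one with the highest radiation reading.
def worst_reactor(reactors)
  reactors.reject(&:offline?).max_by(&:radiation)
end

reactors = [
  Reactor.new("A", 12.5, false),
  Reactor.new("B", 40.0, true),  # highest reading, but offline, so it is ignored
  Reactor.new("C", 18.1, false)
]

puts worst_reactor(reactors).name # prints C
```

Note that max_by returns nil for an empty collection, so a plant whose reactors are all offline would install worst_reactor: nil, and a later step calling .radiation on it would raise NoMethodError unless that case is handled.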
View nuke_plant.rb
# For any given Person, continue processing that Person plus their family members
class IncludeFamilyMembers < PiecePipe::Step
  def process(person)
    produce person
    person.family_members.each do |fam_member|
      produce fam_member
    end
  end
end
View nuke_plant.rb
# Do not proceed with the processing of a Person unless they're local
class KeepLocals < PiecePipe::Step
  def process(person)
    produce person if person.lives_in_our_neighborhood?
  end
end
View nuke_plant.rb
# Given an id, load and produce a Person instance
class LoadPersonById < PiecePipe::Step
  def process(person_id)
    produce Person.find(person_id)
  end
end
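The three #process-based steps above behave roughly like Enumerable#map (one object in, one out), #flat_map (one in, many out), and #select (one in, zero or one out). The sketch below shows that correspondence in plain Ruby without piece_pipe. The Person struct, the people_by_id directory, and the order in which the steps are chained are all assumptions made for illustration; the gist does not show how these steps are combined.

```ruby
# Hypothetical data model; Person and its fields are stand-ins, not part of piece_pipe.
Person = Struct.new(:name, :local, :family_members) do
  def lives_in_our_neighborhood?
    local
  end
end

# Hypothetical lookup table standing in for Person.find.
def people_by_id
  kid = Person.new("Kid", true, [])
  cousin = Person.new("Cousin", false, [])
  {
    1 => Person.new("Ann", true, [kid, cousin]),
    2 => Person.new("Bob", false, [])
  }
end

def local_people_and_family(ids, directory)
  ids
    .map { |id| directory.fetch(id) }                       # like LoadPersonById: one in, one out
    .flat_map { |person| [person, *person.family_members] } # like IncludeFamilyMembers: one in, many out
    .select(&:lives_in_our_neighborhood?)                   # like KeepLocals: one in, zero or one out
end

p local_people_and_family([1, 2], people_by_id).map(&:name) # prints ["Ann", "Kid"]
```

Swapping the last two stages would change the result: filtering for locals before expanding family members would keep a local person's non-local relatives.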