Skip to content

Instantly share code, notes, and snippets.

@ismasan
Last active March 24, 2024 22:11
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save ismasan/0bdcc76c2ea48f4259b38fafe131edb8 to your computer and use it in GitHub Desktop.
Save ismasan/0bdcc76c2ea48f4259b38fafe131edb8 to your computer and use it in GitHub Desktop.
Practical Railway-oriented Pipeline for Ruby
# A Pipeline extension to process steps concurrently
# Example
# class ConcurrentPipeline < Pipeline
# include ConcurrentProcessing
# end
#
# MyPipeline = ConcurrentPipeline.new do |pl|
# pl.step ValidateInput
#
# # These steps run concurrently
# pl.concurrent do |pl2|
# pl2.step BookTickets
# pl2.step BookHotels
# pl2.step BookCar
# end
#
# pl.step SendConfirmationEmail
# end
require 'concurrent'
require 'delegate'
# Mixin for Pipeline classes that adds a `#concurrent` builder method.
# Steps registered inside the `concurrent` block are wrapped in a single
# ConcurrentPipelineStep, which runs each of them in its own Concurrent::Future.
module ConcurrentProcessing
  # A step that delegates to a sub-pipeline but, instead of chaining the
  # sub-pipeline's steps one after another, runs them all against the same
  # incoming result and then merges the outcomes into a single result.
  class ConcurrentPipelineStep < SimpleDelegator
    # Runs every step of the wrapped pipeline in its own future.
    #
    # Each step receives the incoming result with `context[:trace]` extended by
    # the step's 1-based position. If any step halts, the returned result is
    # halted and its value is the list of the halted steps' values; otherwise
    # the returned result continues and its value is the list of all values,
    # in step order.
    #
    # An exception raised by a step is re-raised here (via `Future#value!`)
    # rather than being turned into a `nil` entry in the result list.
    #
    # TODO: we need to replicate the handling of `context[:trace]` here.
    # this could be handled elsewhere for all pipelines types.
    # @param result [Result]
    # @return [Result]
    def call(result)
      trace = result.context[:trace] || []

      futures = steps.each.with_index(1).map do |step, position|
        Concurrent::Future.execute do
          step.call(isolated_copy(result).with_context(:trace, trace + [position]))
        end
      end

      # `value!` blocks until the future completes and re-raises its failure.
      results = futures.map(&:value!)
      continued, halted = results.partition(&:continue?)
      return result.halt(halted.map(&:value)) if halted.any?

      result.continue(continued.map(&:value))
    end

    private

    # Gives each concurrent step its own copy of the result when the result
    # supports `deep_dup`; otherwise the result is passed along as-is.
    # NOTE(review): `deep_dup` is not defined by Result in this file —
    # presumably it comes from ActiveSupport when loaded; confirm.
    def isolated_copy(result)
      result.respond_to?(:deep_dup) ? result.deep_dup : result
    end
  end

  # Build a sub-pipeline that will run steps in parallel.
  # The block receives a new pipeline of the same class as the receiver;
  # the resulting sub-pipeline is registered as a single step.
  # @yieldparam pipeline [Pipeline] the sub-pipeline to add steps to
  # @return [self] whatever `#step` returns
  def concurrent(&block)
    step ConcurrentPipelineStep.new(self.class.new(&block))
  end
end
# frozen_string_literal: true
require 'delegate'
# A generic pipeline to implement railway-oriented processing.
# Blog post: https://ismaelcelis.com/posts/practical-railway-oriented-pipelines-in-ruby/
# @example
#
# pipeline = Pipeline.new do |pl|
# # with a Step interface [#call(Result) Result]
# # It can be any object that implements that interface
# # including Procs, lambdas, custom instances, classes or modules, or other [Pipeline] instances.
# pl.step Step1
#
# # with an inline Step block
# pl.step do |result|
# result.continue(result.value.first(result.params[:limit]))
# end
# end
#
# initial_result = Pipeline::Result.continue(['a', 'b', 'c', ...etc], params: { limit: 5 })
# result = pipeline.call(initial_result)
# result.continue? # => true
#
# [Result] is expected to respond to [#continue?() Boolean], [#params() Hash] and [#context() Hash].
# A railway-oriented pipeline: an ordered list of steps, each taking a Result
# and returning a Result. Processing stops being forwarded to later steps as
# soon as a step returns a halted result.
class Pipeline
  # Immutable value passed from step to step.
  # Carries a value, params, a context Hash, an errors Hash and a
  # continue/halt flag. Every "mutator" returns a new Result.
  class Result
    class << self
      # Builds an initial, continuing result.
      # @param value [Object]
      # @param params [Hash]
      # @param context [Hash]
      # @return [Result]
      def continue(value, params: {}, context: {})
        new(value, params:, context:)
      end
    end

    attr_reader :value, :params, :context, :errors

    # Default `errors` for results created without explicit errors.
    # This object is shared by every such Result, so it is frozen and its
    # default block returns a frozen empty Array WITHOUT storing it: reading
    # a missing key can never leak state into other results.
    ERRORS = Hash.new { |_hash, _key| [].freeze }.freeze

    # @param value [Object]
    # @param params [Hash]
    # @param context [Hash]
    # @param errors [Hash] error messages keyed by error key
    # @param continue [Boolean] false marks the result as halted
    def initialize(value, params: {}, context: {}, errors: ERRORS, continue: true)
      @continue = continue
      @value = value
      @params = params
      @context = context
      @errors = errors
      freeze
    end

    def inspect
      %(<#{self.class.name}:#{object_id} [#{continue? ? 'CONTINUE' : 'HALT'}] value=#{value.inspect} params=#{params.inspect} context=#{context.inspect} errors=#{errors.inspect}>)
    end

    # @return [Boolean] whether the pipeline should keep running steps
    def continue? = @continue

    # @param a_value [Object] defaults to the current value
    # @return [Result] a continuing copy carrying `a_value`
    def continue(a_value = value)
      self.class.new(a_value, params:, context:, errors:, continue: true)
    end

    # @param a_value [Object] defaults to the current value
    # @return [Result] a halted copy carrying `a_value`
    def halt(a_value = value)
      self.class.new(a_value, params:, context:, errors:, continue: false)
    end

    # Returns a copy with `error_message` appended under `error_key`.
    # The receiver's errors (and those of any result it was derived from)
    # are left untouched: both the Hash and the per-key Array are copied.
    # @param error_key [Object]
    # @param error_message [Object]
    # @return [Result]
    def with_error(error_key, error_message)
      # `fetch` bypasses the default block and also tolerates a plain Hash.
      messages = [*errors.fetch(error_key, []), error_message].freeze
      # `dup` keeps the default block but drops the frozen state.
      updated = errors.dup
      updated[error_key] = messages
      self.class.new(value, params:, context:, errors: updated.freeze, continue: continue?)
    end

    # Returns a copy with `context[key]` set.
    # Without a block the new value is `default_value`. With a block, the
    # block receives the current `context[key]` (or `default_value` when the
    # key is absent) and its return value is stored.
    # @param key [Object]
    # @param default_value [Object]
    # @return [Result]
    def with_context(key, default_value, &block)
      ctx_value = block ? block.call(context.fetch(key, default_value)) : default_value
      self.class.new(value, params:, context: context.merge(key => ctx_value), errors:, continue: continue?)
    end
  end

  class << self
    # Class-level middleware stack
    # a Middleware decorates each step in the pipeline, including other pipelines of the same class or subclass.
    # @return [Array<#call(Step, Result) Result>]
    def middleware_stack
      @middleware_stack ||= []
    end

    # @param stack [Array<#call(Step, Result) Result>]
    def middleware_stack=(stack)
      @middleware_stack = stack
    end

    # Adds a middleware to the class-level middleware stack.
    # Example:
    #
    #   class MyPipeline < Pipeline
    #     # With a block:
    #     middleware do |step, result|
    #       p result.context[:trace]
    #       step.call(result)
    #     end
    #
    #     # With a callable object:
    #     middleware Logging.new(Rails.logger)
    #   end
    #
    # @param middleware [#call(Step, Result) Result, nil]
    # @yieldparam step [Step]
    # @yieldparam result [Result]
    # @return [self]
    # @raise [ArgumentError] when neither a callable nor a block is given
    def middleware(middleware = nil, &block)
      middleware ||= block
      raise ArgumentError, "Middleware expects a block or a callable object" unless middleware

      middleware_stack << middleware
      self
    end

    # Subclasses start with a copy of the parent's stack, so middleware added
    # to a subclass does not affect the parent.
    def inherited(subclass)
      subclass.middleware_stack = middleware_stack.dup
      super
    end
  end

  # Helper to build a pipeline composed of steps
  #
  #   pipe = Pipeline.compose([Step1.new, Step2.new])
  #
  # @param [Array<Step>] steps
  # @return [Pipeline]
  def self.compose(steps)
    new do |pl|
      steps.each { |st| pl.step(st) }
    end
  end

  attr_reader :steps

  # When a setup block is given it is called with the new pipeline, and the
  # pipeline is then frozen regardless of what the block returns.
  # Without a block the pipeline stays open for further `#step` calls.
  # @yieldparam pipeline [Pipeline]
  def initialize(&setup)
    @steps = []
    return unless setup

    setup.call(self)
    freeze
  end

  # Freezes the list of steps along with the pipeline itself.
  def freeze
    @steps.freeze
    super
  end

  # Two pipelines are equal when `other` is an instance of the receiver's
  # class (or a subclass) and both hold equal steps in the same order.
  def ==(other)
    other.is_a?(self.class) && other.steps == steps
  end

  # Appends a [Step] to the internal pipeline.
  # The step is wrapped in the class-level middleware (if any) and in a
  # StepTracker before being stored.
  # Example:
  #
  #   Pipeline.new do |pl|
  #     # with a Step interface
  #     pl.step Step1
  #
  #     # with an inline Step block
  #     pl.step do |result|
  #       result.continue(result.value.first(10))
  #     end
  #   end
  #
  # @param callable [Step]. Optional.
  # @param &block [Proc]. Optional.
  # @return [self]
  # @raise [ArgumentError] when the step does not respond to #call
  def step(callable = nil, &block)
    callable ||= block
    raise ArgumentError, "#step expects an interface #call(Result) Result, but got #{callable.inspect}" unless callable.respond_to?(:call)

    decorated = self.class.middleware_stack.reduce(callable) { |inner, mid| MiddlewareStep.new(inner, mid) }
    @steps << StepTracker.new(decorated)
    self
  end

  # A step to decorate middleware as a [Step] interface.
  class MiddlewareStep < SimpleDelegator
    # @param step [Step]
    # @param mid [#call(Step, Result) Result]
    def initialize(step, mid)
      super(step)
      @mid = mid
    end

    # @param result [Result]
    # @return [Result]
    def call(result)
      @mid.call(__getobj__, result)
    end

    # Two middleware steps are equal when they wrap equal steps with equal
    # middleware. Delegator#== alone would compare the unwrapped step with
    # the other *wrapper*, which is never equal.
    def ==(other)
      return super unless other.is_a?(MiddlewareStep)

      __getobj__ == other.__getobj__ && @mid == other.wrapped_middleware
    end

    def !=(other)
      !(self == other)
    end

    protected

    # @return [#call(Step, Result) Result] the middleware decorating the step
    def wrapped_middleware
      @mid
    end
  end

  # A final piece of middleware to track failed (halted) steps in the context.
  class StepTracker < SimpleDelegator
    # Calls the wrapped step; when the outcome is halted, records the wrapped
    # step under `context[:halted_step]`.
    # @param result [Result]
    # @return [Result]
    def call(result)
      outcome = __getobj__.call(result)
      return outcome if outcome.continue?

      outcome.with_context(:halted_step, __getobj__)
    end

    # Trackers are equal when the steps they wrap are equal; this is what
    # makes Pipeline#== meaningful for pipelines built from the same steps.
    def ==(other)
      return super unless other.is_a?(StepTracker)

      __getobj__ == other.__getobj__
    end

    def !=(other)
      !(self == other)
    end
  end

  # A pipeline is a Step
  # and can be composed into other pipelines.
  # A pipeline's #call also collects a recursive trace of each step's position in the tree,
  # passing it down to each step in #context[:trace].
  # Example: a step receiving a trace of `[1,2,1]` means the step is running as the first step of the second step of the first step.
  #
  #   Step [1]
  #   - Step [1,1]
  #   - Step [1,2]
  #     - Step [1,2,1]
  #
  # @param result [Result]
  # @return [Result]
  def call(result)
    # Captured once: nested pipelines overwrite context[:trace] on the
    # results they return, but positions here are relative to this level.
    trace = result.context[:trace] || []

    steps.each.with_index(1).reduce(result) do |res, (step, position)|
      # Once halted, the result is passed through untouched.
      next res unless res.continue?

      step.call(res.with_context(:trace, trace + [position]))
    end
  end

  def inspect
    %(<#{self.class.name}:#{object_id} #{steps.size} steps>)
  end
end
@jspillers
Copy link

I saw your post on HN and just wanted to say I really like this concept... in fact, I wrote something fairly similar many years ago that you might find interesting: https://github.com/jspillers/simple_service

@ismasan
Copy link
Author

ismasan commented Mar 19, 2024

Thank you @jspillers ! I'll definitely have a look at your library.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment