Skip to content

Instantly share code, notes, and snippets.

@IronSavior
Last active January 20, 2021 01:39
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save IronSavior/d2cbb6e4f492150d989d to your computer and use it in GitHub Desktop.
Save IronSavior/d2cbb6e4f492150d989d to your computer and use it in GitHub Desktop.
Composable Enumerators in Ruby
# @author Erik Elmore <erik@erikelmore.com>
# This is getting a little out of hand... :dizzy_face:
# Mixin that prints one-line trace output so the demonstration can be followed.
module Status
  # Prints "<Class>#<method_name>(<args>)", followed by " => <extra>" when
  # extra is truthy.
  #
  # @param method_name [Symbol, String] name printed after the class name
  # @param args [Array] call arguments, printed comma-separated
  # @param extra [Object, nil] optional detail appended after " => "
  # @return [nil]
  def status(method_name, args = [], extra = nil)
    suffix = extra ? ' => %s' % extra : extra
    line = '%s#%s(%s)%s' % [self.class, method_name, args.join(', '), suffix]
    puts line
  end
end
# Enumerator which is composed of an Enumerable and an external iterator block.
# This allows compatibility with ruby 1.9
class EnumWithIterator
include Enumerable
def initialize( enum, &iter )
raise ArgumentError, 'The given enum is not Enumerable' unless enum.respond_to? :each
raise ArgumentError, 'Block required but not given' unless block_given?
raise ArgumentError, 'Iterator block must accept at least one param.' unless iter.arity.abs > 0
@enum = enum
@iter = iter
end
def each( *args )
return enum_for __callee__, *args unless block_given?
@iter.call(@enum, *args){ |yielded| yield yielded }
end
# @return true when the built-in Enumerator class allows args at call-time. (expect false for ruby 1.9)
def self.native_support?
@native_support ||= Enumerator.new{}.method(:each).arity.abs > 0
end
# Constructs either a native Enumerator (for ruby 2+) or an EnumWithIterator (ruby 1.9)
def self.build( enum, &iter )
return new enum, &iter unless native_support?
raise ArgumentError, 'The given enum is not Enumerable' unless enum.respond_to? :each
raise ArgumentError, 'Iterator block is required' unless block_given?
raise ArgumentError, 'Iterator block must accept at least one param.' unless iter.arity.abs > 0
Enumerator.new do |y, *args|
iter.call(enum, *args){ |yielded| y.yield yielded }
end
end
def self.build_chain( *args )
args.reduce{ |enum, iter| build enum, &iter }
end
end
# Alternative way to wrap access to an Enumerable with an external iterator:
# reopens Enumerable so every enumerable object gains #with_iterator.
module Enumerable
  # Wraps the receiver together with the given iterator block.
  #
  # @yield the iterator block, handed unchanged to EnumWithIterator.build
  # @return whatever EnumWithIterator.build returns for (self, &iter)
  def with_iterator(&iter)
    EnumWithIterator.build(self, &iter)
  end
end
# Mockup for an object which fetches messages in fixed-size batches: the source
# is cut into slices of batch_size and each slice is yielded as one batch.
class Fetcher
  include Status

  # @param batch_size [Integer] number of messages per batch
  def initialize(batch_size = 10)
    @batch_size = batch_size
  end

  # Yields src in slices of @batch_size, tracing the start and every batch.
  #
  # @param src [#each_slice] message source
  # @param args [Array] call-time arguments; only used for the trace output
  # @yield [batch] one Array of messages per slice
  def each(src, *args)
    label = __callee__
    status label, args, 'start'
    src.each_slice(@batch_size) do |batch|
      status label, args, batch.join(', ')
      yield batch
    end
  end
end
# Same as Fetcher, but each iteration can yield a random number of messages up to the max batch size, including zero.
# Simulates a message queue that may give less than a full batch (or zero) per fetch
class RandomBatchFetcher
  include Status

  # @param max_batch_size [Integer] largest number of messages in one batch
  def initialize(max_batch_size = 10)
    @max_batch_size = max_batch_size
  end

  # Yields randomly sized batches pulled from src until src is exhausted.
  #
  # @param src [#each] message source
  # @param args [Array] call-time arguments; only used for the trace output
  # @yield [batch] an Array holding between 0 and max_batch_size messages
  # @return [Enumerator] the batch enumerator when no block is given
  def each(src, *args, &blk)
    label = __callee__
    batches = Enumerator.new do |y|
      status label, args, 'start'
      # A fresh local keeps the captured src untouched, so running the
      # enumerator again starts over from the original source.
      source = src.enum_for(:each)
      loop do
        batch = []
        exhausted = false
        begin
          # rand(n) excludes n, hence the + 1 to make a full batch possible.
          rand(@max_batch_size + 1).times { batch << source.next }
        rescue StopIteration
          # The source ran dry mid-batch; keep what was already pulled.
          exhausted = true
        end
        unless exhausted && batch.empty?
          status label, args, batch.join(', ')
          y.yield batch
        end
        break if exhausted
      end
    end
    return batches unless block_given?
    batches.each(&blk)
  end
end
# Tracks messages which are fetched, but not handled. Demonstrates two iterators
# (register_fetch and msg_handled) served by the same object.
class Tracker
  include Status

  # @return [Array] messages registered as fetched and not yet removed as handled
  def unhandled
    @unhandled ||= []
  end

  # Iterator for the batch level: empties the unhandled list once at the start,
  # then records every batch before passing it on.
  #
  # @param src [#each] source of batches
  # @param args [Array] call-time arguments forwarded to src.each
  # @yield [batch] each batch, after its messages were recorded
  def register_fetch(src, *args)
    status __callee__, args, 'start'
    unhandled.clear
    src.each(*args) do |batch|
      unhandled.concat(batch)
      yield batch
    end
  end

  # Iterator for the message level: a message is removed from the unhandled
  # list only after the consumer block has returned for it. Whatever is left
  # when iteration ends (normally or not) is reported on stdout.
  #
  # @param src [#each] source of individual messages
  # @param args [Array] call-time arguments forwarded to src.each
  # @yield [msg] each message
  def msg_handled(src, *args)
    status __callee__, args, 'start'
    src.each(*args) do |msg|
      yield msg
      unhandled.delete(msg)
    end
  ensure
    leftovers = unhandled
    puts '** Unhandled messages: [%s]' % leftovers.join(', ') unless leftovers.empty?
  end
end
# Stops iteration after yielding a maximum number of messages
class LimitBreak
  include Status

  # @param max [Integer] number of items after which iteration is broken off
  def initialize(max)
    @max = max
  end

  # Passes items through and breaks out of src.each once the limit is reached.
  # The limit is checked after each yield, so the source is not asked for
  # another item once the limit has been hit.
  #
  # @param src [#each] item source
  # @param args [Array] call-time arguments forwarded to src.each
  # @yield [item] each item up to the limit
  def each(src, *args)
    status __callee__, args, 'start'
    yielded = 0
    src.each(*args) do |item|
      yield item
      yielded += 1
      next unless yielded >= @max
      status __callee__, args, 'Breaking, limit reached (%s)' % @max
      break
    end
  end
end
# Receives batches and yields individual messages. Demonstrates iterator which is just a bare Proc.
batch_splitter = proc{ |src, *args, &blk|
src.each(*args){ |batch|
batch.each{ |msg| blk.call msg }
}
}
# Iterate over all messages the given enum will provide, printing one
# "Handled" line per message. The enum is iterated with the call-time
# argument some: 'args'.
#
# @param enum [#each] the (composed) enum to consume
def demo(enum)
  handler = proc { |msg| puts(' --> Handled %s' % msg) }
  enum.each(some: 'args', &handler)
end
# Message source: the endless sequence 1, 2, 3, ...
integers = 1.upto(Float::INFINITY)
tracker = Tracker.new
# Build a composed enum and run the demo!
# Stages are listed innermost first; each one wraps everything before it.
# Some messages are expected to go unhandled to show that functionality: the
# Fetcher delivers batches of 10 while LimitBreak stops after 17 messages.
stages = [
  Fetcher.new.method(:each),
  tracker.method(:register_fetch),
  batch_splitter,
  LimitBreak.new(17).method(:each),
  tracker.method(:msg_handled)
]
enum = EnumWithIterator.build_chain(integers, *stages)
demo(enum)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment