Skip to content

Instantly share code, notes, and snippets.

@BinaryMuse
Created December 25, 2011 22:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save BinaryMuse/1519891 to your computer and use it in GitHub Desktop.
Save BinaryMuse/1519891 to your computer and use it in GitHub Desktop.
btilley@mbair ~/src/sandbox/wsmanage/eventmachine on master! ± ruby app.rb
0.0 [CONVERT] test1.avi -> mpg
0.0 [CONVERT] test1.avi -> mp3
0.0 [CONVERT] test2.wmv -> mpg
0.0 [CONVERT] test2.wmv -> mp3
2.5 [UPLOAD] test2.wmv.mpg
6.5 [DONE] test2.wmv.mpg.uploaded
6.6 [UPLOAD] test1.avi.mpg
8.3 [UPLOAD] test1.avi.mp3
9.2 [QUEUED] test2.wmv.mp3
10.0 [DONE] test1.avi.mp3.uploaded
10.0 [UPLOAD] test2.wmv.mp3
12.8 [DONE] test2.wmv.mp3.uploaded
15.0 [DONE] test1.avi.mpg.uploaded
EventMachine is over...
#!/usr/bin/env ruby
require 'bundler'
Bundler.setup
require 'eventmachine'
require './video'
require './helpers'
# Build the processing pipeline: each stage hands its results to the next one.
chain = [
  Converter.new([:mpg, :mp3]),
  Uploader.new(2),
  DoneReporter.new
]

# The runner wires the stages together and tracks outstanding work.
runner = ProcessorRunner.new(chain)

# Once the runner reports completion, announce it and stop the reactor.
runner.on(:complete) do
  print "EventMachine is over...\n"
  EM.stop
end

# The source clips that are fed into the first stage.
clips = ["test1.avi", "test2.wmv"].map { |filename| Clip.new(filename) }

# Start the reactor and push the clips into the chain.
EM.run do
  runner.run(clips)
end
# Fetch gems over HTTPS so that downloads are not sent over an
# unencrypted connection.
source "https://rubygems.org"

# Reactor used to run the processor chain.
gem "eventmachine"
# Event emitter mixin (`Events::Emitter`) used by processors and the runner.
gem "events"
# Wall-clock timestamp (seconds, as a Float) captured when this file is loaded.
START_TIME = Time.now.to_f

# Print +msg+ to standard output, prefixed with the number of seconds
# elapsed since START_TIME (one decimal place, padded to four characters).
def log(msg)
  elapsed = Time.now.to_f - START_TIME
  print format("%4.1f %s\n", elapsed, msg)
end
# Stand-in for slow, blocking work: blocks the calling thread for a
# random duration of at least 0 and less than 10 seconds.
def simulate_long_operation
  delay = rand * 10
  sleep(delay)
end
require 'events'
# Allows any method in a class to be called asynchronously via
# EventMachine by appending `_async` to the method name. Also allows
# objects to run a callable asynchronously by passing it to `async`.
#
# When an asynchronous operation starts, the object emits a
# `:start_async_operation` event; when one ends, it emits an
# `:end_async_operation` event. This allows any controlling object
# to determine when there are no more outstanding asynchronous operations.
#
# The including class must provide `emit` (e.g. via Events::Emitter).
module Asynchronicity
  # Run +block+ (any object responding to `call`) through `EM.defer`.
  #
  # Emits `:start_async_operation` immediately, and `:end_async_operation`
  # from the completion callback handed to `EM.defer`.
  #
  # Returns whatever `EM.defer` returns.
  def async(block)
    operation = proc { block.call }
    # EM.defer passes the operation's result to the callback; it is not needed here.
    callback = proc { |_result| emit :end_async_operation }
    emit :start_async_operation
    EM.defer(operation, callback)
  end

  # Handles `<name>_async` calls by running `<name>` through `async` with the
  # same arguments (and block, if one was given). Anything else falls through
  # to the default behaviour (NoMethodError).
  def method_missing(method, *args, &block)
    sync_method = async_target_for(method)
    return super unless sync_method

    async(proc { send(sync_method, *args, &block) })
  end

  # Keeps `respond_to?` consistent with `method_missing`: `<name>_async` is
  # reported as available whenever `<name>` is.
  def respond_to_missing?(method, include_private = false)
    !async_target_for(method).nil? || super
  end

  private

  # Returns the synchronous method name (a Symbol) that +method+ maps to when
  # it has the form `<name>_async` and the object responds to `<name>`;
  # returns nil otherwise.
  def async_target_for(method)
    # \A and \z anchor the whole name; ^ and $ would only anchor a line.
    matches = method.to_s.match(/\A(.*)_async\z/)
    return nil unless matches

    sync_method = matches[1].to_sym
    sync_method if respond_to?(sync_method)
  end
end
# Base class for one stage of a processing chain. A Processor does some
# work on an item and then hands a result to the following stages (the
# hand-off itself is managed by a ProcessorRunner).
#
# Subclasses put their work in a method named `process`; `run` invokes
# it through the `_async` mechanism provided by Asynchronicity.
#
# To pass a modified (or brand new) item down the chain, call
# `emit :complete, item`. `emit` may be called several times to send
# several items.
#
# Any other method of a Processor can likewise be run asynchronously by
# appending `_async` to its name, and a Proc can be run asynchronously
# by handing it to `async`.
class Processor
  include Events::Emitter
  include Asynchronicity

  # Entry point for a single item: runs `process` asynchronously.
  def run(item)
    process_async item
  end
end
# Processor variant whose `run` calls `process` directly rather than
# through the asynchronous path; intended for stages that do not block
# for long periods of time.
class SynchronousProcessor < Processor
  # Entry point for a single item: runs `process` inline and returns its result.
  def run(item)
    process item
  end
end
# A ProcessorRunner is responsible for chaining together Processors
# and kicking off the first one in the chain. It is also responsible
# for determining when all Processors are finished processing based
# on the event emissions from each Processor. Once finished, the
# ProcessorRunner will emit the `:complete` event (at most once).
#
# Completion is only re-evaluated when a Processor emits
# `:start_async_operation` / `:end_async_operation`, or when `run` is
# given no items.
class ProcessorRunner
  include Events::Emitter

  # processors - the Processors making up the chain, in execution order.
  def initialize(processors)
    @processors = processors
    @pending_ops = 0
    @completed = false
    attach_processor_listeners
  end

  # Attach listeners to the completion and begin/end async operation events
  # of every Processor. All bookkeeping is pushed through `EM.next_tick`
  # rather than done directly inside the event handler.
  def attach_processor_listeners
    @processors.each do |processor|
      processor.on(:start_async_operation) { EM.next_tick { @pending_ops += 1; check_pending_operations } }
      processor.on(:end_async_operation) { EM.next_tick { @pending_ops -= 1; check_pending_operations } }
      processor.on(:complete) { |item| EM.next_tick { process_completion_for processor, item } }
    end
  end

  # Schedule a check (one tick later) that calls `complete` if no
  # asynchronous operations are outstanding at that point. The counter
  # itself is adjusted by the listeners, not here.
  def check_pending_operations
    EM.next_tick { complete if @pending_ops == 0 }
  end

  # Called when +processor+ emits `:complete` with +item+. Passes the item
  # to the next Processor in the chain, if there is one; items emitted by
  # the last Processor are dropped.
  def process_completion_for(processor, item)
    index = @processors.index processor
    if next_processor = @processors[index + 1]
      next_processor.run item
    end
  end

  # Emit the `:complete` event as long as it has not already been emitted.
  # Without this guard, if two items finish at exactly the same time
  # they will sometimes both cause an emission.
  def complete
    return if @completed

    # Mark as completed before emitting so that a listener that ends up
    # triggering `complete` again cannot cause a second emission.
    @completed = true
    emit :complete
  end

  # Start the Processor chain running by providing an array of items
  # to be processed by the first Processor.
  #
  # With no items, no Processor would ever emit an event and `:complete`
  # would never fire, so a completion check is scheduled directly.
  #
  # Returns +items+.
  def run(items)
    check_pending_operations if items.empty?
    first_processor = @processors.first
    items.each { |item| first_processor.run(item) }
  end
end
require './processor'
# A media clip, identified solely by its name.
class Clip
  # The name given at construction time.
  attr_reader :name

  def initialize(name)
    @name = name
  end

  # A clip's string form is its name, so clips can be interpolated
  # directly into log messages and derived names.
  alias to_s name
end
# Processor that produces one converted clip per configured format for
# every item it receives.
class Converter < Processor
  # formats - the collection of target formats (e.g. [:mpg, :mp3]).
  def initialize(formats)
    @formats = formats
  end

  # Start one asynchronous conversion of +item+ for each configured format.
  def process(item)
    @formats.each do |target_format|
      convert_async(item, target_format)
    end
  end

  # Convert +item+ to +format+ (simulated), then emit the resulting clip,
  # named "<item>.<format>", via `:complete`.
  def convert(item, format)
    log("[CONVERT] #{item} -> #{format}")
    simulate_long_operation
    converted = Clip.new("#{item}.#{format}")
    emit(:complete, converted)
  end
end
# Processor that "uploads" each item (simulated) and emits a clip named
# "<item>.uploaded" via `:complete`, while limiting how many uploads run
# at the same time. Items that arrive while all slots are taken are
# queued and started, oldest first, as slots free up.
#
# `process` is invoked through `process_async` (see Processor#run), so
# several calls may be in flight at once; the slot counter and the queue
# are therefore only touched while holding a lock.
class Uploader < Processor
  # max_uploads - maximum number of simultaneous uploads; 0 means no limit.
  def initialize(max_uploads = 0)
    @max_uploads = max_uploads
    @current_uploads = 0
    @queue = []
    @lock = Mutex.new
  end

  # True when another upload may start right now.
  def can_upload_now?
    @max_uploads == 0 || @current_uploads < @max_uploads
  end

  # Upload +item+ if a slot is free; otherwise queue it for later.
  def process(item)
    return unless reserve_slot_or_enqueue(item)

    begin
      log " [UPLOAD] #{item}"
      simulate_long_operation
      emit :complete, Clip.new("#{item}.uploaded")
    ensure
      # Always give the slot back, even if the upload raised, so queued
      # items are not blocked forever.
      release_slot
      check_queue
    end
  end

  # If an item is waiting and a slot is free, start processing the
  # oldest queued item asynchronously.
  def check_queue
    new_item = @lock.synchronize { @queue.shift if can_upload_now? }
    process_async(new_item) if new_item
  end

  private

  # Atomically either claim an upload slot (returns true) or append
  # +item+ to the queue (returns false). Doing both under one lock means
  # an item can only be queued while some upload still holds a slot, and
  # that upload will look at the queue when it finishes.
  def reserve_slot_or_enqueue(item)
    @lock.synchronize do
      if can_upload_now?
        @current_uploads += 1
        true
      else
        log " [QUEUED] #{item}"
        @queue << item
        false
      end
    end
  end

  # Give back a previously claimed upload slot.
  def release_slot
    @lock.synchronize { @current_uploads -= 1 }
  end
end
# Final, synchronous stage: logs that an item is done and re-emits it unchanged.
class DoneReporter < SynchronousProcessor
  # Log +item+ as done, then pass the same item along via `:complete`.
  def process(item)
    log(" [DONE] #{item}")
    emit(:complete, item)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment