Created December 25, 2011 22:44
Ruby version of https://gist.github.com/1518892
btilley@mbair ~/src/sandbox/wsmanage/eventmachine on master! ± ruby app.rb
0.0 [CONVERT] test1.avi -> mpg
0.0 [CONVERT] test1.avi -> mp3
0.0 [CONVERT] test2.wmv -> mpg
0.0 [CONVERT] test2.wmv -> mp3
2.5 [UPLOAD] test2.wmv.mpg
6.5 [DONE] test2.wmv.mpg.uploaded
6.6 [UPLOAD] test1.avi.mpg
8.3 [UPLOAD] test1.avi.mp3
9.2 [QUEUED] test2.wmv.mp3
10.0 [DONE] test1.avi.mp3.uploaded
10.0 [UPLOAD] test2.wmv.mp3
12.8 [DONE] test2.wmv.mp3.uploaded
15.0 [DONE] test1.avi.mpg.uploaded
EventMachine is over...
#!/usr/bin/env ruby
# app.rb (the script invoked as `ruby app.rb`)
require 'bundler'
Bundler.setup
require 'eventmachine'
require './video'
require './helpers'

# Define our Processor chain.
chain = [
  Converter.new([:mpg, :mp3]),
  Uploader.new(2),
  DoneReporter.new
]

# Attach it to our runner.
runner = ProcessorRunner.new chain

# Shut down EventMachine when the chain is complete.
runner.on :complete do
  print "EventMachine is over...\n"
  EM.stop
end

# Define our clips.
clips = [
  Clip.new("test1.avi"),
  Clip.new("test2.wmv")
]

# Kick it off!
EM.run { runner.run clips }
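The `runner.on :complete` hook only fires once no asynchronous operations remain outstanding. A minimal stdlib-only sketch of that bookkeeping follows. It is not the gist's implementation: `PendingTracker` is a hypothetical name, and a `Mutex` stands in for the serialization the runner gets from `EM.next_tick`.

```ruby
# Hypothetical helper: counts outstanding operations and invokes a
# callback exactly once, when the count returns to zero.
class PendingTracker
  def initialize(&on_complete)
    @pending = 0
    @completed = false
    @lock = Mutex.new # assumption: stands in for reactor-thread serialization
    @on_complete = on_complete
  end

  # Record that an operation has started.
  def start
    @lock.synchronize { @pending += 1 }
  end

  # Record that an operation has ended; fire the callback if it was the last.
  def finish
    fire = @lock.synchronize do
      @pending -= 1
      next false if @pending > 0 || @completed
      @completed = true
    end
    @on_complete.call if fire
  end
end

completions = 0
tracker = PendingTracker.new { completions += 1 }

# Register every operation before any of them can finish, so the count
# cannot touch zero while work is still outstanding.
3.times { tracker.start }
workers = 3.times.map { Thread.new { sleep 0.01; tracker.finish } }
workers.each(&:join)
```

The `@completed` flag plays the same role as the guard in `ProcessorRunner#complete`: later start/finish pairs do not fire the callback a second time.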
# Gemfile
source "https://rubygems.org"
gem "eventmachine"
gem "events"
# helpers.rb (loaded by app.rb via `require './helpers'`)
START_TIME = Time.now.to_f

# Log a message to the console prepended with
# the time the process has been running.
def log(msg)
  now = Time.now.to_f - START_TIME
  msg = "%4.1f %s\n" % [now, msg]
  print msg
end

# Simulate a long blocking operation.
def simulate_long_operation
  sleep(rand * 10)
end
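The `log` helper derives elapsed time from `Time.now`, which can jump if the system clock is adjusted mid-run. As a hedged alternative, the sketch below reads a monotonic clock and takes the elapsed value as an argument so the formatting can be checked in isolation. `format_log_line` and `monotonic_now` are hypothetical names, not part of the gist.

```ruby
# Hypothetical helper: build a log line from an explicit elapsed time,
# using the same "%4.1f %s\n" format as `log`.
def format_log_line(elapsed, msg)
  "%4.1f %s\n" % [elapsed, msg]
end

# Hypothetical helper: seconds from a clock that never runs backwards.
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

started = monotonic_now
line = format_log_line(monotonic_now - started, "[CONVERT] test1.avi -> mpg")
print line
```

The `%4.1f` directive pads the time to four characters, so single-digit seconds are right-aligned with two-digit ones.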
# processor.rb (loaded by video.rb via `require './processor'`)
require 'events'

# Allows any method in a class to be called asynchronously via
# EventMachine by appending `_async` to the method name. Also allows
# objects to call Procs asynchronously by passing them to `async`.
#
# When an asynchronous operation starts, the object emits a
# `:start_async_operation` event; when one ends, it emits an
# `:end_async_operation` event. This allows any controlling object
# to determine when there are no more outstanding asynchronous operations.
module Asynchronicity
  def async(block)
    operation = Proc.new { block.call }
    callback = Proc.new { |value| emit :end_async_operation }
    emit :start_async_operation
    EM.defer(operation, callback)
  end

  def method_missing(method, *args)
    if (matches = method.to_s.match(/^(.*)_async$/))
      sync_method = matches[1].to_sym
      if respond_to?(sync_method)
        operation = Proc.new { send(sync_method, *args) }
        return async(operation)
      end
    end
    super
  end

  # Report `*_async` names as callable when the synchronous method exists,
  # so that `respond_to?` agrees with `method_missing`.
  def respond_to_missing?(method, include_private = false)
    sync_method = method.to_s[/^(.*)_async$/, 1]
    (!sync_method.nil? && respond_to?(sync_method.to_sym, include_private)) || super
  end
end

# A Processor performs some operation on an item and then passes
# the item on to other Processors (managed by a ProcessorRunner).
#
# You should implement your Processor's functionality in a method
# called `process`; this method will be called asynchronously.
#
# To send a modified (or new) item further down the Processor chain,
# call `emit :complete, item` (where `item` is the item to send).
# More than one object can be emitted; simply call `emit` multiple times.
#
# Any method in a Processor can be made asynchronous by calling
# the method with `_async` appended to the method name. Furthermore,
# Procs can be executed asynchronously by passing them to the `async` method.
class Processor
  include Events::Emitter
  include Asynchronicity

  def run(item)
    process_async(item)
  end
end

# A special Processor that calls `process` synchronously instead of
# asynchronously; useful for processors that do not block for long
# periods of time.
class SynchronousProcessor < Processor
  def run(item)
    process(item)
  end
end

# A ProcessorRunner is responsible for chaining together Processors
# and kicking off the first one in the chain. It is also responsible
# for determining when all Processors are finished processing based
# on the event emissions from each Processor. Once finished, the
# ProcessorRunner will emit the `:complete` event.
class ProcessorRunner
  include Events::Emitter

  def initialize(processors)
    @processors = processors
    @pending_ops = 0
    @completed = false
    attach_processor_listeners
  end

  # Attach listeners to the completion and begin/end async operation events.
  # Each handler defers its work with EM.next_tick so that the counter is
  # only ever updated from the reactor loop.
  def attach_processor_listeners
    @processors.each do |processor|
      processor.on(:start_async_operation) { EM.next_tick { @pending_ops += 1; check_pending_operations } }
      processor.on(:end_async_operation) { EM.next_tick { @pending_ops -= 1; check_pending_operations } }
      processor.on(:complete) { |item| EM.next_tick { process_completion_for processor, item } }
    end
  end

  # On the next reactor tick, call `complete` if no asynchronous
  # operations are still pending.
  def check_pending_operations
    EM.next_tick { complete if @pending_ops == 0 }
  end

  # Called when a Processor emits `:complete` for an item. Passes the
  # item to the next Processor in the chain, if there is one.
  def process_completion_for(processor, item)
    index = @processors.index processor
    if (next_processor = @processors[index + 1])
      next_processor.run item
    end
  end

  # Emit the `:complete` event as long as it has not already been emitted.
  # Without this guard, if two items finish at exactly the same time
  # they will sometimes both cause an emission.
  def complete
    emit :complete unless @completed
    @completed = true
  end

  # Start the Processor chain running by providing an array of items
  # to be processed by the first Processor.
  def run(items)
    first_processor = @processors.first
    items.each { |item| first_processor.run(item) }
  end
end
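The `_async` naming convention described for `Asynchronicity` can be tried without EventMachine. The sketch below is an illustration under stated assumptions, not the gist's code: `AsyncSuffix` and `Greeter` are hypothetical names, and `Thread.new` stands in for `EM.defer`, so the caller gets a thread back instead of having events emitted.

```ruby
# Hypothetical module: routes `name_async` calls to `name` on a new thread.
module AsyncSuffix
  ASYNC_PATTERN = /\A(.+)_async\z/

  def method_missing(name, *args, &block)
    target = name.to_s[ASYNC_PATTERN, 1]
    if target && respond_to?(target)
      # Assumption: Thread.new stands in for EM.defer's thread pool.
      return Thread.new { public_send(target, *args, &block) }
    end
    super
  end

  # Keep respond_to? consistent with what method_missing will accept.
  def respond_to_missing?(name, include_private = false)
    target = name.to_s[ASYNC_PATTERN, 1]
    (!target.nil? && respond_to?(target, include_private)) || super
  end
end

class Greeter
  include AsyncSuffix

  def greet(name)
    "hello, #{name}"
  end
end

greeter = Greeter.new
thread = greeter.greet_async("clip") # runs Greeter#greet on another thread
result = thread.value                # waits for the thread, returns greet's value
```

Names that end in `_async` but have no synchronous counterpart fall through to `super`, so they still raise `NoMethodError` as an ordinary typo would.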
# video.rb (loaded by app.rb via `require './video'`)
require './processor'

class Clip
  attr_reader :name

  def initialize(name)
    @name = name
  end

  def to_s
    @name
  end
end

class Converter < Processor
  def initialize(formats)
    @formats = formats
  end

  def process(item)
    @formats.each { |format| convert_async(item, format) }
  end

  def convert(item, format)
    log "[CONVERT] #{item} -> #{format}"
    simulate_long_operation
    emit :complete, Clip.new("#{item}.#{format}")
  end
end

class Uploader < Processor
  # A `max_uploads` of 0 means there is no limit on concurrent uploads.
  def initialize(max_uploads = 0)
    @max_uploads = max_uploads
    @current_uploads = 0
    @queue = []
  end

  def can_upload_now?
    @max_uploads == 0 || @current_uploads < @max_uploads
  end

  def process(item)
    if can_upload_now?
      @current_uploads += 1
      log " [UPLOAD] #{item}"
      simulate_long_operation
      emit :complete, Clip.new("#{item}.uploaded")
      @current_uploads -= 1
      check_queue
    else
      log " [QUEUED] #{item}"
      @queue << item
    end
  end

  def check_queue
    if @queue.any? && can_upload_now?
      # Take the oldest queued item first (FIFO).
      new_item = @queue.shift
      process_async(new_item)
    end
  end
end

class DoneReporter < SynchronousProcessor
  def process(item)
    log " [DONE] #{item}"
    emit :complete, item
  end
end
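`Uploader#process` is reached through `process_async`, so it runs on threads from `EM.defer`'s pool, and the slot check and the counter update can interleave between threads. One way to make that bookkeeping atomic is sketched below with a `Mutex`. `SlotLimiter` is a hypothetical class, not part of the gist; it only tracks slots and leaves the upload itself to the caller.

```ruby
# Hypothetical helper: hands out a bounded number of slots and keeps
# a first-in, first-out list of items waiting for one.
class SlotLimiter
  def initialize(max)
    @max = max # 0 means unlimited, mirroring Uploader's default
    @active = 0
    @waiting = []
    @lock = Mutex.new
  end

  # Returns true if the item may start now; otherwise queues it and
  # returns false. The check and the increment happen under one lock.
  def acquire(item)
    @lock.synchronize do
      if @max.zero? || @active < @max
        @active += 1
        true
      else
        @waiting << item
        false
      end
    end
  end

  # Frees a slot. If an item is waiting, the slot passes straight to the
  # oldest one, which is returned; otherwise returns nil.
  def release
    @lock.synchronize do
      @active -= 1
      next_item = @waiting.shift
      @active += 1 if next_item
      next_item
    end
  end
end

limiter = SlotLimiter.new(2)
started = %w[a.mpg b.mpg c.mp3 d.mp3].select { |clip| limiter.acquire(clip) }
handed_over = [limiter.release, limiter.release, limiter.release, limiter.release]
```

Handing the freed slot directly to the next waiting item inside `release` avoids a window in which a newly arriving item could take the slot ahead of one that was already queued.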