Created
December 25, 2011 08:36
-
-
Save BinaryMuse/1518892 to your computer and use it in GitHub Desktop.
Event-based processing funnel
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# An EventEmitter is an object that allows you to 'emit' | |
# events (with arguments) and listen for those events | |
# elsewhere. | |
# | |
# Example: | |
# | |
# eventEmitter.on 'data', (data) -> | |
# console.log "Got some data: #{data}" | |
# | |
# eventEmitter.emit 'data', "some data here" | |
EventEmitter = require('events').EventEmitter | |
# Simulates a long-running asynchronous operation by invoking
# `callback` (with no arguments) after a random delay.
# In a regular app, this would either be file IO or
# shelling out to an external process (e.g. any NON-app code).
#
# callback   - Function called once when the simulated work finishes.
# maxDelayMs - Upper bound (exclusive) of the random delay in
#              milliseconds. Optional; defaults to 10000.
#
# Returns whatever `setTimeout` returns.
simulateAsyncOperation = (callback, maxDelayMs = 10000) ->
  # Math.random() is in [0, 1), so the delay is always below maxDelayMs.
  delay = Math.random() * maxDelayMs
  setTimeout (-> callback()), delay
# Logs `msg` to the console, prefixed with the process uptime in
# whole seconds. Single-digit timestamps get one leading space so
# they line up with double-digit ones.
#
# `process.uptime()` can return fractional seconds, so the value is
# floored first; without that the one-character padding check would
# never match and the raw float would be printed.
#
# msg - The text to print after the timestamp.
log = (msg) ->
  seconds = "#{Math.floor process.uptime()}"
  seconds = " #{seconds}" if seconds.length is 1
  console.log "%s %s", seconds, msg
# Base class for one stage of a processing chain. Stages are linked
# together by a ProcessorRunner so that an 'item' is filtered
# through each of them in turn.
class Processor extends EventEmitter
  # Default implementation: reports on stderr that the subclass did
  # not provide its own 'process'. Nothing is emitted.
  #
  # item - The item that was handed to this stage (unused here).
  process: (item) =>
    reminder = """
    You forgot to implement #process in your class.
    It should emit 'item' with the item to process
    in the next processor in the chain as a parameter.\n
    """
    console.error reminder
# A Processor that converts items into other file formats
# by calling 'convert' on each conversion object passed to its
# constructor, for every item it is told to process.
#
# Each conversion object must respond to 'on' and 'convert', and
# must emit 'converted' with the item to pass further along the chain.
class Converter extends Processor
  # conversions - Array of conversion objects (see above).
  constructor: (conversions) ->
    # Run the parent constructor chain before touching instance
    # state; the original skipped it entirely.
    super()
    @conversions = conversions
    # Re-emit every finished conversion as an 'item' of this processor.
    for conversion in @conversions
      conversion.on 'converted', (done) =>
        @emit 'item', done

  # Hands 'item' to every conversion. Converted results arrive later
  # through the 'converted' listeners registered in the constructor.
  #
  # item - The item to convert.
  process: (item) =>
    for conversion in @conversions
      conversion.convert item
# Conversion that turns an item into an MPG.
class MpgConversion extends EventEmitter
  # Logs the conversion and, once the simulated asynchronous work
  # completes, emits 'converted' with the item followed by '.mpg'.
  #
  # item - The item to convert.
  convert: (item) =>
    announce = =>
      @emit 'converted', "#{item}.mpg"
    log "[CONVERT] #{item} -> MPG"
    simulateAsyncOperation announce
# Conversion that turns an item into an MP3.
class Mp3Conversion extends EventEmitter
  # Logs the conversion and, once the simulated asynchronous work
  # completes, emits 'converted' with the item followed by '.mp3'.
  #
  # item - The item to convert.
  convert: (item) =>
    announce = =>
      @emit 'converted', "#{item}.mp3"
    log "[CONVERT] #{item} -> MP3"
    simulateAsyncOperation announce
# Uploads an item to the Internet. Constructor takes an integer
# that indicates how many simultaneous uploads may be processed
# at one time. Items arriving while that many uploads are in
# progress wait in a first-in, first-out queue.
class Uploader extends Processor
  # max_uploads - Maximum number of uploads allowed to run at once.
  constructor: (max_uploads) ->
    # Run the parent constructor chain before touching instance
    # state; the original skipped it entirely.
    super()
    @max_uploads = max_uploads
    @uploadQueue = []
    @currentUploads = 0

  # Starts uploading 'item' right away when a slot is free;
  # otherwise places it on the queue until 'itemUploaded' frees one.
  #
  # item - The item to upload.
  process: (item) =>
    # '>=' rather than '==' so the limit still holds if the counter
    # ever ends up above it.
    if @currentUploads >= @max_uploads
      log " [QUEUE] Maximum uploads in progress; queueing #{item}"
      @uploadQueue.push item
    else
      log " [UPLOAD] #{item}"
      @startUpload item

  # Releases the slot of a finished upload and, if the queue is not
  # empty, starts the next pending item. Only called as the
  # completion callback of an upload.
  itemUploaded: =>
    @currentUploads--
    if @uploadQueue.length
      # shift() instead of pop(): serve the oldest queued item first
      # so early arrivals are not starved by later ones.
      item = @uploadQueue.shift()
      log "[UNQUEUE] #{item}"
      @startUpload item

  # Internal helper shared by 'process' and 'itemUploaded': claims a
  # slot, runs the simulated upload, then emits 'item' and releases
  # the slot.
  #
  # item - The item whose upload is being started.
  startUpload: (item) =>
    @currentUploads++
    simulateAsyncOperation =>
      @emit 'item', item
      @itemUploaded()
# Terminal stage of a chain: prints each item it receives
# to the console and emits nothing further.
class ProgressReporter extends Processor
  # item - The finished item to report.
  process: (item) =>
    message = " [DONE] #{item}"
    log message
# ProcessorRunner chains the given Processors together
# and processes items through them via 'run'.
class ProcessorRunner
  # processors - Array of Processors, in the order items flow
  #              through them.
  constructor: (@processors) ->
    # Becomes true once the 'item' listeners have been attached.
    @chained = false

  # Feeds every element of 'items' to the first Processor. Later
  # Processors receive whatever the previous one emits as 'item'.
  # May be called repeatedly: the chain is wired only once, so a
  # second call does not deliver items twice downstream.
  #
  # items - Array of items to process.
  #
  # Returns an Array holding the result of each first-stage 'process'
  # call (empty when there are no processors).
  run: (items) =>
    # Nothing to run through; avoids calling 'process' on undefined.
    return [] unless @processors?.length
    @chain()
    # Kick off the process by calling the first Processor manually.
    entry = @processors[0]
    entry.process item for item in items

  # Internal helper: attaches the 'item' emission of each Processor
  # to the 'process' method of the next one, if one exists. Does
  # nothing after the first call.
  chain: =>
    return if @chained
    @chained = true
    @processors.forEach (processor, index) =>
      return unless @processors[index + 1]?
      processor.on 'item', (item) =>
        @processors[index + 1].process item
# Build the chain: convert every item to MPG and MP3, upload the
# results (at most two at a time), then report each finished upload.
processors = [
  new Converter [new MpgConversion(), new Mp3Conversion()]
  new Uploader 2
  new ProgressReporter()
]

# Items to push through the chain; plain Strings for simplicity.
items = ['file.wmv', 'file.avi', 'dvd_title']

# Wire the chain together and start processing.
runner = new ProcessorRunner processors
runner.run items
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
btilley@mbair ~/src/sandbox/wsmanage $ coffee event.coffee | |
0 [CONVERT] file.wmv -> MPG | |
0 [CONVERT] file.wmv -> MP3 | |
0 [CONVERT] file.avi -> MPG | |
0 [CONVERT] file.avi -> MP3 | |
0 [CONVERT] dvd_title -> MPG | |
0 [CONVERT] dvd_title -> MP3 | |
2 [UPLOAD] dvd_title.mpg | |
3 [UPLOAD] dvd_title.mp3 | |
3 [DONE] dvd_title.mpg | |
3 [UPLOAD] file.wmv.mp3 | |
7 [QUEUE] Maximum uploads in progress; queueing file.wmv.mpg | |
8 [QUEUE] Maximum uploads in progress; queueing file.avi.mp3 | |
8 [QUEUE] Maximum uploads in progress; queueing file.avi.mpg | |
11 [DONE] dvd_title.mp3 | |
11 [UNQUEUE] file.avi.mpg | |
12 [DONE] file.wmv.mp3 | |
12 [UNQUEUE] file.avi.mp3 | |
19 [DONE] file.avi.mp3 | |
19 [UNQUEUE] file.wmv.mpg | |
20 [DONE] file.avi.mpg | |
23 [DONE] file.wmv.mpg |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment