@BinaryMuse
Created December 25, 2011 08:36
Event-based processing funnel
# An EventEmitter is an object that allows you to 'emit'
# events (with arguments) and listen for those events
# elsewhere.
#
# Example:
#
#   eventEmitter.on 'data', (data) ->
#     console.log "Got some data: #{data}"
#
#   eventEmitter.emit 'data', "some data here"
EventEmitter = require('events').EventEmitter
# Simulates a long-running asynchronous operation.
# In a regular app, this would either be file IO or
# shelling out to an external process (e.g. any NON-app code).
simulateAsyncOperation = (callback) =>
  setTimeout ( =>
    callback()
  ), Math.random() * 10000
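# Logging with timestamp (whole seconds of process uptime,
# left-padded to two characters)
log = (msg) =>
  # process.uptime() returns fractional seconds in current Node
  # versions, so round down before padding.
  uptime = "#{Math.floor process.uptime()}"
  uptime = " #{uptime}" if uptime.length == 1
  console.log "%s %s", uptime, msg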
# Processors can be chained together in a ProcessorRunner
# to filter an 'item' through each of them in turn.
class Processor extends EventEmitter
  process: (item) =>
    console.error """
      You forgot to implement #process in your class.
      It should emit 'item' with the item to process
      in the next processor in the chain as a parameter.\n
      """
# A Processor that converts items into other file formats
# by calling 'convert' on each object that is passed to its
# constructor on every item it's told to process.
#
# The objects must emit 'converted' with the item to pass
# further along the chain.
class Converter extends Processor
  constructor: (@conversions) ->
    for conversion in @conversions
      conversion.on 'converted', (done) =>
        @emit 'item', done
  process: (item) =>
    for conversion in @conversions
      conversion.convert item
# Converts items to an MPG.
class MpgConversion extends EventEmitter
  convert: (item) =>
    log "[CONVERT] #{item} -> MPG"
    simulateAsyncOperation => @emit 'converted', "#{item}.mpg"
# Converts items to an MP3.
class Mp3Conversion extends EventEmitter
  convert: (item) =>
    log "[CONVERT] #{item} -> MP3"
    simulateAsyncOperation => @emit 'converted', "#{item}.mp3"
# Uploads an item to the Internet. Constructor takes an integer
# that indicates how many simultaneous uploads may be processed
# at one time.
class Uploader extends Processor
  constructor: (@max_uploads) ->
    @uploadQueue = []
    @currentUploads = 0
  process: (item) =>
    # If the maximum number of uploads is already in progress, the new
    # item is placed on a queue. The callback for each in-progress
    # upload checks the queue for pending items.
    if @currentUploads == @max_uploads
      log " [QUEUE] Maximum uploads in progress; queueing #{item}"
      @uploadQueue.push item
    else
      @currentUploads++
      log " [UPLOAD] #{item}"
      simulateAsyncOperation =>
        @emit 'item', item
        @itemUploaded()
  # Checks the queue for pending uploads and processes the next one
  # if the queue is not empty. Only called as a callback from 'process'.
  itemUploaded: =>
    @currentUploads--
    if @uploadQueue.length
      @currentUploads++
      # pop() takes the most recently queued item, so pending
      # uploads are served last-in, first-out.
      item = @uploadQueue.pop()
      log "[UNQUEUE] #{item}"
      simulateAsyncOperation =>
        @emit 'item', item
        @itemUploaded()
# Simply reports an item to the console;
# useful as the final Processor in a chain.
class ProgressReporter extends Processor
  process: (item) =>
    log " [DONE] #{item}"
# ProcessorRunner chains the given Processors together
# and processes items through them via 'run'.
class ProcessorRunner
  constructor: (@processors) ->
    # no op, just assign instance variables
  run: (items) =>
    # Attach the 'item' emission of each processor to the
    # 'process' method of the next processor, if one exists.
    @processors.forEach (processor, index) =>
      if @processors[index + 1]?
        processor.on 'item', (item) =>
          @processors[index + 1].process item
    # Kick off the process by calling the first Processor manually.
    for item in items
      @processors[0].process item
# Define the Processor chain
processors = [
  new Converter([
    new MpgConversion,
    new Mp3Conversion
  ]),
  new Uploader(2),
  new ProgressReporter
]
# Some items to process; Strings for simplicity
items = [
  "file.wmv",
  "file.avi",
  "dvd_title"
]
# And off we go
runner = new ProcessorRunner processors
runner.run items
btilley@mbair ~/src/sandbox/wsmanage $ coffee event.coffee
0 [CONVERT] file.wmv -> MPG
0 [CONVERT] file.wmv -> MP3
0 [CONVERT] file.avi -> MPG
0 [CONVERT] file.avi -> MP3
0 [CONVERT] dvd_title -> MPG
0 [CONVERT] dvd_title -> MP3
2 [UPLOAD] dvd_title.mpg
3 [UPLOAD] dvd_title.mp3
3 [DONE] dvd_title.mpg
3 [UPLOAD] file.wmv.mp3
7 [QUEUE] Maximum uploads in progress; queueing file.wmv.mpg
8 [QUEUE] Maximum uploads in progress; queueing file.avi.mp3
8 [QUEUE] Maximum uploads in progress; queueing file.avi.mpg
11 [DONE] dvd_title.mp3
11 [UNQUEUE] file.avi.mpg
12 [DONE] file.wmv.mp3
12 [UNQUEUE] file.avi.mp3
19 [DONE] file.avi.mp3
19 [UNQUEUE] file.wmv.mpg
20 [DONE] file.avi.mpg
23 [DONE] file.wmv.mpg