@loveencounterflow
Last active August 29, 2015 14:07
zero
one
two
three
four
five
six
seven
eight
nine
ten
./.
i
ii
iii
iv
v
vi
vii
viii
ix
x
'C' [ 0, './.' ]
'$match' [ 0, './.' ]
'C' [ 1, 'i' ]
'$match' [ 1, 'i' ]
'C' [ 2, 'ii' ]
'$match' [ 2, 'ii' ]
'C' [ 3, 'iii' ]
'$match' [ 3, 'iii' ]
'C' [ 4, 'iv' ]
'$match' [ 4, 'iv' ]
'C' [ 5, 'v' ]
'$match' [ 5, 'v' ]
'C' [ 6, 'vi' ]
'$match' [ 6, 'vi' ]
'C' [ 7, 'vii' ]
'$match' [ 7, 'vii' ]
'C' [ 8, 'viii' ]
'$match' [ 8, 'viii' ]
'C' [ 9, 'ix' ]
'$match' [ 9, 'ix' ]
'C' [ 10, 'x' ]
'$match' [ 10, 'x' ]
'M' [ 0, './.', 'zero' ]
'M' [ 4, 'iv', 'four' ]
'M' [ 5, 'v', 'five' ]
'M' [ 6, 'vi', 'six' ]
'M' [ 7, 'vii', 'seven' ]
'M' [ 1, 'i', 'one' ]
'M' [ 8, 'viii', 'eight' ]
'M' [ 3, 'iii', 'three' ]
'M' [ 10, 'x', 'ten' ]
'M' [ 2, 'ii', 'two' ]
'M' [ 9, 'ix', 'nine' ]
njs_path = require 'path'
njs_util = require 'util'
njs_fs = require 'fs'
ES = require 'event-stream'
ES_through = require 'through' # same as ES.through
#-----------------------------------------------------------------------------------------------------------
rpr = njs_util.inspect
log = ( P... ) -> console.log ( rpr p for p in P ).join ' '
#-----------------------------------------------------------------------------------------------------------
through = ( on_data, on_end ) ->
  ### Just a slight wrapper so we can experiment with `ES.through`. ###
  return ES_through on_data, on_end, { autoDestroy: false }
  # return ES_through on_data, on_end
#-----------------------------------------------------------------------------------------------------------
$show = ( badge ) ->
  on_data = ( data ) ->
    log badge, data
    @emit 'data', data
  return through on_data, null
#-----------------------------------------------------------------------------------------------------------
@$add_index = ->
  ### Transformer to turn each non-empty line of the stream into `[ idx, line, ]`. ###
  idx = 0
  #.........................................................................................................
  on_data = ( line ) ->
    if line? and line.length > 0
      do ( idx ) => @queue [ idx, line, ]
      idx += 1
    return null
  #.........................................................................................................
  return through on_data, null
#-----------------------------------------------------------------------------------------------------------
@create_indexstream = ( route ) ->
  ### Given a FS `route`, return a run-of-the-mill NodeJS `fs` `ReadableStream` decoded as UTF-8, split
  into lines, and each non-empty line passed on as `[ idx, line, ]`. ###
  input = njs_fs.createReadStream route, { autoClose: false, }
    .pipe ES.split()
    .pipe @.$add_index()
  return input
#-----------------------------------------------------------------------------------------------------------
@$match = ( route ) ->
  ### Transformer that expects `[ idx, line, ]` data events and, for each piece of data received, will
  open an indexed stream for `route`; that sub-stream will be read through until its sub-index matches
  `idx`, whence a single item `[ idx, line, sub_line, ]` is passed on into the original stream. ###
  match = @_match.bind @
  #.........................................................................................................
  on_data = ( data ) ->
    [ idx, line, ] = data
    log '$match', data
    match idx, line, route, @
    return null
  #.........................................................................................................
  return through on_data
#-----------------------------------------------------------------------------------------------------------
@_match = ( idx, line, route, caller ) ->
  #.........................................................................................................
  on_data = ( data ) ->
    [ sub_idx, sub_line, ] = data
    if idx == sub_idx
      new_data = [ idx, line, sub_line, ]
      caller.queue new_data
      @queue new_data # only for debugging
  #.........................................................................................................
  input = @create_indexstream route
    .pipe through on_data
    .pipe $show 'M'
  #.........................................................................................................
  return null
#-----------------------------------------------------------------------------------------------------------
@read = ( handler ) ->
  number_route = njs_path.join __dirname, './numbers.txt'
  name_route = njs_path.join __dirname, './names-en.txt'
  input = @create_indexstream number_route
    .pipe $show 'C'
    .pipe @$match name_route
    .pipe $show 'D'
############################################################################################################
@read ( error ) ->
  throw error if error?
  log 'ok'
@loveencounterflow
Author

(see dominictarr/through#27 for more)

i'm trying to use through to pair matching lines from two files. i read in a file with one roman numeral on each line and add an index to each line; then, for each line, i feed that index into a matcher function that reads another file in the very same way and, as soon as the two indexes match, passes the pair of matching lines on.

as the terminating lines of streaming.coffee show, there should be some events marked 'D' in the output, but there aren't any. all events from the '$match' function are there, though, and my suspicion is that the matched lines are simply late to the party: the input has been closed, and its end event has long since caused the output end to close its gates, too. since the input files are so short, each can be read in a single chunk, meaning everything is processed within a single tick; whenever you do something asynchronous from a transformer under such circumstances, the results only arrive after the output has been closed. at least, that's my (somewhat) educated guess.
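
if that guess is right, one possible way out is to hold back the end of the stream until all asynchronous work has reported in. the following is only a minimal sketch under that assumption, not a tested fix for streaming.coffee: `$async`, `work` and `double_later` are hypothetical names made up for illustration, and the sketch relies on `through` accepting a custom end handler that decides for itself when to call `@queue null`.

```coffee
through = require 'through'

# Hypothetical helper (not part of `through`): wraps an asynchronous `work`
# function into a transformer that postpones the end of the stream until
# every pending callback has fired.
$async = ( work ) ->
  pending   = 0
  has_ended = false
  #.......................................................................
  on_data = ( data ) ->
    pending += 1
    work data, ( result ) =>
      @queue result
      pending -= 1
      # upstream has already ended and this was the last outstanding result
      @queue null if has_ended and pending is 0
    return null
  #.......................................................................
  on_end = ->
    has_ended = true
    # only end right away when nothing is outstanding
    @queue null if pending is 0
    return null
  #.......................................................................
  return through on_data, on_end

# Usage: results are produced one tick late, i.e. after `end()` was called.
double_later = ( n, done ) -> setImmediate -> done n * 2

input = $async double_later
input.on 'data', ( data ) -> console.log 'D', data
input.on 'end', -> console.log 'end'
input.write 1
input.write 2
input.end()
```

in streaming.coffee, the equivalent would presumably be for `$match` to count the sub-streams it has opened and to call `@queue null` only after the input has ended and the last sub-stream has finished; i haven't verified that against the real files.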
