
@dherman
Last active August 29, 2015 14:16
fgrep: a style/expressiveness test for stream libraries

How Would You Write fgrep?

This is a little style and expressiveness test for various stream libraries. It is certainly not a proof of anything, so much as a chance to showcase how some basic features of streams fit together in different libraries.

Problem statement: given a library of streams, implement an API analogous to fgrep, i.e., a literal string grepper.

Specifically, implement a function fgrep(test, filenames[, limit=10]), which takes a search string, a list of filenames, and an optional concurrency limit, and produces a stream of Match records { filename: string, lineNumber: number, line: string }, representing all lines of all files that contain the string test. For each matching record, filename is the name of the file where the match was found, lineNumber is the one-indexed line number where the match was found, and line is the contents of the line where the match was found.
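The Match record and the per-line check are easy to pin down concretely. A minimal JavaScript sketch (the function name mirrors the pseudocode below; it is illustrative, not from any particular library):

```javascript
// Sketch: build a Match record for one line, or return null when the
// line does not contain the literal search string. lineNumber is one-indexed.
function fgrepLine(test, filename, lineNumber, line) {
  return line.indexOf(test) !== -1
    ? { filename: filename, lineNumber: lineNumber, line: line }
    : null;
}
```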

The limit argument indicates the maximum number of concurrent filehandles that should be allowed to be open at any given time.

Pseudoimplementation

The basic shape of the problem solution might look something like this:

# string, [string], number=10 -> Stream<Match>
fgrep(test, filenames, limit=10):
  filenames.map(filename => fgrepFile(test, filename)) # Stream<Stream<Match>>
           .rateLimit(limit)                           # Stream<Stream<Match>>
           .concat()                                   # Stream<Match>

# string, string -> Stream<Match>
fgrepFile(test, filename):
  lazy-stream(() => line-stream(open-file-stream(filename)))  # Stream<Line>
    .filterMap((line, lineNumber) =>                          # Stream<Match>
                 fgrepLine(test, filename, lineNumber, line))

# string, string, number, number -> Match | null
fgrepLine(test, filename, lineNumber, line):
  if line.contains(test)
  then { filename, lineNumber, line }
  else null

From the top, this starts by creating a stream of lazy streams, each of which greps a single file. These are limited to limit concurrent greps to avoid opening too many filehandles. This produces a stream of streams of matches, which are then flattened sequentially to a stream of matches.
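Setting the concurrency limit aside (i.e. limit = 1), the lazy, sequential core of this pipeline can be sketched with ES6 generators. Here readLines is a stand-in for opening a file and splitting it into lines, not a real API:

```javascript
// Sketch: sequential (limit = 1) analogue of the pipeline using generators.
// `readLines(filename)` is a hypothetical helper yielding the file's lines.
function* fgrep(test, filenames, readLines) {
  for (const filename of filenames) {          // one file open at a time
    let lineNumber = 0;
    for (const line of readLines(filename)) {  // lazy per-file line stream
      lineNumber++;                            // one-indexed
      if (line.indexOf(test) !== -1) {
        yield { filename, lineNumber, line };
      }
    }
  }
}
```

Because generators are pulled on demand, backpressure falls out for free here; the interesting part a real stream library has to add is running up to limit of these loops concurrently.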

Why This is Interesting

This program is simple enough to be understandable yet still useful, but it introduces a few challenges:

  • combines both binary and value/object streams
  • requires a low-level line splitter, which showcases either transformations from binary streams to value streams or a stdlib of basic file manipulation combinators
  • requires lazy file manipulation and throttling input to avoid dying from too many open filehandles
  • result should be made more readable with simple combinators like map or filterMap or flatMap
  • should also work well with backpressure, although this isn't directly demonstrated without using the function in a larger program

isaacs commented Mar 9, 2015

The first nontrivial part of this is the throttling. As a first approximation, you could provide a list of files and iterate through them one by one. (I'm not sure I'd call it "hard"; it's a tricky bit that probably should be a separate module.)

The second is doing line-buffering, but there are many modules that do that already.
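The heart of any such line-buffering module is carrying a partial trailing line from one chunk to the next. A pure sketch of that step (illustrative, not taken from any particular module):

```javascript
// Sketch: the core of a line-buffering transform, kept pure for clarity.
// Takes the leftover partial line from the previous chunk plus a new chunk;
// returns the complete lines and the new leftover (a partial final line).
function splitLines(leftover, chunk) {
  const parts = (leftover + chunk).split('\n');
  return { lines: parts.slice(0, -1), leftover: parts[parts.length - 1] };
}
```

A Transform stream wrapper would call this from _transform, pushing each complete line downstream and keeping leftover in instance state until flush.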

Once you have that, with the trivial one-at-a-time iteration, it's a matter of:

// (c) Isaac Z. Schlueter, Released into the Public Domain
var fs = require('fs')
var util = require('util')
var Readable = require('stream').Readable
// LineBufferStream: any line-splitting Transform stream (many modules do this)

function fgrep(test, files) {
  return new FGrep(test, files)
}

util.inherits(FGrep, Readable)
function FGrep (test, files) {
  this.test = test
  this.files = files
  this.index = 0
  this.current = null
  Readable.call(this, { objectMode: true })
  this._process()
}

FGrep.prototype._read = function () {
  // downstream wants more data; resume the current line stream
  if (this.current)
    this.current.resume()
}

FGrep.prototype._process = function () {
  var filename = this.files[this.index++]   // post-increment, so files[0] isn't skipped
  if (!filename)
    return this.push(null)                  // all files done; end the stream
  this.current = fs.createReadStream(filename).pipe(new LineBufferStream())
  var self = this
  var lineNumber = 0                        // one-indexed after the increment below
  this.current.on('data', function (line) {
    lineNumber++
    if (line.indexOf(self.test) !== -1) {
      // push() returns false when the consumer is saturated; pause until _read
      if (!self.push({ filename: filename, lineNumber: lineNumber, line: line }))
        self.current.pause()
    }
  })
  this.current.on('end', function () {
    self._process()                         // move on to the next file
  })
}


kourge commented Mar 9, 2015

A naïve transformation of the basic shape description to Scala weighs in at about 30 lines. A little more simplification results in:

object Fgrep {
  case class Match(
    filename: String,
    lineNumber: Int,
    line: String
  )

  def fgrepFile(test: String, filename: String): Iterator[Match] =
    scala.io.Source.fromFile(filename).getLines().zipWithIndex.flatMap {
      case (line, lineNumber) if line.contains(test) =>
        Some(Match(filename, lineNumber + 1, line)) // zipWithIndex is zero-based; the spec wants one-indexed
      case _ =>
        None
    }

  def fgrep(test: String, filenames: Seq[String], limit: Int = 10): Iterator[Match] =
    filenames.grouped(limit).flatMap { chunk =>
      chunk.par.map(filename => fgrepFile(test, filename).toStream.force)
    }.flatten
}

The non-trivial part was getting grouping right. Since grouped works by transforming Seq[A] to Iterator[Seq[A]], the type Iterator[Seq[Iterator[Fgrep.Match]]] needed to be flattened twice to get Iterator[Fgrep.Match].
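The same group-then-flatten shape, stripped of the .par parallelism, can be transcribed to plain JavaScript arrays to make the two flattens explicit (grouped and fgrepShape are illustrative names, not library APIs):

```javascript
// Sketch: group filenames into chunks of `limit`, grep each chunk,
// then flatten twice: [[[Match]]] -> [[Match]] -> [Match].
function grouped(xs, n) {
  const out = [];
  for (let i = 0; i < xs.length; i += n) out.push(xs.slice(i, i + n));
  return out;
}

function fgrepShape(test, filenames, limit, fgrepFile) {
  return grouped(filenames, limit)
    .map(chunk => chunk.map(f => fgrepFile(test, f)))  // [[[Match]]]
    .reduce((a, b) => a.concat(b), [])                 // first flatten
    .reduce((a, b) => a.concat(b), []);                // second flatten
}
```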
