Last active Dec 18, 2020

Fast, parallel filters in Go

Lightning Talk, Leipzig Gophers, 2020-12-18, 19:00 CET

The Filter

From Software Tools, 1976, Chapter 2 ("Filters"):

By obvious analogy to electronics (or plumbing) we call such programs filters, because they make useful changes to a stream of data passing through.

The examples in the book are written in Ratfor ("Rational Fortran"), which is quite readable.


The trick of getting most filters right is to find an orderly way of recognizing the components of the input stream.

Examples the chapter mentions:

  • wordcount (words)
  • entab, detab
  • overstrike (the overstrike looks for backspaces in typewriter text ...)

  • text compression with run-length encoding (runs of one or more identical characters): compress, expand
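The run-length idea can be sketched in Go. This is a simplified count-and-byte scheme for illustration, not the book's exact compress format (which only encodes runs above a threshold, using a marker character):

```go
package main

import (
	"bytes"
	"fmt"
)

// rle compresses runs of identical bytes into count+byte pairs,
// e.g. "aaabcc" becomes "3a1b2c".
func rle(b []byte) []byte {
	var buf bytes.Buffer
	for i := 0; i < len(b); {
		j := i
		for j < len(b) && b[j] == b[i] { // extend the current run
			j++
		}
		fmt.Fprintf(&buf, "%d%c", j-i, b[i])
		i = j
	}
	return buf.Bytes()
}

func main() {
	fmt.Println(string(rle([]byte("aaabcc")))) // 3a1b2c
}
```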

On page 45, we find a quite modern term:

[...] This is what is known as "defensive programming". It costs next to nothing in source text or execution time, yet it reduces the chance of the program going wild should an important control variable somehow be damaged.

A general design principle for argument handling:

An unusual argument is given some reasonable interpretation whenever possible, and a harmless interpretation otherwise.
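For instance, a detab-style tool that takes a tab width on the command line can interpret a malformed or non-positive value harmlessly instead of failing (a hypothetical helper; the default of 8 is an assumption):

```go
package main

import (
	"fmt"
	"strconv"
)

// tabWidth interprets its argument defensively: a malformed or
// non-positive value falls back to a harmless default of 8.
func tabWidth(arg string) int {
	n, err := strconv.Atoi(arg)
	if err != nil || n <= 0 {
		return 8
	}
	return n
}

func main() {
	fmt.Println(tabWidth("4"))   // 4
	fmt.Println(tabWidth("-3"))  // 8, harmless interpretation
	fmt.Println(tabWidth("abc")) // 8, harmless interpretation
}
```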

On page 63, finally:

Now for the payoff. We can use charcount in series with translit to provide all sorts of useful information.

translit ¬@n | charcount

We call this construction a pipeline.

Snippets from the summary:

By pushing information about particular devices as far out to the edges as possible, we expand the range of programs that can freely cooperate.

[...] Once you learn that you can isolate and adapt by introducing filters, you begin to think more freely in terms of combining existing programs instead of writing new ones. You overcome much of the temptation to build a whole new package; instead you adapt pieces that already exist. You become, in short, more of a tool user.

The other chapters of the book cover implementations of text patterns (grep), editing (ed), formatting (roff; TeX was still in the future), macro processing, and a Ratfor-to-Fortran translator.

It is also about "clean code", in that it is concerned with appropriate data structures, defensive programming, cohesion (a reason for being an entity of its own), short subroutines, and readability - all of which make software more robust, since you have an easier time understanding the components - and finally with composability at the user level (i.e. pipes and filters).


  • keep it simple
  • build it in stages
  • let someone else do the hard part

Tiny library: parallel

Problem: Process larger amounts of data (e.g. 10k or 10M records), with a filter.

  • components coupled by a byte stream remind me of Go's io interfaces

I wanted to have a minimal interface for

  • reading a stream
  • applying code in parallel
  • writing results

Decouple form (how) and content (what)

  • specify a transformation function
  • do not care about how and when it is executed

Example noop transformation:

func Noop(b []byte) ([]byte, error) {
    return b, nil
}

Design is minimalistic (but could be reduced further, I guess):

p := parallel.NewProcessor(os.Stdin, os.Stdout, Noop)
if err := p.Run(); err != nil {
    log.Fatal(err)
}
A Processor struct encapsulates the streams and the transformation function.

type Processor struct {
    BatchSize       int
    RecordSeparator byte
    NumWorkers      int
    SkipEmptyLines  bool
    r               io.Reader
    w               io.Writer
    f               TransformerFunc
}
Let p.Run() set up the channels, reading, and batching.



package main

import (
	"bytes"
	"log"
	"os"

	"github.com/miku/parallel"
)

func main() {
	p := parallel.NewProcessor(os.Stdin, os.Stdout, func(p []byte) ([]byte, error) {
		return bytes.ToUpper(p), nil
	})
	if err := p.Run(); err != nil {
		log.Fatal(err)
	}
}


$ go run xu.go < xu.go | head -7

Fetch links in parallel (batch size of 1, many parallel workers). We can use a generic fetch function.

// client is a package-level *http.Client; FetchResult is a struct
// defined elsewhere in the gist.
func Fetch(link string) (*FetchResult, error) {
	start := time.Now()
	resp, err := client.Get(link)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	n, err := io.Copy(ioutil.Discard, resp.Body)
	if err != nil {
		return nil, err
	}
	elapsed := time.Since(start)
	return &FetchResult{
		URL:        link,
		StatusCode: resp.StatusCode,
		Length:     n,
		Elapsed:    fmt.Sprintf("%0.2f", elapsed.Seconds()),
	}, nil
}

Then adapt input, output and error handling in the parallel processor.

func main() {
	p := parallel.NewProcessor(strings.NewReader(input), os.Stdout, func(b []byte) ([]byte, error) {
		link := string(bytes.TrimSpace(b))
		if len(link) == 0 {
			return nil, nil
		}
		r, err := Fetch(link)
		if err != nil {
			return nil, nil
		}
		return MarshalEnd(r, []byte("\n"))
	})
	p.BatchSize = 1
	p.NumWorkers = 128
	if err := p.Run(); err != nil {
		log.Fatal(err)
	}
}
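MarshalEnd is a small helper used above; a plausible implementation (an assumption, not necessarily the gist's exact code) marshals the value to JSON and appends the given separator, so each result becomes one line in the output stream:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MarshalEnd marshals v to JSON and appends sep, yielding
// one delimited record per result.
func MarshalEnd(v interface{}, sep []byte) ([]byte, error) {
	b, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	return append(b, sep...), nil
}

func main() {
	b, _ := MarshalEnd(map[string]int{"n": 1}, []byte("\n"))
	fmt.Print(string(b)) // {"n":1}
}
```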

Hello, Pipes!

$ unzip -p | cut -d , -f 2 | shuf -n 10 | go run xf.go

