Channels Are Not Enough

... or Why Pipelining Is Not That Easy

Golang Concurrency Patterns for brave and smart.

By @kachayev

Intro

Go was designed for building concurrent applications easily, so it has goroutines for running independent computations and channels for letting them communicate with each other. We've all heard this story before. All examples and tutorials look just fine: we can create a new channel, we can send to a channel, we can read from a channel, we even have the nifty, elegant select statement (btw, why do we still have statements in the 21st century?), blocking reads and buffers…

Keynote: 99% of time I don't really care if the response is delivered with a channel or a magical unicorn brought it on its horn.

It’s so cool to find yourself writing a tutorial for beginners! And it's a bit painful when you are trying to implement a big and sophisticated system. Channels are primitives. They are low-level building blocks and I highly doubt you want to work with them on a daily basis.

Look at "advanced patterns" and "pipeline". Not that simple, right? Too many things to keep in mind and constantly memorize: when and how to close channels, how to propagate errors, how to free resources. I'm complaining about this not because I've tried implementing something and failed, but because I work with stuff like this every single day.

You can say that a beginner doesn't need to understand every detail of the whole picture. But... are the described patterns really "advanced"? Unfortunately, the answer is NO. They're basic and common.

Take a closer look at the pipeline problem. Is it really a pipeline? No… "...for each path from directory calculate MD5 checksum and collect result to a single map[string]string...". It’s just a pmap (parallel map). Or a pmap with a pool of executors in the case of bounded parallelism. And pmap should not require me to write so many lines of code. If you want to look at a real pipelining problem, I describe one at the end of the article (see the "Building Twitter Analyzer" section).

So What About Patterns?

To develop real-world applications fast enough we should be able to distil higher-level abstractions than primitive channels. They are only a transport layer. We need application-level abstractions to write programs (compare to OSI), otherwise you will find yourself constantly digging through the low-level details of your channel network, trying to find out why it doesn’t work. Only on the production machine. From time to time. Without any reasonable steps to reproduce. Check Erlang OTP, which aims to solve the same problem: to protect you from low-level message-passing code.

What is the problem with low-level code? There is a great article "Edward C++Hands":

Having scissors for hands is not all that bad. Edward has many talents: he can, for instance, create stunning dog hairdos. Don’t get me wrong - there were many stunning dog hairdos on display (I mean C++ code that was elegant and simple) but the bulk of the conference was about how to avoid mutilation and how to deliver first aid in case of accidental amputation.

At Kyiv Go Meetup I experienced the same situation: 20 lines of clean and readable code on a single slide. One non-trivial race condition and one possible runtime panic. Was it obvious to all listeners? No. Not even to half of them.

Any Reason for Panic?

Ok. Let’s try to collect such patterns. From work experience, from books, from other languages (yeah, guys, I know that it's hard to believe but there are several other languages also designed for concurrency).

Rob Pike talks about Fan-in, Fan-out. It’s useful in many ways, but still about the network of channels. Not about your application. In any case, let's check (shamelessly stolen from here).

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed, then calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Start a goroutine to close out once all the output goroutines are
    // done.  This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

Hm... <-chan int. Not abstract enough to reuse in my own application (i.e. move to a library)... And not obvious enough to reimplement each time I need it. How to make it reusable? <-chan interface{}? Welcome to the land of type casting and runtime panics. If you want to implement a high-level fan-in (merge) you’re losing type safety. The same (unfortunately) goes for all other patterns.
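To make the "land of type casting" concrete, here is a minimal sketch of what the interface{} route looks like. mergeAny, sumInts and source are names made up for this illustration, not part of any library.

```go
package main

import (
	"fmt"
	"sync"
)

// mergeAny is a hypothetical "reusable" fan-in: it accepts any payload,
// but only by erasing the element type to interface{}.
func mergeAny(cs ...<-chan interface{}) <-chan interface{} {
	var wg sync.WaitGroup
	out := make(chan interface{})
	wg.Add(len(cs))
	for _, c := range cs {
		go func(c <-chan interface{}) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// sumInts adds up every int it receives and counts everything else.
// A bare v.(int) would panic at run time on the first non-int value;
// the comma-ok form avoids the panic but cannot turn the mistake
// into a compile-time error.
func sumInts(in <-chan interface{}) (sum, skipped int) {
	for v := range in {
		n, ok := v.(int)
		if !ok {
			skipped++
			continue
		}
		sum += n
	}
	return sum, skipped
}

// source emits the given values and then closes its channel.
func source(vals ...interface{}) <-chan interface{} {
	c := make(chan interface{})
	go func() {
		defer close(c)
		for _, v := range vals {
			c <- v
		}
	}()
	return c
}

func main() {
	sum, skipped := sumInts(mergeAny(source(1, 2), source(3, "oops")))
	fmt.Println(sum, skipped) // 6 1
}
```

Nothing stops a producer from sending a string into a channel that the consumer believes carries ints. The compiler cannot help, so the check moves to run time.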

What I really want here is:

func merge[T](cs ...<-chan T) <-chan T

Yeah, I know that Go doesn't have generics, because who ever needs them?
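For comparison, a sketch of the wished-for signature written with type parameters. This assumes a toolchain that supports them (Go 1.18 or later, which postdates this text); the any constraint is required by that syntax, and the emit and sum helpers are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// merge fans any number of channels with the same element type into one.
func merge[T any](cs ...<-chan T) <-chan T {
	var wg sync.WaitGroup
	out := make(chan T)
	wg.Add(len(cs))
	for _, c := range cs {
		go func(c <-chan T) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// emit is a hypothetical helper: it sends vals and closes the channel.
func emit[T any](vals ...T) <-chan T {
	c := make(chan T)
	go func() {
		defer close(c)
		for _, v := range vals {
			c <- v
		}
	}()
	return c
}

// sum drains a channel of ints and returns their total.
func sum(in <-chan int) int {
	total := 0
	for n := range in {
		total += n
	}
	return total
}

func main() {
	fmt.Println(sum(merge(emit(1, 2), emit(3)))) // 6

	words := 0
	for range merge(emit("a"), emit("b", "c")) {
		words++
	}
	fmt.Println(words) // 3

	// merge(emit(1), emit("a")) does not compile:
	// T cannot be both int and string.
}
```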

What is the Weather Now?

Return to patterns. Let’s analyse a hypothetical project that’s really close to practical experience in server-side development. We need a server that accepts a request about the weather for a given US state and responds with info collected from OpenWeatherMap. Like this:

$ http localhost:4912/weather?q=CA
HTTP/1.1 200 OK
Access-Control-Allow-Credentials: true
Access-Control-Allow-Methods: GET, POST
Access-Control-Allow-Origin: *
Connection: keep-alive
Content-Type: application/json; charset=utf-8
[{
    "clouds": {
        "all": 40
    },
    "id": 5391959,
    "main": {
        "temp": 288.89,
        "temp_max": 291.48,
        "temp_min": 286.15
    },
    "name": "San Francisco",
    "weather": [
        {
            "description": "mist",
            "icon": "50d",
            "id": 701,
            "main": "Mist"
        }
    ]
}, {
    "clouds": {
        "all": 90
    },
    "id": 5368361,
    "main": {
        "temp": 292.83,
        "temp_max": 296.15,
        "temp_min": 289.15
    },
    "name": "Los Angeles",
    "weather": [
        {
            "description": "mist",
            "icon": "50d",
            "id": 701,
            "main": "Mist"
        }
    ]
}]

Pmap

Let's start with something that we already know. So, we've got the request ?q=CA. I don't want to specify where we're going to find the list of relevant cities. We can use a database, an in-memory cache or whatever is reasonable. Assume that we have a magical findCities(state) function that returns chan City (as the most universal Go representation of a lazy sequence). Next? For each city we have to call the OpenWeatherMap API and collect the results into a single map[City]Weather. We've talked about this pattern already. It's pmap. And I want my code to look like

chanCities := findCities(state)
resolver := func(name City) Weather { return openWeatherMap.AskFor(name) }
weather := chanCities.Par.Map(resolver)

or in case of bounded parallelism

chanCities := findCities(state)
pool := NewWorkers(20)
resolver := func(w Worker, name City) Weather { return w.AskFor(name) }
weather := chanCities.Par.BoundedMap(pool, resolver)

That's it. I want all these <-done synchronizations and select sacramentals to be entirely hidden.
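A rough idea of what such a BoundedMap has to hide, written against concrete types since a generic version is out of reach. This is only a sketch: City, Weather, findCities and fakeWeather are stand-ins invented here, and error handling and cancellation are left out on purpose.

```go
package main

import (
	"fmt"
	"sync"
)

type City string

type Weather struct{ Temp float64 }

// boundedMap applies f to every city received from in, using at most
// n goroutines, and collects the results into a map.
func boundedMap(in <-chan City, n int, f func(City) Weather) map[City]Weather {
	var (
		mu  sync.Mutex
		wg  sync.WaitGroup
		out = make(map[City]Weather)
	)
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			for c := range in {
				w := f(c) // the slow call runs outside the lock
				mu.Lock()
				out[c] = w
				mu.Unlock()
			}
		}()
	}
	wg.Wait() // returns once in is closed and every worker is idle
	return out
}

// findCities is a stand-in for the magical lookup: it knows one state.
func findCities(state string) <-chan City {
	cities := map[string][]City{"CA": {"San Francisco", "Los Angeles"}}
	out := make(chan City)
	go func() {
		defer close(out)
		for _, c := range cities[state] {
			out <- c
		}
	}()
	return out
}

// fakeWeather replaces the real API call with a deterministic value.
func fakeWeather(c City) Weather { return Weather{Temp: float64(len(c))} }

func main() {
	weather := boundedMap(findCities("CA"), 20, fakeWeather)
	fmt.Println(len(weather)) // 2
}
```

Even this small version needs a mutex, a WaitGroup and a convention about who closes the input channel, all of it fixed to one pair of types.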

Futures & Promises

It can take a long time to get the current weather, e.g. when you have a long list of cities for a particular state. Of course you don't want to duplicate API calls, so you should manage simultaneous requests somehow:

func collect(state string) Weather {
  calc, ok := calculations.get(state) // check if it's in progress
  if !ok {
      calc = calculations.run(state) // run otherwise
  }
  return calc.Wait() // wait until done
}

This is a so-called future/promise. From Wikipedia:

They describe an object that acts as a proxy for a result that is initially unknown, usually because the computation of its value is yet incomplete.

I've heard from a number of people that a Go future is simply:

f := make(chan int, 1)

Which is wrong because all waiters should get the result. And this version is also wrong:

f := make(chan int, 1)
v := <-f
f <- v
// use v here

Because it's impossible to manage resources this way. And I wish you luck finding the bug if somebody misses the f <- v part in their code.

It's not that hard, though, to implement a promise directly by storing all waiters (I'm not sure that this code is really bug-free):
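One common way to let every waiter see the result is to close a channel instead of sending on it, because a close is observed by all receivers. A minimal sketch, hardcoded to int; Future, Deliver, Wait and waitAll are illustrative names:

```go
package main

import (
	"fmt"
	"sync"
)

// Future holds a single int result. done is closed exactly once,
// after value has been set, which wakes up every waiter.
type Future struct {
	once  sync.Once
	done  chan struct{}
	value int
}

func NewFuture() *Future {
	return &Future{done: make(chan struct{})}
}

// Deliver stores the value and wakes all waiters.
// Calls after the first one are ignored.
func (f *Future) Deliver(v int) {
	f.once.Do(func() {
		f.value = v
		close(f.done)
	})
}

// Wait blocks until a value has been delivered and returns it.
func (f *Future) Wait() int {
	<-f.done
	return f.value
}

// waitAll starts n waiters on one future, delivers v,
// and returns what each waiter saw.
func waitAll(n, v int) []int {
	f := NewFuture()
	results := make([]int, n)
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func(i int) {
			defer wg.Done()
			results[i] = f.Wait()
		}(i)
	}
	f.Deliver(v)
	wg.Wait()
	return results
}

func main() {
	fmt.Println(waitAll(3, 42)) // [42 42 42]
}
```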

type PromiseDelivery chan interface{}
type Promise struct {
	sync.RWMutex
	value interface{}
	waiters []PromiseDelivery
}

func (p *Promise) Deliver(value interface{}) {
	p.Lock()
	defer p.Unlock()
	p.value = value
	for _, w := range p.waiters {
		locW := w
		go func(){
			locW <- value
		}()
	}
}

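func (p *Promise) Value() interface{} {
	// Check the value and register the waiter under the same lock,
	// otherwise a concurrent Deliver can slip in between the two steps.
	p.Lock()
	if p.value != nil {
		v := p.value
		p.Unlock()
		return v
	}

	delivery := make(PromiseDelivery)
	p.waiters = append(p.waiters, delivery)
	p.Unlock()
	return <-delivery
}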

func NewPromise() *Promise {
	return &Promise{
		value: nil,
		waiters: []PromiseDelivery{},
	}
}

How to use it?

p := NewPromise()
go func(){
  p.Deliver(42)
}()
p.Value().(int) // blocks and returns interface{} when ready

But interface{} and type casting are already here. What do I really want?

// .. somewhere in well-tested library or even in stdlib
type PromiseDelivery[T] chan T
type Promise[T] struct {
	sync.RWMutex
	value T
	waiters []PromiseDelivery[T]
}
func (p *Promise[T]) Deliver(value T)
func (p *Promise[T]) Value() T
func NewPromise[T]() *Promise[T]

// in my code:
v := NewPromise[int]()
go func(){
  v.Deliver("woooow!") // compilation error
  v.Deliver(42)
}()
v.Value() // blocks and returns 42, not interface{}

No, sure, nobody needs generics. What the hell am I talking about?

You can also avoid p.Lock() by using select to listen for deliver and wait operations in a single goroutine. You can also introduce a special .ValueWithTimeout method, which will be really helpful for end users. And there are lots of other "you can..." options, even though we're talking about 20 lines of code (that will probably grow each time you discover more details of future/promise interactions). Do I really need to know (or even think) about the channels that delivered me the value? No.
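A sketch of what a ValueWithTimeout could look like, assuming a promise built around a done channel that is closed on delivery rather than around a list of waiters. IntPromise, ErrTimeout and tryValue are hypothetical names; select and time.After are the only standard pieces involved.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ErrTimeout is returned when no value arrives in time.
var ErrTimeout = errors.New("promise: timed out waiting for value")

// IntPromise is a promise for a single int.
type IntPromise struct {
	done  chan struct{}
	value int
}

func NewIntPromise() *IntPromise {
	return &IntPromise{done: make(chan struct{})}
}

// Deliver must be called at most once in this sketch;
// a second call would panic on the double close.
func (p *IntPromise) Deliver(v int) {
	p.value = v
	close(p.done)
}

// ValueWithTimeout waits for the value for at most d.
func (p *IntPromise) ValueWithTimeout(d time.Duration) (int, error) {
	select {
	case <-p.done:
		return p.value, nil
	case <-time.After(d):
		return 0, ErrTimeout
	}
}

// tryValue is a small demo helper: wait up to ms milliseconds and
// report whether a value arrived.
func tryValue(p *IntPromise, ms int) (int, bool) {
	v, err := p.ValueWithTimeout(time.Duration(ms) * time.Millisecond)
	return v, err == nil
}

func main() {
	p := NewIntPromise()

	_, ok := tryValue(p, 10) // nothing delivered yet, so this times out
	fmt.Println(ok)          // false

	p.Deliver(42)
	v, ok := tryValue(p, 10)
	fmt.Println(v, ok) // 42 true
}
```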

Pub/sub

Assume that we want to build a real-time service. So now our client can open a websocket connection with a q=CA request and get instant updates about weather changes in California. It would look something like:

// deliverer
calculation.WhenDone(func(state string, w Weather) {
  broker.Publish("CA", w)
})

// client
ch := broker.Subscribe("CA")
for update := range ch {
  w.Write(update.Serialize())
}

It's a typical pub/sub. You can learn about it from the Advanced Go Patterns talk and even find ready-to-use implementations. The problem is that they are all about interfaces.

Is it possible to implement:

broker := NewBroker[String, Weather]()
// so that
broker.Subs(42) // compilation failure
// and
broker.Subs("CA") // returns (chan Weather) not (chan interface{})

Sure! If you're brave enough to copy-paste code from project to project with small fixes here and there.
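For illustration, here is roughly what one such copy-pasted, Weather-only broker might look like. It is a sketch under assumptions: the Broker API, the buffer size of 8 and the drop-on-full policy are choices made for this example, not something the existing implementations prescribe.

```go
package main

import (
	"fmt"
	"sync"
)

type Weather struct{ Temp float64 }

// Broker fans Weather updates out to the subscribers of a topic.
type Broker struct {
	mu   sync.Mutex
	subs map[string][]chan Weather
}

func NewBroker() *Broker {
	return &Broker{subs: make(map[string][]chan Weather)}
}

// Subscribe returns a channel of updates for topic.
// The buffer size is an arbitrary choice for this sketch.
func (b *Broker) Subscribe(topic string) <-chan Weather {
	ch := make(chan Weather, 8)
	b.mu.Lock()
	b.subs[topic] = append(b.subs[topic], ch)
	b.mu.Unlock()
	return ch
}

// Publish delivers w to every subscriber of topic. A subscriber whose
// buffer is full misses the update instead of blocking the publisher.
func (b *Broker) Publish(topic string, w Weather) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.subs[topic] {
		select {
		case ch <- w:
		default:
		}
	}
}

// Close closes all subscriber channels so that range loops terminate.
func (b *Broker) Close() {
	b.mu.Lock()
	defer b.mu.Unlock()
	for topic, chans := range b.subs {
		for _, ch := range chans {
			close(ch)
		}
		delete(b.subs, topic)
	}
}

func main() {
	b := NewBroker()
	ca := b.Subscribe("CA")
	b.Publish("CA", Weather{Temp: 288.89})
	b.Publish("NY", Weather{Temp: 270}) // no subscribers, silently dropped
	b.Close()
	for w := range ca {
		fmt.Println(w.Temp) // 288.89
	}
}
```

A broker for any other key or payload type means copying this file and changing string and Weather by hand.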

Map/filter

Assume that we want to give our users more flexibility, so we're introducing a new query param, show, which can be equal to all|temp|wind|icon.

You'll probably start with the simplest version:

ch := broker.Subscribe("CA")
for update := range ch {
  temps := []Temp{}
  for _, t := range update.Temp {
    temps = append(temps, t)
  }

  w.Write(temps)
}

But after 10 such methods you'll realize that it's not composable and even boring. Maybe you need

ch := broker.Subscribe("CA").Map(func(w Weather) Temp { return w.Temp })
for update := range ch {
  w.Write(update)
}
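Channels have no methods in Go, so the .Map call above is wishful thinking. The closest hand-written equivalent is a free function fixed to one pair of types. A minimal sketch, where mapTemp and collectTemps are names invented here and the Weather and Temp definitions are simplified placeholders:

```go
package main

import "fmt"

type Temp float64

type Weather struct{ Temp Temp }

// mapTemp returns a channel that yields f(w) for every w received
// from in. The output is closed once in is closed and drained.
func mapTemp(in <-chan Weather, f func(Weather) Temp) <-chan Temp {
	out := make(chan Temp)
	go func() {
		defer close(out)
		for w := range in {
			out <- f(w)
		}
	}()
	return out
}

// collectTemps pushes ws through mapTemp and gathers the results,
// preserving order because there is a single producer and consumer.
func collectTemps(ws ...Weather) []Temp {
	in := make(chan Weather)
	go func() {
		defer close(in)
		for _, w := range ws {
			in <- w
		}
	}()
	var temps []Temp
	for t := range mapTemp(in, func(w Weather) Temp { return w.Temp }) {
		temps = append(temps, t)
	}
	return temps
}

func main() {
	fmt.Println(collectTemps(Weather{Temp: 288.89}, Weather{Temp: 292.83}))
	// [288.89 292.83]
}
```

Mapping Weather to anything other than Temp needs another copy of the same function with different types.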

Wait, did I just say that channel is a functor? As well as Future/Promise.

p := NewPromise().Map(func(w Weather) Temp { return w.Temp })
go func(){
  p.Deliver(Weather{Temp{42}})
}()
p.Value().(Temp) // Temp, not Weather

Which means that I can reuse the same code for channels and futures. You may also end up with something like transducers. In ClojureScript code I frequently use tricks like

(->> (send url) ;; returns chan, put single value to it {:status 200 :result 42} when ready
     (async/filter< #(= 200 (:status %))) ;; check that :status is 200
     (async/map< :result)) ;; expose only 42 to end user
;; note, that it will close all channels (including implicit intermediate one) properly

Do I really have to worry if x is a channel or a future if I can simply do x.Map(transformation) and get back value of the same type? Probably not. In this case why am I allowed to do make(chan int) and not make(Future int)?

Request/Reply

Assume that our users like our service and use it actively. So we decided to introduce a simple API limit: a number of requests per IP per day. It's simple to collect the number of calls into a single map[string]int. The Go docs say "Do not communicate by sharing memory; instead, share memory by communicating". Ok, sounds like a nice idea.

So first we need a goroutine that will be responsible for counting requests.

req := make(chan string)
go func() { // wow, look here - it's an actor!
  m := map[string]int{}
  for r := range req {
    m[r]++ // a missing key reads as zero, so the first request becomes 1
  }
}()

go func() {
  req <- "127.0.0.2"
}()

go func() {
  req <- "127.0.0.1"
}()

It's easy. Now we can count the number of requests for each IP. But that's not enough... We also have to be able to ask for permission to execute a request.

type Req struct {
  ip string
  resp chan int
}

func NewRequest(ip string) *Req {
  return &Req{ip, make(chan int)}
}

requests := make(chan *Req)

go func() {
  m := map[string]int{}
  for r := range requests {
    m[r.ip]++ // a missing key reads as zero
    r.resp <- m[r.ip]
  }
}()

go func() {
  r := NewRequest("127.0.0.2")
  requests <- r
  fmt.Println(<- r.resp)
}()

go func() {
  r := NewRequest("127.0.0.1")
  requests <- r
  fmt.Println(<- r.resp)
}()

I won't even try to ask you about a generic solution (without hardcoded strings and ints). Instead, I'll ask you to check whether everything is right with this code. Is it that simple?

Are you sure that r.resp <- m[r.ip] is a good solution? No. Definitely not. I don't want anybody to wait for slow clients. Right? And what if I have too many slow clients? Maybe I have to handle this somehow.

Is this part, requests <- r, that simple? What if my actor (server) is too busy to reply now? Maybe I need to handle timeouts here...

And from time to time I need to specify an initialization process... And cleanup... Both with timeouts. And the ability to hold requests until initialization is finished.

What about the priority of calls? E.g. when I need to implement a Dump procedure for my analytics system but I don't want to pause all users until the analytics data is collected.

And... Looks like gen_server in Erlang. For sure I want it to be implemented once and shipped in a well-documented library with a high test coverage rate. 98% of the time I don't want to see this resp: make(chan int, ?) and I really don't want to think about what I should put instead of ?.
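One possible way to handle both concerns, sketched under assumptions: the reply channel gets a buffer of one so the actor never waits for a slow client, and the caller uses select with time.After so it never waits forever for a busy actor. Limiter, Hit, hitN and ErrBusy are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ErrBusy is returned when the actor does not answer in time.
var ErrBusy = errors.New("limiter: timed out")

type req struct {
	ip   string
	resp chan int // buffered (size 1): the actor never waits for the client
}

// Limiter counts requests per IP inside a single goroutine.
type Limiter struct{ requests chan req }

// NewLimiter starts the counting goroutine. In this sketch it is
// never stopped; a real version would need a shutdown path.
func NewLimiter() *Limiter {
	l := &Limiter{requests: make(chan req)}
	go func() {
		counts := map[string]int{}
		for r := range l.requests {
			counts[r.ip]++
			r.resp <- counts[r.ip] // cannot block: one reply per buffered channel
		}
	}()
	return l
}

// Hit registers one request from ip and returns the running count.
// It gives up if the actor does not accept the request, or does not
// reply, before the timeout expires.
func (l *Limiter) Hit(ip string, timeout time.Duration) (int, error) {
	r := req{ip: ip, resp: make(chan int, 1)}
	deadline := time.After(timeout)
	select {
	case l.requests <- r:
	case <-deadline:
		return 0, ErrBusy
	}
	select {
	case n := <-r.resp:
		return n, nil
	case <-deadline:
		return 0, ErrBusy
	}
}

// hitN is a demo helper: it calls Hit n times with a one second
// timeout and returns the last count, or -1 on a timeout.
func hitN(l *Limiter, ip string, n int) int {
	last := 0
	for i := 0; i < n; i++ {
		c, err := l.Hit(ip, time.Second)
		if err != nil {
			return -1
		}
		last = c
	}
	return last
}

func main() {
	l := NewLimiter()
	fmt.Println(hitN(l, "127.0.0.1", 3)) // 3
	fmt.Println(hitN(l, "127.0.0.2", 1)) // 1
}
```

That is already three times the size of the original snippet, and it still says nothing about initialization, cleanup or priorities.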

99% of time I don't really care if the response is delivered with a channel or a magical unicorn brought it on its horn.

And counting

There are many other common concurrency situations. I think you've already gotten the idea.

The Pain

You can tell me that the described patterns are not common. But... I have to implement almost all of them in all my projects. Every. Single. Time. Maybe I’m just out of luck and your projects are as simple as a tutorial for beginners.

I know that most of you will say "the world is too hard and programming is too painful". I'll continue to upset you: there are at least a few examples of languages that have solved this problem at least partially, or are at least working on solving it. Haskell and Scala type systems give you the ability to build strong high-level abstractions or even custom control flows to deal with concurrency. Clojure, on the opposite side, is dynamically typed, which encourages you to distil and share high-level abstractions. Rust has channels and generics.

Make it work -> Make it beautiful -> Make it reusable.

Now the first step is done. What's next? Don't get me wrong, Go is a promising language: channels and goroutines are much better than, e.g., pthreads, but should we really stop here?

P.S. Building Twitter Analyzer

About real-world pipelining.

Probably you've already seen Twitter Analytics, which is really great. Assume for the time being that it hasn't been introduced yet and we have to run our own analysis: for a given username, calculate how many unique users saw (at least theoretically) each of their own tweets. How can we do this? Not that hard really: read the user's timeline, filter away all retweets and replies, for all other tweets ask for all retweeters, for each retweeter ask for a list of followers, merge all retweeters' followers together and add the user's own followers. What I want to have as a result of these steps: map[TweetId][]Username (retweeters) and map[Username][]Username. That would be enough to build a fancy table to show to the requester.

A few technical details that you should be aware of:

  • Twitter API requires OAuth for each call and sets strict limits (450 calls per 15 minutes per user). To deal with such a limit, we're going to use a predefined list of OAuth tokens (e.g. 50) organized into a pool of workers, where each worker is able to suspend itself before running into the limits.

  • Most Twitter API calls use pagination for results with since_id or max_id continuations. So you can't rely on a single call to a worker giving you the full result.

Example of rough implementation. Note that there is no need for you to understand everything in this file. Just the opposite. If you can't understand it after a quick look, then we're on the right path.

So what do we have here?

  1. A few stages of computation: TimelineReader -> RetweetersReader -> FollowersReader -> FinalReducer.

  2. Self-stage messaging. All stages are recursive because of pagination. It means that each stage emits messages both to the next stage and to itself. It's much harder to deal with cancellations in this case, or even to find out when everything is done on a single stage.

  3. Early propagation. There are at least 2 cases here. First, in order to collect the mapping from TweetId to []Username we need to send collected info directly from RetweetersReader to FinalReducer. Second, we know from the very beginning that we need to fetch followers for the initial user, so their username should be emitted to RetweetersReader, bypassing the TimelineReader stage.

  4. Middleware reducers. FollowersReader is not only a pipe. It's a reducer for usernames that we've already seen (because you don't want to duplicate your work).

  5. Long-running workers. You can't just wait for worker shutdown in many cases, e.g. when you're implementing a server that should respond to many clients simultaneously.
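To give a feel for item 2, here is a minimal sketch of a single self-feeding stage that follows pagination cursors and still knows when it is finished, by counting outstanding jobs with a sync.WaitGroup. Everything in it (page, job, readAll, fakeFetch) is hypothetical; it does not talk to Twitter, and cancellation, rate limits and errors are deliberately ignored.

```go
package main

import (
	"fmt"
	"sync"
)

// page is one paginated response: some items plus an optional cursor
// for the next page ("" means there are no more pages).
type page struct {
	items []string
	next  string
}

// job asks for one page of one user's data.
type job struct{ user, cursor string }

// readAll runs one stage with the given number of workers. A worker
// that receives a page with a cursor sends a new job back to its own
// stage. pending counts jobs that are queued or in flight, so the
// stage is done exactly when the counter drops to zero.
func readAll(users []string, workers int, fetch func(user, cursor string) page) []string {
	jobs := make(chan job)
	items := make(chan string)
	var pending sync.WaitGroup

	// enqueue never blocks a worker: each send gets its own goroutine.
	enqueue := func(j job) {
		pending.Add(1)
		go func() { jobs <- j }()
	}

	for i := 0; i < workers; i++ {
		go func() {
			for j := range jobs {
				p := fetch(j.user, j.cursor)
				for _, it := range p.items {
					items <- it // message to the next stage
				}
				if p.next != "" {
					enqueue(job{j.user, p.next}) // message to itself
				}
				pending.Done() // only after the follow-up job is counted
			}
		}()
	}
	for _, u := range users {
		enqueue(job{user: u})
	}
	go func() {
		pending.Wait()
		close(jobs)
		close(items)
	}()

	var all []string
	for it := range items {
		all = append(all, it)
	}
	return all
}

// fakeFetch pretends every user has exactly three pages of one item.
func fakeFetch(user, cursor string) page {
	switch cursor {
	case "":
		return page{items: []string{user + "-1"}, next: "p2"}
	case "p2":
		return page{items: []string{user + "-2"}, next: "p3"}
	default:
		return page{items: []string{user + "-3"}}
	}
}

func main() {
	all := readAll([]string{"alice", "bob"}, 3, fakeFetch)
	fmt.Println(len(all)) // 6
}
```

The ordering of Add and Done is the whole trick, and it is exactly the kind of detail that is easy to get wrong once several stages, early propagation and cancellation are added on top.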

rbucker commented Sep 14, 2014

One thing that rings so true for me ... if you want to adopt a new technology or framework ... write a tutorial or workshop ... then decide whether it's the right path. For example docker and CoreOS are great but when I tried to write a workshop I found enough warts to justify getting a better handle on the use-cases before just adopting containers on the word of the brogrammers.

Generics allow for code to be written more concisely, but generally speaking the 'template' method of generics that the author 'examples' can be dangerous. Most of the time you do not actually want a function to be able to accept a term of any type. Being precise about what exactly a function or method is supposed to accept leads to both safer code and code that is easier to read.

Go isn't perfect (no language is) but if someone feels very strongly about certain language features then Go is a great place to test them out. Just branch the source code for the compiler and implement something new to your heart's content [0]. Even if they do not add it to the official language specs, you still might be able to market your features yourself. Maybe you are looking to be the Bjarne for a Go++?

[0] http://golang.org/doc/install/source

tonyhb commented Sep 15, 2014

If you don't want runtime panics when selecting from a channel, use a typecaster function:

func typecast(i interface{}) (int64, bool) {
  switch v := i.(type) {
    case int64:
      return v, true
  }
  return 0, false // nil is not a valid int64, so report failure explicitly
}

Use it like this:

select {
  case i := <- comms:
    val, ok := typecast(i)
    if !ok {
      // whatever
    }
    // carry on safely with val.
}

Allows you to create any library which communicates over channels via interfaces. In your app, just typecast to what you expect. A little bit more verbose, but no biggie. Especially for something so generic. Solves a lot of what you're not happy with.

tylerb commented Sep 15, 2014

Typecast function is nice, though I prefer a direct if assert block:

if val,ok := i.(int64); ok {
  // type assertion succeeded
}

tonyhb commented Sep 15, 2014

That's awesome! Good little tip. Definitely going to be used going forward.

andys commented Sep 15, 2014

I'm not sure that you're comparing like with like when you talk about "timeouts and contention" for communicating over channels, but not shared memory.

I understand the point of using channels is that it is a more efficient way to deal with threads executing in parallel than locks around shared memory, and the efficiency gain goes up with the number of cores. This is even more the case with the example presented: many writers, one reader.

(Having said that, today I seriously doubt if the current Go run-time implementation is actually more efficient in practice.)

momer commented Sep 15, 2014

Much understanding of why the language asserts itself this way can be had by reading Hoare's 1978 paper, Communicating Sequential Processes.

The essential proposals are:
(1) Dijkstra's guarded commands [8] are adopted (with a slight change of notation) as sequential control structures, and as the sole means of introducing and controlling nondeterminism.
(2) A parallel command, based on Dijkstra's parbegin [6], specifies concurrent execution of its constituent sequential commands (processes). All the processes start simultaneously, and the parallel command ends only when they are all finished. They may not communicate with each other by updating global variables.
(3) Simple forms of input and output command are introduced. They are used for communication between concurrent processes.

And later on, Hoare addresses the desire for convenience of one type you'd mentioned (the automatic closing of a repetitive command / channel); the ideas apply:

But the dangers of convenient facilities are notorious. For example, the repetitive commands with input guards may tempt the programmer to write them without making adequate plans for their termination; and if it turns out that the automatic termination is unsatisfactory, reprogramming for explicit termination will involve severe changes, affecting even the interfaces between the processes.

hugows commented Sep 15, 2014

Nobody needs generics (https://groups.google.com/forum/#!msg/golang-nuts/iwlzqNa4h3g/xCy3G2V-xwkJ):

" to be honest, if i need a stack, i usually implement it for the type in question, or inline, as it's only a very small number of lines: "

And maybe he is right, maybe you can always write your own for the type you are using in 50 lines...
But it doesn't feel right...

e-oz commented Sep 16, 2014

If you implement it every day, why not create a library to reuse your code?

Owner

@jamm The entire article is about "why I can't create a library".

hugows commented Sep 16, 2014

@jamm For example check this https://github.com/petar/GoLLRB It has to use interface{} everywhere due to the lack of generics.

samuell commented Sep 19, 2014

@kachayev: Have you had a look at GoFlow?

savaki commented Sep 22, 2014

@kachayev Rather than try to recreate another language in Go, have you considered using channels differently? After reading your gist, I put together a reusable library to handle parallel and optionally redundant calls with timeouts and cancelation. None of this requires any typecasting or use of interface{}. All it requires is thinking differently.

github.com/savaki/par

For giggles, I used that library to put together a weather service on Heroku that:

  • finds all the cities in the state you specify from one service (sba.gov)
  • queries openweathermap to find the weather in those cities
  • queries are performed with a redundancy of 2 with concurrency of 10
  • after 3 seconds, system returns what results it has regardless of whether or not it finished and actively closes any outstanding connections

Again, all with type safety and no use of type casting.

http://weather-redux.herokuapp.com/weather?state=ca

github.com/savaki/weather-redux

Owner

@savaki

  1. I don't see where and how you "think different"
  2. I don't understand what do you mean by "type safety" and "None of this requires interface{}". Look at context library that you use as "thinking different" point.

Love the article! :)

For the moment, though, instead of using generics, what
I do is create editor snippets with placeholders for types
for whatever pattern I use.

While we wait for a suitable "generics" design to appear,
yasnippet for Emacs, for example, may save you a bit of hair:

https://www.youtube.com/watch?v=-4O-ZYjQxks

Kills part of the problem pretty much instantly, but kludgy, I'd agree.
Also, not suitable for every pattern out there. :)

"btw, why do we still have statements in 21th century"

Not all languages are functional. And that is a good thing.

@caelifer Can you please explain how is that a good thing?

AlekSi commented Mar 12, 2016

Because diversity is a good thing?

tucnak commented Mar 12, 2016

Just use for loops, man! No need to be much clever.

bump 👍 just passing here...

AFAIK Go solves generics with interfaces. Simply implement the interface with your types. The stack example is trivial, many data structures would just need a comparable style function. Am I missing something?

Stack<T> is replaced by having (roughly, written OTO) :

type Stackable interface{
// whatever fn's required by  "<T>"
} 

type Stack struct {
  guts []Stackable
}

func (s *Stack) Push(item Stackable) {
  s.guts = append(s.guts, item)
}

func (s *Stack) Pop() Stackable {
  item := s.guts[len(s.guts)-1]
  s.guts = s.guts[:len(s.guts)-1]
  return item
}

cstrahan commented May 3, 2016 edited

@MaerF0x0 Stacks need not know anything about their elements, so there's no need to create a custom interface. Therefore, you might as well replace every occurrence of Stackable in your example with interface{}. But now you have to perform an unsafe cast (e.g. someStack.Pop().(HopefullyTheRightType)), which is precisely what you would avoid with generics / parametric polymorphism.

The alternative to using one type of Stack implemented in terms of interface{} would be multiple implementations for each type you want to have a stack of (e.g. StackOfInt, StackOfString, etc). That code bloat is also something that would be avoided with parametric polymorphism.

With Go, you either give up type safety, or you duplicate your definitions for each data type.

Go devs aren't against adding Generics. There is an issue open on the project's Github issue tracker for proposals.

Personally I would love Go to have a more functional style of programming.

Another thing to consider is a channel generator function that takes a chan interface{} and outputs only the specific type if the cast worked, allowing you to avoid the cast for the caller (but not in the rest of the channel logic, and it's obviously going to need a different one for ints and so on).

func OnlyStrings(in <-chan interface{}) <-chan string {
    out := make(chan string)
    go func() {
        for val := range in {
            if str, ok := val.(string); ok {
                out <- str
            }
        }
        close(out)
    }()
    return out
}

func main() {

    c := make(chan interface{})
    go func() {
        c <- "Hello"
        c <- 1
        c <- "World"
        c <- 2
        close(c)
    }()

    for s := range OnlyStrings(c) {
        log.Printf("I am a string: %s\n", s)
    }
}

kokizzu commented Feb 3, 2017

add that func (optionalOwner) functionName[GenericNames](params) returnValues {} to Go! :+1
