@jmcclell
Last active March 20, 2018 20:08
Concurrency Patterns in Go-Lang


Channels Primer

Creating Channels

Channels are created via the builtin make function. We specify the type as chan {datatype}, where {datatype} is the type of value that can be written to and read from the channel.

myChan := make(chan bool) // Creates a channel for bool values

The zero value of a channel is nil.

var myChan chan bool // myChan == nil

Writing to Channels

Channels are written to using the chan <- value syntax.

myChan := make(chan bool)
x := true
myChan<- x // Writes x to the channel myChan

Reading from Channels

Channels are read from using the <-chan syntax.

x := <-myChan // Reads a value from the myChan channel into x

The read operation can also return two values. The second, a bool conventionally named ok, is true if the value was delivered by a write to the channel, and false if the channel has been closed and all of its values have already been read.

myChan := make(chan bool)
close(myChan)
x, ok := <-myChan // returns (false, false): the zero value, and ok == false
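
As a minimal sketch of how the two-value read is typically used, the hypothetical helper drain below (not part of the original notes) keeps reading until the second value reports false, meaning the channel is closed and empty.

```go
package main

import "fmt"

// drain is a hypothetical helper: it reads from ch until the channel is
// closed and empty, and returns every value it received along the way.
func drain(ch <-chan int) []int {
	var got []int
	for {
		v, ok := <-ch
		if !ok {
			// ok == false: the channel is closed and has no values left.
			return got
		}
		got = append(got, v)
	}
}

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	close(ch) // Buffered values remain readable after close

	fmt.Println(drain(ch)) // [1 2]
}
```

Values that were buffered before the close are still delivered with ok == true; only once the buffer is empty does the read report false.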

Nil vs Closed Channels

Reading from a nil channel blocks indefinitely. Writing to a nil channel blocks indefinitely. Closing a nil channel panics.

var nilChan chan bool

x := <- nilChan // Blocks forever
nilChan <- true // Blocks forever
close(nilChan)  // PANIC!

Reading from a closed channel returns the zero value. Writing to a closed channel panics. Closing an already closed channel panics.

closedChan := make(chan bool)
close(closedChan)

x := <- closedChan // x == false
closedChan <- true // PANIC!
close(closedChan)  // PANIC!
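
One way to observe the difference without blocking or panicking is a select with a default case, which the notes above do not cover. This sketch assumes a hypothetical helper named ready that reports whether a read would complete immediately.

```go
package main

import "fmt"

// ready is a hypothetical helper: it reports whether a read on ch would
// complete right now. The default case keeps the select from blocking.
func ready(ch <-chan bool) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	var nilChan chan bool
	openChan := make(chan bool)
	closedChan := make(chan bool)
	close(closedChan)

	fmt.Println(ready(nilChan))    // false: a nil channel is never ready
	fmt.Println(ready(openChan))   // false: open, but nothing has been written
	fmt.Println(ready(closedChan)) // true: a closed channel is always readable
}
```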

Buffered vs Unbuffered Channels

Buffered channels can be created by providing a buffer size as the second argument to make.

buffered := make(chan bool, 10) // Create a bool channel with a buffer size of 10

Writing to an unbuffered channel blocks until another goroutine reads from the channel.

unbuffered := make(chan bool)
unbuffered <- true // Blocks until another goroutine reads from the channel

Writing to a buffered channel only blocks if there is no more room left in the channel's buffer.

buffered := make(chan bool, 3)
buffered <- true  // Okay
buffered <- true  // Okay
buffered <- true  // Okay
buffered <- true  // Blocks until another goroutine reads from the channel

Reading from an empty buffered channel or an unbuffered channel blocks until another goroutine writes to the channel.

unbuffered := make(chan bool)
buffered := make(chan bool, 1)

x := <- unbuffered // blocks
y := <- buffered // blocks

An unbuffered channel is equivalent to a buffered channel of capacity 0.
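
The blocking rules above can be sketched as a small runnable program. The helper names handoff and fill are illustrative assumptions; len and cap are the builtins that report a channel's queued values and its buffer size.

```go
package main

import "fmt"

// handoff (hypothetical) writes v to an unbuffered channel from a new
// goroutine. The write can only complete once the caller is reading.
func handoff(v int) int {
	unbuffered := make(chan int) // same as make(chan int, 0)
	go func() { unbuffered <- v }()
	return <-unbuffered
}

// fill (hypothetical) writes n values into a buffered channel with no reader
// present, then reports len and cap. n must not exceed capacity, otherwise
// the extra write would block forever.
func fill(n, capacity int) (int, int) {
	buffered := make(chan int, capacity)
	for i := 0; i < n; i++ {
		buffered <- i
	}
	return len(buffered), cap(buffered)
}

func main() {
	fmt.Println(handoff(42)) // 42

	queued, size := fill(2, 3)
	fmt.Println(queued, size) // 2 3
}
```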

Channel Direction

As part of the channel's type you can include a direction. By default, channels are bidirectional - they accept both sending and receiving values. You can, however, enforce unidirectional behavior in a type safe manner.

// This function accepts an input channel for reading only and an output
// channel for writing only.
func foo(in <-chan string, out chan<- string) {
  // ...
}

This is useful in communicating an API's intended behavior and is enforceable at compile time. It also helps us to define and enforce the concept of the channel's owner vs its consumer. If we isolate the write side of the channel to one goroutine, then that goroutine is wholly responsible for the lifecycle of the channel and can ensure, for example, that it is never inadvertently closed. We do this by making sure that every other place we pass the channel to receives a unidirectional, read-only copy that cannot be closed or written to.
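
A minimal sketch of the owner vs consumer split, assuming two hypothetical functions: produce owns the channel (creates, writes, closes) and consume only ever sees a read-only view.

```go
package main

import "fmt"

// produce (hypothetical) is the owner: it creates the channel, is the only
// writer, and is the only place the channel is closed.
func produce(n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out) // Only the owner closes
		for i := 0; i < n; i++ {
			out <- i
		}
	}()
	return out // Callers get a read-only view
}

// consume (hypothetical) can only read. Calling close(in) or writing to in
// here would be rejected by the compiler.
func consume(in <-chan int) int {
	sum := 0
	for v := range in {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(consume(produce(5))) // 10 (0+1+2+3+4)
}
```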

Range over Channels

Channels can be used in for loops via range. If the channel is closed, the loop reads all remaining values and exits. If the channel is open, the loop reads all available values and then blocks until more values become available or the channel is closed.

myChan := make(chan int, 2)
myChan <- 1
myChan <- 2
close(myChan) // Closing the channel ensures range loop does not block

for n := range myChan {
  fmt.Printf("%d\n", n)
}

Multiplexing Channels with Select

The select statement lets you wait on multiple channel operations.

select {
case msg1 := <-ch1:
  fmt.Printf("Received message from channel 1: %s\n", msg1)
case <-ch2:
  fmt.Println("Received and discarded message from channel 2")
case ch3 <- "foobar":
  fmt.Println("Wrote foobar to channel 3")
}

Select statements are often placed inside a for loop to continuously multiplex operations over multiple channels. In a select statement, whichever case is unblocked will execute. If more than one case is unblocked, one is chosen at random. This randomness is guaranteed to be uniform. If no case is unblocked and there is no default case, the select itself blocks until one is.
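
As a sketch of the for/select loop, the hypothetical function collect below multiplexes two channels until both are closed. It relies on the nil channel behavior described earlier: setting a closed channel's variable to nil disables its case, so the loop does not spin on zero values.

```go
package main

import "fmt"

// collect (hypothetical) reads from both channels until each has been closed.
func collect(nums <-chan int, words <-chan string) (int, []string) {
	sum := 0
	var got []string
	for nums != nil || words != nil {
		select {
		case n, ok := <-nums:
			if !ok {
				nums = nil // Closed: a nil channel blocks, disabling this case
				continue
			}
			sum += n
		case w, ok := <-words:
			if !ok {
				words = nil
				continue
			}
			got = append(got, w)
		}
	}
	return sum, got
}

func main() {
	nums := make(chan int)
	words := make(chan string)
	go func() {
		defer close(nums)
		for i := 1; i <= 3; i++ {
			nums <- i
		}
	}()
	go func() {
		defer close(words)
		words <- "a"
		words <- "b"
	}()

	sum, got := collect(nums, words)
	fmt.Println(sum, got) // 6 [a b]
}
```

The interleaving between the two channels varies from run to run, but the totals do not, because each channel preserves its own order.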

Passing Channels as Values

Like other types initialized via make, channels are values which hold a reference to one or more underlying data structures. This means that when we pass channels from function to function, we are passing a copy, but each copy references the same underlying data structures in memory.

This is important to remember. It means we can safely pass channels as function parameters and know that operations such as close will impact all copies. However, we need to be careful when multiple goroutines share the same variable, such as when storing the value of the channel as a struct field. Race conditions can and will occur if we mutate the shared memory without proper synchronization.

type Foo struct {
  l sync.Mutex
  c chan int
}

// If another method sets f.c to nil or another channel, what happens?
func (f *Foo) BarNoSync(n int) {
  for ; n > 0; n-- {
    go func() {
      for i := range f.c  { // BAD! Accessing shared memory `f.c` without syncing
        //...
      }
    }()
  }
}

func (f *Foo) BarSyncBetter() {
  f.l.Lock()
  c := f.c  // Isolated critical section to method call, not the goroutine
  f.l.Unlock()
  go func(c <-chan int) { // Type enforces read-only semantics inside goroutine
    for i := range c  { // Goroutine dealing with local value only
      //...
    }
  }(c) // Pass copy of channel value to each goroutine
}

In general, we should not share channel values via shared memory such as struct fields. When cases arise where we must, we should isolate the shared access (critical sections) to as few occurrences as possible and always synchronize access.
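
A runnable sketch of that advice, under assumptions: the Foo type here is a stand-in with hypothetical SetChan and Sum methods. The struct field is touched only inside short locked sections, and the goroutine works on its own local copy of the channel value.

```go
package main

import (
	"fmt"
	"sync"
)

// Foo is an illustrative type; SetChan and Sum are hypothetical methods.
type Foo struct {
	mu sync.Mutex
	c  chan int
}

// SetChan replaces the shared field under the lock.
func (f *Foo) SetChan(c chan int) {
	f.mu.Lock()
	f.c = c
	f.mu.Unlock()
}

// Sum snapshots the field under the lock, then hands the copy to a goroutine.
// Later calls to SetChan do not affect the goroutine already running.
func (f *Foo) Sum() <-chan int {
	f.mu.Lock()
	c := f.c // The only access to shared memory
	f.mu.Unlock()

	result := make(chan int, 1)
	go func(in <-chan int) {
		total := 0
		for v := range in {
			total += v
		}
		result <- total
	}(c)
	return result
}

func main() {
	c := make(chan int, 3)
	c <- 1
	c <- 2
	c <- 3
	close(c)

	f := &Foo{}
	f.SetChan(c)
	result := f.Sum()
	f.SetChan(nil) // Safe: the goroutine holds its own copy

	fmt.Println(<-result) // 6
}
```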

Patterns

Simple Timeout

When we are multiplexing communications over one or more channels in a select block, there may be instances where we cannot block forever and instead want to set a timeout.

func readOne(c <-chan int) (int, error) {
  select {
  case val := <-c:
    return val, nil
  case <-time.After(1 * time.Second):
    // Timeout!
    return 0, errors.New("timed out")
  }
}

Rate Limiting

A basic rate limiting pattern ensures that we adhere to a specific interval for processing items read from a channel, 200ms in this case. No matter how fast items are written to requestsChan, we will process at most one every 200ms.

limiter := time.Tick(200 * time.Millisecond)

for incomingReq := range requestsChan {
  <-limiter // Wait for the next tick before handling the request
  fmt.Println("processing request", incomingReq)
}

A burstable rate limiting pattern is similar to the pattern above, except it allows some level of bursting via a buffered channel. In this case, we can handle bursts of up to 3 requests at once without delay.

burstLimiter := make(chan time.Time, 3)
// Start with a filled buffer to allow bursting from the get-go
for i := 0; i < 3; i++ {
  burstLimiter <- time.Now()
}

// Our own timer function which uses our channel
go func() {
  for t := range time.Tick(200 * time.Millisecond) {
    burstLimiter <- t
  }
}()

for incomingReq := range requestsChan {
  <-burstLimiter
  fmt.Println("processing request", incomingReq)
}

Pipelines

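A pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function. Pipeline stage functions follow a common pattern:

  • Stages close their outbound channels when all the send operations are done.
  • Stages keep receiving values from inbound channels until those channels are closed.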

This pattern allows each receiving stage to be written as a range loop and ensures that all goroutines exit once all values have been successfully sent downstream.
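
A minimal sketch of a pipeline that follows both rules, using two illustrative stages: gen emits a fixed list of numbers and sq squares whatever it receives. The stage names are assumptions for this example.

```go
package main

import "fmt"

// gen is the source stage: it sends each number, then closes its outbound
// channel once all sends are done.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// sq receives until its inbound channel is closed, squaring each value,
// then closes its own outbound channel.
func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

func main() {
	// Stages compose because each one takes and returns a read-only channel.
	for v := range sq(sq(gen(1, 2, 3))) {
		fmt.Println(v) // 1, then 16, then 81
	}
}
```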

Fan In

When one goroutine consumes multiple channels, this is a fan in pattern.

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed, then calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Start a goroutine to close out once all the output goroutines are
    // done.  This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

Fan Out

When many goroutines read from a single channel, this is a fan out pattern.
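
A minimal sketch of that definition, not the author's planned write-up: a hypothetical fanOut starts several workers that all range over the same jobs channel. Each job is delivered to exactly one worker, and a WaitGroup closes the results channel once every worker has finished.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut (hypothetical) starts the given number of workers, all reading from
// the same jobs channel, and returns a channel of squared results.
func fanOut(jobs <-chan int, workers int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for j := range jobs { // Each job goes to exactly one worker
				out <- j * j
			}
		}()
	}
	go func() {
		wg.Wait() // All workers have seen jobs close
		close(out)
	}()
	return out
}

func main() {
	jobs := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		jobs <- i
	}
	close(jobs)

	sum := 0
	for r := range fanOut(jobs, 3) {
		sum += r // Result order is not deterministic, so sum instead
	}
	fmt.Println(sum) // 55
}
```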

TODO

Fan In -> Fan Out

TODO

Cancellation with Channels

TODO

Cancellation with Context

TODO

Worker Pools

TODO

Graceful Shutdown

TODO
