Skip to content

Instantly share code, notes, and snippets.

@imjasonh
Last active October 28, 2021 06:11
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save imjasonh/6713e855b431a0c067afea4b74cbf504 to your computer and use it in GitHub Desktop.
Save imjasonh/6713e855b431a0c067afea4b74cbf504 to your computer and use it in GitHub Desktop.
More Powerful Synchronization in Go, using Channels
package holder
import (
"errors"
"time"
)
type chanHolder struct {
setValCh chan string
getValCh chan string
closeCh chan struct{}
}
// NewChanHolder returns a new Holder backed by Channels.
func NewChanHolder() Holder {
h := chanHolder{
setValCh: make(chan string),
getValCh: make(chan string),
closeCh: make(chan struct{}),
}
go h.mux()
return h
}
func (h chanHolder) mux() {
var value string
for {
// if the value is empty, only accept setting, or closing.
if value == "" {
select {
case <-h.closeCh: // we also need to handle closing here!
close(h.setValCh)
close(h.getValCh)
return
case value = <-h.setValCh:
continue
}
}
// once the value is non-empty, it can be set or gotten
// as normal.
select {
case value = <-h.setValCh: // set the current value.
case h.getValCh <- value: // send the current value.
case <-h.closeCh: // closing, time to clean up!
close(h.setValCh)
close(h.getValCh)
return
}
}
}
func (h chanHolder) Get() string {
return <-h.getValCh
}
func (h chanHolder) Set(s string) {
h.setValCh <- s
}
// Close closes the Holder, making calls to Set panic, and calls to Get return
// the last-set value.
func (h chanHolder) Close() {
close(h.closeCh)
}
// ErrTimeout is the error returned by GetWithTimeout if the value
// was not provided before the given timeout.
var ErrTimeout = errors.New("timeout waiting for value")
// GetWithTimeout attempts to get the value, or returns ErrTimeout
// if getting it takes too long.
func (h chanHolder) GetWithTimeout(d time.Duration) (string, error) {
select {
case <-time.After(d):
return "", ErrTimeout
case v := <-h.getValCh:
return v, nil
}
}
package holder
// Holder holds a string value in a concurrency-safe manner.
type Holder interface {
Get() string
Set(string)
}
package holder
import (
"testing"
"time"
)
func TestValue(t *testing.T) {
for _, h := range []Holder{
NewMutexHolder(),
NewChanHolder(),
} {
h.Set("foo")
if got, want := h.Get(), "foo"; got != want {
t.Errorf("Get() got %q, want %q", got, want)
}
h.Set("bar")
if got, want := h.Get(), "bar"; got != want {
t.Errorf("Get() got %q, want %q", got, want)
}
if got, want := h.Get(), "bar"; got != want {
t.Errorf("Get() got %q, want %q", got, want)
}
// These tests are only supported by chanHolder.
if ch, ok := h.(chanHolder); ok {
// Attempt to Get while there is no value, should block indefinitely.
ch.Set("") // clear the value.
done := make(chan struct{})
go func() {
defer close(done)
ch.Get()
}()
select {
case <-time.After(100 * time.Millisecond): // expected.
case <-done:
t.Errorf("Get() returned, expected it to block")
}
// GetWithTimeout should time out eventually.
if got, err := ch.GetWithTimeout(100 * time.Millisecond); err != ErrTimeout {
t.Errorf("GetWithTimeout() got %q (err: %v), want timeout", got, err)
}
ch.Close()
}
}
}
package holder
import "sync"
type mutexHolder struct {
val string
mu sync.Mutex
}
// NewMutexHolder returns a Holder backed by a sync.Mutex.
func NewMutexHolder() Holder {
return &mutexHolder{}
}
func (h *mutexHolder) Get() string {
h.mu.Lock()
defer h.mu.Unlock()
return h.val
}
func (h *mutexHolder) Set(s string) {
h.mu.Lock()
defer h.mu.Unlock()
h.val = s
}

More Powerful Synchronization in Go, using Channels

Here's a common predicament: you have a value, e.g., some string or number or struct, that can be read and written by many goroutines, and you want to make sure readers and writers don't collide -- that readers get the most recently written value, and no other, and that writers don't write the value while a reader is reading it.

You may have an idea in your head about how you would approach this problem, and your idea may be completely correct. But I want to try to level up your Go-fu and see what it looks like to solve this problem using channels, Go's powerful concurrency primitive.

First, let's imagine a simple interface which we'll implement below. This example guards reads and writes of a string, but it could easily guard any other value:

// Holder holds a string value in a concurrency-safe manner.
type Holder interface {
  Get() string
  Set(string)
}

The canonical solution is to use a Mutex. In Go these are provided by sync.Mutex. Here's an example using a sync.Mutex to provide concurrency safety.

type mutexHolder struct {
  val string
  mu sync.Mutex
}

// NewMutexHolder returns a Holder backed by a sync.Mutex.
func NewMutexHolder() Holder {
  return &mutexHolder{}
}

func (h *mutexHolder) Get() string {
  h.mu.Lock()
  defer h.mu.Unlock()
  return h.val
}

func (h *mutexHolder) Set(s string) {
  h.mu.Lock()
  defer h.mu.Unlock()
  h.val = s
}

This is very simple, and in many cases it may give you exactly what you want. If that's the case, you should just use a Mutex and get back to solving the real problem.

But sometimes, you want additional logic and flexibility around your synchronization, and using a Mutex, while easy, doesn't make it easy to add more synchronization features. Mutexes are simple, but can also be limiting.


Here's an alternative solution, which guards reading from happening concurrently with writing, using Channels instead of Mutexes.

type chanHolder struct {
  setValCh chan string
  getValCh chan string
}

// NewChanHolder returns a new Holder backed by Channels.
func NewChanHolder() Holder {
  h := chanHolder {
    setValCh: make(chan string),
    getValCh: make(chan string),
  }
  go h.mux()
  return h
}

func (h chanHolder) mux() {
  var value string
  for {
    select {
    case value = <-h.setValCh: // set the current value.
    case h.getValCh <- value: // send the current value.
    }
  }
}

func (h chanHolder) Get() string {
  return <-h.getValCh
}

func (h chanHolder) Set(s string) {
   h.setValCh <- s
}

This is a bit more code than the Mutex-based solution, but at its heart it's not magic. The key is in the mux method, which is spawned in a goroutine in NewChanHolder.

mux runs an infinite loop with a select statement inside it, which will block until either a value is received from setValCh or sent to getValCh. This ensures that either the value is read or written each time through the for loop. This is where the synchronization comes from, the select statement ensures that value is only being written to or read from at a given time.

The implementation of Set sends the new value to the setter channel, and Get receives from the getter channel.

The value is local to mux's loop, which means that it's impossible for another method on Holder to modify it. This is a useful contrast to the Mutex-based solution, where a new method on mutexHolder can modify the value and forget to acquire the lock first.


But let's imagine a scenario where you want to guarantee to callers that Get always returns a non-empty string, even if you have to wait for it. The value it's holding might be an auth token, and callers requesting the token should be able to guarantee that they've gotten a valid non-empty token before proceeding. In this case, callers to Get should block until a value can be provided by some other goroutine's call to Set.

To do this, we need to handle the empty string as a special value. This is not so simple to implement if all you're using is a Mutex. But with the channel-backed implementation, this isn't terribly difficult at all, it's just another special case in mux:

func (h chanHolder) mux() {
  val value string
  for {
    // if the value is empty, only accept setting.
    if value == "" {
      value = <- h.setValCh
      continue
    }
    // once the value is non-empty, it can be set or gotten as normal.
    select {
    case value = <-h.setValCh: // set the current value.
    case h.getValCh <- value: // send the current value.
    }
  }
}

With this simple tweak to mux, calls to Get will block until a value has been previously Set. Callers can call Set("") to clear the value and force callers of Get to block until Set is called with a non-empty string.

NB: The continue in the above code is important; if a caller calls Set("") while the value is already empty, we want to start over with the for loop, and not proceed down to the select statement, which might provide values to callers of Get.


Some callers of Get might not want to their call to block forever. In the case of an auth token, you may want to just fail if you can't acquire the token after some amount of time. This is also pretty easy to implement with our channel-backed Holder, with another select statement and time.After.

// ErrTimeout is the error returned by GetWithTimeout if the value
// was not provided before the given timeout.
var ErrTimeout = errors.New("timeout waiting for value")

// GetWithTimeout attempts to get the value, or returns ErrTimeout
// if getting it takes too long.
func (h chanHolder) GetWithTimeout(d time.Duration) (string, error) {
  select {
  case <-time.After(d):
    return "", ErrTimeout
  case v := <-h.getValCh:
    return v, nil
  }
}

This sort of feature is not so easy to add if you’ve only got a Mutex guarding your value.


But wait! Eagle-eyed readers might notice that this implementation keeps a goroutine running indefinitely, forever. In doing so, we have committed a cardinal sin of Go: we started a goroutine without knowing how it will stop.

If you expect the Holder to live for the duration of your program, then the goroutine will stop when the program stops, and we don't need to do anything more. On the other hand, if you expect your program to stop setting and getting values at some point, you will want to clean up the Holder when it's not needed anymore. Let's add a Close method to clean up.

type chanHolder struct {
  setValCh chan string
  getValCh chan string
  closeCh  chan struct{}
}

// NewChanHolder returns a new Holder backed by Channels.
func NewChanHolder() Holder {
  h := chanHolder {
    setValCh: make(chan string),
    getValCh: make(chan string),
    closeCh:  make(chan struct{}),
  }
  go h.mux()
  return h
}

func (h chanHolder) mux() {
  val value string
  for {
    // if the value is empty, only accept setting, or closing.
    if value == "" {
      select {
      case <-h.closeCh: // we also need to handle closing here!
        close(h.setValCh)
        close(h.getValCh)
        return
      case value = <- h.setValCh:
        continue
    }
    select {
    case value = <-h.setValCh: // set the current value.
    case h.getValCh <- value: // send the current value.
    case <-h.closeCh: // closing, time to clean up!
      close(h.setValCh)
      close(h.getValCh)
      return
    }
  }
}

func (h chanHolder) Close() {
  close(h.closeCh)
}

This version adds a third channel, which is used to signal to mux that it's time to clean up. This channel doesn't need to pass any information, it just needs to signal a single event, which we can do just by closing the channel. Because it doesn't need to carry any information, it can be a chan struct{}.

mux's select statement receives from closeCh in addition to getValCh and setValCh, and when it sees the channel is closed, it closes the other two channels and returns from mux.

With this implementation, if any callers call Set after Close, it will panic ("send on closed channel"). If any callers call Get after Close, it will return an empty string, because they're receiving from a closed channel. If any callers call Close multiple times, it will panic ("close of a closed channel"). With a little more code you can change this behavior however you want.


Using channels to synchronize data access can unlock a new level of power and control, but obviously also requires a fair bit of care to make sure you're not leaking resources. With great power comes great responsibility. Future Work Using channels for synchronization opens up even more possibilities than those described here. Consider how you might implement these features:

  • Don't panic if Set is called after Close, return an error instead, or do nothing.
  • Don't return an empty string if Get is called after Close, return an error instead, or return the last value that was set.
  • Don't panic if Close is called multiple times, return an error instead, or do nothing.
  • Implement SetWithExpiry(s string, d time.Duration), which sets the value and clears it after the duration, unless another value is Set in the meantime. If the same value is set again before the value expires, just update the expiration time. What kinds of cases might such a Holder be useful?
  • Implement chanHolder using only one internal channel. What would the channel communicate, and how would mux use it? How might this change make other future changes easier?
  • Consider the difference in the performance of the Mutex-based solution and the channel-based solution. Write a benchmark to determine which is faster under which use cases. If you plan to use this pattern in a hot code path, the flexibility of the channel-based solution may not be worth the performance penalty. As usual, you should profile your code to determine hotspots before optimizing code, especially if it makes the code harder to understand.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment