Channels are created via the builtin make
function. We specify the type as
chan {datatype}
where by {datatype}
represents the type of data that can be
written to and read from the channel.
myChan := make(chan bool) // Creates a channel for bool values
The zero value of a channel is nil.
var myChan chan bool // myChan == nil
Channels are written to using the chan<-
syntax.
myChan := make(chan bool)
x := true
myChan<- x // Writes x to the channel myChan
Channels are read from using the <-chan
syntax.
x := <-myChan // Reads a value from the myChan channel into x
The read operation is actually multi-value. A second bool
return value will
tell you whether or not the channel has been closed and all values read from it.
myChan := make(chan bool)
close(myChan)
x, isClosed := <-myChan // returns (false, true)
Reading from a nil channel blocks indefinitely. Writing to a nil channel blocks indefinitely. Closing a nil channel panics.
var nilChan chan bool
x := <- nilChan // Blocks forever
nilChan <- true // Blocks forever
close(nilChan) // PANIC!
Reading from a closed channel returns the zero value. Writing to a closed channel panics. Closing an already Closed channel panics.
closedChan := make(chan bool)
close(closedChan)
x := <- closedChan // x == false
closedChan <- true // PANIC!
close(closedChan) // PANIC!
Buffered channels can be created by providing a buffer size as the second argument to make.
buffered := make(chan bool, 10) // Create a bool channel with a buffer size of
10
Writing to an unbuffered channel blocks until another process reads from the channel.
unbuffered := make(chan bool)
unbuffered <- true // Blocks until another process reads from the channel
Writing to a buffered channel only blocks if there is no more room left in the channel's buffer.
buffered := make(chan bool, 3)
buffered <- true // Okay
buffered <- true // Okay
buffered <- true // Okay
buffered <- true // Blocks until another process reads from the channel
Reading from an empty buffered channel or an unuffered channel blocks until another process writes to the channel.
unbuffered := make(chan bool)
buffered := make(chan bool, 1)
x := <- unbuffered // blocks
y := <- buffered // blocks
An unbuffered channel is equivalent to a buffered channel of capacity 0.
As part of the channel's type you can include a direction. By default, channels are bidirectional - they accept both sending and receiving values. You can, however, enforce unidirectional behavior in a type safe manner.
// This function accepts an input channel for reading only and an output
// channel for writing only.
func foo(in chan<- string, out chan-> string) {
// ...
}
This is useful in communicating an API's intended behavior and is enforceable at compile time. It also helps us to define and enforce the concept of the channel's owner vs its consumer. If we isolate the write side of the channel to one goroutine, then this function will be wholly responsible for the lifecycle of the channel and can ensure it is never inadvertantly closed, for example. We do this by making sure that all places we copy the channel to receive a unidirectional, read-only copy that cannot be closed or written to.
Channels can be used in for
loops via range
. If the channel is closed, the
loop will read all remaining values and exit. If the channel is open, the loop
wll read all available values and block until more values become available or
the channel is closed.
myChan := make(chan int, 2)
myChan <- 1
myChan <- 2
close(myChan) // Closing the channel ensures range loop does not block
for n := range myChan {
fmt.Printf("%d\n", n)
}
The select statement lets you wait on multiple channel operations.
select {
case msg1 := <- ch1:
fmt.Printf("Received message from channel 1: %s\n", msg1)
case <- ch2
fmt.Println("Received and discarded message from channel 2")
case ch3 <- "foobar"
fmt.Println("Wrote foobar to channel 3")
}
Often time select statements are contained within a for
loop to continuously
multiplex operations over multiple channels. In a select
statement, whichever
case
is unblocked will execute. If more than one case
is unblocked, one is
chosen at random. This randomness is guaranteed to be uniform.
Like other types initialized via make
, channels are values which hold
a reference to one or more underlying data structrues. This means that when we
pass channels from function to function, we are passing a copy, but each copy
references the same underlying data structures in memory.
This is important to remember. It means we can safely pass channels as function
parameters and know that operations such as close
will impact all copies.
However, we need to be careful when multiple goroutnes share the same copy, such
as when storing the value of the channel as a struct
field. Race conditions
can and wll occur if we mutate the shared memory without proper synchronization.
type Foo struct {
l sync.Mutex
c chan int
}
// If another method sets f.c to nil or another channel, what happens?
func (f *Foo) BarNoSync(n int) {
for ; n > 0; n-- {
go func() {
for i := range f.c { // BAD! Accessing shared memory `f.c` without syncing
//...
}
}()
}
}
func (f *Foo) BarSyncBetter() {
f.l.Lock()
c := f.c // Isolated critical section to method call, not the goroutine
f.l.Unlock()
go func(c <-chan int) { // Type enforces read-only semantics inside goroutine
for i := range c { // Goroutine dealing with local value only
//...
}
}(c) // Pass copy of channel value to each goroutine
}
In general, we should not share channel values via shared memory such as struct fields. When cases arise where we must, we should isolate the shared access (critical sections) to as few occurrences as possible and always synchronize access.
##Patterns
When we are multiplexing communications over one or more channels in a select
block, there may be instances where we cannot block forever and instead want to
se a timeout.
func readOne(c <-chan int) (int, err) {
select {
case val := <-c:
return val, nil
case <-time.After(1 * time.Second):
// Timeout!
return 0, errors.New("Timed out!")
}
}
A basic rate limiting pattern ensures that we adhere to a specific interval for
processing items read from a channel, 200ms in this case. No matter how fast
items are written to the requestChan
, we will process one every 200ms.
limiter := time.Tick(200 * time.Millisecond)
for incomingReq := range requestsChan {
<- limiter
fmt.Println("processing request")
}
A burstable rate limiting pattern is similar to the pattern above, except it allows some level of bursting via a buffered channel. In this case, we can handle bursts of up to 3 requests at once without delay.
burstLimiter := make(chan time.Time, 3)
// Start with a filled buffer to allow bursting from the get-go
for i := 0; i < 3; i++ {
burstLimiter <- time.Now()
}
// Our own timer function which uses our channel
go func() {
for t := range time.Tick(200 * tme.Millisecond) {
burstLimiter <- t
}
}
for incomingReq := range requestsChan {
<- burstLimiter
fmt.Println("processing request")
}
There is a pattern to our pipeline functions:
stages close their outbound channels when all the send operations are done.
stages keep receiving values from inbound channels until those channels are closed.
This pattern allows each receiving stage to be written as a range loop and ensures that all goroutines exit once all values have been successfully sent downstream.
When one process consumes multiple channels, this is a fan in pattern.
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}
When many processes read from a single channel, this is a fan out pattern.
TODO
TODO
TODO
TODO
TODO
TODO