Operation | A Nil Channel | A Closed Channel | A Not-Closed Non-Nil Channel |
---|---|---|---|
Close | panic | panic | succeed to close (C) |
Send Value To | block for ever | panic | block or succeed to send (B) |
Receive Value From | block for ever | never block (D) | block or succeed to receive (A) |
一个基于无缓存 channel 的发送操作将导致发送者 goroutine 阻塞, 直到另一个 goroutine 在相同的 channel 上执行接收操作, 当发送的值通过 channel 成功传输之后, 两个 goroutine 可以继续执行后面的语句, 反之,如果接收操作先发生, 那么接收者 goroutine 也将阻塞, 直到有另一个 goroutine 在相同的 channel 上执行发送操作
ch <- x // send x to the channel
x = <-ch // receive a value from the channel and assign it to x
<-ch // receive a value from the channel and discard it
close(ch) // close the channel
ch = make(chan int) // create an unbuffered (blocking) channel
ch = make(chan int, 3) // create a buffered channel with capacity 3
for x := range ch {} // the loop exits only once the channel has been closed
// Send-only and receive-only restrictions are checked at compile time only; the underlying channel is the same.
ch = make(chan<- int) // send-only channel
ch = make(<-chan int) // receive-only channel
package main
import (
"fmt"
"time"
)
// main prints Fibonacci numbers, one per second, as they arrive from a
// generator goroutine over an unbuffered channel.
func main() {
	// generate starts a goroutine that streams Fibonacci numbers and closes
	// the channel once the latest number is no longer below 1<<63.
	generate := func() chan uint64 {
		out := make(chan uint64)
		go func() {
			defer close(out)
			var prev, cur uint64 = 0, 1
			// The channel send is the loop's post statement: every value
			// is sent right after it is computed and before the condition
			// is checked again.
			for ; cur < (1 << 63); out <- cur {
				prev, cur = cur, prev+cur
			}
		}()
		return out
	}

	for v := range generate() {
		time.Sleep(time.Second)
		fmt.Println(v)
	}
}
package main
import (
"time"
"math/rand"
"fmt"
)
// longTimeRequest starts a simulated three-second request in a new goroutine
// and returns at once with a receive-only channel on which the result, a
// random number in [0, 100), will be delivered.
func longTimeRequest() <-chan int32 {
	result := make(chan int32)
	go func() {
		// Simulate a workload.
		time.Sleep(3 * time.Second)
		result <- rand.Int31n(100)
	}()
	// The bidirectional channel converts implicitly to a receive-only one;
	// the direction restriction is enforced at compile time only.
	return result
}
// sumSquares returns a*a + b*b, computed in int32 arithmetic.
func sumSquares(a, b int32) int32 {
	squareA := a * a
	squareB := b * b
	return squareA + squareB
}
// main fires two simulated requests concurrently and prints the sum of the
// squares of their results.
func main() {
	rand.Seed(time.Now().UnixNano())

	first := longTimeRequest()
	second := longTimeRequest()

	// No result is available immediately: the receives block until the
	// request goroutines send their values (about three seconds).
	x := <-first
	y := <-second
	fmt.Println(sumSquares(x, y))
}
package main
import (
"time"
"math/rand"
"fmt"
)
// longTimeRequest simulates a three-second request and then sends its result,
// a random number in [0, 100), on r. It blocks the calling goroutine for the
// whole duration.
func longTimeRequest(r chan<- int32) {
	// Simulate a workload.
	time.Sleep(3 * time.Second)
	result := rand.Int31n(100)
	r <- result
}
// sumSquares returns the sum of the squares of a and b as an int32.
func sumSquares(a, b int32) int32 {
	total := a * a
	total += b * b
	return total
}
// main runs two simulated requests in their own goroutines, each reporting
// through a channel passed in as an argument, and prints the sum of the
// squares of the two results.
func main() {
	rand.Seed(time.Now().UnixNano())

	resultA := make(chan int32)
	resultB := make(chan int32)
	go longTimeRequest(resultA)
	go longTimeRequest(resultB)

	// Receive in a fixed order: first from resultA, then from resultB.
	x := <-resultA
	y := <-resultB
	fmt.Println(sumSquares(x, y))
}
Note, if there are N sources, the capacity of the communication channel must be at least N-1, to avoid the goroutines corresponding to the discarded responses being blocked for ever.
// 如果缓冲区小于 N-1, 将会导致函数一直阻塞, 无法释放内存, 造成内存泄露
package main
import (
"fmt"
"time"
"math/rand"
)
// source picks a random value, sleeps for one to three whole seconds and then
// sends the value on c.
func source(c chan<- int32) {
	value := rand.Int31()
	delay := time.Duration(rand.Intn(3)+1) * time.Second
	// Sleep 1s, 2s or 3s.
	time.Sleep(delay)
	c <- value
}
// main starts five sources and uses only the first response, printing how
// long it took to arrive and its value.
func main() {
	rand.Seed(time.Now().UnixNano())
	begin := time.Now()

	const numSources = 5
	// The channel is buffered with one slot per source, so the sources whose
	// responses are discarded can still complete their sends.
	responses := make(chan int32, numSources)
	for i := 0; i < numSources; i++ {
		go source(responses)
	}

	// Only the first response will be used.
	first := <-responses
	fmt.Println(time.Since(begin))
	fmt.Println(first)
}
Another way to implement the first-response-wins use case
Please note, the capacity of the channel used in the example below must be at least one, so that the first send won't be missed if the receiver/request side has not gotten ready in time.
// 缓存区至少为1, 否则如果接收方还没准备好, 第一个响应的发送方的 select 就会因为阻塞选择执行 default
package main
import (
"fmt"
"math/rand"
"time"
)
// source picks a random value, sleeps for one to three whole seconds and then
// tries to send the value on c. If the send cannot proceed immediately the
// value is dropped.
func source(c chan<- int32) {
	value := rand.Int31()
	delay := time.Duration(rand.Intn(3)+1) * time.Second
	// Sleep 1s, 2s or 3s.
	time.Sleep(delay)

	// Try-send: never block on c.
	select {
	case c <- value:
	default:
	}
}
// main starts five sources that share a one-slot channel and prints the first
// response that arrives.
func main() {
	rand.Seed(time.Now().UnixNano())

	const numSources = 5
	// The capacity should be at least 1.
	responses := make(chan int32, 1)
	for i := 0; i < numSources; i++ {
		go source(responses)
	}

	// Only the first response is used.
	first := <-responses
	fmt.Println(first)
}
The third way to implement the first-response-wins use case
Note: if the channel used in the example below is an unbuffered channel, then there will be two goroutines hanging for ever after the select code block is executed. This is a memory leak case.
// 如果使用非缓冲 channel, 将会有两个 routine 一直阻塞, 内存无法释放, 造成内存泄漏
package main
import (
"fmt"
"math/rand"
"time"
)
// source starts a goroutine that sleeps for one to three whole seconds and
// then delivers a random value. It returns at once with the receive-only
// channel that will carry that value.
func source() <-chan int32 {
	// One buffer slot lets the goroutine finish its send even when nobody
	// ever receives from this channel.
	out := make(chan int32, 1)
	go func() {
		value := rand.Int31()
		delay := time.Duration(rand.Intn(3)+1) * time.Second
		time.Sleep(delay)
		out <- value
	}()
	return out
}
// main starts three sources and prints whichever response arrives first.
func main() {
	rand.Seed(time.Now().UnixNano())

	// All three sources are started before the select begins to wait.
	first, second, third := source(), source(), source()

	var rnd int32
	// Blocking here until one source responds.
	select {
	case rnd = <-first:
	case rnd = <-second:
	case rnd = <-third:
	}
	fmt.Println(rnd)
}
1-To-1 notification by sending a value to a channel
package main
import (
"crypto/rand"
"fmt"
"os"
"sort"
)
// main fills a 32 MiB slice with random bytes, sorts it in a separate
// goroutine and waits for a completion signal before printing the smallest
// and largest byte.
func main() {
	values := make([]byte, 32*1024*1024)
	if _, err := rand.Read(values); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}

	// The signal channel can be buffered or not.
	done := make(chan struct{})

	sortValues := func() {
		ascending := func(i, j int) bool { return values[i] < values[j] }
		sort.Slice(values, ascending)
		// Notify that sorting is done.
		done <- struct{}{}
	}
	go sortValues()

	// do some other things ...

	// Wait here for the notification.
	<-done
	fmt.Println(values[0], values[len(values)-1])
}
1-To-1 notification by receiving a value from a channel
package main
import (
"fmt"
"time"
)
// main prints "Hello world!" in two parts: a helper goroutine prints the
// first part and, after a simulated workload, receives from done, which
// releases main's blocked send so it can print the rest.
func main() {
	// The capacity of the signal channel can also be one. In that case a
	// value must be sent to the channel before the goroutine below is
	// created.
	done := make(chan struct{})

	greet := func() {
		fmt.Print("Hello")
		// Simulate a workload.
		time.Sleep(2 * time.Second)
		// Receiving from done unblocks the send in the main goroutine.
		<-done
	}
	go greet()

	// Blocked here, waiting for the notification.
	done <- struct{}{}
	fmt.Println(" world!")
}
N-To-1 and 1-To-N notifications
package main
import "log"
import "time"
type T = struct{}
// worker waits for a start signal on ready, logs its start, sleeps for id+1
// seconds to simulate a job, logs completion and finally reports on done.
func worker(id int, ready <-chan T, done chan<- T) {
	// Block here until notified.
	<-ready
	log.Print("Worker#", id, " starts.")

	// Simulate a workload.
	workload := time.Duration(id+1) * time.Second
	time.Sleep(workload)
	log.Print("Worker#", id, " job done.")

	// Notify the main goroutine (N-to-1).
	done <- T{}
}
// main starts three workers, releases them with one send each after a
// simulated initialization phase and then waits for all three completion
// signals.
func main() {
	log.SetFlags(0)

	const numWorkers = 3
	ready := make(chan T)
	done := make(chan T)
	for id := 0; id < numWorkers; id++ {
		go worker(id, ready, done)
	}

	// Simulate an initialization phase.
	time.Sleep(time.Second * 3 / 2)

	// 1-to-N notifications.
	for i := 0; i < numWorkers; i++ {
		ready <- T{}
	}
	// Being N-to-1 notified.
	for i := 0; i < numWorkers; i++ {
		<-done
	}
}
In fact, the ways to do 1-to-N and N-to-1 notifications introduced in this sub-section are not used commonly in practice. In practice, we often use sync.WaitGroup
to do N-to-1 notifications, and we do 1-to-N notifications by closing channels.
package main
import (
"fmt"
"sync"
)
// main runs three goroutines that each print their index, waits for all of
// them with a sync.WaitGroup and then prints "Done!".
func main() {
	const numTasks = 3

	var wg sync.WaitGroup
	wg.Add(numTasks)
	for i := 0; i < numTasks; i++ {
		go func(n int) {
			defer wg.Done()
			fmt.Println(n)
		}(i)
	}

	wg.Wait()
	fmt.Println("Done!")
}
Broadcast (1-To-N) notifications by closing a channel
By the example in the last sub-section, we can replace the three channel send operations ready <- struct{}{}
in the last example with one channel close operation close(ready)
to do a 1-to-N notification.
...
close(ready) // broadcast notifications
...
Timer: scheduled notification
package main
import (
"fmt"
"time"
)
// AfterDuration returns a channel that receives a single value once d has
// elapsed. The channel has one buffer slot, so the timing goroutine can
// finish even if the value is never received.
func AfterDuration(d time.Duration) <-chan struct{} {
	fired := make(chan struct{}, 1)
	go func() {
		time.Sleep(d)
		fired <- struct{}{}
	}()
	return fired
}
// main prints "Hi!", "Hello!" and "Bye!" with a one-second pause before each
// of the last two, using AfterDuration as the timer.
func main() {
	fmt.Println("Hi!")
	for _, msg := range []string{"Hello!", "Bye!"} {
		<-AfterDuration(time.Second)
		fmt.Println(msg)
	}
}
In fact, the After
function in the time
standard package provides the same functionality, with a much more efficient implementation. We should use that function instead to make the code look clean.
We can use the built-in functions len
and cap
to check the length and capacity of a channel. However, we seldom do this in practice. The reason we seldom use the len
function to check the length of a channel is that the length may have changed by the time the len
function call returns. The reason we seldom use the cap
function to check the capacity of a channel is that the capacity is often already known or not important.
package main
import "runtime"
// DoSomething loops forever, yielding the processor on every iteration so
// that other goroutines get a chance to run. It never returns.
func DoSomething() {
	for {
		// do something ...

		// Yield so this loop does not hog the processor.
		runtime.Gosched()
	}
}
// main starts two DoSomething goroutines and then blocks for ever.
func main() {
	const numLoops = 2
	for i := 0; i < numLoops; i++ {
		go DoSomething()
	}

	// An empty select has no case that can ever proceed, so it parks the
	// main goroutine permanently.
	select {}
}
try-receive and try-send with len
and cap
// try-receive: drain the values currently buffered in c.
// The length may change right after len(c) returns, so this only avoids
// blocking when the current goroutine is the sole receiver of c.
for len(c) > 0 {
value := <-c
// use value ...
}
// try-send: fill c up to its capacity.
// Likewise this only avoids blocking when the current goroutine is the
// sole sender to c.
for len(c) < cap(c) {
c <- aValue
}
A select
block with one default
branch and only one case
branch is called a try-send or try-receive channel operation, depending on whether the channel operation following the case
keyword is a channel send or receive operation.
package main
import "fmt"
// main demonstrates try-send and try-receive on a bookshelf with three
// slots. It attempts twice as many puts, then twice as many gets, as the
// shelf can hold and reports the outcome of every attempt.
func main() {
	type Book struct{ id int }
	bookshelf := make(chan Book, 3)
	attempts := cap(bookshelf) * 2

	// Try-send: succeeds only while the shelf still has a free slot.
	for i := 0; i < attempts; i++ {
		select {
		case bookshelf <- Book{id: i}:
			fmt.Println("succeeded to put book", i)
		default:
			fmt.Println("failed to put book")
		}
	}

	// Try-receive: succeeds only while the shelf still holds a book.
	for i := 0; i < attempts; i++ {
		select {
		case book := <-bookshelf:
			fmt.Println("succeeded to get book", book.id)
		default:
			fmt.Println("failed to get book")
		}
	}
}
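Assume it is guaranteed that no values were ever (and will be) sent to a channel; then the following try-receive can check, without blocking, whether the channel has already been closed.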
// IsClosed reports, without blocking, whether c has been closed.
//
// It is only reliable when no value is ever sent on c: a successful receive
// is taken to mean "closed", so a real value would be consumed and reported
// as closed.
func IsClosed(c chan T) bool {
	select {
	case <-c:
		return true
	default:
		return false
	}
}
// requestWithTimeout runs doRequest in a background goroutine and waits at
// most timeout for its result.
//
// It returns the received value and a nil error, or 0 and a "timeout" error
// when nothing arrives within timeout.
func requestWithTimeout(timeout time.Duration) (int, error) {
	// One buffer slot lets the request goroutine complete its send after the
	// timeout branch has returned and nobody receives any more. With an
	// unbuffered channel that goroutine would block for ever on every
	// timeout. This assumes doRequest sends a single value on c.
	c := make(chan int, 1)
	// May need a long time to get the response.
	go doRequest(c)

	// An explicit timer can be stopped when the response wins the race, so
	// its resources are released without waiting for it to fire.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case data := <-c:
		return data, nil
	case <-timer.C:
		return 0, errors.New("timeout")
	}
}
package main
import "fmt"
import "time"
// Tick returns a channel on which a value is offered once every d. A tick is
// dropped when the previous one has not been received yet, so at most one
// tick is ever pending. The ticking goroutine runs for ever.
func Tick(d time.Duration) <-chan struct{} {
	// The capacity of the channel is best set to one.
	ticks := make(chan struct{}, 1)

	emit := func() {
		for {
			time.Sleep(d)
			// Try-send: never wait for a slow receiver.
			select {
			case ticks <- struct{}{}:
			default:
			}
		}
	}
	go emit()

	return ticks
}
// main prints the time elapsed since start-up once per second, for ever.
func main() {
	start := time.Now()
	ticks := Tick(time.Second)
	for range ticks {
		fmt.Println(time.Since(start))
	}
}
In fact, there is a Tick
function in the time
standard package that provides the same functionality, with a much more efficient implementation. We should use that function instead to make code look clean and run efficiently.
We can implement peak limiting by combining the use of channels as counting semaphores with try-send/try-receive operations. Peak-limit (or burst-limit) is often used to limit the number of concurrent requests without blocking any requests.
...
// Can serve at most 10 customers at the same time.
bar24x7 := make(Bar, 10)
for customerId := 0; ; customerId++ {
time.Sleep(time.Second)
customer := Consumer{customerId}
select {
case bar24x7 <- customer: // try to enter the bar
go bar24x7.ServeConsumer(customer)
default:
// The send cannot proceed right now, so the customer is turned
// away instead of being made to wait.
log.Print("customer#", customerId, " goes elsewhere")
}
}
...
One general principle of using Go channels is don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders. In other words, we should only close a channel in a sender goroutine if the sender is the only sender of the channel.
M receivers, one sender, the sender says "no more sends" by closing the data channel
package main
import (
"time"
"math/rand"
"sync"
"log"
)
// main demonstrates the "one sender, M receivers" shutdown pattern: the only
// sender closes dataCh to say that no more values will be sent, and every
// receiver ranges over dataCh until it is closed and drained.
func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	const Max = 100000
	const NumReceivers = 100

	dataCh := make(chan int, 100)

	// The sender.
	go func() {
		// Being the only sender, this goroutine can close the channel
		// safely.
		defer close(dataCh)
		for {
			value := rand.Intn(Max)
			if value == 0 {
				return
			}
			dataCh <- value
		}
	}()

	// The receivers.
	var wgReceivers sync.WaitGroup
	wgReceivers.Add(NumReceivers)
	for i := 0; i < NumReceivers; i++ {
		go func() {
			defer wgReceivers.Done()
			// Receive values until dataCh is closed and its buffer is
			// empty.
			for value := range dataCh {
				log.Println(value)
			}
		}()
	}

	wgReceivers.Wait()
}
One receiver, N senders, the only receiver says "please stop sending more" by closing an additional signal channel
package main
import (
"time"
"math/rand"
"sync"
"log"
)
// main demonstrates the "N senders, one receiver" shutdown pattern: dataCh is
// never closed; instead the receiver closes the extra signal channel stopCh
// to ask all senders to stop.
func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	const Max = 100000
	const NumSenders = 1000

	dataCh := make(chan int, 100)
	// stopCh is an additional signal channel. It is closed by the receiver
	// of dataCh and observed by the senders of dataCh.
	stopCh := make(chan struct{})

	// The senders.
	sender := func() {
		for {
			// This try-receive lets the goroutine exit as early as
			// possible. It is not essential for this example.
			select {
			case <-stopCh:
				return
			default:
			}

			// Even after stopCh is closed, the send branch may still be
			// chosen for some iterations while the send is possible too.
			// That is acceptable here, which is why the try-receive above
			// could be omitted.
			select {
			case <-stopCh:
				return
			case dataCh <- rand.Intn(Max):
			}
		}
	}
	for i := 0; i < NumSenders; i++ {
		go sender()
	}

	// The receiver.
	var wgReceivers sync.WaitGroup
	wgReceivers.Add(1)
	go func() {
		defer wgReceivers.Done()
		for value := range dataCh {
			if value == Max-1 {
				// This goroutine is the only one that closes stopCh, so
				// closing it here is safe.
				close(stopCh)
				return
			}
			log.Println(value)
		}
	}()

	wgReceivers.Wait()
}
M receivers, N senders, any one of them says "let's end the game" by notifying a moderator to close an additional signal channel
package main
import (
"time"
"math/rand"
"sync"
"log"
"strconv"
)
// main demonstrates the "M receivers, N senders" shutdown pattern. dataCh is
// never closed. Any sender or receiver may ask the moderator goroutine, via a
// try-send on toStop, to close stopCh; all senders and receivers watch stopCh
// and exit once it is closed. main waits for the receivers to finish and then
// logs which goroutine requested the stop.
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const Max = 100000
const NumReceivers = 10
const NumSenders = 1000
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)
// ...
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// stopCh is an additional signal channel.
// Its sender is the moderator goroutine shown
// below, and its receivers are all senders
// and receivers of dataCh.
toStop := make(chan string, 1)
// The channel toStop is used to notify the
// moderator to close the additional signal
// channel (stopCh). Its senders are any senders
// and receivers of dataCh, and its receiver is
// the moderator goroutine shown below.
// It must be a buffered channel.
// NOTE(review): stoppedBy is written by the moderator goroutine and read
// by main after wgReceivers.Wait(). The read is ordered after the write
// only through receivers that exit by observing the closed stopCh;
// confirm that this is sufficient for this example.
var stoppedBy string
// moderator
go func() {
stoppedBy = <-toStop
close(stopCh)
}()
// senders
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
value := rand.Intn(Max)
if value == 0 {
// Here, the try-send operation is
// to notify the moderator to close
// the additional signal channel.
select {
case toStop <- "sender#" + id:
default:
}
return
}
// The try-receive operation here is to
// try to exit the sender goroutine as
// early as possible. Try-receive and
// try-send select blocks are specially
// optimized by the standard Go
// compiler, so they are very efficient.
select {
case <- stopCh:
return
default:
}
// Even if stopCh is closed, the first
// branch in this select block might be
// still not selected for some loops
// (and for ever in theory) if the send
// to dataCh is also non-blocking. If
// this is unacceptable, then the above
// try-receive operation is essential.
select {
case <- stopCh:
return
case dataCh <- value:
}
}
}(strconv.Itoa(i))
}
// receivers
for i := 0; i < NumReceivers; i++ {
go func(id string) {
defer wgReceivers.Done()
for {
// Same as the sender goroutine, the
// try-receive operation here is to
// try to exit the receiver goroutine
// as early as possible.
select {
case <- stopCh:
return
default:
}
// Even if stopCh is closed, the first
// branch in this select block might be
// still not selected for some loops
// (and forever in theory) if the receive
// from dataCh is also non-blocking. If
// this is not acceptable, then the above
// try-receive operation is essential.
select {
case <- stopCh:
return
case value := <-dataCh:
if value == Max-1 {
// Here, the same trick is
// used to notify the moderator
// to close the additional
// signal channel.
select {
case toStop <- "receiver#" + id:
default:
}
return
}
log.Println(value)
}
}
}(strconv.Itoa(i))
}
// ...
wgReceivers.Wait()
log.Println("stopped by", stoppedBy)
}