Skip to content

Instantly share code, notes, and snippets.

@xin053
Last active April 4, 2020 12:34
Show Gist options
  • Save xin053/23a88a00c118d14ed8eaea2833e563f8 to your computer and use it in GitHub Desktop.
Save xin053/23a88a00c118d14ed8eaea2833e563f8 to your computer and use it in GitHub Desktop.
[go channel] go channel #go #channel

channel 重点

Operation A Nil Channel A Closed Channel A Not-Closed Non-Nil Channel
Close panic panic succeed to close (C)
Send Value To block for ever panic block or succeed to send (B)
Receive Value From block for ever never block (D) block or succeed to receive (A)

一个基于无缓存 channel 的发送操作将导致发送者 goroutine 阻塞, 直到另一个 goroutine 在相同的 channel 上执行接收操作, 当发送的值通过 channel 成功传输之后, 两个 goroutine 可以继续执行后面的语句, 反之,如果接收操作先发生, 那么接收者 goroutine 也将阻塞, 直到有另一个 goroutine 在相同的 channel 上执行发送操作

channel 基本使用

ch <- x // 将x发送给管道
x = <-ch // 从管道接收数据并赋值给x
<-ch // 从管道接收数据并丢弃数据
close(ch) // 关闭管道
ch = make(chan int) // 创建阻塞管道
ch = make(chan int, 3) // 创建带有缓冲区的管道

for x := range ch {} // 管道被关闭时才会跳出循环

// 不管是只发送管道还是只接收管道,都只在编译期间检查,实际底层都是一样的
ch = make(chan<- int) // 只发送不接收管道
ch = make(<-chan int) // 只接收不发送管道

使用 channel 实现 fibonacchi 生成器

package main

import (
    "fmt"
    "time"
)
func main() {
    fibonacci := func() chan uint64 {
        c := make(chan uint64)
        go func() {
            var x, y uint64 = 0, 1
            for ; y < (1 << 63); c <- y { // here
                x, y = y, x+y
            }
            close(c)
        }()
        return c
    }
    c := fibonacci()
    for x := range c {
        time.Sleep(time.Second)
        fmt.Println(x)
    }
}

Return receive-only channels as results

package main

import (
	"time"
	"math/rand"
	"fmt"
)

func longTimeRequest() <-chan int32 {
	r := make(chan int32)

	go func() {
		// Simulate a workload.
		time.Sleep(time.Second * 3)
		r <- rand.Int31n(100)
	}()

	return r // 只发送还是只接收管道只在编译期间检查
}

func sumSquares(a, b int32) int32 {
	return a*a + b*b
}

func main() {
	rand.Seed(time.Now().UnixNano())

	a, b := longTimeRequest(), longTimeRequest()
	fmt.Println(sumSquares(<-a, <-b)) // 不会立即有结果,会阻塞 3s,直到有数据向管道中发送
}

Pass send-only channels as arguments

package main

import (
	"time"
	"math/rand"
	"fmt"
)

func longTimeRequest(r chan<- int32)  {
	// Simulate a workload.
	time.Sleep(time.Second * 3)
	r <- rand.Int31n(100)
}

func sumSquares(a, b int32) int32 {
	return a*a + b*b
}

func main() {
	rand.Seed(time.Now().UnixNano())

	ra, rb := make(chan int32), make(chan int32)
	go longTimeRequest(ra)
	go longTimeRequest(rb)

	fmt.Println(sumSquares(<-ra, <-rb))
}

The first response wins

Note, if there are N sources, the capacity of the communication channel must be at least N-1, to avoid the goroutines corresponding the discarded responses being blocked for ever.

// 如果缓冲区小于 N-1, 将会导致函数一直阻塞, 无法释放内存, 造成内存泄露
package main

import (
	"fmt"
	"time"
	"math/rand"
)

func source(c chan<- int32) {
	ra, rb := rand.Int31(), rand.Intn(3) + 1
	// Sleep 1s/2s/3s.
	time.Sleep(time.Duration(rb) * time.Second)
	c <- ra
}

func main() {
	rand.Seed(time.Now().UnixNano())

	startTime := time.Now()
	// c must be a buffered channel.
	c := make(chan int32, 5)
	for i := 0; i < cap(c); i++ {
		go source(c)
	}
	// Only the first response will be used.
	rnd := <- c
	fmt.Println(time.Since(startTime))
	fmt.Println(rnd)
}

Another way to implement the first-response-wins use case

Please note, the capacity of the channel used in the blow example must be at least one, so that the first send won't be missed if the receiver/request side has not gotten ready in time.

// 缓存区至少为1, 否则如果接收方还没准备好, 第一个响应的发送方的 select 就回因为阻塞选择执行 default
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func source(c chan<- int32) {
	ra, rb := rand.Int31(), rand.Intn(3)+1
	// Sleep 1s, 2s or 3s.
	time.Sleep(time.Duration(rb) * time.Second)
	select {
	case c <- ra:
	default:
	}
}

func main() {
	rand.Seed(time.Now().UnixNano())

	// The capacity should be at least 1.
	c := make(chan int32, 1)
	for i := 0; i < 5; i++ {
		go source(c)
	}
	rnd := <-c // only the first response is used
	fmt.Println(rnd)
}

The third way to implement the first-response-wins use case

Note: if the channel used in the above example is an unbuffered channel, then there will two goroutines hanging for ever after the select code block is executed. This is a memory leak case.

// 如果使用非缓冲 channel, 将会有两个 routine 一直阻塞, 内存无法释放, 造成内存泄漏
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func source() <-chan int32 {
	// c must be a buffered channel.
	c := make(chan int32, 1)
	go func() {
		ra, rb := rand.Int31(), rand.Intn(3)+1
		time.Sleep(time.Duration(rb) * time.Second)
		c <- ra
	}()
	return c
}

func main() {
	rand.Seed(time.Now().UnixNano())

	var rnd int32
	// Blocking here until one source responses.
	select{
	case rnd = <-source():
	case rnd = <-source():
	case rnd = <-source():
	}
	fmt.Println(rnd)
}

Use Channels for Notifications

1-To-1 notification by sending a value to a channel

package main

import (
	"crypto/rand"
	"fmt"
	"os"
	"sort"
)

func main() {
	values := make([]byte, 32 * 1024 * 1024)
	if _, err := rand.Read(values); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}

	done := make(chan struct{}) // can be buffered or not

	// The sorting goroutine
	go func() {
		sort.Slice(values, func(i, j int) bool {
			return values[i] < values[j]
		})
		// Notify sorting is done.
		done <- struct{}{}
	}()

	// do some other things ...

	<- done // waiting here for notification
	fmt.Println(values[0], values[len(values)-1])
}

1-To-1 notification by receiving a value from a channel

package main

import (
	"fmt"
	"time"
)

func main() {
	done := make(chan struct{})
		// The capacity of the signal channel can
		// also be one. If this is true, then a
		// value must be sent to the channel before
		// creating the following goroutine.

	go func() {
		fmt.Print("Hello")
		// Simulate a workload.
		time.Sleep(time.Second * 2)

		// Receive a value from the done
		// channel, to unblock the second
		// send in main goroutine.
		<- done
	}()

	// Blocked here, wait for a notification.
	done <- struct{}{}
	fmt.Println(" world!")
}

N-To-1 and 1-To-N notifications

package main

import "log"
import "time"

type T = struct{}

func worker(id int, ready <-chan T, done chan<- T) {
	<-ready // block here and wait a notification
	log.Print("Worker#", id, " starts.")
	// Simulate a workload.
	time.Sleep(time.Second * time.Duration(id+1))
	log.Print("Worker#", id, " job done.")
	// Notify the main goroutine (N-to-1),
	done <- T{}
}

func main() {
	log.SetFlags(0)

	ready, done := make(chan T), make(chan T)
	go worker(0, ready, done)
	go worker(1, ready, done)
	go worker(2, ready, done)

	// Simulate an initialization phase.
	time.Sleep(time.Second * 3 / 2)
	// 1-to-N notifications.
	ready <- T{}; ready <- T{}; ready <- T{}
	// Being N-to-1 notified.
	<-done; <-done; <-done
}

In fact, the ways to do 1-to-N and N-to-1 notifications introduced in this sub-section are not used commonly in practice. In practice, we often use sync.WaitGroup to do N-to-1 notifications, and we do 1-to-N notifications by close channels.

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int, wg *sync.WaitGroup) {
			defer wg.Done()
			fmt.Println(i)
		}(i, &wg)
	}

	wg.Wait()
	fmt.Println("Done!")
}

Broadcast (1-To-N) notifications by closing a channel

By the example in the last sub-section, we can replace the three channel send operations ready <- struct{}{} in the last example with one channel close operation close(ready) to do an 1-to-N notifications.

...
	close(ready) // broadcast notifications
...

Timer: scheduled notification

package main

import (
	"fmt"
	"time"
)

func AfterDuration(d time.Duration) <- chan struct{} {
	c := make(chan struct{}, 1)
	go func() {
		time.Sleep(d)
		c <- struct{}{}
	}()
	return c
}

func main() {
	fmt.Println("Hi!")
	<- AfterDuration(time.Second)
	fmt.Println("Hello!")
	<- AfterDuration(time.Second)
	fmt.Println("Bye!")
}

In fact, the After function in the time standard package provides the same functionality, with a much more efficient implementation. We should use that function instead to make the code look clean.

Check Lengths and Capacities of Channels

We can use the built-in functions len and cap to check the length and capacity of a channel. However, we seldom do this in practice. The reason for we seldom use the len function to check the length of a channel is the length of the channel may have changed after the len function call returns. The reason for we seldom use the cap function to check the capacity of a channel is the capacity of the channel is often known or not important.

Block the Current Goroutine Forever

package main

import "runtime"

func DoSomething() {
	for {
		// do something ...

		runtime.Gosched() // avoid being greedy
	}
}

func main() {
	go DoSomething()
	go DoSomething()
	select{}
}

Try-Send and Try-Receive

try-receive and try-send with len and cap

// try-receive
for len(c) > 0 {
	value := <-c
	// use value ...
}

// try-send
for len(c) < cap(c) {
	c <- aValue
}

A select block with one default branch and only one case branch is called a try-send or try-receive channel operation, depending on whether the channel operation following the case keyword is a channel send or receive operation.

package main

import "fmt"

func main() {
	type Book struct{id int}
	bookshelf := make(chan Book, 3)

	for i := 0; i < cap(bookshelf) * 2; i++ {
		select {
		case bookshelf <- Book{id: i}:
			fmt.Println("succeeded to put book", i)
		default:
			fmt.Println("failed to put book")
		}
	}

	for i := 0; i < cap(bookshelf) * 2; i++ {
		select {
		case book := <-bookshelf:
			fmt.Println("succeeded to get book", book.id)
		default:
			fmt.Println("failed to get book")
		}
	}
}

Check if an unbuffered channel is closed without blocking the current goroutine

Assume it is guaranteed that no values were ever (and will be) sent to a channel

// 只能检测非缓冲 channel
func IsClosed(c chan T) bool {
	select {
	case <-c:
		return true
	default:
	}
	return false
}

Timeout

func requestWithTimeout(timeout time.Duration) (int, error) {
	c := make(chan int)
	// May need a long time to get the response.
	go doRequest(c)

	select {
	case data := <-c:
		return data, nil
	case <-time.After(timeout):
		return 0, errors.New("timeout")
	}
}

Ticker

package main

import "fmt"
import "time"

func Tick(d time.Duration) <-chan struct{} {
	// The capacity of c is best set as one.
	c := make(chan struct{}, 1)
	go func() {
		for {
			time.Sleep(d)
			select {
			case c <- struct{}{}:
			default:
			}
		}
	}()
	return c
}

func main() {
	t := time.Now()
	for range Tick(time.Second) {
		fmt.Println(time.Since(t))
	}
}

In fact, there is a Tick function in the time standard package provides the same functionality, with a much more efficient implementation. We should use that function instead to make code look clean and run efficiently.

Peak/burst limiting

We can implement peak limiting by combining use channels as counting semaphores and try-send/try-receive. Peak-limit (or burst-limit) is often used to limit the number of concurrent requests without blocking any requests.

...
	// Can serve most 10 customers at the same time
	bar24x7 := make(Bar, 10)
	for customerId := 0; ; customerId++ {
		time.Sleep(time.Second)
		customer := Consumer{customerId}
		select {
		case bar24x7 <- customer: // try to enter the bar
			go bar24x7.ServeConsumer(customer)
		default:
			log.Print("customer#", customerId, " goes elsewhere")
		}
	}
...

How to Gracefully Close Channels

One general principle of using Go channels is don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders. In other words, we should only close a channel in a sender goroutine if the sender is the only sender of the channel.

M receivers, one sender, the sender says "no more sends" by closing the data channel

package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const Max = 100000
	const NumReceivers = 100

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)

	// ...
	dataCh := make(chan int, 100)

	// the sender
	go func() {
		for {
			if value := rand.Intn(Max); value == 0 {
				// The only sender can close
				// the channel safely.
				close(dataCh)
				return
			} else {
				dataCh <- value
			}
		}
	}()

	// receivers
	for i := 0; i < NumReceivers; i++ {
		go func() {
			defer wgReceivers.Done()

			// Receive values until dataCh is
			// closed and the value buffer queue
			// of dataCh becomes empty.
			for value := range dataCh {
				log.Println(value)
			}
		}()
	}

	wgReceivers.Wait()
}

One receiver, N senders, the only receiver says "please stop sending more" by closing an additional signal channel

package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const Max = 100000
	const NumSenders = 1000

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(1)

	// ...
	dataCh := make(chan int, 100)
	stopCh := make(chan struct{})
		// stopCh is an additional signal channel.
		// Its sender is the receiver of channel
		// dataCh, and its receivers are the
		// senders of channel dataCh.

	// senders
	for i := 0; i < NumSenders; i++ {
		go func() {
			for {
				// The try-receive operation is to try
				// to exit the goroutine as early as
				// possible. For this specified example,
				// it is not essential.
				select {
				case <- stopCh:
					return
				default:
				}

				// Even if stopCh is closed, the first
				// branch in the second select may be
				// still not selected for some loops if
				// the send to dataCh is also unblocked.
				// But this is acceptable for this
				// example, so the first select block
				// above can be omitted.
				select {
				case <- stopCh:
					return
				case dataCh <- rand.Intn(Max):
				}
			}
		}()
	}

	// the receiver
	go func() {
		defer wgReceivers.Done()

		for value := range dataCh {
			if value == Max-1 {
				// The receiver of channel dataCh is
				// also the sender of stopCh. It is
				// safe to close the stop channel here.
				close(stopCh)
				return
			}

			log.Println(value)
		}
	}()

	// ...
	wgReceivers.Wait()
}

M receivers, N senders, any one of them says "let's end the game" by notifying a moderator to close an additional signal channel

package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
	"strconv"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const Max = 100000
	const NumReceivers = 10
	const NumSenders = 1000

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)

	// ...
	dataCh := make(chan int, 100)
	stopCh := make(chan struct{})
		// stopCh is an additional signal channel.
		// Its sender is the moderator goroutine shown
		// below, and its receivers are all senders
		// and receivers of dataCh.
	toStop := make(chan string, 1)
		// The channel toStop is used to notify the
		// moderator to close the additional signal
		// channel (stopCh). Its senders are any senders
		// and receivers of dataCh, and its receiver is
		// the moderator goroutine shown below.
		// It must be a buffered channel.

	var stoppedBy string

	// moderator
	go func() {
		stoppedBy = <-toStop
		close(stopCh)
	}()

	// senders
	for i := 0; i < NumSenders; i++ {
		go func(id string) {
			for {
				value := rand.Intn(Max)
				if value == 0 {
					// Here, the try-send operation is
					// to notify the moderator to close
					// the additional signal channel.
					select {
					case toStop <- "sender#" + id:
					default:
					}
					return
				}

				// The try-receive operation here is to
				// try to exit the sender goroutine as
				// early as possible. Try-receive and
				// try-send select blocks are specially
				// optimized by the standard Go
				// compiler, so they are very efficient.
				select {
				case <- stopCh:
					return
				default:
				}

				// Even if stopCh is closed, the first
				// branch in this select block might be
				// still not selected for some loops
				// (and for ever in theory) if the send
				// to dataCh is also non-blocking. If
				// this is unacceptable, then the above
				// try-receive operation is essential.
				select {
				case <- stopCh:
					return
				case dataCh <- value:
				}
			}
		}(strconv.Itoa(i))
	}

	// receivers
	for i := 0; i < NumReceivers; i++ {
		go func(id string) {
			defer wgReceivers.Done()

			for {
				// Same as the sender goroutine, the
				// try-receive operation here is to
				// try to exit the receiver goroutine
				// as early as possible.
				select {
				case <- stopCh:
					return
				default:
				}

				// Even if stopCh is closed, the first
				// branch in this select block might be
				// still not selected for some loops
				// (and forever in theory) if the receive
				// from dataCh is also non-blocking. If
				// this is not acceptable, then the above
				// try-receive operation is essential.
				select {
				case <- stopCh:
					return
				case value := <-dataCh:
					if value == Max-1 {
						// Here, the same trick is
						// used to notify the moderator
						// to close the additional
						// signal channel.
						select {
						case toStop <- "receiver#" + id:
						default:
						}
						return
					}

					log.Println(value)
				}
			}
		}(strconv.Itoa(i))
	}

	// ...
	wgReceivers.Wait()
	log.Println("stopped by", stoppedBy)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment