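- Concurrency: the composition of independently executing processes (processes in the general sense, not Linux processes).
- Parallelism: the simultaneous execution of computations.
Concurrency is about structure and design; parallelism is about execution. There are many ways to decompose a process, and each decomposition yields a different concurrent design.
- Traditional multithreading model: shared memory and locks. Drawbacks: complex and unpredictable.
- CSP: passes data and ownership between goroutines, so synchronization is automatic and there is no need to deal with the complexity of shared memory and locks.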
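A minimal sketch of the CSP style described above, assuming a hypothetical `sumSquares` helper that is not part of the original notes: each goroutine hands its result to the receiver over a channel, so no lock is needed.

```go
package main

import "fmt"

// sumSquares is a hypothetical helper: it starts one goroutine per input,
// and each goroutine sends its result over a channel instead of writing
// to shared memory.
func sumSquares(nums []int) int {
	results := make(chan int)
	for _, n := range nums {
		go func(n int) {
			results <- n * n // hand the value over to the receiver
		}(n)
	}
	total := 0
	for range nums {
		total += <-results // only this goroutine ever touches total
	}
	return total
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3, 4})) // 1 + 4 + 9 + 16 = 30
}
```

The same computation with shared memory would need a mutex around `total`; here the channel receive provides the synchronization.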
Initialization
If a package p imports package q, the completion of q's init functions happens before the start of any of p's. The start of the function main.main happens after all init functions have finished.
Goroutine creation
The go statement that starts a new goroutine happens before the goroutine's execution begins.
var a string
func f() {
print(a)
}
func hello() {
a = "hello, world"
go f()
}
Goroutine destruction
The exit of a goroutine is not guaranteed to happen before any event in the program.
var a string
func hello() {
go func() { a = "hello" }()
print(a)
}
The assignment to a is not followed by any synchronization event, so it is not guaranteed to be observed by any other goroutine.
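One way to make the write visible is to add an explicit synchronization event. The sketch below is an assumption about how the example could be repaired, not part of the original: it waits on a `sync.WaitGroup`, and `hello` returns the value instead of printing it so the result can be checked.

```go
package main

import (
	"fmt"
	"sync"
)

// hello is a hypothetical repaired variant: wg.Done() in the goroutine
// happens before wg.Wait() returns, so the write to a is visible here.
func hello() string {
	var a string
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		a = "hello"
	}()
	wg.Wait()
	return a
}

func main() {
	fmt.Println(hello())
}
```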
Channel communication
- A send on a channel happens before the corresponding receive from that channel completes. This rule applies to both buffered and unbuffered channels.
- The closing of a channel happens before a receive that returns a zero value because the channel is closed.
- A receive from an unbuffered channel happens before the send on that channel completes. This rule applies only to unbuffered channels.
var c = make(chan int, 10)
var a string
func f() {
a = "hello, world"
c <- 0
}
func main() {
go f()
<-c
print(a) // guaranteed to print "hello, world"
}
In the code below, if the channel were buffered (for example c = make(chan int, 1)), the program would not be guaranteed to print "hello, world".
var c = make(chan int)
var a string
func f() {
a = "hello, world"
<-c
}
func main() {
go f()
c <- 0
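print(a) // c is unbuffered, so by the rule that a receive from an unbuffered channel happens before the send completes, this is guaranteed to print "hello, world"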
}
- The kth receive on a channel with capacity C happens before the k+Cth send from that channel completes.
Limit the number of goroutines doing work at the same time to at most 3:
var limit = make(chan int, 3)
func main() {
for _, w := range work {
go func(w func()) {
limit <- 1
w()
<-limit
}(w)
}
select{}
}
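A self-contained sketch of the same counting-semaphore idea, under some assumptions: `runLimited` is a hypothetical helper, the work is simulated with a short sleep, and a `sync.WaitGroup` replaces the `select{}` so the program can finish and report the peak concurrency it observed.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited (hypothetical) runs n simulated jobs with at most limit of
// them in flight, and returns the highest concurrency it observed.
func runLimited(n, limit int) int32 {
	sem := make(chan struct{}, limit) // buffered channel as a semaphore
	var wg sync.WaitGroup
	var running, peak int32
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire: blocks while limit jobs are running
			defer func() { <-sem }() // release
			cur := atomic.AddInt32(&running, 1)
			for { // record a new peak if cur exceeds the old one
				old := atomic.LoadInt32(&peak)
				if cur <= old || atomic.CompareAndSwapInt32(&peak, old, cur) {
					break
				}
			}
			time.Sleep(5 * time.Millisecond) // simulated work
			atomic.AddInt32(&running, -1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	fmt.Println("peak concurrency:", runLimited(10, 3))
}
```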
Locks
- For any sync.Mutex or sync.RWMutex variable l and n < m, call n of l.Unlock() happens before call m of l.Lock() returns.
- For any call to l.RLock on a sync.RWMutex variable l, there is an n such that the l.RLock happens (returns) after call n to l.Unlock and the matching l.RUnlock happens before call n+1 to l.Lock.
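The mutex rule can be illustrated with a small counter. This is only a sketch: the `counter` type and `countTo` function are hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// counter is a hypothetical type guarded by a mutex.
type counter struct {
	mu sync.Mutex
	n  int
}

// inc increments n. Each Unlock happens before the next Lock returns,
// so every increment sees the result of the previous one.
func (c *counter) inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.n++
}

// countTo starts the given number of goroutines, each incrementing once.
func countTo(workers int) int {
	var c counter
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.inc()
		}()
	}
	wg.Wait() // all increments are visible after Wait returns
	return c.n
}

func main() {
	fmt.Println(countTo(100))
}
```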
Once
- Once-only initialization semantics: Multiple threads can execute once.Do(f) for a particular f, but only one will run f(), and the other calls block until f() has returned.
- A single call of f() from once.Do(f) happens (returns) before any call of once.Do(f) returns.
var a string
var once sync.Once
func setup() {
a = "hello, world"
}
func doprint() {
once.Do(setup)
print(a)
}
func twoprint() {
go doprint()
go doprint()
}
// setup runs exactly once and completes before either print, so "hello, world" is printed twice
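To check the sync.Once guarantees concretely, here is a sketch with a hypothetical `runOnce` helper that counts how many times setup runs and collects what each caller observes. It collects values rather than printing from goroutines so the result is deterministic.

```go
package main

import (
	"fmt"
	"sync"
)

// runOnce (hypothetical) lets n goroutines call once.Do(setup) and returns
// how often setup ran plus the value of a seen by each goroutine.
func runOnce(n int) (setupCalls int, seen []string) {
	var (
		once sync.Once
		a    string
		mu   sync.Mutex
		wg   sync.WaitGroup
	)
	setup := func() {
		setupCalls++ // safe: once.Do runs setup in exactly one goroutine
		a = "hello, world"
	}
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			once.Do(setup) // returns only after setup has completed
			mu.Lock()
			seen = append(seen, a)
			mu.Unlock()
		}()
	}
	wg.Wait()
	return setupCalls, seen
}

func main() {
	calls, seen := runOnce(2)
	fmt.Println(calls, seen)
}
```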
Incorrect synchronization
var a string
var done bool
func setup() {
a = "hello, world"
done = true
}
func main() {
go setup()
for !done {
}
print(a)
}
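In the busy-wait program above there is no guarantee that main ever observes the write to done, and even if it does, it may not observe the write to a. A possible repair, sketched here with a hypothetical `run` function, replaces the bool flag with a channel that setup closes; the close happens before the receive returns, so the write to a is visible.

```go
package main

import "fmt"

// run is a hypothetical repaired variant of the busy-wait example.
func run() string {
	var a string
	done := make(chan struct{})
	setup := func() {
		a = "hello, world"
		close(done) // synchronization event: publishes the write to a
	}
	go setup()
	<-done // blocks without spinning until setup has finished
	return a
}

func main() {
	fmt.Println(run())
}
```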
package main
import "fmt"
func main() {
ch := make(chan bool, 2)
ch <- true
ch <- true
close(ch)
for i := 0; i < cap(ch) +1 ; i++ {
v, ok := <- ch
fmt.Println(v, ok)
}
}
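// Output:
// true true
// true true
// false false  (the first false is the zero value of the channel's element type; the second false reports that the channel is closed and drained)
If we change the loop condition to i < 100, the loop runs 100 times, receiving the zero value on every iteration after the channel is drained. The proper way to detect that a channel has been closed is to use range: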
package main
import "fmt"
func main() {
ch := make(chan bool, 2)
ch <- true
ch <- true
close(ch)
for v := range ch {
fmt.Println(v) // called twice
}
}
In addition:
- A receive operation on a closed channel can always proceed immediately, yielding the element type's zero value after any previously sent values have been received.
- close is effectively a broadcast signal to all receivers.
package main
import (
"fmt"
"sync"
"time"
)
func main() {
finish := make(chan struct{})
var done sync.WaitGroup
done.Add(1)
go func() {
select {
case <-time.After(1 * time.Hour):
case <-finish:
}
done.Done()
}()
t0 := time.Now()
close(finish)
done.Wait()
fmt.Printf("Waited %v for goroutine to stop\n", time.Since(t0))
}
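Because a receive on a closed channel always proceeds, when a channel is used purely as a signal we can call close(finish) instead of sending a value, which avoids the risk of forgetting to send the signal.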
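The broadcast behaviour becomes clearer with more than one receiver. In this sketch, `stopAll` is a hypothetical helper: a single close releases every waiting goroutine, whereas sending values would require one send per goroutine.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// stopAll (hypothetical) starts n goroutines that wait on finish,
// closes finish once, and returns how many goroutines were released.
func stopAll(n int) int {
	finish := make(chan struct{})
	var wg sync.WaitGroup
	var stopped int32
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-finish // every receiver unblocks when finish is closed
			atomic.AddInt32(&stopped, 1)
		}()
	}
	close(finish) // one close notifies all n goroutines
	wg.Wait()
	return int(atomic.LoadInt32(&stopped))
}

func main() {
	fmt.Println(stopAll(5))
}
```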
An uninitialized (nil) channel blocks forever
package main
func main() {
var ch chan bool
ch <- true // blocks forever
}
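The code above blocks because the channel was never initialized: a send or receive on a nil channel blocks forever. Next, consider waiting on multiple channels at once: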
// WaitMany waits for a and b to close.
func WaitMany(a, b chan bool) {
var aclosed, bclosed bool
for !aclosed || !bclosed {
select {
case <-a:
aclosed = true
case <-b:
bclosed = true
}
}
}
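Even after a is closed, receives from a keep succeeding immediately, so the for loop can spin on case <-a, burning CPU, and in the worst case channel b is never observed as closed. Setting a closed channel to nil avoids this: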
package main
import (
"fmt"
"time"
)
func WaitMany(a, b chan bool) {
for a != nil || b != nil {
select {
case <-a:
a = nil
case <-b:
b = nil
}
}
}
func main() {
a, b := make(chan bool), make(chan bool)
t0 := time.Now()
go func() {
close(a)
close(b)
}()
WaitMany(a, b)
fmt.Printf("waited %v for WaitMany\n", time.Since(t0))
}
Here, once a channel has been set to nil it effectively drops out of the select, because a nil channel is never ready to receive. That leaves only b, which blocks until it is closed, exiting the loop without spinning.
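The same nil-channel technique also works when the channels carry values. The sketch below assumes a hypothetical `merge` function that drains two channels until both are closed, using the two-value receive form to detect the close.

```go
package main

import "fmt"

// merge (hypothetical) collects every value from a and b until both
// channels are closed. A closed channel is set to nil so that its case
// can no longer be selected.
func merge(a, b <-chan int) []int {
	var out []int
	for a != nil || b != nil {
		select {
		case v, ok := <-a:
			if !ok {
				a = nil // a is closed and drained: disable this case
				continue
			}
			out = append(out, v)
		case v, ok := <-b:
			if !ok {
				b = nil // b is closed and drained: disable this case
				continue
			}
			out = append(out, v)
		}
	}
	return out
}

func main() {
	a, b := make(chan int, 2), make(chan int, 2)
	a <- 1
	a <- 2
	b <- 3
	b <- 4
	close(a)
	close(b)
	fmt.Println(len(merge(a, b)), "values received")
}
```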
// Not graceful: Ctrl+C kills the program in the middle of handle()
package main
import (
"fmt"
"time"
)
type Task struct {
ticker *time.Ticker
}
func (t *Task) Run() {
for {
select {
case <-t.ticker.C:
handle()
}
}
}
func handle() {
for i := 0; i < 5; i++ {
fmt.Print("#")
time.Sleep(time.Millisecond * 200)
}
fmt.Println()
}
func main() {
task := &Task{
ticker: time.NewTicker(time.Second * 2),
}
task.Run()
}
//
// $ go run main.go
// #####
// ###^Csignal: interrupt
// Method 1: use goroutine to exit gracefully
func main() {
task := &Task{
ticker: time.NewTicker(time.Second * 2),
}
c := make(chan os.Signal, 1) // signal.Notify does not block on send, so the channel must be buffered
signal.Notify(c, os.Interrupt)
go func() {
select {
case sig := <-c:
fmt.Printf("Got %s signal. Aborting...\n", sig)
os.Exit(1)
}
}()
task.Run()
}
// A more graceful way:
// add a closed channel to Task so that Run can be told to return.
// This works because the task is running in the main goroutine.
// What if the task runs in an ordinary goroutine instead?
//
type Task struct {
closed chan struct{}
ticker *time.Ticker
}
func (t *Task) Run() {
for {
select {
// If the closed channel has been closed, exit
case <-t.closed:
return
case <-t.ticker.C:
handle()
}
}
}
// A wrapper to notify
func (t *Task) Stop() {
close(t.closed)
}
func main() {
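task := &Task{
closed: make(chan struct{}), // must be initialized: closing a nil channel panics
ticker: time.NewTicker(time.Second * 2),
}
c := make(chan os.Signal, 1) // buffered, as signal.Notify requires
// Capture the Ctrl+C signal
signal.Notify(c, os.Interrupt)
// Wait for the signal in a separate goroutine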
go func() {
select {
case sig := <-c:
fmt.Printf("Got %s signal. Aborting...\n", sig)
// tell Run to return
task.Stop()
}
}()
task.Run()
}
// The main goroutine waits for another goroutine to finish
// by using sync.WaitGroup.
// A new definition of Task:
type Task struct {
closed chan struct{}
wg sync.WaitGroup // tracks the goroutine executing Run
ticker *time.Ticker
}
func main() {
// previous code...
task.wg.Add(1)
// run the task in its own goroutine
go func() { defer task.wg.Done(); task.Run() }()
// other code...
}
// the whole program
package main
import (
"fmt"
"os"
"os/signal"
"sync"
"time"
)
type Task struct {
closed chan struct{}
wg sync.WaitGroup
ticker *time.Ticker
}
func (t *Task) Run() {
for {
select {
case <-t.closed:
return
case <-t.ticker.C:
handle()
}
}
}
func (t *Task) Stop() {
close(t.closed)
t.wg.Wait()
}
func handle() {
for i := 0; i < 5; i++ {
fmt.Print("#")
time.Sleep(time.Millisecond * 200)
}
fmt.Println()
}
func main() {
task := &Task{
closed: make(chan struct{}),
ticker: time.NewTicker(time.Second * 2),
}
c := make(chan os.Signal, 1) // buffered, as signal.Notify requires
signal.Notify(c, os.Interrupt)
task.wg.Add(1)
go func() { defer task.wg.Done(); task.Run() }()
select {
case sig := <-c:
fmt.Printf("Got %s signal. Aborting...\n", sig)
task.Stop()
}
}
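Using a WaitGroup is the typical, traditional way to control goroutines: call Add(int) to increment the counter before each go func, call Done() inside the goroutine to decrement it, and call Wait() in the main goroutine to block until every goroutine has finished before continuing.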
package main
import (
"sync"
"fmt"
)
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
fmt.Printf("Go func %d\n", i)
}(i)
}
wg.Wait()
fmt.Println("All work done.")
}
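The problem with this approach: the work only counts as finished once every goroutine has returned. If a goroutine times out or loops forever somewhere, the main goroutine waits indefinitely, so we need a mechanism to monitor and control running goroutines more actively.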
Using channels
Use a channel to communicate:
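func (s *Scheduler) Start() {
go func() {
// A Ticker fires once per interval; a Timer would fire only once.
// This assumes s.ticker is declared as *time.Ticker.
s.ticker = time.NewTicker(s.interval)
defer s.ticker.Stop()
for {
select {
case <-s.interrupt:
// Do any necessary cleanup here before exiting.
return
case <-s.ticker.C:
// Run the periodic task here.
}
}
}()
}
func (s *Scheduler) Shutdown() {
// Interrupt the scheduling loop.
s.interrupt <- true
}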
Using context
// Stream generates values with DoSomething and sends them to out
// until DoSomething returns an error or ctx.Done is closed.
func Stream(ctx context.Context, out chan<- Value) error {
for {
v, err := DoSomething(ctx)
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case out <- v:
}
}
}
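Go provides two built-in Contexts. Both are empty: they are never canceled, have no deadline, and carry no values:
- context.Background() returns a context that is typically used as the root
- context.TODO() returns a context that is typically used when it is unclear which Context to use
In addition, the following four functions each derive a new Context from a parent: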
context.WithCancel(parent Context) (ctx Context, cancel CancelFunc)
context.WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
context.WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
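For the contexts above that come with a CancelFunc, the parent goroutine can call the returned CancelFunc to actively notify child goroutines to exit; every context derived from that context is canceled as well.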
context.WithValue(parent Context, key, val interface{}) Context
For a context obtained from WithValue, the data can be retrieved with ctx.Value(key). Critical data, however, is best not passed through a context.
An example
package main
import (
"context"
"fmt"
"math/rand"
"time"
)
// Slow function
func sleepRandom(fromFunction string, ch chan int) {
// defer cleanup
defer func() { fmt.Println(fromFunction, "sleepRandom complete") }()
//Perform a slow task
//For illustration purpose,
//Sleep here for random ms
seed := time.Now().UnixNano()
r := rand.New(rand.NewSource(seed))
randomNumber := r.Intn(100)
sleeptime := randomNumber + 100
fmt.Println(fromFunction, "Starting sleep for", sleeptime, "ms")
time.Sleep(time.Duration(sleeptime) * time.Millisecond)
fmt.Println(fromFunction, "Waking up, slept for ", sleeptime, "ms")
// write on the channel if it was passed in
if ch != nil {
ch <- sleeptime
}
}
//Function that does slow processing with a context
//Note that context is the first argument
func sleepRandomContext(ctx context.Context, ch chan bool) {
//Cleanup tasks
//There are no contexts being created here
//Hence, no canceling needed
defer func() {
fmt.Println("sleepRandomContext complete")
ch <- true
}()
//Make a channel
sleeptimeChan := make(chan int)
//Start slow processing in a goroutine
//Send a channel for communication
go sleepRandom("sleepRandomContext", sleeptimeChan)
//Use a select statement to exit out if context expires
select {
case <-ctx.Done():
//If context is cancelled, this case is selected
//This can happen if the timeout doWorkContext expires or
//doWorkContext calls cancelFunction or main calls cancelFunction
//Free up resources that may no longer be needed because of aborting the work
//Signal all the goroutines that should stop work (use channels)
//Usually, you would send something on channel,
//wait for goroutines to exit and then return
//Or, use wait groups instead of channels for synchronization
fmt.Println("sleepRandomContext: Time to return")
case sleeptime := <-sleeptimeChan:
//This case is selected when processing finishes before the context is cancelled
fmt.Println("Slept for ", sleeptime, "ms")
}
}
//A helper function, this can, in the real world do various things.
//In this example, it is just calling one function.
//Here, this could have just lived in main
func doWorkContext(ctx context.Context) {
//Derive a timeout context from context with cancel
//Timeout in 150 ms
//All the contexts derived from this one are canceled after at most 150 ms
ctxWithTimeout, cancelFunction := context.WithTimeout(ctx, time.Duration(150)*time.Millisecond)
//Cancel to release resources once the function is complete
defer func() {
fmt.Println("doWorkContext complete")
cancelFunction()
}()
//Make channel and call context function
//Can use wait groups as well for this particular case
//As we do not use the return value sent on channel
ch := make(chan bool)
go sleepRandomContext(ctxWithTimeout, ch)
//Use a select statement to exit out if context expires
select {
case <-ctx.Done():
//This case is selected when the passed in context notifies to stop work
//In this example, it will be notified when main calls cancelFunction
fmt.Println("doWorkContext: Time to return")
case <-ch:
//This case is selected when processing finishes before the context is cancelled
fmt.Println("sleepRandomContext returned")
}
}
func main() {
//Make a background context
ctx := context.Background()
//Derive a context with cancel
ctxWithCancel, cancelFunction := context.WithCancel(ctx)
//defer canceling so that all the resources are freed up
//For this and the derived contexts
defer func() {
fmt.Println("Main Defer: canceling context")
cancelFunction()
}()
//Cancel context after a random time
//This cancels the request after a random timeout
//If this happens, all the contexts derived from this should return
go func() {
sleepRandom("Main", nil)
cancelFunction()
fmt.Println("Main Sleep complete. canceling context")
}()
//Do work
doWorkContext(ctxWithCancel)
}
Source
Best practices
- context.Background should be used only at the highest level, as the root of all derived contexts
- context.TODO should be used where not sure what to use or if the current function will be updated to use context in future
- context cancelations are advisory, the functions may take time to clean up and exit
- context.Value should be used very rarely, it should never be used to pass in optional parameters. This makes the API implicit and can introduce bugs. Instead, such values should be passed in as arguments.
- Don't store contexts in a struct; pass them explicitly to functions, preferably as the first argument. Never pass a nil context; use context.TODO if you are not sure what to use. The Context interface does not have a cancel method because only the function that derives the context should cancel it.
References
select
Prints "Go here" directly
Another piece of code:
How select executes