Skip to content

Instantly share code, notes, and snippets.

@dreamer2q
Last active April 17, 2020 06:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dreamer2q/f022425fdff0d1fc42c9db0ef675cf69 to your computer and use it in GitHub Desktop.
Save dreamer2q/f022425fdff0d1fc42c9db0ef675cf69 to your computer and use it in GitHub Desktop.
go_study

Go语言goroutine通道学习

学习Go语言也有一段时间了,Go带给我的最直观的感受就是强大,简洁的并发能力。 于是乎,Go在Web等需要并发的地方大方光彩。我还记得第一次用Go写出一个httpServer是, 简直不敢相信如此简单明了。这可能是我接触语言较少有关系吧,反正如果是用C语言来写的花, 我估计还在为Socket监听,和多线程困恼。反正Go带给我的就是怎么酸爽!

Goroutine

Go语言中,每一个并发的活动成为goroutine,创建一个goroutine只需要在调用的函数前面加上关键字go,即可创建一个协程。 goroutine类型线程,但是比线程过于轻量,称为协程

这里给出书本上面的示例,实现一个并发的时钟服务器。

func main(){
    lis,err := net.Listen("tcp","localhost:8080")
    if err != nil {
        log.Fatal(err)
    }
    for {
        conn,err := lis.Accept()
        if err != nil {
            log.Println(err)
        }
        go func(){
            defer conn.Close()
            for {
                _,err := io.WriteString(conn, time.Now().Format("15:04:05\n"))
                if err != nil {
                    return 
                }
                time.Sleep(1*time.Second)
            }
        }(conn net.Conn)
        /*
        这里不要使用内部函数来捕获变量,申明一个参数,将需要的变量传递进来。
        */
    }
}

这里唯一需要关注的就是go启动的匿名函数,这个函数处理每一个连接请求,然后每秒钟发送一次格式化的时间。 仅仅加上了go,就让简易的Server有了并发能力。

通道

对于多个协程,它们可能做着互相独立不干扰的工作,但也可能协程之间需要密切配置,来共同完成同样的任务,比如说支持并发的WebSpider,可能有许多的goroutine不辞劳苦地爬爬爬, 但是改如何控制它们的整体行为,比如说某个goroutine爬过的url,其它的就不用再爬了。又或者说,用户想要停止,如何通知每个goroutine,让它们停下手中的活,乖乖的退出呢?

这个时候或许通道就派上用场了,这也是Go语言推荐的方案。

通道是可以让一个goroutine发送特定值到另一个goroutine的通信机制。每一个通道是一个具体类型的导管,叫做通道的元素类型

通道使用内置函数make来创建,通道类似map传递的是引用,零值是nil

  • ch := make(chan int) //创建一个通道

通道支持发送 接送关闭操作,前两个是通信,有点类似socket,在发送和接受的时候,若缓冲区满/空,则会进入阻塞状态,关闭后的通道,只允许会宕机。

  • ch <- x //发送
  • x = <-ch //接送&赋值
  • <- ch //仅接送
  • close(ch) //关闭一个通道

无缓冲通道

通道在创建的时候可以指定缓存的大小,默认的没有缓存。此时对其的读写操作都会造成阻塞,因此需要至少两个goroutine,一个负责读,另一个负责写,完成goroutine之间的通信。

没有缓冲的通道的特性实现两个goroutine同步化,因此又称为同步通道。这个时候若仅仅作为同步使用,一般会创建make(chan struct{})这个的通道以此强调。

管道 && 单向通道

听名字是不是很熟悉,类似shell里面的管道概念,例如ps aux|grep XX我们就使用到管道,一个命令的输出作为另一个命令的输入。 Go的通道很容易实现管道的功能,下面是一个管道的示例

func main(){
    naturals := make(chan int)
    squares := make(chan int)

    //计数器
    go func(){
        for x:=0 ; x<100 ; x++ {
            naturals <- x
        }
        close(naturals) //通知接收方停止
    }()
    
    //平方产生器
    go func(){
        for x := range naturals {
            squares <- x*x
        }
        close(squares) //同上
    }()

    //打印器
    for x := range squares {
        fmt.println(x)
    }
}

上面的例程有点类似流水生产线,上游->中游->下游,上游只负责写,中游负责读写,下游负责读。

对于明确知道只使用管道的读写之一的,我们可以写成函数,写成单通道的参数。

  • func counter(out chan<- int) //只写
  • func squarer(out chan<- int, in <-chan int)
  • func printer(in <-chan int)

这样若在函数内部错误的读一个只写的通道,编译器就会报错,可以帮助我们及早发现问题。

缓存通道

在创建的通道的时候指定大于零的容量,通道就变成了缓存通道缓存通道队列的特性。

缓存通道大小,容量可以使用len,cap内置函数来获取。

Select多路复用

当一个goroutine需要处理多个通道的时候,我们不能让一个某个通道阻塞我们的goroutine从而忽略了其它通道的处理,这个时候select就排上用场了(有点类似socket的轮询操作)。

select {
    case <-ch1:

    case x := <-ch2:

    case ch3 <- y:

    //可选
    default:
    //当default出现时,select永不阻塞,相当于轮询
    //没有default时,若所有case都是阻塞状态,这select会阻塞,
    //有任一case得到响应,select则进入相应的case执行代码
    //若同时有多个case满足条件,select随机选择一个进入
}

由于通道的可以是nil,零值,对零值的通道进行读写将会永远阻塞, 因此select中的零值case将永不会选中,这个特性可以用来开启/关闭一些功能。

示例

来个示例,加深一下理解

并发目录遍历

var verbose = flag.Bool("v", false, "Show verbose progress message")
var done = make(chan struct{}) //标识是否停止

func main(){
    flag.Parse()
    roots := flag.Args()
    if len(roots) == 0 {
        roots = []string{"."}
    }
    go func(){
        os.Stdin.Read(make([]byte,1)) //任意按键停止程序
        close(done)
    }()

    fileSize := make(chan int64) //接受文件大小
    var n sync.WaitGroup
    for _, root := range roots {
        n.Add(1)
        go walkDir(root, &n, fileSize)
    }
    go func(){
        n.Wait()
        close(fileSize)
    }()

    var tick <-chan time.Time
    if *verbose {
        tick = time.Tick(500*time.Millisecond) //500毫秒显示一次
    }
    
    var nfiles, nbytes int64
loop:
    for {
        select{
            case <-done:
                for range fileSize{} //吃空所有goroutine
                return
            case size,ok := <-fileSize:
                if !ok { 
                    break loop  //跳出for循环
                }
                nfiles ++
                nbytes += size
            case <-tick.C:  //定时任务
                printDiskUsage(nfiles,nbytes)
        }
    }
    printDiskUsage(nfiles, nbytes)
    fmt.Println(nbytes)
}

func printDiskUsage(nfiles,nbytes int64){
    fmt.Printf("du: %d files, %s\n",nfiles,formatUnit(nbytes))
}

func formatUnit(nbytes int64) string {
    units := []string{"bytes","Kb","Mb","Gb","Pb"}
    index := 0
    var unit = units[index]
    var b = float64(nbytes)
    for b>1e3 && index<len(units) {
        index ++
        unit = units[index]
        b /= 1e3
    }
    return fmt.Sprintf(".2f %s",b,unit)
}

func isCanceled() bool {
    select{
        case <-done:
            return true
        default:
            return false
    }
}

func walkDir(dir string, n *sync.WaitGroup, fileSize <-chan int64){
    defer n.Done()
    if isCanceled() {
        return
    }
    for _,entry := range dirents(dir) {
        if entry.IsDir() {
            n.Add(1)
            subDir := filePath.Join(dir,entry.Name())
            go walkDir(subDir,n,fileSize)
        } else {
            fileSize <- entry.Size()
        }
    }
}

var sema = make(chan struct{}, 20) //最多20个goroutine

func dirents(dir string) []os.FileInfo {
    select {
        case sema <- struct{}{}: //获取令牌
        case <-done:
            return nil
    }
    defer func(){ <-sema }() // 释放令牌

    entries,err := ioutil.ReadDir(dir)
    if err != nil {
        fmt.Fprintf(os.Stderr,"du: %v\n", err)
        return nil
    }
    return entries
}

这个示例解决的问题

  • 限制goroutine的数量(20个)
  • 等待goroutine的结束
  • 快速响应的终止
  • 特性的开关

结尾

goroutine可谓是Go的一大法宝,配合通道可谓锦上添花。

以后可能会有许多的坑,而且这些东西我的理解还不够。

需要学习的东西还好多,比如说并发安全,变量共享等等。

Goroutine 与 线程

可增长的栈

每个OS线程有一个固定大小的栈内存(通常2MB),栈内存用于保存函数调用中的局部变量。

对于一个小的goroutine,比如仅仅等待一个WaitGroup再关闭一个通道,2MB的栈太浪费了。对于复杂和深度递归函数,固定大小的栈不够大。

一个goroutine生命周期开始时只有一个很小的栈(通常2KB),与OS线程不同的是,goroutine的栈不是大小固定的,可以按需增大和缩小。

  • 使用通道构造一个把任意多个goroutine串联再一起的流水程序。在内存耗尽之前你能创建的最大流水线级数是多少?一个值穿过整个流水线需要多久?
func main(){

    var ch = make(chan struct{})
	var number int = 1e6
	fmt.Println("Creating goroutines",number)
	startTime := time.Now()
	for i := 0; i < number; i++ {
		go func(in <-chan struct{}, out chan<- struct{}) {  //流水线
			out <- <-in
		}(ch, ch)
	}
	fmt.Println("Creating finished",time.Since(startTime))
	startTime = time.Now()
	ch <- struct{}{}
	<- ch
	fmt.Println("Time: ", time.Since(startTime))
}

Output

Creating goroutines 1000000
Creating finished 6.6581723s
Time:  11.6568727s

当我尝试创建3*10^6个goroutine时,我的goland卡没了,就是自动关闭了。 下图是我重开的goland,然后时间看不到了。 3*10^6个goroutine

goroutine真是强大,可以随便开到10W个。

电脑配置

  • i5-8265U
  • 8g ddr4 2666

goroutine调度

OS线程由OS内核来调度。因为OS线程由内核来调度,所有控制权权限从一个线程到另一个线程需要一个完整的上下文切换(context switch)。

Go运行时包含一个自己的调度器,这个调度器使用一个称为m:n调度的技术,它可以复用/调度m个goroutine到n个OS线程。

与操作系统线程调度器不同,Go调度器由特定的Go语言结构来触发。因为它不需要切换到内核语境,所以调用一个goroutine比调用一个线程成本低很多。

  • 写一个程序,两个goroutine通过两个无缓冲通道来互相转发消息。这个程序能每秒多少次通信?
func main() {
	var counter int64
	var t = time.NewTimer(1 * time.Second)
	var chIn = make(chan struct{})
	var chOut = make(chan struct{})
	var done = make(chan struct{})
	go func() {
		for {
			select {
			case <-done:
				return
			default:
				chIn <- struct{}{}
				<-chOut
				counter++
			}
		}
	}()
	go func() {
		for {
			select {
			case <-done:
				return
			default:
				<-chIn
				chOut <- struct{}{}
			}
		}
	}()
	select {
	case <-t.C:
		close(done)
		fmt.Println("Counter:",counter)
	}
}

Output

Counter: 1816315

GOMAXPROCS

Go调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码。 默认是机器的CPU内核数(我们有超线程啊,一个掰成俩来用!?)。

正在休眠或者正在被通道通信阻塞的goroutine不需要占用线程。阻塞在I/O和其它系统调中或调用非Go语言写的函数的goroutine需要一个独立的OS线程,但这个线程不计算在GOMAXPROCS

  • 那么问题来了,前面的两个小练习都是基于默认的GOMAXPROCS运行的,如果修改GOMAXPROCS会发生什么?
//GOMAXPROCS = 1
Creating goroutines 1000000
Creating finished 15.8037546s
Time:  405.9402ms
//GOMAXPROCS = 2
Creating goroutines 1000000
Creating finished 6.8087977s
Time:  10.2795211s

//GOMAXPROCS = 1
Counter: 3264829
//GOMAXPROCS = 2
Counter: 1572741

嘤嘤?

个人理解

  • 对于练习1 当只有一个线程的时候,goroutine的调度都是Go自己的调度器来实现的,因此成本比多个线程,靠OS来调度的成本低。导致,多个线程的时候性能反而没有一个线程好。但是创建goroutine的时候多线程就发挥了优势,但是更多线程的时候可能受制于内存的速度,导致创建goroutine的一个瓶颈。

  • 对于练习2 还是受制于OS调度开销的影响。这里开了两个goroutine但是两个不能同时运行,其中有一个处于阻塞状态,这样的话,如果开两个线程,去处理一个goroutine确实有种帮倒忙的感觉。

这两个例子就是给我们理解goroutine协程的轻量吧。

!!!此外goroutine调度的因素很多,运行时也在不断变化,这里的结果真的有时候差距很大。!!!

goroutine 没有标识

这个看的不是很懂。。。。呜呜...

结尾

goroutine线程的差距本质上是属于量变,但一个足够大的量会变成质变。

嘤嘤嘤

goroutine好用就对了。

Go 如何优雅的处理错误

Go Bible

Go 的错误机制

一开始接触 Go 的时候,对多返回值不以为然,在 C 语言中,直接返回一个结构体也可以实现类似多返回值的效果,在 C++就更不用说了。 但是,后面渐渐发现这种多返回值可以额外返回一个实现了error接口的值,可以给我们额外的判断信息。

这样的做法对我而言最大的好处就是,可以更快得上手一个没有见过的函数。 在 Golang 中,一个函数如果没有返回error,往往意味着我们不需要担心函数出错,即不用担心函数会失败。 而在 C 语言中,有些函数会返回NULL代表函数执行失败,比如说malloc,fopen等, 如果需要更具体的信息,我们可以从读取 C 预定义的错误变量来判断时候有错误产生,还可以使用perror来输出详细的错误文本。 所谓,C 更是一个隐式的错误处理,你完全可以直接忽略,但是后果需要自己承担。此外,必须在执行一个函数后立马检查,因为后续的函数可能会覆盖之前的信息。

而在,golang 中,这一切都是显式的,这就要求我们处理那些带有error返回的函数,尽管我处理的做换就是直接返回这个error或者更进一步,给error添加一点额外的信息后在返回。于是我们可以在 golang 随处见到如下代码

if err != nil {
    return err
}

一行代码一个错误

Go 常见的错误处理方法

下面我想说一说平时我是怎么处理error

  • 让它崩溃吧
if err != nil {
    log.Panic(err)
}
  • 直接返回
  • 添加一些附加信息
if err != nil {
    return fmt.Errorf("Error: %v",err)
}

这上面可能是我比较容易想到的,上面的处理很多都是不负责任,仅仅是把错误向上抛,而往往最终的结果也就是在log里面打印一条出错信息。 但是,这样在调试的时候非常不友好,有可能一个函数在很多个地方调用,最后输出的错误信息是一样的,非常不方便调试。 此外,一些err是带有附加结构体信息的,如果使用fmt.Errorf方式的话会丢失这些信息,此外,一些错误可能是莫一个哨兵方式存在,例如err == io.EOF这样的处理就不行了。

Go errors 库的演化

go1.13之前,errors库是非常简洁的,大概如下

package errors

func New(text string) error {
	return &errorString{text}
}

type errorString struct {
	s string
}

func (e *errorString) Error() string {
	return e.s
}

没错,就是怎么的简洁,仅仅是一个实现了error接口的结构体。但是,功能确实很实用。 不过,这样的缺少了许多的信息,我们虽然可以在一个string里面包含更多的错误信息, 但是这样仅仅是对于human而言。为此,更好的做法是,我们自定义自己的错误类型。

type MyError struct {
    caller string //调用函数
    typ string //错误类型
    msg string //错误消息
}
func (m myError) Error() string {
    return fmt.Sprintf("%s: %s from %s",m.typ,m.msg,m.caller)
}
func IsMyError(err error) bool {
    _,ok := err.(*MyError)
    return ok
}
  • 使用类型断言来识别错误
err := SomeFunc()
switch err {
    case nil:
        //nothing wrong
    case *MyError:
        //MyError handler
    default:
        //other error type
}

//或者
func MyFunc(){
    defer func(){
        err := recover()
        switch err := err.(type) {
            case nil:
                return
            case *myError:
                fmt.Printf("%v",err)
            default:
                panic(err)
        }
    }
    panic(&myError{
        typ: "MyErrorType",
        msg: "this is a test message",
        caller: "MyFunc",
    })
}

但是这样带来的另一个坏处就是,我们的结构体必须是要导出的,这样用户才能够使用。 如果有多个不同的结构体,就会导致这种形式的滥用。不过这种形式可以在包内自己使用,而不是暴露出来。

  • 假设错误的行为,而不是类型(Assert errors for behaviour, not type)

在 golang 中,interface就是莫一类行为的抽象,单反满足这类行为的结构体就实现了这个接口。 在 golang 中,常用的行为(接口)有io.Writer,io.Reader

假定错误的行为,这个是 Dave 最推崇的一种方式,名为Opaque Errors。 但是,我不是很理解这一个理念,或者说我根本没有使用过这样的方式。

//我们假设错误实现了某一种行为(接口)
type temporary interface{
    Temporary() bool
}
func IsTemporary(err error) bool {
    te,ok := err.(temporary)
    return ok && te.Temporary()
}

我觉得上面的方法写起来挺麻烦的,我更在乎error包含了那些信息,而不是error所具有那些行为。 虽然,上面的抽象很高,但是目前我还是用不来。


其实上面说了怎么多,我们无非是要在error上面传递更多的上下文,同时避免一大串的类型断言。 其实,已经有写好的包了github.com/pkg/errors 此外,golang 在1.13版本已经拓展了errors包 我们可以使用这些包,更加方便的携带上下文。

使用 pkg/errors

  • 链接

  • wrapper error(adding context to an erro)

jsonBytes, err := json.Marshal(tempMsg)
if err != nil {
	return errors.Wrap(err, "marshal")
}
// Wrap returns an error annotating err with a stack trace
// at the point Wrap is called, and the supplied message.
// If err is nil, Wrap returns nil.
func Wrap(err error, message string) error {
	if err == nil {
		return nil
	}
	err = &withMessage{
		cause: err,
		msg:   message,
	}
	return &withStack{
		err,
		callers(),
	}
}

这里我们看到Wrap的实现携带了stack信息,这样可以方便我们调试。

  • retrieve the cause of an error
type causer interface{
    Cause() error
}
//errors.Cause会递归解包直到没有实现`causer`接口的结构体
switch err := errors.Cause(err).(type) {
    case *MyError:
        //handle you own error type
    default:
        //do default
}

Golang 1.13 后拓展的 errors

errors引入了三个新函数

  • func Unwrap(err error) error
  • func Is(err, target error) bool
  • func As(err error, target interface{}) bool

没有wrap函数? 其实在fmt.Frintf("%w",err)里面 下面是使用的 demo

type myError struct {
	typ    string
	msg    string
	caller string
}
func (m myError) Error() string {
	return fmt.Sprintf("%s: %s from %s", m.typ, m.msg, m.caller)
}
func TestMyError() {
	defer func() {
		err := recover()
		switch err := err.(type) {
		case nil:
			return
		case *myError:
			fmt.Printf("MyError: %v\n\n", err)
		default:
			panic(err)
		}
	}()
	panic(&myError{
		typ:    "MyErrorType",
		msg:    "This is a test message",
		caller: "TestMyError",
	})
}
func main() {
	TestMyError()

	originErr := &myError{
		typ:    "myerror",
		msg:    "this is origin error",
		caller: "main",
	}

    fmt.Printf("%#v\n\n", originErr)
	wrapErr := fmt.Errorf("Wrap the original error: %w", originErr)
	fmt.Printf("%#v\n\n", wrapErr)

	if errors.Is(wrapErr, originErr) {
		fmt.Printf("it is origin error\n")
	} else {
		fmt.Printf("it is not the origin error\n")
	}

	err := errors.Unwrap(wrapErr)
	fmt.Printf("unwrap: %v\n", err)

	targetErr := &myError{}
	if errors.As(wrapErr, &targetErr) {
		fmt.Printf("target: %v", targetErr)
	}
}

go 标准库为我们提供了对error进行Wrap的操作,但还是需要我们自己来传递额外的上下文。

总结

这里的error处理绝对不是 golang 的error的终点

我相信 go2,会对这情况进行改进, 虽然 go2 的草案提供的error处理方案非常的ugly 但是我还是很期待go2的表现

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment