Skip to content

Instantly share code, notes, and snippets.

@mugli
Created May 18, 2021 14:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mugli/aa0ab904c6e60be3b6e4ddb97a5ba1b0 to your computer and use it in GitHub Desktop.
Save mugli/aa0ab904c6e60be3b6e4ddb97a5ba1b0 to your computer and use it in GitHub Desktop.
Concurrency - "Learning Go" book
// main shows how select lets two goroutines share a pair of channels without
// agreeing on an access order: select runs whichever case is able to proceed.
//
// The spawned goroutine sends on first before it receives from second, so the
// only case that can ever become ready here is the receive from first.
func main() {
	first := make(chan int)
	second := make(chan int)

	go func() {
		sent := 1
		first <- sent
		received := <-second
		fmt.Println(sent, received)
	}()

	var (
		outgoing = 2
		incoming int
	)
	select {
	case second <- outgoing:
	case incoming = <-first:
	}
	fmt.Println(outgoing, incoming)
}
// countTo starts a goroutine that sends 0, 1, ..., max-1 on the returned
// channel, and returns a cancel function that stops the goroutine early.
//
// Cancelling with a closure allows us to perform additional clean-up work, if
// needed. The returned channel is closed when the producing goroutine exits,
// whether it finished counting or was cancelled, so ranging over it always
// terminates. cancel closes an internal channel and must therefore be called
// at most once; a second call panics.
func countTo(max int) (<-chan int, func()) {
	ch := make(chan int)
	done := make(chan struct{})
	cancel := func() {
		close(done)
	}
	go func() {
		defer close(ch)
		for i := 0; i < max; i++ {
			// The send has to be a select case rather than a default branch:
			// a send inside default blocks outside the select, so a producer
			// waiting for a reader would never notice that done was closed
			// and would leak.
			select {
			case <-done:
				return
			case ch <- i:
			}
		}
	}()
	return ch, cancel
}
// main prints the values 0 through 5 produced by countTo, stops reading at
// the first value greater than 5, and then invokes the cancel function that
// countTo returned.
func main() {
	values, stop := countTo(10)
	for n := range values {
		if n <= 5 {
			fmt.Println(n)
			continue
		}
		break
	}
	stop()
}
  • If you are coordinating goroutines or tracking a value as it is transformed by a series of goroutines, use channels.

  • If you are sharing access to a field in a struct, use mutexes.

  • If you discover a critical performance issue when using channels, and you cannot find any other way to fix the issue, modify your code to use a mutex.

Another issue is that mutexes in Go aren’t reentrant. If a goroutine tries to acquire the same lock twice, it deadlocks, waiting for itself to release the lock. This is different from languages like Java, where locks are reentrant.

Nonreentrant locks make it tricky to acquire a lock in a function that calls itself recursively. You must release the lock before the recursive function call.

In general, be careful when holding a lock while making a function call, because you don’t know what locks are going to be acquired in those calls. If your function calls another function that tries to acquire the same mutex lock, the goroutine deadlocks.

// If two goroutines both access the same two channels, they must access them
// in the same order in both goroutines, or they will deadlock.
//
// main demonstrates that deadlock on purpose: the spawned goroutine starts
// with a send on first while main starts with a send on second. Both channels
// are unbuffered, so each side waits for a receive the other never reaches
// and neither Println runs.
func main() {
	first := make(chan int)
	second := make(chan int)

	go func() {
		sent := 1
		first <- sent // blocks: main is stuck sending on second
		received := <-second
		fmt.Println(sent, received)
	}()

	sent := 2
	second <- sent // blocks: the goroutine is stuck sending on first
	received := <-first
	fmt.Println(sent, received)
}
// searchData runs every searcher concurrently against s and returns the
// result of whichever one delivers first. It returns nil when searchers is
// empty, because no result could ever arrive.
//
// This is the done channel pattern: it fits when only a single result (or a
// single goroutine) has to be waited for. Waiting on several goroutines needs
// a WaitGroup instead.
func searchData(s string, searchers []func(string) []string) []string {
	// With nothing to run, the receive below would block forever.
	if len(searchers) == 0 {
		return nil
	}

	done := make(chan struct{})
	// Closing done signals to the goroutines that they should exit, preventing
	// them from leaking. A searcher that is still running is not interrupted;
	// its goroutine finishes the call and then takes the done case.
	defer close(done)

	result := make(chan []string)
	for _, searcher := range searchers {
		go func(searcher func(string) []string) {
			select {
			case result <- searcher(s):
			case <-done:
			}
		}(searcher)
	}
	return <-result
}
// GatherAndProcess calls three web services. Data is sent to two of them, and
// the results of those two calls are combined and sent to the third, whose
// result is returned.
//
// The entire process must take less than 50 milliseconds, or an error is
// returned: the incoming context is wrapped with that timeout, and every wait
// below also watches the derived context.
func GatherAndProcess(ctx context.Context, data Input) (COut, error) {
	const budget = 50 * time.Millisecond
	ctx, cancel := context.WithTimeout(ctx, budget)
	defer cancel()

	// Every channel is buffered so the worker goroutines can deposit their
	// outcome and exit even when nobody is left to receive it.
	pipeline := &processor{
		outA: make(chan AOut, 1),
		outB: make(chan BOut, 1),
		inC:  make(chan CIn, 1),
		outC: make(chan COut, 1),
		errs: make(chan error, 2),
	}
	pipeline.launch(ctx, data)

	inputC, err := pipeline.waitForAB(ctx)
	if err != nil {
		return COut{}, err
	}

	pipeline.inC <- inputC
	return pipeline.waitForC(ctx)
}
// processor bundles the channels that connect the goroutines started by
// launch with the waitForAB and waitForC methods.
type processor struct {
// outA receives the value returned by getResultA on success.
outA chan AOut
// outB receives the value returned by getResultB on success.
outB chan BOut
// outC receives the value returned by getResultC on success.
outC chan COut
// inC carries the combined input to the goroutine that calls getResultC.
inC chan CIn
// errs receives the error from any of the three calls that fails.
errs chan error
}
// launch starts three goroutines. The first two call getResultA and
// getResultB straight away and publish the outcome on outA/outB, or on errs
// when the call fails. The third waits for a value on inC, giving up if ctx
// is done first, then calls getResultC and publishes the outcome on outC or
// errs.
func (p *processor) launch(ctx context.Context, data Input) {
	go func() {
		res, err := getResultA(ctx, data.A)
		if err == nil {
			p.outA <- res
			return
		}
		p.errs <- err
	}()

	go func() {
		res, err := getResultB(ctx, data.B)
		if err == nil {
			p.outB <- res
			return
		}
		p.errs <- err
	}()

	go func() {
		var in CIn
		select {
		case <-ctx.Done():
			// Cancelled or timed out before any input for C arrived.
			return
		case in = <-p.inC:
		}

		res, err := getResultC(ctx, in)
		if err == nil {
			p.outC <- res
			return
		}
		p.errs <- err
	}()
}
// waitForAB blocks until a value has arrived on both outA and outB and
// returns them combined in a CIn. It returns a zero CIn and an error as soon
// as an error shows up on errs or ctx is done.
func (p *processor) waitForAB(ctx context.Context) (CIn, error) {
	var merged CIn
	// Two successful receives are needed; the error and cancellation cases
	// return immediately and never reach the decrement.
	for pending := 2; pending > 0; pending-- {
		select {
		case merged.A = <-p.outA:
		case merged.B = <-p.outB:
		case err := <-p.errs:
			return CIn{}, err
		case <-ctx.Done():
			return CIn{}, ctx.Err()
		}
	}
	return merged, nil
}
// waitForC blocks until a value arrives on outC and returns it. If an error
// arrives on errs or ctx is done first, it returns a zero COut together with
// that error (ctx.Err() in the cancellation case).
func (p *processor) waitForC(ctx context.Context) (COut, error) {
	var (
		out COut
		err error
	)
	select {
	case out = <-p.outC:
	case err = <-p.errs:
	case <-ctx.Done():
		err = ctx.Err()
	}
	return out, err
}
// countTo returns a channel on which a background goroutine sends
// 0, 1, ..., max-1 and which is closed after the last value.
//
// The channel is unbuffered and there is no way to cancel the producer, so a
// consumer that stops receiving early leaves the goroutine blocked on its
// next send.
func countTo(max int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := 0; n < max; n++ {
			out <- n
		}
	}()
	return out
}
// main prints 0 through 5 and stops reading at the first value greater
// than 5.
//
// If we exit the loop early like this, the goroutine started by countTo
// blocks forever, waiting for a value to be read from the channel.
func main() {
	for n := range countTo(10) {
		if n <= 5 {
			fmt.Println(n)
			continue
		}
		break
	}
}
// PressureGauge limits how many functions may run through Process at the same
// time. When the limit is reached, further calls are rejected rather than
// queued.
type PressureGauge struct {
	// ch is a buffered channel holding one token per free slot.
	ch chan struct{}
}

// New returns a PressureGauge that lets at most limit calls to Process run
// concurrently. It does so by filling a buffered channel with limit tokens.
func New(limit int) *PressureGauge {
	ch := make(chan struct{}, limit)
	for i := 0; i < limit; i++ {
		ch <- struct{}{}
	}
	return &PressureGauge{
		ch: ch,
	}
}

// Process runs f if a token is available and returns nil once f has
// finished. If no token can be read, f is not run and an error is returned
// instead.
func (pg *PressureGauge) Process(f func()) error {
	select {
	case <-pg.ch:
		// Hand the token back with defer so that a panic inside f does not
		// shrink the gauge's capacity for good.
		defer func() { pg.ch <- struct{}{} }()
		f()
		return nil
	default:
		return errors.New("no more capacity")
	}
}
// Using it in a http server.
//
// doThingThatShouldBeLimited stands in for an expensive operation: it sleeps
// for two seconds and then returns the string "done".
func doThingThatShouldBeLimited() string {
	const result = "done"
	time.Sleep(2 * time.Second)
	return result
}
// main serves /request on port 8080 and uses a PressureGauge with ten slots
// to bound how many requests may run doThingThatShouldBeLimited at once.
// Requests that arrive while every slot is taken get a 429 response.
func main() {
	pg := New(10)
	http.HandleFunc("/request", func(w http.ResponseWriter, r *http.Request) {
		err := pg.Process(func() {
			w.Write([]byte(doThingThatShouldBeLimited()))
		})
		if err != nil {
			w.WriteHeader(http.StatusTooManyRequests)
			w.Write([]byte("Too many requests"))
		}
	})
	// ListenAndServe only returns when the server could not start or has
	// stopped, and the error it returns is always non-nil. Surface it rather
	// than exiting silently.
	if err := http.ListenAndServe(":8080", nil); err != nil {
		panic(err)
	}
}
// timeLimit runs doSomeWork in a separate goroutine and returns its result
// and error if it completes within two seconds. Otherwise it returns 0 and a
// "work timed out" error; the goroutine is left to finish on its own.
func timeLimit() (int, error) {
	type outcome struct {
		value int
		err   error
	}

	// Buffered so the worker can always deliver and exit, even after the
	// timeout case has already returned.
	finished := make(chan outcome, 1)
	go func() {
		value, err := doSomeWork()
		finished <- outcome{value, err}
	}()

	select {
	case o := <-finished:
		return o.value, o.err
	case <-time.After(2 * time.Second):
		return 0, errors.New("work timed out")
	}
}
// Reading from or writing to a nil channel causes your code to hang forever.
// Inside a select this is useful: a case on a nil channel is never chosen, so
// setting a closed channel variable to nil switches its case off.
// in and in2 are channels, done is a done channel.
for {
select {
case v, ok := <-in:
if !ok {
// in was closed; a closed channel would otherwise be ready on every pass.
in = nil // the case will never succeed again!
continue
}
// process the v that was read from in
case v, ok := <-in2:
if !ok {
// in2 was closed; disable its case in the same way.
in2 = nil // the case will never succeed again!
continue
}
// process the v that was read from in2
case <-done:
// done fired: leave the loop and the enclosing function.
return
}
}
// processAndGather starts num goroutines that each read values from in until
// it is closed, apply processor to every value, and send the outcome to a
// shared channel. Once all of them have finished, that channel is closed and
// everything collected from it is returned.
//
// The order of the returned values follows the order in which the goroutines
// delivered them, not the order of in.
func processAndGather(in <-chan int, processor func(int) int, num int) []int {
	var (
		workers sync.WaitGroup
		results = make(chan int, num)
	)

	worker := func() {
		defer workers.Done()
		for item := range in {
			results <- processor(item)
		}
	}

	workers.Add(num)
	for started := 0; started < num; started++ {
		go worker()
	}

	// Close results only after every worker has exited, so the collecting
	// loop below knows when to stop.
	go func() {
		workers.Wait()
		close(results)
	}()

	var gathered []int
	for r := range results {
		gathered = append(gathered, r)
	}
	return gathered
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment