Skip to content

Instantly share code, notes, and snippets.

@jdh747
Last active September 15, 2023 02:24
Show Gist options
  • Star 9 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save jdh747/e8015c77186d790cf6d6ad63a5e6bb99 to your computer and use it in GitHub Desktop.
Save jdh747/e8015c77186d790cf6d6ad63a5e6bb99 to your computer and use it in GitHub Desktop.
Go Concurrency Patterns: Google IO 2012

Concurrency Patterns in Go

Notes taken from Rob Pike's presentation 'Google I/O 2012 - Go Concurrency Patterns'.

1. Generator: function that returns a channel

The code is as follows:

c := boring("boring!")  // function returning a channel

for i := 0; i < 5; i++ {
    fmt.Printf("You say: %q\n", <-c)
}

fmt.Println("You're boring; I'm leaving.")

func boring(msg string) <-chan string {  // returns receive-only channel of strings
    c := make(chan string)

    go func() {  // we launch the goroutine from inside the function
        for i := 0; ; i++ {
            c <- fmt.Sprintf("%s, %d", msg, i)
            time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
        }
    }()

    return c  // return the channel to the caller
}

The generator function act as a service, returning a channel that lets us communicate with the underlying service provided. We can even create multiple instances/routines, each operating concurrently and returning values independently like so.

func main() {
    joe := boring("Joe")
    ann := boring("Ann")

    for i := 0; i < 5; i++ {
        fmt.Println(<-joe)
        fmt.Println(<-ann)
    }

    fmt.Println("You're both boring; I'm leaving.")
}

The above approach blocks both Joe and Ann until the other has returned and released the routine. The answer to that: multiplexing with a fan-in function.

2. Multiplexing: fan-in function to stitch two channels together

We've already established our func boring(string) <-chan string, now we multiplex two channels together with a fan-in.

func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)

    go func() { for { c <- <-input1 } }()  // todo: why do we double the channel operator?
    go func() { for { c <- <-input2 } }()

    return c
}

func main() {
    c := fanIn(boring("Joe"), boring("Ann"))

    for i := 0; i < 10; i++ {
        fmt.Println(<-c)
    }

    fmt.Println("You're both boring; I'm leaving")
}

Now, the boring services for Ann and Joe aren't tied together and we retrieve data as it's available -- the services can independently execute.

3. Restore Sequencing: construct message types with blocking fields

Sometimes we need to re-sync concurrent, uncoupled goroutines. To achieve this, we can pass a wait channel on another channel, shown below with the type Message struct that contains a message string and a boolean channel for blocking. So the goroutine (boring) creates a channel within the channel that enables us to tell it to wait its turn. Receive all messages, then enable them again.

func boring(msg string) <-chan Message {
    c := make(chan Message)
    waitForIt := make(chan bool)

    go func() {
        for i := 0; ; i++ {
            c <- Message{ fmt.Sprintf("%s, %d", msg, i), waitForIt}
            time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
            <-waitForIt  // boring goroutines now block here until we set the Message#wait boolean field
        }
    }()

    return c  // return the channel to the caller
}

func fanIn(input1, input2 <-chan Message) <-chan Message {
    c := make(chan Message)

    go func() { for { c <- <-input1 } }()
    go func() { for { c <- <-input2 } }()

    return c
}

type Message struct {
    str string
    wait chan bool
}

func main() {
    c := fanIn(boring("Joe"), boring("Ann"))

    for i := 0; i < 5; i++ {
        msg1 := <- c; fmt.Println(msg1.str)
        msg2 := <- c; fmt.Println(msg2.str)

        msg1.wait <- true  // Setting this unblocks `boring` to now iterate through the `for` loop again
        msg2.wait <- true
    }

    fmt.Println("You're both boring; I'm leaving")
}

4. The Select Statement: switch-like control structure for concurrency that enables us to proceed based on what communication is available at any given moment

  • Like a switch, but each case is a communication -- a channel operation (send/receive).
  • Select blocks until a communication can proceed, then does. Note: pseudo-random when multiple can proceed.
  • The default proceeds if non other can.
select {
    case v1 := <- c1:
        fmt.Printf("Received %v from c1\n", v1)
    case v2 := <- c2:
        fmt.Printf("Received %v from c2\n", v1)
    case c3 <- 23:
        fmt.Printf("Sent %v from c3\n", 23)
    default:
        fmt.Printf("No one ready")
}

5. Fan-in Select: reduce the number of goroutines from the prior example

Refactoring our fan-in from #2, we can take:

func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)

    go func() { for { c <- <-input1 } }()  // todo: why do we double the channel operator?
    go func() { for { c <- <-input2 } }()

    return c
}

And make:

func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)

    go func() { 
        for {
            select {
                case s:= <- input1: c <- s
                case s:= <- input2: c <- s                
            }
        }        
    }()

    return c
}

Now, rather than create two separate goroutined, we create a single goroutine and use the select to multiplex the received channels into the single returned channel.

6. Timeout Select

We can use the time.After(...) function, which returns a channel, to terminate iterations of the for-select loop.

func main() {
    c := boring("Joe")

    for {
        select {
            case s := <- c:
                fmt.Println(s)

            case <- time.After(1 * time.Second):  // We kill the loop if we don't receive a message from the boring("Joe") within 1sec per iteration
                fmt.Println("You're too slow")
                return                
        }
    }
}

We can perform a similar operation but with a total of 5sec time elapsed over the entire loop, rather than the 1sec per message in the prior example.

func main() {
    c := boring("Joe")
    timeout := time.After(5 * time.Second):  // We kill the loop after 5sec total

    for {
        select {
            case s := <- c:
                fmt.Println(s)

            case <- timeout
                fmt.Println("You talk too much")
                return                
        }
    }
}

7. Quit Channel: passed to the goroutine

We can pass a quit channel to out routine which tells the routine to finish.

func boring(msg string, quit chan bool) <-chan String {
    c := make(chan string)

    go func() {  // we launch the goroutine from inside the function
        for i := 0; ; i++ {
            select {
                case c <- fmt.Sprintf("%s, %d", msg, i):
                    // Do nothing
                case <- quite:
                    return  // Parent routine tells us to finish, so we return from the goroutine
            }
        }
    }()

    return c  // return the channel to the caller
}

func main() {
    quit := make(chan bool)
    c := boring("Joe", quit)
    for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
    quite <- true  // Tell the routine to finish
}

8. Quite Channel: two-way 'wrap it up' communication

The example above ends the goroutine rather abruptly the second we pass a value to the quit channel. What it we wanted to allow the routine to wrap things up? Well, the channel offers two-way communication, so we can wait for confirmation from the routine like so.

func boring(msg string, quit chan string) <-chan String {
    c := make(chan string)

    go func() {  // we launch the goroutine from inside the function
        for i := 0; ; i++ {
            select {
                case c <- fmt.Sprintf("%s, %d", msg, i):
                    // Do nothing
                case <- quite:
                    cleanup()
                    quit <- "See you!"
                    return
            }
        }
    }()

    return c  // return the channel to the caller
}

func main() {
    quit := make(chan string)
    c := boring("Joe", quit)
    for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
    quite <- true  // Tell the routine to finish
    fmt.Printf("Joe says: %q\n", <-quit) // This blocks main until the goroutine confirms it's done
}

9. Google Search: A fake framework

Lets simulate a search function.

The below runs sequentially. There's no element of concurrency present in this iteration. We construct 3 search functions that conform to the type Search form.

var (
    Web = fakeSearch("web")
    Image = fakeSearch("image")
    Video = fakeSearch("video")
)

type Search func(query string) Result

func fakeSearch(kind string) Search {
    return func(query string) Result {
        time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
        return Result(fmt.Sprintf("%s result for %q\n", kind, query))
    }
}

func Google(query string) (results []Result) {
    results = append(results, Web(query))  // Currently, we block in each of these search queries
    results = append(results, Image(query))
    results = append(results, Video(query))

    return
}

func main() {
    rand.Seed(time.Now().UnixNano())
    start := time.Now()
    results := Google("golang")
    elapsed := time.Since(start)
    fmt.Println(results)
    fmt.Println(elapsed)
}

Alternatively, we could refactor the Google function to run in a goroutine and run web, image and video searches concurrently, then wait on all results.

func Google(query string) (results []Result) {
    c := make(chan Result)

    // Spin each of the searches off into their own goroutine and pipe all results back to the channel created above
    go func() { c <- Web(query)) }()
    go func() { c <- Image(query)) }()
    go func() { c <- Video(query)) }()

    // Pull each result out of the channel as it becomes available and append it to the results splice returned
    for i := 0; i < 3; i++ {
        result := <- c
        results = append(results, result)
    }

    return
}

Here we only wait for the slowest search to complete. But what if we want to timeout communication if a server isn't returning quick enough?... Apply timeout and select and refactor the Google(...) function again.

func Google(query string) (results []Result) {
    c := make(chan Result)

    // Spin each of the searches off into their own goroutine and pipe all results back to the channel created above
    go func() { c <- Web(query)) }()
    go func() { c <- Image(query)) }()
    go func() { c <- Video(query)) }()

    // Now we pull each Result from the channel, but timeout if searches take longer than 80ms
    timeout := time.After(80 * time.Millisecond)
    for i := 0; i < 3; i++ {
        select {
            case result <- c:
                results = append(results, result)
            case <- timeout:
                fmt.Println("Timed out")
                return
        }
    }

    return
}

This could be annoying -- what if we keep killing slow servers before results are returned? We can avoid timeout by replicating servers and returning the first response.

// Returns the first response received from one of the many replica Search functions received
func First(query string, replicas ...Search) Result {
    c := make(chan Result)
    searchReplica := func(i int) { c <- replicas[i](query) }  // A function that abstract calling a given Search function with the given query, then piping the result into channel c

    // Iterate over all the given replica Search functions and spin off into their own goroutines
    for i := range replicas {
        go searchReplica(i)
    }

    // Return the first result received only
    return <- c
}

Lets add all this together to create a new, final Google function that combines multiplexing multiple channels, timeouts, and replicated backend searches

func Google(query string) (results []Result) {
    c := make(chan Result)

    go func() { c <- First(query, Web1, Web2) }()
    go func() { c <- First(query, Image1, Image2) }()
    go func() { c <- First(query, Video1, Video2) }()

    timeout := time.After(80 * time.Millisecond)
    for i := 0; i < 3; i++ {
        select {
            case result <- c:
                results = append(results, result)
            case <- timeout:
                fmt.Println("Timed out")
                return
        }
    }

    return
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment