@brandur
Last active April 15, 2024 15:48
Go Pipelines

This document demonstrates a basic pipeline in Go and discusses a risk in implementing one. Keep in mind that:

  • Using FizzBuzz here is contrived, but it's quite a nice way to demonstrate the concept. Obviously there is no advantage to running the basic workload of FizzBuzz in a Goroutine, but if the task involved heavy computation or long-lived I/O, it would start to make a lot more sense.
  • The problem discussed only applies to Goroutines that actually need a way to stop. If they run for the entire duration of a program, then this doesn't apply.

The patterns are re-used from this official Go blog post on pipelines, with some additional commentary.

FizzBuzz

Here's a very basic demonstration of FizzBuzz using a pipeline pattern in Go (which amounts to a Goroutine and a channel):

package main

import "fmt"

type Result struct {
	num     int
	display string
}

func fizzbuzz(out chan Result) {
	for num := 1; ; num++ {
		switch {
		case num%3 == 0 && num%5 == 0:
			out <- Result{num, "FizzBuzz"}
		case num%3 == 0:
			out <- Result{num, "Fizz"}
		case num%5 == 0:
			out <- Result{num, "Buzz"}
		default:
			out <- Result{num, fmt.Sprintf("%d", num)}
		}
	}
}

func main() {
	out := make(chan Result)

	go fizzbuzz(out)

	for res := range out {
		if res.num >= 100 {
			break
		}

		fmt.Println(res.display)
	}
}

We create a channel to receive results on (out), then fire up a Goroutine (fizzbuzz), and start iterating through the results. When we've had enough (i.e. hit our upper bound of 100), we break the loop and exit.

This works great:

...
88
89
FizzBuzz
91
92
Fizz
94
Buzz
Fizz
97
98
Fizz

Make the Generator Stoppable

The above works fine if we're just trying to demonstrate it with a simple run of our program, but we leak our Goroutine because it's never shut down. If we want to make our FizzBuzz pipeline re-usable within the lifetime of the program, we need to introduce some way to stop it so that its resources can be reclaimed.

Go's philosophy when it comes to Goroutines is that they should end their own execution, rather than being ended by a different context, so we can introduce the idea of a stop channel (done) to end the pipeline's execution:

package main

import (
	"fmt"
	"time"
)

type Result struct {
	num     int
	display string
}

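func fizzbuzz(out chan Result, done chan struct{}) {
outerLoop:
	for num := 1; ; num++ {
		// Non-blocking check on done. A bare break would only leave the
		// select, so break out of the labeled loop instead.
		select {
		case <-done:
			break outerLoop
		default:
		}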

		switch {
		case num%3 == 0 && num%5 == 0:
			out <- Result{num, "FizzBuzz"}
		case num%3 == 0:
			out <- Result{num, "Fizz"}
		case num%5 == 0:
			out <- Result{num, "Buzz"}
		default:
			out <- Result{num, fmt.Sprintf("%d", num)}
		}
	}

	fmt.Println("Left FizzBuzz.")
}

func main() {
	out := make(chan Result)
	done := make(chan struct{})

	go fizzbuzz(out, done)

	for res := range out {
		if res.num >= 100 {
			break
		}

		fmt.Println(res.display)
	}

	close(done)

	// Sleep to give fizzbuzz a chance to print and return.
	time.Sleep(1 * time.Second)
}

We use Go's close as a signal that the fizzbuzz Goroutine should end execution, taking advantage of the fact that a receive on a closed channel returns immediately with the zero value of the channel's element type. close is a superior approach compared to injecting a "done" value into the channel because it signals all waiting Goroutines no matter how many there happen to be.
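To illustrate the broadcast behaviour of close, here is a minimal sketch, separate from the FizzBuzz program. The stopAll helper and its counter are hypothetical names introduced only for this example: several Goroutines block on the same done channel, and a single close releases all of them.

```go
package main

import (
	"fmt"
	"sync"
)

// stopAll is a hypothetical helper for this sketch. It starts n Goroutines
// that all block receiving from one done channel, closes the channel once,
// and returns how many Goroutines observed the close.
func stopAll(n int) int {
	done := make(chan struct{})

	var wg sync.WaitGroup
	var mu sync.Mutex
	stopped := 0

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			// A receive on a closed channel returns immediately, so every
			// waiter is released by the single close below.
			<-done

			mu.Lock()
			stopped++
			mu.Unlock()
		}()
	}

	close(done)
	wg.Wait()

	return stopped
}

func main() {
	fmt.Println(stopAll(5)) // prints 5
}
```

Sending a single value on done instead would wake only one of the waiters, which is why close is the usual choice for a stop signal.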

Experienced Go users will notice the problem in the stoppable fizzbuzz program above immediately: we've introduced a race condition with the possibility of a leaked Goroutine. Although there's a chance that the program will end as expected, more usually, the pipeline Goroutine will continue after it's injected a result into out, loop around, go right past its check on done (which hasn't been closed yet), and block on sending the next result to out. Every time we use fizzbuzz we'll leak a new Goroutine, until eventually our program runs out of memory and crashes.
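The leak can be observed with a small experiment. The sketch below is not the FizzBuzz program itself: leakyCounter, leaks, and the exited channel are hypothetical names, and the short sleep is an assumption added only to force the unlucky interleaving on every run. The generator uses the same check-then-send shape and should still be stuck on its send after done is closed.

```go
package main

import (
	"fmt"
	"time"
)

// leakyCounter mirrors the flawed generator: it checks done without blocking,
// then performs a send that can block forever. exited is closed only if the
// function returns.
func leakyCounter(out chan int, done chan struct{}, exited chan struct{}) {
	defer close(exited)

	for num := 1; ; num++ {
		select {
		case <-done:
			return
		default:
		}

		// Nothing unblocks this send once the consumer has stopped reading.
		out <- num
	}
}

// leaks reports whether the generator is still running after done is closed.
func leaks() bool {
	out := make(chan int)
	done := make(chan struct{})
	exited := make(chan struct{})

	go leakyCounter(out, done, exited)

	for i := 0; i < 3; i++ {
		<-out
	}

	// Assumption for the demo: pause so the generator reaches its next send
	// before done is closed.
	time.Sleep(50 * time.Millisecond)
	close(done)

	select {
	case <-exited:
		return false
	case <-time.After(100 * time.Millisecond):
		return true
	}
}

func main() {
	fmt.Println("leaked:", leaks())
}
```

Without the sleep the outcome depends on scheduling, which is exactly what makes this kind of bug hard to spot in testing.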

Check All Blocking Operations

The solution to the problem above is the use of select every time we perform a potentially blocking operation (for example, injecting a value into a channel). Here's a working version of the same program that doesn't leak a Goroutine:

package main

import (
	"fmt"
	"time"
)

type Result struct {
	num     int
	display string
}

func fizzbuzz(out chan Result, done chan struct{}) {
outerLoop:
	for num := 1; ; num++ {
		switch {
		case num%3 == 0 && num%5 == 0:
			select {
			case <-done:
				break outerLoop
			case out <- Result{num, "FizzBuzz"}:
			}
		case num%3 == 0:
			select {
			case <-done:
				break outerLoop
			case out <- Result{num, "Fizz"}:
			}
		case num%5 == 0:
			select {
			case <-done:
				break outerLoop
			case out <- Result{num, "Buzz"}:
			}
		default:
			select {
			case <-done:
				break outerLoop
			case out <- Result{num, fmt.Sprintf("%d", num)}:
			}
		}
	}

	fmt.Println("Left FizzBuzz.")
}

func main() {
	out := make(chan Result)
	done := make(chan struct{})

	go fizzbuzz(out, done)

	for res := range out {
		if res.num >= 100 {
			break
		}

		fmt.Println(res.display)
	}

	close(done)

	// Sleep to give fizzbuzz a chance to print and return.
	time.Sleep(1 * time.Second)
}

The key takeaway here is that we should check done on every potentially blocking operation unless we know for sure that a leak on the operation is impossible based on how the caller runs. While this does work, it also introduces some risk of error in that it becomes fairly easy to introduce a Goroutine leak without perfect understanding of the code (say when it's modified by a developer who isn't the original author). Good commenting and general experience with the pipeline pattern can help mitigate this risk, but fundamentally the language doesn't offer any facilities to ensure its safety.

In the FizzBuzz example, we can easily simplify the code significantly, but keep in mind that given a program that's less trivial in that it involves a more complex control flow and different types of messages being passed into the channel, this won't be quite as easy.
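One way to make the rule harder to forget is to route every send through a small wrapper. This is only a sketch under assumptions: send is a hypothetical helper, not something from the Go standard library or the original post, and the buffered channel in main exists purely so the demo needs no second Goroutine.

```go
package main

import "fmt"

type Result struct {
	num     int
	display string
}

// send is a hypothetical helper: it delivers res on out unless done is closed
// first, and reports whether the value was delivered.
func send(out chan<- Result, done <-chan struct{}, res Result) bool {
	select {
	case <-done:
		return false
	case out <- res:
		return true
	}
}

func main() {
	// Buffered only so this demo can run without a separate consumer.
	out := make(chan Result, 1)
	done := make(chan struct{})

	// The buffer has room and done is open, so the send succeeds.
	fmt.Println(send(out, done, Result{1, "1"})) // prints true

	// The buffer is now full and done is closed, so the send is abandoned.
	close(done)
	fmt.Println(send(out, done, Result{2, "2"})) // prints false
}
```

A generator would then return as soon as send reports false. The helper doesn't remove the risk, since nothing stops a later change from writing a bare out <- res, but it does give reviewers a single pattern to look for.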

(Here's a better-written version of the above:)

package main

import (
	"fmt"
	"time"
)

type Result struct {
	num     int
	display string
}

func fizzbuzz(out chan Result, done chan struct{}) {
outerLoop:
	for num := 1; ; num++ {
		var res Result

		switch {
		case num%3 == 0 && num%5 == 0:
			res = Result{num, "FizzBuzz"}
		case num%3 == 0:
			res = Result{num, "Fizz"}
		case num%5 == 0:
			res = Result{num, "Buzz"}
		default:
			res = Result{num, fmt.Sprintf("%d", num)}
		}

		select {
		case <-done:
			break outerLoop
		case out <- res:
		}
	}

	fmt.Println("Left FizzBuzz.")
}

func main() {
	out := make(chan Result)
	done := make(chan struct{})

	go fizzbuzz(out, done)

	for res := range out {
		if res.num >= 100 {
			break
		}

		fmt.Println(res.display)
	}

	close(done)

	// Sleep to give fizzbuzz a chance to print and return.
	time.Sleep(1 * time.Second)
}
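
The time.Sleep in main only gives fizzbuzz a chance to finish. As a possible refinement, the caller can wait for the Goroutine explicitly. The sketch below assumes a sync.WaitGroup for that purpose; the run function and its limit parameter are hypothetical names introduced for the example, and the generator is otherwise the same as the version above.

```go
package main

import (
	"fmt"
	"sync"
)

type Result struct {
	num     int
	display string
}

func fizzbuzz(out chan Result, done chan struct{}) {
	for num := 1; ; num++ {
		var res Result

		switch {
		case num%3 == 0 && num%5 == 0:
			res = Result{num, "FizzBuzz"}
		case num%3 == 0:
			res = Result{num, "Fizz"}
		case num%5 == 0:
			res = Result{num, "Buzz"}
		default:
			res = Result{num, fmt.Sprintf("%d", num)}
		}

		select {
		case <-done:
			return
		case out <- res:
		}
	}
}

// run is a hypothetical wrapper: it collects results below limit, stops the
// generator, and returns only after the generator Goroutine has exited.
func run(limit int) []string {
	out := make(chan Result)
	done := make(chan struct{})

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		fizzbuzz(out, done)
	}()

	var lines []string
	for res := range out {
		if res.num >= limit {
			break
		}
		lines = append(lines, res.display)
	}

	close(done)
	wg.Wait() // returns only once fizzbuzz has returned

	return lines
}

func main() {
	lines := run(16)
	fmt.Println(len(lines), lines[len(lines)-1]) // prints 15 FizzBuzz
}
```

Because wg.Wait would hang forever if the generator leaked, this variant also acts as a cheap check that every blocking operation really does observe done.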