Go Concurrency Outline

Concurrency

This talk will go over concurrency with Go, and hopefully scratch the surface of how to write thread-safe code as well as efficient worker delegation. We will be using the built-in sync package.

Goroutines

If you've worked with concurrency in other languages, you might be familiar with threading. In Go, we do not manage threads like we would in C or Java. Instead, we run our concurrent code in what's known as a Goroutine. A Goroutine is a coroutine: a lightweight process that runs based on a scheduler. Go itself handles each routine and ensures that it runs in a timely and scheduled manner. Go can leverage multiple CPU cores and OS threads to run these coroutines; however, as a developer writing Go you will rarely need to care about that layer, so long as you ensure that your code is thread safe.

To launch a function in Go inside of a Goroutine, you may use the go keyword:

func process() { ... }

go process() // Launches in a goroutine

A Goroutine will run until the function provided to it is done working. Something to note is that if a panic occurs in a Goroutine and is not recovered, it will unwind the stack and terminate the whole application, so if you think your code might panic, add a defer func() { recover() }() inside the goroutine.
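As a minimal sketch, using the process function from above as the work that might panic:

go func() {
	defer func() {
		if r := recover(); r != nil {
			// Handle or log the recovered value so the failure isn't silent
			fmt.Println("recovered:", r)
		}
	}()
	process() // If process panics, the deferred recover stops the app from crashing
}()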

A fun fact about Goroutines is that they start out costing only about 2-4 KB of memory each, so you can spin up hundreds of thousands of them before you'll start noticing performance issues. This is one of the reasons Go is so powerful for server-side backends; it can scale up really efficiently for HTTP muxes.

WaitGroups

(Written out of order, so excuse other comments about Wait Groups being explained later.)

In Go, we have the sync package, which contains a lot of helpers for writing safe, concurrent code. One of these invaluable helpers is sync.WaitGroup. A Wait Group is just what it sounds like: a group to wait on. In the most basic of explanations, we may think of a Wait Group as a thread-safe counter: we count up as we add more async processes, count down as those processes finish, and have somewhere we can block until all processes are done.

Here is how a WaitGroup might be used:

wg := &sync.WaitGroup{}

// Spawn 10 workers
for i := 0; i < 10; i++ {
    wg.Add(1) // Mark that we've added a worker
    go func() {
        defer wg.Done() // Tell the Wait Group we're done
        // Do some kind of work
    }()
}

wg.Wait() // Block until the counter in the Wait Group is 0

While in this example I have written it so we increment on each loop iteration, we could just as easily call wg.Add(10) once. One reason you might find the Add inside the for loop instead of outside is that if we later change how many times the loop runs, we don't have to worry about forgetting to edit the count we give wg.Add(x).
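For comparison, the single up-front Add looks like this:

wg := &sync.WaitGroup{}

wg.Add(10) // Count all 10 workers at once
for i := 0; i < 10; i++ {
	go func() {
		defer wg.Done()
		// Do some kind of work
	}()
}

wg.Wait()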

Locks and Mutexes

When we have two concurrent processes accessing a resource at the same time, it can lead to problems if one sets a value while the other reads it. There are different strategies to overcome these problems, and the one we'll talk about now is Mutexes. A Mutex (mutual exclusion) is a program object that prevents multiple processes from accessing the same shared resource at the same time by providing a lock. When a process wants to access a resource, it requests the lock to ensure that it is the only one accessing the resource at that time. When the process is done accessing the resource, it unlocks the mutex, allowing the next process waiting on the lock to acquire it.

In Go, we have the sync package (https://pkg.go.dev/sync) to help us here. This package provides Mutex as well as RWMutex. The regular Mutex struct is used for general locking, whereas RWMutex can be used for efficient read/write locking, where read locks do not block each other. We may get to RWMutexes later; for now we will just cover regular mutexes.

[Sample](https://go.dev/play/p/tIu9FMSApSO)

In this next sample, we will showcase what happens when we have multiple goroutines accessing a variable, altering and using it. In this example, each routine increments the variable i and then prints it out. If you run this, you'll see that we don't get a consistent count up.

In this example we'll use a Wait Group. To briefly touch on it: a wait group simply gives us a (thread-safe) counter and a mechanism for waiting until that counter reaches zero. We can use this to ensure that we wait for all of our Goroutines to finish running.

package main

import (
	"fmt"
	"sync"
)

func main() {
	var i int

	wg := &sync.WaitGroup{}

	for ii := 0; ii < 5; ii++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 5; j++ {
				i++
				fmt.Println(i)
			}
		}()
	}

	wg.Wait()
	fmt.Println(i)
}

If you run this sample, it is very likely you'll get mixed results in the output. Instead of getting a clean count from 1 to 25, you'll get some skipping. Here is an example of what I got:

1
5
7
6
9
...

Part of the reason we see this is because of what can happen between these two lines of code:

i++
fmt.Println(i)

Between i++ and fmt.Println(i), another goroutine can increment i. So here is what happens:

Routine 1: Add 1 to i, changing it to 1
Routine 2: Add 1 to i, changing it to 2
Routine 1: Print i, outputting 2

To fix this, we can add a mutex. It makes sense to keep the mutex close to the variable we plan on locking, as well as in the correct scope. Here is where we can add it:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var lock sync.Mutex
	var i int

	wg := &sync.WaitGroup{}

	for ii := 0; ii < 5; ii++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 5; j++ {
				lock.Lock()
				i++
				fmt.Println(i)
				lock.Unlock()
			}
		}()
	}

	wg.Wait()
	fmt.Println(i)
}

As you can see, we added var lock sync.Mutex, and then use it with lock.Lock() and lock.Unlock(). Now our output will look like the following:

1
2
3
4
...

However, while this code works well for us, there is something to note about Mutexes: the way we're locking and unlocking is bad practice, and can result in what's known as deadlocking, where all processes and routines stay blocked forever. The correct way to write locks and unlocks is to (almost) always follow a call to mutex.Lock() with a deferred unlock:

lock.Lock()
defer lock.Unlock()

This way, if our code panics or returns unexpectedly between the lock and the unlock, we ensure we don't deadlock other processes. We must, however, look out for a few things, such as only unlocking once per lock, and ensuring that we call this in a function that will actually execute the defer at the right time. If, in our example, we change our lock and unlock to the defer form without altering anything else, we'll get this:

for j := 0; j < 5; j++ {
	lock.Lock()
	defer lock.Unlock()
	i++
	fmt.Println(i)
}

This code will deadlock, because defer only executes when the surrounding function returns, not at the end of each loop iteration, so the next iteration of our for loop calls Lock before our Unlock has run. We must either wrap our code in an anonymous function, or pull it out into its own function. In our example, we'll write an anonymous function.

package main

import (
	"fmt"
	"sync"
)

func main() {
	var lock sync.Mutex
	var i int

	wg := &sync.WaitGroup{}

	for ii := 0; ii < 5; ii++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 5; j++ {
				func() {
					lock.Lock()
					defer lock.Unlock()
					i++
					fmt.Println(i)
				}()
			}
		}()
	}

	wg.Wait()
	fmt.Println(i)
}

Now we'll see that we still get our expected results, and our Unlock is safe from panics or other early returns.

You might notice that our code has now gotten to a point where it is several layers deep and less readable. It is recommended that we break out parts of our code into separate functions to make it more readable and easier to test.
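For instance, here is a sketch of the named-function approach mentioned earlier (incrementAndPrint is a name chosen just for this example):

func incrementAndPrint(lock *sync.Mutex, i *int) {
	lock.Lock()
	defer lock.Unlock() // Runs when this function returns, so every call unlocks
	*i++
	fmt.Println(*i)
}

Inside the goroutine, the loop body then becomes a single call:

for j := 0; j < 5; j++ {
	incrementAndPrint(&lock, &i)
}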

Built In Channels

Go has a safe queue system built into the language, known as channels. Channels are FIFO, optionally buffered, thread-safe queues. To initialize a channel, we use the make function:

c := make(chan int)

This will make an unbuffered channel that carries the int type. Unbuffered means that a writer will block until a reader is ready to receive.

c := make(chan int)
c <- 5

This example deadlocks, since nothing is reading from c.

To make a buffered channel, we pass the buffer size as a second argument to make.

c := make(chan int, 1)
c <- 5

This example won't block, because there is buffer space for c <- 5 to go into.
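Reading uses the same arrow syntax, pointing out of the channel. As a small sketch of draining a buffered channel:

c := make(chan int, 2)
c <- 5
c <- 10
close(c) // No more writes; readers can still drain the buffer

fmt.Println(<-c) // 5, since reads come out in FIFO order

// range keeps receiving until the channel is drained and closed
for v := range c {
	fmt.Println(v) // 10
}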

Channels instead of Mutexes

In some concurrent cases, you might find channels used instead of Mutexes. The idea is that you give your workers a channel to report on, and then have a single process that reads from that channel and does the reporting. Below is an example:

package main

import (
	"fmt"
	"sync"
)

type Work struct {
	WorkerID int
	Work     int
}

func main() {
	workReturn := make(chan Work, 5)
	defer close(workReturn) // Ensure that our resource is closed

	go func() {
		// Will run until workReturn is closed.
		for work := range workReturn {
			fmt.Printf("Worker(%v): %v\n", work.WorkerID, work.Work)
		}
	}()

	// Waitgroup for workers
	wg := &sync.WaitGroup{}
	for i := 0; i < 5; i++ {
		wg.Add(1)

		// Workers as anonymous functions
		go func(id int) {
			defer wg.Done()
			for i := 0; i < 5; i++ {
				workReturn <- Work{WorkerID: id, Work: i}
			}
		}(i)
	}

	wg.Wait()
}

This example is a bit complicated, but it should help showcase workers doing "work" and reporting back via our channel. There are a few things missing from this example, however. When the main function returns, all routines are discarded, so any work still sitting in workReturn is lost. One of the things we'll need to add (later) is a mechanism to ensure that all work has been read from workReturn before we return from main. We can do this with contexts or channels.

A trick with channels: if you are blocked trying to read from a channel and that channel is closed, the read unblocks. We can use this as a trigger to tell when something finishes; however, we definitely need to be aware that it will require us to forego defers in some cases, and will result in (significantly) more complex code.
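Distilled down to just that signal, the pattern looks like this; the full worker example below builds on it:

done := make(chan struct{})

go func() {
	// ... do some work ...
	close(done) // Closing the channel releases anyone blocked receiving from it
}()

<-done // Blocks here until close(done) runs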

package main

import (
	"fmt"
	"sync"
)

type Work struct {
	WorkerID int
	Work     int
}

func main() {
	workReturn := make(chan Work, 5)
	workProcessCloser := make(chan struct{})

	var workSeen int

	go func() {
		defer close(workProcessCloser)
		// Will run until workReturn is closed.
		for work := range workReturn {
			workSeen++
			fmt.Printf("Worker(%v): %v\n", work.WorkerID, work.Work)
		}
	}()

	// Waitgroup for workers
	wg := &sync.WaitGroup{}
	for i := 0; i < 5; i++ {
		wg.Add(1)

		// Workers as anonymous functions
		go func(id int) {
			defer wg.Done()
			for i := 0; i < 5; i++ {
				workReturn <- Work{WorkerID: id, Work: i}
			}
		}(i)
	}
	wg.Wait()           // Wait for all work
	close(workReturn)   // Close work return so the forloop above ends after all data is out
	<-workProcessCloser // Wait for work processor to be done

	fmt.Println("Work Seen:", workSeen)
}

To try to explain what happens:

  • We spawn our work processor with a defer statement on close(workProcessCloser) which will execute when our processor is done
  • We spawn our workers
  • We block on Wait()
  • Workers process all work, and write to the workReturn queue
  • Work Processor processes work, increasing the workSeen variable
  • All workers are done processing the work, Wait() is no longer blocking
  • We close workReturn
  • We block on workProcessCloser
  • The work processor runs out of work, for loop finishes
  • The work processor closes workProcessCloser
  • workProcessCloser stops blocking
  • We print out workSeen, since we know it's safe now, and our main function returns

It might take a while for some people to get their heads wrapped around all of this, and while Go does a lot to make concurrency easier, it may still be challenging to implement safely and correctly. An important factor in any concurrent code is rigorous testing and peer review.

RWMutexes

Since I've already gone over how Mutexes work, this will be rather brief. An RWMutex is just a mutex that shares locks between readers. It has two sets of lock and unlock methods: RLock/RUnlock and Lock/Unlock. When you do a read, you may use RLock and RUnlock so that other readers aren't blocked, while writers calling Lock still are. This can be useful if you expect a significantly higher amount of reading over writing. The same goes the other way: while a Lock is held, all calls to RLock will block until Unlock is called.
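As a minimal sketch of the read/write split (the cache map and function names here are just for illustration):

var mu sync.RWMutex
var cache = map[string]string{}

// Many readers may hold RLock at the same time.
func get(key string) string {
	mu.RLock()
	defer mu.RUnlock()
	return cache[key]
}

// A writer takes the exclusive lock, blocking both readers and writers.
func set(key, value string) {
	mu.Lock()
	defer mu.Unlock()
	cache[key] = value
}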

Notes on locking

It is important to know that while a resource is locked, other processes are blocked. If you have a long-running task, you need to ensure that you aren't holding the lock for longer than you need to, or you may end up with significant performance degradation.

Consider the following:

var resourceLock sync.Mutex
var resources = map[string]bool{} // Initialized so the write below doesn't panic on a nil map

func ProcessResource(uuid string) error {
    resourceLock.Lock()
    defer resourceLock.Unlock()

    resource, err := getResource(uuid)
    if err != nil {
        return err
    }

    // Mark Resource as seen
    resources[resource.ID] = true

    return nil
}

In this imaginary scenario, getResource makes an HTTP call to the outside world. It could even make a call to a database. But imagine that it takes five whole seconds for whatever reason. Any other call to ProcessResource will now be waiting that entire time. If we move the lock to only where we need it, we cut down the time other processes are blocked:

func ProcessResource(uuid string) error {
    resource, err := getResource(uuid)
    if err != nil {
        return err
    }

    resourceLock.Lock()
    defer resourceLock.Unlock()

    // Mark Resource as seen
    resources[resource.ID] = true

    return nil
}

Now, our function doesn't block other functions for longer than it needs to.
