This talk will go over concurrency with Go, and hopefully scratch the surface of
how to write thread-safe code as well as efficient worker delegation. We will be
using the built-in sync package.
If you've worked with concurrency in other languages, you might be familiar with threading. In Go, we do not manage threads like we would in C or Java. Instead, we run our concurrent code in what's known as a Goroutine. A Goroutine is a coroutine: a lightweight process that runs under a scheduler. Go itself handles each routine and ensures that it runs in a timely and scheduled manner. Go can leverage multiple CPU cores and threads to run these coroutines; however, as a developer writing Go you will rarely need to care about this, and can focus on ensuring that your code is thread safe.
To launch a function in Go inside of a Goroutine, you may use the go
keyword:
func process() { ... }
go process() // Launches in a goroutine
A Goroutine will run until the function provided to it returns. Something to
note is that if a panic occurs inside a Goroutine and is not recovered, it will
go up the stack and terminate the whole application, so if you think your code
might panic, add a defer func() { recover() }().
A fun fact about Goroutines is that each one starts with only a few kilobytes of stack, so you can spin up hundreds of thousands of them before you'll start noticing performance issues. This is one of the reasons Go is so powerful for server-side backends; it can scale up really efficiently for HTTP muxes.
In Go, we have the sync package that contains a lot of helpers for writing
safe, concurrent code. One of these invaluable helpers is the sync.WaitGroup
.
A Wait Group is just what it sounds like: a group to wait on. In the most basic
of explanations, we may think of a Wait Group as a safe counter. We count up
as we add more async processes, count down as these processes close, and have
somewhere we can block until all processes are done. It is written to be
thread safe.
Here is how a WaitGroup might be used:
wg := &sync.WaitGroup{}

// Spawn 10 workers
for i := 0; i < 10; i++ {
    wg.Add(1) // Mark that we've added a worker
    go func() {
        defer wg.Done() // Tell the Wait Group we're done
        // Do some kind of work
    }()
}

wg.Wait() // Block until the counter in the Wait Group is 0
While in this example I have written it so we increment on each loop iteration,
we could just as easily call wg.Add(10) once. One of the reasons you might find
the Add inside the for loop instead of outside is that, if we need to change how
many times the loop runs, we don't have to worry about forgetting to edit what
we give wg.Add(x).
When two concurrent processes access a resource at the same time, it can lead to problems if one sets a value while the other reads it. There are different strategies to overcome these problems, and the one we'll talk about now is the Mutex. A Mutex is a program object that prevents multiple processes from accessing the same shared resource at the same time, by providing a lock. When a process wants to access a resource, it requests the lock to ensure it is the only one accessing the resource at that time. When the process is done accessing the resource, it unlocks, allowing the next process that requested the lock to acquire it.
In Go, we have the sync package (https://pkg.go.dev/sync) to help us here. In this package, we have Mutex, as well as RWMutex. The regular Mutex struct is used for general locking, whereas the RWMutex can be used for efficient read and write locks, where the read lock calls do not block each other. We may get to RWMutexes later however now we will just cover regular mutexes.
[Sample](https://go.dev/play/p/tIu9FMSApSO)
In this next sample, we will showcase what happens when we have multiple goroutines accessing a variable, altering and using it. In this example, each routine increments the variable i and then prints it out. If you run this, you'll see that we don't get a consistent count up.
In this example we'll use a Wait Group. To briefly touch on Wait Groups: one simply gives us a (thread-safe) counter and a mechanism for waiting until the counter reaches zero. We can use this to ensure that we wait for all of our Goroutines to finish running.
package main

import (
    "fmt"
    "sync"
)

func main() {
    var i int
    wg := &sync.WaitGroup{}

    for ii := 0; ii < 5; ii++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 5; j++ {
                i++
                fmt.Println(i)
            }
        }()
    }

    wg.Wait()
    fmt.Println(i)
}
If you run this sample, it is very likely you'll get mixed results on the output. Instead of getting a clean count, 1-25, you'll get some skipping. Here is an example of what I got:
1
5
7
6
9
...
Part of the reason we see this is because of what happens between these two lines of code:

    i++
    fmt.Println(i)

Between i++ and fmt.Println(i), another goroutine is incrementing i. So here is what happens:
Routine 1: Add 1 to i, changing it to 1
Routine 2: Add 1 to i, changing it to 2
Routine 1: Print i, outputting 2
To fix this, we can now add mutexes. It will make sense to keep the mutex close to the variable we plan on locking, as well as in the correct scope. Here is where we can add Mutexes:
package main

import (
    "fmt"
    "sync"
)

func main() {
    var lock sync.Mutex
    var i int
    wg := &sync.WaitGroup{}

    for ii := 0; ii < 5; ii++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 5; j++ {
                lock.Lock()
                i++
                fmt.Println(i)
                lock.Unlock()
            }
        }()
    }

    wg.Wait()
    fmt.Println(i)
}
As you might see, we added a var lock sync.Mutex, and then access it later with lock.Lock() and lock.Unlock(). Now, our output will look like the following:
1
2
3
4
...
However, while this code works well for us, there is something to note about
Mutexes: the way we're locking and unlocking here is bad practice, and can
result in what's known as deadlocking, where all processes and routines stay
locked forever. The correct way to write locks and unlocks is to (almost)
always follow a call to mutex.Lock() with a deferred unlock:
lock.Lock()
defer lock.Unlock()
This way, if our code panics or returns accidentally between the lock and unlock, we ensure we don't deadlock other processes. We must, however, look out for a few things, such as only unlocking once per lock, and ensuring that we call this in a function that will actually execute the defer in time. If, in our example, we change our lock and unlock to the deferred version without altering anything else, we'll get this:
for j := 0; j < 5; j++ {
    lock.Lock()
    defer lock.Unlock()
    i++
    fmt.Println(i)
}
This code will deadlock, because defer only executes when the surrounding
function returns, not at the end of each loop iteration. The next iteration of
our for loop calls Lock before any Unlock has been called. So, we must either
wrap our code in an anonymous function, or pull the code out into its own
function. In our example, we'll write an anonymous function.
package main

import (
    "fmt"
    "sync"
)

func main() {
    var lock sync.Mutex
    var i int
    wg := &sync.WaitGroup{}

    for ii := 0; ii < 5; ii++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 5; j++ {
                func() {
                    lock.Lock()
                    defer lock.Unlock()
                    i++
                    fmt.Println(i)
                }()
            }
        }()
    }

    wg.Wait()
    fmt.Println(i)
}
Now, we'll see that we still get our expected results, and our Unlock will be safe from panics or other problems.
You might notice that our code has now gotten to a point where it is several layers deep and less readable. It is recommended that we break out parts of our code into separate functions to make it more readable and easier to test.
Go has a thread-safe queue system built into the language, known as Channels.
Channels are FIFO, optionally buffered, thread-safe queues. To initialize a
channel, we use the make function:
c := make(chan int)
This will make an unbuffered channel that handles the int type. Unbuffered means that writers will block until a read happens on the other side.
c := make(chan int)
c <- 5
This example deadlocks, since nothing ever reads from c.
To make a buffered channel, we can pass the buffer size as a second argument to make:
c := make(chan int, 1)
c <- 5
This example won't block, because we have a buffer for c <- 5 to go into.
In some concurrent cases, you might find channels used instead of Mutexes. The idea is that you give your workers a channel to report on, and then have a single process that reads from that channel and does the reporting. Below is an example:
package main

import (
    "fmt"
    "sync"
)

type Work struct {
    WorkerID int
    Work     int
}

func main() {
    workReturn := make(chan Work, 5)
    defer close(workReturn) // Ensure that our resource is closed

    go func() {
        // Will run until workReturn is closed.
        for work := range workReturn {
            fmt.Printf("Worker(%v): %v\n", work.WorkerID, work.Work)
        }
    }()

    // Waitgroup for workers
    wg := &sync.WaitGroup{}
    for i := 0; i < 5; i++ {
        wg.Add(1)
        // Workers as anonymous functions
        go func(id int) {
            defer wg.Done()
            for i := 0; i < 5; i++ {
                workReturn <- Work{WorkerID: id, Work: i}
            }
        }(i)
    }
    wg.Wait()
}
This example is a bit complicated, but should help showcase workers doing "work"
and reporting back via our channel. There are a few things missing from this
example, however, such as scope blocking: when the main function returns, all
routines are discarded and any unread work is lost. One of the things we'll
need to add (later) is a mechanism to ensure that all work has been read from
workReturn before we return from main. We can do this with contexts or channels.
A trick with channels is that if you are blocked reading from a channel, and that channel is closed, your read unblocks. We can use this as a trigger to tell when something finishes; however, we need to be aware that it will require us to forgo defers in some cases, and will result in (significantly) more complex code.
package main

import (
    "fmt"
    "sync"
)

type Work struct {
    WorkerID int
    Work     int
}

func main() {
    workReturn := make(chan Work, 5)
    workProcessCloser := make(chan struct{})
    var workSeen int

    go func() {
        defer close(workProcessCloser)
        // Will run until workReturn is closed.
        for work := range workReturn {
            workSeen++
            fmt.Printf("Worker(%v): %v\n", work.WorkerID, work.Work)
        }
    }()

    // Waitgroup for workers
    wg := &sync.WaitGroup{}
    for i := 0; i < 5; i++ {
        wg.Add(1)
        // Workers as anonymous functions
        go func(id int) {
            defer wg.Done()
            for i := 0; i < 5; i++ {
                workReturn <- Work{WorkerID: id, Work: i}
            }
        }(i)
    }

    wg.Wait()           // Wait for all work
    close(workReturn)   // Close workReturn so the range loop above ends after all data is out
    <-workProcessCloser // Wait for the work processor to be done
    fmt.Println("Work Seen:", workSeen)
}
To try to explain what happens:
- We spawn our work processor with a defer statement on close(workProcessCloser), which will execute when our processor is done
- We spawn our workers
- We block on Wait()
- Workers produce all the work, writing to the workReturn queue
- The work processor processes work, increasing the workSeen variable
- All workers are done producing work, so Wait() is no longer blocking
- We close workReturn
- We block on <-workProcessCloser
- The work processor runs out of work, and its for loop finishes
- The work processor's deferred close(workProcessCloser) runs
- <-workProcessCloser stops blocking
- We print out workSeen, since we know it's safe now, and main returns
It might take a while for some people to get their head wrapped around all of this, and while Go does a lot to make concurrency easier, it may still be challenging to implement safely and securely. An important factor for any concurrent code is rigorous testing and peer review.
Since I've already gone over how Mutexes work, this will be rather brief. An
RWMutex is just a mutex that shares locks when a read happens. RWMutex has two
sets of lock and unlock methods: RLock and RUnlock, and Lock and Unlock. When
you do a read, you may use RLock and RUnlock, so that other readers aren't
blocked, while they will still block calls to Lock. This can be useful if you
expect significantly more reading than writing. The same goes the other way:
while Lock is held, all calls to RLock will block until Unlock is called.
It is important to know that when a resource is locked, it is blocking other processes. If you have a long running task, you need to ensure that you aren't blocking for longer than you need to, else you may end up with significant performance degradation.
Consider the following:
var resourceLock sync.Mutex
var resources = make(map[string]bool)

func ProcessResource(uuid string) error {
    resourceLock.Lock()
    defer resourceLock.Unlock()

    resource, err := getResource(uuid)
    if err != nil {
        return err
    }

    // Mark Resource as seen
    resources[resource.ID] = true
    return nil
}
In this imaginary scenario, getResource makes an HTTP call to the outside world. It could even make a call to a database. But imagine that it takes 5 whole seconds for whatever reason. Any other call to ProcessResource will now be waiting. If we move the lock to only where we need it, we can cut down on the time other processes are blocked:
func ProcessResource(uuid string) error {
    resource, err := getResource(uuid)
    if err != nil {
        return err
    }

    resourceLock.Lock()
    defer resourceLock.Unlock()

    // Mark Resource as seen
    resources[resource.ID] = true
    return nil
}
Now, our function doesn't block other functions for longer than it needs to.