I was remodelling the whole worker-consumer situation in the notification service. It's a thought experiment, based on what I have learned and am still struggling with.
But this definitely seems like a better way to reason about it. The code is attached below, as a generic representation.
There are three files in this pattern:
- _producer.go
- _producer_consumer.go
- _consumer.go
modelled after Elixir's gen_stage.
But I realised I really don't know a lot about Go channels and goroutines, as in modelling with them.
One way to think of it is: I will just fan-in and fan-out, or fire and forget. But writing it in a semantic way is a bit different (at least for me).
So, faced with conundrums in life, I went to seek out some answers. Some of the realizations go like this:
- Go channels can be thought of as a messaging queue which comes with delivery guarantees. There are three of those:
  - An unbuffered channel gives an atomic delivery guarantee: one send maps to one receive.
  - make(chan int, 1) gives a delayed guarantee: there is a buffer, but you are not blocked on the send, and once the buffer is empty you know all your jobs are processed. (It only adds to the confusion.)
  - Buffered channels (capacity greater than 1) give no delivery guarantee. While processing jobs off a buffered channel, if your machine crashes and your workflow was consuming each event and making changes to a store, you are doomed unless your code deals with idempotency.
- Think of sending signals and not data. There are two ways:
  - Actual data: ch <- true, ch <- struct{}{}
  - Without data: signalling, with a context
  - There is another, related pattern: passing a done channel (<-done) to signal from child to parent that consumption is complete. Think of a React child passing data up to its parent.
And for the bad parts:
- An unbuffered channel can block forever, leaking the goroutine (and the memory it holds).
- A buffered channel with capacity greater than 1 can lose whatever is still in the buffer when the machine crashes.
- A buffered channel of capacity 1 is good overall, but suffers on atomicity because of the delayed guarantee.
You should also be aware of select, which can wait on several channels at once; the first one ready gets served, if it matches a case. The most common shape looks like this:
```go
select {
case <-ctx.Done():
	// ...close and return
case <-done:
	// ...close and return
case <-time.After(duration):
	// ...close and return
default:
	// ...loop logic
}
```
With a buffered channel, you can also detect back pressure:
```go
ch := make(chan int, 2)
for n > 0 {
	select {
	case ch <- n:
		log.Println("data sent")
		n--
	default:
		log.Fatalln("no more room") // buffer full: back pressure
	}
}
```
But buffering is tricky.
Even in https://go.dev/blog/pipelines, you see no use of buffered channels:

> This approach has a problem: each downstream receiver needs to know the number of potentially blocked upstream senders and arrange to signal those senders on early return. Keeping track of these counts is tedious and error-prone.

Even in their example, the gen function closes the channel.
Also, we can have a pattern where
```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	ch := make(chan int)
	workers := 20

	// gen produces forever and nothing ever closes ch,
	// so wg.Wait() never returns and the program runs until killed.
	gen := func(in chan int) {
		i := 0
		for {
			in <- i
			i++
		}
	}
	go gen(ch)

	wg.Add(workers)
	for workers > 0 {
		go func(idx int, w *sync.WaitGroup) {
			defer w.Done()
			for x := range ch {
				fmt.Println("worker", idx, "result =>", x)
			}
		}(workers, &wg)
		workers--
	}
	wg.Wait()
	fmt.Println("closing")
}
```
This works as well as having a buffered channel, and we get a delivery guarantee: one send will have one receive, or a timeout.
With a buffered channel you get a buffer, ch := make(chan int, 2); of course, you get to do twice the work in similar time.
How to use that buffer is the question, because how you deal with partial updates becomes a question (it depends on the use case).
_It feels like buffered channels should mostly be used to aggregate; whatever change to the store has to be done after the aggregation is complete._

The other problem that arises is closing channels. A goroutine that doesn't know the channel has closed is a leak: memory not GC-ed, CPU cycles wasted. It also depends on whether or not we want to close the channel at all.
A DB pooler, say, doesn't need to know when it ends; that is a leak we want, because we know there will only be a handful of them, or just one, spinning in a for {}.
But if it were a one-time task runner, the correct thing would be to close the channel once len(results) == 0.
BUT, if you have spawned off 20 goroutines and you are sending data to them, how do you handle the channels that are still producing data? Because a send to a closed channel is a panic.
One way: if you know the count, you can use a sync.WaitGroup to keep track, and have a wg.Wait() at the end. But in that case, your method design needs to accommodate it.
An example, I think, would be (correct me if I am wrong) converting
```go
func Worker(n int) chan int {
	recv := make(chan int, n)
	for n > 0 {
		go func(x int) {
			recv <- x * 2
		}(n)
		n--
	}
	return recv
}
```
into
```go
// Note: the caller must be receiving from recv concurrently (or recv must
// have capacity >= n), otherwise the sends block and wg.Wait() never returns.
func Worker(n int, recv chan<- int) {
	var wg sync.WaitGroup
	wg.Add(n)
	for n > 0 {
		go func(w *sync.WaitGroup, x int) {
			defer w.Done()
			recv <- x * 2
		}(&wg, n)
		n--
	}
	wg.Wait()
	close(recv)
}
```
Or, you can do this with done := make(chan bool, n) as well.
In the first version, without definitely knowing when it ends, how are you going to close the channel?
_When using channels, I think they should be treated as hunter-gatherers, map-reducers, transformers. Then, once you have all the data, you make the permanent changes._

Such a scenario might arise where two database operations (INSERTs into two tables) are happening in parallel. Having made that concurrent, you have introduced a whole new world of problems: handling a distributed transaction (of course, it depends on whether the operations are mutually exclusive or not).
Having said that, I am now utterly confused. I believe changes can be made to this code to make it more semantic.