@amitavaghosh1
Created August 24, 2022 14:12
Finding channels

I was remodelling the whole worker/consumer setup in the notification service. It's a thought experiment based on what I have learned, and on what I am still struggling with.

But this definitely seems like a better way to reason about it. The code is attached below, as a generic representation.

There are three files in this pattern:

  • _producer.go
  • _producer_consumer.go
  • _consumer.go

modelled after Elixir's gen_stage

But I realised I really don't know a lot about Go channels and goroutines. That is, about modelling with them.

It's one thing to think "I will just fan-in and fan-out", or "fire and forget". But writing it in a semantic way is a bit different (at least for me).

So, faced with conundrums in life, I went to seek out some answers. Some realizations go like this:

  • Go channels can be thought of as a message queue that comes with a delivery guarantee. There are three kinds:

    • an unbuffered channel gives an atomic delivery guarantee: one send maps to one receive.
    • make(chan int, 1) gives a delayed guarantee: there is a buffer, but you are not blocked on it, and once the buffer is empty you know all your jobs are processed. (It only adds to confusion.)
    • a buffered channel of larger capacity gives no delivery guarantee. Which means: if your machine crashes while you are processing jobs off a buffered channel, and your workflow was to consume each event and make some change to a store, you are doomed if your code doesn't deal with idempotency.
  • Think of sending signals, not data. There are two ways:

    • actual data: ch <- true, ch <- struct{}{}
    • without data: signalling with a context
    • There is also an unrelated pattern of passing a <-done channel, to signal from child to parent that consumption is complete. Think of a React child passing data up to its parent.
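A minimal sketch of the first two guarantees (toy code, just to make the difference visible):

```go
package main

import "fmt"

// unbufferedDemo: a send on an unbuffered channel completes only when a
// receiver takes the value, so one send maps to exactly one receive.
func unbufferedDemo() int {
	ch := make(chan int) // unbuffered: the send below blocks until received
	go func() { ch <- 42 }()
	return <-ch
}

// bufferedDemo: with capacity 1 the sender is not blocked, but the value
// lives only in the buffer until someone receives it; a crash here would
// lose it (the "delayed" guarantee).
func bufferedDemo() int {
	ch := make(chan int, 1)
	ch <- 7 // does not block
	return <-ch
}

func main() {
	fmt.Println(unbufferedDemo()) // 42
	fmt.Println(bufferedDemo())   // 7
}
```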

And for the bad parts:

  • An unbuffered channel can block forever, causing a goroutine and memory leak.
  • A buffered channel with capacity greater than 1 can lose the data in its buffer when the machine crashes.
  • A buffered channel of capacity 1 is good overall, but suffers on atomicity because of the delayed guarantee.

You should also probably be aware of select, which can wait on several channels at once: the first case whose channel is ready gets served. The most common way to use it looks like this:

select {
case <- ctx.Done():
//.. close and return
case <-done:
//.. close and return
case <-time.After(duration):
//.. close and return
default:
//..loop logic
}

With a buffered channel, you can also detect back pressure:

ch := make(chan int, 2)
for n > 0 {
  select {
  case ch <- n:
    log.Println("data sent")
  default:
    log.Fatalln("no more room")
  }
  n--
}

But buffering is tricky.

Even in https://go.dev/blog/pipelines, you see no use of buffered channels.

"This approach has a problem: each downstream receiver needs to know the number of potentially blocked upstream senders and arrange to signal those senders on early return. Keeping track of these counts is tedious and error-prone." Even in their example, the gen function closes the channel.
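The pattern the post lands on can be sketched roughly like this: the producer is the only sender, so it owns close, and a done channel lets the receiver abandon the pipeline early without leaking the producer goroutine (a paraphrase of the blog's gen, not its exact code):

```go
package main

import "fmt"

// gen owns out: it is the only sender, so it is the one that closes it.
// done lets the receiver stop the pipeline early without leaking this goroutine.
func gen(done <-chan struct{}, nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out) // the producer closes its own channel
		for _, n := range nums {
			select {
			case out <- n:
			case <-done: // receiver bailed early; stop sending
				return
			}
		}
	}()
	return out
}

func main() {
	done := make(chan struct{})
	defer close(done)

	sum := 0
	for n := range gen(done, 1, 2, 3, 4) {
		sum += n
	}
	fmt.Println(sum) // 10
}
```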

Also, we can have a pattern where

package main

import (
	"fmt"
	"sync"
)

func main() {
	fmt.Println("vim-go")

	var wg sync.WaitGroup
	ch := make(chan int)
	workers := 20

	gen := func(in chan int) {
		i := 0
		for {
			in <- i
			i++
		}
	}

	go gen(ch)

	wg.Add(workers)

	for workers > 0 {
		go func(idx int, w *sync.WaitGroup) {
			defer w.Done()
			for x := range ch {
				fmt.Println("worker ", idx, "result=>", x)
			}
		}(workers, &wg)

		workers--
	}
	wg.Wait()

	fmt.Println("closing")
}

works as well as having a buffered channel, and we get a delivery guarantee: one send will have one receive, or a timeout.

With a buffered channel you get a buffer, ch := make(chan int, 2). Of course, you get to do twice the work in similar time.

How to use that buffer is the question, because how you deal with the partial updates becomes a question. (I mean, it depends on the use case.)

_It feels like buffered channels should mostly be used to aggregate. Whatever change to the store has to be made after the aggregation is complete._
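That aggregate-then-commit idea might look like this sketch, where commit stands in for the eventual store write (a hypothetical callback, not a real API):

```go
package main

import (
	"fmt"
	"sync"
)

// aggregate fans work out to goroutines, collects every result into a
// buffered channel, and only after all of them finish does it hand the
// complete batch to the single, final store write.
func aggregate(inputs []int, commit func([]int)) {
	results := make(chan int, len(inputs)) // big enough that no sender blocks

	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(x int) {
			defer wg.Done()
			results <- x * 2 // the "transform" step
		}(in)
	}
	wg.Wait()
	close(results) // all sends done; safe to close and drain

	var batch []int
	for r := range results {
		batch = append(batch, r)
	}
	commit(batch) // the permanent change happens once, after aggregation
}

func main() {
	aggregate([]int{1, 2, 3}, func(batch []int) {
		fmt.Println("committing", len(batch), "rows")
	})
}
```

If the process crashes before commit, nothing partial has been written; that is the whole appeal of doing the store change after aggregation.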

The other problem that arises is with closing channels.

A goroutine that doesn't know the channel has been closed is a leak: memory not GC-ed, CPU cycles wasted. I guess it also depends on whether or not we want to close the channel at all.

So a DB pooler doesn't need to know when it ends; that is a leak we want, because we know there will be only a handful of them, or just one, in a for {} loop.

But if it were a one-time task runner, the correct way would be to close the channel once len(results) == 0.

BUT, if you have spawned off 20 goroutines and you are sending data to them, how do you handle the channels that are still producing data? Because a send on a closed channel is a panic.

One way: if you know the count, you can use a WaitGroup to keep track, and have a wg.Wait() at the end.

But in that case, your method design needs to accommodate it.

An example that I think would be: (correct me if I am wrong)

Converting

func Worker(n int) chan int {
  recv := make(chan int, n)
  for n > 0 {
    go func(x int) {
      recv <- x * 2
    }(n)

    n--
  }
  return recv
}

Into

func Worker(n int, recv chan<- int) {
  var wg sync.WaitGroup
  wg.Add(n)

  for n > 0 {
    go func(w *sync.WaitGroup, x int) {
      defer w.Done()
      recv <- x * 2
    }(&wg, n)

    n--
  }
  wg.Wait()
  close(recv)
}

Or, you can do this with a done := make(chan bool, n) as well.
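The done-channel variant might look like this sketch: each worker ticks a buffered done channel instead of calling wg.Done(), and the parent counts n ticks before closing recv:

```go
package main

import "fmt"

// Worker sends n results on recv. Each goroutine also ticks the buffered
// done channel, and the parent closes recv only after it has seen n ticks,
// so no send can ever hit a closed channel.
func Worker(n int, recv chan<- int) {
	done := make(chan bool, n) // buffered so ticks never block the workers
	for i := n; i > 0; i-- {
		go func(x int) {
			recv <- x * 2
			done <- true // signal completion instead of wg.Done()
		}(i)
	}
	for i := 0; i < n; i++ {
		<-done // wait for all n completion signals
	}
	close(recv)
}

func main() {
	recv := make(chan int, 3)
	go Worker(3, recv)

	sum := 0
	for x := range recv {
		sum += x
	}
	fmt.Println(sum) // 2+4+6 = 12
}
```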

In the first case, without definitely knowing when it ends, how are you going to close the channel?

_When using channels, I think, they should be treated as hunter-gatherers, map-reducers, transformers. And then, once you have all the data, you make the permanent changes._

Such a scenario might arise where two database operations (INSERTs into two tables) are happening in parallel. Having made that concurrent, you have introduced a whole new world of problems around handling distributed transactions (of course, it depends on whether the operations are mutually exclusive or not).
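A sketch of that situation, where insertUser and insertAudit are hypothetical stand-ins for the two INSERTs. Running them concurrently is easy; the hard part is that when one fails, the other may already be committed:

```go
package main

import (
	"errors"
	"fmt"
)

// insertUser and insertAudit stand in for two hypothetical INSERTs.
func insertUser(id int) error  { return nil }
func insertAudit(id int) error { return errors.New("audit insert failed") }

// runBoth runs the two inserts concurrently and collects every failure.
// Note what it cannot do: if one insert succeeds and the other fails, the
// successful one is already committed. That is exactly the distributed-
// transaction problem introduced by making the writes concurrent.
func runBoth(id int) []error {
	errCh := make(chan error, 2) // buffered: both sends complete regardless

	go func() { errCh <- insertUser(id) }()
	go func() { errCh <- insertAudit(id) }()

	var errs []error
	for i := 0; i < 2; i++ {
		if err := <-errCh; err != nil {
			errs = append(errs, err)
		}
	}
	return errs
}

func main() {
	fmt.Println(runBoth(1)) // one of the two inserts fails
}
```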

Having said that, I am now utterly confused. I believe changes can be made to this code to make it more semantic.

// _consumer.go
package emailworker

type Consumer interface {
	Consume(ctx context.Context, consumer ProducerConsumer)
}

type EmailConsumer struct {
	nworkers int
	svc      SomeService
}

func (ec EmailConsumer) Consume(ctx context.Context, consumer ProducerConsumer) {
	var wg sync.WaitGroup
	wg.Add(ec.nworkers)

	pcChan := consumer.HandleEvents(ctx)
	for i := 0; i < ec.nworkers; i++ {
		go func() {
			errchan := make(chan error, 1)
			ec.loop(ctx, &wg, pcChan, errchan)
			err := <-errchan
			panic(err)
		}()
	}
	wg.Wait()
}

func (ec EmailConsumer) loop(ctx context.Context, wg *sync.WaitGroup, consumerChan chan ProducerConsumerResult, errchan chan error) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		default:
			for res := range consumerChan {
				if res.err != nil {
					// decide what to do here if you receive an error on an object:
					// you can either drop it, or panic(res.err)
					errchan <- res.err
					continue
				}
				if res.data == nil {
					log.Println("=====nothing to do. moving on=====")
					continue
				}
				if err := ec.svc.SomeOperation(ctx, res.data); err != nil {
					// same as above: decide what to do. For example purposes we return,
					// because the db is probably failing at this point, so it makes no
					// sense to continue. This should probably be accompanied by
					// close(consumerChan), so that nothing else gets processed; but that
					// might raise a panic, because a producer-consumer still sending to
					// this channel will fail to send on a closed channel.
					errchan <- err
					return
				}
			}
		}
	}
}
// _producer.go
package emailworker

type Producer interface {
	Produce(context.Context) chan ProducerResult
}

type ProducerResult struct {
	data interface{}
	err  error
}

type EmailProducer struct {
	demand int
	done   chan bool   // in order to allow propagation of events from subscribers
	svc    SomeService // for example
}

func (ep EmailProducer) Produce(ctx context.Context) chan ProducerResult {
	resultChan := make(chan ProducerResult)
	go ep.loop(ctx, resultChan)
	return resultChan
}

func (ep EmailProducer) loop(ctx context.Context, resultChan chan ProducerResult) {
	for {
		select {
		case <-ctx.Done():
			logger.Infoln("context cancelled")
			return
		default:
			logger.Infoln("doing stuff")
			results, err := ep.svc.GetData(ctx)
			if err != nil {
				panic(err)
				// or return
			}
			if len(results) == 0 {
				// do something if needed, or omit this
			}

			var wg sync.WaitGroup
			wg.Add(len(results))
			for i, result := range results {
				go func(workerid int, w *sync.WaitGroup, data interface{}) {
					defer w.Done() // without this, wg.Wait() below blocks forever
					var err error
					var output interface{}
					// do some transformation of data
					resultChan <- ProducerResult{err: err, data: output}
				}(i, &wg, result)
			}
			wg.Wait()
		}
		time.Sleep(20 * time.Second) // throttling
	}
}
// _producer_consumer.go
package emailworker

type ProducerConsumer interface {
	HandleEvents(ctx context.Context) chan ProducerConsumerResult
}

type ProducerConsumerResult struct {
	data interface{}
	err  error
	// however you want your struct to be
}

type EmailProducerConsumer struct {
	producer Producer
	done     chan bool
	workers  int
	// whatever you need
}

func (epc EmailProducerConsumer) HandleEvents(ctx context.Context) chan ProducerConsumerResult {
	resultCh := make(chan ProducerConsumerResult, epc.workers)
	go func() {
		producerCh := epc.producer.Produce(ctx)
		for result := range producerCh {
			go func(r ProducerResult, rchan chan ProducerConsumerResult) {
				if r.err != nil {
					rchan <- ProducerConsumerResult{err: r.err}
					return
				}
				var data = r.data
				// do whatever transformations on data
				rchan <- ProducerConsumerResult{err: r.err, data: data}
			}(result, resultCh)
		}
	}()
	return resultCh
}