Skip to content

Instantly share code, notes, and snippets.

@raihankhan
Last active August 3, 2024 20:16
Show Gist options
  • Save raihankhan/d8ba36175dba10bc139f488d5c7c1f04 to your computer and use it in GitHub Desktop.
Save raihankhan/d8ba36175dba10bc139f488d5c7c1f04 to your computer and use it in GitHub Desktop.
A simple pattern for asynchronous publisher-consumer model in Golang.
package main
import (
"fmt"
"math/rand"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
// This function blocks the main goroutine to terminate until
// It is not terminated through system interruptions
func keepProcessAlive() {
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGSTOP)
<-quit
fmt.Println("Terminating main thread")
}
func main() {
// waitgroups simply wait for
// publisher and consumer goroutines to finish
// Initialize a waitgroup for one goroutine with Add(1)
// they can block a thread with Wait() method
// blockers can be withdrawn with Done() method
var publisherWg sync.WaitGroup
var consumerWg sync.WaitGroup
// A channel can be a bridge between
// multiple go routines for communication
// data can be sent through them recieved from them as well
// following buffer channel can hold upto 5 data of int type at once
buffer := make(chan int, 5)
// A function like this publishes data through the buffer channel at random interval
// In production env this can be modified to recieve data from a source, may be a client
// and send to a db consumer via the channel
publisherFunc := func(wg *sync.WaitGroup, buffer chan int) {
defer wg.Done()
for data := 1; data <= 10; data++ {
buffer <- data
fmt.Printf("published data : %d\n", data)
time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))
}
fmt.Println("Producer session Terminates")
}
// A function like this consumes data from the buffer channel at random interval
// In production env this can be modified to consumer data from the channel,
// and insert it to db
consumerFunc := func(wg *sync.WaitGroup, buffer chan int) {
defer wg.Done()
for data := range buffer {
fmt.Printf("Consumed data : %d\n", data)
time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))
}
fmt.Println("Consumer session Terminates")
}
// Consumer will always keep listening to the consumer until it's closed
// Once Producer session terminates the channel should be closed as well
terminateChannel := func(wg *sync.WaitGroup, buffer chan int) {
wg.Wait()
close(buffer)
fmt.Println("Closed channel")
}
// Start A producer session, A Consumer session and A channel terminator
publisherWg.Add(1)
go publisherFunc(&publisherWg, buffer)
consumerWg.Add(1)
go consumerFunc(&consumerWg, buffer)
go terminateChannel(&publisherWg, buffer)
keepProcessAlive()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment