Last active
August 3, 2024 20:16
-
-
Save raihankhan/d8ba36175dba10bc139f488d5c7c1f04 to your computer and use it in GitHub Desktop.
A simple pattern for an asynchronous publisher-consumer model in Golang.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"math/rand" | |
"os" | |
"os/signal" | |
"sync" | |
"syscall" | |
"time" | |
) | |
// keepProcessAlive blocks the calling goroutine until the process receives
// SIGINT or SIGTERM, then prints a termination message and returns.
//
// SIGSTOP is deliberately not registered: it cannot be caught by a program
// (see the os/signal package documentation), so asking to be notified of it
// has no effect.
func keepProcessAlive() {
	// Buffered with capacity 1 because signal.Notify never blocks when
	// sending: an unbuffered channel could miss a signal that arrives before
	// the receive below is reached.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	// Undo the Notify registration on return so later signals are no longer
	// relayed to a channel nobody reads from.
	defer signal.Stop(quit)

	<-quit
	fmt.Println("Terminating main thread")
}
func main() { | |
// waitgroups simply wait for | |
// publisher and consumer goroutines to finish | |
// Initialize a waitgroup for one goroutine with Add(1) | |
// they can block a thread with Wait() method | |
// blockers can be withdrawn with Done() method | |
var publisherWg sync.WaitGroup | |
var consumerWg sync.WaitGroup | |
// A channel can be a bridge between | |
// multiple go routines for communication | |
// data can be sent through them recieved from them as well | |
// following buffer channel can hold upto 5 data of int type at once | |
buffer := make(chan int, 5) | |
// A function like this publishes data through the buffer channel at random interval | |
// In production env this can be modified to recieve data from a source, may be a client | |
// and send to a db consumer via the channel | |
publisherFunc := func(wg *sync.WaitGroup, buffer chan int) { | |
defer wg.Done() | |
for data := 1; data <= 10; data++ { | |
buffer <- data | |
fmt.Printf("published data : %d\n", data) | |
time.Sleep(time.Millisecond * time.Duration(rand.Intn(500))) | |
} | |
fmt.Println("Producer session Terminates") | |
} | |
// A function like this consumes data from the buffer channel at random interval | |
// In production env this can be modified to consumer data from the channel, | |
// and insert it to db | |
consumerFunc := func(wg *sync.WaitGroup, buffer chan int) { | |
defer wg.Done() | |
for data := range buffer { | |
fmt.Printf("Consumed data : %d\n", data) | |
time.Sleep(time.Millisecond * time.Duration(rand.Intn(500))) | |
} | |
fmt.Println("Consumer session Terminates") | |
} | |
// Consumer will always keep listening to the consumer until it's closed | |
// Once Producer session terminates the channel should be closed as well | |
terminateChannel := func(wg *sync.WaitGroup, buffer chan int) { | |
wg.Wait() | |
close(buffer) | |
fmt.Println("Closed channel") | |
} | |
// Start A producer session, A Consumer session and A channel terminator | |
publisherWg.Add(1) | |
go publisherFunc(&publisherWg, buffer) | |
consumerWg.Add(1) | |
go consumerFunc(&consumerWg, buffer) | |
go terminateChannel(&publisherWg, buffer) | |
keepProcessAlive() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment