Skip to content

Instantly share code, notes, and snippets.

@jsphbtst
Created July 31, 2023 13:27
Show Gist options
  • Save jsphbtst/e114b097dd78838f99b8bdbc4fd4d5fc to your computer and use it in GitHub Desktop.
Save jsphbtst/e114b097dd78838f99b8bdbc4fd4d5fc to your computer and use it in GitHub Desktop.
Fan-Out/In Go Concurrency Pattern
package main
import (
"encoding/csv"
"fmt"
"io"
"log"
"os"
"sync"
"time"
)
// User is the record that travels through every stage of the pipeline.
type User struct {
	// Email is the only field; it stays first so positional literals such
	// as User{addr} keep compiling.
	Email string
}
// transform returns user with "-transformed" appended to its Email. It is a
// stand-in for real per-record work. user is received by value, so the
// caller's copy is not modified.
func transform(user User) User {
	const suffix = "-transformed"
	user.Email += suffix
	return user
}
// loadUsersPipe is stage 1 of the pipeline: it streams users out of the
// `test_accounts.csv` file in the current working directory.
//
// Each CSV record yields one User whose Email is the record's second column
// (index 1). A record whose second column is exactly "Email" is treated as a
// header row and skipped. After the last record has been sent, out is closed
// so that receivers ranging over it terminate.
//
// All failures are fatal, because this function normally runs in its own
// goroutine and has no way to report an error to a caller:
//   - the file cannot be opened: panic (the message includes the cause);
//   - a record cannot be parsed: log.Fatal;
//   - a record has fewer than two columns: log.Fatalf.
func loadUsersPipe(out chan<- User) {
	usersCsv, err := os.Open("test_accounts.csv")
	if err != nil {
		panic(fmt.Sprintf("Failed to read `test_accounts.csv` file: %v", err))
	}
	defer usersCsv.Close()

	csvReader := csv.NewReader(usersCsv)
	for {
		rec, err := csvReader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		// Check the width before indexing: without this, a file with a
		// single column dies with a bare "index out of range" panic that
		// says nothing about the input being at fault.
		if len(rec) < 2 {
			log.Fatalf("test_accounts.csv: record has %d column(s), want at least 2", len(rec))
		}

		email := rec[1]
		if email == "Email" {
			// Header row, not data.
			continue
		}
		out <- User{Email: email}
	}

	// Closed only on the success path. Closing during a panic would let the
	// downstream stages finish as if the load had succeeded.
	close(out)
}
// transformUsersPipe is one stage-2 worker. It receives users from in until
// that channel is closed and drained, and sends transform(user) on out for
// each one. It never closes out, which lets several workers share the same
// output channel; wg.Done is called when the worker returns.
func transformUsersPipe(in <-chan User, out chan<- User, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		user, ok := <-in
		if !ok {
			return
		}
		out <- transform(user)
	}
}
// SafeSlice pairs a slice of User with the mutex that guards it. The mutex
// is embedded, so Lock and Unlock are promoted onto SafeSlice itself. The
// zero value is an empty, unlocked slice that is ready to use. Share it by
// pointer: copying a SafeSlice would copy the mutex as well.
type SafeSlice struct {
	sync.Mutex
	// users holds the collected values; hold the lock while other
	// goroutines may be reading or writing it.
	users []User
}
// Append adds u to the end of the slice while holding the lock, so it may be
// called from several goroutines at once.
func (s *SafeSlice) Append(u User) {
	s.Lock()
	defer s.Unlock()
	s.users = append(s.users, u)
}
// storeUsersPipe is stage 3 of the pipeline. It appends every user received
// from in to s via s.Append, and returns once in has been closed and fully
// drained.
func storeUsersPipe(in <-chan User, s *SafeSlice) {
	for {
		user, ok := <-in
		if !ok {
			return
		}
		s.Append(user)
	}
}
// main wires the three stages together and reports how long the run took:
//
//	loadUsersPipe -> loadChan -> N x transformUsersPipe -> storeChan -> storeUsersPipe
//
// The load stage and the transform workers run in their own goroutines. The
// store stage runs on the main goroutine, so that when it returns every
// transformed user is already in the slice and it is safe to read.
func main() {
	start := time.Now()

	// Create channels: channel size depends on requirements, it will block
	// once filled anyway.
	loadChan := make(chan User, 5)
	storeChan := make(chan User, 5)

	go loadUsersPipe(loadChan)

	// Start multiple transformation goroutines; this determines how many
	// run in parallel. Setting `numberOfParallelGoroutines` to 1 makes this
	// a flat pipeline, as this step is the fan-out step.
	var wg sync.WaitGroup
	numberOfParallelGoroutines := 3
	for i := 0; i < numberOfParallelGoroutines; i++ {
		wg.Add(1)
		go transformUsersPipe(loadChan, storeChan, &wg)
	}

	// The workers share storeChan, so none of them may close it. Close it
	// exactly once, after the last worker has finished sending.
	go func() {
		wg.Wait()
		close(storeChan)
	}()

	// Fan-in: drain storeChan on this goroutine. Waiting on wg here instead
	// would only prove that the workers are done sending, not that the
	// store stage has appended everything. Reading s.users at that point
	// races with the appends and can miss trailing users.
	s := &SafeSlice{}
	storeUsersPipe(storeChan, s)

	// Print out the transformed users. No lock is needed: every goroutine
	// that writes to s has finished by now.
	for _, user := range s.users {
		fmt.Println("Stored user:", user)
	}

	end := time.Since(start)
	fmt.Println("Program took: ", end)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment