Created
July 31, 2023 13:27
-
-
Save jsphbtst/e114b097dd78838f99b8bdbc4fd4d5fc to your computer and use it in GitHub Desktop.
Fan-Out/In Go Concurrency Pattern
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"encoding/csv" | |
"fmt" | |
"io" | |
"log" | |
"os" | |
"sync" | |
"time" | |
) | |
// User is our basic data type | |
type User struct { | |
Email string | |
} | |
// transform function for modifying the user | |
func transform(user User) User { | |
// Let's just append "-transformed" to the name to simulate transformation | |
user.Email = user.Email + "-transformed" | |
return user | |
} | |
// Stage 1: Load user data (simulated here with a range over an array) | |
func loadUsersPipe(out chan<- User) { | |
usersCsv, err := os.Open("test_accounts.csv") | |
if err != nil { | |
panic("Failed to read `test_accounts.csv` file") | |
} | |
defer usersCsv.Close() | |
csvReader := csv.NewReader(usersCsv) | |
for { | |
rec, err := csvReader.Read() | |
if err == io.EOF { | |
break | |
} | |
if err != nil { | |
log.Fatal(err) | |
} | |
rowValue := rec[1] | |
isHeader := rowValue == "Email" | |
if isHeader { | |
continue | |
} | |
out <- User{rowValue} | |
} | |
close(out) | |
} | |
// Stage 2: Transform user data concurrently | |
func transformUsersPipe(in <-chan User, out chan<- User, wg *sync.WaitGroup) { | |
defer wg.Done() | |
for user := range in { | |
out <- transform(user) | |
} | |
} | |
type SafeSlice struct { | |
sync.Mutex | |
users []User | |
} | |
// Append adds a user to the slice, safely. | |
func (s *SafeSlice) Append(u User) { | |
s.Lock() | |
s.users = append(s.users, u) | |
s.Unlock() | |
} | |
// Stage 3: Store results in a slice | |
func storeUsersPipe(in <-chan User, s *SafeSlice) { | |
for user := range in { | |
s.Append(user) | |
} | |
} | |
func main() { | |
start := time.Now() | |
// Create channels: channel size depends on requirements, it will block once filled anyway | |
loadChan := make(chan User, 5) | |
storeChan := make(chan User, 5) | |
// Use waitgroup to make sure all transformations are done before storing | |
var wg sync.WaitGroup | |
go loadUsersPipe(loadChan) | |
// Start multiple transformation goroutines; this will determine parallel goroutines running | |
// turning `numberOfParallelGoroutines` = 1 will makes this a flat pipeline, | |
// as this step is technically the fanning out step | |
numberOfParallelGoroutines := 3 | |
for i := 0; i < numberOfParallelGoroutines; i++ { | |
wg.Add(1) | |
go transformUsersPipe(loadChan, storeChan, &wg) | |
} | |
// SafeSlice | |
s := &SafeSlice{} | |
// Wait for all transformations to finish, then close the store channel | |
go func() { | |
wg.Wait() | |
close(storeChan) | |
}() | |
go storeUsersPipe(storeChan, s) | |
// Wait until all transformations are stored | |
wg.Wait() | |
// Print out the transformed users | |
for _, user := range s.users { | |
fmt.Println("Stored user:", user) | |
} | |
end := time.Since(start) | |
fmt.Println("Program took: ", end) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment