Skip to content

Instantly share code, notes, and snippets.

@stvoidit
Created April 3, 2022 08:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stvoidit/213a7df93f38fd2376da721f5b397aa6 to your computer and use it in GitHub Desktop.
Save stvoidit/213a7df93f38fd2376da721f5b397aa6 to your computer and use it in GitHub Desktop.
generics channels merge
func merge[T any, C chan T](cs ...C) C {
out := make(C)
var wg sync.WaitGroup
wg.Add(len(cs))
for i := range cs {
go func(c C) {
defer wg.Done()
for value := range c {
out <- value
}
}(cs[i])
}
go func() {
wg.Wait()
close(out)
}()
return out
}
@stvoidit
Copy link
Author

usage example:

package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"log"
	"sync"
	"time"
)

type Msg struct {
	WriterID int
	Payload  string
}

func (m Msg) String() string {
	return fmt.Sprintf("id=%d payload=%s", m.WriterID, m.Payload)
}

type workerFunc[T any] func(int) chan T

func main() {
	var workers = make([]workerFunc[Msg], 0)
	for i := 0; i < 3; i++ {
		workers = append(workers, writer)
	}
	ch := PoolWorkers[Msg](workers...)
	for val := range ch {
		log.Println(val)
	}
}

func PoolWorkers[T any](wfs ...workerFunc[T]) <-chan T {
	var chans = make([]chan T, len(wfs))
	for i, wf := range wfs {
		chans[i] = wf(i + 1)
	}
	return merge[T](chans...)
}

func merge[T any, C chan T](cs ...C) C {
	out := make(C)
	var wg sync.WaitGroup
	wg.Add(len(cs))
	for i := range cs {
		go func(c C, wg *sync.WaitGroup) {
			defer wg.Done()
			for value := range c {
				out <- value
			}
		}(cs[i], &wg)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func writer(id int) chan Msg {
	var ch = make(chan Msg)
	go func() {
		var buf = make([]byte, 16)
		var ticker = time.NewTicker(time.Millisecond * 100)
		defer ticker.Stop()
		defer close(ch)
		for i := 0; i < 10; i++ {
			<-ticker.C
			rand.Read(buf)
			ch <- Msg{
				WriterID: id,
				Payload:  hex.EncodeToString(buf),
			}
		}
	}()
	return ch
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment