Skip to content

Instantly share code, notes, and snippets.

@hauxe
Created May 29, 2018 09:46
Show Gist options
  • Save hauxe/474a506b552ec5edb1e99e8c0e98a1ef to your computer and use it in GitHub Desktop.
Save hauxe/474a506b552ec5edb1e99e8c0e98a1ef to your computer and use it in GitHub Desktop.
file io pipeline and broadcast
package main
import (
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"sync"
"github.com/hauxe/gom/broadcast"
)
func writeSomething(w io.WriteCloser) {
defer w.Close()
for i := 0; i < 10; i++ {
w.Write([]byte(fmt.Sprintf("Sample text: %d\n", i)))
}
}
func write(wg *sync.WaitGroup, receiver *broadcast.Receiver, w io.Writer, lastReader *io.Reader) {
for {
_, err := receiver.Read(func(v interface{}) (interface{}, error) {
r, ok := v.(io.Reader)
if !ok {
return nil, errors.New("not a valid reader")
}
*lastReader = io.TeeReader(r, w)
return *lastReader, nil
})
wg.Done()
if err != nil {
return
}
}
}
func main() {
// setup pipeline
reader, writer := io.Pipe()
defer reader.Close()
// broadcaster
broadcaster := broadcast.NewBroadcaster()
fileComsumer, err := broadcaster.Listen()
if err != nil {
panic(err)
}
stdoutComsumer, err := broadcaster.Listen()
if err != nil {
panic(err)
}
file, err := os.OpenFile("./test.txt", os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0777)
if err != nil {
panic(err)
}
defer file.Close()
var lastReader io.Reader
wg := new(sync.WaitGroup)
wg.Add(2)
go write(wg, fileComsumer, file, &lastReader)
go write(wg, stdoutComsumer, os.Stdout, &lastReader)
// write something
go writeSomething(writer)
// broadcast
broadcaster.Write(reader)
wg.Wait()
wg.Add(2)
// trigger read
ioutil.ReadAll(lastReader)
broadcaster.Close()
wg.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment