Created
May 29, 2018 09:46
-
-
Save hauxe/474a506b552ec5edb1e99e8c0e98a1ef to your computer and use it in GitHub Desktop.
file io pipeline and broadcast
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"errors" | |
"fmt" | |
"io" | |
"io/ioutil" | |
"os" | |
"sync" | |
"github.com/hauxe/gom/broadcast" | |
) | |
// writeSomething writes ten newline-terminated sample lines
// ("Sample text: 0" through "Sample text: 9") to w and then closes it.
//
// Writing stops at the first failed write instead of continuing to push data
// into a broken writer. w is closed in every case, so a consumer on the other
// end (for example the read side of an io.Pipe) is always released.
func writeSomething(w io.WriteCloser) {
	defer w.Close()
	for i := 0; i < 10; i++ {
		// Fprintf formats straight into w; no intermediate string/[]byte copy.
		if _, err := fmt.Fprintf(w, "Sample text: %d\n", i); err != nil {
			return
		}
	}
}
func write(wg *sync.WaitGroup, receiver *broadcast.Receiver, w io.Writer, lastReader *io.Reader) { | |
for { | |
_, err := receiver.Read(func(v interface{}) (interface{}, error) { | |
r, ok := v.(io.Reader) | |
if !ok { | |
return nil, errors.New("not a valid reader") | |
} | |
*lastReader = io.TeeReader(r, w) | |
return *lastReader, nil | |
}) | |
wg.Done() | |
if err != nil { | |
return | |
} | |
} | |
} | |
func main() { | |
// setup pipeline | |
reader, writer := io.Pipe() | |
defer reader.Close() | |
// broadcaster | |
broadcaster := broadcast.NewBroadcaster() | |
fileComsumer, err := broadcaster.Listen() | |
if err != nil { | |
panic(err) | |
} | |
stdoutComsumer, err := broadcaster.Listen() | |
if err != nil { | |
panic(err) | |
} | |
file, err := os.OpenFile("./test.txt", os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0777) | |
if err != nil { | |
panic(err) | |
} | |
defer file.Close() | |
var lastReader io.Reader | |
wg := new(sync.WaitGroup) | |
wg.Add(2) | |
go write(wg, fileComsumer, file, &lastReader) | |
go write(wg, stdoutComsumer, os.Stdout, &lastReader) | |
// write something | |
go writeSomething(writer) | |
// broadcast | |
broadcaster.Write(reader) | |
wg.Wait() | |
wg.Add(2) | |
// trigger read | |
ioutil.ReadAll(lastReader) | |
broadcaster.Close() | |
wg.Wait() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment