Skip to content

Instantly share code, notes, and snippets.

@mh-cbon
Last active June 7, 2017 07:47
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mh-cbon/2062efef48e246592bdb0665f0ab8547 to your computer and use it in GitHub Desktop.
Save mh-cbon/2062efef48e246592bdb0665f0ab8547 to your computer and use it in GitHub Desktop.
golang, simple stream
package main
import (
"errors"
"fmt"
"io"
"strings"
"sync"
"time"
)
// main wires a four-item in-memory Reader through an asynchronous stage, a
// two-item limit and an upper-casing map into a printing Writer, runs the
// pipeline once and prints the error it finished with.
func main() {
	items := []string{"a", "b", "c", "d"}
	source := Reader{ds: items}
	target := Writer{}

	// Adapt Writer.Write to the func(string) error shape Stream.Write takes.
	emit := func(s string) error {
		_, werr := target.Write(s)
		return werr
	}

	query, flush := PseudoQuery(2)
	pipeline := Stream{}.
		Async(query, flush).
		Limit(2).
		Map(strings.ToUpper).
		Write(emit)

	run := Readable(source.Read, pipeline)
	fmt.Println("rwErr", run())
	fmt.Println()
}
// stringHandler is one unit of work queued by PseudoQuery: an input value
// together with the push callback that forwards it to the next stage.
type stringHandler struct {
in string
push func(string) error
}
// PseudoQuery simulates a slow asynchronous operation and returns the two
// closures that Stream.Async takes: a stage handler and a flush handler.
//
// The stage handler queues each item for a single background goroutine, which
// waits one second and then forwards the item downstream through push. The
// data and err channels are both created with a buffer of n-1.
//
// NOTE(review): n < 1 hands make a negative buffer size, which panics at run
// time — presumably callers always pass n >= 1; confirm.
func PseudoQuery(n int) (func(in string, push func(string) error) error, func() error) {
eof := make(chan bool)
data := make(chan stringHandler, n-1)
err := make(chan error, n-1)
var wg sync.WaitGroup
// Worker goroutine: processes queued items until it receives on eof.
go func() {
for {
select {
case <-eof:
return
case d := <-data:
// Simulated latency of the pseudo query.
<-time.After(1 * time.Second)
e := d.push(d.in)
// The item is marked done before its result is handed over, so wg.Wait
// can return while this goroutine is still blocked sending on err.
wg.Done()
fmt.Println("done " + d.in)
err <- e
}
}
}()
// dataHandler enqueues the item (blocking while the data buffer is full) and
// then reports a result only if one is already waiting on err; otherwise it
// returns nil immediately. A returned error therefore belongs to an earlier
// item, not necessarily to in.
dataHandler := func(in string, push func(string) error) error {
wg.Add(1)
data <- stringHandler{in, push}
fmt.Println("doing " + in)
select {
case e := <-err:
if e == io.EOF {
// Downstream reported EOF: tell the worker to stop. eof is unbuffered, so
// this blocks until the worker is back in its select.
eof <- true
}
return e
default:
}
return nil
}
// flushHandler waits until every queued item has been pushed, then asks the
// worker to stop from a separate goroutine so the caller is not blocked. It
// always returns nil; results still sitting in err are not read here.
// NOTE(review): if the worker has already exited, the goroutine started here
// appears to stay blocked on eof forever — confirm whether that matters.
flushHandler := func() error {
wg.Wait()
go func() { eof <- true }()
return nil
}
return dataHandler, flushHandler
}
// Readable couples a pull-style reader with a Stream and returns a function
// that drains the reader into the stream.
//
// The returned function calls reader repeatedly, pushing every non-empty chunk
// into sink, and stops at the first non-nil error, whether that error came
// from reader or from sink.TryPush. When the error is an AtIndexError it is
// stamped with the index of the last iteration that completed without error
// (-1 if none did). The sink is flushed before returning; the result of Flush
// is not inspected. The index counter is shared by all invocations of the
// returned function.
func Readable(reader func() (string, error), sink Stream) (readWriter func() error) {
	lastOK := -1
	return func() error {
		var failure error
		for failure == nil {
			chunk, rerr := reader()
			failure = rerr
			if chunk != "" {
				// A push result replaces whatever the reader reported.
				failure = sink.TryPush(chunk)
			}
			if failure == nil {
				lastOK++
			}
		}
		if indexed, ok := failure.(AtIndexError); ok {
			indexed.SetIndex(lastOK)
			failure = indexed
		}
		sink.Flush()
		return failure
	}
}
// Reader is an in-memory source that hands out the elements of ds one at a
// time; i is the position of the next element to return.
type Reader struct {
	ds []string
	i  int
}

// Read returns the next element of ds and advances the cursor, printing
// "Read: <element>" as it goes. Once every element has been handed out it
// returns an empty string together with io.EOF.
func (r *Reader) Read() (string, error) {
	if r.i < len(r.ds) {
		next := r.ds[r.i]
		r.i++
		fmt.Printf("Read: %v\n", next)
		return next, nil
	}
	return "", io.EOF
}
// Writer is a sink that prints whatever it is given to standard output.
type Writer struct{}

// Write prints "Write: <s>" on its own line and reports len(s) as the number
// of bytes written; the returned error is always nil.
func (w Writer) Write(s string) (int, error) {
	written := len(s)
	fmt.Printf("Write: %v\n", s)
	return written, nil
}
// EarlyEnd is the error that Limit wraps in an AtIndexError once its quota of
// items has been used up.
var EarlyEnd = errors.New("early end")
// AtIndexError decorates an error with the index of the stream item at which
// it was observed.
type AtIndexError struct {
	error
	atindex int
}

// SetIndex records i as the index associated with the error. It has a pointer
// receiver, so it must be called on an addressable value for the change to
// stick.
func (e *AtIndexError) SetIndex(i int) { e.atindex = i }

// GetIndex returns the index recorded by SetIndex (zero if none was set).
func (e AtIndexError) GetIndex() int { return e.atindex }

// Error renders the wrapped error followed by the recorded index, separated
// by a space. The wrapped error is handed to fmt as a value rather than by
// calling its Error method directly, so a zero AtIndexError prints "<nil> 0"
// instead of panicking on the nil embedded error.
func (e AtIndexError) Error() string { return fmt.Sprintf("%v %v", e.error, e.atindex) }

// Unwrap exposes the wrapped error so errors.Is and errors.As can see through
// the index decoration.
func (e AtIndexError) Unwrap() error { return e.error }
// Stream describes a push-based pipeline of string-processing stages. Its
// builder methods use value receivers and return the extended Stream.
type Stream struct {
// ds holds the items that All pushes through the pipeline.
ds []string
// op is the ordered list of stages. Each stage receives an item and a push
// callback that forwards a value to the following stage.
op []func(in string, push func(string) error) error
// flush holds the handlers that Flush runs, in order.
flush []func() error
}
// Async appends fn as the next stage of the pipeline and registers flush to be
// run by Flush. fn receives each item together with a push callback for the
// following stage; it may call push later, from another goroutine.
//
// The receiver is left untouched and the extended Stream is returned.
func (s Stream) Async(fn func(in string, push func(string) error) error, flush func() error) Stream {
	// Full slice expressions cap the capacity at the current length, so append
	// always copies instead of writing into a backing array that other Streams
	// derived from the same parent may share.
	s.op = append(s.op[:len(s.op):len(s.op)], fn)
	s.flush = append(s.flush[:len(s.flush):len(s.flush)], flush)
	return s
}
// Map appends a stage that applies fn to every item and pushes the result to
// the following stage, returning whatever that push returns. A no-op flush
// handler is registered alongside it.
//
// The receiver is left untouched and the extended Stream is returned.
func (s Stream) Map(fn func(string) string) Stream {
	stage := func(in string, push func(string) error) error {
		return push(fn(in))
	}
	noFlush := func() error { return nil }

	// Full slice expressions force append to copy, so Streams derived from the
	// same parent never overwrite each other's stages.
	s.op = append(s.op[:len(s.op):len(s.op)], stage)
	s.flush = append(s.flush[:len(s.flush):len(s.flush)], noFlush)
	return s
}
// Each appends a stage that calls fn with every item for its side effects and
// then pushes the item, unchanged, to the following stage. A no-op flush
// handler is registered alongside it.
//
// The receiver is left untouched and the extended Stream is returned.
func (s Stream) Each(fn func(string)) Stream {
	stage := func(in string, push func(string) error) error {
		fn(in)
		return push(in)
	}
	noFlush := func() error { return nil }

	// Full slice expressions force append to copy, so Streams derived from the
	// same parent never overwrite each other's stages.
	s.op = append(s.op[:len(s.op):len(s.op)], stage)
	s.flush = append(s.flush[:len(s.flush):len(s.flush)], noFlush)
	return s
}
// Filter appends a stage that forwards an item only when every predicate in f
// accepts it. Predicates run in order and evaluation stops at the first one
// that returns false. A rejected item is dropped and the stage returns nil.
// When called with no predicates, nothing is forwarded. A no-op flush handler
// is registered alongside the stage.
//
// The receiver is left untouched and the extended Stream is returned.
func (s Stream) Filter(f ...func(string) bool) Stream {
	stage := func(in string, push func(string) error) error {
		ok := false
		for _, fn := range f {
			if ok = fn(in); !ok {
				break
			}
		}
		if !ok {
			return nil
		}
		return push(in)
	}
	noFlush := func() error { return nil }

	// Full slice expressions force append to copy, so Streams derived from the
	// same parent never overwrite each other's stages.
	s.op = append(s.op[:len(s.op):len(s.op)], stage)
	s.flush = append(s.flush[:len(s.flush):len(s.flush)], noFlush)
	return s
}
// Limit appends a stage that forwards at most n items. The first n items are
// pushed to the following stage; every item after that is rejected with an
// AtIndexError wrapping EarlyEnd. The counter lives in the stage's closure, so
// it is shared by every copy of the returned Stream and is never reset. A
// no-op flush handler is registered alongside the stage.
//
// The receiver is left untouched and the extended Stream is returned.
func (s Stream) Limit(n int) Stream {
	done := 0
	stage := func(in string, push func(string) error) error {
		if done < n {
			done++
			return push(in)
		}
		return AtIndexError{error: EarlyEnd}
	}
	noFlush := func() error { return nil }

	// Full slice expressions force append to copy, so Streams derived from the
	// same parent never overwrite each other's stages.
	s.op = append(s.op[:len(s.op):len(s.op)], stage)
	s.flush = append(s.flush[:len(s.flush):len(s.flush)], noFlush)
	return s
}
// TryPush sends in through the pipeline, starting at the first stage, and
// returns the error reported by that stage. Each stage is given a push
// callback that runs the remainder of the pipeline; a push issued by the last
// stage (or a TryPush on a Stream with no stages) yields io.EOF.
func (s Stream) TryPush(in string) error {
	// Compose the stages back to front so that every stage closes over the
	// entry point of the stage after it.
	next := func(string) error { return io.EOF }
	for i := len(s.op) - 1; i >= 0; i-- {
		stage, downstream := s.op[i], next
		next = func(v string) error { return stage(v, downstream) }
	}
	return next(in)
}
// Push sends in through the pipeline like TryPush but is fire-and-forget: the
// error TryPush returns is deliberately discarded.
func (s Stream) Push(in string) {
	_ = s.TryPush(in)
}
// All pushes every item of s.ds through the pipeline in order, stopping at the
// first item whose TryPush fails, and then flushes the stream.
//
// It returns the push error if there was one, otherwise the result of Flush.
// Flush runs in both cases, so stages holding work in flight still get the
// chance to finish.
func (s Stream) All() (err error) {
	for _, v := range s.ds {
		// Assign to the named result: a short variable declaration here would
		// shadow it and the push error would never reach the caller.
		if err = s.TryPush(v); err != nil {
			break
		}
	}
	flushErr := s.Flush()
	if err == nil {
		err = flushErr
	}
	return err
}
// Flush runs every registered flush handler in registration order and returns
// the first non-nil error any of them reports, or nil if all succeed. A
// failing handler does not stop the remaining handlers from running.
func (s Stream) Flush() (err error) {
	for _, f := range s.flush {
		// Keep the earliest failure instead of letting a later handler's
		// result overwrite it.
		if ferr := f(); ferr != nil && err == nil {
			err = ferr
		}
	}
	return err
}
// Write appends a terminal stage that hands every item to sink and returns
// sink's error. The stage never calls push, so nothing flows past it. No flush
// handler is registered for this stage.
//
// The receiver is left untouched and the extended Stream is returned.
func (s Stream) Write(sink func(string) error) Stream {
	stage := func(in string, _ func(string) error) error {
		return sink(in)
	}

	// A full slice expression forces append to copy, so Streams derived from
	// the same parent never overwrite each other's stages.
	s.op = append(s.op[:len(s.op):len(s.op)], stage)
	return s
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment