Skip to content

Instantly share code, notes, and snippets.

@mh-cbon
Last active June 15, 2017 17:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mh-cbon/8afc7ca4e8a5fdde0a460f0411f13e75 to your computer and use it in GitHub Desktop.
Save mh-cbon/8afc7ca4e8a5fdde0a460f0411f13e75 to your computer and use it in GitHub Desktop.
golang stream pull
package main
import (
"errors"
"fmt"
"io"
"strings"
)
// main demonstrates the stream: it builds a Map -> Filter -> Each pipeline,
// then alternates pushing values onto the buffer and reading them back,
// printing the value and error of every read.
func main() {
	fmt.Println("Hello, playground")

	keepAll := func(s string) bool { return true }
	announce := func(s string) {
		fmt.Println("writer got", s)
	}
	s := (&Stream{}).Map(strings.ToUpper).Filter(keepAll).Each(announce)

	// report prints the outcome of a single read (or write-then-read).
	report := func(v string, err error) {
		fmt.Printf("reader got %q %v\n", v, err)
	}

	// Queue a single value, then read it back through the pipeline.
	s.Push("one")
	report(s.Read())

	// The buffer is empty now, so this read reports EOF.
	report(s.Read())

	// Queuing another value makes the stream readable again.
	s.Push("two")
	report(s.Read())

	// Write pushes and reads in a single call.
	report(s.Write("uppercase"))
}
// Stream is a pull-based pipeline of strings: values queued with Push are run
// through the registered bridges, in registration order, when they are read.
type Stream struct {
// bridges holds the processing stages in the order they were added.
bridges []*StreamBridge
// buffer queues pushed values; Push appends at the end and reads consume from the front.
buffer []string
}
// SkipErr is the error type Filter returns when its predicate rejects a value.
// It embeds the underlying error, so it satisfies the error interface itself.
type SkipErr struct{ error }
// add appends bridge to the end of the chain and wires it to its neighbour.
//
// The first bridge pulls from a StreamPull wrapping the stream itself; every
// later bridge pulls from the previous bridge, which in turn pushes into the
// new one. add returns the stream so builder methods can chain.
func (s *Stream) add(bridge *StreamBridge) *Stream {
	if n := len(s.bridges); n == 0 {
		// Nothing upstream yet: the stream's own buffer is the source.
		bridge.from = &StreamPull{s}
	} else {
		prev := s.bridges[n-1]
		prev.to = bridge
		bridge.from = prev
	}
	s.bridges = append(s.bridges, bridge)
	return s
}
// Filter appends a stage that lets a value through only when f reports true.
//
// For a rejected value the stage returns the value together with a SkipErr;
// the non-nil error stops the value from reaching later stages and is what
// the push that carried it returns. Every value the stage sees is also
// logged to standard output. Filter returns the stream so calls can be
// chained.
func (s *Stream) Filter(f func(string) bool) *Stream {
	// The closure parameter is named v so it does not shadow the receiver s.
	keep := func(v string) (string, error) {
		fmt.Println("Filter got", v)
		if !f(v) {
			return v, SkipErr{errors.New("skip")}
		}
		return v, nil
	}
	return s.add(&StreamBridge{handle: keep})
}
// Each appends a stage that calls f on every value passing through it and
// forwards the value unchanged. The value is logged to standard output
// before f runs. Each returns the stream so calls can be chained.
func (s *Stream) Each(f func(string)) *Stream {
	visit := func(item string) (string, error) {
		fmt.Println("Each got", item)
		f(item)
		return item, nil
	}
	return s.add(&StreamBridge{handle: visit})
}
// Map appends a stage that replaces every value passing through it with
// f(value). The incoming value is logged to standard output first. Map
// returns the stream so calls can be chained.
func (s *Stream) Map(f func(string) string) *Stream {
	transform := func(item string) (string, error) {
		fmt.Println("Map got", item)
		mapped := f(item)
		return mapped, nil
	}
	return s.add(&StreamBridge{handle: transform})
}
// Push appends one to the end of the stream's buffer and returns the stream
// so calls can be chained. No bridge runs here; the value is only queued.
func (s *Stream) Push(one string) *Stream {
s.buffer = append(s.buffer, one)
return s
}
// Read pulls one value out of the stream.
//
// When bridges are registered the pull starts at the last one; otherwise it
// goes straight to a StreamPull over the stream's buffer. The value and
// error of that pull are returned as-is.
func (s *Stream) Read() (string, error) {
	if n := len(s.bridges); n > 0 {
		return s.bridges[n-1].Pull()
	}
	source := &StreamPull{s}
	return source.Pull()
}
// Write pushes in onto the buffer and immediately performs one Read,
// returning that read's value and error.
func (s *Stream) Write(in string) (string, error) {
return s.Push(in).Read()
}
// StreamPuller is the interface shared by pipeline stages. Despite the name
// it covers both directions: pulling a value and pushing one.
type StreamPuller interface {
// Pull asks the stage for a value and returns it with any error.
Pull() (string, error)
// Push hands a value to the stage and returns the resulting value and error.
Push(string) (string, error)
}
// StreamPull wraps a Stream. Embedding *Stream promotes the stream's fields
// and methods onto StreamPull.
type StreamPull struct {
*Stream
}
// Pull removes the oldest value from the stream's buffer and sends it
// through Push, returning whatever Push returns. When the buffer is empty it
// returns the empty string and io.EOF.
func (s *StreamPull) Pull() (string, error) {
	if len(s.buffer) == 0 {
		return "", io.EOF
	}
	head := s.buffer[0]
	s.buffer = s.buffer[1:]
	return s.Push(head)
}
// Push feeds in to the first bridge of the stream and returns the value and
// error the bridge chain produces.
//
// When the stream has no bridges there is nothing to transform, so in is
// returned unchanged with a nil error. Returning io.EOF in that case would
// make every read on a bridge-less stream look like end-of-data even though
// a value was delivered, and would disagree with StreamBridge.Push, which
// reports a nil error when it is the last stage.
func (s *StreamPull) Push(in string) (string, error) {
	if len(s.bridges) == 0 {
		return in, nil
	}
	return s.bridges[0].Push(in)
}
// StreamBridge is one processing stage of a Stream, linked to the stages on
// either side of it.
type StreamBridge struct {
// from is the upstream stage that Pull delegates to.
from StreamPuller
// to is the downstream stage that receives handle's output; nil when nothing follows.
to StreamPuller
// handle processes one value; a non-nil error stops the value from being passed on.
handle func(string) (string, error)
}
// Pull forwards the request to the upstream stage (from) and returns its
// result unchanged; the bridge's own handle is not applied on this path.
func (s *StreamBridge) Pull() (string, error) {
return s.from.Pull()
}
// Push runs in through this stage's handle and, if that succeeds and a
// downstream stage exists, passes the result on to it.
//
// A non-nil error from handle is returned at once together with handle's
// value, without calling the downstream stage. When there is no downstream
// stage, handle's value is the final result.
func (s *StreamBridge) Push(in string) (string, error) {
	out, err := s.handle(in)
	if err != nil {
		return out, err
	}
	if s.to == nil {
		return out, nil
	}
	return s.to.Push(out)
}
// TODO: add an asyncBridge.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment