Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
chaining channels in go
package main
import (
"unicode/utf8"
)
type Item string
type Stream chan Item
type Acc string
func NewWordStream(words []Item) Stream {
stream := make(Stream)
go func() {
for _, w := range words {
stream <- w
}
close(stream)
}()
return stream
}
func (in Stream) Map(fn func(Item) Item) Stream {
out := make(Stream)
go func() {
for x := range in {
out <- fn(x)
}
close(out)
}()
return out
}
func (in Stream) Filter(fn func(Item) bool) Stream {
out := make(Stream)
go func() {
for x := range in {
if fn(x) {
out <- x
}
}
close(out)
}()
return out
}
func (in Stream) Reduce(fn func(Acc, Item) (Acc, Acc)) chan Acc {
out := make(chan Acc)
go func() {
var val, acc Acc
for x := range in {
acc, val = fn(acc, x)
if val != "" {
out <- val
}
}
close(out)
}()
return out
}
func (in Stream) Sniff(fn func(Item)) Stream {
out := make(Stream)
go func() {
for x := range in {
fn(x)
out <- x
}
close(out)
}()
return out
}
func (in Stream) Split() (Stream, Stream) {
out1, out2 := make(Stream), make(Stream)
go func() {
for x := range in {
out1 <- x
out2 <- x
}
close(out1)
close(out2)
}()
return out1, out2
}
func (in Stream) ReduceAll(fn func(Acc, Item) Acc) Acc {
var acc Acc
for x := range in {
acc = fn(acc, x)
}
return acc
}
func (in Stream) Collect() []Item {
a := make([]Item, 0, 5)
for x := range in {
a = append(a, x)
}
return a
}
func (in Stream) Do(fn func(Item)) {
for x := range in {
fn(x)
}
}
func main() {
reverse := func(word Item) Item {
s := string(word)
o := make([]rune, utf8.RuneCountInString(s))
i := len(o)
for _, c := range s {
i--
o[i] = c
}
return Item(o)
}
longerThan := func(n int) func(Item) bool {
return func(w Item) bool {
return len(w) > n
}
}
not := func(s string) func(Item) bool {
item := Item(s)
return func(w Item) bool {
return item != w
}
}
words := []Item{"this", "is", "pretty", "cool", "1", "2", "3"}
NewWordStream(words).Map(reverse).Filter(longerThan(2)).Map(reverse).Filter(not("this")).Do(func(w Item) {
println(w)
})
}
package main
import (
"errors"
"log"
)
type Stream struct {
in chan string
close chan bool
err chan error
}
func (s *Stream) Close() { s.close <- true }
func (s *Stream) Fail(err error) { s.Close(); s.Drain(); s.err <- err }
func (s *Stream) Drain() {
for _ = range s.in {
}
}
func NewWordStream(words []string) *Stream {
s := &Stream{make(chan string), make(chan bool), make(chan error)}
go func() {
loop:
for _, w := range words {
select {
case <-s.close:
break loop
case s.in <- w:
continue
}
}
close(s.in)
}()
return s
}
func (s *Stream) Map(fn func(string) (string, error)) *Stream {
out := make(chan string)
go func() {
for x := range s.in {
x, err := fn(x)
if err != nil {
s.Fail(err)
} else {
out <- x
}
}
close(out)
}()
return &Stream{out, s.close, s.err}
}
func (s *Stream) Filter(fn func(string) (bool, error)) *Stream {
out := make(chan string)
go func() {
for x := range s.in {
ok, err := fn(x)
if err != nil {
s.Fail(err)
} else if ok {
out <- x
}
}
close(out)
}()
return &Stream{out, s.close, s.err}
}
func (s *Stream) Do(fn func(string)) error {
var err error
loop:
for {
select {
case x, ok := <-s.in:
if !ok {
break loop
}
fn(x)
case err = <-s.err:
}
}
return err
}
func main() {
words := []string{"this", "is", "pretty", "cool", "1", "2", "3"}
longerThan3 := func(s string) (bool, error) { return len(s) > 3, nil }
wrap := func(s string) (string, error) {
if s == "cool" {
return "", errors.New("not cool")
}
return "<" + s + ">", nil
}
err := NewWordStream(words).Map(wrap).Filter(longerThan3).Do(func(w string) { log.Print(w, " ") })
if err != nil {
log.Fatalf("Error: %s", err)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment