Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Go streams
package main
import (
"bufio"
"fmt"
"io"
"net"
"strings"
)
// main wires a TCP line source through an upper-casing transform into a
// stdout sink, then blocks forever so the pipeline goroutines keep running.
func main() {
	src := TCPSocketSource(":8080", '\n')
	upper := StringTransform(strings.ToUpper)
	// Not wired in yet:
	// partition := Partition(func(input interface{}) interface{} { return strings.Contains(input.(string), "BOB") })
	out := StdoutSink()

	Pipe(src).To(upper)
	Pipe(upper).To(out)

	// Design sketch of the intended fluent API (not implemented):
	//
	//	Pipe(source).
	//		To(PartitionBy("id")).
	//		To(Transform(strings.ToUpper)).
	//		To(MergeOrderedBy(length)).
	//		To(StdoutSink)
	//
	//	                        ---> Transform --\
	//	Source ---> PartitionBy ---> Transform --- MergeOrderedBy(Length) ---> Sink
	//	                        ---> Transform --/

	// An empty select never returns; it parks main while the stages run.
	select {}
}
// Source is a pipeline stage that produces messages.
type Source interface {
// Output returns the receive-only channel on which the stage emits its
// messages.
Output() <-chan interface{}
}
// PartitionedSource is a pipeline stage that produces messages split across
// several channels.
type PartitionedSource interface {
// Output returns the stage's output channels in a map; the map key
// identifies the partition each channel belongs to.
Output() map[interface{}]chan interface{}
}
// PartitionedSink is a pipeline stage that consumes messages arriving on
// several channels at once.
type PartitionedSink interface {
// Input hands the stage a map of input channels, the same shape that
// PartitionedSource.Output returns.
Input(map[interface{}]chan interface{})
}
// Flow is a middle-of-pipeline stage: it accepts a single input channel (Sink)
// and exposes a single output channel (Source).
type Flow interface {
Sink
Source
}
// PartitionedFlow is a middle-of-pipeline stage whose input and output are
// both maps of channels (PartitionedSink and PartitionedSource).
type PartitionedFlow interface {
PartitionedSource
PartitionedSink
}
// JoinerFlow is a stage that takes a map of input channels (PartitionedSink)
// and exposes a single output channel (Source).
type JoinerFlow interface {
PartitionedSink
Source
}
// SplitterFlow is a stage that takes a single input channel (Sink) and exposes
// a map of output channels (PartitionedSource).
type SplitterFlow interface {
Sink
PartitionedSource
}
// Sink is a pipeline stage that consumes messages from a single channel.
type Sink interface {
// Input hands the stage the receive-only channel it should consume from.
Input(<-chan interface{})
}
// Pipable is something that can be connected to a downstream Sink.
type Pipable interface {
// To connects this end of the pipeline to the given sink.
To(Sink)
}
// partitioned routes each incoming message to a per-key channel, where the key
// is computed by partitioner.
type partitioned struct {
// input is the upstream channel; it is set by Input and read by the
// goroutine that Output starts.
input <-chan interface{}
// partitioner maps a message to the key of the partition it belongs to.
partitioner func(interface{}) interface{}
}
// Input records the upstream channel. It does not consume anything itself;
// the goroutine started by Output reads from the stored channel.
func (p *partitioned) Input(input <-chan interface{}) {
	p.input = input
}
// Output starts a goroutine that reads messages from p.input, computes each
// message's partition key with p.partitioner, and forwards the message on the
// channel registered under that key, creating the channel on first use.
//
// The returned map is the same map the goroutine keeps adding to; no lock is
// taken here, so callers that read it while messages are flowing must provide
// their own synchronization. Partition channels are unbuffered, so routing
// stalls until the current message has been received from its partition.
//
// The goroutine exits when p.input is closed. If p.input is nil when the
// goroutine first receives, it blocks forever.
func (p *partitioned) Output() map[interface{}]chan interface{} {
	partitions := make(map[interface{}]chan interface{})
	go func() {
		for {
			message, open := <-p.input
			if !open {
				// Upstream is finished; a closed channel would otherwise
				// yield nil messages forever.
				return
			}
			key := p.partitioner(message)
			partition, found := partitions[key]
			if !found {
				// Keep the new channel in the local variable as well as the
				// map: sending on a nil channel would block forever.
				partition = make(chan interface{})
				partitions[key] = partition
			}
			partition <- message
		}
	}()
	return partitions
}
// Partition builds a SplitterFlow that routes each message to the partition
// whose key is returned by partitioner. The stage has no input channel until
// its Input method is called.
func Partition(partitioner func(interface{}) interface{}) SplitterFlow {
	stage := new(partitioned)
	stage.partitioner = partitioner
	return stage
}
// piper holds the upstream Source of a pipeline link until To names the
// downstream Sink.
type piper struct {
// source is the stage whose Output will be fed to the sink.
source Source
}
// Pipe begins a pipeline link at source; call To on the result to attach the
// downstream Sink.
func Pipe(source Source) Pipable {
	link := &piper{source: source}
	return link
}
// To connects the piper's source to sink on a new goroutine and returns
// immediately. The goroutine logs the connection, obtains the source's output
// channel, and passes it to sink.Input.
func (p *piper) To(sink Sink) {
	connect := func() {
		fmt.Printf("Creating pipeline from %#v to %#v\n", p.source, sink)
		upstream := p.source.Output()
		sink.Input(upstream)
	}
	go connect()
}
// tcpSocketSource is a Source that emits delimiter-separated messages read
// from TCP connections.
type tcpSocketSource struct {
// addr is the listen address handed to net.Listen.
addr string
// delimiter is the byte that terminates each message on the wire.
delimiter byte
}
// Output starts a TCP listener on t.addr and returns the channel on which
// every delimiter-terminated message received from any client is emitted as a
// string, with the delimiter removed.
//
// Each call starts its own listener goroutine and returns a fresh channel.
// The goroutine panics if the address cannot be listened on. Every accepted
// connection is served by its own goroutine, which stops and closes the
// connection on the first read error, including EOF. Bytes received after the
// last delimiter are discarded.
func (t *tcpSocketSource) Output() <-chan interface{} {
	inner := make(chan interface{})
	go func() {
		listener, err := net.Listen("tcp", t.addr)
		if err != nil {
			panic(err)
		}
		for {
			conn, err := listener.Accept()
			if err != nil {
				fmt.Println(fmt.Errorf("Error on TCP socket connect: %s", err))
				// conn is not usable after a failed Accept; wait for the next one.
				continue
			}
			go func(conn net.Conn) {
				defer conn.Close()
				// One reader per connection: a bufio.Reader may buffer past
				// the delimiter, so recreating it per message loses data.
				reader := bufio.NewReader(conn)
				for {
					message, err := reader.ReadString(t.delimiter)
					if err != nil {
						if err != io.EOF {
							fmt.Println(fmt.Errorf("Error on TCP socket read: %s", err))
						}
						// Any read error ends this connection; retrying would
						// spin on the same error forever.
						return
					}
					// With a nil error ReadString guarantees the data ends in
					// the delimiter, so dropping the final byte strips it for
					// any delimiter value.
					inner <- message[:len(message)-1]
				}
			}(conn)
		}
	}()
	return inner
}
// TCPSocketSource builds a Source that listens on addr and emits one message
// per delimiter-terminated chunk received. Nothing is opened until the
// Source's Output method is called.
func TCPSocketSource(addr string, delimiter byte) Source {
	return &tcpSocketSource{
		addr:      addr,
		delimiter: delimiter,
	}
}
// stringTransform is a Flow stage that applies fn to every string message it
// receives and publishes the result on inner.
type stringTransform struct {
	// fn is the transformation applied to each message.
	fn func(string) string
	// inner carries the transformed messages to whoever reads Output.
	inner chan interface{}
}

// Input consumes messages from input, applies fn to each one, and sends the
// result on the output channel. It blocks until input is closed and then
// returns; the output channel is left open. A send blocks until the previous
// result has been received from Output.
//
// Input panics if a message is not a string.
func (t *stringTransform) Input(input <-chan interface{}) {
	// Ranging stops cleanly when input is closed; a bare receive would yield
	// nil from a closed channel and trip the type check below.
	for message := range input {
		parsed, ok := message.(string)
		if !ok {
			panic("wrong message type, expected string")
		}
		t.inner <- t.fn(parsed)
	}
}

// Output returns the channel on which transformed messages are published.
func (t *stringTransform) Output() <-chan interface{} {
	return t.inner
}
// StringTransform builds a Flow that applies fn to each string message
// passing through it. The stage's output channel is unbuffered.
func StringTransform(fn func(string) string) Flow {
	out := make(chan interface{})
	return &stringTransform{fn: fn, inner: out}
}
// stdoutSink is a Sink that prints every message it receives to standard
// output.
type stdoutSink struct{}

// Input prints each message received from input on its own line using
// fmt.Println. It blocks until input is closed and then returns.
func (t *stdoutSink) Input(input <-chan interface{}) {
	// Ranging ends when input is closed; a bare receive would keep yielding
	// nil and print "<nil>" forever.
	for message := range input {
		fmt.Println(message)
	}
}
// StdoutSink builds a Sink that prints every message it receives to standard
// output.
func StdoutSink() Sink {
	return new(stdoutSink)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment