Skip to content

Instantly share code, notes, and snippets.

@dimiro1
Created September 15, 2016 03:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dimiro1/b9c4d81d9c8a4eef2cbfe1228c90eeea to your computer and use it in GitHub Desktop.
Save dimiro1/b9c4d81d9c8a4eef2cbfe1228c90eeea to your computer and use it in GitHub Desktop.
package main
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"time"
"github.com/rs/xlog"
)
// Component represents a runnable component
type Component interface {
Run(ctx context.Context)
}
// Network is a graph network
// has a map ot registered elements
type Network struct {
components map[string]Component
}
// Register is used to give a name to the Component, so it can be used in the Connect function
// ex: RegisterComponent("read", c)
func (n *Network) Register(name string, c Component) error {
if n.components == nil {
n.components = make(map[string]Component)
}
n.components[name] = c
return nil
}
// Connect connect the input and output of the given components
// in -> out
// Ex: Connect("read.out -> write.in")
// Panic if element is not registered???
func (n *Network) Connect(definition string) error {
return nil
}
// Run start all components
func (n *Network) Run(ctx context.Context) {
for _, c := range n.components {
go c.Run(ctx)
}
}
type readComponent struct {
in chan StringWithContext `port:"in"`
out chan string `port:"out"`
err chan string `port:"error"`
}
func (r *readComponent) Run(ctx context.Context) {
for {
select {
case message := <-r.in:
log := xlog.FromContext(message.ctx)
log.Info("Reading", xlog.F{
"message": message.in,
})
content, err := ioutil.ReadFile(message.in)
if err != nil {
log.Error("Error", xlog.F{
"error": err,
})
r.err <- err.Error()
continue
}
r.out <- string(content)
case <-ctx.Done():
return
}
}
}
type sendRequestComponent struct {
url chan StringWithContext `port:"url"`
message chan string `port:"message"`
err chan string `port:"err"`
}
func (s *sendRequestComponent) Run(ctx context.Context) {
for {
select {
case url := <-s.url:
response, err := http.Get(url.in)
if err != nil {
s.err <- err.Error()
continue
}
data, err := ioutil.ReadAll(response.Body)
if err != nil {
s.err <- err.Error()
continue
}
s.message <- string(data)
response.Body.Close()
case <-ctx.Done():
return
}
}
}
type writeComponent struct {
in chan string `port:"in"`
}
func (w *writeComponent) Run(ctx context.Context) {
for {
select {
case content := <-w.in:
fmt.Println(content)
case <-ctx.Done():
return
}
}
}
type StringWithContext struct {
ctx context.Context
in string
}
func NewStringWithContext(ctx context.Context, str string) StringWithContext {
return StringWithContext{
ctx: ctx,
in: str,
}
}
func main() {
conf := xlog.Config{
Level: xlog.LevelInfo,
Fields: xlog.F{
"role": "FBP",
},
Output: xlog.NewOutputChannel(xlog.NewConsoleOutput()),
}
logger := xlog.New(conf)
// Essa instaciação de componentes pode ser feita no registro/connect dos componentes
buffer := 10 // Opcional. Pode ser 0.
urlPort := make(chan StringWithContext, buffer)
readInPort := make(chan StringWithContext, buffer)
readErrPort := make(chan string, buffer)
writeInPort := make(chan string, buffer)
s := &sendRequestComponent{
url: urlPort,
message: writeInPort,
}
read := &readComponent{
in: readInPort,
out: writeInPort,
err: readErrPort,
}
write := &writeComponent{
in: writeInPort,
}
errorWriter := &writeComponent{
in: readErrPort,
}
ctx := xlog.NewContext(context.Background(), logger)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
n := &Network{}
n.Register("SendRequest", s)
n.Register("Read", read)
n.Register("Write", write)
n.Register("ErrorWrite", errorWriter)
// Isto não faz nada por enquanto
n.Connect("SendRequest.message -> Write.in")
n.Connect("Read.out -> Write.in")
n.Connect("Read.err -> ErrorWrite.in")
n.Run(ctx)
files := []string{
"/Users/claudemiro/my-noflo-example-app/package.json",
"Hello World",
}
for i := 0; i < 2; i++ {
readInPort <- NewStringWithContext(ctx, files[i])
time.Sleep(500 * time.Millisecond)
}
urlPort <- NewStringWithContext(ctx, "https://httpbin.org/ip")
time.Sleep(2 * time.Second)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment