Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@anthonynsimon
Created February 3, 2017 00:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anthonynsimon/aa8d58f36a97d4fcac6ea16c8ab73f85 to your computer and use it in GitHub Desktop.
Save anthonynsimon/aa8d58f36a97d4fcac6ea16c8ab73f85 to your computer and use it in GitHub Desktop.
package main
import (
"bufio"
"fmt"
"log"
"net"
"io"
)
// Parser states for the command state machine in handleConnection.
// The PUB and SUB chains mirror each other, one state per consumed character.
const (
// T_START: nothing consumed yet; expects 'p'/'P' or 's'/'S'.
T_START = iota
// T_P: consumed "p"; expects 'u'/'U'.
T_P
// T_PU: consumed "pu"; expects 'b'/'B'.
T_PU
// T_PUB: consumed "pub"; expects a single space.
T_PUB
// T_PUB_DELIM: consumed "pub "; scanning the topic until its delimiter.
T_PUB_DELIM
// T_PUB_TOPIC: topic fully read; the line is a complete pub command.
T_PUB_TOPIC
// T_S: consumed "s"; expects 'u'/'U'.
T_S
// T_SU: consumed "su"; expects 'b'/'B'.
T_SU
// T_SUB: consumed "sub"; expects a single space.
T_SUB
// T_SUB_DELIM: consumed "sub "; scanning the topic until its delimiter.
T_SUB_DELIM
// T_SUB_TOPIC: topic fully read; the line is a complete sub command.
T_SUB_TOPIC
)
// Package-level broker state shared by all connections.
var (
// topics maps a topic name to its channel; it is read and filled only by
// ensureChannel.
topics = make(map[string]chan []byte)
// uniqueTopic is the single buffered channel (capacity 1024) that publish
// sends to and that the goroutines started by subscribe receive from.
uniqueTopic = make(chan []byte, 1024)
)
// main listens for TCP connections on port 6543 and serves every accepted
// connection on its own goroutine via handleConnection. It exits through
// log.Fatal if the listener cannot be created.
func main() {
	port := 6543
	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		log.Fatal(err)
	}
	for {
		conn, err := lis.Accept()
		if err != nil {
			// A failed Accept yields no usable connection. Skip it rather
			// than hand a nil conn to the handler, which would panic inside
			// the goroutine and bring the whole server down.
			log.Println(err)
			continue
		}
		go handleConnection(conn)
	}
}
// wire protocol
// sub [topic]
// unsub [topic] (not implemented: the parser rejects it and closes the connection)
// pub [topic] [data]
func handleConnection(conn net.Conn) {
log.Println("established connection")
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
defer conn.Close()
for {
line, err := rw.ReadString('\n')
if err == io.EOF {
return
}
if err != nil {
return
}
state := T_START
topicStart := -1
topicEnd := -1
PARSER:
for i, c := range line {
switch state {
case T_START:
switch c {
case 'p', 'P':
state = T_P
case 's', 'S':
state = T_S
default:
return
}
case T_P:
switch c {
case 'u', 'U':
state = T_PU
default:
return
}
case T_S:
switch c {
case 'u', 'U':
state = T_SU
default:
return
}
case T_PU:
switch c {
case 'b', 'B':
state = T_PUB
default:
return
}
case T_SU:
switch c {
case 'b', 'B':
state = T_SUB
default:
return
}
case T_PUB:
switch c {
case ' ':
topicStart = i+1
state = T_PUB_DELIM
default:
return
}
case T_SUB:
switch c {
case ' ':
topicStart = i+1
state = T_SUB_DELIM
default:
return
}
case T_PUB_DELIM:
switch c {
case ' ':
topicEnd = i
state = T_PUB_TOPIC
break PARSER
}
case T_SUB_DELIM:
switch c {
case ' ', '\n':
topicEnd = i
state = T_SUB_TOPIC
break PARSER
}
}
}
switch state {
case T_PUB_TOPIC:
publish(line[topicStart:topicEnd], []byte(line[topicEnd+1:]),rw)
case T_SUB_TOPIC:
subscribe(line[topicStart:topicEnd], rw)
default:
return
}
}
}
// subscribe acknowledges the subscription on w and then starts a goroutine
// that forwards published messages to w.
//
// The topic is only echoed in the acknowledgement: per-topic routing through
// ensureChannel is disabled (see the commented-out line), so every
// subscriber receives from the single shared uniqueTopic channel and each
// message is handed to exactly one receiving goroutine.
//
// The forwarding goroutine stops at the first failed write. bufio.Writer
// errors are sticky, so once the connection is gone the goroutine exits
// instead of draining uniqueTopic forever and discarding messages that live
// subscribers should have received. The message being written when the
// failure is detected is lost.
//
// NOTE(review): w is also written by the connection's handler goroutine
// (publish acknowledgements) and bufio.Writer is not safe for concurrent
// use; confirm whether one connection may both publish and subscribe.
func subscribe(topic string, w *bufio.ReadWriter) {
	// topicChan := ensureChannel(topic)
	if _, err := w.Write([]byte("Subscribed to " + topic + "\n")); err != nil {
		log.Println(err)
		return
	}
	if err := w.Flush(); err != nil {
		// The client is already unreachable; do not start a forwarder for it.
		log.Println(err)
		return
	}
	go func() {
		for {
			data := <-uniqueTopic
			if _, err := w.Write(data); err != nil {
				return
			}
			if err := w.Flush(); err != nil {
				return
			}
		}
	}()
}
// publish queues data on the shared uniqueTopic channel and then confirms
// the publication to the client by writing "Published to <topic>" on w.
//
// The topic is used only in the confirmation text; per-topic routing through
// ensureChannel is disabled (see the commented-out line). The send blocks
// while uniqueTopic's buffer is full. Errors from writing the confirmation
// are not checked.
func publish(topic string, data []byte, w *bufio.ReadWriter) {
	// topicChan := ensureChannel(topic)
	uniqueTopic <- data

	ack := []byte("Published to " + topic + "\n")
	w.Write(ack)
	w.Flush()
}
// ensureChannel returns the channel registered for topic in the topics map,
// creating and registering a new buffered channel (capacity 1024) the first
// time a topic is requested. Every call prints the topic and its channel to
// standard output.
//
// NOTE(review): topics is read and written here without any locking; confirm
// that callers serialize access before using this from several goroutines.
func ensureChannel(topic string) chan []byte {
	ch, exists := topics[topic]
	if !exists {
		// First request for this topic: create its channel and remember it.
		ch = make(chan []byte, 1024)
		topics[topic] = ch
	}
	fmt.Printf("got channel for topic: %s - %v\n", topic, ch)
	return ch
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment