Skip to content

Instantly share code, notes, and snippets.

@majek
Created July 17, 2015 12:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save majek/5ad981a6f647719a9749 to your computer and use it in GitHub Desktop.
Save majek/5ad981a6f647719a9749 to your computer and use it in GitHub Desktop.
a.go
package main
import (
"fmt"
"os"
"strings"
"math/rand"
"github.com/gdamore/mangos"
mangospub "github.com/gdamore/mangos/protocol/pub"
mangossub "github.com/gdamore/mangos/protocol/sub"
"github.com/gdamore/mangos/transport/ipc"
"github.com/gdamore/mangos/transport/tcp"
)
// NanomsgSub builds a mangos SUB socket, dials nanomsgAddr over the ipc or
// tcp transport, and subscribes with an empty topic. Any failure while
// creating the socket, dialing, or subscribing is reported on stderr and the
// process exits with status 3, so the returned socket is always usable.
func NanomsgSub(nanomsgAddr string) mangos.Socket {
	// die prints a formatted failure message to stderr and terminates the
	// process with exit status 3.
	die := func(format string, args ...interface{}) {
		fmt.Fprintf(os.Stderr, format, args...)
		os.Exit(3)
	}

	sock, err := mangossub.NewSocket()
	if err != nil {
		die("[!] nanomsg.NewSubSocket() %v\n", err.Error())
	}

	// Queue-length and no-delay tuning; the results of these calls are not
	// checked.
	sock.SetOption(mangos.OptionWriteQLen, 2048)
	sock.SetOption(mangos.OptionReadQLen, 2048)
	sock.SetOption(mangos.OptionNoDelay, false)

	fmt.Fprintf(os.Stderr, "[+] Receiving from nanomsg PUB on %q\n", nanomsgAddr)

	// Register both transports so either an ipc:// or tcp:// address works.
	sock.AddTransport(ipc.NewTransport())
	sock.AddTransport(tcp.NewTransport())

	if dialErr := sock.Dial(nanomsgAddr); dialErr != nil {
		die("[!] nanomsg.Dial(%q) %v\n", nanomsgAddr, dialErr.Error())
	}

	// Empty topic: presumably matches every published message; confirm
	// against the mangos documentation.
	if subErr := sock.SetOption(mangos.OptionSubscribe, []byte("")); subErr != nil {
		die("[!] nanomsg.Subscribe() %v\n", subErr.Error())
	}

	return sock
}
// NanomsgPub builds a mangos PUB socket and starts listening on
// nanomsgOutAddr over the ipc or tcp transport. For ipc:/// addresses the
// filesystem path is removed first (best effort). Any failure while creating
// the socket or listening is reported on stderr and the process exits with
// status 3, so the returned socket is always usable.
func NanomsgPub(nanomsgOutAddr string) mangos.Socket {
	pub, err := mangospub.NewSocket()
	if err != nil {
		// This is the PUB socket constructor; the label must say so, otherwise
		// the message points at the wrong function when diagnosing failures.
		fmt.Fprintf(os.Stderr, "[!] nanomsg.NewPubSocket() %v\n", err.Error())
		os.Exit(3)
	}

	// Queue-length and no-delay tuning; the results of these calls are not
	// checked.
	pub.SetOption(mangos.OptionWriteQLen, 2048)
	pub.SetOption(mangos.OptionReadQLen, 2048)
	pub.SetOption(mangos.OptionNoDelay, false)

	if strings.HasPrefix(nanomsgOutAddr, "ipc:///") {
		// Strip only the "ipc://" scheme so the leading "/" of the path is
		// kept. The error is deliberately ignored: the path may simply not
		// exist yet.
		_ = os.Remove(strings.TrimPrefix(nanomsgOutAddr, "ipc://"))
	}

	fmt.Fprintf(os.Stderr, "[+] Publishing to nanomsg SUB on %q\n", nanomsgOutAddr)

	// Register both transports so either an ipc:// or tcp:// address works.
	pub.AddTransport(ipc.NewTransport())
	pub.AddTransport(tcp.NewTransport())

	if err := pub.Listen(nanomsgOutAddr); err != nil {
		fmt.Fprintf(os.Stderr, "[!] nanomsg.Listen(%q) %v\n", nanomsgOutAddr, err.Error())
		os.Exit(3)
	}
	return pub
}
// main wires a PUB socket and a SUB socket together over the same ipc
// address. A background goroutine publishes messages of random length
// (0..255 bytes) whose first byte, when present, encodes the message length;
// the main loop receives messages and prints a line whenever the encoded
// length disagrees with the received length.
func main() {
	pub := NanomsgPub("ipc:///tmp/xx")
	sub := NanomsgSub("ipc:///tmp/xx")

	go func() {
		var b [4096]byte
		for {
			l := rand.Intn(256)
			b[0] = uint8(l)
			// A failed send means the publisher is unusable; exit rather than
			// leave the receive loop blocked forever.
			if err := pub.Send(b[:l]); err != nil {
				fmt.Fprintf(os.Stderr, "[!] nanomsg.Send() %v\n", err.Error())
				os.Exit(3)
			}
		}
	}()

	for {
		p, err := sub.Recv()
		if err != nil {
			// Without this check p would be nil and p[0] below would panic.
			fmt.Fprintf(os.Stderr, "[!] nanomsg.Recv() %v\n", err.Error())
			os.Exit(3)
		}
		if len(p) == 0 {
			// The sender emits a zero-length message when it draws l == 0;
			// such a message carries no length byte to verify.
			continue
		}
		if int(p[0]) != len(p) {
			fmt.Printf("data=%d len=%d\n", int(p[0]), len(p))
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment