Skip to content

Instantly share code, notes, and snippets.

@romshark
Last active January 25, 2017 10:31
Show Gist options
  • Save romshark/65492a3b4c1a0c9819e90e0d10d1f79a to your computer and use it in GitHub Desktop.
Save romshark/65492a3b4c1a0c9819e90e0d10d1f79a to your computer and use it in GitHub Desktop.
This is an asynchronous IPC server test suite written in Go. It uses nanomsg for the transport layer, depends on the go-nanomsg Go-binding (https://github.com/op/go-nanomsg) and requires libnanomsg to be installed.
package main
import (
"fmt"
"sync"
"time"
"bytes"
nanomsg "github.com/op/go-nanomsg"
)
// Server is the push/pull IPC server used by this test program.
type Server struct {
	// name is the server name; Serve uses it to build the "ipc://" addresses.
	name string

	// srv is the pull socket on which the server receives requests.
	srv *nanomsg.PullSocket

	// clients maps a client name to the push socket used to answer that client.
	clients map[string]*nanomsg.PushSocket
}
// handle processes a single request received by the server.
//
// The message body is interpreted as the name of the calling client. Only
// "lolik" and "bolik" are recognised; each gets an artificial delay (3s and
// 2s respectively) before msg is echoed back over the push socket registered
// for that client in server.clients.
//
// handle panics when the caller is unknown, when no push socket is registered
// for the caller, or when sending the response fails.
func (server *Server) handle(msg []byte) {
	fmt.Printf("SRV rcv: '%s' (%d)\n", msg, len(msg))

	// Identify the client and the simulated processing time for it.
	var (
		target string
		delay  time.Duration
	)
	switch string(msg) {
	case "lolik":
		target, delay = "lolik", 3*time.Second
	case "bolik":
		target, delay = "bolik", 2*time.Second
	default:
		panic(fmt.Errorf("unknown caller: '%s'", msg))
	}

	// Fail with a clear message instead of a nil dereference inside Send when
	// the client was not part of the list the server was started with.
	responseSock, ok := server.clients[target]
	if !ok || responseSock == nil {
		panic(fmt.Errorf("no push socket registered for client '%s'", target))
	}

	time.Sleep(delay)

	fmt.Printf("SRV send to %s\n", target)
	if _, err := responseSock.Send(msg, 0); err != nil {
		// The original format string had no verb, so the error was rendered
		// as "%!(EXTRA ...)" noise instead of the actual cause.
		panic(fmt.Errorf("could not send: %s", err))
	}
	fmt.Printf("SRV %s received its response\n", target)
}
// Serve runs the server under the given name. It never returns normally: the
// receive loop has no exit, and every socket error is reported by panicking.
//
// Serve first creates one push socket per entry of knownClients and connects
// it to "ipc://<name>_<client>". It then binds a pull socket at "ipc://<name>"
// and hands every received message to handle in a new goroutine.
//
// wg.Done is deferred, so it only runs if Serve unwinds because of a panic.
func (server *Server) Serve(
	name string,
	wg *sync.WaitGroup,
	knownClients []string,
) {
	defer wg.Done()

	var err error

	// Prepare the per-client response connections.
	server.name = name
	server.clients = make(map[string]*nanomsg.PushSocket)
	for _, clName := range knownClients {
		server.clients[clName], err = nanomsg.NewPushSocket()
		if err != nil {
			panic(fmt.Errorf("could not create cl-push %s: %s", clName, err))
		}

		var sockAddr bytes.Buffer
		sockAddr.WriteString("ipc://")
		sockAddr.WriteString(server.name)
		sockAddr.WriteRune('_')
		sockAddr.WriteString(clName)

		// Connect to the client's pull socket.
		fmt.Printf("try cl-push '%s'\n", sockAddr.String())
		_, err = server.clients[clName].Connect(sockAddr.String())
		if err != nil {
			panic(fmt.Errorf("could not connect to cl-push %s: %s", clName, err))
		}
	}

	// Prepare the server's own receiving socket.
	server.srv, err = nanomsg.NewPullSocket()
	if err != nil {
		panic(fmt.Errorf("could not create server: %s", err))
	}
	// Deferred calls run last-in-first-out. Registering Close here and
	// Shutdown after Bind makes the endpoint shut down before the socket
	// is closed, rather than the other way round.
	defer server.srv.Close()

	server.srv.SetName(server.name)
	//server.srv.SetRecvTimeout(time.Second * 2)

	var srvAddr bytes.Buffer
	srvAddr.WriteString("ipc://")
	srvAddr.WriteString(server.name)
	fmt.Printf("bind server: '%s'\n", srvAddr.String())
	endpoint, err := server.srv.Bind(srvAddr.String())
	if err != nil {
		panic(fmt.Errorf("could not bind server: %s", err))
	}
	defer server.srv.Shutdown(endpoint)

	for {
		// Await the next request.
		fmt.Printf("SRV listening...\n")
		buf, err := server.srv.Recv(0)
		if err != nil {
			panic(fmt.Errorf("could not receive: %s", err))
		}

		// Handle it concurrently so slow requests do not block receiving.
		go server.handle(buf)
	}
}
// client runs one test client named name against the server serverName.
//
// It binds a pull socket at "ipc://<serverName>_<name>" for the response,
// connects a push socket to "ipc://<serverName>", sends its own name as the
// request and then blocks until one message arrives on the pull socket.
//
// Every socket error is reported by panicking. wg.Done is called when the
// function returns.
func client(serverName string, wg *sync.WaitGroup, name string) {
	defer wg.Done()

	// Pull socket: receives the server's response.
	pullSock, err := nanomsg.NewPullSocket()
	if err != nil {
		panic(fmt.Errorf("could not create client: %s", err))
	}
	defer pullSock.Close()

	var pullName bytes.Buffer
	pullName.WriteString(name)
	pullName.WriteRune('_')
	pullName.WriteString(serverName)
	pullName.WriteString("_pull")
	pullSock.SetName(pullName.String())

	// Bind the pull socket at the address the server connects its
	// per-client push socket to.
	var pullAddr bytes.Buffer
	pullAddr.WriteString("ipc://")
	pullAddr.WriteString(serverName)
	pullAddr.WriteRune('_')
	pullAddr.WriteString(name)
	fmt.Printf("CLT bind pull socket: '%s'\n", pullAddr.String())
	if _, err = pullSock.Bind(pullAddr.String()); err != nil {
		// This is the client's own socket; the previous message blamed the
		// server, which sent readers looking in the wrong place.
		panic(fmt.Errorf("could not bind client pull socket: %s", err))
	}

	// Push socket: carries the request to the server.
	pushSock, err := nanomsg.NewPushSocket()
	if err != nil {
		panic(fmt.Errorf("could not create client: %s", err))
	}
	defer pushSock.Close()

	var pushName bytes.Buffer
	pushName.WriteString(name)
	pushName.WriteString("_push")
	pushSock.SetName(pushName.String())

	// Connect the push socket to the server's pull address.
	var pushAddr bytes.Buffer
	pushAddr.WriteString("ipc://")
	pushAddr.WriteString(serverName)
	fmt.Printf("CLT connect pusher: %s\n", pushAddr.String())
	if _, err = pushSock.Connect(pushAddr.String()); err != nil {
		panic(fmt.Errorf("could not connect to server: %s", err))
	}

	// Send the request; the payload is the client's name.
	fmt.Printf("CLT %s sending...\n", name)
	if _, err = pushSock.Send([]byte(name), 0); err != nil {
		panic(fmt.Errorf("could not send: %s", err))
	}
	fmt.Printf("CLT %s sent, receiving...\n", name)

	// Wait for the response.
	buf, err := pullSock.Recv(0)
	if err != nil {
		panic(fmt.Errorf("could not receive: %s", err))
	}
	fmt.Printf("CLT %s rcv: '%s' (%d)\n", name, buf, len(buf))
	fmt.Printf("CLT %s done\n", name)
}
// main starts the server and the two test clients "lolik" and "bolik", then
// exits once both clients have received their responses.
func main() {
	fmt.Println("testapp")

	var server Server

	// Serve loops forever and only unwinds by panicking, so it never signals
	// completion in normal operation. Waiting on it together with the clients
	// would block main forever; it therefore gets its own WaitGroup, which is
	// never waited on. The Add(1) balances the Done that Serve defers.
	var serverWG sync.WaitGroup
	serverWG.Add(1)
	go server.Serve("testapp", &serverWG, []string{"lolik", "bolik"})

	// Only the clients determine when the test run is finished.
	var clientsWG sync.WaitGroup
	clientsWG.Add(2)
	go client("testapp", &clientsWG, "lolik")
	go client("testapp", &clientsWG, "bolik")
	clientsWG.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment