Last active
January 25, 2017 10:31
-
-
Save romshark/65492a3b4c1a0c9819e90e0d10d1f79a to your computer and use it in GitHub Desktop.
This is an asynchronous IPC server test suite written in Go. It uses nanomsg for the transport layer, depends on the go-nanomsg Go-binding (https://github.com/op/go-nanomsg) and requires libnanomsg to be installed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"sync" | |
"time" | |
"bytes" | |
nanomsg "github.com/op/go-nanomsg" | |
) | |
type Server struct { | |
name string | |
srv *nanomsg.PullSocket | |
clients map[string] *nanomsg.PushSocket | |
} | |
func (server *Server) handle(msg []byte) { | |
var target string | |
fmt.Printf("SRV rcv: '%s' (%d)\n", msg, len(msg)) | |
var responseSock *nanomsg.PushSocket | |
//identify client | |
switch(string(msg)) { | |
case "lolik": | |
responseSock = server.clients["lolik"] | |
target = "lolik" | |
time.Sleep(time.Second * 3) | |
case "bolik": | |
responseSock = server.clients["bolik"] | |
target = "bolik" | |
time.Sleep(time.Second * 2) | |
default: | |
panic(fmt.Errorf("unknown caller: '%s'", msg)) | |
} | |
fmt.Printf("SRV send to %s\n", target) | |
_, err := responseSock.Send(msg, 0) | |
if err != nil { | |
panic(fmt.Errorf("could not send: ", err)) | |
} | |
fmt.Printf("SRV %s received its response\n", target) | |
} | |
func (server *Server) Serve( | |
name string, | |
wg *sync.WaitGroup, | |
knownClients []string, | |
) { | |
defer wg.Done() | |
var buf []byte | |
var err error | |
//prepare connections | |
server.name = name | |
server.clients = make(map[string] *nanomsg.PushSocket) | |
for _, clName := range knownClients { | |
server.clients[clName], err = nanomsg.NewPushSocket() | |
if err != nil { | |
panic(fmt.Errorf("could not create server: %s", err)) | |
} | |
var sockAddr bytes.Buffer | |
sockAddr.WriteString("ipc://") | |
sockAddr.WriteString(server.name) | |
sockAddr.WriteRune('_') | |
sockAddr.WriteString(clName) | |
//connect | |
fmt.Printf("try cl-push '%s'\n", sockAddr.String()) | |
_, err = server.clients[clName].Connect(sockAddr.String()) | |
if err != nil { | |
panic(fmt.Errorf("could not connect to cl-push %s: %s", clName, err)) | |
} | |
} | |
//prepare server | |
server.srv, err = nanomsg.NewPullSocket() | |
if err != nil { | |
panic(fmt.Errorf("could not create server: %s", err)) | |
} | |
server.srv.SetName(server.name) | |
//server.srv.SetRecvTimeout(time.Second * 2) | |
var srvAddr bytes.Buffer | |
srvAddr.WriteString("ipc://") | |
srvAddr.WriteString(server.name) | |
fmt.Printf("bind server: '%s'\n", srvAddr.String()) | |
endpoint, err := server.srv.Bind(srvAddr.String()) | |
if err != nil { | |
panic(fmt.Errorf("could not bind server: %s", err)) | |
} | |
defer server.srv.Shutdown(endpoint) | |
defer server.srv.Close() | |
for { | |
//await request | |
fmt.Printf("SRV listening...\n") | |
buf, err = server.srv.Recv(0) | |
if err != nil { | |
panic(fmt.Errorf("could not receive: %s", err)) | |
} | |
//handle request | |
go server.handle(buf) | |
} | |
fmt.Printf("SRV done\n") | |
} | |
func client(serverName string, wg *sync.WaitGroup, name string) { | |
defer wg.Done() | |
var buf []byte | |
//pull socket | |
pullSock, err := nanomsg.NewPullSocket() | |
if err != nil { | |
panic(fmt.Errorf("could not create client: %s", err)) | |
} | |
defer pullSock.Close() | |
var pullName bytes.Buffer | |
pullName.WriteString(name) | |
pullName.WriteRune('_') | |
pullName.WriteString(serverName) | |
pullName.WriteString("_pull") | |
pullSock.SetName(pullName.String()) | |
//bind pull socket | |
var pullAddr bytes.Buffer | |
pullAddr.WriteString("ipc://") | |
pullAddr.WriteString(serverName) | |
pullAddr.WriteRune('_') | |
pullAddr.WriteString(name) | |
fmt.Printf("CLT bind pull socket: '%s'\n", pullAddr.String()) | |
_, err = pullSock.Bind(pullAddr.String()) | |
if err != nil { | |
panic(fmt.Errorf("could not bind server: %s", err)) | |
} | |
//pusher socket | |
pushSock, err := nanomsg.NewPushSocket() | |
if err != nil { | |
panic(fmt.Errorf("could not create client: %s", err)) | |
} | |
defer pushSock.Close() | |
var pushName bytes.Buffer | |
pushName.WriteString(name) | |
pushName.WriteString("_push") | |
pushSock.SetName(pushName.String()) | |
//connect pusher | |
var pushAddr bytes.Buffer | |
pushAddr.WriteString("ipc://") | |
pushAddr.WriteString(serverName) | |
fmt.Printf("CLT connect pusher: %s\n", pushAddr.String()) | |
_, err = pushSock.Connect(pushAddr.String()) | |
if err != nil { | |
panic(fmt.Errorf("could not connect to server: %s", err)) | |
} | |
//send | |
fmt.Printf("CLT %s sending...\n", name) | |
_, err = pushSock.Send([]byte(name), 0) | |
if err != nil { | |
panic(fmt.Errorf("could not send: %s", err)) | |
} | |
fmt.Printf("CLT %s sent, receiving...\n", name) | |
//receive | |
buf, err = pullSock.Recv(0) | |
if err != nil { | |
panic(fmt.Errorf("could not receive: %s", err)) | |
} | |
fmt.Printf("CLT %s rcv: '%s' (%d)\n", name, buf, len(buf)) | |
fmt.Printf("CLT %s done\n", name) | |
} | |
func main() { | |
fmt.Println("testapp") | |
var server Server | |
var wg sync.WaitGroup | |
wg.Add(3) | |
go server.Serve("testapp", &wg, []string {"lolik", "bolik"}) | |
go client("testapp", &wg, "lolik") | |
go client("testapp", &wg, "bolik") | |
wg.Wait() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment