Skip to content

Instantly share code, notes, and snippets.

@kostaz

kostaz/README.md

Forked from ReSTARTR/README.md
Created Oct 28, 2017
Embed
What would you like to do?
ZeroMQ sample in Go and Python. (with MonitoredQueue)

Versions

  • zeromq: stable 3.2.2
  • go: 1.0.3
    • gozmq: zmq_3_x
  • python: 2.7.3
    • pyzmq: 13.0.2

Usage

go run queue.go serv
go run queue.go monitorq
go run queue.go monitor
python queue.py client
                  +----------------------------------------+
                  |                                        |
  +------+        |--------------------------+     +-----+ |
  |client|--------|9001     monitorq     9002|-----|serv | |
  |(REQ) |        |(ROUTER)   9003   (DEALER)|     |(REP)| |
  +------+        |--------------------------+     +-----+ |
                  |          |(PUB)                        |
                  |          |                             |
                  |          |                             |
                  |          |(SUB)                        |
                  |      +-------+                         |
                  |      |monitor|                         |
                  |      +-------+                         |
                  +----------------------------------------+
package main
import (
"os"
"flag"
"fmt"
"time"
zmq "github.com/alecthomas/gozmq"
)
// TCP ports used by the roles in this sample.
const (
// PORT_FRONT is the port the queue's ROUTER socket binds and the client connects to.
PORT_FRONT = 9001
// PORT_BACK is the port the queue's DEALER socket binds and the server connects to.
PORT_BACK = 9002
// PORT_MONITOR is the port the monitored queue's PUB socket binds and the monitor subscribes to.
PORT_MONITOR = 9003
)
// Goque holds the ZeroMQ context shared by every role of the sample
// (client, server, queue, monitored queue and monitor).
type Goque struct{
// ctx is the context each role method creates its sockets from.
ctx *zmq.Context
}
// NewGoque creates a Goque backed by a fresh ZeroMQ context.
//
// The context is deliberately not closed here: the role methods create their
// sockets from it only after this constructor has returned, so it has to
// outlive this function. NewGoque panics if the context cannot be created,
// because no role can run without one.
func NewGoque() *Goque {
	ctx, err := zmq.NewContext()
	if err != nil {
		panic(fmt.Sprintf("goque: creating ZeroMQ context: %v", err))
	}
	return &Goque{ctx: ctx}
}
// Client runs the REQ role: it connects to the queue front end on PORT_FRONT
// and, once per second, sends a "PING#<pid>" request and prints the reply.
//
// Setup failures (socket creation, connect) are reported on stderr and end
// the method. Failures inside the loop are reported and the loop keeps going,
// preserving the best-effort behaviour of the sample. The loop never exits on
// its own.
func (goque *Goque) Client() {
	sock, err := goque.ctx.NewSocket(zmq.REQ)
	if err != nil {
		fmt.Fprintln(os.Stderr, "client: creating REQ socket:", err)
		return
	}
	defer sock.Close()

	endpoint := fmt.Sprintf("tcp://localhost:%d", PORT_FRONT)
	if err := sock.Connect(endpoint); err != nil {
		fmt.Fprintf(os.Stderr, "client: connecting to %s: %v\n", endpoint, err)
		return
	}
	fmt.Printf("start client...\n")

	// The request only depends on the process ID, so it is built once.
	ping := []byte(fmt.Sprintf("PING#<%d>", os.Getpid()))
	for {
		if err := sock.Send(ping, 0); err != nil {
			fmt.Fprintln(os.Stderr, "client: send:", err)
		}
		// Recv is attempted even after a failed Send, as the original loop did.
		if reply, err := sock.Recv(0); err != nil {
			fmt.Fprintln(os.Stderr, "client: recv:", err)
		} else {
			fmt.Println("Recv:", string(reply))
		}
		time.Sleep(1 * time.Second)
	}
}
// Server runs the REP role: it connects to the queue back end on PORT_BACK,
// prints every request it receives and answers with "PONG#<pid>".
//
// Setup failures (socket creation, connect) are reported on stderr and end
// the method. A failed receive is reported and skipped without sending a
// reply; a failed send is reported and the loop continues. The loop never
// exits on its own.
func (goque *Goque) Server() {
	sock, err := goque.ctx.NewSocket(zmq.REP)
	if err != nil {
		fmt.Fprintln(os.Stderr, "server: creating REP socket:", err)
		return
	}
	defer sock.Close()

	endpoint := fmt.Sprintf("tcp://localhost:%d", PORT_BACK)
	fmt.Println(endpoint)
	if err := sock.Connect(endpoint); err != nil {
		fmt.Fprintf(os.Stderr, "server: connecting to %s: %v\n", endpoint, err)
		return
	}
	fmt.Printf("start server...\n")

	// The reply only depends on the process ID, so it is built once.
	pong := []byte(fmt.Sprintf("PONG#<%d>", os.Getpid()))
	for {
		msg, err := sock.Recv(0)
		if err != nil {
			fmt.Fprintln(os.Stderr, "server: recv:", err)
			continue
		}
		fmt.Println("Recv:", string(msg))
		if err := sock.Send(pong, 0); err != nil {
			fmt.Fprintln(os.Stderr, "server: send:", err)
		}
	}
}
// Monitor runs the SUB role: it connects to the monitored queue's publisher
// on PORT_MONITOR, subscribes with an empty prefix and prints every message
// it receives, prefixed with "MONITOR:".
//
// Setup failures (socket creation, connect, subscribe) are reported on stderr
// and end the method. A failed receive is reported and the loop continues.
// The loop never exits on its own.
func (goque *Goque) Monitor() {
	sock, err := goque.ctx.NewSocket(zmq.SUB)
	if err != nil {
		fmt.Fprintln(os.Stderr, "monitor: creating SUB socket:", err)
		return
	}
	defer sock.Close()

	endpoint := fmt.Sprintf("tcp://localhost:%d", PORT_MONITOR)
	if err := sock.Connect(endpoint); err != nil {
		fmt.Fprintf(os.Stderr, "monitor: connecting to %s: %v\n", endpoint, err)
		return
	}
	if err := sock.SetSubscribe(""); err != nil {
		fmt.Fprintln(os.Stderr, "monitor: subscribing:", err)
		return
	}

	for {
		msg, err := sock.Recv(0)
		if err != nil {
			fmt.Fprintln(os.Stderr, "monitor: recv:", err)
			continue
		}
		fmt.Println("MONITOR:", string(msg))
	}
}
// Queue runs a plain request/reply queue: a ROUTER socket bound on PORT_FRONT
// and a DEALER socket bound on PORT_BACK, joined by zmq.Device in QUEUE mode.
//
// Any failure while creating or binding a socket, and any error returned by
// the device, is reported on stderr before the method returns.
func (goque *Goque) Queue() {
	front, err := goque.ctx.NewSocket(zmq.ROUTER)
	if err != nil {
		fmt.Fprintln(os.Stderr, "queue: creating ROUTER socket:", err)
		return
	}
	defer front.Close()
	if err := front.Bind(fmt.Sprintf("tcp://*:%d", PORT_FRONT)); err != nil {
		fmt.Fprintf(os.Stderr, "queue: binding front port %d: %v\n", PORT_FRONT, err)
		return
	}

	back, err := goque.ctx.NewSocket(zmq.DEALER)
	if err != nil {
		fmt.Fprintln(os.Stderr, "queue: creating DEALER socket:", err)
		return
	}
	defer back.Close()
	if err := back.Bind(fmt.Sprintf("tcp://*:%d", PORT_BACK)); err != nil {
		fmt.Fprintf(os.Stderr, "queue: binding back port %d: %v\n", PORT_BACK, err)
		return
	}

	if err := zmq.Device(zmq.QUEUE, front, back); err != nil {
		fmt.Fprintln(os.Stderr, "queue: device:", err)
	}
}
// MonitoredQueue runs a request/reply queue that also publishes traffic
// counters. It binds a ROUTER socket on PORT_FRONT, a DEALER socket on
// PORT_BACK and a PUB socket on PORT_MONITOR, then forwards multipart
// messages between front and back in both directions.
//
// After each request forwarded from front to back, it publishes
// "IN: <n>, OUT <m>", where n counts forwarded requests (including the one
// just forwarded) and m counts forwarded replies.
//
// Setup failures are reported on stderr and end the method. Failures inside
// the forwarding loop are reported and the loop continues. The loop never
// exits on its own.
func (goque *Goque) MonitoredQueue() {
	front, err := goque.ctx.NewSocket(zmq.ROUTER)
	if err != nil {
		fmt.Fprintln(os.Stderr, "monitorq: creating ROUTER socket:", err)
		return
	}
	defer front.Close()
	if err := front.Bind(fmt.Sprintf("tcp://*:%d", PORT_FRONT)); err != nil {
		fmt.Fprintf(os.Stderr, "monitorq: binding front port %d: %v\n", PORT_FRONT, err)
		return
	}

	back, err := goque.ctx.NewSocket(zmq.DEALER)
	if err != nil {
		fmt.Fprintln(os.Stderr, "monitorq: creating DEALER socket:", err)
		return
	}
	defer back.Close()
	if err := back.Bind(fmt.Sprintf("tcp://*:%d", PORT_BACK)); err != nil {
		fmt.Fprintf(os.Stderr, "monitorq: binding back port %d: %v\n", PORT_BACK, err)
		return
	}

	mon, err := goque.ctx.NewSocket(zmq.PUB)
	if err != nil {
		fmt.Fprintln(os.Stderr, "monitorq: creating PUB socket:", err)
		return
	}
	defer mon.Close()
	if err := mon.Bind(fmt.Sprintf("tcp://*:%d", PORT_MONITOR)); err != nil {
		fmt.Fprintf(os.Stderr, "monitorq: binding monitor port %d: %v\n", PORT_MONITOR, err)
		return
	}

	// The field key is plain "Events"; a package-qualified key is not valid
	// inside a composite literal.
	polls := zmq.PollItems{
		zmq.PollItem{Socket: front, Events: zmq.POLLIN},
		zmq.PollItem{Socket: back, Events: zmq.POLLIN},
	}

	var in, out int
	for {
		if _, err := zmq.Poll(polls, -1); err != nil {
			fmt.Fprintln(os.Stderr, "monitorq: poll:", err)
			continue
		}

		// Both directions are checked on every wake-up so that one busy side
		// cannot delay the other until a later poll.
		if polls[0].REvents&zmq.POLLIN != 0 {
			parts, err := front.RecvMultipart(0)
			if err != nil {
				fmt.Fprintln(os.Stderr, "monitorq: recv from front:", err)
			} else {
				if err := back.SendMultipart(parts, 0); err != nil {
					fmt.Fprintln(os.Stderr, "monitorq: send to back:", err)
				}
				// Count first so the published figure includes this request.
				in++
				stats := fmt.Sprintf("IN: %d, OUT %d", in, out)
				if err := mon.Send([]byte(stats), 0); err != nil {
					fmt.Fprintln(os.Stderr, "monitorq: publish stats:", err)
				}
			}
		}

		if polls[1].REvents&zmq.POLLIN != 0 {
			parts, err := back.RecvMultipart(0)
			if err != nil {
				fmt.Fprintln(os.Stderr, "monitorq: recv from back:", err)
			} else {
				if err := front.SendMultipart(parts, 0); err != nil {
					fmt.Fprintln(os.Stderr, "monitorq: send to front:", err)
				}
				out++
			}
		}
	}
}
// Run starts the role selected by key: "serv", "client", "queue", "monitorq"
// or "monitor". For any other key it prints a message naming the rejected
// key and the accepted ones, then returns.
func (goque *Goque) Run(key string) {
	switch key {
	case "serv":
		goque.Server()
	case "client":
		goque.Client()
	case "queue":
		goque.Queue()
	case "monitorq":
		goque.MonitoredQueue()
	case "monitor":
		goque.Monitor()
	default:
		// List every accepted mode, not just the first two.
		fmt.Printf("unknown mode %q: expected serv, client, queue, monitorq or monitor\n", key)
	}
}
// usage prints the command-line synopsis followed by the flag defaults, then
// exits with status 2. The synopsis is written to stderr, which is where the
// flag package sends its defaults unless its output has been redirected.
func usage() {
	// Fprint with an explicit "\n\n" keeps the blank line after the synopsis
	// without the redundant-newline Println that vet rejects.
	fmt.Fprint(os.Stderr, "Usage: go run queue.go [serv|client|queue|monitorq|monitor]\n\n")
	flag.PrintDefaults()
	os.Exit(2)
}
// main parses the command line, prints usage when no mode is given, and
// otherwise runs the role named by the first positional argument.
func main() {
	flag.Usage = usage
	flag.Parse()

	// usage exits the process, so execution only continues with a mode present.
	if flag.NArg() == 0 {
		usage()
	}

	NewGoque().Run(flag.Arg(0))
	os.Exit(0)
}
# -*- coding: utf-8 -*-
# setup
# pip install pyzmq
import zmq
import os
import time
import random
def front(ctx):
    """Create a ROUTER socket on ``ctx``, bind it to TCP port 9001 and return it."""
    router_sock = ctx.socket(zmq.ROUTER)
    router_sock.bind("tcp://*:9001")
    return router_sock
def back(ctx):
    """Create a DEALER socket on ``ctx``, bind it to TCP port 9002 and return it."""
    dealer_sock = ctx.socket(zmq.DEALER)
    dealer_sock.bind("tcp://*:9002")
    return dealer_sock
def router(ctx, front, back):
    """Run a zmq QUEUE device between the ``front`` and ``back`` sockets.

    ``ctx`` is accepted but not used. Returns whatever ``zmq.device`` returns.
    """
    outcome = zmq.device(zmq.QUEUE, front, back)
    return outcome
def start_proxy():
    """Run a proxy device between the front (9001) and back (9002) sockets.

    The sockets are created inside the ``try`` so that a failure while
    binding one of them still closes whatever was created and terminates the
    context. Ctrl-C stops the proxy and prints 'STOPPED'.
    """
    ctx = zmq.Context(1)
    f = None
    b = None
    try:
        f = front(ctx)
        b = back(ctx)
        print('start proxy...')
        # Alternatives tried by the original author:
        #zmq.device(zmq.QUEUE, f, b)
        #zmq.device(zmq.FORWARDER, f, b)
        zmq.device(zmq.STREAMER, f, b)
    except KeyboardInterrupt:
        print('STOPPED')
    finally:
        # Close only the sockets that were actually created.
        for sock in (f, b):
            if sock is not None:
                sock.close()
        ctx.term()
def start_monitorq():
    """Run pyzmq's ``monitored_queue`` between ROUTER 9001 and DEALER 9002,
    passing a PUB socket bound on 9003 as the monitor socket.

    Sockets are created inside the ``try`` so that a failure while creating
    or binding one of them still closes the ones already opened and
    terminates the context. Ctrl-C stops the queue silently.
    """
    from zmq.devices.monitoredqueue import monitored_queue
    ctx = zmq.Context(1)
    opened = []
    try:
        s_in = ctx.socket(zmq.ROUTER)
        opened.append(s_in)
        s_in.bind('tcp://*:9001')

        s_out = ctx.socket(zmq.DEALER)
        opened.append(s_out)
        s_out.bind('tcp://*:9002')

        s_mon = ctx.socket(zmq.PUB)
        opened.append(s_mon)
        s_mon.bind('tcp://*:9003')

        print('start monitored queue...')
        monitored_queue(s_in, s_out, s_mon)
    except KeyboardInterrupt:
        pass
    finally:
        # Close only the sockets that were actually created.
        for sock in opened:
            sock.close()
        ctx.term()
def start_monitor():
ctx = zmq.Context(1)
sock = ctx.socket(zmq.SUB)
sock.setsockopt(zmq.SUBSCRIBE, "")
sock.connect('tcp://localhost:9003')
while True:
print 'MON:' , sock.recv_multipart()
def start_client():
    """Send 'PING:<pid>' requests to tcp://localhost:9001 forever, printing
    each reply and pausing for a random interval below one second in between.
    """
    context = zmq.Context(1)
    requester = context.socket(zmq.REQ)
    requester.connect('tcp://localhost:9001')
    # The request text only depends on the process ID, so format it once.
    ping = 'PING:%d' % os.getpid()
    while True:
        requester.send(ping)
        reply = requester.recv()
        print(reply)
        time.sleep(random.random())
def start_serv():
    """Connect a REP socket to tcp://localhost:9002 and, forever, print each
    request received and answer it with 'PONG:<pid>'.
    """
    context = zmq.Context(1)
    responder = context.socket(zmq.REP)
    # The socket connects to port 9002 rather than binding a port of its own.
    responder.connect('tcp://localhost:9002')
    # The reply text only depends on the process ID, so format it once.
    pong = 'PONG:%d' % os.getpid()
    while True:
        request = responder.recv()
        print(request)
        responder.send(pong)
# Command-line entry point: run the role named by the first argument.
if __name__ == '__main__':
    import sys
    commands = {
        'serv'     : start_serv,
        'client'   : start_client,
        'proxy'    : start_proxy,
        'monitorq' : start_monitorq,
        'monitor'  : start_monitor,
    }
    # A missing or unknown mode gets a usage message instead of an
    # IndexError / "NoneType is not callable" traceback.
    if len(sys.argv) < 2 or sys.argv[1] not in commands:
        sys.stderr.write('Usage: python %s [%s]\n'
                         % (sys.argv[0], '|'.join(sorted(commands))))
        sys.exit(2)
    commands[sys.argv[1]]()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment