Skip to content

Instantly share code, notes, and snippets.

@georgeee
Last active January 27, 2019 11:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save georgeee/dcccb6a718b96f6036a4457eab2c0f66 to your computer and use it in GitHub Desktop.
Save georgeee/dcccb6a718b96f6036a4457eab2c0f66 to your computer and use it in GitHub Desktop.
ZeroMQ haskell playground

Small playground for zeromq

Launches three servers and three clients.

Connection topology:

client 1  -> [server 1, server 2]
client 2  -> [server 2, server 3]
client 3  -> [server 1, server 3]

A protocol in use between peers is described in the comment in file.

Client implementations differ a bit, while implementation of server is uniform.

Code works both with inproc:// and tcp:// transports. For inproc transport a proper synchronization with QSem was added.

To launch on nix:

nix-shell -p "haskellPackages.ghcWithPackages (pkgs: with pkgs; [ zeromq4-haskell universum async ]) --run "runghc test.hs"

Note on acc wire

Wire acc is: PUB on client side and SUB on server side.

While ZMQ handles subscriptions well (i.e. when client connects to the server, subscription is pushed to client immediately). But PUB doesn't queue up messages, so there is a race condition between first transaction being published by client and connection from client to server be fully established (recall that connection establishing is in fact asynchronous).

-- nix-shell -p "haskellPackages.ghcWithPackages (pkgs: with pkgs; [ zeromq4-haskell universum async ]) --run ghci
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import Universum
import qualified System.ZMQ4 as Z
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (mapConcurrently_)
import Control.Concurrent.MVar (withMVar)
import System.IO.Unsafe (unsafePerformIO)
import Control.Concurrent.QSem (newQSem, waitQSem, signalQSem)
import Data.List ((!!))
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BC
-- Protocol connects multiple clients to multiple servers
-- between each client and server pair there are three wires:
--
-- |ROUTER| <-------> |DEALER| (Req wire)
-- SERVER |SUB| <-------- |PUB| CLIENT (Acc wire)
-- |PUB| --------> |SUB| (Pub wire)
--
-- Req wire is a bidirectional channel between client and server which
-- allows client to send requests to server and get subsequent responses.
-- While client can be connected with many servers, a single
-- request-response cycle will be handled with only one chosen server.
--
-- Acc wire is a unidirectional channel which allows server to accept
-- data proposed by client (such as a new transaction).
-- When client is connected to many servers, each server will receive
-- data sent by client.
--
-- Pub wire is a unidirectional channel which allows server to push
-- data down to all connected clients.
reqSuffix = ":10001"
accSuffix = ":10002"
pubSuffix = ":10003"
s1AddrPrefix = "tcp://127.0.0.1"
s2AddrPrefix = "tcp://127.0.0.2"
s3AddrPrefix = "tcp://127.0.0.3"
-- s1AddrPrefix = "inproc://server1"
-- s2AddrPrefix = "inproc://server2"
-- s3AddrPrefix = "inproc://server3"
-- Topology:
--
-- client 1 -> [server 1, server 2]
-- client 2 -> [server 2, server 3]
-- client 3 -> [server 1, server 3]
createAndBind zctx sockType addr = do
sock <- Z.socket zctx sockType
Z.bind sock addr $> sock
createAndBindServ zctx addrPrefix =
(,,) <$> createAndBind zctx Z.Router (addrPrefix <> reqSuffix)
<*> createAndBind zctx Z.Sub (addrPrefix <> accSuffix)
<*> createAndBind zctx Z.Pub (addrPrefix <> pubSuffix)
createClient zctx =
(,,) <$> Z.socket zctx Z.Dealer
<*> Z.socket zctx Z.Pub
<*> Z.socket zctx Z.Sub
connectClient (req, acc, pub) serverAddrPrefix = do
Z.connect req (serverAddrPrefix <> reqSuffix)
Z.connect acc (serverAddrPrefix <> accSuffix)
Z.connect pub (serverAddrPrefix <> pubSuffix)
logMVar = unsafePerformIO (newMVar ())
logText = withMVar logMVar . const . putTextLn
logTextP prefix = logText . (("[" <> toText prefix <> "] ") <>)
runClient1 zctx qsem = do
waitQSem qsem
socks@(req, acc, pub) <- createClient zctx
connectClient socks s1AddrPrefix
connectClient socks s2AddrPrefix
threadDelay 1000000
logText "Client 1 started"
Z.send req [] "bla"
Z.send req [] "zord"
Z.sendMulti req ("narf" :| ["zord"])
receiveMsgs req 3 1
receiveMsgs req num clientId =
forM_ [1..num] $ \_ -> Z.receiveMulti req >>= logTextP ("Client "++show clientId) . show
runClient2 zctx qsem = do
waitQSem qsem
socks@(req, acc, pub) <- createClient zctx
connectClient socks s2AddrPrefix
connectClient socks s3AddrPrefix
threadDelay 1000000
logText "Client 2 started"
Z.send req [] "poid"
Z.send req [] "fran"
Z.sendMulti acc ("tx.type1" :| ["Transaction from client 2"])
receiveMsgs req 2 2
runClient3 zctx qsem = do
waitQSem qsem
socks@(req, acc, pub) <- createClient zctx
connectClient socks s1AddrPrefix
connectClient socks s3AddrPrefix
threadDelay 1000000
Z.subscribe pub "blk"
logText "Client 3 started"
Z.send req [] "bryak"
Z.send req [] "fran"
Z.sendMulti acc ("tx.type1" :| ["Transaction from client 3"])
receiveMsgs req 2 3
receiveMsgs pub 4 3
runServer addrPrefix zctx qsem = do
(req, acc, pub) <- createAndBindServ zctx addrPrefix
Z.subscribe acc "tx"
signalQSem qsem
let toPoll sock = Z.Sock sock [Z.In] Nothing
loop i = do
evs <- Z.poll (-1) [toPoll req, toPoll acc]
logTextP addrPrefix $ show evs
unless (null $ evs !! 1) $ do
msg <- Z.receiveMulti acc
logTextP addrPrefix $ "Tx received from client: " <> show msg
unless (null $ evs !! 0) $ do
(msgId : msg) <- Z.receiveMulti req
logTextP addrPrefix $ "Received request: " <> show msg
Z.sendMulti req (msgId :| map BS.reverse msg)
Z.sendMulti pub ("blk:" <> BC.pack addrPrefix
:| [ "blk ref " <> BC.pack (show $ 2*i)
, "blk ref " <> BC.pack (show $ 2*i + 1)])
loop (i+1)
loop 0
main = do
zctx <- Z.context
qsem <- newQSem 3
mapConcurrently_ (\f -> f zctx qsem) $
[ runClient1, runClient2, runClient3
, runServer s1AddrPrefix
, runServer s2AddrPrefix
, runServer s3AddrPrefix
]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment