|
-- nix-shell -p "haskellPackages.ghcWithPackages (pkgs: with pkgs; [ zeromq4-haskell universum async ]) --run ghci |
|
|
|
{-# LANGUAGE NoImplicitPrelude #-} |
|
{-# LANGUAGE OverloadedStrings #-} |
|
|
|
import Universum |
|
import qualified System.ZMQ4 as Z |
|
import Control.Concurrent (threadDelay) |
|
import Control.Concurrent.Async (mapConcurrently_) |
|
import Control.Concurrent.MVar (withMVar) |
|
import System.IO.Unsafe (unsafePerformIO) |
|
import Control.Concurrent.QSem (newQSem, waitQSem, signalQSem) |
|
import Data.List ((!!)) |
|
import qualified Data.ByteString as BS |
|
import qualified Data.ByteString.Char8 as BC |
|
|
|
-- Protocol connects multiple clients to multiple servers |
|
-- between each client and server pair there are three wires: |
|
-- |
|
-- |ROUTER| <-------> |DEALER| (Req wire) |
|
-- SERVER |SUB| <-------- |PUB| CLIENT (Acc wire) |
|
-- |PUB| --------> |SUB| (Pub wire) |
|
-- |
|
-- Req wire is a bidirectional channel between client and server which |
|
-- allows client to send requests to server and get subsequent responses. |
|
-- While client can be connected with many servers, a single |
|
-- request-response cycle will be handled with only one chosen server. |
|
-- |
|
-- Acc wire is a unidirectional channel which allows server to accept |
|
-- data proposed by client (such as a new transaction). |
|
-- When client is connected to many servers, each server will receive |
|
-- data sent by client. |
|
-- |
|
-- Pub wire is a unidirectional channel which allows server to push |
|
-- data down to all connected clients. |
|
|
|
reqSuffix = ":10001" |
|
accSuffix = ":10002" |
|
pubSuffix = ":10003" |
|
|
|
s1AddrPrefix = "tcp://127.0.0.1" |
|
s2AddrPrefix = "tcp://127.0.0.2" |
|
s3AddrPrefix = "tcp://127.0.0.3" |
|
|
|
-- s1AddrPrefix = "inproc://server1" |
|
-- s2AddrPrefix = "inproc://server2" |
|
-- s3AddrPrefix = "inproc://server3" |
|
|
|
-- Topology: |
|
-- |
|
-- client 1 -> [server 1, server 2] |
|
-- client 2 -> [server 2, server 3] |
|
-- client 3 -> [server 1, server 3] |
|
|
|
createAndBind zctx sockType addr = do |
|
sock <- Z.socket zctx sockType |
|
Z.bind sock addr $> sock |
|
|
|
createAndBindServ zctx addrPrefix = |
|
(,,) <$> createAndBind zctx Z.Router (addrPrefix <> reqSuffix) |
|
<*> createAndBind zctx Z.Sub (addrPrefix <> accSuffix) |
|
<*> createAndBind zctx Z.Pub (addrPrefix <> pubSuffix) |
|
|
|
createClient zctx = |
|
(,,) <$> Z.socket zctx Z.Dealer |
|
<*> Z.socket zctx Z.Pub |
|
<*> Z.socket zctx Z.Sub |
|
|
|
connectClient (req, acc, pub) serverAddrPrefix = do |
|
Z.connect req (serverAddrPrefix <> reqSuffix) |
|
Z.connect acc (serverAddrPrefix <> accSuffix) |
|
Z.connect pub (serverAddrPrefix <> pubSuffix) |
|
|
|
logMVar = unsafePerformIO (newMVar ()) |
|
|
|
logText = withMVar logMVar . const . putTextLn |
|
logTextP prefix = logText . (("[" <> toText prefix <> "] ") <>) |
|
|
|
runClient1 zctx qsem = do |
|
waitQSem qsem |
|
socks@(req, acc, pub) <- createClient zctx |
|
connectClient socks s1AddrPrefix |
|
connectClient socks s2AddrPrefix |
|
threadDelay 1000000 |
|
logText "Client 1 started" |
|
Z.send req [] "bla" |
|
Z.send req [] "zord" |
|
Z.sendMulti req ("narf" :| ["zord"]) |
|
receiveMsgs req 3 1 |
|
|
|
receiveMsgs req num clientId = |
|
forM_ [1..num] $ \_ -> Z.receiveMulti req >>= logTextP ("Client "++show clientId) . show |
|
|
|
runClient2 zctx qsem = do |
|
waitQSem qsem |
|
socks@(req, acc, pub) <- createClient zctx |
|
connectClient socks s2AddrPrefix |
|
connectClient socks s3AddrPrefix |
|
threadDelay 1000000 |
|
logText "Client 2 started" |
|
Z.send req [] "poid" |
|
Z.send req [] "fran" |
|
Z.sendMulti acc ("tx.type1" :| ["Transaction from client 2"]) |
|
receiveMsgs req 2 2 |
|
|
|
runClient3 zctx qsem = do |
|
waitQSem qsem |
|
socks@(req, acc, pub) <- createClient zctx |
|
connectClient socks s1AddrPrefix |
|
connectClient socks s3AddrPrefix |
|
threadDelay 1000000 |
|
Z.subscribe pub "blk" |
|
logText "Client 3 started" |
|
Z.send req [] "bryak" |
|
Z.send req [] "fran" |
|
Z.sendMulti acc ("tx.type1" :| ["Transaction from client 3"]) |
|
receiveMsgs req 2 3 |
|
receiveMsgs pub 4 3 |
|
|
|
runServer addrPrefix zctx qsem = do |
|
(req, acc, pub) <- createAndBindServ zctx addrPrefix |
|
Z.subscribe acc "tx" |
|
signalQSem qsem |
|
let toPoll sock = Z.Sock sock [Z.In] Nothing |
|
loop i = do |
|
evs <- Z.poll (-1) [toPoll req, toPoll acc] |
|
logTextP addrPrefix $ show evs |
|
unless (null $ evs !! 1) $ do |
|
msg <- Z.receiveMulti acc |
|
logTextP addrPrefix $ "Tx received from client: " <> show msg |
|
unless (null $ evs !! 0) $ do |
|
(msgId : msg) <- Z.receiveMulti req |
|
logTextP addrPrefix $ "Received request: " <> show msg |
|
Z.sendMulti req (msgId :| map BS.reverse msg) |
|
Z.sendMulti pub ("blk:" <> BC.pack addrPrefix |
|
:| [ "blk ref " <> BC.pack (show $ 2*i) |
|
, "blk ref " <> BC.pack (show $ 2*i + 1)]) |
|
loop (i+1) |
|
loop 0 |
|
|
|
|
|
main = do |
|
zctx <- Z.context |
|
|
|
qsem <- newQSem 3 |
|
|
|
mapConcurrently_ (\f -> f zctx qsem) $ |
|
[ runClient1, runClient2, runClient3 |
|
, runServer s1AddrPrefix |
|
, runServer s2AddrPrefix |
|
, runServer s3AddrPrefix |
|
] |