Skip to content

Instantly share code, notes, and snippets.

@portnov
Created September 19, 2014 10:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save portnov/808835f1fdbd53444713 to your computer and use it in GitHub Desktop.
Save portnov/808835f1fdbd53444713 to your computer and use it in GitHub Desktop.
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad
import Control.Applicative
import Data.Binary
import qualified Data.Map as M
import Network.Socket hiding (recv)
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as L
import Network.Socket.ByteString (recv, sendAll)
import Control.Concurrent
import Control.Concurrent.STM
import Data.Word
import System.Environment
import System.Random
import System.IO
import Data.Time
import Data.Time.Format
import System.Locale
fromStrict :: B.ByteString -> L.ByteString
fromStrict s = L.fromChunks [s]
toStrict :: L.ByteString -> B.ByteString
toStrict s = B.concat $ L.toChunks s
data Message = Message {
isRequest :: Bool
, getStan :: Word64
}
deriving (Eq)
instance Show Message where
show msg
| isRequest msg = "[Request #" ++ show (getStan msg) ++ "]"
| otherwise = "[Response #" ++ show (getStan msg) ++ "]"
instance Binary Message where
get = Message <$> get <*> get
put (Message rq stan) = put rq >> put stan
data ExtPort =
ClientPort Socket
| ServerPort Socket Socket
deriving (Eq,Show)
data Pool = Pool {
pHost :: String
, pMinPort :: Int
, pMaxPort :: Int
, pPorts :: TVar( M.Map Int (ExtPort, Int) )
}
deriving (Eq)
createPool :: String -> Int -> Int -> IO Pool
createPool host minPort maxPort = do
var <- atomically $ newTVar M.empty
return $ Pool host minPort maxPort var
getSocket :: ExtPort -> Socket
getSocket (ClientPort sock) = sock
getSocket (ServerPort conn sock) = conn
readN :: Word16 -> Socket -> IO B.ByteString
readN sz sock = go sz
where
go n = do
d <- recv sock (fromIntegral n)
let m = fromIntegral $ B.length d
if m < n
then go (n-m)
else return d
readMessage :: ExtPort -> IO Message
readMessage port = do
let sock = getSocket port
lenStr <- readN 2 sock
let len = decode (fromStrict lenStr) :: Word16
bin <- readN len sock
let msg = decode $ fromStrict bin
--putStrLn $ "Read message #" ++ show msg ++ " from port " ++ show port
return msg
writeMessage :: ExtPort -> Message -> IO ()
writeMessage port msg = do
let sock = getSocket port
let bin = encode msg
let len = fromIntegral (L.length bin) :: Word16
let lenStr = encode len
sendAll sock $ toStrict $ lenStr `L.append` bin
--putStrLn $ "Wrote message #" ++ show msg ++ " to port " ++ show port
withServerPort serveraddr initStan talk = do
sock <- socket (addrFamily serveraddr) Stream defaultProtocol
bindSocket sock (addrAddress serveraddr)
listen sock 100
(conn, _) <- accept sock
talk initStan conn
sClose conn
sClose sock
talker isClient host minPort maxPort initStan = do
chanFrom <- atomically $ newTChan
chanTo <- atomically $ newTChan
let getter = if isClient
then getClientAddr
else getServerAddr
pool <- createPool host minPort maxPort
if isClient
then receiverClient pool chanTo chanFrom
else receiverServer pool chanTo chanFrom
forkIO $ do
--threadDelay $ 1000*1000
writer isClient getter pool chanTo chanFrom
workers isClient initStan chanTo chanFrom
getPort :: (String -> Int -> IO ExtPort) -> Pool -> IO (Int, ExtPort)
getPort getSocket p = do
port <- randomRIO (pMinPort p, pMaxPort p)
ports <- atomically $ readTVar (pPorts p)
--putStrLn $ "Ports: " ++ show (M.keys ports) ++ ", selecting #" ++ show port
case M.lookup port ports of
Just (sock, n) -> do
--putStrLn $ "Using existing port #" ++ show port
atomically $ do
ports <- readTVar (pPorts p)
writeTVar (pPorts p) $ M.insert port (sock, n+1) ports
return (port, sock)
Nothing -> do
sock <- getSocket (pHost p) port
atomically $ do
ports <- readTVar (pPorts p)
writeTVar (pPorts p) $ M.insert port (sock, 1) ports
return (port, sock)
getPortWrite :: Pool -> IO (Int, ExtPort)
getPortWrite p = do
portsMap <- atomically $ do
m <- readTVar (pPorts p)
when (M.null m) retry
return m
let ports = M.keys portsMap
--putStrLn $ "Ports: " ++ show ports
portIdx <- randomRIO (0, length ports - 1)
let port = ports !! portIdx
let Just (sock, n) = M.lookup port portsMap
--putStrLn $ "Using existing port #" ++ show port
atomically $ do
ports <- readTVar (pPorts p)
writeTVar (pPorts p) $ M.insert port (sock, n+1) ports
return (port, sock)
putPort :: Pool -> Int -> IO ()
putPort p port = atomically $ do
ports <- readTVar (pPorts p)
case M.lookup port ports of
Just (s,n) -> writeTVar (pPorts p) $ M.insert port (s, n-1) ports
Nothing -> fail $ "Try to close unopened port"
openAllServerPorts :: Pool -> IO ()
openAllServerPorts p = do
forM_ [pMinPort p .. pMaxPort p] $ \portNumber -> do
sock <- getServerAddr (pHost p) portNumber
atomically $ do
ports <- readTVar (pPorts p)
writeTVar (pPorts p) $ M.insert portNumber (sock, 1) ports
closePort :: ExtPort -> IO ()
closePort (ClientPort sock) = sClose sock
closePort (ServerPort conn sock) = do
sClose conn
sClose sock
receiverServer p chanTo chanFrom = do
forM_ [pMinPort p .. pMaxPort p] $ \portNumber -> forkIO $ do
port <- getServerAddr (pHost p) portNumber
ports <- atomically $ do
ports <- readTVar (pPorts p)
let ports' = M.insert portNumber (port, 1) ports
writeTVar (pPorts p) ports'
return ports'
--putStrLn $ "Listening: " ++ show ports
forever $ receiver chanFrom port
receiverClient pool chanTo chanFrom = do
forkIO $ forever $ do
ports <- atomically $ do
m <- readTVar (pPorts pool)
return $ map fst $ M.elems m
--putStrLn $ "Ports to receive: " ++ show (length ports)
t <- atomically $ newTVar (length ports)
forM_ ports $ \port -> forkIO $ do
receiver chanFrom port
atomically $ modifyTVar t $ \n -> (n-1)
atomically $ do
n <- readTVar t
when (n > 0) retry
return ()
receiver chanFrom port = do
--putStrLn $ "Receiver: " ++ show port
msg <- readMessage port
atomically $ writeTChan chanFrom msg
writer :: Bool -> (String -> Int -> IO ExtPort) -> Pool -> TChan Message -> TChan Message -> IO ()
writer isClient getter pool chanTo chanFrom = forever $ do
--threadDelay $ 100*1000
(portNumber,port) <- if not isClient
then do
--putStrLn $ "Writer calls getPortWrite"
getPortWrite pool
else do
--putStrLn $ "Writer calls getPort"
getPort getter pool
msg <- atomically $ readTChan chanTo
writeMessage port msg
putPort pool portNumber
workers isClient initStan chanTo chanFrom = do
var <- atomically $ newTVar $ M.empty
cnt <- atomically $ newTVar initStan
let logName = if isClient
then "client-control.log"
else "server-control.log"
log <- openFile logName WriteMode
forkIO $ forever $ controller var log
replicateM_ 50 $ forkIO $ forever $ generator var cnt chanTo
replicateM_ 50 $ forkIO $ forever $ worker var chanTo chanFrom
threadDelay $ 100 * 1000*1000
controller var log = do
threadDelay $ 1000*1000
(old,new,avg) <- do
atomically $ do
m <- readTVar var
let m' = M.mapMaybe control m
let avg = sum (map dt $ M.elems m) / fromIntegral (M.size m)
writeTVar var m'
return (M.size m, M.size m', avg)
time <- getCurrentTime
let timeStr = formatTime defaultTimeLocale "%F %T.%q" time
hPutStrLn log $ timeStr ++ "\t" ++ show old ++ "\t" ++ show (old-new) ++ "\t" ++ show avg
hFlush log
where
control p@(_,Nothing) = Just p
control _ = Nothing
dt (t1, Just t2) = diffUTCTime t2 t1
dt _ = 0
generator var cnt chanTo = do
delay <- randomRIO (100,500)
threadDelay $ delay*100
stan <- atomically $ do
x <- readTVar cnt
let y = x+10
writeTVar cnt y
return y
let msg = Message True stan
time <- getCurrentTime
atomically $ do
writeTChan chanTo msg
writeStartTime var stan time
writeStartTime var stan time = do
m <- readTVar var
let m' = M.insert stan (time, Nothing) m
writeTVar var m'
writeEndTime var stan = do
time <- getCurrentTime
atomically $ do
m <- readTVar var
case M.lookup stan m of
Nothing -> fail $ "Response without request: #" ++ show stan
Just (start,_) -> do
let m' = M.insert stan (start, Just time) m
writeTVar var m'
worker var chanTo chanFrom = do
msg <- atomically $ readTChan chanFrom
if isRequest msg
then do
res <- processRequest msg
atomically $ writeTChan chanTo res
else do
let stan = getStan msg
processResponse var stan msg
processRequest msg = do
delay <- randomRIO (1,5)
threadDelay $ delay*1000
let res = msg {getStan = getStan msg , isRequest = False}
putStr "<"
return res
processResponse var stan msg = do
putStrLn $ "Received response for request #" ++ show stan
writeEndTime var stan
putStr ">"
return ()
getClientAddr host port = do
addrinfos <- getAddrInfo Nothing (Just host) (Just $ show port)
--putStrLn $ "AddrInfos: " ++ show addrinfos
let serveraddr = head addrinfos
sock <- socket (addrFamily serveraddr) Stream defaultProtocol
--putStrLn $ "Socket: " ++ show sock
connect sock (addrAddress serveraddr)
putStrLn $ "Creating new client port #" ++ show port
return $ ClientPort sock
getServerAddr host port = do
addrinfos <- getAddrInfo (Just (defaultHints {addrFlags = [AI_PASSIVE]})) Nothing (Just $ show port)
let serveraddr = head addrinfos
sock <- socket (addrFamily serveraddr) Stream defaultProtocol
bindSocket sock (addrAddress serveraddr)
listen sock 100
(conn, _) <- accept sock
putStrLn $ "Creating new server port #" ++ show port
return $ ServerPort conn sock
main :: IO ()
main = withSocketsDo $ do
[cmd] <- getArgs
hSetBuffering stdout NoBuffering
let isClient = cmd == "client"
if isClient
then putStrLn "Starting client"
else putStrLn "Starting server"
let initStan = if isClient
then 10000
else 20000
talker isClient "127.0.0.1" 3001 3001 initStan
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment