Created
September 19, 2014 10:06
-
-
Save portnov/808835f1fdbd53444713 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE OverloadedStrings #-} | |
import Control.Monad | |
import Control.Applicative | |
import Data.Binary | |
import qualified Data.Map as M | |
import Network.Socket hiding (recv) | |
import qualified Data.ByteString as B | |
import qualified Data.ByteString.Lazy as L | |
import Network.Socket.ByteString (recv, sendAll) | |
import Control.Concurrent | |
import Control.Concurrent.STM | |
import Data.Word | |
import System.Environment | |
import System.Random | |
import System.IO | |
import Data.Time | |
import Data.Time.Format | |
import System.Locale | |
fromStrict :: B.ByteString -> L.ByteString | |
fromStrict s = L.fromChunks [s] | |
toStrict :: L.ByteString -> B.ByteString | |
toStrict s = B.concat $ L.toChunks s | |
data Message = Message { | |
isRequest :: Bool | |
, getStan :: Word64 | |
} | |
deriving (Eq) | |
instance Show Message where | |
show msg | |
| isRequest msg = "[Request #" ++ show (getStan msg) ++ "]" | |
| otherwise = "[Response #" ++ show (getStan msg) ++ "]" | |
instance Binary Message where | |
get = Message <$> get <*> get | |
put (Message rq stan) = put rq >> put stan | |
data ExtPort = | |
ClientPort Socket | |
| ServerPort Socket Socket | |
deriving (Eq,Show) | |
data Pool = Pool { | |
pHost :: String | |
, pMinPort :: Int | |
, pMaxPort :: Int | |
, pPorts :: TVar( M.Map Int (ExtPort, Int) ) | |
} | |
deriving (Eq) | |
createPool :: String -> Int -> Int -> IO Pool | |
createPool host minPort maxPort = do | |
var <- atomically $ newTVar M.empty | |
return $ Pool host minPort maxPort var | |
getSocket :: ExtPort -> Socket | |
getSocket (ClientPort sock) = sock | |
getSocket (ServerPort conn sock) = conn | |
readN :: Word16 -> Socket -> IO B.ByteString | |
readN sz sock = go sz | |
where | |
go n = do | |
d <- recv sock (fromIntegral n) | |
let m = fromIntegral $ B.length d | |
if m < n | |
then go (n-m) | |
else return d | |
readMessage :: ExtPort -> IO Message | |
readMessage port = do | |
let sock = getSocket port | |
lenStr <- readN 2 sock | |
let len = decode (fromStrict lenStr) :: Word16 | |
bin <- readN len sock | |
let msg = decode $ fromStrict bin | |
--putStrLn $ "Read message #" ++ show msg ++ " from port " ++ show port | |
return msg | |
writeMessage :: ExtPort -> Message -> IO () | |
writeMessage port msg = do | |
let sock = getSocket port | |
let bin = encode msg | |
let len = fromIntegral (L.length bin) :: Word16 | |
let lenStr = encode len | |
sendAll sock $ toStrict $ lenStr `L.append` bin | |
--putStrLn $ "Wrote message #" ++ show msg ++ " to port " ++ show port | |
withServerPort serveraddr initStan talk = do | |
sock <- socket (addrFamily serveraddr) Stream defaultProtocol | |
bindSocket sock (addrAddress serveraddr) | |
listen sock 100 | |
(conn, _) <- accept sock | |
talk initStan conn | |
sClose conn | |
sClose sock | |
talker isClient host minPort maxPort initStan = do | |
chanFrom <- atomically $ newTChan | |
chanTo <- atomically $ newTChan | |
let getter = if isClient | |
then getClientAddr | |
else getServerAddr | |
pool <- createPool host minPort maxPort | |
if isClient | |
then receiverClient pool chanTo chanFrom | |
else receiverServer pool chanTo chanFrom | |
forkIO $ do | |
--threadDelay $ 1000*1000 | |
writer isClient getter pool chanTo chanFrom | |
workers isClient initStan chanTo chanFrom | |
getPort :: (String -> Int -> IO ExtPort) -> Pool -> IO (Int, ExtPort) | |
getPort getSocket p = do | |
port <- randomRIO (pMinPort p, pMaxPort p) | |
ports <- atomically $ readTVar (pPorts p) | |
--putStrLn $ "Ports: " ++ show (M.keys ports) ++ ", selecting #" ++ show port | |
case M.lookup port ports of | |
Just (sock, n) -> do | |
--putStrLn $ "Using existing port #" ++ show port | |
atomically $ do | |
ports <- readTVar (pPorts p) | |
writeTVar (pPorts p) $ M.insert port (sock, n+1) ports | |
return (port, sock) | |
Nothing -> do | |
sock <- getSocket (pHost p) port | |
atomically $ do | |
ports <- readTVar (pPorts p) | |
writeTVar (pPorts p) $ M.insert port (sock, 1) ports | |
return (port, sock) | |
getPortWrite :: Pool -> IO (Int, ExtPort) | |
getPortWrite p = do | |
portsMap <- atomically $ do | |
m <- readTVar (pPorts p) | |
when (M.null m) retry | |
return m | |
let ports = M.keys portsMap | |
--putStrLn $ "Ports: " ++ show ports | |
portIdx <- randomRIO (0, length ports - 1) | |
let port = ports !! portIdx | |
let Just (sock, n) = M.lookup port portsMap | |
--putStrLn $ "Using existing port #" ++ show port | |
atomically $ do | |
ports <- readTVar (pPorts p) | |
writeTVar (pPorts p) $ M.insert port (sock, n+1) ports | |
return (port, sock) | |
putPort :: Pool -> Int -> IO () | |
putPort p port = atomically $ do | |
ports <- readTVar (pPorts p) | |
case M.lookup port ports of | |
Just (s,n) -> writeTVar (pPorts p) $ M.insert port (s, n-1) ports | |
Nothing -> fail $ "Try to close unopened port" | |
openAllServerPorts :: Pool -> IO () | |
openAllServerPorts p = do | |
forM_ [pMinPort p .. pMaxPort p] $ \portNumber -> do | |
sock <- getServerAddr (pHost p) portNumber | |
atomically $ do | |
ports <- readTVar (pPorts p) | |
writeTVar (pPorts p) $ M.insert portNumber (sock, 1) ports | |
closePort :: ExtPort -> IO () | |
closePort (ClientPort sock) = sClose sock | |
closePort (ServerPort conn sock) = do | |
sClose conn | |
sClose sock | |
receiverServer p chanTo chanFrom = do | |
forM_ [pMinPort p .. pMaxPort p] $ \portNumber -> forkIO $ do | |
port <- getServerAddr (pHost p) portNumber | |
ports <- atomically $ do | |
ports <- readTVar (pPorts p) | |
let ports' = M.insert portNumber (port, 1) ports | |
writeTVar (pPorts p) ports' | |
return ports' | |
--putStrLn $ "Listening: " ++ show ports | |
forever $ receiver chanFrom port | |
receiverClient pool chanTo chanFrom = do | |
forkIO $ forever $ do | |
ports <- atomically $ do | |
m <- readTVar (pPorts pool) | |
return $ map fst $ M.elems m | |
--putStrLn $ "Ports to receive: " ++ show (length ports) | |
t <- atomically $ newTVar (length ports) | |
forM_ ports $ \port -> forkIO $ do | |
receiver chanFrom port | |
atomically $ modifyTVar t $ \n -> (n-1) | |
atomically $ do | |
n <- readTVar t | |
when (n > 0) retry | |
return () | |
receiver chanFrom port = do | |
--putStrLn $ "Receiver: " ++ show port | |
msg <- readMessage port | |
atomically $ writeTChan chanFrom msg | |
writer :: Bool -> (String -> Int -> IO ExtPort) -> Pool -> TChan Message -> TChan Message -> IO () | |
writer isClient getter pool chanTo chanFrom = forever $ do | |
--threadDelay $ 100*1000 | |
(portNumber,port) <- if not isClient | |
then do | |
--putStrLn $ "Writer calls getPortWrite" | |
getPortWrite pool | |
else do | |
--putStrLn $ "Writer calls getPort" | |
getPort getter pool | |
msg <- atomically $ readTChan chanTo | |
writeMessage port msg | |
putPort pool portNumber | |
workers isClient initStan chanTo chanFrom = do | |
var <- atomically $ newTVar $ M.empty | |
cnt <- atomically $ newTVar initStan | |
let logName = if isClient | |
then "client-control.log" | |
else "server-control.log" | |
log <- openFile logName WriteMode | |
forkIO $ forever $ controller var log | |
replicateM_ 50 $ forkIO $ forever $ generator var cnt chanTo | |
replicateM_ 50 $ forkIO $ forever $ worker var chanTo chanFrom | |
threadDelay $ 100 * 1000*1000 | |
controller var log = do | |
threadDelay $ 1000*1000 | |
(old,new,avg) <- do | |
atomically $ do | |
m <- readTVar var | |
let m' = M.mapMaybe control m | |
let avg = sum (map dt $ M.elems m) / fromIntegral (M.size m) | |
writeTVar var m' | |
return (M.size m, M.size m', avg) | |
time <- getCurrentTime | |
let timeStr = formatTime defaultTimeLocale "%F %T.%q" time | |
hPutStrLn log $ timeStr ++ "\t" ++ show old ++ "\t" ++ show (old-new) ++ "\t" ++ show avg | |
hFlush log | |
where | |
control p@(_,Nothing) = Just p | |
control _ = Nothing | |
dt (t1, Just t2) = diffUTCTime t2 t1 | |
dt _ = 0 | |
generator var cnt chanTo = do | |
delay <- randomRIO (100,500) | |
threadDelay $ delay*100 | |
stan <- atomically $ do | |
x <- readTVar cnt | |
let y = x+10 | |
writeTVar cnt y | |
return y | |
let msg = Message True stan | |
time <- getCurrentTime | |
atomically $ do | |
writeTChan chanTo msg | |
writeStartTime var stan time | |
writeStartTime var stan time = do | |
m <- readTVar var | |
let m' = M.insert stan (time, Nothing) m | |
writeTVar var m' | |
writeEndTime var stan = do | |
time <- getCurrentTime | |
atomically $ do | |
m <- readTVar var | |
case M.lookup stan m of | |
Nothing -> fail $ "Response without request: #" ++ show stan | |
Just (start,_) -> do | |
let m' = M.insert stan (start, Just time) m | |
writeTVar var m' | |
worker var chanTo chanFrom = do | |
msg <- atomically $ readTChan chanFrom | |
if isRequest msg | |
then do | |
res <- processRequest msg | |
atomically $ writeTChan chanTo res | |
else do | |
let stan = getStan msg | |
processResponse var stan msg | |
processRequest msg = do | |
delay <- randomRIO (1,5) | |
threadDelay $ delay*1000 | |
let res = msg {getStan = getStan msg , isRequest = False} | |
putStr "<" | |
return res | |
processResponse var stan msg = do | |
putStrLn $ "Received response for request #" ++ show stan | |
writeEndTime var stan | |
putStr ">" | |
return () | |
getClientAddr host port = do | |
addrinfos <- getAddrInfo Nothing (Just host) (Just $ show port) | |
--putStrLn $ "AddrInfos: " ++ show addrinfos | |
let serveraddr = head addrinfos | |
sock <- socket (addrFamily serveraddr) Stream defaultProtocol | |
--putStrLn $ "Socket: " ++ show sock | |
connect sock (addrAddress serveraddr) | |
putStrLn $ "Creating new client port #" ++ show port | |
return $ ClientPort sock | |
getServerAddr host port = do | |
addrinfos <- getAddrInfo (Just (defaultHints {addrFlags = [AI_PASSIVE]})) Nothing (Just $ show port) | |
let serveraddr = head addrinfos | |
sock <- socket (addrFamily serveraddr) Stream defaultProtocol | |
bindSocket sock (addrAddress serveraddr) | |
listen sock 100 | |
(conn, _) <- accept sock | |
putStrLn $ "Creating new server port #" ++ show port | |
return $ ServerPort conn sock | |
main :: IO () | |
main = withSocketsDo $ do | |
[cmd] <- getArgs | |
hSetBuffering stdout NoBuffering | |
let isClient = cmd == "client" | |
if isClient | |
then putStrLn "Starting client" | |
else putStrLn "Starting server" | |
let initStan = if isClient | |
then 10000 | |
else 20000 | |
talker isClient "127.0.0.1" 3001 3001 initStan |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment