Created
November 29, 2018 13:37
-
-
Save jarnaldich/3c00aae02bf88a0f633e8256551b5a39 to your computer and use it in GitHub Desktop.
[Concurrent Curl in Haskell] Concurrency in Haskell #haskell #concurrency #parallel #network
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env stack | |
{- stack --resolver lts-12.20 --install-ghc runghc | |
--package random --package async --package stm-chans --package wreq --package cassava | |
-} | |
{-# LANGUAGE OverloadedStrings #-} | |
{-# LANGUAGE ScopedTypeVariables #-} | |
{-# LANGUAGE NamedFieldPuns #-} | |
{-# LANGUAGE DeriveGeneric #-} | |
import Control.Concurrent.Async (mapConcurrently, concurrently)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMChan (readTBMChan, writeTBMChan, newTBMChan, closeTBMChan)
import Control.Exception
import Control.Lens
import qualified Data.ByteString.Lazy as BL
import Data.Csv
import Data.Time.Clock (diffUTCTime, getCurrentTime)
import qualified Data.Vector as V
import Network.Wreq
import System.IO (hPutStrLn, stderr)
import System.Random (randomRIO)
import Text.Printf (printf)
-- | Number of worker threads started by 'main'; the capacity of both
-- channels is derived from it (twice this value).
workerCount :: Int
workerCount = 32

-- | Declared workload size. Not referenced by any code visible in this
-- file; kept at the type it previously received through defaulting.
workloadCount :: Integer
workloadCount = 10000
-- | One row of the input CSV: three strict integer fields.
-- NOTE(review): presumably x/y are the tile column/row and z the zoom
-- level -- confirm against the tile service.
data Tile = Tile
  { x :: !Int
  , y :: !Int
  , z :: !Int
  } deriving Show

-- | Hand-written decoder: reads the named columns @x@, @y@ and @z@ from a
-- CSV record.
instance FromNamedRecord Tile where
  parseNamedRecord r = do
    tileX <- r .: "x"
    tileY <- r .: "y"
    tileZ <- r .: "z"
    pure (Tile tileX tileY tileZ)
-- | Outcome of a single timed request against one server.
data Measure = Measure
  { tile        :: !Tile    -- ^ tile that was requested
  , elapsedTime :: !Double  -- ^ wall-clock duration of the request, in seconds
  , status      :: !Int     -- ^ HTTP status code of the response
  , contentType :: !String  -- ^ 'show'-rendered @content-type@ response header
  , server      :: !String  -- ^ host the request was sent to
  } deriving Show
-- | The two measurements taken for the same tile, flattened side by side.
-- Fields ending in @1@ come from the first server queried, fields ending
-- in @2@ from the second.
data MeasurePair = MeasurePair
  { pairTile     :: !Tile
  , server1      :: !String
  , server2      :: !String
  , contentType1 :: !String
  , contentType2 :: !String
  , status1      :: !Int
  , status2      :: !Int
  , elapsedTime1 :: !Double
  , elapsedTime2 :: !Double
  }
-- | Renders a pair as one comma-separated line: z, x, y of the tile, then
-- both servers, both content types, both status codes and both elapsed
-- times.
instance Show MeasurePair where
  show pair =
    printf
      "%d,%d,%d,%s,%s,%s,%s,%d,%d,%f,%f"
      (z t)
      (x t)
      (y t)
      (server1 pair)
      (server2 pair)
      (contentType1 pair)
      (contentType2 pair)
      (status1 pair)
      (status2 pair)
      (elapsedTime1 pair)
      (elapsedTime2 pair)
    where
      t = pairTile pair
-- | Build the request URL for a tile on the given host. The path ends in
-- @\/z\/x\/y.jpeg@, i.e. the tile fields appear in z, x, y order.
buildUrl :: String -> Tile -> String
buildUrl host t =
  printf
    "http://%s/icc_mapesmultibase/utm/wmts/orto/UTM25831/%d/%d/%d.jpeg"
    host
    (z t)
    (x t)
    (y t)
-- | Format two measurements of the same tile as one comma-separated line:
-- x, y, z of the tile, then both servers, both content types, both status
-- codes and both elapsed times.
--
-- The format string must contain exactly one directive per argument, of a
-- matching kind. The previous string had six @%s@ directives for four
-- string arguments, which made 'printf' fail at runtime as soon as the
-- result was forced.
buildResultLine :: Tile -> Measure -> Measure -> String
buildResultLine t m1 m2 =
  printf
    "%d,%d,%d,%s,%s,%s,%s,%d,%d,%f,%f"
    (x t) (y t) (z t)
    (server m1) (server m2)
    (contentType m1) (contentType m2)
    (status m1) (status m2)
    (elapsedTime m1) (elapsedTime m2)
-- | Fetch the tile from the given host and record how the request went.
--
-- The clock is read immediately before and immediately after the HTTP GET;
-- the difference is stored as seconds. Any exception raised by the request
-- propagates to the caller.
measure :: String -> Tile -> IO Measure
measure host t = do
  began <- getCurrentTime
  response <- get (buildUrl host t)
  finished <- getCurrentTime
  return Measure
    { tile = t
    , elapsedTime = realToFrac (diffUTCTime finished began)
    , status = response ^. responseStatus . statusCode
      -- 'show' on the raw header value, so the stored text is its quoted rendering.
    , contentType = show (response ^. responseHeader "content-type")
    , server = host
    }
-- | Worker loop. Repeatedly takes a tile from @requestChan@, measures it
-- against both servers (one after the other) and, when both requests
-- succeed, writes the combined 'MeasurePair' to @responseChan@. The loop
-- ends once the request channel is closed and drained.
--
-- A failed request is reported on stderr (stdout is reserved for the
-- result lines) and the tile is skipped. Asynchronous exceptions are
-- re-thrown rather than swallowed, so that cancelling the worker really
-- stops it instead of letting it carry on with the next tile.
--
-- @workerId@ is only used to tag the log messages.
worker requestChan responseChan workerId = loop
  where
    loop = do
      next <- atomically $ readTBMChan requestChan
      case next of
        -- Channel closed and empty: nothing left to do.
        Nothing -> return ()
        Just t -> do
          result1 <- tryMeasure "172.30.2.9" t
          result2 <- tryMeasure "172.30.2.8" t
          case (result1, result2) of
            (Right m1, Right m2) ->
              atomically $ writeTBMChan responseChan MeasurePair
                { pairTile = t
                , server1 = server m1
                , server2 = server m2
                , contentType1 = contentType m1
                , contentType2 = contentType m2
                , status1 = status m1
                , status2 = status m2
                , elapsedTime1 = elapsedTime m1
                , elapsedTime2 = elapsedTime m2
                }
            -- At least one request failed; it was already logged below.
            _ -> return ()
          loop

    -- Run one measurement, turning synchronous failures into a logged
    -- 'Left' while letting asynchronous exceptions propagate.
    tryMeasure host t = do
      outcome <- try (measure host t)
      case outcome of
        Right m -> return (Right m)
        Left (e :: SomeException) ->
          case fromException e of
            Just (_ :: SomeAsyncException) -> throwIO e
            Nothing -> do
              hPutStrLn stderr $
                "worker " ++ show workerId
                  ++ ": request to " ++ host
                  ++ " failed for " ++ show t
                  ++ ": " ++ displayException e
              return (Left e)
-- | Entry point: wires a request channel and a response channel between
-- three concurrently running actions (workers, CSV feeder, printer) and
-- waits for all of them to finish.
main :: IO ()
main = do
  -- Both channels are bounded (twice the worker count) and closable.
  requestChan <- atomically (newTBMChan (workerCount * 2))
  responseChan <- atomically (newTBMChan (workerCount * 2))

  -- The bindings below only describe the actions; nothing runs until they
  -- are handed to 'concurrently' at the bottom.
  let
    -- Start one worker per id and wait for every one of them; only then
    -- is the response channel closed so the printer can stop.
    runWorkers = do
      _ <- mapConcurrently (worker requestChan responseChan) [1 .. workerCount]
      atomically (closeTBMChan responseChan)

    -- Push every decoded tile onto the request channel.
    enqueueAll = V.mapM_ (atomically . writeTBMChan requestChan)

    -- Read the tile list from disk and feed it to the workers. A decode
    -- error is printed instead; either way the request channel is closed
    -- afterwards so the workers terminate.
    fillRequests = do
      csvData <- BL.readFile "tiles_25831_v2.csv"
      either putStrLn (enqueueAll . snd) (decodeByName csvData)
      atomically (closeTBMChan requestChan)

    -- Print results one per line until the response channel is closed
    -- and empty.
    printResults =
      atomically (readTBMChan responseChan)
        >>= maybe (return ()) (\m -> print m >> printResults)

  -- Run the three actions in parallel and wait for all of them. If one
  -- fails with an exception, 'concurrently' cancels its sibling and
  -- re-throws.
  _ <- concurrently (concurrently runWorkers fillRequests) printResults
  return ()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment