Skip to content

Instantly share code, notes, and snippets.

@jarnaldich
Created November 29, 2018 13:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jarnaldich/3c00aae02bf88a0f633e8256551b5a39 to your computer and use it in GitHub Desktop.
Save jarnaldich/3c00aae02bf88a0f633e8256551b5a39 to your computer and use it in GitHub Desktop.
[Concurrent Curl in Haskell] Concurrency in Haskell #haskell #concurrency #parallel #network
#!/usr/bin/env stack
{- stack --resolver lts-12.20 --install-ghc runghc
--package random --package async --package stm-chans --package wreq --package cassava
-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE DeriveGeneric #-}
import Control.Exception
import GHC.Clock (getMonotonicTime)
import System.IO (hPutStrLn, stderr)
import Text.Printf (printf)

import Control.Concurrent.Async (mapConcurrently, concurrently)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMChan (readTBMChan, writeTBMChan, newTBMChan, closeTBMChan)
import Control.Concurrent.STM.TBMChan (TBMChan)
import Control.Lens
import qualified Data.ByteString.Lazy as BL
import Data.Csv
import Data.Time.Clock (diffUTCTime, getCurrentTime)
import qualified Data.Vector as V
import Network.Wreq
import System.Random (randomRIO)
-- | Number of concurrent worker threads. 'main' also sizes both bounded
-- channels at twice this value.
workerCount :: Int
workerCount = 32

-- | NOTE(review): not referenced in the visible code. The number of requests
-- actually made is the number of rows decoded from the input CSV in 'main'.
workloadCount :: Int
workloadCount = 10000
-- | A map tile, identified by its @x@, @y@ and @z@ indices.
--
-- CSV rows are decoded into 'Tile's by the hand-written 'FromNamedRecord'
-- instance below. That instance reads the columns named @"x"@, @"y"@ and
-- @"z"@; nothing here is derived automatically from the type definition.
data Tile = Tile
  { x :: !Int
  , y :: !Int
  , z :: !Int
  } deriving (Show)

instance FromNamedRecord Tile where
  parseNamedRecord r = Tile <$> r .: "x" <*> r .: "y" <*> r .: "z"
-- | The outcome of one timed tile request against a single server.
data Measure = Measure
  { tile        :: !Tile    -- ^ the tile that was requested
  , elapsedTime :: !Double  -- ^ duration of the request, in seconds
  , status      :: !Int     -- ^ HTTP status code of the response
  , contentType :: !String  -- ^ rendered @content-type@ response header
  , server      :: !String  -- ^ host the request was sent to
  } deriving (Show)
-- | Two measurements of the same tile, one per server, flattened into a
-- single record so it can be emitted as one output line.
data MeasurePair = MeasurePair
  { pairTile     :: !Tile
  , server1      :: !String
  , server2      :: !String
  , contentType1 :: !String
  , contentType2 :: !String
  , status1      :: !Int
  , status2      :: !Int
  , elapsedTime1 :: !Double
  , elapsedTime2 :: !Double
  }

-- | Renders the pair as one comma-separated line with the columns
-- @z,x,y,server1,server2,contentType1,contentType2,status1,status2,elapsedTime1,elapsedTime2@.
--
-- NOTE(review): this is CSV serialisation rather than a Haskell-readable
-- 'Show'. 'main' prints results through it, so it is kept as it is.
instance Show MeasurePair where
  show (MeasurePair t srv1 srv2 ct1 ct2 st1 st2 dt1 dt2) =
    printf "%d,%d,%d,%s,%s,%s,%s,%d,%d,%f,%f"
      (z t) (x t) (y t)
      srv1 srv2
      ct1 ct2
      st1 st2
      dt1 dt2
-- | Builds the WMTS image URL for a tile on the given host. The tile indices
-- appear in the path in z, x, y order.
buildUrl :: String -> Tile -> String
buildUrl host (Tile tx ty tz) = printf urlTemplate host tz tx ty
  where
    urlTemplate :: String
    urlTemplate = "http://%s/icc_mapesmultibase/utm/wmts/orto/UTM25831/%d/%d/%d.jpeg"
-- | Renders one result row for tile @t@ from its two measurements, using the
-- same column layout that the 'Show' instance of 'MeasurePair' emits:
-- @z,x,y,server1,server2,contentType1,contentType2,status1,status2,elapsedTime1,elapsedTime2@.
--
-- The format string must contain exactly one specifier per argument, each of
-- the right kind (@%d@ for Int, @%s@ for String, @%f@ for Double). Otherwise
-- 'printf' raises an error when the result is forced.
--
-- NOTE(review): not called anywhere in the visible code.
buildResultLine :: Tile -> Measure -> Measure -> String
buildResultLine t m1 m2 =
  printf "%d,%d,%d,%s,%s,%s,%s,%d,%d,%f,%f"
    (z t) (x t) (y t)
    (server m1) (server m2)
    (contentType m1) (contentType m2)
    (status m1) (status m2)
    (elapsedTime m1) (elapsedTime m2)
-- | Requests @tile@ from @server@ with a single HTTP GET. Records how long the
-- call took, the response status code and the @content-type@ header.
--
-- The duration is taken from 'getMonotonicTime' (seconds, as a 'Double'),
-- so wall-clock adjustments during the run cannot distort it.
--
-- Exceptions raised by 'get' are not caught here; the caller decides what to
-- do with them.
--
-- NOTE(review): 'contentType' stores the 'show' of the header value, so it
-- likely includes surrounding quotes. It is kept that way because it ends up
-- verbatim in the output.
measure :: String -> Tile -> IO Measure
measure server tile = do
  let url = buildUrl server tile
  startTime <- getMonotonicTime
  r <- get url
  endTime <- getMonotonicTime
  let status = r ^. responseStatus . statusCode
      contentType = show $ r ^. responseHeader "content-type"
      elapsedTime = endTime - startTime
  return Measure { tile, elapsedTime, status, contentType, server }
-- | Worker loop.
--
-- Takes tiles from @requestChan@ until it has been closed and drained. Each
-- tile is requested from both servers, one after the other. When both
-- requests succeed, a 'MeasurePair' is written to @responseChan@.
--
-- Failure handling:
--
--   * Any synchronous exception raised by 'measure' is reported on stderr,
--     and that tile is skipped.
--   * Asynchronous exceptions are re-thrown, so cancellation from
--     'mapConcurrently' / 'concurrently' actually stops the worker instead of
--     being swallowed.
--
-- @workerId@ is only used to label those error messages.
worker :: TBMChan Tile -> TBMChan MeasurePair -> Int -> IO ()
worker requestChan responseChan workerId = loop
  where
    loop = do
      mint <- atomically $ readTBMChan requestChan
      case mint of
        -- Channel closed and empty: no more work.
        Nothing -> return ()
        Just t -> do
          eitherM1 <- trySync $ measure "172.30.2.9" t
          eitherM2 <- trySync $ measure "172.30.2.8" t
          case (eitherM1, eitherM2) of
            (Right m1, Right m2) ->
              atomically $ writeTBMChan responseChan (pairOf t m1 m2)
            _ ->
              mapM_ (reportFailure t) [e | Left e <- [eitherM1, eitherM2]]
          loop

    -- Flatten the two per-server measurements into one output record.
    pairOf t m1 m2 = MeasurePair
      { pairTile = t
      , server1 = server m1, server2 = server m2
      , contentType1 = contentType m1, contentType2 = contentType m2
      , status1 = status m1, status2 = status m2
      , elapsedTime1 = elapsedTime m1, elapsedTime2 = elapsedTime m2
      }

    -- Log to stderr so diagnostics never mix with the CSV on stdout.
    reportFailure :: Tile -> SomeException -> IO ()
    reportFailure t e =
      hPutStrLn stderr $
        printf "worker %d: skipping tile %s: %s" workerId (show t) (show e)

-- | Like @'try' :: IO a -> IO (Either SomeException a)@, except that
-- asynchronous exceptions (anything wrapped in 'SomeAsyncException', such as
-- 'ThreadKilled' or the cancellation used by the async combinators) are
-- re-thrown rather than returned.
trySync :: IO a -> IO (Either SomeException a)
trySync action = do
  result <- try action
  case result of
    Left e
      | Just (_ :: SomeAsyncException) <- fromException e -> throwIO e
    _ -> return result
-- | Reads tiles from @tiles_25831_v2.csv@ and measures each one against both
-- servers using 'workerCount' concurrent workers. Prints one CSV line to
-- stdout per tile whose two requests both succeeded.
--
-- Three groups of threads cooperate through bounded, closable channels:
--
--   * the producer ('fillRequests') pushes every decoded tile onto
--     @requestChan@ and then closes it;
--   * the workers drain @requestChan@; once they have all returned,
--     @responseChan@ is closed;
--   * the printer drains @responseChan@ until it is closed.
--
-- 'concurrently' and 'mapConcurrently' wait for every thread. If any thread
-- throws, the others are cancelled and the exception is re-raised here.
main :: IO ()
main = do
  -- Both channels are bounded, so the producer cannot run arbitrarily far
  -- ahead of the workers, nor the workers ahead of the printer.
  requestChan <- atomically $ newTBMChan (workerCount * 2)
  responseChan <- atomically $ newTBMChan (workerCount * 2)
  let
    runWorkers = do
      _ <- mapConcurrently (worker requestChan responseChan) [1 .. workerCount]
      -- Every worker has returned, so nothing more will be written.
      atomically $ closeTBMChan responseChan

    fillRequests = do
      csvData <- BL.readFile "tiles_25831_v2.csv"
      case decodeByName csvData of
        -- Report on stderr so the message is not mixed into the CSV output.
        -- The channel is still closed below, so the run ends with no rows.
        Left err -> hPutStrLn stderr err
        Right (_, tiles) ->
          V.forM_ tiles $ \t -> atomically $ writeTBMChan requestChan t
      -- Once closed and drained, workers read Nothing and stop.
      atomically $ closeTBMChan requestChan

    printResults = do
      mres <- atomically $ readTBMChan responseChan
      case mres of
        Nothing -> return ()
        Just m -> do
          print m
          printResults
  _ <- runWorkers `concurrently` fillRequests `concurrently` printResults
  return ()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment