Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
{-# LANGUAGE DeriveFunctor #-}
module PollingConsumer where
import Data.Time.Clock
import Control.Monad.Trans.Free (Free, FreeF(..), liftF, runFree)
import Control.Concurrent (threadDelay)
import System.Random (getStdRandom, random, randomR)
import Text.Printf (printf)
-- "Types prevent typos" - https://twitter.com/hmemcpy/status/867647943108681728
-- | Length of time attributed to a single poll.
newtype PollDuration = PollDuration NominalDiffTime
  deriving (Eq, Show)

-- | Length of time to idle, or that was spent idling.
newtype IdleDuration = IdleDuration NominalDiffTime
  deriving (Eq, Show)

-- | Length of time attributed to handling a single message.
newtype HandleDuration = HandleDuration NominalDiffTime
  deriving (Eq, Show)
-- | Timing of one completed cycle: the poll that produced a message and the
-- handling of that message.
data CycleDuration = CycleDuration
  { pollDuration   :: PollDuration
  , handleDuration :: HandleDuration
  }
  deriving (Eq, Show)
-- State machine state

-- | The four states of the polling consumer. Every state carries the list of
-- cycle timings gathered so far.
data PollingState msg
  = Ready [CycleDuration]
    -- ^ About to decide whether to poll.
  | ReceivedMessage [CycleDuration] PollDuration msg
    -- ^ A poll returned a message; it still has to be handled.
  | NoMessage [CycleDuration] PollDuration
    -- ^ A poll returned nothing.
  | Stopped [CycleDuration]
    -- ^ Terminal state.
  deriving (Show)
-- Instruction set

-- | The primitive operations a polling program may request. Each constructor
-- carries a continuation that receives the operation's result.
data PollingInstruction msg next
  = CurrentTime (UTCTime -> next)
    -- ^ Ask for the current time.
  | Poll ((Maybe msg, PollDuration) -> next)
    -- ^ Poll for a message; yields the message (if any) and a poll duration.
  | Handle msg (HandleDuration -> next)
    -- ^ Handle the given message; yields a handle duration.
  | Idle IdleDuration (IdleDuration -> next)
    -- ^ Idle for the given duration; yields an idle duration.
  deriving (Functor)
-- | A program built from 'PollingInstruction's, expressed as a free monad
-- over that instruction functor.
type PollingProgram msg = Free (PollingInstruction msg)
-- | Lifted 'CurrentTime' instruction: yields whatever time the interpreter
-- supplies.
currentTime :: PollingProgram msg UTCTime
currentTime = liftF $ CurrentTime id
-- | Lifted 'Poll' instruction: yields the message, if any, together with the
-- poll duration reported by the interpreter.
poll :: PollingProgram msg (Maybe msg, PollDuration)
poll = liftF $ Poll id
-- | Lifted 'Handle' instruction: hands the message to the interpreter and
-- yields the handle duration it reports.
handle :: msg -> PollingProgram msg HandleDuration
handle message = liftF $ Handle message id
-- | Lifted 'Idle' instruction: requests an idle period and yields the idle
-- duration reported by the interpreter.
idle :: IdleDuration -> PollingProgram msg IdleDuration
idle requested = liftF $ Idle requested id
-- Support functions

-- | Estimate how long the next poll-and-handle cycle may take.
--
-- With no statistics available the caller-supplied @estimatedDuration@ is
-- returned unchanged. Otherwise the estimate is ignored and the result is the
-- mean of the observed cycle durations (poll + handle) plus three population
-- standard deviations.
calculateExpectedDuration :: NominalDiffTime
                          -> [CycleDuration]
                          -> NominalDiffTime
calculateExpectedDuration estimatedDuration [] = estimatedDuration
calculateExpectedDuration _ statistics =
  realToFrac (avg + stdDev * 3)
  where
    -- 'realToFrac' converts a 'NominalDiffTime' to and from seconds. This
    -- avoids depending on the 'Enum' instance (which counts raw picoseconds
    -- in an 'Int' and can overflow where 'Int' is narrow) and keeps the
    -- arithmetic in double precision.
    fromCycleDuration :: CycleDuration -> Double
    fromCycleDuration (CycleDuration (PollDuration pd) (HandleDuration hd)) =
      realToFrac (pd + hd)

    durations = fmap fromCycleDuration statistics
    -- Non-zero here: the empty-list case is handled by the first equation.
    l = fromIntegral (length durations)
    avg = sum durations / l
    stdDev = sqrt (sum (fmap (\x -> (x - avg) ^ (2 :: Int)) durations) / l)
-- | Decide whether an idle period of the given length would end strictly
-- before @stopBefore@, measured from the current time.
shouldIdle :: IdleDuration -> UTCTime -> PollingProgram msg Bool
shouldIdle (IdleDuration d) stopBefore =
  fmap endsInTime currentTime
  where
    endsInTime now = addUTCTime d now < stopBefore
-- | Decide whether another cycle, lasting as long as
-- 'calculateExpectedDuration' predicts, would end strictly before
-- @stopBefore@, measured from the current time.
shouldPoll :: NominalDiffTime
           -> UTCTime
           -> [CycleDuration]
           -> PollingProgram msg Bool
shouldPoll estimatedDuration stopBefore statistics =
  fmap endsInTime currentTime
  where
    expected = calculateExpectedDuration estimatedDuration statistics
    endsInTime now = addUTCTime expected now < stopBefore
-- Transitions

-- | Transition out of 'Ready': poll if 'shouldPoll' allows it, moving to
-- 'ReceivedMessage' or 'NoMessage' depending on the poll result; otherwise
-- move to 'Stopped'. The statistics are carried over unchanged.
transitionFromReady :: NominalDiffTime
                    -> UTCTime
                    -> [CycleDuration]
                    -> PollingProgram msg (PollingState msg)
transitionFromReady estimatedDuration stopBefore statistics = do
  timeRemains <- shouldPoll estimatedDuration stopBefore statistics
  if timeRemains
    then fmap toState poll
    else return (Stopped statistics)
  where
    toState (Just msg, pd) = ReceivedMessage statistics pd msg
    toState (Nothing, pd) = NoMessage statistics pd
-- | Transition out of 'NoMessage': idle and return to 'Ready' if 'shouldIdle'
-- allows it, otherwise move to 'Stopped'. The duration reported by the idle
-- instruction is discarded.
transitionFromNoMessage :: IdleDuration
                        -> UTCTime
                        -> [CycleDuration]
                        -> PollingProgram msg (PollingState msg)
transitionFromNoMessage d stopBefore statistics =
  shouldIdle d stopBefore >>= next
  where
    next True = Ready statistics <$ idle d
    next False = return (Stopped statistics)
-- | Transition out of 'ReceivedMessage': handle the message, then return to
-- 'Ready' with this cycle's poll and handle durations prepended to the
-- statistics.
transitionFromReceived :: [CycleDuration]
                       -> PollDuration
                       -> msg
                       -> PollingProgram msg (PollingState msg)
transitionFromReceived statistics pd msg =
  fmap recordCycle (handle msg)
  where
    recordCycle hd = Ready (CycleDuration pd hd : statistics)
-- | Transition out of 'Stopped': stays in 'Stopped' with the same statistics.
transitionFromStopped :: Monad m => [CycleDuration] -> m (PollingState msg)
transitionFromStopped = return . Stopped
-- | Perform one state-machine step by dispatching to the transition function
-- for the current state. The poll duration stored in 'NoMessage' is not used.
transition :: NominalDiffTime
           -> IdleDuration
           -> UTCTime
           -> PollingState msg
           -> PollingProgram msg (PollingState msg)
transition estimatedDuration idleDuration stopBefore = step
  where
    step (Ready stats) =
      transitionFromReady estimatedDuration stopBefore stats
    step (ReceivedMessage stats pd msg) =
      transitionFromReceived stats pd msg
    step (NoMessage stats _) =
      transitionFromNoMessage idleDuration stopBefore stats
    step (Stopped stats) =
      transitionFromStopped stats
-- 'UI'

-- | Number of cycle timings recorded in the given state, whichever state it
-- is.
report :: PollingState a -> Int
report = length . statisticsOf
  where
    statisticsOf (Ready stats) = stats
    statisticsOf (ReceivedMessage stats _ _) = stats
    statisticsOf (NoMessage stats _) = stats
    statisticsOf (Stopped stats) = stats
-- Cheating; pretend that unit is a message type
-- (a message therefore carries no payload; only its presence matters)
type Message = ()
-- Implementations; cheating across the board, pretending to do real work

-- | Simulated poll: prints a line, sleeps for a random 100000-1000000
-- microseconds, then randomly decides whether a message arrived. Returns the
-- message (if any) and the measured wall-clock duration of the whole call.
pollImp :: IO (Maybe Message, PollDuration)
pollImp = do
  started <- getCurrentTime
  delay <- getStdRandom (randomR (100000, 1000000))
  putStrLn "Polling"
  threadDelay delay
  hasMessage <- getStdRandom random
  stopped <- getCurrentTime
  let elapsed = PollDuration (diffUTCTime stopped started)
      message = if hasMessage then Just () else Nothing
  return (message, elapsed)
-- | Simulated message handler: ignores its argument, prints a line, sleeps
-- for a random 100000-1000000 microseconds and returns the measured
-- wall-clock duration of the whole call.
handleImp :: a -> IO HandleDuration
handleImp _ = do
  started <- getCurrentTime
  delay <- getStdRandom (randomR (100000, 1000000))
  putStrLn " Handling"
  threadDelay delay
  stopped <- getCurrentTime
  return (HandleDuration (diffUTCTime stopped started))
-- | Idle implementation: prints a line, sleeps for the requested duration and
-- returns the measured wall-clock duration of the whole call.
idleImp :: IdleDuration -> IO IdleDuration
idleImp (IdleDuration d) = do
  started <- getCurrentTime
  putStrLn " Sleeping"
  -- 'threadDelay' takes microseconds; @d@ is in seconds, so scale by 10^6
  -- and drop any sub-microsecond remainder.
  threadDelay (floor (d * 1000000))
  stopped <- getCurrentTime
  return (IdleDuration (diffUTCTime stopped started))
-- Impure interpreter

-- | Run a polling program in 'IO': each instruction is executed by its
-- corresponding @*Imp@ action (or 'getCurrentTime'), and the result is fed to
-- the instruction's continuation until the program yields a pure value.
interpret :: PollingProgram Message a -> IO a
interpret program =
  case runFree program of
    Pure result -> return result
    Free instruction -> execute instruction >>= interpret
  where
    -- Perform one instruction and produce the rest of the program.
    execute (CurrentTime next) = next <$> getCurrentTime
    execute (Poll next) = next <$> pollImp
    execute (Handle msg next) = next <$> handleImp msg
    execute (Idle d next) = next <$> idleImp d
-- Execution

-- | Drive the state machine from the given state, interpreting one
-- transition at a time, until it reaches 'Stopped'; that final state is
-- returned.
run :: NominalDiffTime
    -> IdleDuration
    -> UTCTime
    -> PollingState Message
    -> IO (PollingState Message)
run estimatedDuration idleDuration stopBefore = loop
  where
    step = transition estimatedDuration idleDuration stopBefore
    loop state = do
      next <- interpret (step state)
      case next of
        Stopped _ -> return next
        _ -> loop next
-- | Entry point: runs the polling consumer from an empty 'Ready' state with a
-- 2-second initial cycle estimate, 5-second idle periods and a stop time 60
-- seconds after start, then prints the elapsed time and the value of
-- 'report' for the final state.
main :: IO ()
main = do
  timeAtEntry <- getCurrentTime
  let estimatedDuration = 2
      idleDuration = IdleDuration 5
      stopBefore = addUTCTime 60 timeAtEntry
  finalState <- run estimatedDuration idleDuration stopBefore (Ready [])
  timeAtExit <- getCurrentTime
  putStrLn ("Elapsed time: " ++ show (timeAtExit `diffUTCTime` timeAtEntry))
  putStrLn (printf "%d message(s) handled." (report finalState))

moodmosaic commented Jul 19, 2017

I've created a gist with the contents of a Stack project for building the above module.

Usage:

$ stack build
$ stack ghci
:set prompt "λ "
λ main

Output:

Polling
 Handling
Polling
 Handling
Polling
 Handling
Polling
 Handling
Polling
 Sleeping
Polling
 Sleeping
Polling
 Handling
Polling
 Sleeping
Polling
 Handling
Polling
 Sleeping
Polling
 Sleeping
Polling
 Sleeping
Polling
 Sleeping
Polling
 Handling
Polling
 Sleeping
Polling
 Sleeping
Polling
 Handling
Elapsed time: 58.5215037s
8 message(s) handled.
λ
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment