Skip to content

Instantly share code, notes, and snippets.

@kazu-yamamoto
Created January 7, 2020 07:59
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save kazu-yamamoto/370d3ff3e0bc28bbfe6eb2ecc67feb95 to your computer and use it in GitHub Desktop.
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
-- |
-- To reproduce with curl, run 'main' (from ghci) and use
--
-- @
-- time curl -v --http2-prior-knowledge -H "Accept:text/event-stream" "http://127.0.0.1:8080"
-- @
--
module Events where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.Catch
import Data.Binary.Builder (fromByteString)
import qualified Data.ByteString.Char8 as B8
import Data.IORef
import Network.HTTP.Types.Status
import Network.Wai (responseLBS, Application)
import Network.Wai.EventSource (ServerEvent(..), eventSourceAppIO)
import Network.Wai.Handler.Warp (run)
import System.Random
-- -------------------------------------------------------------------------- --
-- Main
-- | Entry point of the reproduction server.
--
-- Creates the shared stream counter (initialised to 10), starts the integer
-- producer with an upper bound of 5 and serves 'updatesHandler' with Warp on
-- port 8080 for as long as the producer is running.
main :: IO ()
main = do
  slots <- newIORef 10
  withIntProducer 5 (run 8080 . updatesHandler slots)
-- -------------------------------------------------------------------------- --
-- Application
-- | WAI application that serves a server-sent-event stream of the integers
-- published by the given 'IntProducer'.
--
-- [@count@] Counter of remaining stream slots. It is decremented when a
--   request arrives and incremented again when the request is done (through
--   'bracket', so also when an exception is raised). A request for which the
--   decremented value is @<= 0@ is answered with a 503 response instead of a
--   stream. Note that, as written, an initial value of @n@ therefore admits
--   at most @n - 1@ concurrent streams.
--
-- [@prod@] Source of the integers that are sent as events.
--
-- Each stream is terminated with a 'CloseEvent' once its timer has fired,
-- i.e. after @timeout@ seconds scaled by a random factor from @(0.9, 1.1)@.
-- Any exception raised while serving the stream is printed and rethrown.
--
updatesHandler :: IORef Int -> IntProducer -> Application
updatesHandler count prod req respond = withLimit respond $ do
    c <- readIORef count -- can race but we don't care
    print $ "start: " <> show c

    -- Last value that was sent on this stream; 'go' only emits an event when
    -- the producer holds a value that differs from it.
    cv <- newIORef 0

    -- An update stream is closed after @timeout@ seconds. We add some jitter
    -- so that the availability of streams is uniformly distributed over time
    -- and not predictable.
    --
    jitter <- randomRIO @Double (0.9, 1.1)
    timer <- registerDelay (round $ jitter * realToFrac timeout * 1_000_000)

    -- Log every exception that escapes the stream, then propagate it.
    do
        r <- eventSourceAppIO (go timer cv) req respond
        print $ "exit: " <> show c
        return r
      `catch` \(e :: SomeException) -> do
        print e
        throwM e
  where
    -- Lifetime of a stream in seconds, before the jitter is applied.
    timeout = 240 :: Int

    -- Wrap an integer into an event named "Event" that carries the decimal
    -- rendering of the integer as its only data line.
    f :: Int -> ServerEvent
    f c = ServerEvent (Just $ fromByteString "Event") Nothing [fromByteString $ B8.pack (show c)]

    -- Produce the next event of the stream: block until either the timer has
    -- fired (close the stream) or the producer holds a value different from
    -- the one sent last (remember it and send it).
    go :: TVar Bool -> IORef Int -> IO ServerEvent
    go timer cv = do
        c <- readIORef cv

        -- await either a timeout or a new event
        maybeInt <- atomically $ do
            t <- readTVar timer
            if t
                then return Nothing
                else Just <$> awaitNewInt prod c

        case maybeInt of
            Nothing -> return CloseEvent
            Just c' -> do
                writeIORef cv $! c'
                return (f c')

    -- Take a slot from @count@ for the duration of @inner@. The slot is
    -- always given back, including on the 503 path and on exceptions.
    withLimit respond' inner = bracket
        (atomicModifyIORef' count $ \x -> (x - 1, x - 1))
        (const $ atomicModifyIORef' count $ \x -> (x + 1, ()))
        (\x -> if x <= 0 then ret503 respond' else inner)

    -- Response sent when no stream slot is available.
    ret503 respond' =
        respond' $ responseLBS status503 [] "No more update streams available currently. Retry later."
-- -------------------------------------------------------------------------- --
-- Producer of Integers
-- | Handle to a shared integer that changes over time. It wraps the 'TVar'
-- that is written by the producer loop in 'withIntProducer' and read by
-- 'awaitNewInt'.
newtype IntProducer = IntProducer (TVar Int)
-- | Run an action with access to a background integer producer.
--
-- The shared variable starts at @0@. While @inner@ runs, a producer loop
-- repeatedly sleeps for a random duration of 0.9 to 1.1 seconds and then
-- stores a random integer from the range @(0, s - 1)@ in the variable. The
-- producer and @inner@ are raced against each other; the result of @inner@
-- is returned. The producer loop never returns on its own, so it finishing
-- first is reported with 'error'.
withIntProducer :: Int -> (IntProducer -> IO r) -> IO r
withIntProducer s inner = do
  var <- newTVarIO 0
  outcome <- race (producer var) (inner (IntProducer var))
  case outcome of
    Left () -> error "producer failed"
    Right result -> return result
  where
    -- Endless loop: wait about a second, then publish a fresh random value.
    producer var = forever $ do
      factor <- randomRIO @Double (0.9, 1.1)
      threadDelay (round (factor * 1_000_000))
      next <- randomRIO (0, s - 1)
      atomically (writeTVar var next)
-- | Block (via 'retry') until the value held by the producer differs from
-- @c@, then return that value.
awaitNewInt :: IntProducer -> Int -> STM Int
awaitNewInt (IntProducer var) c = do
  current <- readTVar var
  if current == c
    then retry
    else return current
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment