Last active
December 6, 2019 00:01
-
-
Save sordina/52e07f34abd6effd6dbbf5c6a782658a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE OverloadedStrings #-} | |
{-# LANGUAGE BlockArguments #-} | |
-- From: https://patchbay.pub | |
-- | |
-- Normal Usage: | |
-- Shell 1: curl localhost:9000/test1 | |
-- Shell 2: curl -s localhost:9000/test1 -X POST -T /dev/stdin | |
-- | |
-- Pub-Sub Usage: | |
-- Shell 1: curl 'localhost:9000/test2?pubsub=true' | |
-- Shell 2: curl 'localhost:9000/test2?pubsub=true' | |
-- Shell 3: curl -s 'localhost:9000/test2?pubsub=true' -X POST -T /dev/stdin | |
module Test where | |
import qualified Control.Concurrent.STM as STM | |
import Control.Monad | |
import Control.Arrow (first, second) | |
import Network.Wai | |
import Network.Wai.Handler.Warp | |
import Network.HTTP.Types | |
import Data.Map | |
import Data.Maybe | |
import Data.ByteString | |
import Data.ByteString.Builder | |
import Data.CaseInsensitive | |
import qualified Data.ByteString.Char8 as C8 | |
import Numeric.Natural | |
type Stream a = Q (Maybe a) | |
type Channels = Map ByteString (Stream ByteString) | |
data Q a = TQ (STM.TBQueue a) | TC (STM.TChan a) | |
main :: IO () | |
main = do | |
Prelude.putStrLn "Server Running on http://localhost:9000/" | |
channels <- STM.newTVarIO Data.Map.empty | |
run 9000 (handler channels) | |
handler :: STM.TVar Channels -> Application | |
handler channels req final = do | |
let rpi = rawPathInfo req | |
let qs = queryString req | |
Prelude.putStrLn "Path:" | |
print rpi | |
Prelude.putStrLn "Query:" | |
print qs | |
let pubsub = maybe False (== "true") $ join $ Prelude.lookup "pubsub" qs | |
let bufferSize = maybe 10 (fromIntegral . fst) $ ((join $ Prelude.lookup "buffer" qs) >>= C8.readInt) | |
let headers = mkHeaders qs | |
when pubsub do Prelude.putStrLn "Pub-Sub Enabled!" | |
channel' <- STM.atomically do | |
cs <- STM.readTVar channels | |
case Data.Map.lookup rpi cs of | |
Just ch -> return ch | |
Nothing -> do | |
c <- if pubsub | |
then newTChan --- "Unbounded..." | |
else newTBQueue bufferSize --- "Blocking..." | |
STM.modifyTVar channels (Data.Map.insert rpi c) | |
return c | |
case requestMethod req of | |
"GET" -> do | |
channel <- STM.atomically $ maybeDupQ channel' | |
final do | |
responseStream status200 (("Transfer-Encoding", "chunked") : headers) $ \send flush -> | |
consumeStream channel $ \c -> do | |
send (byteString c) | |
flush | |
"POST" -> do | |
let channel = channel' | |
consumeBody req $ \b -> STM.atomically $ writeQ channel (Just b) | |
STM.atomically do writeQ channel Nothing | |
final (responseLBS status200 [] "\n") | |
_ -> error "Unsupported HTTP method" | |
consumeStream :: Stream a -> (a -> IO ()) -> IO () | |
consumeStream c f = STM.atomically (readQ c) >>= maybe (return ()) (\x -> f x >> consumeStream c f) | |
consumeBody :: Request -> (ByteString -> IO ()) -> IO () | |
consumeBody req f = do | |
b <- getRequestBodyChunk req | |
when (not (Data.ByteString.null b)) do f b >> consumeBody req f | |
newTBQueue :: Natural -> STM.STM (Q a) | |
newTBQueue n = TQ <$> STM.newTBQueue n | |
newTChan :: STM.STM (Q a) | |
newTChan = TC <$> STM.newTChan | |
readQ :: Q a -> STM.STM a | |
readQ (TQ tq) = STM.readTBQueue tq | |
readQ (TC tc) = STM.readTChan tc | |
writeQ :: Q a -> a -> STM.STM () | |
writeQ (TQ tq) = STM.writeTBQueue tq | |
writeQ (TC tc) = STM.writeTChan tc | |
maybeDupQ :: Q a -> STM.STM (Q a) | |
maybeDupQ (TC tc) = TC <$> STM.dupTChan tc | |
maybeDupQ x = return x | |
mkHeaders :: Query -> [(CI ByteString, ByteString)] | |
mkHeaders qs = Prelude.map (second (fromMaybe "") . first (Data.CaseInsensitive.mk . Data.ByteString.drop 7)) $ Prelude.filter (Data.ByteString.isPrefixOf "header-" . fst) qs |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment