Skip to content

Instantly share code, notes, and snippets.

@sordina
Last active December 6, 2019 00:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sordina/52e07f34abd6effd6dbbf5c6a782658a to your computer and use it in GitHub Desktop.
Save sordina/52e07f34abd6effd6dbbf5c6a782658a to your computer and use it in GitHub Desktop.
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE BlockArguments #-}
-- From: https://patchbay.pub
--
-- Normal usage (each posted chunk is delivered to a single consumer):
-- Shell 1 (consumer): curl localhost:9000/test1
-- Shell 2 (producer): curl -s localhost:9000/test1 -X POST -T /dev/stdin
--
-- Pub-sub usage (every connected consumer receives its own copy):
-- Shell 1 (consumer): curl 'localhost:9000/test2?pubsub=true'
-- Shell 2 (consumer): curl 'localhost:9000/test2?pubsub=true'
-- Shell 3 (producer): curl -s 'localhost:9000/test2?pubsub=true' -X POST -T /dev/stdin
module Test where
import qualified Control.Concurrent.STM as STM
import Control.Monad
import Control.Arrow (first, second)
import Network.Wai
import Network.Wai.Handler.Warp
import Network.HTTP.Types
import Data.Map
import Data.Maybe
import Data.ByteString
import Data.ByteString.Builder
import Data.CaseInsensitive
import qualified Data.ByteString.Char8 as C8
import Numeric.Natural
-- | A queue of optional values. In this file a 'Nothing' element is written
-- after the last chunk of a request body and makes 'consumeStream' stop.
type Stream a = Q (Maybe a)
-- | The streams known to the server, keyed by the raw request path.
type Channels = Map ByteString (Stream ByteString)
-- | A queue backed either by a bounded 'STM.TBQueue' ('TQ') or by a
-- 'STM.TChan' ('TC'); only the latter is duplicated by 'maybeDupQ'.
data Q a = TQ (STM.TBQueue a) | TC (STM.TChan a)
-- | Program entry point: announce the listening address, allocate the
-- (initially empty) shared channel table and serve 'handler' on port 9000.
main :: IO ()
main =
  Prelude.putStrLn "Server Running on http://localhost:9000/"
    >> STM.newTVarIO Data.Map.empty
    >>= run 9000 . handler
-- | WAI application that relays request bodies between clients.
--
-- The raw request path names a channel. The channel is created the first time
-- a @GET@ or @POST@ touches the path and is then kept in the shared table.
--
-- * @GET@ streams every chunk read from the channel to the client until the
--   end-of-stream marker ('Nothing') is read.
-- * @POST@ writes every chunk of the request body to the channel, followed by
--   the end-of-stream marker, and then answers with a single newline.
-- * Any other method is answered with @405 Method Not Allowed@ and does not
--   create a channel.
--
-- Query parameters:
--
-- * @pubsub=true@ — a newly created channel is a 'TC' (duplicated per reader)
--   instead of a bounded 'TQ'.
-- * @buffer=N@ — capacity of a newly created bounded queue. Values that are
--   missing, unparsable, zero or negative fall back to the default of 10.
-- * @header-NAME=VALUE@ — extra response headers for @GET@ (see 'mkHeaders').
--
-- @pubsub@ and @buffer@ only matter for the request that creates the channel;
-- later requests reuse whatever is already stored for the path.
handler :: STM.TVar Channels -> Application
handler channels req final = do
  Prelude.putStrLn "Path:"
  print rpi
  Prelude.putStrLn "Query:"
  print qs
  when pubsub $ Prelude.putStrLn "Pub-Sub Enabled!"
  case requestMethod req of
    "GET" -> do
      -- Look up (or create) the channel and take the reader's duplicate in
      -- the same transaction.
      channel <- STM.atomically (obtainChannel >>= maybeDupQ)
      final $
        responseStream status200 (("Transfer-Encoding", "chunked") : mkHeaders qs) $ \send flush ->
          consumeStream channel $ \chunk -> send (byteString chunk) >> flush
    "POST" -> do
      channel <- STM.atomically obtainChannel
      consumeBody req (STM.atomically . writeQ channel . Just)
      -- Tell the reader(s) that this body is complete.
      STM.atomically (writeQ channel Nothing)
      final (responseLBS status200 [] "\n")
    _ ->
      -- Reject instead of calling 'error', and do so before any channel is
      -- allocated for the path.
      final (responseLBS status405 [("Allow", "GET, POST")] "Method Not Allowed\n")
  where
    rpi = rawPathInfo req
    qs = queryString req

    -- Value of a query parameter, if the parameter is present with a value.
    param :: ByteString -> Maybe ByteString
    param key = join (Prelude.lookup key qs)

    pubsub :: Bool
    pubsub = param "pubsub" == Just "true"

    -- Only strictly positive sizes are accepted: converting a negative 'Int'
    -- to 'Natural' throws, and a zero-capacity queue can never be written to.
    bufferSize :: Natural
    bufferSize = case param "buffer" >>= C8.readInt of
      Just (n, _) | n > 0 -> fromIntegral n
      _ -> 10

    -- Fetch the channel stored for this path, creating and storing it first
    -- if the path has not been seen before.
    obtainChannel :: STM.STM (Stream ByteString)
    obtainChannel = do
      known <- STM.readTVar channels
      case Data.Map.lookup rpi known of
        Just existing -> return existing
        Nothing -> do
          fresh <- if pubsub then newTChan else newTBQueue bufferSize
          STM.modifyTVar' channels (Data.Map.insert rpi fresh)
          return fresh
-- | Repeatedly read from the stream and run the action on each element,
-- stopping (without running the action) as soon as 'Nothing' is read.
consumeStream :: Stream a -> (a -> IO ()) -> IO ()
consumeStream stream act = loop
  where
    loop = do
      item <- STM.atomically (readQ stream)
      case item of
        Nothing -> return ()
        Just x -> act x >> loop
-- | Feed each chunk of the request body to the action, in order, until
-- 'getRequestBodyChunk' returns an empty chunk.
consumeBody :: Request -> (ByteString -> IO ()) -> IO ()
consumeBody req act = go
  where
    go = do
      chunk <- getRequestBodyChunk req
      unless (Data.ByteString.null chunk) (act chunk >> go)
-- | Create a bounded queue with the given capacity, wrapped as a 'Q'.
newTBQueue :: Natural -> STM.STM (Q a)
newTBQueue = fmap TQ . STM.newTBQueue
-- | Create the channel-backed variant of 'Q'.
--
-- The channel is created with 'STM.newBroadcastTChan'. In this file the
-- stored channel is only ever written to and duplicated: every reader goes
-- through 'maybeDupQ' first. With a plain 'STM.newTChan' the never-read
-- original would keep every written item alive indefinitely; a broadcast
-- channel does not retain them.
--
-- NOTE: a value returned here must be passed through 'maybeDupQ' before it is
-- handed to 'readQ'; reading the broadcast channel itself is not supported.
newTChan :: STM.STM (Q a)
newTChan = TC <$> STM.newBroadcastTChan
-- | Take the next element from the queue, using the read operation of
-- whichever structure backs it.
readQ :: Q a -> STM.STM a
readQ q = case q of
  TQ bounded -> STM.readTBQueue bounded
  TC chan -> STM.readTChan chan
-- | Append an element to the queue, using the write operation of whichever
-- structure backs it.
writeQ :: Q a -> a -> STM.STM ()
writeQ q x = case q of
  TQ bounded -> STM.writeTBQueue bounded x
  TC chan -> STM.writeTChan chan x
-- | Produce the handle a reader should use: a channel-backed queue is
-- duplicated with 'STM.dupTChan', any other queue is returned unchanged.
maybeDupQ :: Q a -> STM.STM (Q a)
maybeDupQ q = case q of
  TC chan -> fmap TC (STM.dupTChan chan)
  _ -> return q
-- | Build response headers from the query string.
--
-- Every parameter whose key starts with @header-@ becomes a header named by
-- the remainder of the key; a parameter without a value yields an empty
-- header value. All other parameters are ignored, and order is preserved.
mkHeaders :: Query -> [(CI ByteString, ByteString)]
mkHeaders qs =
  [ (Data.CaseInsensitive.mk headerName, fromMaybe "" value)
  | (key, value) <- qs
  , Just headerName <- [Data.ByteString.stripPrefix "header-" key]
  ]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment