Last active
June 9, 2017 19:12
-
-
Save benkolera/9427813 to your computer and use it in GitHub Desktop.
Websocket <-> TCP Proxy : First Go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Main where | |
import Prelude hiding (mapM_)
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (STM,atomically)
import Control.Concurrent.STM.TQueue
  ( TQueue
  , newTQueue
  , readTQueue
  , writeTQueue )
import Control.Concurrent.STM.TVar (TVar,newTVar,readTVar,modifyTVar,modifyTVar')
import Control.Error (EitherT,tryIO,eitherT,left,fmapLT,failWith)
import Control.Exception (handle,IOException,finally)
import Control.Monad (join)
import Data.ByteString (ByteString,empty)
import Data.ByteString.Char8 (pack,unpack)
import Data.Foldable (mapM_)
import Data.Map (Map)
import qualified Data.Map as M
import Data.UUID (UUID)
import Data.UUID.V4 (nextRandom)
import Network.HTTP.Types (decodePath)
import Network.Simple.TCP (closeSock)
import Network.Wai (Application)
import qualified Network.Wai.Handler.Warp as Warp
import qualified Network.Wai.Handler.WebSockets as WaiWS
import qualified Network.WebSockets as WS
import Pipes
import Pipes.Network.TCP
  ( fromSocketTimeout
  , toSocketTimeout
  , connectSock
  , SockAddr
  , Socket )
-- | Port the Warp server listens on (see 'server').
port :: Int
port = 5002

-- | Timeout handed to 'fromSocketTimeout' and 'toSocketTimeout'.
-- NOTE(review): presumably microseconds (i.e. 30 seconds) — confirm against
-- the pipes-network documentation.
timeout :: Int
timeout = 30 * 1000 * 1000

-- | Port number of an upstream TCP service.
type Port = Int

-- | Host name of an upstream TCP service.
type Hostname = String
-- | Everything the proxy tracks for one bridged client.
data ProxyConnection = ProxyConnection
  { webSocket :: WS.Connection
    -- ^ Accepted WebSocket connection to the client.
  , tcpSocket :: Socket
    -- ^ Connected upstream TCP socket.
  , tcpQueue  :: TQueue ByteString
    -- ^ Payloads received from the WebSocket that are waiting to be written
    -- to the TCP socket.
  }

-- | Registry of live proxied connections, keyed by the UUID assigned when
-- the WebSocket request is accepted.
type ProxyState = Map UUID ProxyConnection
-- | Entry point: create an empty connection registry and start the server
-- with it.
main :: IO ()
main = atomically (newTVar M.empty) >>= server
-- | Run Warp on 'port'. Requests are offered to the WebSocket intercept
-- (which dispatches to 'handleNewConnection'); 'app' is the WAI application
-- passed to 'Warp.runSettings'.
server :: TVar ProxyState -> IO ()
server state = Warp.runSettings settings app
  where
    settings = Warp.defaultSettings
      { Warp.settingsPort      = port
      , Warp.settingsIntercept = WaiWS.intercept (handleNewConnection state)
      }
-- | WAI application passed to Warp alongside the WebSocket intercept.
--
-- Still a placeholder: no plain-HTTP behaviour is implemented. It fails with
-- a descriptive message rather than a bare 'undefined', so that if it is ever
-- evaluated the resulting crash can be diagnosed.
--
-- TODO: return a proper HTTP error response instead of failing.
app :: Network.Wai.Application
app = error "Main.app: plain HTTP requests are not supported; only WebSocket upgrades are handled"
-- | Reasons a new proxy connection can be refused before the WebSocket
-- request is accepted.
data ConnectError
  = TokenNotProvided
    -- ^ The request carried no usable @token@ query parameter.
  | TokenNotFound
    -- ^ 'lookupToken' did not recognise the token.
  | ApiConnectionFailed Port Hostname IOException
    -- ^ Not constructed anywhere in the visible code.
  | UpstreamConnectionFailed Port Hostname IOException
    -- ^ Connecting to the upstream host/port raised an 'IOException'.
  deriving Show
-- | Resolve a client token to the upstream @(port, hostname)@ it may reach.
--
-- Currently a hard-coded, single-entry table; any other token fails with
-- 'TokenNotFound'.
lookupToken :: String -> EitherT ConnectError IO (Port,Hostname)
lookupToken token
  | token == "ass" = return (1337, "localhost")
  | otherwise      = left TokenNotFound
-- | Open a TCP connection to the given upstream. An 'IOException' raised
-- while connecting is reported as 'UpstreamConnectionFailed' tagged with the
-- target port and host.
connectUpstream :: (Port,Hostname) -> EitherT ConnectError IO (Socket,SockAddr)
connectUpstream (upstreamPort, host) = fmapLT wrapFailure (tryIO dial)
  where
    -- connectSock takes the service/port as a string.
    dial        = connectSock host (show upstreamPort)
    wrapFailure = UpstreamConnectionFailed upstreamPort host
-- | Turn a pending WebSocket request into a connected upstream socket:
-- read the @token@ query parameter from the request path, resolve it with
-- 'lookupToken', then connect with 'connectUpstream'.
--
-- Fails with 'TokenNotProvided' when the parameter is absent or has no value.
socketFromRequest :: WS.PendingConnection -> EitherT ConnectError IO (Socket,SockAddr)
socketFromRequest pending = do
  token  <- failWith TokenNotProvided (tokenParam pending)
  target <- lookupToken token
  connectUpstream target
  where
    -- The query lookup yields a nested Maybe (parameter present? / value
    -- present?); 'join' collapses both "missing" cases into Nothing.
    tokenParam req = fmap unpack (join (lookup (pack "token") query))
      where
        (_, query) = decodePath (WS.requestPath (WS.pendingRequest req))
-- | Register a connection in the shared registry under the given UUID,
-- replacing any existing entry with the same key.
--
-- Uses the strict 'modifyTVar'' so the updated map is evaluated when it is
-- written, instead of leaving a chain of unevaluated @M.insert@ thunks in the
-- 'TVar'.
insertNewConnection :: TVar ProxyState -> UUID -> ProxyConnection -> STM ()
insertNewConnection st uuid conn = modifyTVar' st (M.insert uuid conn)
-- | Fork the two pump threads for one proxied connection.
--
-- * WebSocket -> TCP: drains the connection's 'tcpQueue' into the TCP socket.
-- * TCP -> WebSocket: reads from the TCP socket and sends each chunk as a
--   binary WebSocket message; when it stops (normally or by exception) the
--   TCP socket is closed, the WebSocket is sent a close and the registry
--   entry is removed.
--
-- Both pumps stop once the connection is no longer in the registry.
setupPipes :: TVar ProxyState -> UUID -> Socket -> IO ()
setupPipes state uuid tcp = do
  _ <- forkIO (runEffect $ queuedWebSocketBytes >-> tcpConsumer)
  _ <- forkIO (finally (runEffect $ tcpProducer >-> tcpBytesToWebSocket) tcpCleanUp)
  return ()
  where
    tcpProducer = fromSocketTimeout timeout tcp 4096
    tcpConsumer = toSocketTimeout timeout tcp

    tcpCleanUp = do
      closeSock tcp
      webSockMaybe <- atomically $ findConnection state uuid
      -- Sending the close can throw if the client is already gone; the
      -- registry entry must be removed regardless.
      finally
        (mapM_ ((`WS.sendClose` empty) . webSocket) webSockMaybe)
        (atomically $ deleteConnection state uuid)

    -- Look the connection up and read its queue in ONE transaction. While the
    -- queue is empty the transaction is blocked on the registry as well as on
    -- the queue, so removing the connection wakes it up and it returns
    -- Nothing instead of waiting forever on a queue nobody will write to.
    nextQueuedBytes = do
      connMaybe <- findConnection state uuid
      case connMaybe of
        Nothing   -> return Nothing
        Just conn -> fmap Just (readTQueue (tcpQueue conn))

    queuedWebSocketBytes = do
      next <- liftIO (atomically nextQueuedBytes)
      case next of
        Nothing    -> return ()
        Just bytes -> yield bytes >> queuedWebSocketBytes

    tcpBytesToWebSocket = do
      connMaybe <- liftIO . atomically $ findConnection state uuid
      case connMaybe of
        Nothing   -> return ()
        Just conn -> do
          bytes <- await
          liftIO $ WS.sendBinaryData (webSocket conn) bytes
          tcpBytesToWebSocket
-- | WebSocket server application: try to obtain an upstream socket for the
-- pending request. On failure, print the error and reject the request; on
-- success, accept it, register the connection under a fresh UUID, start the
-- pump threads and run the client receive loop on the current thread.
handleNewConnection :: TVar ProxyState -> WS.ServerApp
handleNewConnection state pending =
  eitherT refuse serve (socketFromRequest pending)
  where
    refuse reason = print reason >> WS.rejectRequest pending empty

    serve (upstream, _) = do
      client <- WS.acceptRequest pending
      connId <- nextRandom
      -- Create the queue and publish the connection in one transaction.
      atomically $
        newTQueue >>= insertNewConnection state connId . ProxyConnection client upstream
      setupPipes state connId upstream
      clientLoop state connId
-- | Look up a connection by UUID in the current registry snapshot.
findConnection :: TVar ProxyState -> UUID -> STM (Maybe ProxyConnection)
findConnection state uuid = do
  connections <- readTVar state
  return (M.lookup uuid connections)
-- | Remove a connection from the shared registry; a no-op if the UUID is not
-- present.
--
-- Uses the strict 'modifyTVar'' so the updated map is evaluated when it is
-- written, instead of leaving unevaluated @M.delete@ thunks in the 'TVar'.
deleteConnection :: TVar ProxyState -> UUID -> STM ()
deleteConnection state uuid = modifyTVar' state (M.delete uuid)
-- | Receive loop for one client: while the connection is still registered,
-- read the next WebSocket message and enqueue it on the connection's
-- 'tcpQueue'. The loop ends once the connection is no longer in the registry.
--
-- Any 'WS.ConnectionException' raised while receiving closes the upstream
-- TCP socket and unregisters the connection, which ends the loop on the next
-- iteration.
clientLoop :: TVar ProxyState -> UUID -> IO ()
clientLoop state uuid = do
  connMaybe <- atomically $ findConnection state uuid
  case connMaybe of
    Nothing -> return ()
    Just conn -> do
      handle (disconnect conn) (WS.receiveData (webSocket conn) >>= handleMsg conn)
      clientLoop state uuid
  where
    -- Deliberately matches every constructor of ConnectionException: matching
    -- only ConnectionClosed would turn any other constructor into a
    -- pattern-match failure inside the handler and skip the cleanup.
    disconnect :: ProxyConnection -> WS.ConnectionException -> IO ()
    disconnect conn _ = do
      closeSock (tcpSocket conn)
      atomically $ deleteConnection state uuid

    handleMsg conn bs = atomically $ writeTQueue (tcpQueue conn) bs
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment