Skip to content

Instantly share code, notes, and snippets.

@benkolera
Last active June 9, 2017 19:12
Show Gist options
  • Save benkolera/9427813 to your computer and use it in GitHub Desktop.
Save benkolera/9427813 to your computer and use it in GitHub Desktop.
Websocket <-> TCP Proxy : First Go
module Main where
import Prelude hiding (mapM_)
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (STM,atomically)
import Control.Concurrent.STM.TQueue
  ( TQueue
  , newTQueue
  , readTQueue
  , writeTQueue )
import Control.Concurrent.STM.TVar (TVar,newTVar,readTVar,modifyTVar,modifyTVar')
import Control.Error (EitherT,tryIO,eitherT,left,fmapLT,failWith)
import Control.Exception (handle,IOException,finally)
import Control.Monad (join)
import Data.ByteString (ByteString,empty)
import Data.ByteString.Char8 (pack,unpack)
import Data.Foldable (mapM_)
import Data.Map (Map)
import qualified Data.Map as M
import Data.UUID (UUID)
import Data.UUID.V4 (nextRandom)
import Network.HTTP.Types (decodePath)
import Network.Simple.TCP (closeSock)
import Network.Wai (Application)
import qualified Network.Wai.Handler.Warp as Warp
import qualified Network.Wai.Handler.WebSockets as WaiWS
import qualified Network.WebSockets as WS
import Pipes
import Pipes.Network.TCP
  ( fromSocketTimeout
  , toSocketTimeout
  , connectSock
  , SockAddr
  , Socket )
-- | TCP port the Warp server is configured to listen on (see 'server').
port :: Int
port = 5002

-- | Timeout handed to 'fromSocketTimeout' and 'toSocketTimeout'.
-- NOTE(review): presumably microseconds, i.e. 30 seconds -- confirm against
-- the pipes-network documentation.
timeout :: Int
timeout = 30 * 1000 * 1000
-- | Port number of an upstream TCP service (rendered with 'show' before
-- being handed to 'connectSock').
type Port = Int
-- | Host name of an upstream TCP service, passed to 'connectSock' as-is.
type Hostname = String
-- | Everything the proxy tracks for one bridged client.
data ProxyConnection = ProxyConnection
  { webSocket :: WS.Connection
    -- ^ Accepted WebSocket connection to the client.
  , tcpSocket :: Socket
    -- ^ TCP socket connected to the upstream host.
  , tcpQueue  :: TQueue ByteString
    -- ^ Payloads received from the WebSocket, waiting to be written to
    -- 'tcpSocket'.
  }
-- | All live proxied connections, keyed by the random 'UUID' assigned when
-- the WebSocket request is accepted.
type ProxyState = Map UUID ProxyConnection
-- | Entry point: create an empty connection table and start the server.
main :: IO ()
main = atomically (newTVar M.empty) >>= server
-- | Run Warp on 'port'.  WebSocket requests are intercepted and handed to
-- 'handleNewConnection'; everything else falls through to 'app'.
server :: TVar ProxyState -> IO ()
server state = Warp.runSettings settings app
  where
    settings = Warp.defaultSettings
      { Warp.settingsPort      = port
      , Warp.settingsIntercept = WaiWS.intercept (handleNewConnection state)
      }
-- | Fallback WAI application for requests that are not intercepted as
-- WebSocket upgrades.
--
-- The proxy has no plain-HTTP behaviour yet, so this is still a placeholder;
-- it fails with a descriptive message instead of a bare 'undefined' so the
-- cause is obvious when it shows up in the server log.
-- TODO: return a proper HTTP error response instead of throwing.
app :: Network.Wai.Application
app = error "app: plain HTTP requests are not supported; only WebSocket upgrades are handled"
-- | Reasons a WebSocket request can fail before the proxy is established.
data ConnectError
  = TokenNotProvided
    -- ^ The request path carried no usable @token@ query parameter.
  | TokenNotFound
    -- ^ 'lookupToken' did not recognise the supplied token.
  | ApiConnectionFailed Port Hostname IOException
    -- ^ NOTE(review): not constructed anywhere in the visible code.
  | UpstreamConnectionFailed Port Hostname IOException
    -- ^ Connecting to the upstream host and port raised an 'IOException'.
  deriving (Show)
-- | Resolve a request token to the upstream endpoint it grants access to.
--
-- Currently a stub: a single hard-coded token maps to a fixed local
-- endpoint and every other token is rejected with 'TokenNotFound'.
lookupToken :: String -> EitherT ConnectError IO (Port,Hostname)
lookupToken token = case token of
  "ass" -> return (1337, "localhost")
  _     -> left TokenNotFound
-- | Open a TCP connection to the given upstream endpoint.
--
-- An 'IOException' raised while connecting is reported as
-- 'UpstreamConnectionFailed', tagged with the port and host that were tried.
connectUpstream :: (Port,Hostname) -> EitherT ConnectError IO (Socket,SockAddr)
connectUpstream (portNo, host) = fmapLT describeFailure (tryIO openSocket)
  where
    openSocket      = connectSock host (show portNo)
    describeFailure = UpstreamConnectionFailed portNo host
-- | Turn a pending WebSocket request into an upstream TCP connection.
--
-- The @token@ query parameter is read from the request path, resolved with
-- 'lookupToken', and the resulting endpoint is dialled with
-- 'connectUpstream'.  A missing parameter (or one without a value) yields
-- 'TokenNotProvided'.
socketFromRequest :: WS.PendingConnection -> EitherT ConnectError IO (Socket,SockAddr)
socketFromRequest pending = do
  token    <- requestToken
  endpoint <- lookupToken token
  connectUpstream endpoint
  where
    -- Query-string part of the request path.
    query        = snd (decodePath (WS.requestPath (WS.pendingRequest pending)))
    -- 'lookup' yields a nested 'Maybe' (parameter absent / present without
    -- a value); 'join' collapses both cases into 'Nothing'.
    tokenParam   = join (lookup (pack "token") query)
    requestToken = failWith TokenNotProvided (fmap unpack tokenParam)
-- | Register a connection in the shared table under the given id.
--
-- Uses the strict 'modifyTVar'' so the updated map is evaluated when it is
-- written rather than being stored in the 'TVar' as an unevaluated thunk.
insertNewConnection :: TVar ProxyState -> UUID -> ProxyConnection -> STM ()
insertNewConnection st uuid conn = modifyTVar' st (M.insert uuid conn)
-- | Fork the two pump threads for one proxied connection.
--
-- * The first thread drains the connection's 'tcpQueue' into the TCP socket.
-- * The second thread forwards bytes read from the TCP socket to the
--   WebSocket and runs the clean-up when that pipeline stops, normally or
--   by exception.
--
-- Both pipelines re-check the connection table before every step and stop
-- once the entry for @uuid@ is gone.
setupPipes :: TVar ProxyState -> UUID -> Socket -> IO ()
setupPipes state uuid tcp = do
  _ <- forkIO (runEffect $ untilProxyGone produceWebSocketBytes >-> tcpConsumer)
  _ <- forkIO (finally (runEffect $ tcpProducer >-> untilProxyGone consumeTcpBytes) tcpCleanUp)
  return ()
  where
    tcpProducer = fromSocketTimeout timeout tcp 4096
    tcpConsumer = toSocketTimeout timeout tcp
    -- Every clean-up step is chained with 'finally' so that an exception in
    -- an earlier step (closing the socket, sending the close frame) cannot
    -- leave a stale entry in the connection table.
    tcpCleanUp =
      closeSock tcp `finally` (notifyWebSocket `finally` forgetConnection)
    -- Send a close frame to the client if the connection is still registered.
    notifyWebSocket = do
      webSockMaybe <- atomically $ findConnection state uuid
      mapM_ ((`WS.sendClose` empty) . webSocket) webSockMaybe
    forgetConnection = atomically $ deleteConnection state uuid
    -- Repeat the step @f@ for as long as the connection is still registered.
    untilProxyGone f = do
      conn <- liftIO . atomically $ findConnection state uuid
      maybe (return ()) (\c -> f c >> untilProxyGone f) conn
    -- Take the next queued WebSocket payload and emit it downstream.
    produceWebSocketBytes conn = do
      bytes <- liftIO . atomically . readTQueue . tcpQueue $ conn
      yield bytes
    -- Take the next chunk from upstream and send it as a binary message.
    consumeTcpBytes conn = do
      bytes <- await
      liftIO $ WS.sendBinaryData (webSocket conn) bytes
-- | WebSocket server application: bridge one client to its upstream socket.
--
-- If the upstream connection cannot be established the error is printed and
-- the request is rejected.  Otherwise the request is accepted, the
-- connection is registered under a fresh random id, the pump threads are
-- started and this thread runs 'clientLoop'.
handleNewConnection :: TVar ProxyState -> WS.ServerApp
handleNewConnection state pending =
  eitherT refuse startProxy (socketFromRequest pending)
  where
    refuse err = print err >> WS.rejectRequest pending empty
    startProxy (upstream, _) = do
      ws   <- WS.acceptRequest pending
      uuid <- nextRandom
      -- Create the queue and register the connection in one transaction.
      atomically $
        newTQueue >>= insertNewConnection state uuid . ProxyConnection ws upstream
      setupPipes state uuid upstream
      clientLoop state uuid
-- | Look up a connection by id; 'Nothing' when it is not registered.
findConnection :: TVar ProxyState -> UUID -> STM (Maybe ProxyConnection)
findConnection state uuid = do
  connections <- readTVar state
  return (M.lookup uuid connections)
-- | Remove a connection from the shared table; a no-op when the id is not
-- present.
--
-- Uses the strict 'modifyTVar'' so the updated map is evaluated when it is
-- written rather than being stored in the 'TVar' as an unevaluated thunk.
deleteConnection :: TVar ProxyState -> UUID -> STM ()
deleteConnection state uuid = modifyTVar' state (M.delete uuid)
-- | Receive WebSocket messages for one connection and queue them for the
-- TCP side, until the connection is no longer registered.
--
-- Each iteration looks the connection up again, so the loop ends as soon as
-- either side has removed it from the table.
clientLoop :: TVar ProxyState -> UUID -> IO ()
clientLoop state uuid = do
  connMaybe <- atomically $ findConnection state uuid
  case connMaybe of
    Nothing   -> return ()
    Just conn -> do
      handle (disconnect conn) (WS.receiveData (webSocket conn) >>= handleMsg conn)
      clientLoop state uuid
  where
    -- Tear the connection down on any 'WS.ConnectionException'.  The handler
    -- deliberately does not match on a specific constructor: a partial match
    -- would fail inside the handler for any other constructor and skip the
    -- clean-up below.
    disconnect :: ProxyConnection -> WS.ConnectionException -> IO ()
    disconnect conn _ = do
      closeSock (tcpSocket conn)
      atomically $ deleteConnection state uuid
    -- Queue the received payload for the thread that writes to the socket.
    handleMsg conn bs = atomically $ writeTQueue (tcpQueue conn) bs
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment