Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@jamesthompson
Created December 1, 2020 16:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save jamesthompson/8c2cc62df78fa08e03d9828b84354cf4 to your computer and use it in GitHub Desktop.
Save jamesthompson/8c2cc62df78fa08e03d9828b84354cf4 to your computer and use it in GitHub Desktop.
Example pub/sub grpc interface with fused-effects
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE UndecidableInstances #-}
module Effects.PubSub where
import qualified Control.Concurrent.Chan.Unagi.Bounded as U
import Data.ProtoLens.Message (defMessage)
import qualified Data.Text as T
import Effects.Crypto.GoogleCredentials
import Effects.Network.HTTP (Manager)
import Prelude
import Control.Lens
import Network.GRPC.Client
( CompressMode (Compressed),
IncomingEvent (..),
OutgoingEvent (..),
)
import Network.GRPC.Client.Helpers
import Network.GRPC.HTTP2.ProtoLens
import Network.GRPC.HTTP2.Types
( GRPCStatusCode (..),
grpcMessageH,
grpcStatusH,
statusCodeForTrailer,
)
import Network.HTTP2
( ErrorCodeId (..),
toErrorCodeId,
)
import Network.HTTP2.Client
( TooMuchConcurrency (..),
runClientIO,
)
import qualified Proto.Google.Pubsub.V1.Pubsub as P
import qualified Proto.Google.Pubsub.V1.Pubsub_Fields as PF
-- Domain newtypes used throughout this module. 'declareWrapped' (from lens)
-- emits the declarations quoted below and also generates a 'Wrapped' instance
-- for each newtype; the rest of the module unwraps them with '_Wrapped''.
declareWrapped
  [d|
    -- Identifier used to acknowledge a received message.
    newtype AckId = AckId Text deriving (Eq, Show)

    -- Client identifier sent in the initial streaming pull request.
    newtype ClientId = ClientId Text deriving (Eq, Show)

    -- Identifier of a published message, as returned by a publish call.
    newtype MessageId = MessageId Text deriving (Eq, Show)

    -- Ordering key applied to every message of a publish call.
    newtype OrderingKey = OrderingKey Text deriving (Eq, Show)

    -- A number of seconds (used for ack deadlines).
    newtype Seconds = Seconds Int32 deriving (Eq, Show)

    -- Capacity of the bounded channels backing a streaming subscription.
    newtype StreamBufferCapacity = StreamBufferCapacity Int deriving (Eq, Show)

    -- Subscription name passed verbatim in request messages.
    newtype Subscription = Subscription Text deriving (Eq, Show)

    -- Topic name passed verbatim in request messages.
    newtype Topic = Topic Text deriving (Eq, Show)
    |]
-- | Failure modes reported by the unary Pub/Sub operations.
data PubSubError
  = -- | The call ended with a gRPC-level error: status code plus the raw
    -- error message bytes taken from the response metadata.
    GrpcError (GRPCStatusCode, ByteString)
  | -- | The HTTP/2 transport reported an error code for the call.
    TransportError ErrorCodeId
  | -- | The HTTP/2 client refused the call with 'TooMuchConcurrency'.
    TooFastSlowDown
  | -- | Running the client action itself failed (reported as a 'Left' by
    -- 'runClientIO').
    FatalClientErrorStreamEnded
  deriving stock (Eq, Show)
-- | Effect describing the unary (request/response) Pub/Sub calls. Every
-- operation takes the raw proto-lens request message and yields either a
-- 'PubSubError' or the decoded payload of interest.
data PubSub (m :: Type -> Type) (k :: Type) where
  -- | Publish messages; yields the ids of the published messages.
  Publish ::
    P.PublishRequest ->
    PubSub m (Either PubSubError [MessageId])
  -- | Acknowledge previously received messages.
  Ack ::
    P.AcknowledgeRequest ->
    PubSub m (Either PubSubError ())
  -- | Pull messages with a single unary request.
  UnaryPull ::
    P.PullRequest ->
    PubSub m (Either PubSubError [P.ReceivedMessage])
-- | Effect describing operations on a streaming pull subscription. The
-- subscription name is tracked at the type level (@subscription@), so several
-- distinct subscriptions can coexist in one effect stack; the 'Proxy'
-- argument selects which one an operation addresses.
data StreamingPubSub (subscription :: Symbol) (m :: Type -> Type) (k :: Type) where
  -- | Acknowledge the given messages over the stream.
  StreamAck ::
    Proxy subscription ->
    [AckId] ->
    StreamingPubSub subscription m ()
  -- | Set a new ack deadline for each listed message.
  BumpAckDeadlines ::
    Proxy subscription ->
    [(AckId, Seconds)] ->
    StreamingPubSub subscription m ()
  -- | Obtain the next message received on the stream.
  StreamPull ::
    Proxy subscription ->
    StreamingPubSub subscription m P.ReceivedMessage
-- | Publish a batch of messages to a topic.
--
-- Each input pair is the message payload together with its attributes. When
-- an 'OrderingKey' is supplied it is set on every message of the batch.
-- Yields the ids of the published messages, or a 'PubSubError'.
publish ::
  Has PubSub sig m =>
  Topic ->
  Maybe OrderingKey ->
  [(ByteString, Map Text Text)] ->
  m (Either PubSubError [MessageId])
publish topic ordKey msgs = send (Publish request)
  where
    request =
      defMessage
        & PF.topic .~ view _Wrapped' topic
        & PF.messages .~ fmap (withOrdering . toMessage) msgs

    -- Build one proto message from a payload/attributes pair.
    toMessage (payload, attrs) =
      defMessage
        & PF.data' .~ payload
        & PF.attributes .~ attrs

    -- Stamp the ordering key on a message when one was requested.
    withOrdering = case ordKey of
      Nothing -> id
      Just (OrderingKey key) -> set PF.orderingKey key
-- | Acknowledge the given messages on a subscription with a unary call.
ack ::
  Has PubSub sig m =>
  Subscription ->
  [AckId] ->
  m (Either PubSubError ())
ack sub ackIds = send (Ack request)
  where
    request =
      defMessage
        & PF.subscription .~ view _Wrapped' sub
        & PF.ackIds .~ fmap (view _Wrapped') ackIds
-- | Pull messages from a subscription with a single unary request. The
-- 'Int32' argument is passed through as the request's @maxMessages@ field.
unaryPull ::
  Has PubSub sig m =>
  Subscription ->
  Int32 ->
  m (Either PubSubError [P.ReceivedMessage])
unaryPull sub maxMessages = send (UnaryPull request)
  where
    request =
      defMessage
        & PF.subscription .~ view _Wrapped' sub
        & PF.maxMessages .~ maxMessages
-- | Acknowledge messages on the streaming subscription selected by the proxy.
streamAck ::
  Has (StreamingPubSub subscription) sig m =>
  Proxy subscription ->
  [AckId] ->
  m ()
streamAck p = send . StreamAck p
-- | Set a new ack deadline for each listed message on the streaming
-- subscription selected by the proxy.
bumpAckDeadlines ::
  Has (StreamingPubSub subscription) sig m =>
  Proxy subscription ->
  [(AckId, Seconds)] ->
  m ()
bumpAckDeadlines p = send . BumpAckDeadlines p
-- | Obtain the next message from the streaming subscription selected by the
-- proxy.
streamPull ::
  Has (StreamingPubSub subscription) sig m =>
  Proxy subscription ->
  m P.ReceivedMessage
streamPull p = send (StreamPull p)
-- | Carrier interpreting 'PubSub' in IO through @http2-grpc-haskell@. It is a
-- reader over the 'GrpcClient' on which every unary call is issued.
newtype PubSubIOC m a = PubSubIOC
  { runPubSubIOC :: ReaderC GrpcClient m a
  }
  deriving newtype (Functor, Applicative, Monad, MonadIO)
-- | Interprets each 'PubSub' operation as a unary gRPC call on the
-- 'GrpcClient' held by the carrier, and folds the nested result of the call
-- into @Either PubSubError payload@. All other effects are delegated to the
-- underlying carrier.
instance
  (MonadIO m, Algebra sig m) =>
  Algebra (PubSub :+: sig) (PubSubIOC m)
  where
  alg hdl sig ctx = PubSubIOC $ case sig of
    L (Publish req) -> do
      grpcClient <- ask
      fmap ((<$ ctx) . fmap (fmap MessageId . view PF.messageIds) . handlePubSubResp) . liftIO . runClientIO $
        rawUnary
          (RPC :: RPC P.Publisher "publish")
          grpcClient
          req
    L (UnaryPull req) -> do
      grpcClient <- ask
      fmap ((<$ ctx) . fmap (view PF.receivedMessages) . handlePubSubResp) . liftIO . runClientIO $
        rawUnary
          (RPC :: RPC P.Subscriber "pull")
          grpcClient
          req
    L (Ack req) -> do
      grpcClient <- ask
      fmap ((<$ ctx) . (() <$) . handlePubSubResp) . liftIO . runClientIO $
        rawUnary
          (RPC :: RPC P.Subscriber "acknowledge")
          grpcClient
          req
    R other ->
      alg (runPubSubIOC . hdl) (R other) ctx
    where
      -- Collapse the layered result of 'runClientIO' . 'rawUnary' (client
      -- failure / concurrency refusal / transport error / reply) into a
      -- single 'PubSubError' or the decoded response.
      handlePubSubResp resp = case resp of
        Right (Right (Right (_, _, Right resp'))) -> pure resp'
        -- The reply could not be decoded into a response message, i.e. the
        -- server reported a gRPC error. The status and message may live in
        -- the trailers or, for a "Trailers-Only" response, in the initial
        -- headers, so both are consulted. Status and message are defaulted
        -- independently so a present status code is never discarded merely
        -- because no message accompanied it.
        Right (Right (Right (hdrs, trailers, Left _))) ->
          Left $
            GrpcError
              ( fromMaybe INTERNAL (grpcHeader grpcStatusH trailers hdrs >>= statusCodeForTrailer),
                fromMaybe "No grpc error message given" (grpcHeader grpcMessageH trailers hdrs)
              )
        Right (Right (Left transportError)) -> Left $ TransportError (toErrorCodeId transportError)
        Right (Left (TooMuchConcurrency _)) -> Left TooFastSlowDown
        Left _ -> Left FatalClientErrorStreamEnded

      -- Value of a gRPC metadata entry, preferring the trailers (when there
      -- are any) over the initial headers.
      grpcHeader name trailers hdrs =
        case trailers >>= headerValue name of
          Just v -> Just v
          Nothing -> headerValue name hdrs

      -- First value stored under the given header name, if any.
      headerValue name = fmap snd . find ((==) name . fst)
-- | Carrier interpreting 'StreamingPubSub' for one subscription. It is a
-- reader over a pair of bounded channel ends: the end from which received
-- messages are read, and the end into which outgoing streaming pull requests
-- are written.
newtype StreamingPubSubIOC (subscription :: Symbol) m a = StreamingPubSubIOC
  { runStreamingPubSubIOC ::
      ReaderC
        (U.OutChan P.ReceivedMessage, U.InChan P.StreamingPullRequest)
        m
        a
  }
  deriving newtype (Functor, Applicative, Monad, MonadIO)
-- | Interprets 'StreamingPubSub' operations against the channel pair held by
-- the carrier: acks and deadline changes are enqueued as outgoing
-- 'P.StreamingPullRequest's, and pulls read the next received message. All
-- other effects are delegated to the underlying carrier.
--
-- Follow-up requests deliberately leave the @subscription@ field unset. The
-- Pub/Sub API requires it to be provided only in the first request of a
-- stream (the one written by 'runStreamingPubSub') and not in subsequent
-- requests from client to server.
instance
  (MonadIO m, Algebra sig m, KnownSymbol subscription) =>
  Algebra (StreamingPubSub subscription :+: sig) (StreamingPubSubIOC subscription m)
  where
  alg hdl sig ctx = StreamingPubSubIOC $ case sig of
    L (StreamAck _ ids) -> do
      (_, chan) <- ask @(U.OutChan P.ReceivedMessage, U.InChan P.StreamingPullRequest)
      -- 'U.writeChan' is on a bounded channel, so this may block while the
      -- outgoing buffer is full.
      fmap (<$ ctx) . liftIO $
        U.writeChan chan $
          defMessage & PF.ackIds .~ (view _Wrapped' <$> ids)
    L (BumpAckDeadlines _ ids) -> do
      (_, chan) <- ask @(U.OutChan P.ReceivedMessage, U.InChan P.StreamingPullRequest)
      -- Both repeated fields are derived from the same list, so the ack ids
      -- and their deadlines stay index-aligned and equal in length.
      fmap (<$ ctx) . liftIO $
        U.writeChan chan $
          defMessage
            & PF.modifyDeadlineSeconds .~ (view (_2 . _Wrapped') <$> ids)
            & PF.modifyDeadlineAckIds .~ (view (_1 . _Wrapped') <$> ids)
    L (StreamPull _) -> do
      (chan, _) <- ask @(U.OutChan P.ReceivedMessage, U.InChan P.StreamingPullRequest)
      -- Blocks until a received message is available on the channel.
      fmap (<$ ctx) . liftIO $ U.readChan chan
    R other ->
      alg (runStreamingPubSubIOC . hdl) (R other) ctx
-- | Run a unary Pub/Sub computation given a standard TLS 'Manager'.
--
-- Initialises the credentials store, opens a TLS gRPC client to
-- @pubsub.googleapis.com:443@ whose request headers carry the OAuth token
-- header, and runs the computation against that client. Calls 'panic' if
-- either the credentials or the client cannot be set up.
runPubSub ::
  MonadIO m =>
  -- | GRPC client TLS Manager
  Manager ->
  -- | PubSub computation
  PubSubIOC m a ->
  m a
runPubSub mgr p =
  liftIO (runClientIO (initCredsStore mgr)) >>= \case
    Left failure ->
      panic ("Unable to find default google application credentials: " <> show failure)
    Right store -> do
      let clientConfig =
            (grpcClientConfigSimple "pubsub.googleapis.com" 443 UseTls)
              { _grpcClientConfigHeaders = pure <$> getOAuthTokenHeader mgr store
              }
      liftIO (runClientIO (setupGrpcClient clientConfig)) >>= \case
        Left failure ->
          panic ("Unable to boot Pub/Sub client: " <> show failure)
        Right grpcClient ->
          runReader grpcClient (runPubSubIOC p)
-- | Run a streaming Pub/Sub subscription given a standard TLS 'Manager'.
--
-- Sets up credentials and a TLS gRPC client exactly like the unary runner,
-- then opens a @streamingPull@ stream on a background thread. Two bounded
-- channels of the requested capacity connect that thread to the computation:
-- received messages flow in through one, outgoing requests flow out through
-- the other. Calls 'panic' if the credentials or the client cannot be set up.
--
-- NOTE: the result of 'async' is discarded, so the stream thread is neither
-- awaited nor cancelled when the computation finishes, and a failure of the
-- stream is not reported to the computation.
runStreamingPubSub ::
  forall subscription m a.
  (KnownSymbol subscription, MonadIO m) =>
  -- | GRPC client TLS Manager
  Manager ->
  -- | Streaming ack deadline seconds
  Seconds ->
  -- | Unique client id
  ClientId ->
  -- | Channel buffering capacity
  StreamBufferCapacity ->
  -- | PubSub computation
  StreamingPubSubIOC subscription m a ->
  m a
runStreamingPubSub mgr deadline uniqueClientId bufferCapacity p =
  liftIO (runClientIO (initCredsStore mgr)) >>= \case
    Left failure ->
      panic ("Unable to find default google application credentials: " <> show failure)
    Right store -> do
      let clientConfig =
            (grpcClientConfigSimple "pubsub.googleapis.com" 443 UseTls)
              { _grpcClientConfigHeaders = pure <$> getOAuthTokenHeader mgr store
              }
      liftIO (runClientIO (setupGrpcClient clientConfig)) >>= \case
        Left failure ->
          panic ("Unable to boot Pub/Sub client: " <> show failure)
        Right grpcClient -> do
          let capacity = view _Wrapped' bufferCapacity
          -- Messages received from the server, consumed by the computation.
          (receivedIn, receivedOut) <- liftIO (U.newChan capacity)
          -- Requests produced by the computation, sent to the server.
          (requestIn, requestOut) <- liftIO (U.newChan capacity)
          let -- Forward every message of an incoming response to the
              -- computation; all other stream events are ignored.
              forwardIncoming _ = \case
                RecvMessage response ->
                  liftIO (traverse_ (U.writeChan receivedIn) (view PF.receivedMessages response))
                _ -> liftIO (pure ())
              -- Wait for the next queued request and send it, compressed.
              nextOutgoing _ = do
                request <- liftIO (U.readChan requestOut)
                pure ((), SendMessage Compressed request)
          _ <-
            liftIO . async . runClientIO $
              rawGeneralStream
                (RPC :: RPC P.Subscriber "streamingPull")
                grpcClient
                ()
                forwardIncoming
                ()
                nextOutgoing
          -- Kick off the subscription - TODO : how to restart a subscription if the stream dies?
          liftIO . U.writeChan requestIn $
            defMessage
              & PF.subscription .~ T.pack (symbolVal (Proxy @subscription))
              & PF.streamAckDeadlineSeconds .~ view _Wrapped' deadline
              & PF.clientId .~ view _Wrapped' uniqueClientId
          -- TODO: Bump googleapis version to set flow control params for backpressure to server
          -- see: https://github.com/googleapis/googleapis/blob/b821f320473c8ec05a1c7fb9a496c958b1ab9834/google/pubsub/v1/pubsub.proto#L1096-L1117
          runReader (receivedOut, requestIn) (runStreamingPubSubIOC p)
-- * TEST unary and streaming programs
-- testProgram :: (MonadIO m, Has PubSub sig m) => m ()
-- testProgram = do
-- liftIO $ print "Running pubsub test"
-- res <-
-- publish
-- (Topic "projects/james-78278/topics/jamestest")
-- Nothing
-- [ ("test message number one", _Wrapped # []),
-- ("test message number two", _Wrapped # []),
-- ("test message number three", _Wrapped # [])
-- ]
-- liftIO $ print res
-- type TestSubscription1 = "projects/james-78278/subscriptions/james-test-sub"
-- type TestSubscription2 = "projects/james-78278/subscriptions/james-test-sub234"
-- testSubscription1 :: Proxy TestSubscription1
-- testSubscription1 = Proxy
-- testSubscription2 :: Proxy TestSubscription2
-- testSubscription2 = Proxy
-- testStream ::
-- ( Has (StreamingPubSub TestSubscription1) sig m,
-- Has (StreamingPubSub TestSubscription2) sig m
-- ) =>
-- m ()
-- testStream = do
-- res <- streamPull testSubscription1
-- res2 <- streamPull testSubscription2
-- res3 <- streamPull testSubscription1
-- streamAck testSubscription1 (AckId <$> [res ^. PF.ackId, res3 ^. PF.ackId])
-- streamAck testSubscription2 [AckId $ res2 ^. PF.ackId]
-- pure ()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment