@ixmatus
Created October 29, 2014 16:51
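-- Imports inferred from the names used below (async, resource-pool, amqp, hslogger).
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, async, link)
import Control.Monad (forM_, replicateM)
import Data.Pool (withResource)
import Network.AMQP (Ack (..), consumeMsgs)
import System.Log.Logger (debugM)

-- Once we have an AMQP connection (stored in the RouterState data type), we spawn multiple subscribers.
startSubscribers :: RouterState -> IO ()
startSubscribers r = do
    stamp <- createTimestamp
    tlist <- replicateM (amqpConsumers $ rstateSettings r) $ subscribe r
    -- Link ALL of the async subscribers to this parent, so an exception in any of them is rethrown here
    debugM "Console" $ stamp ++ " [DEBUG] Linking subscribers"
    forM_ tlist link
    debugM "Console" $ stamp ++ " [DEBUG] Pooled " ++ (show . amqpConsumers $ rstateSettings r) ++ " channels"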
-- Here is where the actual async thread is spawned: a channel is requested out of the (Pool Channel), the consumer
-- callback is registered, and threadDelay is called to make the thread wait so the channel stays checked out.
subscribe :: RouterState -> IO (Async ())
subscribe r = do
    stamp <- createTimestamp
    case rstateAMQPChan r of
        Just p  -> async $ withResource p $ \chan -> do
            _ <- consumeMsgs chan (amqpQueue $ rstateSettings r) Ack $ routeOnType r
            threadDelay (maxBound :: Int)
        Nothing -> error $ stamp ++ " [ERROR] Error occurred retrieving an amqp channel pool"
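
For readers unfamiliar with `link`, here is a minimal standalone sketch of what linking buys the parent thread. It uses only `base` and the `async` package. The `worker` function and its failure are hypothetical stand-ins for a subscriber, not part of the gist above. One worker crashes shortly after starting, and because it is linked, the exception is rethrown asynchronously in the parent while the parent is parked.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (async, link)
import Control.Exception (SomeException, try)
import Control.Monad (forM_)

-- Hypothetical stand-in for a subscriber thread.
-- Worker 0 fails after 0.1 s; the others just sleep for 5 s.
worker :: Int -> IO ()
worker 0 = threadDelay 100000 >> ioError (userError "subscriber 0 crashed")
worker _ = threadDelay 5000000

main :: IO ()
main = do
    -- Spawn three workers, mirroring the replicateM/async step above.
    workers <- mapM (async . worker) [0, 1, 2]
    -- Link each worker so its failure is propagated to this thread.
    forM_ workers link
    -- Park the parent for up to 2 s; the linked failure interrupts the wait.
    result <- try (threadDelay 2000000) :: IO (Either SomeException ())
    case result of
        Left e   -> putStrLn ("parent interrupted by: " ++ show e)
        Right () -> putStrLn "no worker failed within 2 s"
```

Without the `forM_ workers link` line, the crash would stay inside the worker's `Async` and the parent would only see it by calling `wait` on that handle.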