Created
October 29, 2014 16:51
-
-
Save ixmatus/932bc375bda632cc2111 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Once we have an AMQP connection (which is stuffed into the RouterState data type) we then spawn multiple subscribers. | |
startSubscribers :: RouterState -> IO () | |
startSubscribers r = do | |
stamp <- createTimestamp | |
tlist <- replicateM (amqpConsumers $ rstateSettings r) $ subscribe r | |
-- Link ALL of the async subscribers to this parent | |
debugM "Console" $ stamp ++ " [DEBUG] Linking subscribers" | |
forM_ tlist link | |
debugM "Console" $ stamp ++ " [DEBUG] Pooled " ++ (show . amqpConsumers $ rstateSettings r) ++ " channels" | |
-- Here is where the actual aysnc thread is spawned, a channel is requested out of the (Pool Channel), and threadDelay is called to make the thread wait. | |
subscribe :: RouterState -> IO (Async ()) | |
subscribe r = do | |
stamp <- createTimestamp | |
case rstateAMQPChan r of | |
Just p -> async $ withResource p $ \chan -> do | |
_ <- consumeMsgs chan (amqpQueue $ rstateSettings r) Ack $ routeOnType r | |
threadDelay (maxBound :: Int) | |
Nothing -> error $ stamp ++ " [ERROR] Error occurred retrieving an amqp channel pool" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment