Created
February 11, 2013 15:08
-
-
Save robinp/4754976 to your computer and use it in GitHub Desktop.
Stomp request sending to HornetQ (error handling might be incomplete in timeoutE, todo)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Network | |
import System.IO | |
import Data.Char (chr, ord) | |
import Control.Exception (IOException) | |
import Control.Concurrent (forkIO, threadDelay, killThread) | |
import Control.Concurrent.MVar | |
import Control.Monad.IO.Class (MonadIO, liftIO) | |
import Control.Monad.Trans.Class (lift) | |
import Control.Monad.Trans.Either | |
import Control.Monad (liftM) | |
import Control.Error.Util (tryIO, note) | |
import Data.EitherR (fmapLT) | |
import qualified Data.ByteString.Lazy.Char8 as B | |
import qualified Codec.Compression.Zlib as Z | |
import qualified Network.Stomp as S | |
-- | Load the request body from the given file and return it zlib-compressed.
--
-- The file is read as a lazy 'B.ByteString' and handed to 'Z.compress'.
readPayload :: FilePath -> IO B.ByteString
readPayload path = do
  raw <- B.readFile path
  return (Z.compress raw)
-- | Frame handler registered with the STOMP consumer.
--
-- A @MESSAGE@ frame has its body printed via 'showresult' and signals
-- 'True' through the given 'MVar'; any other frame is logged and signals
-- 'False'.
callback :: MVar Bool -> S.Frame -> IO ()
callback sync (S.Frame (S.SC S.MESSAGE) _ body) = do
  showresult body
  putMVar sync True
callback sync other = do
  putStrLn ("Unexpected message received = " ++ show other)
  putMVar sync False
-- | Decompress a zlib-compressed reply body and print it to stdout.
showresult :: B.ByteString -> IO ()
showresult body = putStrLn (B.unpack (Z.decompress body))
-- | Application-level error type carried on the left of the 'EitherT' stack.
data AppErr
  = AppIOErr (Maybe String) IOException
    -- ^ An 'IOException', optionally tagged with a note describing the step
    -- that was running when it was raised.
  | AppOtherErr String
    -- ^ Any other failure, described by a message.
  deriving (Show)
-- | Run an 'IO' action, wrapping any 'IOException' on the left in the
-- app-specific 'AppErr' type, without a descriptive note.
mTryIO :: (MonadIO m) => IO a -> EitherT AppErr m a
mTryIO = wrapIOErr Nothing

-- | Like 'mTryIO', but tags the wrapped 'IOException' with a note naming
-- the step that failed.
mNoteIO :: (MonadIO m) => String -> IO a -> EitherT AppErr m a
mNoteIO s = wrapIOErr (Just s)

-- | Shared worker: catch the 'IOException' with 'tryIO' and re-label it as
-- an 'AppIOErr' carrying the optional note.
wrapIOErr :: (MonadIO m) => Maybe String -> IO a -> EitherT AppErr m a
wrapIOErr label action = fmapLT (AppIOErr label) (tryIO action)
-- | Send one compressed request over STOMP and wait for the reply.
--
-- Steps: connect, start a consumer on the reply queue, send the payload
-- read from @request.xml@ to the request queue, wait (with a time limit)
-- for the consumer callback to signal, then disconnect.  Any failure is
-- reported by 'runEitherIO'.
--
-- The callback signals 'False' when it receives something other than a
-- @MESSAGE@ frame; that outcome is reported as an error (after the
-- disconnect) instead of being silently treated as success.
main :: IO ()
main = runEitherIO $ do
  lift $ putStrLn "starting"
  -- NOTE(review): the host part "1110.0.2.21" is not a valid IPv4 address
  -- (first octet > 255) -- confirm the intended broker address.
  conn <- timeoutE (AppOtherErr "Connect timeout") 100000 $
    S.connect "stomp://guest:guest@1110.0.2.21:61613" vers []
  sync <- mTryIO newEmptyMVar
  --
  mNoteIO "Preparing consumer" $ do
    S.startConsumer conn (callback sync)
    S.subscribe conn replyQueue subId [selector]
  --
  payload <- mNoteIO "Reading request to send" $ readPayload "request.xml"
  --
  mNoteIO "Sending request" $ S.send conn requestQueue sendHeaders payload
  -- wait for reply; the Bool tells whether the callback saw a MESSAGE frame
  gotReply <- timeoutE (AppOtherErr "Timeout waiting reply") timelimit $
    readMVar sync
  --
  mNoteIO "Disconnect" $ S.disconnect conn []
  -- report a bad reply only after the connection has been closed
  if gotReply
    then return ()
    else left (AppOtherErr "Unexpected frame received instead of reply")
  where
    subId = "sub0"
    vers = [(1, 1)]
    selector = ("selector", "JMSCorrelationID LIKE 'almakorte%'")
    sendHeaders =
      [ -- content-length added by lib
        ("JMSReplyTo", replyQueue)
      , ("JMSCorrelationID", "almakorte01")
      ]
    --
    replyQueue = queue ++ ".OUT"
    requestQueue = queue ++ ".IN"
    queue = "jms.queue.SOLR.SEARCH"
    --
    timelimit = 1 * 1000 * 1000 -- usec
-- | Run an 'EitherT' computation in 'IO'.
--
-- A 'Left' is printed to stdout prefixed with @Error happened:@; a 'Right'
-- finishes silently.
runEitherIO :: (Show e) => EitherT e IO () -> IO ()
runEitherIO action = do
  outcome <- runEitherT action
  case outcome of
    Left err -> putStrLn ("Error happened:" ++ show err)
    Right () -> return ()
-- | Race several 'IO' actions and return the result of whichever finishes
-- first (adapted from HaskellWiki).
--
-- Each action is forked into its own thread and writes its result into a
-- shared 'MVar'; once the first result is read, every forked thread is
-- killed.
--
-- The raced actions are plain 'IO' because 'forkIO' can only run 'IO'
-- computations; an arbitrary @MonadIO m => m a@ cannot be forked.  The
-- result is still lifted into any 'MonadIO'.
--
-- Known limitations:
--
-- * If an action throws, its thread dies without filling the 'MVar', so
--   the exception is not propagated to the caller here.
-- * With an empty list nothing ever fills the 'MVar', so no result is
--   produced.
compete :: (MonadIO m) => [IO a] -> m a
compete actions = liftIO $ do
  mvar <- newEmptyMVar
  tids <- mapM (\action -> forkIO (action >>= putMVar mvar)) actions
  -- readMVar leaves the MVar full, so a losing thread blocks on putMVar
  -- until it is killed below.
  result <- readMVar mvar
  mapM_ killThread tids
  return result

-- | Run an action with a time limit in microseconds.
--
-- Returns @Just@ the result if the action finishes before the delay
-- elapses, 'Nothing' otherwise.  Because 'compete' does not forward
-- exceptions, an action that throws also ends up as 'Nothing' once the
-- delay expires.
timeout :: (MonadIO m) => Int -> IO a -> m (Maybe a)
timeout usec a =
  compete [liftM Just a, threadDelay usec >> return Nothing]

-- | 'timeout' lifted into 'EitherT' (custom mix): a timed-out action
-- becomes @Left e@, a completed one becomes @Right@ its result.
--
-- 'liftM' is used rather than 'fmap' so that only the 'MonadIO'
-- constraint is needed.
timeoutE :: (MonadIO m) => e -> Int -> IO a -> EitherT e m a
timeoutE e t a = EitherT . liftM (note e) $ timeout t a
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment