Skip to content

Instantly share code, notes, and snippets.

@robinp
Created February 11, 2013 15:08
Show Gist options
  • Save robinp/4754976 to your computer and use it in GitHub Desktop.
Save robinp/4754976 to your computer and use it in GitHub Desktop.
Stomp request sending to HornetQ (error handling might be incomplete in timeoutE, todo)
import Network
import System.IO
import Data.Char (chr, ord)
import Control.Exception (IOException)
import Control.Concurrent (forkIO, threadDelay, killThread)
import Control.Concurrent.MVar
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Either
import Control.Monad (liftM)
import Control.Error.Util (tryIO, note)
import Data.EitherR (fmapLT)
import qualified Data.ByteString.Lazy.Char8 as B
import qualified Codec.Compression.Zlib as Z
import qualified Network.Stomp as S
readPayload :: FilePath -> IO B.ByteString
readPayload s =
Z.compress `fmap` B.readFile s
callback :: MVar Bool -> S.Frame -> IO ()
callback sync frame = case frame of
S.Frame (S.SC S.MESSAGE) hs body ->
showresult body >> sync `putMVar` True
other ->
putStrLn ("Unexpected message received = " ++ show other) >> sync `putMVar` False
showresult body =
putStrLn $ (B.unpack . Z.decompress) body
data AppErr = AppIOErr (Maybe String) IOException
| AppOtherErr String
deriving (Show)
-- wraps up the IOException on the left in the custom App-specific ADT
mTryIO = fmapLT (AppIOErr Nothing) . tryIO
mNoteIO s = fmapLT (AppIOErr (Just s)) . tryIO
main = runEitherIO $ do
lift $ putStrLn "starting"
--
conn <- timeoutE (AppOtherErr "Connect timeout") 100000 $
S.connect "stomp://guest:guest@1110.0.2.21:61613" vers []
sync <- mTryIO $ newEmptyMVar
--
mNoteIO "Preparing consumer" $ do
S.startConsumer conn (callback sync)
S.subscribe conn replyQueue subId [selector]
--
payload <- mNoteIO "Reading request to send" $ readPayload "request.xml"
--
mNoteIO "Sending request" $ S.send conn requestQueue sendHeaders payload
-- wait for reply
timeoutE (AppOtherErr "Timeout waiting reply") timelimit $ readMVar sync
--
mNoteIO "Disconnect" $ S.disconnect conn []
where subId = "sub0"
vers = [(1,1)]
selector = ("selector", "JMSCorrelationID LIKE 'almakorte%'")
sendHeaders = [
-- content-length added by lib
("JMSReplyTo" , replyQueue),
("JMSCorrelationID", "almakorte01") ]
--
replyQueue = queue ++ ".OUT"
requestQueue = queue ++ ".IN"
queue = "jms.queue.SOLR.SEARCH"
--
timelimit = 1*1000*1000 -- usec
runEitherIO :: (Show e) => EitherT e IO () -> IO ()
runEitherIO = eitherT whenBad whenOk
where whenBad = putStrLn . ("Error happened:" ++) . show
whenOk = const $ return ()
-- from HaskellWiki
compete :: (MonadIO m) => [m a] -> m a
compete as = do
mvar <- liftIO newEmptyMVar
tids <- mapM (\a -> (liftIO . forkIO) $ a >>= liftIO . putMVar mvar) as
result <- liftIO $ readMVar mvar
mapM_ (liftIO . killThread) tids
return result
timeout :: (MonadIO m) => Int -> m a -> m (Maybe a)
timeout usec a =
compete [liftM Just a, liftIO (threadDelay usec) >> return Nothing]
-- custom mix
timeoutE :: (MonadIO m) => e -> Int -> m a -> EitherT e m a
timeoutE e t a = EitherT . fmap (note e) $ timeout t a
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment