public
Last active

Monitor "upstream" processes with EKG

  • Download Gist
EkgJsonParser.hs
Haskell
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
 
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
 
module EkgJsonParser where
 
import Data.Aeson
import Data.Aeson.Types
import Control.Monad
import Data.ByteString.Lazy
import Text.InterpolatedString.Perl6
import qualified Data.HashMap.Strict as H
import qualified Data.Text as T
 
-- | Association list pairing a JSON member name with its numeric value.
type AscList = [(T.Text, Double)]

-- | The counter and gauge sections decoded from one EKG JSON document.
data EkgData = EkgData
  { unCounters :: AscList -- ^ entries of the @counters@ object
  , unGauges :: AscList   -- ^ entries of the @gauges@ object
  }
  deriving Show
 
-- | A JSON object is handed to 'parseObject'; every other JSON value fails
-- the parse with 'mzero'.
instance FromJSON EkgData where
  parseJSON value = case value of
    Object fields -> parseObject fields
    _ -> mzero
 
-- | Build an 'EkgData' from the @counters@ and @gauges@ members of a JSON
-- object. Each member is read with '.:' as a hash map and flattened to an
-- association list with 'H.toList'; @counters@ is read first.
parseObject :: Object -> Parser EkgData
parseObject x =
  liftM2
    EkgData
    (H.toList `fmap` (x .: "counters"))
    (H.toList `fmap` (x .: "gauges"))
 
-- | Sample JSON document with a @counters@ and a @gauges@ object, each
-- holding two numeric members. Printed and decoded by 'test'.
testData :: ByteString
testData = [q|{
"counters": {"one": 1, "two": 2},
"gauges": {"three": 1, "four": 2}
}|]
 
-- | Decode a lazy 'ByteString' of JSON into 'EkgData' via 'decode';
-- the result is 'Nothing' when 'decode' yields 'Nothing'.
decodeEkgData :: ByteString -> Maybe EkgData
decodeEkgData payload = decode payload
 
-- | Decode a lazy 'ByteString' of JSON into a generic aeson 'Value' via
-- 'decode'.
decodeValue :: ByteString -> Maybe Value
decodeValue payload = decode payload
 
-- | Print 'testData', then its decoding as a generic 'Value', then its
-- decoding as 'EkgData'.
test :: IO ()
test =
  print testData
    >> print (decodeValue testData)
    >> print (decodeEkgData testData)
ekg_test.hs
Haskell
1 2 3 4 5 6 7 8 9 10 11
{-# LANGUAGE OverloadedStrings #-}
 
import System.Remote.Monitoring
import Control.Monad
import Control.Concurrent
 
-- | Start the monitoring server on @localhost:8000@ (its handle is
-- discarded), then run 'readwords'.
main :: IO ()
main = do
  _ <- forkServer "localhost" 8000
  readwords
 
-- | Loop forever: read @\/usr\/share\/dict\/words@, print how many words it
-- contains, then pause for 1000000 microseconds.
readwords :: IO ()
readwords = forever $ do
  contents <- readFile "/usr/share/dict/words"
  print (length (words contents))
  threadDelay 1000000
upstream.hs
Haskell
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
{-# LANGUAGE OverloadedStrings #-}
 
import Control.Applicative
import Control.Concurrent
import Control.Exception (Handler (..), IOException, catches)
import Control.Monad
import Control.Monad.IO.Class
import Data.Monoid
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as L
import qualified Data.Text as T

import Network.HTTP.Conduit
import Network.HTTP.Types.Header (HeaderName)
import System.Remote.Gauge as G
import System.Remote.Monitoring

import EkgJsonParser
 
-- | Perform an HTTP request against @url@ and return the response body as a
-- lazy 'L.ByteString'. The request carries @Accept: application/json@ and
-- @Connection: close@ headers, added through 'setHeaders'.
jsonRequest :: MonadIO m => String -> m L.ByteString
jsonRequest url = liftIO . withManager $ \manager -> do
  request <- liftIO (parseUrl url)
  response <-
    httpLbs
      (setHeaders [("Accept", "application/json"), ("Connection", "close")] request)
      manager
  return (responseBody response)
 
-- | Prepend the given headers, in order, to the headers already present on
-- the request.
setHeaders :: [(HeaderName, B.ByteString)] -> Request m -> Request m
setHeaders extra request =
  request { requestHeaders = extra ++ requestHeaders request }
 
-- | Start a local monitoring server on @localhost:9000@, then loop forever:
-- run 'upstream' against @http:\/\/localhost:8000\/@ and pause for 10000000
-- microseconds between rounds.
main :: IO ()
main = do
  server <- forkServer "localhost" 9000
  void . forever $ do
    upstream "http://localhost:8000/" server
    threadDelay 10000000
 
-- | Poll one remote endpoint and mirror its gauges into the local server.
--
-- Logs the location, fetches it with 'jsonRequest', decodes the body with
-- 'decodeEkgData' and passes the decoded gauges to 'process'. A body that
-- does not decode is reported as @SUX@ on stdout.
--
-- An 'HttpException' or 'IOException' raised while fetching is reported on
-- stdout instead of being propagated, so a caller that polls in a loop is
-- not terminated by an upstream that is temporarily unreachable. Only the
-- fetch is guarded; exceptions from 'process' still propagate.
upstream :: String -> Server -> IO ()
upstream location server = do
  putStrLn $ "Remote Read: " ++ location

  -- Catch only the two exception types a failed request is expected to
  -- raise; anything else (including asynchronous exceptions) passes through.
  fetched <-
    (Right <$> jsonRequest location)
      `catches` [ Handler (\e -> return (Left (show (e :: HttpException))))
                , Handler (\e -> return (Left (show (e :: IOException))))
                ]

  case fetched of
    Left reason -> putStrLn $ "Remote Read failed: " ++ reason
    Right body ->
      case decodeEkgData body of
        Nothing -> putStrLn "SUX"
        Just l -> process location server (unGauges l)
 
-- | Publish remote readings as local gauges.
--
-- For every @(name, value)@ pair a gauge named @\<r\>->\<name\>@ is obtained
-- from the server with 'getGauge'; once all gauges have been obtained, each
-- one is set to the 'floor' of its value.
process :: String -> Server -> [(T.Text, Double)] -> IO ()
process r s l = do
  gauges <- mapM lookupGauge names
  zipWithM_ G.set gauges values
  where
    prefix = T.pack r <> "->"
    names = map ((prefix <>) . fst) l
    values = map (floor . snd) l
    lookupGauge name = getGauge name s

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.