Skip to content

Instantly share code, notes, and snippets.

@ibizaman
Last active August 29, 2015 14:27
Show Gist options
  • Save ibizaman/9d9d1c2dec55e8f18009 to your computer and use it in GitHub Desktop.
Save ibizaman/9d9d1c2dec55e8f18009 to your computer and use it in GitHub Desktop.
Simple recursive consumer for kafka
import Haskakafka
import qualified Data.ByteString.Char8 as C8
kafkaConfig = []
topicConfig = []
host = "localhost:9092"
topic = "ttp"
partition = 0
timeoutMs = 1000
maxMessages = 10000
printMessages :: [KafkaMessage] -> IO ()
printMessages = putStrLn . unlines . map (C8.unpack . messagePayload)
countMessages :: Kafka -> KafkaTopic -> IO Int
countMessages = countMessages' 0
countMessages' :: Int -> Kafka -> KafkaTopic -> IO Int
countMessages' count kafka topic = do
messages <- consumeMessageBatch topic partition timeoutMs maxMessages
case messages of
Left error -> do
putStrLn (show error)
return count
Right messages -> do
printMessages messages
countMessages' (count + (length messages)) kafka topic
main :: IO ()
main = do
withKafkaConsumer kafkaConfig topicConfig
host topic partition KafkaOffsetBeginning
(\kafka topic -> do
setLogLevel kafka KafkaLogDebug
count <- countMessages kafka topic
putStrLn (show count))
consumeMessageBatch (KafkaTopic topicPtr _ _) partition timeout maxMessages =
allocaArray maxMessages $ \outputPtr -> do
numMessages <- rdKafkaConsumeBatch topicPtr (fromIntegral partition) timeout outputPtr (fromIntegral maxMessages)
if numMessages < 0 then getErrno >>= return . Left . kafkaRespErr
else do
ms <- if numMessages /= 0 then
forM [0..(numMessages - 1)] $ \mnum -> do
storablePtr <- peekElemOff outputPtr (fromIntegral mnum)
storable <- peek storablePtr
ret <- fromMessageStorable storable
fptr <- newForeignPtr_ storablePtr
addForeignPtrFinalizer rdKafkaMessageDestroy fptr
return ret
else return []
return $ Right ms
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment