Parsing the Redis replication stream with attoparsec
{-# LANGUAGE OverloadedStrings #-} | |
module RedisParsing where | |
import Control.Exception | |
import qualified Data.ByteString as B | |
import Data.Attoparsec.ByteString as AB | |
import Data.Attoparsec.ByteString.Char8 | |
import Data.List | |
import Network.Socket hiding (recv) | |
import Network.Socket.ByteString (recv, sendAll) | |
-- | One value of the Redis wire protocol, as produced by 'parseRedisValue'.
data RedisValue
  = RedisInteger Integer
    -- ^ Number read from a @:@-prefixed line.
  | RedisError B.ByteString
    -- ^ Text read from a @-@-prefixed line.
  | RedisSimpleString B.ByteString
    -- ^ Text read from a @+@-prefixed line.
  | RedisBulkString B.ByteString
    -- ^ Length-prefixed payload introduced by @$@.
  | RedisArray [RedisValue]
    -- ^ Elements of a @*@-prefixed, count-prefixed array.
-- | Arrays render as a bracketed, comma-separated list of their rendered
-- elements; every other value renders as the 'show' of its payload, so the
-- constructor name is not part of the output.
instance Show RedisValue where
  show value = case value of
    RedisArray items       -> "[" ++ intercalate "," (map show items) ++ "]"
    RedisBulkString bytes  -> show bytes
    RedisInteger n         -> show n
    RedisError message     -> show message
    RedisSimpleString text -> show text
-- | Parse a single Redis value of any kind.
--
-- The alternatives are tried in the order listed; each one starts by
-- matching its own type-prefix character.
parseRedisValue :: Parser RedisValue
parseRedisValue = choice alternatives
  where
    alternatives =
      [ parseRedisArray
      , parseRedisBulkString
      , parseRedisInteger
      , parseRedisError
      , parseRedisSimpleString
      ]
-- | Parse an array: a @*@, a decimal element count, a line ending, and then
-- exactly that many values, each read with 'parseRedisValue'.
parseRedisArray :: Parser RedisValue
parseRedisArray =
  RedisArray <$> (elementCount >>= \n -> count n parseRedisValue)
  where
    -- The header line: "*<count>" followed by a line ending.
    elementCount = char '*' *> decimal <* endOfLine
-- | Parse an integer: a @:@, an optionally signed decimal number, and a
-- line ending.
parseRedisInteger :: Parser RedisValue
parseRedisInteger = do
  _ <- char ':'
  number <- signed decimal
  endOfLine
  return (RedisInteger number)
-- | Parse a simple string: a @+@, then every byte up to (not including) the
-- next line-ending byte, then the line ending itself.
parseRedisSimpleString :: Parser RedisValue
parseRedisSimpleString = do
  _ <- char '+'
  text <- AB.takeWhile (not . isEndOfLine)
  endOfLine
  return (RedisSimpleString text)
-- | Parse an error: a @-@, then every byte up to (not including) the next
-- line-ending byte, then the line ending itself.
parseRedisError :: Parser RedisValue
parseRedisError = do
  _ <- char '-'
  message <- AB.takeWhile (not . isEndOfLine)
  endOfLine
  return (RedisError message)
-- | Parse a bulk string: a @$@, a decimal byte count, a line ending, exactly
-- that many bytes of payload, and a closing line ending.
parseRedisBulkString :: Parser RedisValue
parseRedisBulkString =
  RedisBulkString <$> (payloadLength >>= AB.take) <* endOfLine
  where
    -- The header line: "$<length>" followed by a line ending.
    payloadLength = char '$' *> decimal <* endOfLine
-- | Parse the RDB snapshot that precedes the replication stream.
--
-- The snapshot is framed like a bulk string (@$@, a decimal byte count, a
-- line ending, then that many bytes) but, unlike 'parseRedisBulkString', no
-- line ending is consumed after the payload.
--
-- A Redis master may emit bare newlines as keep-alives while it is still
-- preparing the snapshot, so any line endings in front of the @$@ header
-- are skipped instead of being treated as a parse error.
parseRDB :: Parser RedisValue
parseRDB = do
  skipMany endOfLine
  _ <- char '$'
  payloadLength <- decimal
  endOfLine
  RedisBulkString <$> AB.take payloadLength
-- | Connect to the Redis server at 127.0.0.1:6379, issue @SYNC@, skip the
-- RDB snapshot, and then print every value of the replication stream via
-- 'streamingParser'.
--
-- Failures while resolving the address or reading the snapshot are raised
-- as 'IOError's carrying a descriptive message.
main :: IO ()
main = withSocketsDo $ do
  addr <- resolve "127.0.0.1" "6379"
  bracket (open addr) close talk
  where
    -- Look up the first stream-socket address for the given host and port.
    resolve host port = do
      let hints = defaultHints { addrSocketType = Stream }
      addrs <- getAddrInfo (Just hints) (Just host) (Just port)
      case addrs of
        addr : _ -> return addr
        [] -> ioError . userError $
          "No address found for " ++ host ++ ":" ++ port

    -- Create and connect a socket. 'bracketOnError' closes the socket again
    -- if 'connect' throws, so a failed connection does not leak it.
    open a =
      bracketOnError
        (socket (addrFamily a) (addrSocketType a) (addrProtocol a))
        close
        (\sock -> do
            connect sock $ addrAddress a
            return sock)

    -- Request a full sync, discard the snapshot, then hand any bytes that
    -- were read past the snapshot to the streaming parser.
    talk sock = do
      sendAll sock "SYNC\n"
      outcome <- parseWith (recv sock 1024) parseRDB ""
      case outcome of
        Done rest (RedisBulkString rdb) -> do
          putStrLn $ "Skipped " ++ show (B.length rdb) ++ " bytes of RDB data."
          streamingParser sock rest
        Fail _ _ msg -> ioError . userError $
          "Could not parse the RDB payload: " ++ msg
        _ -> ioError . userError $
          "Unexpected parser result while waiting for the RDB payload"
-- | Repeatedly parse Redis values from the socket and print each one.
--
-- @leftovers@ is input that was already read from the socket but not yet
-- consumed; it is offered to the parser before any new data is received.
-- After each value, the unconsumed remainder is carried into the next
-- iteration. The loop only ends by throwing: a parse failure is raised as
-- an 'ErrorCall' carrying the parser's message.
streamingParser :: Socket -> B.ByteString -> IO ()
streamingParser sock leftovers = do
  outcome <- parseWith (recv sock 1024) parseRedisValue leftovers
  case outcome of
    Done rest value -> do
      print value
      streamingParser sock rest
    Fail _ _ msg -> throwIO (ErrorCall msg)
    -- 'parseWith' is documented to keep supplying input until the parser
    -- finishes, so this branch is not expected to be taken; it exists to
    -- keep the match total.
    Partial _ ->
      throwIO (ErrorCall "streamingParser: unexpected Partial result from parseWith")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment