Parsing the Redis replication stream with attoparsec
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE OverloadedStrings #-} | |
module RedisParsing where | |
import Control.Exception | |
import qualified Data.ByteString as B | |
import Data.Attoparsec.ByteString as AB | |
import Data.Attoparsec.ByteString.Char8 | |
import Data.List | |
import Network.Socket hiding (recv) | |
import Network.Socket.ByteString (recv, sendAll) | |
-- | One value of the Redis wire protocol, with a constructor for each
-- kind of reply the parsers in this module recognise.
data RedisValue
  = RedisInteger Integer
    -- ^ Integer reply (introduced by @:@ on the wire).
  | RedisError B.ByteString
    -- ^ Error reply (introduced by @-@); holds the raw message line.
  | RedisSimpleString B.ByteString
    -- ^ Simple string reply (introduced by @+@); holds the raw line.
  | RedisBulkString B.ByteString
    -- ^ Length-prefixed string (introduced by @$@); holds the payload.
  | RedisArray [RedisValue]
    -- ^ Array reply (introduced by @*@); holds the nested values in order.
-- | Debug rendering: scalar values are shown as their payload alone
-- (without the constructor name), and arrays are shown as a
-- comma-separated list of their elements inside square brackets.
instance Show RedisValue where
  show value = case value of
    RedisInteger num     -> show num
    RedisError bs        -> show bs
    RedisSimpleString bs -> show bs
    RedisBulkString bs   -> show bs
    RedisArray elements  -> "[" ++ intercalate "," (map show elements) ++ "]"
-- | Parse any single protocol value.
--
-- The first byte of a value identifies its kind, so we peek at it (without
-- consuming it) and hand over to the matching parser, which consumes the
-- marker itself. Compared with trying every alternative via 'choice', a
-- malformed value is reported with the error of the parser that actually
-- ran, instead of the generic failure of an exhausted list of alternatives.
--
-- Fails if the next byte is not one of @*@, @$@, @:@, @-@ or @+@.
parseRedisValue :: Parser RedisValue
parseRedisValue = do
  marker <- peekChar'
  case marker of
    '*' -> parseRedisArray
    '$' -> parseRedisBulkString
    ':' -> parseRedisInteger
    '-' -> parseRedisError
    '+' -> parseRedisSimpleString
    _   -> fail ("unknown Redis type byte: " ++ show marker)
-- | Parse an array: a @*@ marker, a decimal element count, a line ending,
-- and then exactly that many nested values.
parseRedisArray :: Parser RedisValue
parseRedisArray = do
  size <- char '*' *> decimal <* endOfLine
  RedisArray <$> count size parseRedisValue
-- | Parse an integer reply: a @:@ marker, an optionally signed decimal
-- number, and a line ending.
parseRedisInteger :: Parser RedisValue
parseRedisInteger = do
  _ <- char ':'
  number <- signed decimal
  endOfLine
  return (RedisInteger number)
-- | Parse a simple string reply: a @+@ marker followed by everything up to
-- (but not including) the line ending, which is then consumed.
parseRedisSimpleString :: Parser RedisValue
parseRedisSimpleString = do
  _ <- char '+'
  line <- AB.takeTill isEndOfLine
  endOfLine
  return (RedisSimpleString line)
-- | Parse an error reply: a @-@ marker followed by everything up to (but
-- not including) the line ending, which is then consumed.
parseRedisError :: Parser RedisValue
parseRedisError = do
  _ <- char '-'
  message <- AB.takeTill isEndOfLine
  endOfLine
  return (RedisError message)
-- | Parse a bulk string: a @$@ marker, a decimal byte count, a line ending,
-- exactly that many payload bytes, and a closing line ending.
parseRedisBulkString :: Parser RedisValue
parseRedisBulkString =
  RedisBulkString <$> (lengthLine >>= AB.take) <* endOfLine
  where
    -- The "$<count>" header line, yielding the payload size in bytes.
    lengthLine :: Parser Int
    lengthLine = char '$' *> decimal <* endOfLine
-- | Parse the snapshot payload that precedes the command stream.
--
-- The payload looks like a bulk string (@$@, decimal byte count, line
-- ending, then that many bytes) but, unlike 'parseRedisBulkString', it has
-- no line ending after the payload.
--
-- Any line endings that arrive before the @$@ marker are skipped, so that
-- keepalive newlines sent ahead of the snapshot do not make the parse fail.
--
-- NOTE(review): only the length-prefixed form is handled here; if the
-- server can send a differently framed payload, that needs a separate
-- parser. Confirm against the server configuration.
parseRDB :: Parser RedisValue
parseRDB = do
  -- Each successful 'endOfLine' consumes at least one byte, so this
  -- terminates as soon as something other than a line ending shows up.
  skipMany endOfLine
  _ <- char '$'
  payloadLength <- decimal
  endOfLine
  RedisBulkString <$> AB.take payloadLength
-- | Connect to the server at 127.0.0.1:6379, send @SYNC@, skip over the
-- snapshot payload, and then print every value that follows on the
-- connection (see 'streamingParser').
--
-- The socket is closed when the session ends, whether normally or by an
-- exception. A failure to parse the snapshot payload is raised as an
-- 'IOError' carrying the parser's own message.
main :: IO ()
main = withSocketsDo $ do
  addr <- resolve "127.0.0.1" "6379"
  bracket (open addr) close talk
  where
    -- Look up a stream-socket address for the given host and port.
    resolve host port = do
      let hints = defaultHints { addrSocketType = Stream }
      addr:_ <- getAddrInfo (Just hints) (Just host) (Just port)
      return addr

    -- Create and connect a socket. If 'connect' throws, the freshly
    -- created socket is closed here rather than leaked, because the outer
    -- 'bracket' only takes ownership once this action has returned.
    open a =
      bracketOnError
        (socket (addrFamily a) (addrSocketType a) (addrProtocol a))
        close
        (\sock -> connect sock (addrAddress a) >> return sock)

    -- Run the session: request the stream, drop the snapshot, then print.
    talk sock = do
      sendAll sock "SYNC\n"
      outcome <- parseWith (recv sock 1024) parseRDB ""
      case outcome of
        Done rest (RedisBulkString rdb) -> do
          putStrLn $ "Skipped " ++ show (B.length rdb) ++ " bytes of RDB data."
          streamingParser sock rest
        Done _ other ->
          ioError . userError $
            "expected the RDB payload as a bulk string, got: " ++ show other
        Fail _ contexts msg ->
          ioError . userError $
            "could not parse the RDB payload: "
              ++ intercalate " > " (contexts ++ [msg])
        -- Present so the match is exhaustive; 'parseWith' is expected to
        -- keep feeding input rather than hand back a partial result.
        Partial _ ->
          ioError (userError "RDB payload parser still wants input")
-- | Repeatedly parse one value from the socket and print it.
--
-- @leftovers@ is input that was already read from the socket but not yet
-- consumed; it is fed to the parser before any further data is received.
-- After each value, whatever the parser left unconsumed is carried into
-- the next round.
--
-- Does not return normally: a parse failure is raised as an 'ErrorCall'
-- whose message includes the parser's context trail (when there is one)
-- followed by the failure message.
streamingParser :: Socket -> B.ByteString -> IO ()
streamingParser sock leftovers = do
  outcome <- parseWith (recv sock 1024) parseRedisValue leftovers
  case outcome of
    Done rest value -> do
      print value
      streamingParser sock rest
    Fail _ contexts msg ->
      throwIO (ErrorCall (describeFailure contexts msg))
    -- Present so the match is exhaustive; 'parseWith' is expected to keep
    -- feeding input rather than hand back a partial result.
    Partial _ ->
      throwIO (ErrorCall "streamingParser: parser still wants input")
  where
    -- Prefix the failure message with the context trail, outermost first.
    describeFailure [] msg = msg
    describeFailure contexts msg = intercalate " > " contexts ++ ": " ++ msg
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment