Skip to content

Instantly share code, notes, and snippets.

@kikyousky
Created May 18, 2016 08:00
Show Gist options
  • Save kikyousky/40ebf764fb7cfcdda4d234a0076af236 to your computer and use it in GitHub Desktop.
Save kikyousky/40ebf764fb7cfcdda4d234a0076af236 to your computer and use it in GitHub Desktop.
import struct
import io
import sys
import datetime
import re
import os
try :
from StringIO import StringIO
except ImportError:
from io import StringIO
import redis
import crc
import crc64
# Length-encoding selectors: compared against the two most significant bits
# of the first length byte (see RdbRehasher.read_length_with_encoding).
REDIS_RDB_6BITLEN = 0
REDIS_RDB_14BITLEN = 1
REDIS_RDB_32BITLEN = 2
REDIS_RDB_ENCVAL = 3

# Opcodes that may appear where a value-type byte is expected.
REDIS_RDB_OPCODE_EXPIRETIME_MS = 252
REDIS_RDB_OPCODE_EXPIRETIME = 253
REDIS_RDB_OPCODE_SELECTDB = 254
REDIS_RDB_OPCODE_EOF = 255

# Value types whose payload is a sequence of individually encoded strings.
REDIS_RDB_TYPE_STRING = 0
REDIS_RDB_TYPE_LIST = 1
REDIS_RDB_TYPE_SET = 2
REDIS_RDB_TYPE_ZSET = 3
REDIS_RDB_TYPE_HASH = 4

# Value types whose whole payload is stored as one encoded string.
REDIS_RDB_TYPE_HASH_ZIPMAP = 9
REDIS_RDB_TYPE_LIST_ZIPLIST = 10
REDIS_RDB_TYPE_SET_INTSET = 11
REDIS_RDB_TYPE_ZSET_ZIPLIST = 12
REDIS_RDB_TYPE_HASH_ZIPLIST = 13

# Special string encodings, selected when the length prefix is REDIS_RDB_ENCVAL.
REDIS_RDB_ENC_INT8 = 0
REDIS_RDB_ENC_INT16 = 1
REDIS_RDB_ENC_INT32 = 2
REDIS_RDB_ENC_LZF = 3

# Maps every on-disk value type to the name of its logical data type.
DATA_TYPE_MAPPING = {
    REDIS_RDB_TYPE_STRING: "string",
    REDIS_RDB_TYPE_LIST: "list",
    REDIS_RDB_TYPE_SET: "set",
    REDIS_RDB_TYPE_ZSET: "sortedset",
    REDIS_RDB_TYPE_HASH: "hash",
    REDIS_RDB_TYPE_HASH_ZIPMAP: "hash",
    REDIS_RDB_TYPE_LIST_ZIPLIST: "list",
    REDIS_RDB_TYPE_SET_INTSET: "set",
    REDIS_RDB_TYPE_ZSET_ZIPLIST: "sortedset",
    REDIS_RDB_TYPE_HASH_ZIPLIST: "hash",
}
class RdbRehasher:
    """Re-distribute the keys of RDB dump files over per-node RDB files.

    The slot layout is fetched once from a cluster node.  Every key found in
    an input dump is copied verbatim (expiry opcode, type byte, key and value)
    into the output file of the node that owns the key's hash slot.

    Typical use: ``initRdbFile()``, then ``rehash(path)`` for each input
    dump, then ``finiRdbFile()``.
    """

    def __init__(self, node, destdir):
        """Remember the destination directory and fetch the slot layout.

        node    -- "host:port" of a cluster node, passed to get_cluster_slots().
        destdir -- directory in which the per-node ``<host>_<port>.rdb`` files
                   are created.
        """
        self.node = node
        self.slots_dist = get_cluster_slots(node)
        self.destdir = destdir
        # Open append-mode handles, keyed by output file path.  Keying by path
        # (rather than by slot) keeps the number of open descriptors bounded
        # by the number of nodes instead of the number of slots.
        self.filehcache = {}

    def _node_outfile(self, item):
        """Return the output path for one ``cluster slots`` entry."""
        host, port = item[2][0], item[2][1]
        if not isinstance(host, str):
            # A Python 3 client may hand the address back as bytes.
            host = host.decode('utf-8')
        return self.destdir + '/' + host + '_' + str(port) + '.rdb'

    def _node_outfiles(self):
        """Return every distinct output path, in slot-layout order.

        A node owning several slot ranges is listed once per range; its file
        must still get exactly one header and one trailer.
        """
        seen = set()
        paths = []
        for item in self.slots_dist:
            path = self._node_outfile(item)
            if path not in seen:
                seen.add(path)
                paths.append(path)
        return paths

    def _get_out_from_key(self, key):
        """Return the open output file for *key*, or None if it has no target.

        None is returned (after reporting the key) when the slot cannot be
        computed or when no slot range of the layout contains it.
        """
        if isinstance(key, int):
            # read_string() returns integer-encoded keys as numbers; the
            # actual Redis key is the decimal text of that number.
            key = str(key).encode('ascii')
        try:
            slot = keyslot(key)
        except Exception:
            print(key)
            return None
        for item in self.slots_dist:
            if item[0] <= slot <= item[1]:
                outfile = self._node_outfile(item)
                outfileh = self.filehcache.get(outfile)
                if outfileh is None:
                    # Binary append: the copied segments are raw RDB bytes.
                    outfileh = open(outfile, 'ab')
                    self.filehcache[outfile] = outfileh
                return outfileh
        sys.stderr.write('no node owns slot %d, skipping key %r\n' % (slot, key))
        return None

    def initRdbFile(self):
        """Create (or truncate) every output file and write the RDB header."""
        for outfile in self._node_outfiles():
            with open(outfile, 'wb') as outfileh:
                outfileh.write(b'REDIS')     # magic string
                outfileh.write(b'0006')      # rdb version
                outfileh.write(b'\xFE\x00')  # start database 0

    def finiRdbFile(self):
        """Close the cached handles and terminate every output file."""
        for outfileh in self.filehcache.values():
            outfileh.close()
        self.filehcache.clear()
        for outfile in self._node_outfiles():
            with open(outfile, 'ab') as outfileh:
                outfileh.write(b'\xFF')  # end of RDB file indicator
                # The trailer is an 8-byte checksum.  Computing a real CRC64
                # here was found to be very slow, so an all-zero checksum is
                # written instead, which marks the checksum as disabled.
                outfileh.write(b'\x00' * 8)

    def rehash(self, filename):
        """Copy every key/value record of the dump *filename* to its node file.

        Raises an Exception if the file does not start with a valid RDB
        header or contains an unknown value type.
        """
        with open(filename, "rb") as f:
            self.verify_magic_string(f.read(5))
            self.verify_version(f.read(4))
            try:
                while True:
                    # The copied segment starts at the (optional) expiry
                    # opcode so that expiries travel with their keys.
                    start_pos = f.tell()
                    data_type = read_unsigned_char(f)
                    if data_type == REDIS_RDB_OPCODE_SELECTDB:
                        self.read_length(f)  # skip db number
                        continue
                    if data_type == REDIS_RDB_OPCODE_EOF:
                        break
                    if data_type == REDIS_RDB_OPCODE_EXPIRETIME_MS:
                        skip(f, 8)  # 8-byte expiry, copied but not decoded
                        data_type = read_unsigned_char(f)
                    elif data_type == REDIS_RDB_OPCODE_EXPIRETIME:
                        skip(f, 4)  # 4-byte expiry, copied but not decoded
                        data_type = read_unsigned_char(f)
                    self._key = self.read_string(f)
                    self.skip_object(f, data_type)
                    end_pos = f.tell()
                    _out = self._get_out_from_key(self._key)
                    if _out is not None:
                        f.seek(start_pos)
                        _out.write(f.read(end_pos - start_pos))
            finally:
                # One flush per input file instead of one per record.
                for outfileh in self.filehcache.values():
                    outfileh.flush()

    def read_length_with_encoding(self, f):
        """Read a length prefix and return ``(length, is_encoded)``.

        When *is_encoded* is True, *length* is not a byte count but one of
        the REDIS_RDB_ENC_* selectors.
        """
        first = read_unsigned_char(f)
        enc_type = (first & 0xC0) >> 6
        if enc_type == REDIS_RDB_ENCVAL:
            return (first & 0x3F, True)
        if enc_type == REDIS_RDB_6BITLEN:
            return (first & 0x3F, False)
        if enc_type == REDIS_RDB_14BITLEN:
            return (((first & 0x3F) << 8) | read_unsigned_char(f), False)
        return (ntohl(f), False)

    def read_length(self, f):
        """Read a length prefix and return only the length."""
        return self.read_length_with_encoding(f)[0]

    def read_string(self, f):
        """Read one encoded string.

        Returns the raw bytes, an int for the integer encodings, the
        decompressed bytes for LZF, or None for an unknown special encoding.
        """
        length, is_encoded = self.read_length_with_encoding(f)
        if not is_encoded:
            return f.read(length)
        if length == REDIS_RDB_ENC_INT8:
            return read_signed_char(f)
        if length == REDIS_RDB_ENC_INT16:
            return read_signed_short(f)
        if length == REDIS_RDB_ENC_INT32:
            return read_signed_int(f)
        if length == REDIS_RDB_ENC_LZF:
            clen = self.read_length(f)
            expected = self.read_length(f)
            return self.lzf_decompress(f.read(clen), expected)
        return None

    def lzf_decompress(self, compressed, expected_length):
        """Decompress an LZF payload and return it as a byte string.

        Raises an Exception if a back-reference points before the start of
        the output or if the result is not *expected_length* bytes long.
        """
        in_stream = bytearray(compressed)
        in_len = len(in_stream)
        in_index = 0
        out_stream = bytearray()
        while in_index < in_len:
            ctrl = in_stream[in_index]
            in_index += 1
            if ctrl < 32:
                # Literal run of ctrl + 1 bytes.
                out_stream.extend(in_stream[in_index:in_index + ctrl + 1])
                in_index += ctrl + 1
            else:
                length = ctrl >> 5
                if length == 7:
                    length += in_stream[in_index]
                    in_index += 1
                ref = len(out_stream) - ((ctrl & 0x1f) << 8) - in_stream[in_index] - 1
                in_index += 1
                if ref < 0:
                    raise Exception('lzf_decompress', 'Invalid back reference for key %s' % (self._key,))
                # Byte by byte: the source range may overlap the bytes that
                # are being appended.
                for _ in range(length + 2):
                    out_stream.append(out_stream[ref])
                    ref += 1
        if len(out_stream) != expected_length:
            raise Exception('lzf_decompress', 'Expected lengths do not match %d != %d for key %s' % (len(out_stream), expected_length, self._key))
        # bytes() is str on Python 2 and a real byte string on Python 3.
        return bytes(out_stream)

    def skip_key_and_object(self, f, data_type):
        """Skip one key and the value of type *data_type* that follows it."""
        self.skip_string(f)
        self.skip_object(f, data_type)

    def skip_string(self, f):
        """Advance *f* past one encoded string without decoding it."""
        length, is_encoded = self.read_length_with_encoding(f)
        if not is_encoded:
            bytes_to_skip = length
        elif length == REDIS_RDB_ENC_INT8:
            bytes_to_skip = 1
        elif length == REDIS_RDB_ENC_INT16:
            bytes_to_skip = 2
        elif length == REDIS_RDB_ENC_INT32:
            bytes_to_skip = 4
        elif length == REDIS_RDB_ENC_LZF:
            clen = self.read_length(f)
            self.read_length(f)  # uncompressed length, not needed to skip
            bytes_to_skip = clen
        else:
            # Any other special encoding is treated as having no payload.
            bytes_to_skip = 0
        skip(f, bytes_to_skip)

    def skip_object(self, f, enc_type):
        """Advance *f* past one value of on-disk type *enc_type*.

        Raises an Exception for a type that is not one of REDIS_RDB_TYPE_*.
        """
        if enc_type in (REDIS_RDB_TYPE_STRING,
                        REDIS_RDB_TYPE_HASH_ZIPMAP,
                        REDIS_RDB_TYPE_LIST_ZIPLIST,
                        REDIS_RDB_TYPE_SET_INTSET,
                        REDIS_RDB_TYPE_ZSET_ZIPLIST,
                        REDIS_RDB_TYPE_HASH_ZIPLIST):
            skip_strings = 1
        elif enc_type in (REDIS_RDB_TYPE_LIST, REDIS_RDB_TYPE_SET):
            skip_strings = self.read_length(f)
        elif enc_type in (REDIS_RDB_TYPE_ZSET, REDIS_RDB_TYPE_HASH):
            # Two strings are skipped per stored entry.
            skip_strings = self.read_length(f) * 2
        else:
            raise Exception('read_object', 'Invalid object type %d for key %s' % (enc_type, self._key))
        # Plain countdown: avoids building a list for huge collections on
        # Python 2 and needs no xrange on Python 3.
        while skip_strings > 0:
            self.skip_string(f)
            skip_strings -= 1

    def verify_magic_string(self, magic_string):
        """Raise an Exception unless *magic_string* is the RDB magic."""
        # Byte literal: the header is read from a file opened in binary mode.
        if magic_string != b'REDIS':
            raise Exception('verify_magic_string', 'Invalid File Format')

    def verify_version(self, version_str):
        """Raise an Exception unless the RDB version is between 1 and 6."""
        version = int(version_str)
        if version < 1 or version > 6:
            raise Exception('verify_version', 'Invalid RDB version number %d' % version)

    def get_logical_type(self, data_type):
        """Return the logical type name for an on-disk value type."""
        return DATA_TYPE_MAPPING[data_type]
def skip(f, free):
    """Advance the stream *f* by *free* bytes by reading and discarding them.

    Nothing is read when *free* is zero (or otherwise falsy).
    """
    if not free:
        return
    f.read(free)
def ntohl(f):
    """Read 4 bytes from *f* and return them as a big-endian unsigned int.

    The value is decoded with an explicit big-endian format, so the result
    does not depend on the byte order of the machine running the script.
    (Reading in native order and swapping bytes by hand is only correct on
    little-endian hosts.)
    """
    return struct.unpack('>I', f.read(4))[0]
def to_datetime(usecs_since_epoch):
    """Convert microseconds since the Unix epoch to a naive UTC datetime.

    Floor division is used on purpose: with true division (the meaning of
    ``/`` on Python 3) the sub-second part would be counted twice, once in
    the seconds value and once in the microseconds delta.
    """
    seconds, useconds = divmod(usecs_since_epoch, 1000000)
    epoch = datetime.datetime(1970, 1, 1)
    return epoch + datetime.timedelta(seconds=seconds, microseconds=useconds)
def read_signed_char(f):
    """Read one byte from *f* and return it as a signed integer."""
    (value,) = struct.unpack('b', f.read(1))
    return value
def read_unsigned_char(f):
    """Read one byte from *f* and return it as an unsigned integer."""
    (value,) = struct.unpack('B', f.read(1))
    return value
def read_signed_short(f):
    """Read 2 bytes from *f* as a little-endian signed integer.

    The explicit ``<`` keeps the result independent of the byte order of
    the machine running the script.
    """
    return struct.unpack('<h', f.read(2))[0]
def read_unsigned_short(f):
    """Read 2 bytes from *f* as a little-endian unsigned integer.

    The explicit ``<`` keeps the result independent of the byte order of
    the machine running the script.
    """
    return struct.unpack('<H', f.read(2))[0]
def read_signed_int(f):
    """Read 4 bytes from *f* as a little-endian signed integer.

    The explicit ``<`` keeps the result independent of the byte order of
    the machine running the script.
    """
    return struct.unpack('<i', f.read(4))[0]
def read_unsigned_int(f):
    """Read 4 bytes from *f* as a little-endian unsigned integer.

    The explicit ``<`` keeps the result independent of the byte order of
    the machine running the script.
    """
    return struct.unpack('<I', f.read(4))[0]
def read_big_endian_unsigned_int(f):
    """Read 4 bytes from *f* and return them as a big-endian unsigned int."""
    raw = f.read(4)
    (value,) = struct.unpack('>I', raw)
    return value
def read_24bit_signed_number(f):
    """Read 3 bytes from *f* as a little-endian signed 24-bit integer.

    The three bytes are placed in the upper part of a 4-byte little-endian
    word, which is decoded as a signed 32-bit value and shifted right by 8
    so that the sign is carried over.  The padding is a byte string (the
    stream yields bytes) and the byte order is explicit.
    """
    padded = b'\x00' + f.read(3)
    num = struct.unpack('<i', padded)[0]
    return num >> 8
def read_signed_long(f):
    """Read 8 bytes from *f* as a little-endian signed integer.

    The explicit ``<`` keeps the result independent of the byte order of
    the machine running the script.
    """
    return struct.unpack('<q', f.read(8))[0]
def read_unsigned_long(f):
    """Read 8 bytes from *f* as a little-endian unsigned integer.

    The explicit ``<`` keeps the result independent of the byte order of
    the machine running the script.
    """
    return struct.unpack('<Q', f.read(8))[0]
def string_as_hexcode(string):
    """Print the hex code of every element of *string*, one per line.

    Elements that are already integers are printed as they are; any other
    element is converted with ord() first.
    """
    for element in string:
        code = element if isinstance(element, int) else ord(element)
        print(hex(code))
def get_cluster_slots(node):
    """Return the reply of ``cluster slots`` issued against *node*.

    node -- address in "host:port" form; the part before the first colon is
            used as the host and the part after it as the port.
    """
    parts = node.split(':')
    client = redis.StrictRedis(parts[0], parts[1], 0)
    return client.execute_command('cluster slots')
def keyslot(key):
    """
    Calculate keyslot for a given key.

    *key* may be a UTF-8 byte string, a text string, or an int (as returned
    for integer-encoded keys; its decimal text is hashed).  If the key
    contains a non-empty ``{...}`` section, only the text between the first
    "{" and the next "}" is hashed.

    Raises UnicodeDecodeError for a byte string that is not valid UTF-8.
    """
    if isinstance(key, int):
        key = str(key)
    try:
        text_type = unicode  # Python 2
    except NameError:
        text_type = str      # Python 3 has no separate unicode type
    if isinstance(key, text_type):
        k = key
    else:
        k = key.decode('utf-8')
    start = k.find("{")
    if start > -1:
        end = k.find("}", start + 1)
        if end > -1 and end != start + 1:
            k = k[start + 1:end]
    # NOTE(review): crc.crc16 receives text, exactly as before on Python 2;
    # confirm that it also accepts str when run on Python 3.
    return crc.crc16(k) % 16384
if __name__ == "__main__":
    # Split the listed dumps over one RDB file per node of the cluster that
    # the given node belongs to.  The output files are initialised once,
    # filled from every input dump, then terminated once.
    rehasher = RdbRehasher('10.242.118.244:17002', '/home/work/xiabing/rdb_rehashed')
    rehasher.initRdbFile()
    for rdb_name in [
            #'10.242.80.20_16106.rdb',
            #'10.242.82.23_16101.rdb',
            #'10.242.82.23_16102.rdb',
            #'10.242.82.23_16103.rdb',
            #'10.242.82.23_16104.rdb',
            #'10.242.82.23_16105.rdb',
            #'10.242.82.23_16107.rdb',
            #'10.242.82.23_16108.rdb',
            #'10.242.82.23_16109.rdb',
            #'10.242.82.23_16110.rdb',
            #'10.242.82.23_16111.rdb',
            #'10.242.82.23_16112.rdb',
            #'10.242.82.23_16113.rdb',
            #'10.242.82.23_16114.rdb',
            #'10.242.82.23_16118.rdb',
            #'10.242.83.43_16100.rdb',
            #'10.242.83.43_16115.rdb',
            #'10.242.83.43_16116.rdb',
            #'10.242.83.43_16117.rdb',
            '10.242.83.43_16119.rdb',
    ]:
        # Function-call form prints the same text on Python 2 and is valid
        # syntax on Python 3.
        print(rdb_name)
        rehasher.rehash('/home/work/xiabing/rdb_online/' + rdb_name)
    rehasher.finiRdbFile()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment