Skip to content

Instantly share code, notes, and snippets.

@abg
Created December 2, 2013 19:40
Show Gist options
  • Save abg/7756451 to your computer and use it in GitHub Desktop.
Save abg/7756451 to your computer and use it in GitHub Desktop.
Decode MySQL newdecimal datatype - intended for decoding the defaults buffer in a .frm file.
import operator
import struct
def unpack_integer(data, size, endian='<'):
"""Unpack a MySQL packed integer format
Standard MySQL integers are packed in little endian format,
but various integers are packed in big endian format - e.g.
timestamps and decimal parts.
:param data: byte string of packed data
:param size: size in bytes of the underlying integer data
Expected value in range of 1, 2, 3, 4, or 8
:param endian: < for little endian or > for big endian
default < (little endian)
"""
if endian not in ('<', '>'):
raise ValueError("Invalid endian value '%s'" % endian)
# size may be 1, 2, 3, 4 or 8
fmt = { 1 : 'B', 2 : 'H', 3 : 'I', 4 : 'I', 8 : 'Q'}
if size == 3:
# pad the most significant byte so we can decode
# a 4 byte integer
if endian == '<':
data = data + b'\x00'
else:
data = b'\x00' + data
return struct.unpack(endian + fmt[size], data)[0]
def unpack_decimal(data, precision, scale):
"""Unpack a packed MySQL DECIMAL(precision, scale) byte sequence
"""
# map a number of digits to the bytes required for
# the packed representation
digits_to_bytes = [ 0, 1, 1, 2, 2, 3, 3, 4, 4, 4 ]
# extract sign information and repack first byte
first = unpack_integer(data, 1)
data = struct.pack('B', first ^ 0x80) + data[1:]
# if the high bit was set, this is a negative number
# all data needs to be decoded by inversion
conversion = None if first & 0x80 else operator.invert
def decode_decimal(data):
"""Decode packed decimal format
This decoded one half of the MySQL NEWDECIMAL format
This asssumes `data` has its high bit decoded, as needed
and is either the integer or decimal portion.
Then data is decoded in multiples of 4 bytes and returned
as a string of decimal digits
e.g. \x01 -> '1'
"""
# pad to a multiple of 4, as needed
if len(data) % 4:
pad = 4 - len(data) % 4
# ensure we pad with the correct byte sequence
pad_char = b'\xff' if conversion else b'\x00'
data = pad_char*pad + data
# these are signed numbers - decoded in groups of 4
groups = struct.unpack('>' + 'i'*(len(data) // 4), data)
if conversion:
groups = [conversion(i) for i in groups]
return ''.join(str(i) for i in groups)
# calculate the bounds for the integer and fractional portion of the
# decimal data
# Data is grouped into 9 digits per 4 byte word. The most significant
# digits may be packed in < 4 bytes based on the number of digits required
# for the precision or scale.
# So this is the number of whole 9 decimal places + the number of bytes for
# the remainder (if any)
i = ((precision - scale) // 9)*4 + digits_to_bytes[(precision - scale) % 9]
j = (scale // 9)*4 + digits_to_bytes[scale % 9]
integer_part = decode_decimal(data[0:i])
decimal_part = decode_decimal(data[-j:])
return '{0}.{1}'.format(integer_part, decimal_part)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment