Last active
August 29, 2015 14:11
-
-
Save MagerValp/e364df7d494cb2418390 to your computer and use it in GitHub Desktop.
Parse DER format file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# -*- coding: utf-8 -*- | |
import sys | |
import argparse | |
class DERParserException(Exception):
    """Base class for every error raised while parsing DER data.

    Derives from ``Exception`` rather than ``BaseException`` so that ordinary
    ``except Exception`` handlers see parser failures, while process-level
    signals such as ``KeyboardInterrupt`` stay separate.
    """
class DERParserEOF(DERParserException):
    """Raised when the parser needs more bytes than the input provides."""
# Tag numbers: the low five bits of an identifier byte.
TAG_IMPLICIT = 0x00
TAG_BOOLEAN = 0x01
TAG_INTEGER = 0x02
TAG_BIT_STRING = 0x03
TAG_OCTET_STRING = 0x04
TAG_NULL = 0x05
TAG_OBJECT_ID = 0x06
TAG_UTF8_STRING = 0x0c
TAG_SEQUENCE = 0x10
TAG_SET = 0x11
TAG_PRINTABLE_STRING = 0x13
TAG_T61_STRING = 0x14
TAG_IA5_STRING = 0x16
TAG_UTCTIME = 0x17
# All five tag bits set: the parser rejects this value as unsupported.
TAG_HIGH = 0x1f

# Human-readable names used when dumping each parsed element.
tag_name = {
    TAG_IMPLICIT: u"IMPLICIT",
    TAG_BOOLEAN: u"BOOLEAN",
    TAG_INTEGER: u"INTEGER",
    TAG_BIT_STRING: u"BIT_STRING",
    TAG_OCTET_STRING: u"OCTET_STRING",
    TAG_NULL: u"NULL",
    TAG_OBJECT_ID: u"OBJECT_ID",
    TAG_UTF8_STRING: u"UTF8_STRING",
    TAG_SEQUENCE: u"SEQUENCE",
    TAG_SET: u"SET",
    TAG_PRINTABLE_STRING: u"PRINTABLE_STRING",
    TAG_T61_STRING: u"T61_STRING",
    TAG_IA5_STRING: u"IA5_STRING",
    TAG_UTCTIME: u"UTCTIME",
    TAG_HIGH: u"<high tag>",
}

# Sentinel returned by the length parser for the length byte 0x80.
VARIABLE_LENGTH = -1

# Values of bit 5 of the identifier byte.
ENCODING_PRIMITIVE = 0
ENCODING_CONSTRUCTED = 1
class DERParser(object):
    """Parse a DER (or indefinite-length BER) encoded byte stream.

    ``stream`` is an indexable, sliceable sequence of bytes: a Python 2
    ``str``, or a ``bytes``/``bytearray`` object.  ``index`` is the offset of
    the next unread byte and ``indent`` is the current indentation (in
    spaces) of the diagnostic dump printed by :meth:`parse`.
    """

    def __init__(self, stream):
        super(DERParser, self).__init__()
        self.stream = stream
        self.index = 0
        self.indent = 0

    @staticmethod
    def _to_int(item):
        """Return the numeric value of a single element of the stream.

        Indexing ``bytes``/``bytearray`` yields ints, while a Python 2
        ``str`` yields one-character strings that need ``ord``.
        """
        return item if isinstance(item, int) else ord(item)

    def next_byte(self):
        """Consume and return the next byte as an int.

        Raises DERParserEOF when the stream is exhausted.
        """
        byte = self.peek_byte()
        self.index += 1
        return byte

    def peek_byte(self):
        """Return the next byte as an int without consuming it.

        Raises DERParserEOF when the stream is exhausted.
        """
        try:
            return self._to_int(self.stream[self.index])
        except IndexError:
            raise DERParserEOF(
                u"Unexpected end of data at offset %d" % self.index)

    def get_slice(self, length):
        """Consume ``length`` bytes and return them as a list of ints.

        Raises DERParserException for a negative length and DERParserEOF
        when fewer than ``length`` bytes remain.
        """
        if length < 0:
            raise DERParserException(u"Negative slice length %d" % length)
        end = self.index + length
        # Slicing never raises IndexError, so truncation must be checked
        # explicitly; otherwise short data would be returned silently.
        if end > len(self.stream):
            raise DERParserEOF(
                u"Unexpected end of data at offset %d" % self.index)
        slc = [self._to_int(x) for x in self.stream[self.index:end]]
        self.index = end
        return slc

    def parse_boolean(self, length):
        """Parse BOOLEAN contents; return ``(length, bool)``."""
        if length != 1:
            raise DERParserException(u"BOOLEAN length should be 1")
        return (length, self.next_byte() != 0)

    def parse_base128(self):
        """Parse one base-128 number (high bit set means "more follows").

        Returns ``(bytes_consumed, value)``.
        """
        value = 0
        size = 0
        while True:
            byte = self.next_byte()
            size += 1
            value = (value << 7) | (byte & 0x7f)
            if byte < 0x80:
                return (size, value)

    def parse_integer(self, length):
        """Parse a two's complement big-endian INTEGER of ``length`` bytes.

        Returns ``(length, value)``.  Raises DERParserException when the
        declared length is smaller than one byte.
        """
        if length < 1:
            raise DERParserException(u"INTEGER length should be at least 1")
        octets = self.get_slice(length)
        value = octets[0]
        # The top bit of the first octet carries the sign.
        if value >= 0x80:
            value -= 256
        for octet in octets[1:]:
            value = (value << 8) | octet
        return (length, value)

    def parse_bit_string(self, length):
        """Parse BIT STRING contents into a string of "0"/"1" characters.

        The first content byte is the number of padding bits at the end of
        the last octet; those bits are dropped from the result.
        Returns ``(length, bits)``.
        """
        if length < 1:
            raise DERParserException(
                u"BIT_STRING length should be at least 1")
        pad_bits = self.next_byte()
        if pad_bits > 7:
            raise DERParserException(
                u"BIT_STRING has invalid padding count %d" % pad_bits)
        octets = self.get_slice(length - 1)
        if pad_bits and not octets:
            raise DERParserException(
                u"Empty BIT_STRING cannot have padding bits")
        value = u"".join(u"{0:08b}".format(x) for x in octets)
        if pad_bits:
            value = value[:-pad_bits]
        return (length, value)

    def parse_octet_string(self, length):
        """Return ``(length, raw_bytes)`` for ``length`` content bytes."""
        return (length, bytes(bytearray(self.get_slice(length))))

    def parse_object_id(self, length):
        """Parse OBJECT IDENTIFIER contents into dotted-decimal text.

        The first subidentifier encodes the first two arcs as
        ``40 * arc1 + arc2``; it is itself a base-128 number, so the second
        arc may exceed 39 when the first arc is 2.
        Returns ``(length, u"a.b.c...")``.
        """
        if length < 1:
            raise DERParserException(u"OBJECT_ID length should be at least 1")
        size, first = self.parse_base128()
        if first < 40:
            arcs = [0, first]
        elif first < 80:
            arcs = [1, first - 40]
        else:
            arcs = [2, first - 80]
        while size < length:
            s, number = self.parse_base128()
            size += s
            arcs.append(number)
        if size != length:
            raise DERParserException(u"OBJECT_ID overruns its declared length")
        return (length, u".".join(u"%d" % arc for arc in arcs))

    def parse_utf8_string(self, length):
        """Return ``(length, text)`` decoding the contents as UTF-8."""
        raw = bytes(bytearray(self.get_slice(length)))
        return (length, raw.decode(u"utf-8"))

    def parse_end_of_seq(self):
        """Consume the two zero bytes ending indefinite-length contents."""
        for _ in range(2):
            byte = self.next_byte()
            if byte != 0:
                raise DERParserException(
                    u"Unexpected byte 0x%02x in end of seq" % byte)

    def parse_sequence(self, length):
        """Parse nested elements until ``length`` bytes have been consumed.

        With ``length == VARIABLE_LENGTH`` elements are read until the
        end-of-contents marker, which is consumed as well.
        Returns ``(size, elements)`` where ``size`` is the number of bytes
        used by the elements (excluding any end marker) and ``elements`` is
        a tuple of the parsed values.
        """
        sequence = []
        size = 0
        self.indent += 4
        try:
            if length == VARIABLE_LENGTH:
                while self.peek_byte() != 0:
                    s, obj = self.parse()
                    size += s
                    sequence.append(obj)
                self.parse_end_of_seq()
            else:
                while size < length:
                    s, obj = self.parse()
                    size += s
                    sequence.append(obj)
                if size != length:
                    raise DERParserException(
                        u"Contents overrun declared length of %d bytes"
                        % length)
        finally:
            # Restore the dump indentation even when a nested parse fails.
            self.indent -= 4
        return (size, tuple(sequence))

    def parse_ia5_string(self, length):
        """Return ``(length, text)`` decoding the contents as ASCII."""
        raw = bytes(bytearray(self.get_slice(length)))
        return (length, raw.decode(u"ascii"))

    def parse_length(self):
        """Parse a length field.

        Returns the length in bytes, or VARIABLE_LENGTH for the indefinite
        form (length byte 0x80).
        """
        byte = self.next_byte()
        if byte == 0x80:
            return VARIABLE_LENGTH
        elif byte < 0x80:
            return byte
        else:
            # Long form: the low seven bits give the number of length bytes.
            size = byte & 0x7f
            length = 0
            for _ in range(size):
                length = (length << 8) | self.next_byte()
            return length

    def parse(self):
        """Parse one element at the current position and print a dump line.

        Returns ``(bytes_consumed, value)``.  Constructed elements yield a
        tuple of their children, collapsed to the single child when there is
        exactly one.  Primitive elements outside the universal class are
        returned as raw bytes.

        Raises NotImplementedError for high tag numbers and unsupported
        universal tags, DERParserException for malformed data, and
        DERParserEOF when the input ends early.
        """
        offset = self.index
        byte = self.next_byte()
        type_class = byte >> 6
        encoding = (byte >> 5) & 1
        tag = byte & 0x1f
        if tag == TAG_HIGH:
            raise NotImplementedError(u"TAG_HIGH")
        length = self.parse_length()
        if length == VARIABLE_LENGTH:
            length_str = u"variable length"
        else:
            length_str = u"%d bytes" % length
        print8(u"%s%04x: %02x = %d, %s %s, %s" % (
            u" " * self.indent,
            offset,
            byte,
            type_class,
            u"CONSTRUCTED" if encoding == ENCODING_CONSTRUCTED else u"PRIMITIVE",
            tag_name.get(tag, u"<unknown tag>"),
            length_str)
        )
        if encoding == ENCODING_CONSTRUCTED:
            _, value = self.parse_sequence(length)
            if len(value) == 1:
                value = value[0]
        elif length == VARIABLE_LENGTH:
            # Only constructed elements can be closed by an end marker.
            raise DERParserException(
                u"Indefinite length requires constructed encoding")
        elif type_class != 0:
            # Application, context-specific and private tags do not follow
            # the universal tag table, so keep their contents as raw bytes.
            _, value = self.parse_octet_string(length)
        elif tag == TAG_BOOLEAN:
            _, value = self.parse_boolean(length)
        elif tag == TAG_INTEGER:
            _, value = self.parse_integer(length)
        elif tag == TAG_BIT_STRING:
            _, value = self.parse_bit_string(length)
        elif tag in (TAG_OCTET_STRING, TAG_T61_STRING):
            _, value = self.parse_octet_string(length)
        elif tag == TAG_NULL:
            # Unconsumed contents would desynchronise everything after it.
            if length != 0:
                raise DERParserException(u"NULL length should be 0")
            value = None
        elif tag == TAG_UTF8_STRING:
            _, value = self.parse_utf8_string(length)
        elif tag == TAG_OBJECT_ID:
            _, value = self.parse_object_id(length)
        elif tag == TAG_SEQUENCE:
            _, value = self.parse_sequence(length)
        elif tag == TAG_SET:
            _, value = self.parse_sequence(length)
            value = set(value)
        elif tag in (TAG_IA5_STRING, TAG_PRINTABLE_STRING):
            _, value = self.parse_ia5_string(length)
        elif tag == TAG_UTCTIME:
            _, value = self.parse_ia5_string(length)
        else:
            raise NotImplementedError(u"unimplemented tag 0x%02x (%s)" % (
                tag,
                tag_name.get(tag, u"<unknown tag>")
            ))
        if encoding == ENCODING_PRIMITIVE and tag not in (TAG_SEQUENCE, TAG_SET):
            repr_value = repr(value)
            # Python 2 repr() yields a byte string; the dump works in text.
            if isinstance(repr_value, bytes):
                repr_value = repr_value.decode(u"ascii")
            if len(repr_value) > 255:
                repr_value = repr_value[:40] + u"..."
            print8(u"%s value: %s" % (
                u" " * self.indent,
                repr_value
            ))
        return (self.index - offset, value)
def print8(*args):
    """Print the arguments separated by single spaces, as UTF-8 text.

    Each argument is converted to text first.  On Python 2 the joined line
    is encoded to UTF-8 bytes before printing, so output does not depend on
    the terminal encoding; on Python 3 the text is handed to ``print``.
    """
    to_text = type(u"")  # ``unicode`` on Python 2, ``str`` on Python 3
    line = u" ".join(to_text(x) for x in args)
    if sys.version_info[0] < 3:
        line = line.encode(u"utf-8")
    # A single parenthesised argument prints identically under the
    # Python 2 print statement and the Python 3 print function.
    print(line)
def main(argv):
    """Parse the DER file named on the command line and pretty-print it.

    ``argv`` is the full argument vector including the program name.
    Returns 0 on success; parser errors propagate as exceptions.
    """
    import pprint

    p = argparse.ArgumentParser()
    p.add_argument(u"-v", u"--verbose", action=u"store_true",
                   help=u"Verbose output.")
    p.add_argument(u"der")
    # Python 2 supplies arguments as byte strings, Python 3 as text already.
    args = p.parse_args([
        x.decode(u"utf-8") if isinstance(x, bytes) else x
        for x in argv[1:]
    ])
    # DER is binary data: text mode may translate line endings and corrupt
    # the contents on some platforms.
    with open(args.der, "rb") as f:
        der = f.read()
    parser = DERParser(der)
    _size, result = parser.parse()
    pprint.pprint(result)
    return 0
# Run as a script: hand main()'s return value to the shell as exit status.
if __name__ == "__main__":
    sys.exit(main(sys.argv))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment