Skip to content

Instantly share code, notes, and snippets.

@tehmaze
Created February 13, 2011 16:23
Show Gist options
  • Save tehmaze/824817 to your computer and use it in GitHub Desktop.
Save tehmaze/824817 to your computer and use it in GitHub Desktop.
Dump data streams
#! /usr/bin/env python
#
# _______
# ____________ _______ _\__ /_________ ___ _____
# | _ _ \ _ | ____\ _ / | |/ _ \
# | / / / / | | | /___/ _ | | / /
# |___/___/ /___/____|________|___ | |_| |___|_____/
# \__/ |___|
#
#
# (c) 2011 Wijnand 'maze' Modderman-Lenstra - https://maze.io/
#
# Module metadata: author contact, copyright notice, licence name and
# project home page.
__author__ = 'Wijnand Modderman-Lenstra <maze@pyth0n.org>'
__copyright__ = '(C) 2011 Wijnand Modderman-Lenstra'
__license__ = 'MIT'
__url__ = 'https://maze.io/'
import socket as python_socket
import sys
class HexDump(object):
    '''
    Render binary data as rows of ``offset  hex-groups  printable-text``.

    :param pad:    number of spaces between the two-byte hex groups
    :param step:   number of bytes shown per row; an odd value is bumped to
                   the next even number because bytes are grouped in pairs
    :param buffer: object with a ``write`` method that receives the rows
    '''

    # Text type of the running interpreter: ``unicode`` on Python 2 and
    # ``str`` on Python 3. Text has to be encoded before it can be dumped.
    _text_type = type(u'')

    def __init__(self, pad=1, step=16, buffer=sys.stdout):
        self.pad = pad
        self.step = step
        # Make sure we have even steps
        if self.step % 2 != 0:
            self.step += 1
        self.buffer = buffer

    def dump(self, data, encoding='utf8', offset=0):
        '''
        Dump the contents of a byte array, such as a string or an unicode
        object, to ``self.buffer``.

        :param data:     bytes-like object, iterable of byte values, or text
        :param encoding: codec used to encode ``data`` when it is text
        :param offset:   value added to every row offset, so that several
                         chunks can be dumped as one continuous listing
        '''
        if isinstance(data, self._text_type):
            data = bytearray(data, encoding)
        else:
            data = bytearray(data)

        # Width of a full row of hex groups; short rows are padded to it so
        # the text column stays aligned.
        width = (self.step // 2) * (4 + self.pad)
        separator = ' ' * self.pad

        for start in range(0, len(data), self.step):
            part = data[start:start + self.step]
            # Two bytes per group; a trailing odd byte forms a short group.
            groups = [
                ''.join('%02x' % byte for byte in part[index:index + 2])
                for index in range(0, len(part), 2)
            ]
            # Printable ASCII is shown verbatim, anything else as a dot.
            text = ''.join(
                chr(byte) if 0x20 <= byte < 0x7f else '.' for byte in part
            )
            # ``start`` already counts bytes, so it is the row offset as is.
            self.buffer.write('0x%04x  %s %s\n' % (
                start + offset,
                separator.join(groups).ljust(width),
                text,
            ))

    def dump_stream(self, stream, encoding='utf8'):
        '''
        Consume a stream dumping its contents.

        The stream is read ``self.step`` items at a time until ``read``
        returns an empty result. The running total of items read is passed
        on as the row offset.

        :param stream:   object with a ``read(size)`` method
        :param encoding: codec used when the stream yields text
        '''
        size = 0
        while True:
            data = stream.read(self.step)
            if not data:
                break
            self.dump(data, encoding=encoding, offset=size)
            size += len(data)
class socket(object):
    '''
    Socket wrapper around :type:``socket.socket`` dumping the data being
    received and sent.

    The wrapper holds a real socket in ``self.socket`` and a ``HexDump`` in
    ``self.hexdump``. ``recv``, ``recvfrom``, ``send`` and ``sendto`` dump
    their payload before handing it on; every other attribute is looked up
    on the wrapped socket.
    '''

    def __init__(self, *args, **kwargs):
        # All arguments are passed to the real socket constructor untouched.
        self.hexdump = HexDump()
        self.socket = python_socket.socket(*args, **kwargs)
        self.__doc__ = self.socket.__doc__

    def __getattr__(self, name):
        # Only called when normal lookup fails, so the dumping methods
        # defined below always win. Resolving on every access keeps the
        # values current instead of freezing a snapshot at construction.
        if name in ('socket', 'hexdump'):
            # Not initialised yet; bail out rather than recurse forever.
            raise AttributeError(name)
        return getattr(self.socket, name)

    def recv(self, *args, **kwargs):
        '''Receive data from the wrapped socket, dump it and return it.'''
        data = self.socket.recv(*args, **kwargs)
        self.hexdump.dump(data)
        return data

    def recvfrom(self, *args, **kwargs):
        '''
        Receive a ``(data, address)`` pair from the wrapped socket, dump
        the data part and return the pair.
        '''
        # Only the payload can be hex dumped; the address is passed through.
        data, address = self.socket.recvfrom(*args, **kwargs)
        self.hexdump.dump(data)
        return data, address

    def send(self, data, *args, **kwargs):
        '''Dump ``data``, then send it; returns what the socket returns.'''
        self.hexdump.dump(data)
        return self.socket.send(data, *args, **kwargs)

    def sendto(self, data, *args, **kwargs):
        '''Dump ``data``, then send it; returns what the socket returns.'''
        self.hexdump.dump(data)
        return self.socket.sendto(data, *args, **kwargs)
def run():
    '''
    Command line entry point: hex dump a file, or standard input when no
    file (or ``-``) is given.

    Options are read from ``sys.argv``: ``-e/--encoding``, ``-p/--pad`` and
    ``-s/--step``. Only the first positional argument is used.
    '''
    import optparse

    parser = optparse.OptionParser(usage='%prog [<options>] [<file>]')
    parser.add_option('-e', '--encoding', default='utf8',
                      help='character encoding (default: UTF-8)')
    parser.add_option('-p', '--pad', type='int', dest='pad', default=1,
                      help='padding width (default: 1)')
    parser.add_option('-s', '--step', type='int', dest='step', default=16,
                      help='step size (default: 16)')
    options, args = parser.parse_args()

    hexdump = HexDump(options.pad, options.step)

    if not args or args[0] == '-':
        # Python 3 exposes the raw byte stream as sys.stdin.buffer; where
        # that attribute is missing, sys.stdin itself is used.
        stream = getattr(sys.stdin, 'buffer', sys.stdin)
        hexdump.dump_stream(stream, options.encoding)
    else:
        # Binary mode so the bytes are dumped exactly as stored; the with
        # block closes the file even if dumping fails.
        with open(args[0], 'rb') as stream:
            hexdump.dump_stream(stream, options.encoding)
# Script entry point: run the command line tool and hand its result to the
# interpreter as the exit status.
if __name__ == '__main__':
    exit_status = run()
    sys.exit(exit_status)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment