Skip to content

Instantly share code, notes, and snippets.

@wrenoud
Last active August 29, 2015 14:02
Show Gist options
  • Save wrenoud/a9eaf6f5be72f45d5e0b to your computer and use it in GitHub Desktop.
Save wrenoud/a9eaf6f5be72f45d5e0b to your computer and use it in GitHub Desktop.
Buffered datagram reader
from collections import deque
import struct
import sys
import io
class datagram_reader(object):
    """Buffered reader for length-prefixed datagrams.

    Each datagram is expected to look like::

        <size field> <start flag> ... <end flag> <2 more bytes>
                     |<------------ size bytes ------------->|

    The size field is decoded with the ``struct`` format given at
    construction time.  Bytes that do not line up with a plausible datagram
    are passed through to standard output one at a time until the reader is
    synchronised again.

    Accepts any binary stream that supports a ``read(n)`` method, or a
    filename, in which case the file is opened in binary mode and is closed
    by :meth:`close` (or on leaving a ``with`` block).

    The reader is an iterator: each step yields the payload of the next
    datagram (everything after the size field) as a byte string.  It works
    on both Python 2 and Python 3.
    """

    # Text type of the running interpreter (``unicode`` on 2, ``str`` on 3).
    _TEXT_TYPE = type(u'')

    # Distance of the end flag from the end of the payload.
    # NOTE(review): an offset of 3 implies two bytes follow the end flag
    # (presumably a trailer such as a checksum) -- confirm against the
    # datagram format this reader is used with.
    _END_BYTE_OFFSET = 3

    def __init__(self, stream, dg_size_fmt_char, start_byte = '\x02', end_byte = '\x03', maximum_packet_size = 50, outfyle = None):
        """Set up the reader and prime its buffer.

        stream              -- binary stream with ``read(n)``, or a filename
        dg_size_fmt_char    -- ``struct`` format of the size field, e.g. '<I'
        start_byte          -- one-byte flag expected right after the size field
        end_byte            -- one-byte flag expected near the end of the payload
        maximum_packet_size -- largest payload size accepted as genuine; also
                               the amount of read-ahead kept in the buffer
        outfyle             -- optional writable object that receives a copy of
                               every byte read from the stream
        """
        # Validate the format first so a bad format cannot leak a file handle.
        size_field_length = struct.calcsize(dg_size_fmt_char)

        # A filename was provided instead of a stream: open it ourselves and
        # remember that we are responsible for closing it.
        self._owns_stream = isinstance(stream, (bytes, self._TEXT_TYPE))
        if self._owns_stream:
            stream = io.open(stream, 'rb')
        self.stream = stream

        # The buffer holds one single-byte byte string per element.
        self.buffer = deque()
        self.maximum_packet_size = maximum_packet_size
        self.outfyle = outfyle  # file to save stream
        self.dg_size_fmt_char = dg_size_fmt_char
        self.dg_size_fmt_char_size = size_field_length

        # Packet start and end flags, normalised to byte strings so they
        # compare equal to buffer elements on Python 3 as well.
        self.start_byte = self._asByte(start_byte)
        self.end_byte = self._asByte(end_byte)

        # prime the buffer
        self.topUpBuffer(self.maximum_packet_size)

    @classmethod
    def _asByte(cls, flag):
        """Return ``flag`` as a byte string (text is encoded as latin-1)."""
        if isinstance(flag, cls._TEXT_TYPE):
            return flag.encode('latin-1')
        return flag

    def next(self):
        """Return the payload of the next datagram as a byte string.

        Unrecognised leading bytes are written to standard output and
        dropped.  Raises ``StopIteration`` once the stream cannot supply
        another complete datagram.
        """
        header = self.dg_size_fmt_char_size
        # Always keep at least the size field plus the start flag in view.
        read_ahead = max(self.maximum_packet_size, header + 1)
        while True:
            # Refill *before* testing for exhaustion: the previous datagram
            # may have drained the buffer while the stream still has data.
            self.topUpBuffer(read_ahead)
            if len(self.buffer) <= header:
                raise StopIteration
            packet_size = self.getPacketSize()
            if self._isPacketAtFront(packet_size):
                return self.extractFromBuffer(packet_size, skip = header)
            # Not a datagram boundary: shift by one byte and try again.
            self._passThroughByte()

    # Python 3 spelling of the iterator protocol.
    __next__ = next

    def __iter__(self):
        """Return the reader itself; it is its own iterator."""
        return self

    def __enter__(self):
        """Allow use as a context manager; returns the reader."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the stream on leaving a ``with`` block (see close())."""
        self.close()

    def close(self):
        """Close the underlying stream if this reader opened it from a filename.

        Streams handed in by the caller are left open; they stay the
        caller's responsibility.
        """
        if getattr(self, '_owns_stream', False):
            self.stream.close()

    def _isPacketAtFront(self, packet_size):
        """Tell whether the buffer starts with a complete, well-framed datagram.

        ``packet_size`` is the value decoded from the size field at the
        front of the buffer.  May read further from the stream so the whole
        candidate datagram is buffered before the end flag is inspected.
        """
        header = self.dg_size_fmt_char_size
        end_index = header + packet_size - self._END_BYTE_OFFSET
        # Reject oversized packets, and sizes so small (or negative) that
        # the end flag would fall inside the size field or wrap around.
        if packet_size > self.maximum_packet_size or end_index < header:
            return False
        if self.buffer[header] != self.start_byte:
            return False
        # make sure we've fed the buffer enough to see the whole packet
        self.topUpBuffer(header + packet_size)
        if len(self.buffer) < header + packet_size:
            # The stream ended part-way through the candidate packet.
            return False
        return self.buffer[end_index] == self.end_byte

    def _passThroughByte(self):
        """Drop the first buffered byte, echoing it to standard output."""
        byte = self.buffer.popleft()
        binary_stdout = getattr(sys.stdout, 'buffer', None)
        if binary_stdout is not None:
            # Python 3: raw bytes must go to the underlying binary stream.
            binary_stdout.write(byte)
        elif isinstance(byte, str):
            # Python 2: byte strings are ``str`` and can be written as is.
            sys.stdout.write(byte)
        else:
            # Text-only replacement for stdout on Python 3.
            sys.stdout.write(byte.decode('latin-1'))

    def getPacketSize(self):
        """Joins packet size bytes and unpacks, returning integer size.

        Raises ``IndexError`` if the buffer holds fewer bytes than the size
        field needs.
        """
        size_field = b''.join([self.buffer[i] for i in range(self.dg_size_fmt_char_size)])
        return struct.unpack(self.dg_size_fmt_char, size_field)[0]

    def extractFromBuffer(self, size, skip = 0):
        """Removes 'size' bytes from buffer and returns concatenated byte string.

        The first ``skip`` bytes are removed and discarded beforehand (used
        for skipping the packet size bytes).  Raises ``IndexError``, leaving
        the buffer untouched, if fewer than ``skip + size`` bytes are
        buffered.
        """
        if len(self.buffer) < max(skip, 0) + max(size, 0):
            raise IndexError('not enough buffered bytes to extract packet')
        for _ in range(skip):
            self.buffer.popleft()
        # join once rather than concatenating byte by byte with +=
        return b''.join([self.buffer.popleft() for _ in range(size)])

    def topUpBuffer(self, size):
        """Read from the stream until the buffer holds ``size`` bytes.

        Fewer bytes are buffered if the stream cannot supply them.  Whatever
        is read is also written to ``outfyle`` when one was given.
        """
        shortfall = size - len(self.buffer)
        if shortfall <= 0:
            return
        content = self.stream.read(shortfall)
        if not content:
            # End of stream, or nothing available right now.
            return
        # Slice rather than iterate: iterating bytes yields ints on Python 3,
        # while slices are single-byte byte strings on both major versions.
        self.buffer.extend(content[i:i + 1] for i in range(len(content)))
        # save the stream to file
        if self.outfyle:
            self.outfyle.write(content)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment