Skip to content

Instantly share code, notes, and snippets.

@cdunklau
Last active February 5, 2020 08:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cdunklau/4f8c72222295680ca20e3d4401f385b1 to your computer and use it in GitHub Desktop.
Save cdunklau/4f8c72222295680ca20e3d4401f385b1 to your computer and use it in GitHub Desktop.
A reimplementation of Twisted's LineOnlyReceiver with an explicit parser
import collections
from twisted.internet import protocol
class LineParser:
    """
    Incremental splitter that turns a stream of byte chunks into lines.

    Chunks are fed in with L{readData}; completed lines are then drained
    with L{iterLines}. Whatever follows the last delimiter is held back
    until more data arrives.

    @ivar delimiter: The byte sequence that terminates a line.
    @ivar max_length: The largest line length (delimiter excluded) that
        L{iterLines} will yield without raising.
    """

    def __init__(self, delimiter, max_length):
        """
        @param delimiter: The byte sequence that separates lines.
        @type delimiter: C{bytes}
        @param max_length: Maximum permitted length of a single line.
        @type max_length: C{int}
        """
        self.delimiter = delimiter
        self.max_length = max_length
        # Bytes received after the most recent delimiter (a partial line).
        self._buffer = b''
        # Complete lines that have not been handed out by iterLines yet.
        self._lines = collections.deque()

    def readData(self, data):
        """
        Feed a chunk of received bytes into the parser.

        The chunk is appended to the held-back partial line and the result
        is split on the delimiter. Every piece except the last is queued as
        a complete line; the last piece (possibly empty) becomes the new
        partial line.

        @param data: The newly received bytes.
        @type data: C{bytes}
        """
        *complete, remainder = (self._buffer + data).split(self.delimiter)
        self._lines.extend(complete)
        self._buffer = remainder

    def iterLines(self):
        """
        Yield the queued lines in arrival order, removing each one from the
        queue as it is produced.

        @raise LineLengthExceeded: when a queued line is longer than
            C{max_length}, or, once the queue is drained, when the
            unterminated partial line is longer than C{max_length}.
        """
        while True:
            try:
                candidate = self._lines.popleft()
            except IndexError:
                # Queue is empty: every complete line has been handed out.
                break
            if len(candidate) > self.max_length:
                raise LineLengthExceeded(candidate)
            yield candidate
        # Only checked after the queue drains, so an oversized partial line
        # is reported after all complete lines preceding it.
        if len(self._buffer) > self.max_length:
            raise LineLengthExceeded(self._buffer)
class LineLengthExceeded(Exception):
    """
    Raised when a line, or the unterminated data buffered so far, is longer
    than the configured maximum.

    @ivar culprit: The offending bytes; also passed to C{Exception} as the
        sole positional argument.
    """

    def __init__(self, culprit):
        """
        @param culprit: The bytes that exceeded the length limit.
        """
        self.culprit = culprit
        super().__init__(culprit)
class ComposedLineOnlyReceiver(protocol.Protocol):
    """
    A line-oriented protocol that delegates buffering and splitting to a
    separate L{LineParser} instead of doing it inline.

    @cvar delimiter: The line-ending delimiter to use.
    @cvar MAX_LENGTH: The maximum length of a line to allow; longer lines
        are passed to L{lineLengthExceeded}.
    """

    delimiter = b'\r\n'
    MAX_LENGTH = 16384
    # Created on the first call to dataReceived, using the delimiter and
    # MAX_LENGTH in effect at that moment.
    _parser = None

    def dataReceived(self, data):
        """
        Translates bytes into lines, and calls lineReceived.

        @param data: The bytes delivered by the transport.
        @type data: C{bytes}
        """
        parser = self._parser
        if parser is None:
            parser = self._parser = LineParser(
                self.delimiter, self.MAX_LENGTH)
        parser.readData(data)
        try:
            for line in parser.iterLines():
                if self.transport.disconnecting:
                    # this is necessary because the transport may be told
                    # to lose the connection by a line within a larger
                    # packet, and it is important to disregard all the
                    # lines in that packet following the one that told it
                    # to close.
                    break
                self.lineReceived(line)
        except LineLengthExceeded as exc:
            return self.lineLengthExceeded(exc.culprit)

    def lineReceived(self, line):
        """
        Override this for when each line is received.

        @param line: The line which was received with the delimiter removed.
        @type line: C{bytes}
        """
        raise NotImplementedError

    def sendLine(self, line):
        """
        Write a line to the transport, followed by the delimiter.

        @param line: The line to send, not including the delimiter.
        @type line: C{bytes}
        @return: Whatever the transport's C{writeSequence} returns.
        """
        pieces = (line, self.delimiter)
        return self.transport.writeSequence(pieces)

    def lineLengthExceeded(self, line):
        """
        Called when the parser reports data longer than C{MAX_LENGTH}.
        The default behaviour is to drop the connection.

        @param line: The offending bytes reported by the parser.
        @type line: C{bytes}
        @return: Whatever the transport's C{loseConnection} returns.
        """
        return self.transport.loseConnection()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment