Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Monkey patch for Python-EngineIO JSONP-Polling support
# For Python-EngineIO 2.0.2
def engine_io_patcher():
from engineio.payload import Payload as eio_Payload
from engineio.server import Server as eio_Server
original_handle_request = eio_Server.handle_request
def new_eioserv__ok(self, packets=None, headers=None, b64=False, jsonp_seq=None):
"""response generator."""
if packets is not None:
if headers is None:
headers = []
if b64:
headers += [('Content-Type', 'text/plain; charset=UTF-8')]
headers += [('Content-Type', 'application/octet-stream')]
return {'status': '200 OK',
'headers': headers,
'response': eio_Payload(packets=packets).encode(b64=b64, jsonp_seq=jsonp_seq)}
return {'status': '200 OK',
'headers': [('Content-Type', 'text/plain')],
'response': b'OK'}
def new_eiopayload_decode(self, encoded_payload):
"""Decode a transmitted payload."""
import six
from engineio import packet
self.packets = []
while encoded_payload:
if encoded_payload.startswith(b'd='): # is form submit
encoded_payload = encoded_payload[2:]
encoded_payload = parse.unquote_to_bytes(encoded_payload.decode('utf-8'))
if six.byte2int(encoded_payload[0:1]) <= 1:
packet_len = 0
i = 1
while six.byte2int(encoded_payload[i:i + 1]) != 255:
packet_len = packet_len * 10 + six.byte2int(encoded_payload[i:i + 1])
i += 1
self.packets.append(packet.Packet(encoded_packet=encoded_payload[i + 1:i + 1 + packet_len]))
i = encoded_payload.find(b':')
if i == -1:
raise ValueError('invalid payload')
# extracting the packet out of the payload is extremely
# inefficient, because the payload needs to be treated as
# binary, but the non-binary packets have to be parsed as
# unicode. Luckily this complication only applies to long
# polling, as the websocket transport sends packets
# individually wrapped.
packet_len = int(encoded_payload[0:i])
pkt = encoded_payload.decode('utf-8', errors='ignore')[
i + 1: i + 1 + packet_len].encode('utf-8')
# the protocol sends the packet length in
# utf-8 characters, but we need it in bytes to be able to
# jump to the next packet in the payload
packet_len = len(pkt)
encoded_payload = encoded_payload[i + 1 + packet_len:]
def new_eiopayload_encode(self, b64=False, jsonp_seq=None):
"""Encode the payload for transmission."""
import six
encoded_payload = b''
for pkt in self.packets:
encoded_packet = pkt.encode(b64=b64)
packet_len = len(encoded_packet)
if b64:
encoded_payload += str(packet_len).encode('utf-8') + b':' + encoded_packet
binary_len = b''
while packet_len != 0:
binary_len = six.int2byte(packet_len % 10) + binary_len
packet_len = int(packet_len / 10)
if not pkt.binary:
encoded_payload += b'\0'
encoded_payload += b'\1'
encoded_payload += binary_len + b'\xff' + encoded_packet
# Patch start
if jsonp_seq is not None:
return '___eio[%s]("%s");' % (str(jsonp_seq), str(encoded_payload, encoding='utf-8').replace('"', '\\"'))
# Patch end
return encoded_payload
def new_eioserv__handle_connect(self, environ, start_response, transport, b64=False, jsonp_seq=None):
"""handshake entry"""
from engineio import socket
from engineio import packet
sid = self._generate_id()
s = socket.Socket(self, sid)
# Patch start
# save jsonp status to engineio::socket instance
setattr(s, 'is_jsonp', True if jsonp_seq is not None else False)
setattr(s, 'jsonp_seq', jsonp_seq)
# Patch end
self.sockets[sid] = s
pkt = packet.Packet(
packet.OPEN, {'sid': sid,
'upgrades': self._upgrades(sid, transport),
'pingTimeout': int(self.ping_timeout * 1000),
'pingInterval': int(self.ping_interval * 1000)})
ret = self._trigger_event('connect', sid, environ, run_async=False)
if ret is False:
del self.sockets[sid]
self.logger.warning('Application rejected connection')
return self._unauthorized()
if transport == 'websocket':
ret = s.handle_get_request(environ, start_response)
if s.closed:
# websocket connection ended, so we are done
del self.sockets[sid]
return ret
s.connected = True
headers = None
if self.cookie:
headers = [('Set-Cookie', self.cookie + '=' + sid)]
return self._ok(s.poll(), headers=headers, b64=b64, jsonp_seq=jsonp_seq)
def new_eioserv_handle_request(self, environ, start_response, patched_call=False):
# Patch start
# bypass jsonp not supported bad request
# handle jsonp request with regular polling code
if not patched_call:
from six.moves import urllib
from engineio.exceptions import EngineIOError
method = environ['REQUEST_METHOD']
query = urllib.parse.parse_qs(environ.get('QUERY_STRING', ''))
if 'j' in query:
sid = query['sid'][0] if 'sid' in query else None
b64 = False
if 'b64' in query:
if query['b64'][0] == "1" or query['b64'][0].lower() == "true":
b64 = True
if method == 'GET':
if sid is None: # Need handshake
transport = query.get('transport', ['polling'])[0]
if transport != 'polling' and transport != 'websocket':
self.logger.warning('Invalid transport %s', transport)
r = self._bad_request()
r = self._handle_connect(environ, start_response, transport, b64, jsonp_seq=int(query['j'][0]))
if sid not in self.sockets:
self.logger.warning('Invalid session %s', sid)
r = self._bad_request()
socket = self._get_socket(sid)
packets = socket.handle_get_request(environ, start_response)
if isinstance(packets, list):
r = self._ok(packets, b64=b64, jsonp_seq=int(query['j'][0]))
r = packets
except EngineIOError:
if sid in self.sockets: # pragma: no cover
r = self._bad_request()
if sid in self.sockets and self.sockets[sid].closed:
del self.sockets[sid]
elif method == 'POST':
if sid is None or sid not in self.sockets:
self.logger.warning('Invalid session %s', sid)
r = self._bad_request()
socket = self._get_socket(sid)
r = self._ok(jsonp_seq=int(query['j'][0]))
except EngineIOError:
if sid in self.sockets: # pragma: no cover
r = self._bad_request()
except: # pragma: no cover
# for any other unexpected errors, we log the error
# and keep going
self.logger.exception('post request handler error')
r = self._ok(jsonp_seq=int(query['j'][0]))
self.logger.warning('Method %s not supported', method)
r = self._method_not_found()
if not isinstance(r, dict):
return r or []
if self.http_compression and len(r['response']) >= self.compression_threshold:
encodings = [e.split(';')[0].strip() for e in environ.get('HTTP_ACCEPT_ENCODING', '').split(',')]
for encoding in encodings:
if encoding in self.compression_methods:
r['response'] = getattr(self, '_' + encoding)(r['response'])
r['headers'] += [('Content-Encoding', encoding)]
cors_headers = self._cors_headers(environ)
start_response(r['status'], r['headers'] + cors_headers)
return [r['response']]
return original_handle_request(self, environ, start_response)
# Patch end
eio_Server._ok = new_eioserv__ok
eio_Payload.decode = new_eiopayload_decode
eio_Payload.encode = new_eiopayload_encode
eio_Server.handle_request = new_eioserv_handle_request
eio_Server._handle_connect = new_eioserv__handle_connect
log('EngineIO JSONP support patch done.')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.