Skip to content

Instantly share code, notes, and snippets.

@arnaldopereira
Created November 13, 2016 19:44
Show Gist options
  • Save arnaldopereira/222041971d0643b550d4262ec43c66e1 to your computer and use it in GitHub Desktop.
Save arnaldopereira/222041971d0643b550d4262ec43c66e1 to your computer and use it in GitHub Desktop.
asyncreplay, python 2 + tornado
#!/usr/bin/env python
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPClient
from tornado.httpclient import HTTPResponse
from tornado.httpclient import HTTPRequest
from tornado.httpclient import HTTPError
import StringIO
import quopri
import json
import mock
import os.path
from contextlib import contextmanager
import logging
class ReplayRecording(object):
"""
Holds on to a set of request keys and their response values.
Can be used to reproduce HTTP/HTTPS responses without using
the network.
"""
def __init__(self, jsonable=None):
self.request_responses = []
if jsonable:
self._from_jsonable(jsonable)
def _from_jsonable(self, jsonable):
self.request_responses = [(r['request'], r['response'])
for r in jsonable]
def __getitem__(self, request):
"""Match requests by the tuple (url, method)
"""
try:
return next(rr[1] for rr in self.request_responses
if rr[0]['url'] == request.url and
rr[0]['method'] == request.method)
except StopIteration:
raise KeyError
def __contains__(self, request):
return any(rr[1] for rr in self.request_responses
if rr[0]['url'] == request.url and
rr[0]['method'] == request.method)
def __setitem__(self, request, response):
self.request_responses.append((request, response))
def to_jsonable(self):
return [dict(request=request, response=response)
for request, response in self.request_responses]
def to_httpresponse(self, request):
"""Try and get a response that matches the request, create a HTTPResponse
object from it and return it.
"""
response_dict = self[request]
return HTTPResponse(
request,
response_dict['status']['code'],
headers=response_dict['headers'],
buffer=StringIO.StringIO(response_dict['body']),
reason=response_dict['status']['message'])
class ReplayRecordingManager(object):
"""
Loads and saves replay recordings as to json files.
"""
@classmethod
def load(cls, recording_file_name):
try:
with open(recording_file_name) as fp:
recording = ReplayRecording(json.load(fp))
except IOError:
logging.debug("ReplayRecordingManager starting new %r",
os.path.basename(recording_file_name))
recording = ReplayRecording()
else:
logging.debug("ReplayRecordingManager loaded from %r",
os.path.basename(recording_file_name))
return recording
@classmethod
def save(cls, recording, recording_file_name):
logging.debug("ReplayRecordingManager saving to %r",
os.path.basename(recording_file_name))
dirname, _ = os.path.split(recording_file_name)
if not os.path.exists(dirname):
os.makedirs(dirname)
with open(recording_file_name, 'w') as recording_file:
json.dump(
recording.to_jsonable(),
recording_file,
indent=4,
sort_keys=True,
cls=RequestResponseEncoder)
class RequestResponseEncoder(json.JSONEncoder):
"""Encoder that handles HTTPRequest and HTTPResponse objects.
"""
def default(self, obj):
if isinstance(obj, HTTPRequest):
return {
'url': obj.url,
'method': obj.method,
'body': obj.body,
'user_agent': obj.user_agent,
'headers': obj.headers
}
if isinstance(obj, HTTPResponse):
return {
'headers': obj.headers,
'status': { 'code': obj.code, 'message': obj.reason },
'body': obj.body,
'body_quoted_printable': quopri.encodestring(obj.body)
}
return json.JSONEncoder.default(self, obj)
def async_replay_patch(fetch_mock, recordfile):
@gen.coroutine
def side_effect(request, **kwargs):
"""Replay http requests for all hosts except localhost.
"""
if request is not HTTPRequest:
request = HTTPRequest(request)
recording = ReplayRecordingManager.load(recordfile)
if request in recording and not request.url.startswith('http://localhost'):
raise gen.Return(recording.to_httpresponse(request))
client = AsyncHTTPClient(force_instance=True)
try:
response = yield client.fetch(request)
except HTTPError as e:
response = e.response
recording[request] = response
ReplayRecordingManager.save(recording, recordfile)
raise gen.Return(response)
fetch_mock.side_effect = side_effect
def patch_fetch(fetch_mock, recordfile):
def side_effect(request, **kwargs):
"""Replay http requests for all hosts except localhost.
"""
if request is not HTTPRequest:
request = HTTPRequest(request)
recording = ReplayRecordingManager.load(recordfile)
try:
response = recording.to_httpresponse(request)
except Exception as e:
logging.info('Response for {0} not found, issuing http request.'.format(request.url))
else:
if not request.url.startswith('http://localhost'):
return recording.to_httpresponse(request)
client = HTTPClient(force_instance=True)
try:
response = client.fetch(request)
except HTTPError as e:
response = e.response
recording[request] = response
ReplayRecordingManager.save(recording, recordfile)
return response
fetch_mock.side_effect = side_effect
@contextmanager
def asyncreplay(recordfile):
with mock.patch.object(AsyncHTTPClient(), 'fetch') as fetch_mock:
async_replay_patch(fetch_mock, recordfile)
yield
@contextmanager
def syncreplay(http_client, recordfile):
with mock.patch.object(http_client, 'fetch') as fetch_mock:
patch_fetch(fetch_mock, recordfile)
yield
@gen.coroutine
def main():
with asyncreplay(os.path.join(os.getcwd(), 'record-asyncreplay.json')):
client = AsyncHTTPClient()
response = yield client.fetch('http://localhost:8888/')
response = yield client.fetch('http://localhost:8888/foobar')
sync_client = HTTPClient()
with syncreplay(sync_client, os.path.join(os.getcwd(), 'record-syncreplay.json')):
response = sync_client.fetch('http://localhost:8888/')
if __name__ == '__main__':
IOLoop.instance().run_sync(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment