Skip to content

Instantly share code, notes, and snippets.

@mozey
Last active April 19, 2016 09:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mozey/00ab8bf387bee513f1f748e374da349e to your computer and use it in GitHub Desktop.
Save mozey/00ab8bf387bee513f1f748e374da349e to your computer and use it in GitHub Desktop.
SpectoLabs/hoverfly Lookup response by key only
#!/usr/bin/env python3
import sys
import logging
import json
import os
import hashlib
import collections
import copy
# Without a PATH argument there is nothing to do: show the usage text
# and exit with a non-zero status.
if len(sys.argv) == 1:
    usage_lines = (
        "USAGE",
        " ./middleware.py PATH",
        "",
        ' "stubs" dir must exist inside PATH',
        " Logs will be written to PATH/middleware.log",
        "",
        " ./middleware.py PATH STUB",
        "",
        " Read payload from STUB",
    )
    for usage_line in usage_lines:
        print(usage_line)
    sys.exit(1)

# Directory that holds the "stubs" dir and receives the log file.
PATH = sys.argv[1]

# Everything from DEBUG upwards is written to PATH/middleware.log.
logfilePath = os.path.join(PATH, "middleware.log")
logging.basicConfig(level=logging.DEBUG, filename=logfilePath)
def get_hash(s):
    """Return the hexadecimal MD5 digest of the string ``s``."""
    return hashlib.md5(str.encode(s)).hexdigest()
# Flag set to True as soon as a delimiter line has been logged.
delimiter_added = False


def add_log_delimiter():
    """
    Log a row of 80 dots at DEBUG level so that separate requests
    are easy to tell apart in the log.
    Intended to be called before anything else is logged.
    """
    global delimiter_added
    separator = 80 * "."
    logging.debug(separator)
    delimiter_added = True
def done(payload_dict):
    """
    Print ``payload_dict`` to stdout as JSON (2-space indent, keys sorted)
    and terminate the process with exit status 0. Never returns.
    """
    rendered = json.dumps(payload_dict, sort_keys=True, indent=2)
    print(rendered)
    raise SystemExit(0)
def json_loads_ordered_dict(s):
    """
    Decode the JSON string ``s``, building every JSON object as a
    collections.OrderedDict so that key order from the text is kept.

    http://stackoverflow.com/a/6921760/639133
    """
    ordered_decoder = json.JSONDecoder(
        object_pairs_hook=collections.OrderedDict)
    return ordered_decoder.decode(s)
# Keys listed here are removed from the request body before it is hashed.
strip_keys = []
# Example:
# strip_keys = ["timestamp"]


# http://stackoverflow.com/a/20558778/639133
def strip_request_body(d):
    """
    Return a copy of ``d`` with every key listed in ``strip_keys`` removed.

    Some properties are always stripped from the request body,
    for example if the request contains a timestamp,
    this can change and then it breaks the hash.

    Dicts are rebuilt with their own type (so an OrderedDict stays an
    OrderedDict, in the same key order) and are stripped recursively.
    Lists, tuples and sets are rebuilt with their own type and each
    element is stripped recursively; previously these containers were
    replaced by None, which discarded their content and left keys
    nested inside them unstripped. Any other value is returned as is.
    """
    if isinstance(d, dict):
        return type(d)(
            (strip_request_body(k), strip_request_body(v))
            for k, v in d.items()
            if k not in strip_keys)
    if isinstance(d, (list, tuple, set)):
        # Keep the container and descend into it, so dicts inside lists
        # are stripped too.
        return type(d)(strip_request_body(item) for item in d)
    return d
def _not_found_payload(message):
    """Return a fresh payload holding only a 404 response for ``message``."""
    payload = collections.OrderedDict()
    payload['response'] = collections.OrderedDict()
    payload['response']['status'] = 404
    payload['response']['body'] = json.dumps({"errors": message})
    return payload


def _read_payload():
    """
    Return the raw payload as a JSON string.

    If STUB was passed as the second script param the payload is read
    from that file (makes debugging middleware easier); if the file does
    not exist a 404 payload is printed and the process exits.
    Otherwise the first line of stdin is used.
    """
    if len(sys.argv) > 2:
        payload_path = sys.argv[2]
        if not os.path.exists(payload_path):
            # done() prints the payload and exits, it never returns
            done(_not_found_payload(
                "payload_path {} does not exist".format(payload_path)))
        with open(payload_path, "r") as payload_file:
            stub_payload = json_loads_ordered_dict(payload_file.read())
        # Re-encode the body as a string,
        # main() assumes the request body is a string
        stub_payload["request"]["body"] = \
            json.dumps(stub_payload["request"]["body"])
        return json.dumps(stub_payload)
    # Hoverfly will pass payload on stdin
    return sys.stdin.readlines()[0]


def _describe_request(request):
    """
    Return ``(request_id, request_dump)`` for the ``request`` dict.

    request_dump is the JSON dump of body, destination, method, path and
    query, in that order (the order is part of the hash).
    request_id is the hash of request_dump and names the stub file.
    """
    request_copy = collections.OrderedDict()
    # At this point we assume the request body is a string
    body = request["body"]
    try:
        body = json_loads_ordered_dict(body)
    except ValueError:
        # Not JSON, the body is used as is
        pass
    request_copy["body"] = body
    request_copy["destination"] = request["destination"]
    request_copy["method"] = request["method"]
    request_copy["path"] = request["path"]
    # request_copy["scheme"] = request["scheme"]
    request_copy["query"] = request["query"]
    # Only a JSON object body has keys stripped
    if isinstance(request_copy["body"], collections.OrderedDict):
        request_copy["body"] = strip_request_body(request_copy["body"])
    request_dump = json.dumps(request_copy)
    return get_hash(request_dump), request_dump


def main():
    """
    Replace the response of the incoming payload with a stored stub.

    The payload is read from STUB or stdin, the request is hashed, and
    PATH/stubs/<hash>.json is looked up. If the stub exists its
    status, body and headers are copied into the payload response;
    if it does not, the response becomes a 404.
    The resulting payload is printed by done(), which exits the process.
    """
    add_log_delimiter()
    logging.debug("PATH {}".format(PATH))

    payload_dict = json_loads_ordered_dict(_read_payload())
    request_id, request_dump = _describe_request(payload_dict["request"])

    stub_path = os.path.join(
        PATH, "stubs", "{}.json".format(request_id))
    add_log_delimiter()
    if not os.path.exists(stub_path):
        logging.debug("request_id {} not found".format(request_id))
        logging.debug("request_dump: {}".format(request_dump))
        # The payload may not have a response yet, create it if needed
        response = payload_dict.setdefault(
            'response', collections.OrderedDict())
        response['status'] = 404
        response['body'] = json.dumps({
            "errors": "stub_path {} does not exist".format(stub_path)
        })
        done(payload_dict)

    logging.debug("request_dump: {}".format(request_dump))
    with open(stub_path, "r") as stub_file:
        stub_json = json_loads_ordered_dict(stub_file.read())

    stub_response = stub_json["response"]
    response = payload_dict.setdefault('response', collections.OrderedDict())
    response['status'] = stub_response["status"]
    body = stub_response["body"]
    # OrderedDict is a dict subclass, one check covers both
    if isinstance(body, dict):
        response['body'] = json.dumps(body)
    # "Python 3 renamed the unicode type to str,
    # the old str type has been replaced by bytes."
    # http://stackoverflow.com/a/19877309/639133
    elif isinstance(body, str):
        # A "file:///" body names a file relative to PATH
        # whose content is the response body
        file_prefix = "file:///"
        if body.startswith(file_prefix):
            body_path = os.path.join(PATH, body[len(file_prefix):])
            if not os.path.exists(body_path):
                done(_not_found_payload(
                    "body_path {} does not exist".format(body_path)))
            with open(body_path, "r") as body_file:
                response['body'] = body_file.read()
        else:
            response['body'] = body
    else:
        # The response body of the payload is left untouched
        add_log_delimiter()
        logging.debug("Unknown body_type {}".format(type(body)))
    response['headers'] = stub_response["headers"]
    # returning new payload
    done(payload_dict)
# Run the middleware only when this file is executed as a script.
if __name__ == "__main__":
    main()
@mozey
Copy link
Author

mozey commented Apr 19, 2016

Written for hoverfly issue 169

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment