Skip to content

Instantly share code, notes, and snippets.

Last active Oct 13, 2020
What would you like to do?
Akita SuperLearn - expose Flask requests/responses from pytests and Flask test client
# file: tests/api_helpers.py
import logging
import threading
from copy import deepcopy
from threading import Thread
from typing import Optional
from uuid import uuid4
import requests
from flask.testing import FlaskClient
from requests import ConnectionError
from werkzeug.serving import run_simple
from werkzeug.wrappers import BaseResponse, Request, Response
from api.constants import constants
logger = logging.getLogger(__name__)
# Start a Werkzeug local server to send echoes of HTTP requests and responses;
# these echoes will let Akita SuperLearn observe the traffic.
# Responses waiting to be echoed, keyed by the request's Correlation-Id header.
exposer_resp_maps = {}
# Local port the echo server is bound to and echo requests are sent to.
exposer_port = 5555
@Request.application
def reply_with_stored_response(request):
    """Serve the response previously stored for the request's Correlation-Id.

    The function is handed to ``run_simple`` as its application, which is
    called with the WSGI ``(environ, start_response)`` pair. Wrapping it with
    ``Request.application`` lets it receive a ``Request`` object and return a
    response object instead.

    Args:
        request: The incoming echo request; its ``Correlation-Id`` header is
            the key looked up in ``exposer_resp_maps``.

    Returns:
        The response stored under that id, or an empty 500 response when the
        header is missing or nothing is stored for it.
    """
    correlation_id = request.headers.get("Correlation-Id", None)
    stored = exposer_resp_maps.get(correlation_id) if correlation_id else None
    if not stored:
        return BaseResponse(status=500)
    return stored
def start_exposer() -> None:
    """Run the echo server on localhost at ``exposer_port``.

    Used as the target of the thread created by ``start_exposer_thread``.
    """
    run_simple(
        hostname='localhost',
        port=exposer_port,
        application=reply_with_stored_response,
        threaded=True,
    )
# Handle on the exposer thread: assigned by start_exposer_thread() and reset
# to None by stop_exposer_thread().
pub_thread: Optional[Thread] = None
def _publish_to_exposer(req_infos: dict, resp: Response) -> None:
    """Publish an echo Akita can witness.

    Stores ``resp`` under a correlation id, then replays the request against
    the local exposer server so that the request/response pair travels over
    a real HTTP connection.

    Args:
        req_infos: Keyword arguments describing the request. ``method`` and
            ``path`` are consumed here (defaulting to ``"GET"`` and ``"/"``);
            every other key is forwarded verbatim to ``requests.request`` and
            must therefore be a keyword that function accepts.
        resp: Response to hand back when the exposer receives the echo.

    The caller's ``req_infos`` and its ``headers`` are copied rather than
    mutated. A connection failure is logged and swallowed: the echo is
    best-effort and must not fail the calling test.
    """
    params = dict(req_infos)
    headers = dict(params.get("headers") or {})
    correlation_id = headers.get("Correlation-Id")
    if not correlation_id:
        correlation_id = str(uuid4())
        headers["Correlation-Id"] = correlation_id
    params["headers"] = headers

    method = params.pop("method", "GET")
    path = params.pop("path", "/")
    params.setdefault("allow_redirects", False)

    exposer_resp_maps[correlation_id] = resp
    try:
        # requests.request covers every HTTP verb, including POST and OPTIONS.
        requests.request(method, f"http://localhost:{exposer_port}{path}", **params)
    except ConnectionError as err:
        logger.warning(
            "Dropped observed query for %s mirror on %s, got %s", method, path, err
        )
    finally:
        # The echo has been served or has failed; drop the stored response so
        # the map does not grow with every request.
        exposer_resp_maps.pop(correlation_id, None)
def start_exposer_thread() -> None:
    """Start the daemon thread that runs the exposer server.

    The thread runs ``start_exposer``, which serves on ``exposer_port`` and
    sends back the stored response copies.

    This should be called by the test setup as required, and its counterpart
    ``stop_exposer_thread`` should be called on teardown. Calling it again
    while the registered thread is still alive is a no-op, so the port is not
    bound twice.
    """
    global pub_thread
    if pub_thread is not None and pub_thread.is_alive():
        return
    pub_thread = threading.Thread(target=start_exposer, daemon=True)
    # Creating the Thread object does not run it; it has to be started.
    pub_thread.start()
def stop_exposer_thread() -> None:
    """Forget the exposer thread handle, if one is registered.

    Only the module-level reference is cleared; nothing here signals or joins
    the thread itself.
    """
    global pub_thread
    if not pub_thread:
        return
    pub_thread = None
class ApiTestClient(FlaskClient):
    """Flask test client that prefixes paths and echoes traffic to the exposer.

    This app client is used for convenient injection of the base path
    (api/some/thing/vx) before the endpoint path. It is also useful to
    inherit from it and test various blueprints.

    To use it, put a simple fixture in your test setup, such as::

        @fixture
        def test_myapi_client():
            # Do your Flask app initialization for test context, in our case:
            # configure_app(flask_app, config_name=Environments.TESTS)
            flask_app.test_client_class = ApiTestClient
            client = flask_app.test_client
            yield client

    Then you're ready to get this fixture from your tests and use it to make
    the requests and assert the response details::

        resp = test_myapi_client().put(path=url, headers=headers)
    """

    # Keywords understood by the test client only; ``requests`` rejects them.
    _CLIENT_ONLY_KWARGS = ("as_tuple", "buffered", "follow_redirects")

    def __init__(self, *args, **kwargs):
        """Create the client with an empty ``url_prefix``."""
        # Set before delegating so the attribute exists on every instance.
        self.url_prefix = ""
        super().__init__(*args, **kwargs)

    def open(self, *args, **kw):
        """Send the request, then echo it and a copy of the response.

        A path given as the single positional argument (as in
        ``client.get("/x")``) is moved into ``kw`` so that it is prefixed and
        echoed like a ``path=`` keyword. Requests described by any other
        positional arguments are still sent but are not echoed, because no
        keyword arguments are available to replay them.

        Returns:
            The response produced by the underlying Flask test client.
        """
        if len(args) == 1 and isinstance(args[0], str) and "path" not in kw:
            kw["path"] = args[0]
            args = ()
        if path := kw.get("path"):
            kw["path"] = f"{self.url_prefix}{path}"
        resp: Response = super().open(*args, **kw)
        if args:
            logger.debug("Request given positionally; no echo sent to the exposer")
            return resp
        # Send a copy of the request to the sink so that it's visible for the Akita Agent.
        # A copy of kw is passed so the echo cannot alter the caller's arguments.
        echo_kw = {k: v for k, v in kw.items() if k not in self._CLIENT_ONLY_KWARGS}
        echo_kw.setdefault("method", "GET")
        echo_kw.setdefault("path", "/")
        _publish_to_exposer(echo_kw, deepcopy(resp))
        return resp
# file: tests/
from pytest import fixture
from import app as flask_app # Your will be different
from tests.api_helpers import ApiTestClient, start_exposer_thread, stop_exposer_thread
@fixture
def test_client(exposer_thread):
    """Yield the app's test-client factory with ``ApiTestClient`` installed.

    Use this fixture in your tests to automatically trigger echo
    requests/responses to the exposer thread. It requests the
    ``exposer_thread`` fixture, so that fixture is set up first.

    The yielded value is ``flask_app.test_client`` itself, not a client
    instance: tests call ``test_client()`` to obtain a client.
    """
    # Do your configuration if you need to, e.g.
    # configure_app(flask_app, config_name=Environments.TESTS)
    flask_app.test_client_class = ApiTestClient
    client = flask_app.test_client
    yield client
def exposer_thread(request):
def close():
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment