Skip to content

Instantly share code, notes, and snippets.

@sportebois
Last active October 13, 2020 16:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sportebois/86eebf5221b2ab104614ecd9a77f7bdc to your computer and use it in GitHub Desktop.
Save sportebois/86eebf5221b2ab104614ecd9a77f7bdc to your computer and use it in GitHub Desktop.
Akita SuperLearn - expose Flask requests/responses from pytests and Flask test client
# file: tests/api_helpers.py
import logging
import threading
from copy import deepcopy
from threading import Thread
from typing import Optional
from uuid import uuid4
import requests
from flask.testing import FlaskClient
from requests import ConnectionError
from werkzeug.serving import run_simple
from werkzeug.wrappers import BaseResponse, Request, Response
from api.constants import constants
logger = logging.getLogger(__name__)
# Start a Werkzeug local server to send echos of HTTP requests and responses
# these echoes will let Akita Super Learn observe the traffic
exposer_resp_maps = {}
exposer_port = 5555
@Request.application
def reply_with_stored_response(request):
h = request.headers.get("Correlation-Id", None)
if not h:
return BaseResponse(status=500)
resp = exposer_resp_maps.get(h)
if not resp:
return BaseResponse(status=500)
return resp
def start_exposer() -> None:
run_simple('localhost', exposer_port, reply_with_stored_response, threaded=True)
pub_thread: Optional[Thread] = None
def _publish_to_exposer(req_infos: dict, resp: Response) -> None:
"""
Publish an echo Akita can witness
"""
headers = req_infos.get("headers") or {}
correlation_id = headers.get("Correlation-Id")
if not correlation_id:
correlation_id = str(uuid4())
headers["Correlation-Id"] = correlation_id
req_infos["headers"] = headers
exposer_resp_maps[correlation_id] = resp
methods = {
"GET": requests.get,
"POST": requests.post,
"PUT": requests.put,
"PATCH": requests.patch,
"DELETE": requests.delete,
"HEAD": requests.head,
}
req_func = methods[req_infos.pop("method")]
path = req_infos.pop("path")
req_infos.setdefault("allow_redirects", False)
try:
req_func(f"http://localhost:{exposer_port}{path}", **req_infos)
except ConnectionError as err:
logger.info(f"Dropped observed query for {req_func} mirror on {path}, got {err}")
def start_exposer_thread() -> None:
"""
Starts the publish_to_observer thread which will listen on observer_port
and send the response copies.
This should be called by the setup as required
(and the mirror start_exposer_thread should be called on teardown)
"""
global pub_thread
pub_thread = threading.Thread(target=start_exposer, daemon=True)
pub_thread.start()
def stop_exposer_thread() -> None:
global pub_thread
if pub_thread:
pub_thread.join(timeout=1.0)
pub_thread = None
class ApiTestClient(FlaskClient):
"""
This App client is used for convenience injection of the base path (api/some/thing/vx) before the endpoint path.
This is also useful to then inherit and test various blueprints.
To use it, you can then put a simple fixture in your conftest.py like
```
@fixture(scope="module")
def test_myapi_client():
# Do your Flask app initialization for test context, in our case:
# configure_app(flask_app, config_name=Environments.TESTS)
flask_app.test_client_class = ApiTestClient
client = flask_app.test_client
yield client
```
Then you’re ready to get this fixture from your tests and use it to make the requests
and asserts the responses details:
```
resp = test_client().put(path=url, headers=headers)
```
"""
def __init__(self, *args, **kwargs):
self.url_prefix = ""
super().__init__(*args, **kwargs)
def open(self, *args, **kw):
if path:= kw.get("path"):
kw["path"] = f"{self.url_prefix}{path}"
resp: Response = super().open(*args, **kw)
# Send a copy of the request to the sink so that it’s visible for the Akita Agent
resp.freeze()
_publish_to_exposer(kw, deepcopy(resp))
return resp
# file: tests/conftest.py
from pytest import fixture
from api.app import app as flask_app # Your will be different
from tests.api_helpers import ApiTestClient, start_exposer_thread, stop_exposer_thread
@fixture(scope="module")
def test_client(exposer_thread):
"""
Use this fixture in your tests to autoamtically trigger echo requests/responses to the observer thread
"""
# Do your configuration if you need to, e.g.
# configure_app(flask_app, config_name=Environments.TESTS)
flask_app.test_client_class = ApiTestClient
client = flask_app.test_client
yield client
@fixture(scope="session")
def exposer_thread(request):
start_exposer_thread()
def close():
stop_exposer_thread()
request.addfinalizer(close)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment