Last active
May 17, 2020 16:44
-
-
Save xnuinside/8451c8cfbed4bf0f42cb3bbe1ba1ca34 to your computer and use it in GitHub Desktop.
integration_tests_article
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Image for the gino_admin panel used by the integration tests.
FROM python:3.7.7 as base

RUN pip install poetry==1.0.5
WORKDIR /app

# Hack: create an empty gino_admin package so the dependency layer below is
# not re-installed every time the gino_admin source code changes. Without
# this stub poetry fails with an error that the gino_admin package is missing.
RUN mkdir /app/gino_admin && touch /app/gino_admin/__init__.py
COPY pyproject.toml poetry.lock README.rst /app/
RUN poetry config virtualenvs.create false \
    && poetry install --no-interaction --no-ansi

# The real sources are copied only after the dependencies are installed.
COPY gino_admin /app/gino_admin
COPY examples/fastapi_as_main_app/src/csv_to_upload /app/csv_to_upload
COPY examples/fastapi_as_main_app/src/admin.py examples/fastapi_as_main_app/src/db.py /app/
COPY tests/integration_tests/wait_for.py /wait_for.py

# Start the admin app only if the wait_for.py script exits successfully.
CMD python /wait_for.py && python admin.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@pytest.fixture(scope="module")
def admin_auth_headers(admin_url):
    """Request an access token from the admin API and build auth headers.

    Posts the ``admin:1234`` credentials to ``{admin_url}/api/auth/`` and
    returns a headers dict carrying the received token as a Bearer token.

    Raises:
        requests.HTTPError: if the auth endpoint answers with a 4xx/5xx status.
        AssertionError: if the response body holds no ``access_token``.
    """
    credentials = {"Authorization": "admin:1234"}
    result = requests.post(f"{admin_url}/api/auth/", headers=credentials)
    # Fail here, with a clear reason, instead of handing "Bearer None" to
    # every dependent test and letting them fail with confusing errors.
    result.raise_for_status()
    token = result.json().get("access_token")
    assert token, "admin auth response does not contain an access_token"
    return {"Authorization": f"Bearer {token}"}
@pytest.fixture(scope="module")
def initdb(admin_url, admin_auth_headers):
    """Load preset ``preset_1`` through the admin API using the auth token.

    The request is sent with ``drop`` set to True; the fixture asserts that
    the API answers with HTTP 200.
    """
    presets_endpoint = f"{admin_url}/api/presets/"
    payload = {"preset_id": "preset_1", "drop": True}
    response = requests.post(
        presets_endpoint, json=payload, headers=admin_auth_headers
    )
    assert response.status_code == 200
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import pytest | |
pytest_plugins = ["docker_compose"] | |
@pytest.fixture(scope="module")
def docker_compose_file(pytestconfig):
    """Return the path of ``test-docker-compose.yml`` in the pytest rootdir."""
    root_dir = str(pytestconfig.rootdir)
    compose_path = os.path.join(root_dir, "test-docker-compose.yml")
    return compose_path
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def test_main_service_run(main_app_url):
    """The main app root URL must answer with HTTP 200."""
    response = requests.get(main_app_url)
    status = response.status_code
    assert status == 200
def test_admin_service_run(admin_url):
    """The admin panel URL must answer with HTTP 200."""
    response = requests.get(admin_url)
    status = response.status_code
    assert status == 200
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Image for the FastAPI main app used by the integration tests.
FROM python:3.7.7 as base

WORKDIR /app

# NOTE(review): requirements.txt is copied but not used by any instruction
# below; the dependencies are installed from the pinned versions instead.
COPY examples/fastapi_as_main_app/requirements.txt /app/requirements.txt
RUN pip install gino==1.0.0 && \
    pip install gino-starlette==0.1.1 && \
    pip install fastapi==0.54.1 && \
    pip install uvicorn==0.11.5

COPY examples/fastapi_as_main_app/src/main.py examples/fastapi_as_main_app/src/db.py /app/
# NOTE(review): wait_for.py is copied into the image but CMD does not run it,
# unlike the admin image - confirm whether the main app should wait for the DB.
COPY tests/integration_tests/wait_for.py /wait_for.py

CMD uvicorn main:app --host 0.0.0.0 --port 5050
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def test_main_service_users(main_app_url, initdb):
    """Check the /users answer, which depends on the data loaded by initdb."""
    response = requests.get(f'{main_app_url}/users')
    payload = response.json()
    assert payload
    assert payload == {"count_users": 5}
def test_admin_service_drop(admin_auth_headers, admin_url):
    """An authorized POST to the admin drop_db endpoint must return HTTP 200."""
    drop_endpoint = f'{admin_url}/api/drop_db'
    response = requests.post(drop_endpoint, headers=admin_auth_headers)
    assert response.status_code == 200
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Compose stack for the integration tests: Postgres, the FastAPI main app
# and the gino_admin panel.
version: "3"

services:
  postgres:
    image: "postgres:9.6"
    environment:
      - POSTGRES_USER=gino
      - POSTGRES_PASSWORD=gino
      - POSTGRES_DB=gino
    ports:
      - "5432:5432"
    volumes:
      # Database files are kept on the host under ./data/postgres.
      - ./data/postgres:/var/lib/postgresql/data

  fastapi_main_app_main:
    environment:
      - DB_HOST=postgres
    build:
      context: ../../
      dockerfile: $PWD/docker/fastapi_as_main_app/Dockerfile
    ports:
      - "5050:5050"
    # depends_on only orders container start-up; it does not wait until
    # Postgres is ready to accept connections.
    depends_on:
      - postgres

  fastapi_main_app_admin:
    environment:
      - DB_HOST=postgres
    build:
      context: ../../
      dockerfile: $PWD/docker/fastapi_as_main_app/Dockerfile-admin
    ports:
      - "5000:5000"
    depends_on:
      - postgres
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pytest | |
import requests | |
from requests.adapters import HTTPAdapter | |
from urllib3.util.retry import Retry | |
@pytest.fixture(scope="module")
def main_app_url(module_scoped_container_getter):
    """Wait for the API of fastapi_main_app_main and return its base URL.

    The URL is built from the first network_info entry of the
    ``fastapi_main_app_main`` container. Before returning, the URL is probed
    with a session that retries failed attempts and 500/502/503/504 answers
    (up to 5 retries with backoff), so dependent tests do not race the
    container start-up.

    Raises:
        requests.RequestException: if the service is still not reachable
            after all retries are used up.
    """
    service = module_scoped_container_getter.get("fastapi_main_app_main").network_info[0]
    api_url = f"http://{service.hostname}:{service.host_port}"

    retries = Retry(total=5, backoff_factor=3, status_forcelist=[500, 502, 503, 504])
    # The context manager closes the session's connection pool afterwards.
    with requests.Session() as request_session:
        request_session.mount("http://", HTTPAdapter(max_retries=retries))
        # The probe is what actually performs the waiting; the status code
        # itself is left for the tests to assert.
        request_session.get(api_url)
    return api_url
@pytest.fixture(scope="module")
def admin_url(module_scoped_container_getter):
    """Wait for the API of fastapi_main_app_admin and return its /admin URL.

    The URL is built from the first network_info entry of the
    ``fastapi_main_app_admin`` container. Before returning, the URL is probed
    with a session that retries failed attempts and 500/502/503/504 answers
    (up to 5 retries with backoff), so dependent tests do not race the
    container start-up.

    Raises:
        requests.RequestException: if the service is still not reachable
            after all retries are used up.
    """
    service = module_scoped_container_getter.get("fastapi_main_app_admin").network_info[0]
    api_url = f"http://{service.hostname}:{service.host_port}/admin"

    retries = Retry(total=5, backoff_factor=3, status_forcelist=[500, 502, 503, 504])
    # The context manager closes the session's connection pool afterwards.
    with requests.Session() as request_session:
        request_session.mount("http://", HTTPAdapter(max_retries=retries))
        # The probe is what actually performs the waiting; the status code
        # itself is left for the tests to assert.
        request_session.get(api_url)
    return api_url
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
""" wait for PostgreSQL DB up """ | |
from time import sleep | |
import asyncio | |
from gino import Gino | |
async def main():
    """Bind a Gino instance to the Postgres service, then close the bind.

    Any exception raised while binding propagates to the caller.
    """
    gino_db = Gino()
    await gino_db.set_bind('postgresql://gino:gino@postgres:5432/gino')
    bind = gino_db.pop_bind()
    await bind.close()
if __name__ == "__main__":
    # Imported here so the script does not rely on the `exit` helper that
    # only exists when the `site` module is loaded.
    import sys

    attempts = 5
    delay_sec = 8

    # Try to connect a fixed number of times; exit with 0 as soon as one
    # attempt succeeds and with 1 when all of them failed.
    for attempt in range(1, attempts + 1):
        try:
            # asyncio.run gives every attempt a fresh event loop.
            asyncio.run(main())
        except Exception as e:
            print(e)
            if attempt < attempts:
                print("Postgres is not available. Sleep for 8 sec")
                sleep(delay_sec)
            else:
                # No point in sleeping when there is no attempt left.
                print("Postgres is not available. Giving up")
        else:
            print("DB Connected")
            sys.exit(0)
    sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment