Skip to content

Instantly share code, notes, and snippets.

@xnuinside
Last active May 17, 2020 16:44
Show Gist options
  • Save xnuinside/8451c8cfbed4bf0f42cb3bbe1ba1ca34 to your computer and use it in GitHub Desktop.
Save xnuinside/8451c8cfbed4bf0f42cb3bbe1ba1ca34 to your computer and use it in GitHub Desktop.
integration_tests_article
# Image for the gino_admin panel used by the integration tests.
FROM python:3.7.7 as base
RUN pip install poetry==1.0.5
WORKDIR /app
# Hack to avoid reinstalling the dependencies every time the gino_admin source
# code changes: create an empty placeholder package before the install step.
# Without it poetry fails with an error that there is no package gino_admin.
RUN mkdir /app/gino_admin && touch /app/gino_admin/__init__.py
COPY pyproject.toml poetry.lock README.rst /app/
# Install the dependencies with virtualenv creation disabled.
RUN poetry config virtualenvs.create false \
&& poetry install --no-interaction --no-ansi
# The real package sources are copied only after the dependency layer is built.
COPY gino_admin /app/gino_admin
COPY examples/fastapi_as_main_app/src/csv_to_upload /app/csv_to_upload
COPY examples/fastapi_as_main_app/src/admin.py examples/fastapi_as_main_app/src/db.py /app/
COPY tests/integration_tests/wait_for.py /wait_for.py
# Start admin.py only if wait_for.py exits successfully.
CMD python /wait_for.py && python admin.py
@pytest.fixture(scope="module")
def admin_auth_headers(admin_url):
    """Request an access token from the admin API and return Bearer headers."""
    credentials = {"Authorization": "admin:1234"}
    auth_response = requests.post(f"{admin_url}/api/auth/", headers=credentials)
    access_token = auth_response.json().get("access_token")
    return {"Authorization": f"Bearer {access_token}"}
@pytest.fixture(scope="module")
def initdb(admin_url, admin_auth_headers):
    """POST preset ``preset_1`` (with ``drop`` enabled) to the admin presets API.

    The call is authenticated with the token headers and must answer HTTP 200.
    """
    payload = {"preset_id": "preset_1", "drop": True}
    response = requests.post(
        f"{admin_url}/api/presets/",
        json=payload,
        headers=admin_auth_headers,
    )
    assert response.status_code == 200
import os
import pytest
# Enable the "docker_compose" pytest plugin for this test suite.
pytest_plugins = ["docker_compose"]
@pytest.fixture(scope="module")
def docker_compose_file(pytestconfig):
    """Return the path of ``test-docker-compose.yml`` inside pytest's root dir."""
    root_dir = str(pytestconfig.rootdir)
    return os.path.join(root_dir, "test-docker-compose.yml")
def test_main_service_run(main_app_url):
    """The main application answers a GET on its base URL with HTTP 200."""
    response = requests.get(main_app_url)
    status = response.status_code
    assert status == 200
def test_admin_service_run(admin_url):
    """The admin panel answers a GET on its base URL with HTTP 200."""
    response = requests.get(admin_url)
    status = response.status_code
    assert status == 200
# Image for the FastAPI main application used by the integration tests.
FROM python:3.7.7 as base
WORKDIR /app
# NOTE(review): requirements.txt is copied but not used by any step below;
# the dependencies are pinned explicitly in the RUN instruction instead.
COPY examples/fastapi_as_main_app/requirements.txt /app/requirements.txt
RUN pip install gino==1.0.0 && \
    pip install gino-starlette==0.1.1 && \
    pip install fastapi==0.54.1 && \
    pip install uvicorn==0.11.5
COPY examples/fastapi_as_main_app/src/main.py examples/fastapi_as_main_app/src/db.py /app/
COPY tests/integration_tests/wait_for.py /wait_for.py
# Run the copied wait_for.py first so the application is started only once the
# database accepts connections (same start-up sequence as the admin image).
CMD python /wait_for.py && uvicorn main:app --host 0.0.0.0 --port 5050
def test_main_service_users(main_app_url, initdb):
    """Check ``/users`` once the ``initdb`` fixture has run: 5 users expected."""
    response = requests.get(f'{main_app_url}/users')
    payload = response.json()
    assert payload
    assert payload == {"count_users": 5}
def test_admin_service_drop(admin_auth_headers, admin_url):
    """An authenticated POST to ``/api/drop_db`` answers with HTTP 200."""
    drop_url = f'{admin_url}/api/drop_db'
    response = requests.post(drop_url, headers=admin_auth_headers)
    assert response.status_code == 200
# Compose stack for the integration tests: PostgreSQL, the FastAPI main
# application and the admin panel.
version: "3"

services:
  # Database used by both application containers.
  postgres:
    image: "postgres:9.6"
    environment:
      POSTGRES_USER: gino
      POSTGRES_PASSWORD: gino
      POSTGRES_DB: gino
    ports:
      - "5432:5432"
    volumes:
      - ./data/postgres:/var/lib/postgresql/data

  # FastAPI main application, published on port 5050.
  fastapi_main_app_main:
    build:
      context: ../../
      dockerfile: $PWD/docker/fastapi_as_main_app/Dockerfile
    environment:
      DB_HOST: postgres
    ports:
      - "5050:5050"
    depends_on:
      - postgres

  # Admin panel, published on port 5000.
  fastapi_main_app_admin:
    build:
      context: ../../
      dockerfile: $PWD/docker/fastapi_as_main_app/Dockerfile-admin
    environment:
      DB_HOST: postgres
    ports:
      - "5000:5000"
    depends_on:
      - postgres
import pytest
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
@pytest.fixture(scope="module")
def main_app_url(module_scoped_container_getter):
    """Wait for the api from fastapi_main_app_main to become responsive.

    Looks up the first network entry of the ``fastapi_main_app_main``
    container, probes its base URL with a retrying HTTP session and returns
    that URL.

    Returns:
        The base URL of the main application, ``http://<hostname>:<host_port>``.

    Raises:
        requests.exceptions.RequestException: if the service is still not
            reachable after all retries are exhausted.
    """
    retries = Retry(total=5, backoff_factor=3, status_forcelist=[500, 502, 503, 504])
    service = module_scoped_container_getter.get("fastapi_main_app_main").network_info[0]
    api_url = f"http://{service.hostname}:{service.host_port}"
    # The session is only needed for the readiness probe, so close it afterwards.
    with requests.Session() as request_session:
        request_session.mount("http://", HTTPAdapter(max_retries=retries))
        # Issue a request through the retrying adapter; without it the retry
        # policy is never exercised and the fixture does not wait at all.
        request_session.get(api_url)
    return api_url
@pytest.fixture(scope="module")
def admin_url(module_scoped_container_getter):
    """Wait for the api from fastapi_main_app_admin to become responsive.

    Looks up the first network entry of the ``fastapi_main_app_admin``
    container, probes its ``/admin`` URL with a retrying HTTP session and
    returns that URL.

    Returns:
        The admin base URL, ``http://<hostname>:<host_port>/admin``.

    Raises:
        requests.exceptions.RequestException: if the service is still not
            reachable after all retries are exhausted.
    """
    retries = Retry(total=5, backoff_factor=3, status_forcelist=[500, 502, 503, 504])
    service = module_scoped_container_getter.get("fastapi_main_app_admin").network_info[0]
    api_url = f"http://{service.hostname}:{service.host_port}/admin"
    # The session is only needed for the readiness probe, so close it afterwards.
    with requests.Session() as request_session:
        request_session.mount("http://", HTTPAdapter(max_retries=retries))
        # Issue a request through the retrying adapter; without it the retry
        # policy is never exercised and the fixture does not wait at all.
        request_session.get(api_url)
    return api_url
#!/usr/bin/python
""" wait for PostgreSQL DB up """
from time import sleep
import asyncio
from gino import Gino
async def main():
    """Bind a Gino instance to the test database, then close that bind again."""
    gino_db = Gino()
    await gino_db.set_bind('postgresql://gino:gino@postgres:5432/gino')
    engine = gino_db.pop_bind()
    await engine.close()
if __name__ == "__main__":
    # sys.exit is used instead of the bare exit() helper, which is injected by
    # the site module and is not guaranteed to exist in every interpreter run.
    import sys

    attempts = 5
    for attempt in range(1, attempts + 1):
        try:
            asyncio.get_event_loop().run_until_complete(main())
        except Exception as e:
            print(e)
            if attempt == attempts:
                # Last attempt failed: give up without a pointless final sleep.
                break
            print("Postgres is not available. Sleep for 8 sec")
            sleep(8)
        else:
            print("DB Connected")
            sys.exit(0)
    # All attempts failed: signal the failure to the caller (e.g. `a && b`).
    sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment