Skip to content

Instantly share code, notes, and snippets.

View fithisux's full-sized avatar

Vasileios Anagnostopoulos fithisux

  • Newcross (Agile Actors)
  • Athens/Greece
View GitHub Profile
version: '3.8'
services:
redis:
image: redis:latest
restart: always
ports:
- '6379:6379'
command: redis-server --save 20 1 --loglevel warning
volumes:
- cache:/data
@fithisux
fithisux / Dockerfile
Created June 17, 2025 07:40
Dockerization
FROM python:latest
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
WORKDIR /app
COPY requirements-prod.txt .
RUN pip install --upgrade pip
@fithisux
fithisux / happy.py
Created June 17, 2025 07:28
Happy path
def test_works_correctly():
response : requests.Response = requests.get('http://127.0.0.1:5000/hello')
# submission happened ok
assert response.status_code == 202
assert response.json()["status"] == "Task has been submitted"
# immediate status is pending
task_id = response.json()["task_id"]
response : requests.Response = requests.get(f'http://127.0.0.1:5000/hellowait/{task_id}')
@fithisux
fithisux / unhappy.py
Created June 17, 2025 07:25
Unhappy path
def test_unknown_id_gives_404():
response : requests.Response = requests.get('http://127.0.0.1:5000/hellowait/lol1taa')
assert response.status_code == 404
assert response.json() == {'exists': False, 'result': None}
import requests
import time
def test_unknown_id_gives_404():
response : requests.Response = requests.get('http://127.0.0.1:5000/hellowait/lol1taa')
assert response.status_code == 404
assert response.json() == {'exists': False, 'result': None}
@fithisux
fithisux / docker-compose.yml
Created June 17, 2025 07:12
Start Services
version: '3.8'
services:
redis:
image: redis:latest
restart: always
ports:
- '6379:6379'
command: redis-server --save 20 1 --loglevel warning
volumes:
- cache:/data
@fithisux
fithisux / waiting.py
Created June 16, 2025 13:57
Wait an identifier
# API route to wait the task
@app.route('/hellowait/<task_id>', methods=['GET'])
def wait_on_task(task_id):
some_result = AsyncResult(task_id, app=xxx)
taskid_list_str = redis_server.get('taskid_list').decode('utf-8')
taskid_list = taskid_list_str.split(";")
print("scan ", taskid_list)
if task_id not in taskid_list:
return jsonify({"result": None, "exists": False}), 404
@fithisux
fithisux / submission.py
Created June 16, 2025 13:50
Submitting a task
# API route to trigger the task
@app.route('/hello', methods=['GET'])
def trigger_task(trail=True):
task = say_hello.delay()
taskid_list_str = redis_server.get('taskid_list').decode('utf-8')
taskid_list = taskid_list_str.split(";")
taskid_list.append(task.id)
redis_server.set('taskid_list', ";".join(taskid_list))
print("append ", taskid_list)
return jsonify({"task_id": task.id, "status": "Task has been submitted"}), 202
@fithisux
fithisux / init_app.py
Created June 16, 2025 13:18
App initialization
redis_server = redis.Redis(config_app.REDIS_HOST, config_app.REDIS_PORT)
redis_server.set('taskid_list', '')
app = Flask(__name__)
# Configure Celery
xxx = Celery(app.name, backend=config_app.CELERY_RESULT_BACKEND, broker=config_app.CELERY_BROKER_URL)
# Define a Celery task
@xxx.task
@fithisux
fithisux / computation.py
Last active June 16, 2025 13:17
Celery computation
# Configure Celery
xxx = Celery(app.name, backend=config_app.CELERY_RESULT_BACKEND, broker=config_app.CELERY_BROKER_URL)
# Define a Celery task
@xxx.task
def say_hello():
time.sleep(10)
return "Hello, World!"