Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
# routes
from fastapi import APIRouter, Depends, HTTPException
from starlette.status import HTTP_201_CREATED
from starlette.responses import JSONResponse
from sqlalchemy.orm import Session
from app.core.config import WPS_PROCESS_LINK
from app import crud
from app.api.utils.db import get_db
from app.models.job import (
jobCollection,
JobCreate,
statusInfo,
statusEnum
)
from app.core.celery_app import celery_app
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
router = APIRouter()
@router.post(
    "/processes/{id}/jobs/",
    operation_id="createJob",
    status_code=HTTP_201_CREATED,
    include_in_schema=True,
)
def create_wps_job_by_process(
    *,
    db: Session = Depends(get_db),
    id: str,
    job_in: JobCreate,
    current_user: DBUser = Depends(get_current_active_user),
):
    """
    Create new job for a wps process.

    Looks the process up by ``id``, stores a new job owned by the current
    user and, when the process allows asynchronous execution, queues the
    ``async_buffer`` Celery task for it.

    Args:
        db: Database session injected by ``get_db``.
        id: Identifier of the process taken from the URL path.
        job_in: Payload describing the job to create.
        current_user: Authenticated user; becomes the owner of the job.

    Returns:
        An empty ``JSONResponse`` with status 201 and a ``Location`` header
        pointing at the new job when the task is queued asynchronously.
        ``None`` in every other case (synchronous execution is not
        implemented yet).

    Raises:
        HTTPException: 404 when the process does not exist, 500 when the
            job record could not be created.
    """
    # NOTE(review): DBUser, get_current_active_user and jobControlOptions are
    # not imported in the visible part of this module -- confirm they are
    # brought into scope elsewhere.
    process = crud.process.get_by_id(db_session=db, id=id)
    if not process:
        raise HTTPException(
            status_code=404,
            detail=f"The process with id {id} does not exist."
        )

    job = crud.job.create(
        db_session=db,
        job_in=job_in,
        process_id=process.pid,
        owner_id=current_user.id
    )
    # Validate the job *before* touching its attributes; previously job.jid
    # was read first, which made the later ``if job`` check meaningless.
    if not job:
        raise HTTPException(
            status_code=500,
            detail=f"The job for the process with id {id} could not be created."
        )

    base_url = WPS_PROCESS_LINK["href"]
    job_id = job.jid
    location = f"{base_url}{id}/jobs/{job_id}"
    headers = {"Location": location}

    # the conditional order drives the precedence of async/sync for now
    # It could be something like ?c=sync/async
    if jobControlOptions.ASYNC in process.jobControlOptions:
        logger.info("======> Launch async task for jobID %s", job_id)
        celery_app.send_task(
            "async_buffer",
            args=[job_id, location]
        )
        return JSONResponse(
            content=None,
            headers=headers,
            status_code=HTTP_201_CREATED
        )

    if jobControlOptions.SYNC in process.jobControlOptions:
        # @TODO: Implement an abstract way to get the process
        # from the catalog
        pass

    # Nothing was executed: keep the historical behaviour of returning no
    # body (and no Location header) until sync execution is implemented.
    return None
# app/core/celery_app.py
from celery import Celery
# Celery application named "worker", talking to the AMQP broker on host
# "queue" with the guest account.
celery_app = Celery(
    "worker",
    broker="amqp://guest@queue//",
)

# Both task names known to this application are routed to the same queue.
celery_app.conf.task_routes = dict.fromkeys(
    ("app.worker.test_celery", "async_buffer"),
    "main-queue",
)
# app/tasks/base.py
import sqlalchemy
from app.core.celery_app import celery_app
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
from app.db.session import engine
class WPSTask(celery_app.Task):
    """Celery base task that equips every invocation with a database session.

    Before the task body runs, the shared ``engine`` and a new scoped
    session bound to it are stored on the task as ``self.engine`` and
    ``self.session``. Once the task has returned, both are released again.
    """

    def __call__(self, *args, **kwargs):
        """Attach the engine and a fresh scoped session, then run the task."""
        self.engine = engine
        self.session = scoped_session(
            sessionmaker(
                bind=self.engine,
                autoflush=False,
                autocommit=False,
            )
        )
        return super().__call__(*args, **kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        """Release the session first, then dispose of the engine's pool.

        Each resource is only released if the corresponding attribute was
        actually set on the task.
        """
        releases = (
            ("session", lambda: self.session.remove()),
            ("engine", lambda: self.engine.engine.dispose()),
        )
        for attribute, release in releases:
            if hasattr(self, attribute):
                release()
from typing import List
from pydantic import UUID4, UrlStr
from app.db_models.job import Job
from app.core.celery_app import celery_app
from app.tasks.base import WPSTask
from app.api.utils.processes.buffer.buffer import Buffer
from app.api.utils.processes.hello.hello import getResult
from app.models.job import statusEnum
from app.models.common import link as Link
from app.core.config import (
StatusMessage,
ApplicationType,
WPSRel,
Lang,
Title
)
# app/worker.py
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@celery_app.task(acks_late=True)
def test_celery(word: str):
    """Echo *word* back inside a fixed message; used to smoke-test the worker."""
    reply = f"test task return {word}"
    return reply
@celery_app.task(
    bind=True,
    base=WPSTask,
    name="async_buffer",
    acks_late=True,
)
def async_buffer(self, job_jid: UUID4, location: UrlStr):
    """Run the buffer process for a stored job and persist its outcome.

    The job is loaded through the session provided by ``WPSTask``. Its
    ``self`` link is stored first; when a result is available the job is
    marked successful and receives a ``result`` link plus the result payload.

    Args:
        job_jid: Primary key of the job to process.
        location: URL of the job resource, used to build its links.

    Returns:
        The string ``"async buffer finished"``.

    Raises:
        LookupError: If no job with ``job_jid`` exists.
    """
    logger.info("======> Let's try to execute the query")
    logger.info("======> The unique value of UUID for the job is %s", job_jid)
    job = self.session.query(Job).get(job_jid)
    if job is None:
        # Query.get() returns None for an unknown key; fail with a clear
        # message instead of an AttributeError on the next line.
        raise LookupError(f"The job with id {job_jid} does not exist.")
    logger.info("======> Status for the job is %s", job.status)

    self_link = Link(
        href=f"{location}",
        rel=WPSRel.SELF.value,
        type=ApplicationType.JSON.value,
        hreflang=Lang.EN.value,
        title=Title.SELF.value
    )
    logger.info("======> Self link is %s", self_link)
    job_links = [self_link.dict()]
    job.links = job_links
    self.session.commit()

    # NOTE(review): the process object is built but never run; the result
    # below still comes from the hello-process stub. Kept so that the
    # constructor is still exercised with the job's inputs/outputs.
    p = Buffer(inputs=job.inputs, outputs=job.outputs)  # noqa: F841
    # TODO: p.run() and read p.result instead of getResult()

    # save result in the Job model
    logger.info("======> Let's try to save the result")
    result = getResult()
    if not result:
        # Without a result there is nothing to store or serialise; bail out
        # instead of crashing on result.json() further down.
        logger.warning("======> No result available for the job %s", job_jid)
        return "async buffer finished"

    result_link = Link(
        href=f"{location}/result",
        rel=WPSRel.RESULT.value,
        type=ApplicationType.JSON.value,
        hreflang=Lang.EN.value,
        title=Title.RESULT.value
    )
    job.progress = 100
    job.status = statusEnum.SUCCESSFUL.value
    job.message = StatusMessage.SUCCESSFUL.value
    job_links = job_links + [result_link.dict()]
    logger.info("======> job_links has value %s", job_links)
    job.links = job_links
    job.result = result.dict()
    self.session.commit()

    json_result = result.json()
    logger.info("======> Job record refreshed")
    logger.info(
        "======> The new value for the status of the job is %s", job.status
    )
    logger.info(
        "======> Result dictionary for the job %s is \n%s",
        job_jid,
        json_result,
    )
    return "async buffer finished"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment