Start worker
dagster-celery worker start \
--name worker_1 \
--app app \
--queue dagster
Pipeline execution
execution:
celery:
config:
broker: sqs://xxx:yyy@
backend: dynamodb://xxx:yyy@us-east-1/dagster
import boto3 | |
from celery import Celery | |
from kombu.utils.url import safequote | |
from dagster_celery.tasks import create_task | |
DYNAMO_TABLE = "dagster" | |
SQS_QUEUE = "dagster" | |
credentials = session.get_credentials() | |
aws_access_key = safequote(credentials.access_key) | |
aws_secret_key = safequote(credentials.secret_key) | |
config = { | |
"broker": f"sqs://{aws_access_key}:{aws_secret_key}@", | |
"backend": f"dynamodb://{aws_access_key}:{aws_secret_key}@us-east-1/{DYNAMO_TABLE}", | |
"task_default_queue": SQS_QUEUE, | |
"task_routes": {"prod": {"queue": SQS_QUEUE}}, | |
} | |
app = Celery("dagster", **config) | |
execute_plan = create_task(app) | |
if __name__ == "__main__": | |
app.worker_main() |
DagsterSQS: | |
Type: AWS::SQS::Queue | |
Properties: | |
QueueName: dagster | |
DagsterTable: | |
Type: AWS::DynamoDB::Table | |
Properties: | |
TableName: dagster | |
AttributeDefinitions: | |
- AttributeName: "id" | |
AttributeType: "S" | |
KeySchema: | |
- AttributeName: "id" | |
KeyType: "HASH" | |
ProvisionedThroughput: | |
ReadCapacityUnits: "2" | |
WriteCapacityUnits: "2" |