@dehume-drizly
Created October 2, 2020 13:53

Start worker

dagster-celery worker start \
  --name worker_1 \
  --app app \
  --queue dagster

Pipeline execution

execution:
  celery:
    config:
      broker: sqs://xxx:yyy@
      backend: dynamodb://xxx:yyy@us-east-1/dagster

# Celery app module. The worker command loads it with `--app app`,
# so it is assumed to be saved as app.py.
import boto3
from celery import Celery
from dagster_celery.tasks import create_task
from kombu.utils.url import safequote

DYNAMO_TABLE = "dagster"
SQS_QUEUE = "dagster"

# Resolve AWS credentials through the default boto3 credential chain.
session = boto3.Session()
credentials = session.get_credentials()

# URL-quote the keys so characters such as "/" or "+" do not break the URLs.
aws_access_key = safequote(credentials.access_key)
aws_secret_key = safequote(credentials.secret_key)

config = {
    "broker": f"sqs://{aws_access_key}:{aws_secret_key}@",
    "backend": f"dynamodb://{aws_access_key}:{aws_secret_key}@us-east-1/{DYNAMO_TABLE}",
    "task_default_queue": SQS_QUEUE,
    "task_routes": {"prod": {"queue": SQS_QUEUE}},
}

app = Celery("dagster", **config)
execute_plan = create_task(app)

if __name__ == "__main__":
    app.worker_main()
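
The quoting step in the app module matters because AWS secret keys can contain characters such as `/` and `+`, which would otherwise be read as URL delimiters. A minimal sketch of the effect, using only the standard library and made-up credentials (`build_broker_url` and the key values are hypothetical, and `quote(..., safe="")` is assumed to behave like kombu's `safequote`):

```python
from urllib.parse import quote


def build_broker_url(access_key: str, secret_key: str) -> str:
    """Return an SQS broker URL with both keys percent-encoded."""
    # safe="" also encodes "/", which quote() leaves alone by default.
    return f"sqs://{quote(access_key, safe='')}:{quote(secret_key, safe='')}@"


# Hypothetical credentials, for illustration only.
url = build_broker_url("AKIAEXAMPLE", "abc/def+ghi")
print(url)
```

The same quoted values can be reused for the `dynamodb://` backend URL, since both URLs embed the keys in the same position.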

# CloudFormation resources for the SQS queue and the DynamoDB result table.
DagsterSQS:
  Type: AWS::SQS::Queue
  Properties:
    QueueName: dagster
DagsterTable:
  Type: AWS::DynamoDB::Table
  Properties:
    TableName: dagster
    AttributeDefinitions:
      - AttributeName: "id"
        AttributeType: "S"
    KeySchema:
      - AttributeName: "id"
        KeyType: "HASH"
    ProvisionedThroughput:
      ReadCapacityUnits: "2"
      WriteCapacityUnits: "2"