Skip to content

Instantly share code, notes, and snippets.

@rhockenbury
Created June 15, 2017 02:40
Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save rhockenbury/74d176691a4d9e2a84f64cb314910fc6 to your computer and use it in GitHub Desktop.
Save rhockenbury/74d176691a4d9e2a84f64cb314910fc6 to your computer and use it in GitHub Desktop.
Queue Celery Task in SQS from Lambda
from __future__ import print_function
import json
import urllib
import boto3
import uuid
import base64
print("Loading function")
sqs = boto3.client("sqs")
QUEUE_URL = "YOUR_QUEUE_URL"
"""
{
"task_name": "tasks.your_task_name",
"task_args": [
"your", "task", "args"
]
}
"""
def lambda_handler(event, context):
print("Received event: " + json.dumps(event, indent=4))
task_name = event["task_name"]
task_args = event["task_args"]
task_kwargs = {}
# generate celery task message
msg_id = str(uuid.uuid4())
msg_envelope = {
"content-encoding": "utf-8",
"content-type": "application/json",
"headers": {},
"properties": {
"body_encoding": "base64",
"correlation_id": msg_id,
"delivery_info": {
"exchange": None,
"routing_key": None
},
"delivery_tag": None
}
}
msg_body = {
"task": task_name,
"args": task_args,
"kwargs": task_kwargs,
"id": msg_id,
"retries": 0
}
# package celery task message
msg_envelope["body"] = base64.b64encode(json.dumps(msg_body))
msg = base64.b64encode(json.dumps(msg_envelope))
# send message to sqs
response = sqs.send_message(
QueueUrl=QUEUE_URL,
MessageBody=msg
)
if response["MessageId"]:
print("Task message sent to sqs: " + json.dumps(msg_body, indent=4))
else:
raise Exception("Task message unable to send: " + json.dumps(msg_body, indent=4))
from __future__ import print_function
import json
import urllib
import boto3
import uuid
import base64
print("Loading function")
sqs = boto3.client("sqs")
QUEUE_URL = "YOUR_QUEUE_URL"
"""
{
"Records": [
{
"eventVersion": "2.0",
"eventTime": "1970-01-01T00:00:00.000Z",
"requestParameters": {
"sourceIPAddress": "127.0.0.1"
},
"s3": {
"configurationId": "testConfigRule",
"object": {
"eTag": "0123456789abcdef0123456789abcdef",
"sequencer": "0A1B2C3D4E5F678901",
"key": "HappyFace.jpg",
"size": 1024
},
"bucket": {
"arn": "arn:aws:s3:::mybucket",
"name": "sourcebucket",
"ownerIdentity": {
"principalId": "EXAMPLE"
}
},
"s3SchemaVersion": "1.0"
},
"responseElements": {
"x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH",
"x-amz-request-id": "EXAMPLE123456789"
},
"awsRegion": "us-east-1",
"eventName": "ObjectCreated:Put",
"userIdentity": {
"principalId": "EXAMPLE"
},
"eventSource": "aws:s3"
}
]
}
"""
def lambda_handler(event, context):
print("Received event: " + json.dumps(event, indent=4))
s3_record = event["Records"][0]["s3"]
bucket = s3_record["bucket"]["name"]
key = urllib.unquote_plus(s3_record["object"]["key"].encode("utf8"))
etag = s3_record["object"]["eTag"]
# determine task type to queue based on S3 object key
if "some_string" in key:
task_name = "tasks.some_task"
elif "some_other_string" in key:
task_name = "tasks.some_other_task"
else:
task_name = "tasks.some_default_task"
task_args = [bucket, key, etag]
task_kwargs = {}
# generate celery task message
msg_id = str(uuid.uuid4())
msg_envelope = {
"content-encoding": "utf-8",
"content-type": "application/json",
"headers": {},
"properties": {
"body_encoding": "base64",
"correlation_id": msg_id,
"delivery_info": {
"exchange": None,
"routing_key": None
},
"delivery_tag": None
}
}
msg_body = {
"task": task_name,
"args": task_args,
"kwargs": task_kwargs,
"id": msg_id,
"retries": 0
}
# package celery task message
msg_envelope["body"] = base64.b64encode(json.dumps(msg_body))
msg = base64.b64encode(json.dumps(msg_envelope))
# send message to sqs
response = sqs.send_message(
QueueUrl=QUEUE_URL,
MessageBody=msg
)
if response["MessageId"]:
print("Task message sent to sqs: " + json.dumps(msg_body, indent=4))
else:
raise Exception("Task message unable to send: " + json.dumps(msg_body, indent=4))
@andrebr
Copy link

andrebr commented Jul 5, 2019

Thanks for the great resource! Saved me a ton of time.

Just had this issue: TypeError: the JSON object must be str, bytes or bytearray, not dict, which could fix by replace those lines:

msg_envelope["body"] = base64.b64encode(json.dumps(msg_body))
msg = base64.b64encode(json.dumps(msg_envelope))

With these:

msg_envelope["body"] = base64.b64encode(json.dumps(msg_body).encode('utf8')).decode('utf8')
msg = base64.b64encode(json.dumps(msg_envelope).encode('utf8')).decode('utf8')

So the body could be serialized, as it's a byte when json.dumps expects str.

@jdenisTLM
Copy link

Thanks for the tip ! But on my side, It ends by a crash ...

[2024-01-31 12:32:34,352: INFO/ForkPoolWorker-16] Task main.identification_task[2e945089-217d-497c-b53e-9c7dc663f08d] succeeded in 0.00021101800302858464s: 'yomy-queue.fifo'
[2024-01-31 12:32:34,352: CRITICAL/MainProcess] Unrecoverable error: TypeError('can only concatenate str (not "NoneType") to str')
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/celery/worker/worker.py", line 202, in start
    self.blueprint.start(self)
  File "/usr/local/lib/python3.10/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/usr/local/lib/python3.10/site-packages/celery/bootsteps.py", line 365, in start
    return self.obj.start()
  File "/usr/local/lib/python3.10/site-packages/celery/worker/consumer/consumer.py", line 340, in start
    blueprint.start(self)
  File "/usr/local/lib/python3.10/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/usr/local/lib/python3.10/site-packages/celery/worker/consumer/consumer.py", line 742, in start
    c.loop(*c.loop_args())
  File "/usr/local/lib/python3.10/site-packages/celery/worker/loops.py", line 97, in asynloop
    next(loop)
  File "/usr/local/lib/python3.10/site-packages/kombu/asynchronous/hub.py", line 306, in create_loop
    item()
  File "/usr/local/lib/python3.10/site-packages/vine/promises.py", line 161, in __call__
    return self.throw()
  File "/usr/local/lib/python3.10/site-packages/vine/promises.py", line 158, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/usr/local/lib/python3.10/site-packages/kombu/message.py", line 131, in ack_log_error
    self.ack(multiple=multiple)
  File "/usr/local/lib/python3.10/site-packages/kombu/message.py", line 126, in ack
    self.channel.basic_ack(self.delivery_tag, multiple=multiple)
  File "/usr/local/lib/python3.10/site-packages/kombu/transport/SQS.py", line 637, in basic_ack
    queue = self.canonical_queue_name(message['routing_key'])
  File "/usr/local/lib/python3.10/site-packages/kombu/transport/SQS.py", line 351, in canonical_queue_name
    return self.entity_name(self.queue_name_prefix + queue_name)
TypeError: can only concatenate str (not "NoneType") to str
[2024-01-31 12:32:35,488: WARNING/MainProcess] Restoring 1 unacknowledged message(s)
[2024-01-31 12:32:35,488: WARNING/MainProcess] UNABLE TO RESTORE 1 MESSAGES: (TypeError('can only concatenate str (not "NoneType") to str'),)
[2024-01-31 12:32:35,488: WARNING/MainProcess] EMERGENCY DUMP STATE TO FILE -> /tmp/tmp1alk3jko <-
[2024-01-31 12:32:35,488: WARNING/MainProcess] Cannot pickle state: TypeError("cannot pickle 'Message' object: a class that defines __slots__ without defining __getstate__ cannot be pickled with protocol 0"). Fallback to pformat.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment