Skip to content

Instantly share code, notes, and snippets.

@andrewp-as-is
Last active November 10, 2022 15:10
Show Gist options
  • Save andrewp-as-is/0e9bc5a39a54235f647198f720d4c21b to your computer and use it in GitHub Desktop.
Save andrewp-as-is/0e9bc5a39a54235f647198f720d4c21b to your computer and use it in GitHub Desktop.
celery AWS SQS send message
#!/usr/bin/env python
import base64
from botocore.exceptions import ClientError
import boto3
import json
import logging
import os
def send_sqs_message(sqs_queue_url, body, attrs=None, sqs_client=None):
    """Send a single message to an SQS queue.

    Args:
        sqs_queue_url: URL of the target queue, passed as ``QueueUrl``.
        body: Message payload, passed as ``MessageBody``.
        attrs: Optional mapping passed as ``MessageAttributes``. When
            ``None`` the parameter is left out of the request entirely,
            because boto3 rejects an explicit ``None`` value.
        sqs_client: Optional pre-built SQS client to reuse across calls.
            When ``None`` a new client is created with
            ``boto3.client('sqs')``.

    Returns:
        The response returned by ``send_message`` on success, or ``None``
        if the call raised ``ClientError`` (the error is logged).
    """
    if sqs_client is None:
        sqs_client = boto3.client('sqs')

    request = {'QueueUrl': sqs_queue_url, 'MessageBody': body}
    if attrs is not None:
        request['MessageAttributes'] = attrs

    try:
        return sqs_client.send_message(**request)
    except ClientError:
        # logging.exception records the traceback along with the message.
        logging.exception('Failed to send SQS message to %s', sqs_queue_url)
        return None
def main():
    """Send five demo messages to the queue named by ``SQS_QUEUE_URL``.

    Reads the queue URL from the ``SQS_QUEUE_URL`` environment variable,
    configures root logging at DEBUG level, then sends messages
    ``SQS message #1`` .. ``SQS message #5`` with two string attributes
    (``x`` and ``y``), logging the message ID of each successful send.

    Raises:
        SystemExit: If ``SQS_QUEUE_URL`` is unset or empty.
    """
    # export SQS_QUEUE_URL="SQS_QUEUE_URL"
    queue_url = os.getenv('SQS_QUEUE_URL')
    logging.basicConfig(level=logging.DEBUG,
                        format='%(levelname)s: %(asctime)s: %(message)s')

    # Fail fast with a clear message instead of handing None to the sender.
    if not queue_url:
        raise SystemExit('SQS_QUEUE_URL environment variable is not set')

    # The attributes are the same for every message, so build them once.
    attrs = {
        'x': {'StringValue': '1', 'DataType': 'String'},
        'y': {'StringValue': '2', 'DataType': 'String'},
    }

    for i in range(1, 6):
        msg = send_sqs_message(queue_url, f'SQS message #{i}', attrs)
        if msg is not None:
            logging.info('Sent SQS message ID: %s', msg['MessageId'])
# Run the demo only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment