Skip to content

Instantly share code, notes, and snippets.

@buddylindsey
Last active October 20, 2020 00:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save buddylindsey/0b8dd87403f2cbc01fe611b9384f42b2 to your computer and use it in GitHub Desktop.
Save buddylindsey/0b8dd87403f2cbc01fe611b9384f42b2 to your computer and use it in GitHub Desktop.
SQS Client
class SQSClient:
    """Small JSON-oriented wrapper around a boto3 SQS client for one queue.

    Meant to be used as a context manager: every message fetched with
    ``get_messages`` inside the ``with`` block is deleted from the queue when
    the block exits *without* raising. If the block raises, nothing is
    deleted, so SQS can redeliver the messages once their visibility timeout
    expires.

    Example::

        with SQSClient("my-queue") as queue:
            for payload in queue.get_messages():
                handle(payload)
    """

    # SQS rejects SendMessageBatch requests with more than this many entries.
    _MAX_BATCH_ENTRIES = 10

    def __init__(self, queue):
        """Create the boto3 SQS client and resolve the queue name to its URL.

        Args:
            queue: Name of the SQS queue (passed to ``get_queue_url``).
        """
        # Flipped to True by get_messages(). Nothing in this class reads it;
        # it is kept in case external code inspects it.
        self.delete = False
        self.client = boto3.client("sqs")
        self.url = self.get_queue(queue)
        # Receipt handles of messages received but not yet deleted.
        # None until the first non-empty receive, and again after __exit__.
        self.receipt_handles = None

    def __enter__(self):
        """Return the client itself for use inside the ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Delete the received messages if the ``with`` body succeeded.

        The pending handles are always forgotten, so reusing the instance in
        a later ``with`` block never deletes messages from an earlier one.
        Returns None, so an exception raised in the body keeps propagating.
        """
        pending = self.receipt_handles or []
        self.receipt_handles = None
        if exc_type is not None:
            # Processing failed: do not acknowledge the messages.
            return
        for handle in pending:
            self.client.delete_message(QueueUrl=self.url, ReceiptHandle=handle)

    def get_queue(self, queue):
        """Look up and return the URL of the queue named *queue*."""
        response = self.client.get_queue_url(QueueName=queue)
        return response["QueueUrl"]

    def get_messages(self) -> List[dict]:
        """Receive up to 10 messages and return their JSON-decoded bodies.

        The receipt handles are appended to ``self.receipt_handles`` so that
        messages from *every* call made inside one ``with`` block are deleted
        on exit, not only those from the most recent call.

        Returns:
            The decoded bodies, or an empty list when no message was received.
        """
        self.delete = True
        response = self.client.receive_message(
            QueueUrl=self.url, MaxNumberOfMessages=10
        )
        messages = response.get("Messages", [])
        if not messages:
            return []
        if self.receipt_handles is None:
            self.receipt_handles = []
        self.receipt_handles.extend(msg["ReceiptHandle"] for msg in messages)
        return [self.deserialize(msg["Body"]) for msg in messages]

    def deserialize(self, data):
        """Decode a JSON message body."""
        return json.loads(data)

    def serialize(self, data):
        """Encode *data* as a JSON string suitable for a message body."""
        return json.dumps(data)

    def send_message(self, data) -> dict:
        """Send *data* as one JSON-encoded message.

        Returns:
            The ``send_message`` response from the underlying client.
        """
        return self.client.send_message(
            QueueUrl=self.url, MessageBody=self.serialize(data)
        )

    def send_message_batch(self, data: List[dict]) -> List[dict]:
        """Send every item of *data* as its own JSON-encoded message.

        Entries are sent in requests of at most ``_MAX_BATCH_ENTRIES`` each,
        so *data* may hold any number of items. An empty *data* sends nothing.

        Returns:
            One ``send_message_batch`` response per request issued, in order.
            Per-entry failures are reported inside those responses rather
            than raised, so callers that care should inspect them.
        """
        # Serialize everything first so a bad payload fails before any
        # request has been sent.
        entries = [
            {"Id": str(uuid.uuid4()), "MessageBody": self.serialize(item)}
            for item in data
        ]
        responses = []
        for start in range(0, len(entries), self._MAX_BATCH_ENTRIES):
            chunk = entries[start:start + self._MAX_BATCH_ENTRIES]
            responses.append(
                self.client.send_message_batch(QueueUrl=self.url, Entries=chunk)
            )
        return responses
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment