Skip to content

Instantly share code, notes, and snippets.

@erhosen
Created January 21, 2024 12:13
Show Gist options
  • Save erhosen/6d472a69d43f0a6c5d9eb16910398019 to your computer and use it in GitHub Desktop.
Save erhosen/6d472a69d43f0a6c5d9eb16910398019 to your computer and use it in GitHub Desktop.
SQSClient
from contextlib import AsyncExitStack
from typing import AsyncGenerator
from aiobotocore import session
from pydantic import Field
from pydantic_settings import BaseSettings
class SQSClientConfig(BaseSettings):
aws_access_key_id: str = Field(validation_alias="AWS_ACCESS_KEY_ID")
aws_secret_access_key: str = Field(validation_alias="AWS_SECRET_ACCESS_KEY")
aws_region_name: str = Field(validation_alias="AWS_REGION_NAME")
aws_endpoint_url: str = Field(validation_alias="AWS_ENDPOINT_URL")
queue_name: str = Field(validation_alias="QUEUE_NAME")
class SQSClient:
def __init__(self):
self.config = SQSClientConfig()
self._exit_stack = AsyncExitStack()
self._client: session.AioBaseClient
self._queue_url: str
async def __aenter__(self):
boto_session = session.get_session()
self._client = await self._exit_stack.enter_async_context(
boto_session.create_client(
service_name="sqs",
endpoint_url=self.config.aws_endpoint_url,
aws_access_key_id=self.config.aws_access_key_id,
aws_secret_access_key=self.config.aws_secret_access_key,
region_name=self.config.aws_region_name,
)
)
self._queue_url = (
await self._client.get_queue_url(QueueName=self.config.queue_name)
)["QueueUrl"]
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb)
async def receive_message(self):
return await self._client.receive_message(
QueueUrl=self._queue_url,
WaitTimeSeconds=2,
)
async def delete_message(self, receipt_handle: str):
await self._client.delete_message(
QueueUrl=self._queue_url,
ReceiptHandle=receipt_handle,
)
async def send_message(self, message: str):
await self._client.send_message(
QueueUrl=self._queue_url,
MessageBody=message,
)
async def get_sqs_client() -> AsyncGenerator[SQSClient, None]:
async with SQSClient() as sqs_client:
yield sqs_client
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment