Created
January 21, 2024 12:13
-
-
Save erhosen/6d472a69d43f0a6c5d9eb16910398019 to your computer and use it in GitHub Desktop.
SQSClient
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from contextlib import AsyncExitStack | |
from typing import AsyncGenerator | |
from aiobotocore import session | |
from pydantic import Field | |
from pydantic_settings import BaseSettings | |
class SQSClientConfig(BaseSettings):
    """Connection settings for the SQS client.

    All fields are required strings (none declares a default). Each one is
    populated by pydantic-settings from the source key named in its
    ``validation_alias`` -- typically an environment variable of that name.
    """

    # Credentials passed to the SQS client.
    aws_access_key_id: str = Field(validation_alias="AWS_ACCESS_KEY_ID")
    aws_secret_access_key: str = Field(validation_alias="AWS_SECRET_ACCESS_KEY")

    # Region and endpoint the client is created against.
    aws_region_name: str = Field(validation_alias="AWS_REGION_NAME")
    aws_endpoint_url: str = Field(validation_alias="AWS_ENDPOINT_URL")

    # Name of the queue; it is resolved to a queue URL by the client wrapper.
    queue_name: str = Field(validation_alias="QUEUE_NAME")
class SQSClient:
    """Async context manager around an aiobotocore SQS client for one queue.

    Settings are loaded from ``SQSClientConfig`` when the object is
    constructed. Entering the context creates the underlying client and
    resolves the configured queue name to its URL; leaving the context
    closes the client again. The message helpers must only be called while
    the context is entered.
    """

    def __init__(self):
        self.config = SQSClientConfig()
        self._exit_stack = AsyncExitStack()
        # Declared for type checkers only; both are assigned in __aenter__.
        self._client: session.AioBaseClient
        self._queue_url: str

    async def __aenter__(self):
        """Create the SQS client, look up the queue URL and return ``self``.

        Raises:
            Whatever client creation or ``get_queue_url`` raises. In that
            case everything already registered on the exit stack is closed
            before the exception propagates.
        """
        boto_session = session.get_session()
        try:
            self._client = await self._exit_stack.enter_async_context(
                boto_session.create_client(
                    service_name="sqs",
                    endpoint_url=self.config.aws_endpoint_url,
                    aws_access_key_id=self.config.aws_access_key_id,
                    aws_secret_access_key=self.config.aws_secret_access_key,
                    region_name=self.config.aws_region_name,
                )
            )
            response = await self._client.get_queue_url(
                QueueName=self.config.queue_name
            )
            self._queue_url = response["QueueUrl"]
        except BaseException:
            # __aexit__ is never invoked when __aenter__ raises, so the
            # client opened above would leak unless it is released here.
            await self._exit_stack.aclose()
            raise
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the underlying client; exceptions are never suppressed."""
        # The stack's return value is deliberately discarded so this method
        # returns None and any in-flight exception keeps propagating.
        await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb)

    async def receive_message(self, *, wait_time_seconds: int = 2):
        """Receive from the queue and return the client's raw response.

        Args:
            wait_time_seconds: Value forwarded as ``WaitTimeSeconds``
                (defaults to 2, the previously hard-coded value).
        """
        return await self._client.receive_message(
            QueueUrl=self._queue_url,
            WaitTimeSeconds=wait_time_seconds,
        )

    async def delete_message(self, receipt_handle: str):
        """Delete the message identified by ``receipt_handle`` from the queue."""
        await self._client.delete_message(
            QueueUrl=self._queue_url,
            ReceiptHandle=receipt_handle,
        )

    async def send_message(self, message: str):
        """Send ``message`` to the queue as the message body."""
        await self._client.send_message(
            QueueUrl=self._queue_url,
            MessageBody=message,
        )
async def get_sqs_client() -> AsyncGenerator[SQSClient, None]:
    """Yield one entered ``SQSClient``, exiting it once the generator resumes.

    The client is built with its default constructor and stays inside its
    async context for as long as the consumer holds the yielded value.
    """
    client = SQSClient()
    # SQSClient.__aenter__ returns the instance itself, so yielding `client`
    # hands out the same object that `async with ... as` would bind.
    async with client:
        yield client
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment