Skip to content

Instantly share code, notes, and snippets.

@tekumara
Last active December 8, 2023 01:18
Show Gist options
  • Save tekumara/298dbf392f5db259dce32fe4aca65b66 to your computer and use it in GitHub Desktop.
Save tekumara/298dbf392f5db259dce32fe4aca65b66 to your computer and use it in GitHub Desktop.
# ruff: noqa: SLF001 ARG001
from typing import Callable
from unittest.mock import MagicMock
import aiobotocore.awsrequest
import aiobotocore.endpoint
import aiohttp
import aiohttp.client_reqrep
import aiohttp.typedefs
import botocore.awsrequest
import botocore.model
import pytest
"""
Patch aiobotocore to work with moto
See https://github.com/aio-libs/aiobotocore/issues/755
"""
class MockAWSResponse(aiobotocore.awsrequest.AioAWSResponse):
def __init__(self, response: botocore.awsrequest.AWSResponse):
self._moto_response = response
self.status_code = response.status_code
self.raw = MockHttpClientResponse(response)
# adapt async methods to use moto's response
async def _content_prop(self) -> bytes:
return self._moto_response.content
async def _text_prop(self) -> str:
return self._moto_response.text
class MockHttpClientResponse(aiohttp.client_reqrep.ClientResponse):
def __init__(self, response: botocore.awsrequest.AWSResponse):
async def read(n: int = -1) -> bytes:
# streaming/range requests. used by s3fs
if not self.content._eof:
self.content._eof = True
return response.content
return b""
self.content = MagicMock(aiohttp.StreamReader)
self.content.read = read
self.content._eof = False
@property
def raw_headers(self) -> aiohttp.typedefs.RawHeaders:
return ()
@pytest.fixture(scope="session")
def _patch_aiobotocore() -> None:
def factory(original: Callable) -> Callable:
def patched_convert_to_response_dict(
http_response: botocore.awsrequest.AWSResponse, operation_model: botocore.model.OperationModel
):
return original(MockAWSResponse(http_response), operation_model)
return patched_convert_to_response_dict
aiobotocore.endpoint.convert_to_response_dict = factory(aiobotocore.endpoint.convert_to_response_dict)
@tekumara
Copy link
Author

Tested with aiobotocore 2.3.4, moto 3.1.18, and s3fs 2022.7.1.

@tekumara
Copy link
Author

Updated to work with

aiobotocore==2.5.4
moto==4.1.14
s3fs==2023.6.0

@k-scherer
Copy link

@tekumara I am running into an issue utilizing this patch with s3fs.copy due to an issue in
aiobotocore.handlers._looks_like_special_case_error. Any idea how to add this to the patch and resolve that?

@tekumara
Copy link
Author

@k-scherer I've not hit that, could you share a stack trace?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment