Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
S3Connector
import os
from contextlib import contextmanager
from tempfile import NamedTemporaryFile
from typing import IO, Dict, Generator, Literal
import boto3
from botocore.exceptions import ClientError
S3ConnectorMode = Literal["r", "w", "a"]
class S3Connector:
def __init__(self, bucket: str, **kwargs: Dict) -> None:
s3 = boto3.resource("s3", **kwargs)
try:
s3.meta.client.head_bucket(Bucket=bucket)
except ClientError as err:
raise err
self._bucket = s3.Bucket(bucket)
@contextmanager
def open(self, s3key: str, mode: S3ConnectorMode = "r") -> Generator[IO, None, None]:
if mode == "r":
yield from self._read(s3key)
elif mode == "w":
yield from self._write(s3key)
elif mode == "a":
yield from self._add(s3key)
else:
raise Exception("invalid mode")
def _read(self, s3key: str) -> Generator[IO, None, None]:
# 存在確認
try:
self._bucket.meta.client.head_object(Bucket=self._bucket.name, Key=s3key)
except ClientError as err:
raise err
# S3のオブジェクトをローカルに一時ファイルとして作成
with NamedTemporaryFile(delete=False, mode="w") as f_tmp:
tmpfile_name = f_tmp.name
self._bucket.download_file(s3key, tmpfile_name)
# withブロックに読み込み専用ファイルIOを渡す
# withブロック終了後に一時ファイルを閉じて削除する
f_yield = open(tmpfile_name)
try:
yield f_yield
finally:
f_yield.close()
os.unlink(tmpfile_name)
def _write(self, s3key: str) -> Generator[IO, None, None]:
# 書き込み用の一時ファイルを作成
f_yield = NamedTemporaryFile(delete=False, mode="w")
tmpfile_name = f_yield.name
# withブロックに書き込み専用ファイルIOを渡す
# withブロック内で例外が発生した場合は一時ファイルを閉じて削除する
try:
yield f_yield
except Exception as err:
f_yield.close()
os.unlink(tmpfile_name)
raise err
# withブロックが正常終了した場合
# S3にアップロードしてから一時ファイルを閉じて削除する
try:
f_yield.flush()
self._bucket.upload_file(tmpfile_name, s3key)
except ClientError as err:
raise err
finally:
f_yield.close()
os.unlink(tmpfile_name)
def _add(self, s3key: str) -> Generator[IO, None, None]:
# 存在確認
try:
self._bucket.meta.client.head_object(Bucket=self._bucket.name, Key=s3key)
except ClientError as err:
raise err
# S3のオブジェクトをローカルに一時ファイルとして作成
with NamedTemporaryFile(delete=False, mode="w") as f_tmp:
tmpfile_name = f_tmp.name
self._bucket.download_file(s3key, tmpfile_name)
# withブロックに追記専用ファイルIOを渡す
# withブロック終了後に一時ファイルを閉じて削除する
f_yield = open(tmpfile_name, "a")
try:
yield f_yield
except Exception as err:
f_yield.close()
os.unlink(tmpfile_name)
raise err
# withブロックが正常終了した場合
# S3にアップロードしてから一時ファイルを閉じて削除する
try:
f_yield.flush()
self._bucket.upload_file(tmpfile_name, s3key)
except ClientError as err:
raise err
finally:
f_yield.close()
os.unlink(tmpfile_name)
if __name__ == "__main__":
bucket = "s3-bucket-name" # WRITE ME
key = "path/to/sample.txt" # WRITE ME
s3_conn = S3Connector(bucket)
with s3_conn.open(key) as f:
print(f.read())
with s3_conn.open(key, "w") as f:
f.write("Hello World\n")
with s3_conn.open(key, "a") as f:
f.write("Goodbye\n")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment