Last active
November 24, 2023 03:30
-
-
Save 6mini/8ae385bfcfa06af2d6ef24428ee73c4d to your computer and use it in GitHub Desktop.
S3ParquetHandler 클래스는 AWS S3에서 Parquet 파일을 읽고, Pandas DataFrame을 S3에 저장하는 기능을 제공한다. 이 클래스는 boto3 세션 초기화, 단일 및 다중 Parquet 파일 읽기, 멀티스레딩을 통한 성능 향상, 경로 부분 추출하여 컬럼으로 추가, 여러 경로 파일 병합, DataFrame을 Parquet으로 변환하여 S3에 저장 등의 기능을 갖추고 있다.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
이 파일은 S3ParquetHandler 클래스를 정의하고 있으며, 이 클래스는 AWS S3에서 Parquet 파일을 효율적으로 읽거나 Pandas DataFrame을 S3에 Parquet 형태로 저장하기 위한 기능을 제공한다. 클래스의 주요 기능은 다음과 같다: | |
1. 클래스 초기화 (__init__ 메소드): | |
- AWS S3 자격 증명 및 지역 설정을 받아 boto3 세션을 초기화한다. | |
- 멀티스레딩 사용 여부(use_multithreading)와 최대 스레드 수(max_threads)를 설정할 수 있다. 멀티스레딩 사용 시, 파일 로딩 속도가 향상될 수 있다. | |
2. 단일 Parquet 파일 읽기 (read_parquet_from_s3 메소드): | |
- 지정된 S3 버킷과 키를 사용하여 단일 Parquet 파일을 읽고, Pandas DataFrame으로 반환한다. | |
3. 다중 Parquet 파일 읽기 (read_multiple_parquets_from_s3 메소드): | |
- 지정된 S3 경로의 모든 Parquet 파일을 찾아 이를 합쳐 하나의 DataFrame으로 반환한다. | |
- 필요에 따라 경로의 특정 부분(예: 'year', 'month')을 추출하여 DataFrame의 컬럼으로 추가할 수 있다. | |
- 멀티스레딩을 활성화한 경우, concurrent.futures.ThreadPoolExecutor를 사용하여 파일 로딩을 병렬 처리한다. | |
4. 다중 Prefix의 다중 Parquet 파일 읽기 (read_multiple_prefix_parquets_from_s3 메소드): | |
- 여러 카테고리(접두사가 다른 경로)에 걸쳐 있는 Parquet 파일들을 읽어 합치는 기능을 제공한다. | |
5. S3에 Parquet 파일 저장 (save_parquet_to_s3 메소드): | |
- Pandas DataFrame을 Parquet 파일 형식으로 변환하고 지정된 S3 경로에 저장한다. | |
6. 도우미 메소드들: | |
- _parse_s3_key: S3 키에서 지정된 부분을 추출한다. | |
- _get_s3_keys: 주어진 경로에 해당하는 S3 키 리스트를 가져온다. | |
- _normalize_s3_key: S3 키가 전체 URL 형태일 경우 필요한 부분만 추출한다. | |
- _print_verbose: 파일 로딩 과정을 출력한다. | |
이 클래스는 S3 상의 대량의 Parquet 데이터를 효율적으로 읽어들이고 처리하는 데 유용하며, 특히 대규모 데이터 처리 작업에서 성능 향상을 위해 멀티스레딩 옵션을 제공한다. | |
""" | |
# pip install boto3 pandas pyarrow | |
# import libraries | |
import boto3 | |
import pandas as pd | |
import io | |
import concurrent.futures | |
import os | |
import time | |
class S3ParquetHandler: | |
def __init__(self, s3_config: dict, use_multithreading: bool = False, max_threads: int = None): | |
""" | |
AWS S3에서 Parquet 파일을 읽기 위한 클래스를 초기화한다. | |
:param s3_config: AWS 자격 증명 및 지역 정보를 포함한 사전. | |
:param use_multithreading: 멀티스레딩을 사용할지 여부. | |
:param max_threads: 사용할 최대 스레드 수. None이면 기본값 사용. | |
""" | |
session = boto3.session.Session(**s3_config) | |
self.s3_client = session.client('s3') | |
self.s3_resource = session.resource('s3') | |
self.use_multithreading = use_multithreading | |
self.max_threads = max_threads if max_threads is not None else min(32, (os.cpu_count() or 1) + 4) | |
def read_parquet_from_s3(self, key: str, bucket: str, **args): | |
""" | |
S3에서 Parquet 파일을 읽어 Pandas DataFrame으로 반환한다. | |
:param key: Parquet 파일의 S3 키. | |
:param bucket: 파일이 저장된 S3 버킷. | |
:return: Parquet 파일의 데이터를 포함하는 Pandas DataFrame. | |
""" | |
try: | |
key = self._normalize_s3_key(key, bucket) | |
obj = self.s3_client.get_object(Bucket=bucket, Key=key) | |
with io.BytesIO(obj['Body'].read()) as f: | |
return pd.read_parquet(f, **args) | |
except Exception as e: | |
print(f"{bucket} 버킷의 {key} 읽기 오류: {e}") | |
return pd.DataFrame() | |
def read_multiple_parquets_from_s3(self, filepath: str, bucket: str, extract_columns=None, verbose: bool = False, **args): | |
""" | |
주어진 S3 경로에서 여러 Parquet 파일을 읽고 하나의 DataFrame으로 합친다. | |
필요한 경우, 경로의 특정 부분을 컬럼으로 추출한다. | |
:param filepath: Parquet 파일이 저장된 S3 경로. | |
:param bucket: S3 버킷. | |
:param extract_columns: 추출하고 싶은 경로 부분의 리스트. 예: ['year', 'month'] | |
:param verbose: True인 경우, 로드되는 파일을 출력한다. | |
:return: 모든 Parquet 파일을 합친 Pandas DataFrame. | |
""" | |
s3_keys = self._get_s3_keys(filepath, bucket) | |
if not s3_keys: | |
print(f'{bucket}의 {filepath}에 Parquet 파일이 없습니다.') | |
return pd.DataFrame() | |
if verbose: | |
self._print_verbose(s3_keys) | |
if self.use_multithreading: | |
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_threads) as executor: | |
futures = {executor.submit(self.read_parquet_from_s3, key, bucket=bucket, **args): key for key in s3_keys} | |
dfs = [] | |
for future in concurrent.futures.as_completed(futures): | |
key = futures[future] | |
try: | |
df = future.result() | |
if extract_columns: | |
path_details = self._parse_s3_key(key, extract_columns) | |
for col, val in path_details.items(): | |
df[col] = val | |
dfs.append(df) | |
except Exception as e: | |
print(f"{bucket} 버킷 {key} 읽기 오류: {e}") | |
else: | |
dfs = [self.read_parquet_from_s3(key, bucket=bucket, **args) for key in s3_keys] | |
for i, df in enumerate(dfs): | |
if extract_columns: | |
path_details = self._parse_s3_key(s3_keys[i], extract_columns) | |
for col, val in path_details.items(): | |
df[col] = val | |
return pd.concat(dfs, ignore_index=True) | |
def read_multiple_prefix_parquets_from_s3(self, category_prefixes: list, bucket: str, verbose: bool = False, **args): | |
""" | |
여러 접두사에 해당하는 다중 Parquet 파일을 읽고 합친다. | |
:param category_prefixes: 읽을 카테고리 접두사 리스트. | |
:param bucket: S3 버킷. | |
:param verbose: True인 경우, 상세한 출력을 보여준다. | |
:return: 모든 프리픽스의 데이터를 합친 DataFrame. | |
""" | |
dfs = [self.read_multiple_parquets_from_s3(prefix, bucket, verbose=verbose, **args) for prefix in category_prefixes] | |
return pd.concat(dfs, ignore_index=True) | |
def save_parquet_to_s3(self, df, bucket, key, **args): | |
""" | |
Pandas DataFrame을 Parquet 파일로 변환하여 S3에 저장한다. | |
:param df: 저장할 Pandas DataFrame. | |
:param bucket: 저장할 S3 버킷. | |
:param key: 저장할 파일의 S3 키. | |
""" | |
try: | |
key = self._normalize_s3_key(key, bucket) | |
# DataFrame을 BytesIO 객체로 변환 | |
buffer = io.BytesIO() | |
df.to_parquet(buffer, engine='pyarrow', compression='snappy', index=False, **args) | |
buffer.seek(0) | |
# S3에 Parquet 파일 업로드 | |
for i in range(10): | |
try: | |
self.s3_client.upload_fileobj(buffer, bucket, key) | |
print(f"S3 버킷 {bucket}의 {key}에 데이터 저장 완료") | |
break | |
except Exception as e: | |
print(f"재시도 {i+1}: {e}") # 각 시도의 에러 메시지 출력 | |
if i == 9: | |
raise e # 마지막 재시도에서도 실패하면 예외 발생 | |
time.sleep(1) | |
except Exception as e: | |
print(f"S3 버킷 {bucket}의 {key}에 데이터 저장 최종 오류: {e}") | |
def _parse_s3_key(self, key: str, extract_columns: list): | |
""" | |
S3 키에서 지정된 부분의 값을 추출한다. | |
:param key: S3 키. | |
:param extract_columns: 추출할 경로 부분의 리스트. | |
:return: 추출된 부분의 값이 포함된 사전. | |
""" | |
path_parts = key.split('/') | |
details = {} | |
for part in path_parts: | |
if '=' in part: | |
col, val = part.split('=', 1) | |
if col in extract_columns: | |
details[col] = val | |
return details | |
def _get_s3_keys(self, filepath: str, bucket: str): | |
""" | |
주어진 경로에 해당하는 S3 키 리스트를 가져온다. | |
:param filepath: 검색할 S3 경로. | |
:param bucket: S3 버킷. | |
:return: 해당 경로에 맞는 S3 키의 리스트. | |
""" | |
if not filepath.endswith('/'): | |
filepath += '/' | |
filepath = self._normalize_s3_key(filepath, bucket) | |
return [item.key for item in self.s3_resource.Bucket(bucket).objects.filter(Prefix=filepath) | |
if item.key.endswith('.parquet')] | |
def _normalize_s3_key(self, key, bucket): | |
""" | |
S3 키가 전체 URL 형태일 경우 필요한 부분만 추출한다. | |
:param key: S3 키 또는 전체 URL. | |
:param bucket: S3 버킷. | |
:return: 정규화된 S3 키. | |
""" | |
if 's3://' in key: | |
return key.replace(f's3://{bucket}/', '') | |
return key | |
def _print_verbose(self, s3_keys): | |
print('Parquet 파일 로딩 중:') | |
for p in s3_keys: | |
print(p) | |
# 사용 예: | |
s3_config = { | |
'aws_access_key_id': 'YOUR_ACCESS_KEY_ID', | |
'aws_secret_access_key': 'YOUR_SECRET_ACCESS_KEY', | |
'region_name': 'YOUR_REGION' | |
} | |
bucket_name = 'your-bucket-name' | |
# 멀티스레딩을 사용하고 최대 스레드 수를 10으로 설정하여 S3ParquetHandler 인스턴스 생성 | |
handler = S3ParquetHandler(s3_config, use_multithreading=True, max_threads=10) | |
# 단일 Parquet 파일 읽기 | |
single_file = 'your/single/file.parquet' | |
single_parquet_df = handler.read_parquet_from_s3(single_file, bucket_name) | |
print(single_parquet_df.head()) | |
# 여러 Parquet 파일 읽기 | |
file_path = 'your/file/path/' | |
multiple_parquets_df = handler.read_multiple_parquets_from_s3(file_path, bucket_name, verbose=True) | |
print(multiple_parquets_df.head()) | |
# 파티션을 추출하여 컬럼으로 추가 | |
file_path = 'your/file/path/year=2020/month=01/day=01/' | |
extract_columns = ['year', 'month', 'day'] | |
multiple_parquets_df = handler.read_multiple_parquets_from_s3(file_path, bucket_name, extract_columns=extract_columns, verbose=True) | |
print(multiple_parquets_df.head()) | |
# 다양한 접두사를 사용하여 여러 Parquet 파일 읽기 | |
category_prefixes = ['your/file/path-1/', 'your/file/path-2/'] | |
all_categories_df = handler.read_multiple_prefix_parquets_from_s3(category_prefixes, bucket_name, verbose=True) | |
print(all_categories_df.head()) | |
# DataFrame을 S3에 Parquet 파일로 저장 | |
df_to_save = pd.DataFrame(...) # 저장할 데이터 | |
save_path = 'your/path/to/save/file.parquet' | |
handler.save_parquet_to_s3(df_to_save, bucket_name, save_path) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment