@shiro01
Last active December 17, 2021 04:20
Run a query on Athena, then move the result file that Athena saved to S3 into another folder, renaming it along the way.
import boto3
import os
import time
from datetime import datetime
from datetime import timedelta


def lambda_handler(event, context):
    query = build_query()
    target_db = os.environ['TARGET_DB']
    athena_result_output_s3_bucket = os.environ['ATHENA_RESULT_OUTPUT_S3_BUCKET']
    athena_result_output_s3_key = os.environ['ATHENA_RESULT_OUTPUT_S3_KEY']
    athena_result_output_s3_dir = athena_result_output_s3_bucket + '/' + athena_result_output_s3_key

    query_response = athena_query_execution(target_db, athena_result_output_s3_dir, query)
    print(query_response)

    # Athena itself writes the query result (a CSV file) to S3
    retry_count = int(os.environ['RETRY_COUNT'])
    athena_query_response_wait(query_response['QueryExecutionId'], retry_count)

    # Key of the result file to copy
    copy_target_file_s3_key = athena_result_output_s3_key + '/' + query_response['QueryExecutionId'] + '.csv'
    print(athena_result_output_s3_bucket + '/' + copy_target_file_s3_key)

    # Destination path in S3 (previous day's date in JST, i.e. UTC+9)
    target_datetime = datetime.now() + timedelta(hours=9) - timedelta(days=1)
    copy_output_s3_bucket = os.environ['COPY_OUTPUT_S3_BUCKET']
    copy_output_s3_key_filename = os.environ['COPY_OUTPUT_S3_KEY'] + '/' \
        + os.environ['COPY_OUTPUT_FILE_NAME'] \
        + target_datetime.strftime('%Y%m%d') + '.csv'
    print(copy_output_s3_bucket + '/' + copy_output_s3_key_filename)

    # Copy the file within S3 (destination is the COPY_OUTPUT bucket,
    # not the Athena result bucket)
    athena_query_result_copy(
        athena_result_output_s3_bucket,
        copy_target_file_s3_key,
        copy_output_s3_bucket,
        copy_output_s3_key_filename
    )
    return 'end'


def build_query():
    return "SELECT * FROM log_analysis.test limit 2;"


# Run a query on Athena
def athena_query_execution(target_db, athena_result_output_s3_dir, query):
    print("QUERY : " + query)
    client = boto3.client('athena')
    query_response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': target_db
        },
        ResultConfiguration={
            'OutputLocation': 's3://' + athena_result_output_s3_dir,
        }
    )
    return query_response


# Wait for the query with the given query_execution_id to finish and fetch its result
def athena_query_response_wait(query_execution_id, retry_count):
    client = boto3.client('athena')
    # Poll the execution status up to retry_count times
    for i in range(1, 1 + retry_count):
        query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
        query_execution_status = query_status['QueryExecution']['Status']['State']
        print("STATUS:" + query_execution_status)
        if query_execution_status == 'SUCCEEDED':
            break
        if query_execution_status == 'FAILED':
            raise Exception('FAILED')
        else:
            time.sleep(1)
    else:
        # Timed out: stop the query before giving up
        client.stop_query_execution(QueryExecutionId=query_execution_id)
        raise Exception('TIME OUT')
    # Fetch the query result
    result = client.get_query_results(QueryExecutionId=query_execution_id)
    return result


# Copy a file within S3
def athena_query_result_copy(target_bucket, target_key, output_bucket, output_key):
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.copy_object
    s3client = boto3.client('s3')
    s3client.copy_object(
        Bucket=output_bucket,
        Key=output_key,
        CopySource={
            'Bucket': target_bucket,
            'Key': target_key
        }
    )
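

# Note: copy_object only copies; the source CSV stays in the Athena result
# folder. If a true "move" (as in the description above) is wanted, a
# hypothetical variant (not part of the original gist) could delete the
# source object after a successful copy:
def athena_query_result_move(target_bucket, target_key, output_bucket, output_key):
    s3client = boto3.client('s3')
    s3client.copy_object(
        Bucket=output_bucket,
        Key=output_key,
        CopySource={
            'Bucket': target_bucket,
            'Key': target_key
        }
    )
    # delete_object removes the source once the copy has succeeded
    s3client.delete_object(Bucket=target_bucket, Key=target_key)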

# IAM role policies
# AmazonS3FullAccess
# AmazonAthenaFullAccess
#
# Environment variables
# ATHENA_RESULT_OUTPUT_S3_BUCKET : test_bucket
# ATHENA_RESULT_OUTPUT_S3_KEY : test_folda
# RETRY_COUNT : 30
# TARGET_DB : test_db
# COPY_OUTPUT_S3_BUCKET : test_bucket
# COPY_OUTPUT_S3_KEY : test_folda2
# COPY_OUTPUT_FILE_NAME : test_filename
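

# A minimal local smoke-test sketch (an assumption, not part of the original
# gist): it presumes local AWS credentials with the policies above and reuses
# the sample environment variable values, which are placeholders.
if __name__ == '__main__':
    os.environ.update({
        'TARGET_DB': 'test_db',
        'ATHENA_RESULT_OUTPUT_S3_BUCKET': 'test_bucket',
        'ATHENA_RESULT_OUTPUT_S3_KEY': 'test_folda',
        'RETRY_COUNT': '30',
        'COPY_OUTPUT_S3_BUCKET': 'test_bucket',
        'COPY_OUTPUT_S3_KEY': 'test_folda2',
        'COPY_OUTPUT_FILE_NAME': 'test_filename',
    })
    # event and context are unused by the handler, so stand-ins suffice
    print(lambda_handler({}, None))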