Skip to content

Instantly share code, notes, and snippets.

@dq-hustlecoding
Created December 2, 2021 09:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dq-hustlecoding/7cf7344017b91f9264196b805063d2ae to your computer and use it in GitHub Desktop.
Save dq-hustlecoding/7cf7344017b91f9264196b805063d2ae to your computer and use it in GitHub Desktop.
Load recommendations and clean up for AWS Personalize
# ETL the recommendation results into the database.
def etl_recommendation(batch_arn: str) -> None:
    """Wait for a Personalize batch inference job, then load its output into SQL.

    Polls ``describe_batch_inference_job`` every 10 seconds until the job
    reports ``ACTIVE``, downloads the job's output object from S3, and writes
    one row per non-empty output line to the ``result`` table.

    Args:
        batch_arn: ARN of the batch inference job to wait for.

    Raises:
        RuntimeError: If the job reports ``CREATE FAILED``; without this
            check the polling loop would never terminate.
    """
    while True:
        job = personalize.describe_batch_inference_job(
            batchInferenceJobArn=batch_arn
        )['batchInferenceJob']
        status = job['status']
        if status == 'ACTIVE':
            break
        if status == 'CREATE FAILED':
            raise RuntimeError(
                'Batch inference job %s failed: %s'
                % (batch_arn, job.get('failureReason', 'unknown reason'))
            )
        print('WAITING BATCH TO DONE... ::', status)
        # Sleep only after a check, so an already-finished job returns at once.
        time.sleep(10)

    s3 = boto3.resource('s3')
    output = s3.Object('bucket name', 'path/to/batch_result/batch_input.json.out')
    body = output.get()['Body'].read().decode('utf-8')

    # Each non-empty line is one raw output record. Filtering blank lines
    # (rather than dropping the last element unconditionally) keeps the final
    # record even when the object does not end with a newline.
    lines = [line for line in body.split('\n') if line.strip()]
    result = pd.DataFrame(lines, columns=[0])

    # Reshape the records into the form the product needs, then serve the
    # result from the database.
    # NOTE: to_sql defaults to if_exists='fail', so this raises if the
    # 'result' table already exists.
    result.to_sql('result', con="데이터베이스 URI")
def _wait_until_deleted(list_resources, label, poll_seconds):
    """Block until ``list_resources()`` returns an empty list.

    Checks first and sleeps only while something is still listed, so the call
    returns immediately when there is nothing left to wait for.
    """
    while True:
        remaining = len(list_resources())
        if remaining == 0:
            return
        print("WAIT %s TO DELETE... :: " % label, remaining)
        time.sleep(poll_seconds)


# Delete all Personalize-related resources on AWS.
def clean_up() -> None:
    """Delete the first listed dataset group and the resources inside it.

    Takes the first entry returned by ``list_dataset_groups`` and, for that
    group, deletes its filters, solutions and datasets, waiting for each kind
    to disappear from its listing before finally deleting the group itself.
    Does nothing (apart from printing a message) when no dataset group exists.

    NOTE(review): campaigns and event trackers are not handled here; if any
    are still attached, the corresponding delete call is expected to raise —
    confirm against the account's resources.
    """
    dataset_groups = personalize.list_dataset_groups()['datasetGroups']
    if not dataset_groups:
        print("NO DATASET GROUP TO CLEAN UP")
        return
    dsg_arn = dataset_groups[0]['datasetGroupArn']

    filters = personalize.list_filters(datasetGroupArn=dsg_arn)['Filters']
    solutions = personalize.list_solutions(datasetGroupArn=dsg_arn)['solutions']
    datasets = personalize.list_datasets(datasetGroupArn=dsg_arn)['datasets']

    for idx, filter_summary in enumerate(filters):
        personalize.delete_filter(filterArn=filter_summary['filterArn'])
        print("number ", idx, " FILTER CLEAN UP")

    for idx, solution in enumerate(solutions):
        personalize.delete_solution(solutionArn=solution['solutionArn'])
        print("number ", idx, " SOLUTION CLEAN UP")

    # Filters must be gone before the datasets they reference are deleted.
    _wait_until_deleted(
        lambda: personalize.list_filters(datasetGroupArn=dsg_arn)['Filters'],
        "FILTER",
        10,
    )

    for idx, dataset in enumerate(datasets):
        personalize.delete_dataset(datasetArn=dataset['datasetArn'])
        print("number ", idx, " DATASET CLEAN UP")

    _wait_until_deleted(
        lambda: personalize.list_datasets(datasetGroupArn=dsg_arn)['datasets'],
        "DS",
        30,
    )
    # Solution deletion is asynchronous too, and the dataset group cannot be
    # deleted while any solution still exists.
    _wait_until_deleted(
        lambda: personalize.list_solutions(datasetGroupArn=dsg_arn)['solutions'],
        "SOLUTION",
        10,
    )

    personalize.delete_dataset_group(datasetGroupArn=dsg_arn)
    print("DSG CLEAN UP")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment