AWS Step Functions Activity example
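import json

import boto3
from botocore.config import Config

# get_activity_task long-polls for up to 60 seconds, so the client read
# timeout should be longer than that (AWS recommends at least 65 seconds).
client = boto3.client('stepfunctions', config=Config(read_timeout=65))
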
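# Wait (long poll) for an Activity task to be scheduled.
response = client.get_activity_task(
    activityArn='ARN of the Activity',
    workerName='TestWorker'
)

# Get the task token and the input value.
# If no task arrives before the poll times out, the token is empty.
task_token = response.get('taskToken')
if not task_token:
    raise SystemExit('No activity task was available.')
activity_input = json.loads(response['input'])
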
# Put whatever work you want to perform here.
if activity_input['in'] == 'SUCCESS':
    # Output to report on success
    response = client.send_task_success(
        taskToken=task_token,
        output=json.dumps({'result': activity_input})
    )
else:
    # Error details to report on failure
    response = client.send_task_failure(
        taskToken=task_token,
        error='NO_SUCCESS_INPUT',
        cause='Input is not "SUCCESS"'
    )
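
The script above handles a single task and then exits. A long-running worker would usually wrap the same steps in a function and call it in a loop. The following is a minimal sketch of that idea, not part of the original gist: the function name `handle_one_task` and the `WORKER_ERROR` error code are hypothetical, and `client` is assumed to be a `boto3.client('stepfunctions')` object (or anything exposing the same three methods).

```python
import json


def handle_one_task(client, activity_arn, worker_name="TestWorker"):
    """Poll once. Return True if a task was handled, False if the poll was empty.

    Hypothetical helper: `client` is assumed to behave like
    boto3.client('stepfunctions').
    """
    response = client.get_activity_task(
        activityArn=activity_arn,
        workerName=worker_name,
    )
    task_token = response.get("taskToken")
    if not task_token:
        # The long poll timed out without a task being scheduled.
        return False

    try:
        activity_input = json.loads(response["input"])
        succeeded = activity_input["in"] == "SUCCESS"
    except (ValueError, KeyError, TypeError) as exc:
        # Report malformed input instead of leaving the task to time out.
        # "WORKER_ERROR" is an illustrative error code, not an AWS constant.
        client.send_task_failure(
            taskToken=task_token,
            error="WORKER_ERROR",
            cause=str(exc),
        )
        return True

    if succeeded:
        client.send_task_success(
            taskToken=task_token,
            output=json.dumps({"result": activity_input}),
        )
    else:
        client.send_task_failure(
            taskToken=task_token,
            error="NO_SUCCESS_INPUT",
            cause='Input is not "SUCCESS"',
        )
    return True
```

A worker process could then run `while True: handle_one_task(client, activity_arn)`, optionally sleeping briefly whenever the function returns `False`.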