Created
July 14, 2021 14:46
-
-
Save allenmichael/89f1846bf1a3994a432da60115fb27c6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def orchestrator_function(context: df.DurableOrchestrationContext):
    """Run one export cycle, then sleep and restart as a fresh orchestration.

    Steps, in order:

    1. Read the persisted job state from the ``TenableExportJob`` /
       ``currentJobInfo`` durable entity (returned as a JSON string).
    2. Choose the "since" timestamp for the export: ``0`` on the first run,
       the stored ``lastRunTimestamp`` if it is present and non-zero, or
       otherwise the current orchestration time minus
       ``calculate_poll_schedule()``.
    3. Start the export through the ``TenableStartExportJob`` activity and
       record the returned job id on the entity.
    4. Check the job once through ``TenableCheckExportStatus``. If it reports
       ``FINISHED`` with ``chunks_available``, process each chunk with
       ``TenableProcessChunk`` and store a new timestamp; otherwise signal
       the entity to ``reset``.
    5. Wait on a durable timer for ``calculate_poll_schedule()`` and call
       ``continue_as_new`` so the next cycle starts with a clean history.

    Args:
        context: The durable orchestration context supplied by the runtime.

    Yields:
        Durable tasks (entity calls, activity calls, and a timer) that the
        Durable Functions runtime resolves before resuming this generator.
    """
    def log_info(message):
        # The orchestrator body is re-executed from the top each time a
        # yielded task completes; skip logging during replay so every
        # message is written once instead of once per replay.
        if not context.is_replaying:
            logging.info(message)

    log_info('starting orchestrator')

    # Persistent state stored with a Durable Entity
    entity_id = df.EntityId("TenableExportJob", "currentJobInfo")
    state = yield context.call_entity(entity_id, "get")
    state = json.loads(state)
    log_info('got current state')
    log_info(state)

    # Check if this is a first ever run and pull all data
    is_first_run = bool('isFirstRun' in state and state['isFirstRun'])
    if is_first_run:
        export_since = 0
    else:
        # Check for a timestamp from the last run; otherwise fall back to the
        # time delta supplied when created (default 1 hour).
        log_info('Checking for an existing timestamp.')
        if 'lastRunTimestamp' in state and state['lastRunTimestamp'] != 0:
            export_since = state['lastRunTimestamp']
            log_info(f'Found an existing timestamp: {export_since}')
        else:
            log_info('Trying a timestamp with a best effort guess at when the integration ran last.')
            log_info(f'Current time: {int(context.current_utc_datetime.timestamp())}')
            # NOTE(review): assumes calculate_poll_schedule() returns a
            # timedelta-compatible value -- confirm against its definition.
            best_effort = context.current_utc_datetime - calculate_poll_schedule()
            export_since = int(best_effort.timestamp())
            log_info(f'Differential time: {export_since}')

    export_job_id = yield context.call_activity('TenableStartExportJob', export_since)
    if is_first_run:
        # Only clear the flag after the full export has been started.
        context.signal_entity(entity_id, "set_is_first_run", False)
    context.signal_entity(entity_id, "set_export_job_id", export_job_id)

    # wait for job status to change to FINISHED
    # more to do here: the status is checked exactly once, with no polling
    job_status = yield context.call_activity('TenableCheckExportStatus', export_job_id)
    job_finished = (
        'status' in job_status
        and job_status['status'] == 'FINISHED'
        and 'chunks_available' in job_status
    )
    if job_finished:
        chunks = job_status['chunks_available']
        context.signal_entity(entity_id, "set_chunks", chunks)
        # process chunks (currently one at a time; the calls could be
        # parallelized later)
        for chunk in chunks:
            processed_chunk = yield context.call_activity(
                'TenableProcessChunk',
                {'exportJobId': export_job_id, 'chunkId': chunk},
            )
            log_info(f'received a chunk to remove from array: {processed_chunk}')
            context.signal_entity(entity_id, 'remove_chunk', processed_chunk)
            log_info('called to entity with remove_chunk')
        # finished up, record the timestamp of this successful run
        context.signal_entity(
            entity_id, 'set_timestamp', int(context.current_utc_datetime.timestamp())
        )
    else:
        # something went wrong, clear the existing job data and retry
        # more to do here
        context.signal_entity(entity_id, 'reset')

    # create a durable timer to rerun on a schedule based on the time delta
    # provided at creation (default 1 hour)
    timer_setting = calculate_poll_schedule()
    yield context.create_timer(context.current_utc_datetime + timer_setting)
    context.continue_as_new(None)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment