Skip to content

Instantly share code, notes, and snippets.

@allenmichael
Created July 14, 2021 14:46
Show Gist options
  • Save allenmichael/89f1846bf1a3994a432da60115fb27c6 to your computer and use it in GitHub Desktop.
def orchestrator_function(context: df.DurableOrchestrationContext):
    """Durable orchestrator that exports Tenable data on a rolling schedule.

    Loads persistent job state from a Durable Entity, starts a Tenable export
    job (a full pull on the first run, incremental afterwards), waits for the
    job to report FINISHED, processes the returned chunks, records a success
    timestamp, then sleeps on a durable timer and restarts itself via
    ``continue_as_new``.

    :param context: Durable Functions orchestration context (generator-based).
    """
    logging.info('starting orchestrator')
    # Persistent state stored with a Durable Entity
    entity_id = df.EntityId("TenableExportJob", "currentJobInfo")
    state = yield context.call_entity(entity_id, "get")
    # NOTE(review): assumes the entity's "get" returns a JSON string — confirm.
    state = json.loads(state)
    logging.info('got current state')
    logging.info(state)
    export_job_id = ''
    # First-ever run: timestamp 0 asks the activity for a full (all-time) export.
    if state.get('isFirstRun'):
        export_job_id = yield context.call_activity('TenableStartExportJob', 0)
        context.signal_entity(entity_id, "set_is_first_run", False)
    else:
        # Check for a timestamp from the last successful run; otherwise fall
        # back to the poll interval supplied at creation (default 1 hour).
        logging.info('Checking for an existing timestamp.')
        if state.get('lastRunTimestamp', 0) != 0:
            last_run_ts = state['lastRunTimestamp']
            logging.info(f'Found an existing timestamp: {last_run_ts}')
            export_job_id = yield context.call_activity('TenableStartExportJob', last_run_ts)
        else:
            logging.info('Trying a timestamp with a best effort guess at when the integration ran last.')
            logging.info(f'Current time: {int(context.current_utc_datetime.timestamp())}')
            # Best-effort window: now minus one poll interval.
            best_effort_ts = context.current_utc_datetime - calculate_poll_schedule()
            logging.info(f'Differential time: {int(best_effort_ts.timestamp())}')
            export_job_id = yield context.call_activity('TenableStartExportJob', int(best_effort_ts.timestamp()))
    context.signal_entity(entity_id, "set_export_job_id", export_job_id)
    # Wait for job status to change to FINISHED.
    # TODO: single status check only — poll/retry still to be implemented.
    job_status = yield context.call_activity('TenableCheckExportStatus', export_job_id)
    if job_status.get('status') == 'FINISHED' and 'chunks_available' in job_status:
        chunks = job_status['chunks_available']
        context.signal_entity(entity_id, "set_chunks", chunks)
        # Process chunks; intent is to parallelize chunk retrieval later.
        for chunk in chunks:
            processed_chunk = yield context.call_activity('TenableProcessChunk', { 'exportJobId': export_job_id, 'chunkId': chunk })
            logging.info(f'received a chunk to remove from array: {processed_chunk}')
            context.signal_entity(entity_id, 'remove_chunk', processed_chunk)
            logging.info('called to entity with remove_chunk')
        # Finished up: record the timestamp of this successful run.
        context.signal_entity(entity_id, 'set_timestamp', int(context.current_utc_datetime.timestamp()))
    else:
        # Something went wrong: clear the existing job data and retry next cycle.
        # TODO: more to do here (error reporting / backoff).
        context.signal_entity(entity_id, 'reset')
    # Durable timer to rerun on the schedule provided at creation (default 1 hour),
    # then restart the orchestration with a clean history.
    timer_setting = calculate_poll_schedule()
    yield context.create_timer(context.current_utc_datetime + timer_setting)
    context.continue_as_new(None)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment