Skip to content

Instantly share code, notes, and snippets.

@e-dreyer
Created August 24, 2023 06:19
Show Gist options
  • Save e-dreyer/4f1c83c69ac51ec26d68ef014449d158 to your computer and use it in GitHub Desktop.
Save e-dreyer/4f1c83c69ac51ec26d68ef014449d158 to your computer and use it in GitHub Desktop.
Asyn Iterator for API requests
import time
import random
import asyncio
def randomSleep():
# Blocking sleep function
random_delay = random.uniform(0, 1)
time.sleep(random_delay)
async def randomAsyncSleep():
# Non-blocking sleep function
random_delay = random.uniform(0, 1)
await asyncio.sleep(random_delay)
def getSingleRecord(n):
# Get some record
print(f"Getting record: {n}")
return n
async def getBunchOfRecords(n = None):
# Simulate async API request
TOTAL_NUM_RECORDS = 100
MAX_PER_REQUEST = 10
print(f"Getting records using: {n}")
await randomAsyncSleep()
if not n:
# If empty request, return first records
return [getSingleRecord(x) for x in range (0, MAX_PER_REQUEST)]
else:
# If a previous result is provided, use it to provide the next to simulate pagination
# Get max in list
max_value = max(n)
# If the end has been reached return None
if max_value >= TOTAL_NUM_RECORDS:
return None
# Get the new min and max
request_min = max_value + 1
request_max = min(request_min + MAX_PER_REQUEST, TOTAL_NUM_RECORDS)
# Return the next list of records
return [getSingleRecord(x) for x in range(request_min, request_max)]
async def getAllRecords():
# Get the initial set of records
records = await getBunchOfRecords()
while records:
# Use asyncio.gather to concurrently process the current set of records
processing_tasks = [processRecord(record) for record in records]
await asyncio.gather(*processing_tasks)
# Fetch the next set of records asynchronously
next_records = await getBunchOfRecords(records)
# Yield the current set of records
for record in records:
yield record
records = next_records
async def processRecord(record):
# Simulate async processing of record
print(f"Processing Record: {record}")
await randomAsyncSleep()
async def main():
async for record in getAllRecords():
# Process all records
await processRecord(record)
if __name__ == "__main__":
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment