Skip to content

Instantly share code, notes, and snippets.

@seporaitis
Created February 8, 2019 14:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save seporaitis/e0ffd20b690b8e630f452b2a0ad0d09f to your computer and use it in GitHub Desktop.
Save seporaitis/e0ffd20b690b8e630f452b2a0ad0d09f to your computer and use it in GitHub Desktop.
aioboto3 pagination
import asyncio
import os
import aioboto3
loop = asyncio.get_event_loop()
async def test():
async with aioboto3.client('s3', region_name='us-east-1') as s3:
paginator = s3.get_paginator('list_objects_v2')
page_iterator = paginator.paginate(
Bucket=os.environ.get('BUCKET_NAME'),
Prefix='',
)
for page in page_iterator:
print(len(page.get('Contents')))
def main():
loop.run_until_complete(test())
if __name__ == '__main__':
main()
Traceback (most recent call last):
File "aioboto3_test.py", line 25, in <module>
main()
File "aioboto3_test.py", line 21, in main
loop.run_until_complete(test())
File "/usr/lib/python3.6/asyncio/base_events.py", line 473, in run_until_complete
return future.result()
File "aioboto3_test.py", line 16, in test
for page in page_iterator:
File "/home/user/venv/lib/python3.6/site-packages/aioboto3/aiobotocore/paginate.py", line 13, in __iter__
raise NotImplementedError
NotImplementedError
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment