Skip to content

Instantly share code, notes, and snippets.

@thehesiod
Created October 24, 2019 16:55
Show Gist options
  • Save thehesiod/05298c1c89c7b5a38da0abc4ccbed7b7 to your computer and use it in GitHub Desktop.
Save thehesiod/05298c1c89c7b5a38da0abc4ccbed7b7 to your computer and use it in GitHub Desktop.
import asyncio
import os
import time
import logging
from multiprocessing import Process, Queue
from queue import Empty
import botocore.session
from botocore.credentials import Credentials, CredentialResolver, CredentialProvider, AssumeRoleProvider
_original_create_credential_resolver = None
_current_credentials = None
_credential_queue = Queue(maxsize=1)
_credential_acquisition_process = None
_async_refresh_task = None
def _background_botocore_credential_acquisition() -> None:
"""
This loop gets the credentials from the default botocore credentials provider and passes them back to the main
process - botocore itself ensure it will refresh before the credentials are actually expired. We check at a higher
frequency than the window it uses to decide when we're too close to a token expirys
:return:
"""
global _current_credentials
session = botocore.session.get_session()
while True:
x = session.get_credentials()
credentials = Credentials(x.access_key, x.secret_key, token=x.token, method=x.method)
if _current_credentials != credentials:
_current_credentials = credentials
_credential_queue.put(credentials)
time.sleep(AssumeRoleProvider.EXPIRY_WINDOW_SECONDS / 5)
async def _credential_refresher() -> None:
"""
Checks the multiprocessing queue to see if the background process has provided a refreshed AWS credential. Must
use the non-blocking form to prevent this from taking CPU away from real work in the main process.
:return:
"""
global _current_credentials
while True:
try:
_current_credentials = _credential_queue.get(block=False)
except Empty:
pass
await asyncio.sleep(AssumeRoleProvider.EXPIRY_WINDOW_SECONDS / 5)
def start_background_botocore_credential_refresh() -> None:
"""
Starts a background process that will retrieve the real AWS credentials using the normal botocore credential
providers, and monkey patches the current process' credential provider to use the credentials provided.
This hack is needed to work around aiobotocore's lack of async credential provider support
:return:
"""
aws_profile = os.environ.get('AWS_PROFILE', None)
if aws_profile:
logging.getLogger().info('aws profile {} specified, starting aiobotocore credential refresh'.format(aws_profile))
else:
logging.getLogger().info('aws profile not specified, no need to start aiobotocore credential refresh')
return
global _credential_acquisition_process
global _current_credentials
global _async_refresh_task
_credential_acquisition_process = Process(target=_background_botocore_credential_acquisition, daemon=True)
_credential_acquisition_process.start()
# We block synchronously so we can't leave this function without getting the first set of credentials
_current_credentials = _credential_queue.get(block=True)
# Replace the default credential resolver with one patched to return the asyncronously refreshed credentials
_original_create_credential_resolver = botocore.credentials.create_credential_resolver
def patched_create_credential_resolver(session, cache=None):
class BackgroundProvider(CredentialProvider):
def load(self):
return _current_credentials
return CredentialResolver(providers=[BackgroundProvider()])
botocore.credentials.create_credential_resolver = patched_create_credential_resolver
# Start the background task that will keep
_async_refresh_task = asyncio.ensure_future(_credential_refresher())
@jungrae-prestolabs
Copy link

jungrae-prestolabs commented Dec 20, 2019

In My Case.

def start_background_botocore_credential_refresh() -> None:
    """
    Starts a background process that will retrieve the real AWS credentials using the normal botocore credential
    providers, and monkey patches the current process' credential provider to use the credentials provided.
    This hack is needed to work around aiobotocore's lack of async credential provider support
    :return:
    """

    global _credential_acquisition_process
    global _current_credentials
    global _async_refresh_task

    _credential_acquisition_process = Process(target=_background_botocore_credential_acquisition, daemon=True)
    _credential_acquisition_process.start()

    # We block synchronously so we can't leave this function without getting the first set of credentials
    _current_credentials = _credential_queue.get(block=True)

    # Replace the default credential resolver with one patched to return the asyncronously refreshed credentials
    _original_create_credential_resolver = botocore.credentials.create_credential_resolver

    def patched_create_credential_resolver(session, cache=None, region_name=None):
        class BackgroundProvider(CredentialProvider):
            def load(self):
                return _current_credentials

        return CredentialResolver(providers=[BackgroundProvider()])

    botocore.credentials.create_credential_resolver = patched_create_credential_resolver

    # Start the background task that will keep
    _async_refresh_task = asyncio.ensure_future(_credential_refresher())

Anyway, Thanks a lot giving monkey patch code.
We can use it with EKS assume role issue now.

botocore: 1.13.14
aiobotocore: 0.11.0

@thehesiod
Copy link
Author

credits to @alexmac for orig impl :)

@rrrix
Copy link

rrrix commented Mar 6, 2020

Hi @thehesiod,

Thank you for all of your contributions to aioboto3/aiobotocore! I came across the same problems that are now well documented in aiobotocore #619.

Can you advise on how to use the botocore_credential_refresher.py module? An example snippet?

This gets stuck on line 79 from above, essentially waiting (blocking) on an empty queue, which seems to never get the credentials put there.

_current_credentials = _credential_queue.get(block=True)

My test code:

import os
import logging
import asyncio
import aiobotocore
import botocore_assume_role

os.environ['AWS_PROFILE'] = 'dev1'
logging.basicConfig(level=logging.DEBUG)


async def main():
    botocore_assume_role.start_background_botocore_credential_refresh()  ## <<--- hangs forever here
    async with aiobotocore.get_session().create_client('sts') as client:
        response = await client.get_caller_identity()
        print(response)


if __name__ == '__main__':
    asyncio.run(main(), debug=True)

Thanks in advance!

@thehesiod
Copy link
Author

thehesiod commented Mar 11, 2020

@rrrix
most likely the helper process is dying/stuck. I suggest debugging it to see what's going on, or add try/except/printfs in there. IOW _background_botocore_credential_acquisition is dying. NOTE this isn't prod-ready code, there's a lot of prod'ification needed, logging, except handlers, etc

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment