Skip to content

Instantly share code, notes, and snippets.

@royerk
Last active August 5, 2022 21:20
Show Gist options
  • Save royerk/5cb58b0bf43cd2e8d211bc23eb43bdf9 to your computer and use it in GitHub Desktop.
Save royerk/5cb58b0bf43cd2e8d211bc23eb43bdf9 to your computer and use it in GitHub Desktop.
from airflow.providers.google.common.utils import (
id_token_credentials as id_token_credential_utils,
)
from google.auth.transport.requests import AuthorizedSession
# Fixed value sent as the "param_1" query parameter by invoke_container.
A_GLOBAL_PARAM = 57
def invoke_container(url: str, my_param: str, **kwargs) -> bool:
    """
    Invoke a container from Cloud Run or a Cloud Function by its endpoint.

    Sends an authenticated GET request to ``url`` with ``A_GLOBAL_PARAM`` and
    ``my_param`` as query parameters.

    :param url: endpoint; also handed to ``get_default_id_token_credentials``
        as its first argument when building the ID-token credentials
    :param my_param: value sent as the ``param_2`` query parameter
    :param kwargs: accepted but not used by this function
    :return: True when the endpoint answers with HTTP 200
    :raises ValueError: when the endpoint answers with any other status code
    """
    # Imported here because the module-level import only binds
    # ``AuthorizedSession``; the bare name ``google`` is not defined in this
    # module, so ``google.auth.transport.requests.Request()`` would raise
    # NameError.
    from google.auth.transport.requests import Request

    params = {
        "param_1": A_GLOBAL_PARAM,
        "param_2": my_param,
    }

    # Transport object the credentials helper uses to get credentials.
    request = Request()
    id_token_credentials = id_token_credential_utils.get_default_id_token_credentials(
        url, request=request
    )

    # The session is a context manager; closing it releases pooled connections.
    with AuthorizedSession(id_token_credentials) as session:
        resp = session.request(
            method="GET",
            timeout=60 * 5,  # five minutes, in seconds
            url=url,
            params=params,
        )

    if resp.status_code != 200:
        # Not absolutely necessary, but makes a failed call visible to the caller.
        raise ValueError(f"{url} returned {resp.status_code}")
    return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment