Last active
November 14, 2019 07:18
-
-
Save minkezhang/74a3dd77cb56f942a01c9516bbc507f2 to your computer and use it in GitHub Desktop.
A GoogleAPIClient-less API Client, in 50 LOC
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Sample getting Google Auth. | |
See https://developers.google.com/identity/protocols/OAuth2ServiceAccount#authorizingrequests | |
""" | |
from typing import Any, Dict, List, Text, Union | |
import datetime | |
import functools | |
import json | |
import jwt # pyjwt | |
import pprint | |
import requests | |
import example # user-maintained constants | |
_JSONType = Union[ | |
Dict[Text, Any], | |
List[Dict[Text, Any]], | |
] | |
# See https://developers.google.com/identity/protocols/googlescopes. | |
_SCOPES = [ | |
'https://www.googleapis.com/auth/firebase.database', | |
'https://www.googleapis.com/auth/userinfo.email', | |
'https://www.googleapis.com/auth/cloud-platform', | |
'https://www.googleapis.com/auth/compute', | |
'https://www.googleapis.com/auth/devstorage.read_write', | |
] | |
@functools.lru_cache(maxsize=128, typed=True) | |
def _GetCredentialsFile(fn: Text) -> _JSONType: | |
with open(fn, 'r') as fp: | |
return json.loads(fp.read()) | |
def _GetEncodedJWT(fn: Text, scopes: List[Text]) -> Text: | |
now = datetime.datetime.utcnow() | |
return jwt.encode( | |
{ | |
'iss': _GetCredentialsFile(fn)['client_email'], | |
'scope': ' '.join(s for s in scopes), | |
'aud': _GetCredentialsFile(fn)['token_uri'], | |
'exp': now + datetime.timedelta(minutes=10), | |
'iat': now - datetime.timedelta(minutes=1), | |
}, | |
_GetCredentialsFile(fn)['private_key'], | |
algorithm='RS256', | |
) | |
def _GetAccessTokenStruct(fn: Text, scopes: List[Text]) -> _JSONType: | |
return requests.post( | |
_GetCredentialsFile(fn)['token_uri'], | |
data={ | |
'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer', | |
'assertion': _GetEncodedJWT(fn, scopes) | |
}, | |
).json() | |
# Here be demo. | |
def _List(access_token: Text, project: Text, zone: Text) -> _JSONType: | |
"""List all GCE instances for the specified zone.""" | |
return requests.get( | |
f'https://compute.googleapis.com/compute/v1/projects/{project}/zones/' | |
f'{zone}/instances', | |
headers={ | |
'Authorization': f'Bearer {access_token}', | |
} | |
).json() | |
def _Get(access_token: Text, bucket: Text) -> _JSONType: | |
"""Get a GCS bucket info.""" | |
return requests.get( | |
f'https://www.googleapis.com/storage/v1/b/{bucket}', | |
headers={ | |
'Authorization': f'Bearer {access_token}', | |
} | |
).json() | |
def _GetDoc(access_token: Text, project: Text, path: Text) -> _JSONType: | |
"""List Firestore info.""" | |
return requests.get( | |
f'https://firestore.googleapis.com/v1beta1/projects/{project}/' | |
f'databases/(default)/documents/{path}', | |
headers={ | |
'Authorization': f'Bearer {access_token}', | |
} | |
).json() | |
def _Stream(access_token: Text, project: Text, path: Text) -> requests.models.Response: | |
"""Streams Firebase endpoint.""" | |
return requests.get( | |
f'https://{project}.firebaseio.com/{path}.json', | |
stream=True, | |
headers={ | |
'Authorization': f'Bearer {access_token}', | |
'Accept': 'text/event-stream', | |
} | |
) | |
if __name__ == '__main__': | |
overly_permissive_access_token = _GetAccessTokenStruct( | |
example.CREDENTIALS_FILE, _SCOPES)['access_token'] | |
pprint.pprint( | |
_List( | |
access_token=overly_permissive_access_token, | |
project=example.PROJECT, | |
zone=example.ZONE, | |
) | |
) | |
pprint.pprint( | |
_Get( | |
access_token=overly_permissive_access_token, | |
bucket=example.BUCKET, | |
) | |
) | |
pprint.pprint( | |
_GetDoc( | |
access_token=overly_permissive_access_token, | |
project=example.PROJECT, | |
path=example.FIRESTORE_PATH, | |
) | |
) | |
s = _Stream( | |
access_token=overly_permissive_access_token, | |
project=example.PROJECT, | |
path=example.FIREBASE_PATH, | |
) | |
# Get initial data; use an actual SSE library for real-world applications. | |
lines = [] | |
buffer = '' | |
for c in s.iter_content(): | |
if len(lines) > 1: | |
break | |
if c != b'\n': | |
buffer += c.decode('utf-8', 'replace') | |
else: | |
lines.append(buffer) | |
buffer = '' | |
s.close() | |
pprint.pprint(lines) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment