Do: pip install -r requirements.txt
Do: python taskqueue.py
You will need a service file. Amend the location in the entry point in taskqueue.py
.
""" | |
Google Cloud auth via service account file | |
""" | |
# stdlib | |
import datetime | |
import threading | |
import time | |
# 3rd party | |
import jwt | |
# internal | |
from methods import post | |
from utils import extract_json_fields, json_read | |
JWT_GRANT_TYPE = 'urn:ietf:params:oauth:grant-type:jwt-bearer' | |
GCLOUD_TOKEN_DURATION = 3600 | |
MISMATCH = "Project name passed to Token does not match service_file's " \ | |
"project_id." | |
def acquire_token(service_data, scopes): | |
url, assertion = generate_assertion(service_data, scopes) | |
payload = ( | |
('grant_type', JWT_GRANT_TYPE), | |
('assertion', assertion) | |
) | |
status, content = post( | |
url, | |
payload, | |
timeout=60, | |
urlencoded=True, | |
json_response=True | |
) | |
data = extract_json_fields( | |
content, ( | |
('access_token', str), | |
('expires_in', int) | |
) | |
) | |
return data | |
def generate_assertion(service_data, scopes): | |
payload = make_gcloud_oauth_body( | |
service_data['token_uri'], | |
service_data['client_email'], | |
scopes | |
) | |
jwt_token = jwt.encode( | |
payload, | |
service_data['private_key'], | |
algorithm='RS256' # <-- this means we need 240MB in additional | |
# dependencies... | |
) | |
return service_data['token_uri'], jwt_token | |
def make_gcloud_oauth_body(uri, client_email, scopes): | |
now = int(time.time()) | |
return { | |
'aud': uri, | |
'exp': now + GCLOUD_TOKEN_DURATION, | |
'iat': now, | |
'iss': client_email, | |
'scope': ' '.join(scopes), | |
} | |
class Token(object): | |
def __init__(self, project, service_file, scopes=None): | |
self.project = project | |
self.service_data = json_read(service_file) | |
# sanity check | |
assert self.project == self.service_data['project_id'], MISMATCH | |
self.scopes = scopes or [] | |
self.token = None | |
self.expires_at = None | |
self.expire_time = None | |
self.acquire_lock = threading.Lock() | |
def get(self): | |
self.ensure_token() | |
return self.token | |
def ensure_token(self): | |
with self.acquire_lock: | |
renew = False | |
if self.token: | |
now = datetime.datetime.now() | |
remaining = (self.expires_at - now).total_seconds() | |
renew = remaining < self.expire_time / 2 | |
if not self.token or renew: | |
token_data = acquire_token( | |
self.service_data, | |
self.scopes | |
) | |
self.token = token_data['access_token'] | |
self.expire_time = token_data['expires_in'] | |
now = datetime.datetime.now() | |
delta = datetime.timedelta(seconds=self.expire_time) | |
self.expires_at = now + delta | |
def acquire_access_token(self): | |
data = acquire_token( | |
self.service_data, | |
self.scopes | |
) | |
access_token = data['access_token'] | |
expires_in = data['expires_in'] | |
self.access_token = access_token | |
self.access_token_duration = expires_in | |
self.access_token_acquired_at = datetime.datetime.now() | |
self.acquiring = None | |
return True |
import json | |
from functools import partial | |
try: | |
# Python 3 | |
from urllib.parse import urlencode, quote_plus | |
url_encode = partial(urlencode, quote_via=quote_plus) | |
except ImportError: | |
# Python 2 | |
from urllib import urlencode | |
url_encode = urlencode | |
import requests | |
def post(url, payload, timeout=60, urlencoded=False, json_response=True, | |
headers=None): | |
headers = headers or {} | |
if urlencoded: | |
if payload: | |
payload = url_encode(payload) | |
headers.update({ | |
'content-type': 'application/x-www-form-urlencoded' | |
}) | |
else: | |
if payload: | |
payload = json.dumps(payload) | |
payload = payload.encode('utf-8') | |
content_length = str(len(payload)) | |
else: | |
content_length = '0' | |
headers.update({ | |
'content-length': content_length, | |
'content-type': 'application/json' | |
}) | |
response = requests.post( | |
url, | |
data=payload, | |
headers=headers, | |
timeout=timeout | |
) | |
if json_response: | |
content = response.json() | |
else: | |
content = response.text | |
return response.status_code, content | |
def get(url, params=None, headers=None, json_response=True): | |
headers = headers or {} | |
params = params or {} | |
response = requests.get(url, params=params, headers=headers) | |
if json_response: | |
content = response.json() | |
else: | |
content = response.text | |
return response.status_code, content |
cryptography | |
PyJWT | |
requests | |
ujson |
""" | |
An asynchronous queue for Google Appengine Task Queues | |
""" | |
import datetime | |
import logging | |
from auth import Token | |
# internal | |
from methods import post | |
from utils import clean_b64encode | |
import ujson | |
log = logging.getLogger(__name__) | |
log.setLevel(logging.INFO) | |
API_ROOT = 'https://www.googleapis.com/taskqueue/v1beta2/projects' | |
SCOPES = [ | |
"https://www.googleapis.com/auth/taskqueue", | |
"https://www.googleapis.com/auth/taskqueue.consumer", | |
"https://www.googleapis.com/auth/cloud-taskqueue", | |
"https://www.googleapis.com/auth/cloud-taskqueue.consumer" | |
] | |
TASK_QUEUE_URL = '{api_root}/s~{project_name}/taskqueues/{queue_name}/tasks' | |
def make_insert_body(queue_name, payload): | |
delta = datetime.datetime.now() - datetime.datetime(1970, 1, 1) | |
micro_sec_since_epock = int(delta.total_seconds() * 1000000) | |
encoded_payload = clean_b64encode(ujson.dumps(payload)) | |
return { | |
"kind": "taskqueues#task", | |
"queueName": queue_name, | |
"payloadBase64": encoded_payload, | |
"enqueueTimestamp": micro_sec_since_epock, | |
"leaseTimestamp": 0, | |
"retry_count": 0 | |
} | |
class TaskQueue(object): | |
""" | |
A Google Task Queue via requests | |
""" | |
def __init__(self, queue_name, project=None, service_file=None, | |
token=None): | |
if not token: | |
token = Token(project, service_file, scopes=SCOPES) | |
self.project = project | |
self.queue_name = queue_name | |
self.service_file = service_file | |
self.token = token | |
self.url = TASK_QUEUE_URL.format( | |
api_root=API_ROOT, | |
project_name=project, | |
queue_name=queue_name | |
) | |
self.api_root = API_ROOT | |
self.default_header = { | |
'content-length': '0', | |
'accept': 'application/json', | |
'Authorization': '' | |
} | |
def headers(self, override=None): | |
""" | |
return the default headers for any http call | |
""" | |
token = self.token.get() | |
header = {k: v for k, v in self.default_header.items()} | |
header.update(override or {}) | |
header.update({ | |
'Authorization': 'Bearer {}'.format(token) | |
}) | |
return header | |
def insert_task(self, payload, id=None, tag=''): | |
if tag: | |
payload['tag'] = tag | |
body = make_insert_body(self.queue_name, payload) | |
status, content = post( | |
self.url, | |
body, | |
headers=self.headers() | |
) | |
success = status >= 200 and status < 300 | |
if not success: | |
log.error("NO INSERTO: {}".format(content)) | |
return success | |
if __name__ == "__main__": | |
project = 'talkiq-integration' | |
queue_name = 'test-pull' | |
service_file = '/opt/service-integration.json' | |
tq = TaskQueue(queue_name, project=project, service_file=service_file) | |
result = tq.insert_task({ | |
'some_key': 'some_value' | |
}) | |
assert result is True, 'result is {}'.format(result) |
import base64 | |
import json | |
def json_read(file_name): | |
with open(file_name, 'r') as f: | |
data = f.read() | |
return json.loads(data) | |
def extract_json_fields(content, spec): | |
if 'error' in content: | |
raise Exception('{}'.format(content)) | |
return {field: cast(content[field]) for field, cast in spec} | |
def clean_b64encode(payload): | |
if not isinstance(payload, bytes): | |
payload = payload.encode('utf-8') | |
return ( | |
base64.b64encode(payload) | |
.replace(b"+", b"-") | |
.replace(b"/", b"_") | |
.decode('utf-8') | |
) |