Created
May 23, 2017 21:56
-
-
Save jomido/a4351fcb2eaf3cdbefdad41911ce2476 to your computer and use it in GitHub Desktop.
asyncio gcloud TaskQueue (Python 3.6)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
An asynchronous queue for Google Appengine Task Queues | |
For `auth` and `http_tools` imports, see: | |
https://gist.github.com/jomido/93940858a803327197314ceae8b31462 | |
""" | |
import asyncio | |
import base64 | |
import datetime | |
import json | |
import logging | |
import aiohttp | |
from auth import Token | |
from http_tools import post | |
FORMAT = "[%(asctime)s %(filename)s:%(lineno)s - %(funcName)20s() ] %(message)s" | |
API_ROOT = 'https://www.googleapis.com/taskqueue/v1beta2/projects' | |
SCOPES = [ | |
"https://www.googleapis.com/auth/taskqueue", | |
"https://www.googleapis.com/auth/taskqueue.consumer", | |
"https://www.googleapis.com/auth/cloud-taskqueue", | |
"https://www.googleapis.com/auth/cloud-taskqueue.consumer" | |
] | |
TASK_QUEUE_URL = '{api_root}/s~{project_name}/taskqueues/{queue_name}/tasks' | |
log = logging.getLogger(__name__) | |
logging.basicConfig(format=FORMAT) | |
def clean_b64encode(payload): | |
""" | |
https://en.wikipedia.org/wiki/Base64#URL_applications modified Base64 | |
for URL variants exist, where the + and / characters of standard | |
Base64 are respectively replaced by - and _ | |
""" | |
if not isinstance(payload, bytes): | |
payload = payload.encode('utf-8') | |
return ( | |
base64.b64encode(payload) | |
.replace(b"+", b"-") | |
.replace(b"/", b"_") | |
.decode('utf-8') | |
) | |
def make_insert_body(queue_name: str, payload: dict): | |
delta = datetime.datetime.now() - datetime.datetime(1970, 1, 1) | |
micro_sec_since_epock = int(delta.total_seconds() * 1000000) | |
encoded_payload = clean_b64encode(payload) | |
return { | |
"kind": "taskqueues#task", | |
"queueName": queue_name, | |
"payloadBase64": encoded_payload, | |
"enqueueTimestamp": micro_sec_since_epock, | |
"leaseTimestamp": 0, | |
"retry_count": 0 | |
} | |
class TaskQueue(object): | |
""" | |
An asynchronous Google Task Queue | |
""" | |
def __init__(self, project, service_file, task_queue, session=None): | |
self.task_queue = task_queue | |
self.service_file = service_file | |
self.session = session | |
self.token = Token( | |
project, | |
self.service_file, | |
session=self.session, | |
scopes=SCOPES | |
) | |
self.url = TASK_QUEUE_URL.format( | |
api_root=API_ROOT, | |
project_name=project, | |
queue_name=task_queue | |
) | |
async def insert_task(self, payload, id=None, tag='', session=None): | |
session = session or self.session | |
if tag: | |
payload['tag'] = tag | |
body = make_insert_body(self.task_queue, payload) | |
token = await self.token.get() | |
status, content = await post( | |
self.url, | |
body, | |
session=session, | |
headers={ | |
'Authorization': 'Bearer {}'.format(token) | |
} | |
) | |
success = status >= 200 and status < 300 | |
if not success: | |
log.error("NO INSERTO: {}".format(content)) | |
return success |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment