Skip to content

Instantly share code, notes, and snippets.

@jomido
Created May 23, 2017 21:56
Show Gist options
  • Save jomido/a4351fcb2eaf3cdbefdad41911ce2476 to your computer and use it in GitHub Desktop.
Save jomido/a4351fcb2eaf3cdbefdad41911ce2476 to your computer and use it in GitHub Desktop.
asyncio gcloud TaskQueue (Python 3.6)
"""
An asynchronous queue for Google App Engine Task Queues.

For the `auth` and `http_tools` imports, see:
https://gist.github.com/jomido/93940858a803327197314ceae8b31462
"""
import asyncio
import base64
import datetime
import json
import logging
import aiohttp
from auth import Token
from http_tools import post
# --- Logging ---------------------------------------------------------------
# Module-level logger; `basicConfig` applies FORMAT to the root logger's
# handler as an import-time side effect.
log = logging.getLogger(__name__)
FORMAT = "[%(asctime)s %(filename)s:%(lineno)s - %(funcName)20s() ] %(message)s"
logging.basicConfig(format=FORMAT)

# --- REST endpoints --------------------------------------------------------
# Root of the Task Queue REST API (v1beta2); TASK_QUEUE_URL is a template
# filled in with `api_root`, `project_name` and `queue_name`.
API_ROOT = "https://www.googleapis.com/taskqueue/v1beta2/projects"
TASK_QUEUE_URL = "{api_root}/s~{project_name}/taskqueues/{queue_name}/tasks"

# --- OAuth scopes ----------------------------------------------------------
# Scopes requested when obtaining an access token for the queue.
SCOPES = [
    "https://www.googleapis.com/auth/taskqueue",
    "https://www.googleapis.com/auth/taskqueue.consumer",
    "https://www.googleapis.com/auth/cloud-taskqueue",
    "https://www.googleapis.com/auth/cloud-taskqueue.consumer",
]
def clean_b64encode(payload):
    """
    Return `payload` encoded as URL-safe Base64 text.

    https://en.wikipedia.org/wiki/Base64#URL_applications modified Base64
    for URL variants exist, where the + and / characters of standard
    Base64 are respectively replaced by - and _

    `payload` may be a `str` (encoded as UTF-8 first) or any bytes-like
    object (`bytes`, `bytearray`, `memoryview`). Padding (`=`) is kept.
    The result is returned as a `str`.
    """
    if not isinstance(payload, (bytes, bytearray, memoryview)):
        payload = payload.encode('utf-8')
    # urlsafe_b64encode performs the "+" -> "-" and "/" -> "_" substitution.
    return base64.urlsafe_b64encode(payload).decode('ascii')
def make_insert_body(queue_name: str, payload: dict):
    """
    Build the JSON-serialisable request body for inserting a task.

    Args:
        queue_name: Name of the target queue, stored under `queueName`.
        payload: Task payload. A `str` or bytes-like object is Base64
            encoded as-is; anything else (typically a `dict`) is first
            serialised with `json.dumps`.

    Returns:
        A dict with the `taskqueues#task` fields: the URL-safe Base64
        payload, the enqueue timestamp in microseconds since the Unix
        epoch (UTC), and zeroed lease timestamp / retry count.

    Raises:
        TypeError: if `payload` is not text/bytes and is not JSON
            serialisable.
    """
    if not isinstance(payload, (str, bytes, bytearray, memoryview)):
        # clean_b64encode only understands text/bytes; a dict has no
        # `.encode`, so serialise structured payloads to JSON text first.
        payload = json.dumps(payload)

    # Use timezone-aware UTC on both sides: subtracting the epoch from a
    # naive local `now()` skews the result by the machine's UTC offset.
    utc = datetime.timezone.utc
    epoch = datetime.datetime(1970, 1, 1, tzinfo=utc)
    delta = datetime.datetime.now(utc) - epoch
    # Exact integer microseconds (no float rounding).
    micro_sec_since_epoch = delta // datetime.timedelta(microseconds=1)

    return {
        "kind": "taskqueues#task",
        "queueName": queue_name,
        "payloadBase64": clean_b64encode(payload),
        "enqueueTimestamp": micro_sec_since_epoch,
        "leaseTimestamp": 0,
        "retry_count": 0,
    }
class TaskQueue:
    """
    An asynchronous Google Task Queue

    Wraps a single queue's task-insert endpoint: builds the queue URL
    from `TASK_QUEUE_URL` and obtains bearer tokens through `Token`.
    """

    def __init__(self, project, service_file, task_queue, session=None):
        """
        Args:
            project: Project name; used for the token and the queue URL.
            service_file: Service-account file handed to `Token`.
            task_queue: Name of the queue tasks are inserted into.
            session: Optional HTTP session shared by `Token` and, by
                default, by `insert_task`.
        """
        self.task_queue = task_queue
        self.service_file = service_file
        self.session = session
        self.token = Token(
            project,
            self.service_file,
            session=self.session,
            scopes=SCOPES,
        )
        self.url = TASK_QUEUE_URL.format(
            api_root=API_ROOT,
            project_name=project,
            queue_name=task_queue,
        )

    async def insert_task(self, payload, id=None, tag='', session=None):
        """
        Insert one task into the queue.

        Args:
            payload: Task payload (a mapping when `tag` is given).
            id: Currently unused; kept for interface compatibility.
            tag: If non-empty, stored under the payload's `'tag'` key.
            session: HTTP session for this call; falls back to the
                session given at construction time.

        Returns:
            True if the response status is 2xx, otherwise False (the
            response content is logged at error level).
        """
        session = session or self.session

        if tag:
            # Shallow copy so the caller's dict is not mutated.
            payload = {**payload, 'tag': tag}

        body = make_insert_body(self.task_queue, payload)
        token = await self.token.get()

        status, content = await post(
            self.url,
            body,
            session=session,
            headers={
                'Authorization': 'Bearer {}'.format(token)
            }
        )

        success = 200 <= status < 300
        if not success:
            log.error("NO INSERTO: %s", content)

        return success
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment