Skip to content

Instantly share code, notes, and snippets.

@jomido
Last active July 6, 2017 20:29
Show Gist options
  • Save jomido/c4e06ad1a22eeca888bfc96fb4505a23 to your computer and use it in GitHub Desktop.
Save jomido/c4e06ad1a22eeca888bfc96fb4505a23 to your computer and use it in GitHub Desktop.
Requests Google TaskQueue Insert (& auth via service file)

First run `pip install -r requirements.txt`, then run `python taskqueue.py`.

You will need a service account file (JSON). Amend its location (the `service_file` variable) in the entry point at the bottom of taskqueue.py.

"""
Google Cloud auth via service account file
"""
# stdlib
import datetime
import threading
import time
# 3rd party
import jwt
# internal
from methods import post
from utils import extract_json_fields, json_read
# value sent as the 'grant_type' form field when requesting a token
JWT_GRANT_TYPE = 'urn:ietf:params:oauth:grant-type:jwt-bearer'
# seconds added to the 'iat' claim to produce the 'exp' claim of the assertion
GCLOUD_TOKEN_DURATION = 3600
# message used by Token when `project` differs from the service file's
# 'project_id'
MISMATCH = "Project name passed to Token does not match service_file's " \
"project_id."
def acquire_token(service_data, scopes):
    """
    Exchange a signed JWT assertion for an access token.

    `service_data` is the parsed service account file and `scopes` is an
    iterable of scope strings; both are handed to generate_assertion().

    Returns a dict with 'access_token' (str) and 'expires_in' (int).
    extract_json_fields() raises when the response carries an 'error' key.
    """
    token_uri, signed_jwt = generate_assertion(service_data, scopes)
    form_fields = (
        ('grant_type', JWT_GRANT_TYPE),
        ('assertion', signed_jwt)
    )
    # the HTTP status is not inspected here; failures are detected from the
    # decoded response body by extract_json_fields()
    _, response_body = post(
        token_uri,
        form_fields,
        timeout=60,
        urlencoded=True,
        json_response=True
    )
    wanted_fields = (
        ('access_token', str),
        ('expires_in', int)
    )
    return extract_json_fields(response_body, wanted_fields)
def generate_assertion(service_data, scopes):
    """
    Build the RS256-signed JWT used to request an access token.

    Reads 'token_uri', 'client_email' and 'private_key' from `service_data`.

    Returns a `(token_uri, signed_jwt)` tuple.
    """
    token_uri = service_data['token_uri']
    claims = make_gcloud_oauth_body(
        token_uri,
        service_data['client_email'],
        scopes
    )
    # RS256 is what drags in the heavyweight extra dependencies (roughly
    # 240MB according to the original author's note)
    signed_jwt = jwt.encode(
        claims,
        service_data['private_key'],
        algorithm='RS256'
    )
    return token_uri, signed_jwt
def make_gcloud_oauth_body(uri, client_email, scopes):
    """
    Return the JWT claim set for a token request.

    'aud' is `uri`, 'iss' is `client_email`, 'scope' is `scopes` joined with
    single spaces, 'iat' is the current time in whole seconds and 'exp' is
    'iat' plus GCLOUD_TOKEN_DURATION.
    """
    issued_at = int(time.time())
    expires_at = issued_at + GCLOUD_TOKEN_DURATION
    claims = {}
    claims['aud'] = uri
    claims['exp'] = expires_at
    claims['iat'] = issued_at
    claims['iss'] = client_email
    claims['scope'] = ' '.join(scopes)
    return claims
class Token(object):
    """
    A cached access token for a service account.

    The token is fetched lazily on the first get() and fetched again once
    less than half of its lifetime ('expires_in') remains. All reads and
    writes of the cached state happen while holding `acquire_lock`.
    """

    def __init__(self, project, service_file, scopes=None):
        """
        Load `service_file` and check that it belongs to `project`.

        Raises AssertionError (message: MISMATCH) when `project` differs
        from the file's 'project_id'.
        """
        self.project = project
        self.service_data = json_read(service_file)
        # sanity check -- raised explicitly rather than with an `assert`
        # statement so that it is not stripped when Python runs with -O
        if self.project != self.service_data['project_id']:
            raise AssertionError(MISMATCH)
        self.scopes = scopes or []
        self.token = None
        self.expires_at = None
        self.expire_time = None
        self.acquire_lock = threading.Lock()

    def get(self):
        """
        Return the access token, acquiring or renewing it first if needed.
        """
        self.ensure_token()
        return self.token

    def ensure_token(self):
        """
        Acquire a token if none is cached or the cached one is half-expired.
        """
        with self.acquire_lock:
            if self.token and not self._is_stale():
                return
            self._refresh()

    def acquire_access_token(self):
        """
        Unconditionally acquire a new token; always returns True.

        The token is stored in the state read by get()/ensure_token() so a
        following get() does not trigger a second request. The
        `access_token*` and `acquiring` attributes are still set for
        backward compatibility.
        """
        with self.acquire_lock:
            data, acquired_at = self._refresh()
        self.access_token = data['access_token']
        self.access_token_duration = data['expires_in']
        self.access_token_acquired_at = acquired_at
        self.acquiring = None
        return True

    def _is_stale(self):
        # True once less than half of the token's lifetime is left
        now = datetime.datetime.now()
        remaining = (self.expires_at - now).total_seconds()
        return remaining < self.expire_time / 2

    def _refresh(self):
        # Fetch a new token and record it; caller must hold acquire_lock.
        token_data = acquire_token(
            self.service_data,
            self.scopes
        )
        self.token = token_data['access_token']
        self.expire_time = token_data['expires_in']
        acquired_at = datetime.datetime.now()
        delta = datetime.timedelta(seconds=self.expire_time)
        self.expires_at = acquired_at + delta
        return token_data, acquired_at
import json
from functools import partial
# `url_encode` is the form-encoding helper used by post(); which urllib
# flavour backs it depends on the interpreter version.
try:
    # Python 3
    from urllib.parse import urlencode, quote_plus
except ImportError:
    # Python 2
    from urllib import urlencode
    url_encode = urlencode
else:
    url_encode = partial(urlencode, quote_via=quote_plus)
import requests
def post(url, payload, timeout=60, urlencoded=False, json_response=True,
         headers=None):
    """
    POST `payload` to `url` and return `(status_code, content)`.

    With `urlencoded=True` a truthy payload is form-encoded with
    url_encode(); otherwise a truthy payload is serialized to UTF-8 encoded
    JSON. A falsy payload is passed through untouched (with a
    'content-length' of '0' in JSON mode). The matching 'content-type'
    header is always set.

    `content` is the decoded JSON body when `json_response` is true, else
    the response text. The caller's `headers` dict is never modified.
    """
    # copy first: the updates below must not leak into the caller's dict
    headers = dict(headers or {})
    if urlencoded:
        if payload:
            payload = url_encode(payload)
        headers.update({
            'content-type': 'application/x-www-form-urlencoded'
        })
    else:
        if payload:
            payload = json.dumps(payload)
            payload = payload.encode('utf-8')
            content_length = str(len(payload))
        else:
            content_length = '0'
        headers.update({
            'content-length': content_length,
            'content-type': 'application/json'
        })
    response = requests.post(
        url,
        data=payload,
        headers=headers,
        timeout=timeout
    )
    if json_response:
        content = response.json()
    else:
        content = response.text
    return response.status_code, content
def get(url, params=None, headers=None, json_response=True, timeout=60):
    """
    GET `url` and return `(status_code, content)`.

    `params` and `headers` default to empty dicts. `content` is the decoded
    JSON body when `json_response` is true, else the response text.

    `timeout` (seconds, default 60 to match post()) is passed to requests so
    that an unresponsive server cannot block the caller indefinitely.
    """
    headers = headers or {}
    params = params or {}
    response = requests.get(
        url,
        params=params,
        headers=headers,
        timeout=timeout
    )
    if json_response:
        content = response.json()
    else:
        content = response.text
    return response.status_code, content
cryptography
PyJWT
requests
ujson
"""
An asynchronous queue for Google Appengine Task Queues
"""
import datetime
import logging
from auth import Token
# internal
from methods import post
from utils import clean_b64encode
import ujson
# module-level logger, with its level set to INFO
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
# base URL under which per-project task queue URLs are built
API_ROOT = 'https://www.googleapis.com/taskqueue/v1beta2/projects'
# scopes requested when TaskQueue has to build its own Token
SCOPES = [
"https://www.googleapis.com/auth/taskqueue",
"https://www.googleapis.com/auth/taskqueue.consumer",
"https://www.googleapis.com/auth/cloud-taskqueue",
"https://www.googleapis.com/auth/cloud-taskqueue.consumer"
]
# URL template for a queue's tasks; formatted in TaskQueue.__init__ with
# api_root, project_name and queue_name
TASK_QUEUE_URL = '{api_root}/s~{project_name}/taskqueues/{queue_name}/tasks'
def make_insert_body(queue_name, payload):
    """
    Build the request body for inserting a task into `queue_name`.

    `payload` is serialized with ujson and then encoded with
    clean_b64encode() into 'payloadBase64'. 'enqueueTimestamp' is the
    current time in microseconds since the Unix epoch.
    """
    # local import: this module does not import `time` at file level
    import time

    # time.time() is epoch-based regardless of the local timezone, whereas
    # naive datetime.now() minus 1970-01-01 is skewed by the UTC offset
    micro_sec_since_epoch = int(time.time() * 1000000)
    encoded_payload = clean_b64encode(ujson.dumps(payload))
    return {
        "kind": "taskqueues#task",
        "queueName": queue_name,
        "payloadBase64": encoded_payload,
        "enqueueTimestamp": micro_sec_since_epoch,
        "leaseTimestamp": 0,
        "retry_count": 0
    }
class TaskQueue(object):
    """
    A Google Task Queue via requests

    Builds the queue's task URL from the project and queue name and inserts
    tasks with an authenticated POST.
    """

    def __init__(self, queue_name, project=None, service_file=None,
                 token=None):
        """
        `token` may be any object with a get() method returning the bearer
        token; when it is omitted a Token is built from `project` and
        `service_file` with the module's SCOPES.
        """
        if not token:
            token = Token(project, service_file, scopes=SCOPES)
        self.project = project
        self.queue_name = queue_name
        self.service_file = service_file
        self.token = token
        self.url = TASK_QUEUE_URL.format(
            api_root=API_ROOT,
            project_name=project,
            queue_name=queue_name
        )
        self.api_root = API_ROOT
        self.default_header = {
            'content-length': '0',
            'accept': 'application/json',
            'Authorization': ''
        }

    def headers(self, override=None):
        """
        return the default headers for any http call

        Entries in `override` replace the defaults; 'Authorization' is
        always set last from the current token. A new dict is returned on
        every call, so `default_header` is never modified.
        """
        token = self.token.get()
        header = dict(self.default_header)
        header.update(override or {})
        header['Authorization'] = 'Bearer {}'.format(token)
        return header

    def insert_task(self, payload, id=None, tag=''):
        """
        Insert `payload` as a task; return True on a 2xx response.

        A truthy `tag` is added to the payload under the 'tag' key. The
        caller's `payload` dict is left untouched. `id` is accepted but not
        used. Failures are logged and reported as False.
        """
        if tag:
            # tag a copy so the caller's dict is not mutated
            payload = dict(payload)
            payload['tag'] = tag
        body = make_insert_body(self.queue_name, payload)
        status, content = post(
            self.url,
            body,
            headers=self.headers()
        )
        success = 200 <= status < 300
        if not success:
            log.error("NO INSERTO: {}".format(content))
        return success
if __name__ == "__main__":
    # Manual smoke test: insert one task and check that the insert was
    # reported as successful. Amend `service_file` (and the project / queue
    # names) to match your own setup before running.
    service_file = '/opt/service-integration.json'
    queue_name = 'test-pull'
    project = 'talkiq-integration'

    tq = TaskQueue(queue_name, project=project, service_file=service_file)
    task_payload = {'some_key': 'some_value'}
    result = tq.insert_task(task_payload)
    assert result is True, 'result is {}'.format(result)
import base64
import json
def json_read(file_name):
    """
    Read `file_name` and return its contents parsed as JSON.

    The file is decoded as UTF-8 explicitly, so the result does not depend
    on the platform's default encoding.
    """
    # local import: io.open accepts `encoding` on both Python 2 and 3
    import io

    with io.open(file_name, 'r', encoding='utf-8') as f:
        data = f.read()
    return json.loads(data)
def extract_json_fields(content, spec):
    """
    Pick and convert selected fields out of a decoded JSON response.

    `spec` is an iterable of `(field, cast)` pairs; the result maps each
    field to `cast(content[field])`.

    Raises Exception (carrying the whole content) when `content` contains an
    'error' key, and KeyError when a requested field is missing.
    """
    if 'error' in content:
        raise Exception('{}'.format(content))
    extracted = {}
    for name, convert in spec:
        extracted[name] = convert(content[name])
    return extracted
def clean_b64encode(payload):
    """
    Return `payload` base64-encoded with the URL-safe alphabet, as text.

    Non-bytes input is UTF-8 encoded first. '+' and '/' come out as '-' and
    '_'; '=' padding is kept.
    """
    raw = payload if isinstance(payload, bytes) else payload.encode('utf-8')
    return base64.urlsafe_b64encode(raw).decode('utf-8')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment