Skip to content

Instantly share code, notes, and snippets.

@janssen
Created February 27, 2016 23:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save janssen/343c86116367c4e94bde to your computer and use it in GitHub Desktop.
Save janssen/343c86116367c4e94bde to your computer and use it in GitHub Desktop.
how to interact with Google OAuth and Google Drive
# Code to upload database files to Google Drive, or download them
import urllib, urllib2, time, socket, errno, functools, urlparse, re, logging, sys, webbrowser, json, uuid, calendar
import email.message, datetime, email.encoders
from collections import namedtuple
# for the pseudo Web service
from kivy.clock import Clock, mainthread
from kivy.network.urlrequest import UrlRequest
from kivy.logger import Logger
AUTHORIZED_PAGE = """
<!DOCTYPE html>
<html>
<head>
<title>%(app_name)s Synchronization with Google Drive</title>
</head>
<body>
<h1>%(app_name)s Synchronization with Google Drive</h1>
<p><b>Authorized.</b></p>
</body>
</html>
"""
NOT_AUTHORIZED_PAGE = """
<!DOCTYPE html>
<html>
<head>
<title>%(app_name)s Synchronization with Google Drive</title>
</head>
<body>
<h1>%(app_name)s Synchronization with Google Drive</h1>
<p><b>Not authorized.</b></p>
</body>
</html>
"""
# need to keep a handle on outstanding requests so that they are
# not garbage-collected
OUTSTANDING_REQUESTS = []
def add_request(r):
global OUTSTANDING_REQUESTS
# first, clean up old requests
to_remove = []
for req in OUTSTANDING_REQUESTS:
if req.is_finished:
to_remove.append(req)
for req in to_remove:
OUTSTANDING_REQUESTS.remove(req)
OUTSTANDING_REQUESTS.append(r)
class GoogleConnection(object):
REDIRECT = re.compile("^[A-Z]* /[^\s]+state=[^\s]+ .*$")
def __init__(self, client_id, client_secret,
app_name="Goodstuff",
port=None, user_gmail_id=None, scopes=None,
access_token=None, refresh_token=None, expires=None, id_token=None,
initialize_callback=None, update_callback=None, debug=False):
self.app_name = app_name # used for talking to the user
self._client_id = client_id # Google-assigned application ID
self._client_secret = client_secret # Google-assigned "secret" for the application
self._access_token = access_token # the Google authorization token
self._expires = expires # when the current authorization token expires
self._refresh_token = refresh_token # the Google token to use to fetch a new authorization token
self._id_token = id_token # not sure what this is
self._initialize_callback = initialize_callback # function to call after obtaining the access token
self._update_callback = update_callback # function to call to update database with tokens
# These elements are used only in the initial handshake with Google
self._socket = None # socket to listen for Google response on
self._port = port # port to listen for Google response on
self._code = None # authorization code which can be traded for an access token
self._state = None # application-provided random string for local authentication
self._user_gmail_id = user_gmail_id # if known, user's gmail address
# which Google 'scopes' we need access to
self._scopes = \
scopes if scopes else ["https://www.googleapis.com/auth/drive.file"]
# whether or not to use debug on the UrlRequest calls
self._debug = debug
# OAuth2 for Google is somewhat complicated. We need to run a Web
# service for Google to send redirects to, so set that up first.
def _check_for_connections(self, *junk):
"""
This is the mandated redirect handler. Check for connection (from
Google). If present, read the first line. If it's a redirect,
read lines till we get the location redirected to."""
global REDIRECT
if self._socket is None:
return
try:
conn, addr = self._socket.accept()
except socket.error as err:
if err.errno in (errno.EWOULDBLOCK, errno.EAGAIN):
# no pending connections
pass
elif err.errno in (errno.EBADF,):
Logger.warning('Google: check_for_connections: attempt to use closed socket %s', self._socket)
else:
Logger.error('Google: check_for_connections: error on self._socket.accept(): %s', err)
raise
else:
if conn:
conn.setblocking(True)
data = bytes("")
state = None
error = None
while True:
newdata = conn.recv(1)
if not newdata: break
data = data + newdata
if data.endswith("\r\n"):
# have a line, check it
if self.REDIRECT.match(data.strip()):
parse = urlparse.urlparse(data.split()[1])
if parse.query:
params = urlparse.parse_qs(parse.query)
state = params.get('state')
code = params.get('code')
error = params.get('error')
if code:
self._code = code[0]
if state:
state = state[0]
if error:
error = error[0]
else:
Logger.debug('Google _check_for_connections: Invalid request received: %s', data.strip())
break
if state == self._state:
# must be our reply
page = AUTHORIZED_PAGE if self._code else NOT_AUTHORIZED_PAGE
page_rendered = page % {'app_name': self.app_name}
conn.send(page_rendered)
Clock.schedule_once(lambda dt: conn.close(), 0.5)
self._socket.close()
self._socket = None
if self._code:
Logger.debug('Google _check_for_connections: Authorization code received.')
self._get_tokens()
else:
Logger.debug('Google _check_for_connections: No authorization code in received URL: %s', data.strip())
if self._initialize_callback:
Clock.schedule_once(lambda dt: self._initialize_callback(error))
else:
# some random scan
conn.close()
if not self._socket:
Clock.unschedule(self._check_for_connections)
def _timeout_auth(self, *args):
if self._socket:
self._socket.close()
self._socket = None
Clock.unschedule(self._check_for_connections)
if self._initialize_callback:
Clock.schedule_once(lambda dt: self._initialize_callback("timed out"))
def _establish_service(self):
"""Open a socket and listen on it. If "port" is not specified,
a random port will be chosen."""
s = socket.socket()
s.bind(('127.0.0.1', self._port if (self._port is not None) else 0))
s.setblocking(False)
self._port = s.getsockname()[1]
s.listen(2)
self._socket = s
@mainthread
def _process_tokens(self, req, response):
Logger.debug("Google _process_tokens: %s", response)
# response should be a json object
self._access_token = response.get("access_token")
rt = response.get("refresh_token")
if rt:
self._refresh_token = rt
self._id_token = response.get("id_token")
self._expires = time.time() + int(response.get("expires_in")) - 60
Logger.debug("Google _process_tokens: _initialize_callback is %s", self._initialize_callback)
if self._initialize_callback:
self._initialize_callback(self)
self._initialize_callback = None
@mainthread
def _process_error(self, req, error):
if self._initialize_callback:
self._initialize_callback(error)
self._initialize_callback = None
@mainthread
def _process_redirect(self, req, error):
if self._initialize_callback:
self._initialize_callback(error)
self._initialize_callback = None
@mainthread
def _process_failure(self, req, error):
if self._initialize_callback:
self._initialize_callback(error)
self._initialize_callback = None
def _get_tokens(self):
Logger.debug("Google _get_tokens: ...")
UrlRequest("https://accounts.google.com/o/oauth2/token",
req_headers={'Content-Type': 'application/x-www-form-urlencoded'},
on_success=self._process_tokens,
on_error=self._process_error,
on_failure=self._process_failure,
on_redirect=self._process_redirect,
req_body="code=%s&"
"client_id=%s&"
"client_secret=%s&"
"redirect_uri=%s&"
"grant_type=authorization_code" % (
self._code, self._client_id, self._client_secret,
"http://localhost:%d" % (self._port,)),
timeout=120,
debug=self._debug)
def _refresh(self):
self._access_token = None
self._expires = 0
self.refresh_request = UrlRequest("https://accounts.google.com/o/oauth2/token",
req_headers={'Content-Type': 'application/x-www-form-urlencoded'},
on_success=self._process_tokens,
on_error=self._process_error,
on_failure=self._process_failure,
on_redirect=self._process_redirect,
req_body="refresh_token=%s&"
"client_id=%s&"
"client_secret=%s&"
"grant_type=refresh_token" % (self._refresh_token, self._client_id, self._client_secret),
timeout=120, debug=self._debug)
def set_update_callback(self, callback):
self._update_callback = callback
def initialize(self, callback=None):
self._initialize_callback = callback
# listen for redirects
self._establish_service()
# time out call after 2 minutes
Clock.schedule_once(self._timeout_auth, 120)
# check for response every tenth of a second
Clock.schedule_interval(self._check_for_connections, 0.1)
# now open a web browser page on a special URL
self._state = uuid.uuid4().hex
d = {"response_type": "code",
"client_id": self._client_id,
"redirect_uri": "http://localhost:%d" % (self._port,),
"scope": " ".join(["profile"] + (self._scopes if self._scopes else [])),
"state": self._state,
"access_type": "offline",
"approval_type": "force"}
if self._user_gmail_id:
d["login_hint"] = self._user_gmail_id
url = "https://accounts.google.com/o/oauth2/auth?" + urllib.urlencode(d)
webbrowser.open(url)
def initialized(self):
return (self._refresh_token is not None)
def authorized(self):
return (self._access_token is not None) and (time.time() < self._expires)
def auth_token(self):
return self._access_token, self._expires
def refresh_token(self):
return self._refresh_token
@mainthread
def _call_callback(self, real_callback, response_type, req, response):
real_callback(response_type, req, response)
def _refresh_callback(self, gconn, actual_callback):
if self._update_callback:
self._update_callback(self)
actual_callback()
def call(self, url, callback, params=None, body=None, headers=None, timeout=None, debug=False, method=None):
if self._expires is None:
raise RuntimeError("GoogleConnection not initialized")
if time.time() > self._expires:
# need to refresh the authorization token
self._initialize_callback=lambda gconn: self._refresh_callback(gconn,
functools.partial(self.call, url, callback, params=params, body=body, headers=headers, timeout=timeout, debug=debug or self._debug))
self._refresh()
return
if headers is None: headers = dict()
headers['Authorization'] = "Bearer " + self._access_token
if params:
url += '?' + urllib.urlencode(params)
elif isinstance(body, dict):
headers['Content-Type'] = 'application/x-www-form-urlencoded'
body = urllib.urlencode(body)
return UrlRequest(url,
req_headers=headers,
on_success=lambda req, response: self._call_callback(callback, 'success', req, response),
on_error=lambda req, error: self._call_callback(callback, 'error', req, error),
on_failure=lambda req, failure: self._call_callback(callback, 'failure', req, failure),
on_redirect=lambda req, result: self._call_callback(callback, 'redirect', req, result),
req_body=body,
method=method or ("GET" if not body else "POST"),
timeout=timeout if timeout else 120,
debug=self._debug)
FileInfo = namedtuple("FileInfo", "title modified id downloadurl")
def put_file(self, callback, metadata, contents, fileid=None):
@mainthread
def _callback(response_type, req, response):
if response_type != "success":
logging.error("Error on request %s: %s, %s", req, response_type, response)
callback(None)
else:
callback(response)
method = "POST"
metadata_part = email.message.Message()
metadata_part.set_payload(json.dumps(metadata))
metadata_part.add_header("Content-Type", "application/json")
content_part = email.message.Message()
content_part.set_payload(contents)
email.encoders.encode_base64(content_part)
content_part.add_header("Content-Type", "application/octet-stream")
if not metadata:
params = {'uploadType': 'media'}
body = content_part
elif not contents:
params = {}
body = metadata_part
else:
params = {'uploadType': 'multipart'}
body = email.message.Message()
body.set_payload([metadata_part, content_part])
body.add_header("Content-Type", "multipart/form-data")
# we express it as a string to get the boundary calculated
bodystring = body.as_string()
# now grab the content-type
content_type = body.get("content-type")
# now trim off the content-type header from that string
pt = bodystring.find("\n\n")
body = bodystring[pt:].lstrip()
# by default, we're going to do a multipart upload
url = "https://www.googleapis.com/upload/drive/v2/files"
if fileid:
# update instead of insert
method="PUT"
if metadata and 'modifiedDate' in metadata:
params['setModifiedDate'] = 'true'
if not metadata:
content_type="application/octet-stream"
body = contents
elif not contents:
url = "https://www.googleapis.com/drive/v2/files"
content_type = "application/json"
body = json.dumps(metadata)
url += "/" + fileid
r = self.call(url, _callback,
headers={"Content-Type": content_type},
method="PUT" if fileid else "POST",
params = params,
body=body,
debug=self._debug,
timeout=120)
#if r: r.wait()
if r: add_request(r)
def list_files(self, callback, query=None):
@mainthread
def _callback(response_type, req, response):
if response_type == "success":
results = {}
for item in response.get("items"):
if item.get('labels', {}).get('trashed'):
continue
name = item.get("title")
if "modifiedDate" in item:
Logger.debug("Google list_files: raw modifiedDate for %s is %s", name, item.get("modifiedDate"))
timestamp = calendar.timegm(datetime.datetime.strptime(
item.get("modifiedDate"), '%Y-%m-%dT%H:%M:%S.%fZ').timetuple())
else:
timestamp = 0.0
Logger.debug("Google list_files: timestamp for %s is %s", name, time.ctime(timestamp))
if ((name in results) and (timestamp > results[name].modified)) or (name not in results):
downloadurl = item.get("downloadUrl")
fileid = item.get("id")
results[name] = self.FileInfo(name, timestamp, fileid, downloadurl)
callback(results)
else:
logging.error("Error obtaining file metadata with %s: %s, %s", req, response_type, response)
callback(None)
params = {}
if query is not None:
params['q'] = query
r = self.call("https://www.googleapis.com/drive/v2/files",
_callback,
params = params,
debug=self._debug,
timeout=120);
#if r: r.wait()
if r: add_request(r)
def get_file_contents(self, fdata, callback):
def _callback(user_callback, fdata, response_type, req, response):
if response_type == "success":
user_callback(fdata, response)
else:
logging.error("Error calling req %s: %s, %s", str(req), response_type, response)
user_callback(fdata, None)
r = self.call(fdata.downloadurl, functools.partial(_callback, callback, fdata))
# if r: r.wait()
if r: add_request(r)
def put_file_contents(self, fdata, fp, callback, description=None):
def _callback(user_callback, fdata, response):
if response:
user_callback(fdata, response.get("downloadUrl"))
else:
user_callback(fdata, None)
if fdata.modified is None:
modifiedDate = datetime.datetime.now()
else:
modifiedDate = datetime.datetime.utcfromtimestamp(fdata.modified)
Logger.debug("Google put_file_contents: timestamp for %s is %s", fdata.title, modifiedDate.ctime())
modifiedDate = modifiedDate.isoformat("T")
# Google requires at least one decimal place in the seconds field
if "." not in modifiedDate:
modifiedDate += ".0"
# mark it as "Zulu" time
modifiedDate += "Z"
if not fdata.id:
r = self.put_file(functools.partial(_callback, callback, fdata), {
'title': fdata.title,
'mimeType': 'application/octet-stream',
'visibility': 'PRIVATE',
'description': description or 'a Kivy database',
'modifiedDate': modifiedDate,
}, fp.read())
# if r: r.wait()
if r: add_request(r)
else:
r = self.put_file(functools.partial(_callback, callback, fdata), {
"modifiedDate": modifiedDate,
}, fp.read(), fileid=fdata.id)
#if r: r.wait()
if r: add_request(r)
if __name__ == "__main__":
from kivy.base import EventLoop
from StringIO import StringIO
logging.basicConfig(level=logging.DEBUG)
g = GoogleConnection(sys.argv[1], sys.argv[2], user_gmail_id=sys.argv[3], debug=True)
EventLoop.start()
if len(sys.argv) > 4:
g._refresh_token = sys.argv[4]
g._expires = 0.0
else:
g.initialize()
print 'dancing...'
while True:
if g.authorized():
print 'authorized'
break
time.sleep(0.01)
Clock.tick()
listing_wait = True
best = None
def metadata_callback(items):
global listing_wait, best
if items:
for item in items.values():
print item.title, item.id, time.ctime(item.modified)
best = item
listing_wait = False
fileid = None
g.list_files(metadata_callback)
while listing_wait:
time.sleep(0.01)
Clock.tick()
upload_wait = True
def upload_callback(f, download_url):
global upload_wait, fileid
print 'upload response:', f, download_url
if download_url:
fileid = f.id
upload_wait = False
if best:
listing_wait = True
def download_callback(fdata, contents):
global listing_wait
print fdata, ":", contents
listing_wait = False
g.get_file_contents(best, download_callback)
while listing_wait:
time.sleep(0.01)
Clock.tick()
fileid = best.id
else:
filecontents = StringIO("some text")
g.put_file_contents(g.FileInfo("goodstuff.test", modified=time.time(), id=None, downloadurl=None),
filecontents,
upload_callback)
while upload_wait:
time.sleep(0.01)
Clock.tick()
upload_wait = True
if fileid:
g.put_file_contents(g.FileInfo("goodstuff.test", modified=time.time(), id=fileid, downloadurl=None),
StringIO("still different contents \x90 \xCA\r\n"),
upload_callback)
while upload_wait:
time.sleep(0.01)
Clock.tick()
#name = raw_input("User ID: ")
#password = raw_input("Password: ")
#print obtain_auth(name, password)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment