Skip to content

Instantly share code, notes, and snippets.

@telenieko
Created September 29, 2017 22:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save telenieko/8050c92849414b3f42d0d090de1be033 to your computer and use it in GitHub Desktop.
Save telenieko/8050c92849414b3f42d0d090de1be033 to your computer and use it in GitHub Desktop.
Script to do maintenance on Google Drive, example of gevent, google drive sdk, rate limiting, automatic retries, ...
# -*- coding: utf-8 -*-
# pylint: disable=missing-docstring,wrong-import-position,invalid-name,len-as-condition
# pylint: disable=superfluous-parens,wrong-import-order,ungrouped-imports,unsubscriptable-object
""" gdrive_cleanup.py <user@domain.com> [start folder_id]
License: This code is released to the Public Domain under the "CC0 1.0 Universal" License
Author: Marc Fargas <telenieko@telenieko.com>
This script is one we use to do some maintenance on user's Google Drive folders.
Things it does:
- Find duplicate files owned by the same user and "merge" them
(put one of them on the same parents as the others and delete them)
- Find similarly named folders next to each other and merge their contents
(ie: after failed syncs because of nice non-ascii characters with wrong
encoding)
- Find files in 'root' that are also in other folders and "unroot" them.
This is not a one size-fits-all thing, it is published to the Public Domain
for reference and example for anyone looking for example code on how to work
with Google Drive SDK.
This code:
- Is written with gevent in order to go as fast as possible (ie: we
keep parsing trees while waiting to receive other trees).
- Calls to the API are wrapped by the rate_limited decorator which
tries to minimize how many 'rate limit exceeded' you get.
- Calls to the API are wrapped by an automatic retrier to simplify
how to manage errors (you will get 'rate limit exceeded' no matter how
good your rate limit is).
- httplib2shim is used to avoid httplib2 thread safety issues (the code
was originally written with threads before using gevent) it might work
with normal httplib2...
- gevents are grouped inside GROUP so we can easily wait when needed
(see calls to GROUP.join())
- I tried to be nice with pylint.
This code is dated September 2017 using Google Drive v3 API & Python 3.5
Details:
This tool uses a Service Account to impersonate the users on Google G Suite,
if you got here you're likely to know what that means or have the means to find
out and set it up. If you can't ... well, then maybe you should not attempt to
use this code!!
"""
# pip install attrs gevent google-api-python httplib2shim retry
from gevent import monkey; monkey.patch_all() # pylint: disable=multiple-statements
import logging
import os
import re
import string
import sys
from timeit import default_timer
import apiclient.discovery # pylint: disable=import-error
import apiclient.errors # pylint: disable=import-error
import apiclient.http # pylint: disable=import-error
import attr
import gevent
import httplib2shim
from gevent.pool import Group
from oauth2client.service_account import ServiceAccountCredentials
from retry import retry
from functools import wraps
from typing import List, Dict, Any # pylint: disable=unused-import
# Directory holding this script; KEYFILE below is resolved relative to it.
BASE = os.path.dirname(__file__)
# Root logging is configured at WARNING; the 'gdrive' logger used throughout
# this script is raised to INFO further down.
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger('gdrive')
# gevent group: every greenlet started through the `spawn` decorator is added
# here so callers can wait for all pending work with GROUP.join().
GROUP = Group() #gevent group.
### CONFIGS:
NOOP = False # If TRUE then dry-run (do not change anything on Drive)
# Service-account key file, expected next to this script.
KEYFILE = os.path.join(BASE, "google-service-account-key.json")
logger.setLevel(logging.INFO)
def is_same_title(one, two):
    """ Tell whether two item names should be treated as the same title.

    Both names are compared case-insensitively after dropping every run of
    non-word characters and underscores (spaces, dashes, dots, ...), so
    'My Folder' and 'my_folder' are considered the same title.

    one, two: the names to compare (str).
    Returns True when the normalised names are equal.
    """
    # re.sub takes (pattern, replacement, string): the title is the *string*
    # being cleaned, never the replacement (a replacement is parsed for
    # backslash escapes and would blow up on names such as 'a\1').
    # For str patterns \W is Unicode aware, so accented letters are kept.
    o = re.sub(r'[\W_]+', '', one)
    t = re.sub(r'[\W_]+', '', two)
    if not o or not t:
        # At least one name is made only of punctuation/whitespace; comparing
        # the stripped forms would make '!!!' equal to '???', so compare the
        # raw names instead.
        return one.lower() == two.lower()
    return o.lower() == t.lower()
### END CONFIGS.
def gevent_throttle(calls_per_sec=0):
    """Decorates a Greenlet function for throttling.

    https://gist.github.com/p7k/4238388

    Returns a decorator. Callers of a decorated function take turns passing
    through a gate: the one holding the gate sleeps until at least
    1 / calls_per_sec seconds have passed since the previous caller went
    through, then releases the gate and only afterwards calls the wrapped
    function (so the call itself runs outside the gate).

    calls_per_sec: maximum rate of calls; 0 (the default) disables the
        pause, only the cooperative yield at the top of the loop remains.
    """
    interval = 1. / calls_per_sec if calls_per_sec else 0
    def decorate(func):
        # State below is created once per decorated function, so every
        # decorated function is throttled independently of the others.
        blocked = [False] # has to be a list to not get localised inside the while loop
        # otherwise, UnboundLocalError: local variable 'blocked' referenced before assignment
        last_time = [0] # ditto
        @wraps(func) # propagates docstring
        def throttled_func(*args, **kwargs):
            while True:
                # give other greenlets a chance to run, otherwise we
                # might get stuck while working thread is sleeping and the block is ON
                gevent.sleep(0)
                if not blocked[0]:
                    # Take the gate; other callers keep spinning through the
                    # sleep(0) above until it is released.
                    blocked[0] = True
                    # check if actually might need to pause
                    if calls_per_sec:
                        last, current = last_time[0], default_timer()
                        elapsed = current - last
                        if elapsed < interval:
                            gevent.sleep(interval - elapsed)
                        last_time[0] = default_timer()
                    # NOTE(review): there is no try/finally here, so an
                    # exception raised while pausing would leave the gate
                    # closed for every later caller.
                    blocked[0] = False
                    return func(*args, **kwargs)
        return throttled_func
    return decorate
# Throttling decorator used on the API helpers: 1000/100 == 10 calls per
# second, i.e. at least 0.1 s between two calls of the same function. The
# interval is tracked separately for each function it decorates, not globally
# across all of them.
rate_limited = gevent_throttle(1000/100)
def spawn(func):
    """ Decorator: run `func` in its own greenlet, tracked by GROUP.

    Calling the decorated function does not run `func` right away; it
    schedules it with gevent.spawn, registers the greenlet in GROUP (so a
    later GROUP.join() waits for it) and returns the greenlet.
    """
    @wraps(func)
    def spawner(*args, **kwargs):
        greenlet = gevent.spawn(func, *args, **kwargs)
        GROUP.add(greenlet)
        return greenlet
    return spawner
@attr.s(repr=False)
class DriveItem(object):
    """ Simple representation of a Google Drive item.

    Items form an in-memory tree through `parent` / `children`. The methods
    that change Drive (`trash`, `change_parent`) are wrapped by `spawn`:
    calling them only schedules a greenlet and returns it, the API call and
    the matching update of the in-memory tree happen when that greenlet runs
    (wait for them with GROUP.join()). With NOOP set the API calls are
    skipped but the in-memory tree is still updated.
    """
    # The original item data as received from Google.
    source = attr.ib()  # type: Dict[str, Any]
    parent = attr.ib()  # The DriveItem instance parent of this.
    owned_by_me = attr.ib()  # Does the current user own this item?
    # List of children items (aka: child.parent == this)
    children = attr.ib(default=attr.Factory(list), repr=False)  # type: List[DriveItem]

    def __repr__(self):
        return u"%s %s" % (self.source['id'], self.path())

    def path(self):
        """ Return the full path as a string. """
        if self.parent:
            return self.parent.path() + '/' + self.source['name']
        return '/' + self.source['name']

    def is_folder(self):
        """ Am I a folder? """
        return self.source['mimeType'] == 'application/vnd.google-apps.folder'

    @retry(exceptions=apiclient.errors.HttpError, delay=5, jitter=(1, 15), max_delay=20)
    def get_parents(self, service):
        """ Ask Drive for the 'parents' field of this item (retried on HttpError). """
        return service.files().get(fileId=self.source['id'],
                                   fields='parents').execute()

    @spawn
    @retry(exceptions=apiclient.errors.HttpError, delay=1, jitter=(1, 5), max_delay=5)
    @rate_limited
    def trash(self, service):
        """ Send me to the trash and detach me from my parent in the tree. """
        logger.info("Trash: %s", self)
        if not NOOP:
            service.files().update(fileId=self.source['id'],
                                   body={'trashed': True}).execute()
        # An item without a parent (the tree root, or one already trashed)
        # has nothing to be detached from.
        if self.parent is not None:
            self.parent.children.remove(self)
            self.parent = None

    @spawn
    @retry(exceptions=apiclient.errors.HttpError, delay=1, jitter=(1, 5), max_delay=5)
    @rate_limited
    def change_parent(self, service, new_parent, remove_parent=None):
        """ Add the new_parent as a parent of self,
        if remove_parent is not None we remove it too.

        The in-memory tree is updated accordingly: self is appended to
        new_parent.children, becomes a child of new_parent and, when given,
        is removed from remove_parent.children.
        """
        remove_parent_id = None
        if not remove_parent:
            logger.info("AddParent: %s \n\tTo: %s", self, new_parent)
        else:
            remove_parent_id = remove_parent.source['id']
            logger.info("ChangeParent: %s \n\tFrom: %s\n\tTo: %s",
                        self, remove_parent, new_parent)
        if new_parent.source['id'] == remove_parent_id:
            return  # Moving from a folder to itself: nothing to do.
        if not NOOP:
            service.files().update(fileId=self.source['id'],
                                   addParents=new_parent.source['id'],
                                   removeParents=remove_parent_id,
                                   fields='id, parents').execute()
        self.parent = new_parent
        if remove_parent:
            remove_parent.children.remove(self)
        new_parent.children.append(self)

    def die_for(self, service, stays):
        """ Kill Mr. self and add his parent to stays.
        Presumably they're the same guy.

        Nothing is done when both are the same Drive item, when self has no
        parent, or when the md5 checksums are missing or differ.
        """
        if stays.source['id'] == self.source['id']:
            return  # They ARE the same.
        if self.parent is None:
            logger.warning("Cannot merge an item without parent: %s", self)
            return
        mine = self.source.get('md5Checksum')
        theirs = stays.source.get('md5Checksum')
        # Items without a checksum can not be proven identical, so they are
        # treated like a mismatch instead of raising KeyError.
        if mine is None or mine != theirs:
            logger.warning("md5Checksums not equal: \n\t'%s' \n\t'%s'",
                           stays.path(), self.path())
            return
        # change_parent appends `stays` to the parent's children and trash
        # removes `self` from them when their greenlets run; doing it here
        # as well would duplicate `stays` and make trash fail on the second
        # remove.
        stays.change_parent(service, self.parent)
        self.trash(service)

    def merge_to(self, service, stays):
        """ Reparent all children of source to stays, then trash source. """
        if stays.source['id'] == self.source['id']:
            return  # They ARE the same.
        if not stays.is_folder():
            logger.warning("Got a non-folder in merge_to stays: %s", stays)
            return
        if not self.is_folder():
            logger.warning("Got a non-folder in merge_to source: %s", self)
            return
        # Iterate over a snapshot: the change_parent greenlets remove each
        # child from self.children once they run.
        for ch in list(self.children):
            ch.change_parent(service, stays, remove_parent=self)
        self.trash(service)
@attr.s
class KnownMD5(object):
    """ All the items seen so far that share one md5 checksum.

    Instances are created through add() and collected in ALL_KNOWN.
    """
    # Class-wide registry filled by add(); one entry per distinct checksum.
    ALL_KNOWN = []
    md5 = attr.ib(cmp=True)
    known = attr.ib(default=attr.Factory(list), cmp=False)

    def __str__(self):
        return self.md5

    @staticmethod
    def add(md5, item):
        """ Record `item` under `md5`, creating the registry entry if needed. """
        for entry in KnownMD5.ALL_KNOWN:
            if entry.md5 == md5:
                entry.known.append(item)
                return
        KnownMD5.ALL_KNOWN.append(KnownMD5(md5=md5, known=[item]))

    def unify(self, service):
        """ Keep one of the items owned by the user and trash the others.

        The candidates are ordered by their name split on spaces, in
        descending order; the first one stays. It is added to the parent
        of every duplicate living in another folder before that duplicate
        is trashed. Waits for all pending greenlets before returning.
        """
        logger.debug("KnownMD5.unify(%s)", self.md5)
        ranked = [entry for entry in self.known if entry.owned_by_me]
        ranked.sort(key=lambda entry: entry.source['name'].split(' '), reverse=True)
        if not ranked:
            return
        stays = ranked[0]
        logger.info("%s Stays", stays)
        for dupe in ranked[1:]:
            if dupe.source['id'] == stays.source['id']:
                continue
            if dupe.parent.source['id'] != stays.parent.source['id']:
                stays.change_parent(service, dupe.parent)
            logger.info("%s is dupe of %s (%s vs %s)", dupe.path(), stays.path(),
                        dupe.source['id'], stays.source['id'])
            dupe.trash(service)
        GROUP.join()
def get_service_creds(email, scope):
    """ Build an authorized http object impersonating `email`.

    The service account key is read from KEYFILE, delegated to `email`
    for the given `scope`, and its token refreshed once before returning
    the authorized httplib2shim.Http instance.
    """
    service_account = ServiceAccountCredentials.from_json_keyfile_name(KEYFILE, scopes=scope)
    delegated = service_account.create_delegated(email)
    authorized_http = delegated.authorize(httplib2shim.Http())
    delegated.refresh(authorized_http)
    return authorized_http
def get_drive_service(email):
    """ Return a Google Drive API v3 service acting on behalf of `email`. """
    authorized_http = get_service_creds(email, scope='https://www.googleapis.com/auth/drive')
    return apiclient.discovery.build('drive', 'v3', http=authorized_http)
def gdrive_am_i_owner(item):
    """ Tell whether the item's 'ownedByMe' field is set.

    item: the item dict as received from Drive; it must carry the
        'ownedByMe' key.
    Returns a plain bool.
    """
    owned = item['ownedByMe']
    return True if owned else False
class SlowDown(Exception):
    """ Raised when a Drive listing should be retried after a pause. """
# 'reason' values Drive reports on a 403 that only mean "slow down".
_RATE_LIMIT_REASONS = ('rateLimitExceeded', 'userRateLimitExceeded')


def _is_rate_limit_error(err):
    """ Tell whether an HttpError asks us to back off and try again.

    429 and 503 always do. A 403 does when the error body names a rate
    limit reason, or when the body can not be understood (in doubt we keep
    retrying, as this script always did). A 403 whose body names only other
    reasons is not retried, since retrying it would never succeed.
    """
    import json  # only needed on this error path
    status = err.resp.status
    if status in (429, 503):
        return True
    if status != 403:
        return False
    try:
        payload = json.loads(err.content.decode('utf-8'))
        reasons = [detail.get('reason') for detail in payload['error']['errors']]
    except (AttributeError, KeyError, TypeError, ValueError):
        return True
    if not reasons:
        return True
    return any(reason in _RATE_LIMIT_REASONS for reason in reasons)


@retry(exceptions=SlowDown, delay=5, jitter=(1, 15), max_delay=20)
@rate_limited
def get_list(service, q, page_token, fields=None):
    """ Fetch one page of a files.list query.

    service: a service from get_drive_service().
    q: the Drive search query.
    page_token: token of the page to fetch, None for the first page.
    fields: fields to request; a default set is used when empty.

    Returns the decoded response. Rate limit answers are turned into
    SlowDown, which the retry decorator uses to call again after a pause;
    any other HttpError is raised to the caller.
    """
    if not fields:
        fields = ('nextPageToken, files(id, name, mimeType, ownedByMe, '
                  'kind, md5Checksum, modifiedTime)')
    try:
        return service.files().list(q=q, pageToken=page_token, fields=fields).execute()
    except apiclient.errors.HttpError as err:
        if _is_rate_limit_error(err):
            # Keep the original error attached for debugging.
            raise SlowDown() from err
        raise
@spawn
def gdrive_build_tree(service, this, level=0):
    """ Build a tree of DriveItem instances below `this`.

    service is a service from get_drive_service().
    this is the current DriveItem instance (a folder).
    level is the depth of `this` below the starting folder; it is only
    passed along to the recursive calls.

    Runs in its own greenlet (see `spawn`): each owned sub-folder is listed
    by a further greenlet, so callers must GROUP.join() before using the
    tree. Folders not owned by the user are added but not descended into.
    """
    folder_id = this.source['id']
    logger.info('%s: Gathering item listings for %s...',
                this.path(), folder_id)
    q = '\'{}\' in parents and not trashed'.format(folder_id)
    page_token = None
    while True:
        try:
            children = get_list(service, q, page_token)
        except apiclient.errors.HttpError as e:
            # Give up on this folder only; what was listed so far is kept.
            logger.exception('An error occurred: %s', e)
            break
        for item in children.get('files', []):
            sub = DriveItem(source=item, parent=this, owned_by_me=gdrive_am_i_owner(item))
            this.children.append(sub)
            if sub.is_folder():
                logger.debug(u'%s Subdir Tree: %s (owned=%s)',
                             this.path(), item['name'], sub.owned_by_me)
                if sub.owned_by_me:
                    gdrive_build_tree(service, sub, level+1)
            # NOTE(review): Drive v3 appears to report kind 'drive#file' for
            # folders too, so this line is likely logged for them as well.
            if item['kind'] == 'drive#file':
                logger.debug(u'%s File: (%s)', sub.path(), item['id'])
        page_token = children.get('nextPageToken')
        if not page_token:
            break
def merge_folders(service, tree):
    """ Check the children of a folder for the ones with same name.
    We then merge those.

    For every pair of sibling folders owned by the user whose names match
    according to is_same_title(), the second is merged into the first
    (DriveItem.merge_to) and we wait for the pending greenlets before going
    on. Each surviving folder is then processed recursively.
    """
    # Work on snapshots: finished merges remove the trashed folder from
    # tree.children, and removing from a list while iterating over it makes
    # the loop skip the next sibling.
    for ch in list(tree.children):
        if ch.parent is not tree:
            continue  # Merged into a sibling (and trashed) earlier on.
        if not ch.is_folder():
            continue
        aname = ch.source['name']
        for ch2 in list(tree.children):
            # Identity, not ==: attrs equality would compare whole subtrees.
            if ch2 is ch or ch2.parent is not tree:
                continue
            if not ch2.is_folder():
                continue
            if not ch.owned_by_me or not ch2.owned_by_me:
                continue
            bname = ch2.source['name']
            if is_same_title(aname, bname):
                logger.info("%s, Merging: %s and %s", tree.path(), aname, bname)
                ch2.merge_to(service, ch)
                # Let the moves and the trash finish so the tree is
                # consistent before looking at the next sibling.
                GROUP.join()
        merge_folders(service, ch)
def find_known_md5(tree):
    """ Walk the tree below `tree` and register checksums in KnownMD5.

    Every descendant whose source carries a non-empty 'md5Checksum' is
    recorded with KnownMD5.add(); folders are descended into first.
    """
    for child in tree.children:
        if child.is_folder():
            find_known_md5(child)
        checksum = child.source.get('md5Checksum')
        if checksum:
            KnownMD5.add(checksum, child)
def clean_up_root(service, root):
    """ Find files in root that have other parents,
    then we remove from root.

    service is a service from get_drive_service().
    root is the DriveItem of the folder to clean up.

    Only Drive is changed, the in-memory tree is left as it is. Waits for
    all the scheduled removals before returning.
    """
    @spawn
    @retry(exceptions=apiclient.errors.HttpError, delay=1, jitter=(1, 5), max_delay=5)
    @rate_limited
    def remove_parent(service, item_id, parent_id):
        logger.info("Taking %s out of %s", item_id, parent_id)
        if not NOOP:
            service.files().update(fileId=item_id,
                                   removeParents=parent_id,
                                   fields='id').execute()

    folder_id = root.source['id']
    q = '\'{}\' in parents and not trashed'.format(folder_id)
    fields = 'nextPageToken, files(id, parents)'
    page_token = None
    while True:
        try:
            children = get_list(service, q, page_token, fields=fields)
        except apiclient.errors.HttpError as e:
            logger.exception('An error occurred: %s', e)
            break
        for item in children.get('files', []):
            # An item listed without 'parents' has nothing to be taken out.
            if len(item.get('parents', [])) > 1:
                remove_parent(service, item['id'], folder_id)
        page_token = children.get('nextPageToken')
        if not page_token:
            break
    # remove_parent only schedules greenlets; without waiting here the
    # caller could exit before the removals reach Drive.
    GROUP.join()
if __name__ == '__main__':
    # Usage: gdrive_cleanup.py <user@domain.com> [start folder_id]
    if len(sys.argv) < 2:
        sys.exit("Usage: gdrive_cleanup.py <user@domain.com> [start folder_id]")
    usuario = sys.argv[1]
    # Optional second argument: folder to start at, the user's root otherwise.
    begin_at = sys.argv[2] if len(sys.argv) > 2 else 'root'
    svc = get_drive_service(usuario)

    # 1. Build the in-memory tree and wait until every listing greenlet is done.
    logger.info("Getting Google Drive listing for %s", usuario)
    root_item = svc.files().get(fileId=begin_at).execute() # pylint: disable=no-member
    groot = DriveItem(source=root_item, parent=None, owned_by_me=True)
    gdrive_build_tree(svc, groot)
    GROUP.join()

    # 2. Merge similarly named sibling folders.
    merge_folders(svc, groot)

    # 3. Merge duplicate files (same md5 checksum).
    find_known_md5(groot)
    for md5sum in KnownMD5.ALL_KNOWN:
        if len(md5sum.known) > 1:
            uniques = {x.source['id'] for x in md5sum.known}
            logger.info("%s has %d matches (%d unique)", md5sum,
                        len(md5sum.known), len(uniques))
            md5sum.unify(svc)

    # 4. Take out of the start folder whatever also lives elsewhere.
    clean_up_root(svc, groot)
    # Make sure no scheduled greenlet is dropped when the script exits.
    GROUP.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment