Skip to content

Instantly share code, notes, and snippets.

@callistabee
Created January 23, 2019 22:00
Show Gist options
  • Save callistabee/36e703a7a5d6e7eb26618f90a23e4f41 to your computer and use it in GitHub Desktop.
Save callistabee/36e703a7a5d6e7eb26618f90a23e4f41 to your computer and use it in GitHub Desktop.
Super simple utility for interacting with the Google Drive API.
from google.oauth2 import service_account
from googleapiclient.discovery import build
import argparse
class Drive:
    """Small convenience wrapper around a Google Drive v3 API client.

    Files and folders are addressed by plain name; nested folder paths are
    not supported. Every lookup issues a single ``files().list`` request and
    inspects only the response it gets back (no page tokens are followed).
    """

    def __init__(self, cred_path):
        """Create the Drive client from a service-account credentials file.

        Args:
            cred_path: Path to the service-account key file.
        """
        credentials = service_account.Credentials.from_service_account_file(
            cred_path,
            scopes=["https://www.googleapis.com/auth/drive"],
        )
        self.drive = build("drive", "v3", credentials=credentials)

    @staticmethod
    def _quote(value):
        """Escape *value* for use inside a single-quoted query literal.

        Backslashes are escaped first so the backslashes added for quotes
        are not themselves doubled.
        """
        return value.replace("\\", "\\\\").replace("'", "\\'")

    def _find_by_name(self, name):
        """Return the file resources whose name equals *name*."""
        response = (
            self.drive
            .files()
            .list(q="name='%s'" % self._quote(name))
            .execute()
        )
        return response['files']

    def listdir(self, folder):
        """List the direct children of the folder called *folder*.

        If several items share the name, the first match is used.

        Args:
            folder: Name of the folder to list.

        Returns:
            A list of ``(name, id)`` tuples, one per child.

        Raises:
            KeyError: If nothing named *folder* is found.
        """
        matches = self._find_by_name(folder)
        if not matches:
            raise KeyError("folder '%s' not found" % folder)
        folder_id = matches[0]['id']
        results = (
            self.drive
            .files()
            .list(q="'%s' in parents" % folder_id)
            .execute()
        )
        return [(f['name'], f['id']) for f in results['files']]

    def delete(self, file_id):
        """Delete the file identified by *file_id*."""
        self.drive.files().delete(fileId=file_id).execute()

    def download(self, file_id=None, file_name=None):
        """Fetch the content of a file selected by id or by name.

        Args:
            file_id: Id of the file to fetch.
            file_name: Name of the file to fetch; must match exactly one file.

        Returns:
            Whatever ``files().get_media(...).execute()`` returns for the file.

        Raises:
            ValueError: If both or neither of *file_id* / *file_name* are given.
            KeyError: If *file_name* matches no file or more than one file.
        """
        if (file_id is None) == (file_name is None):
            raise ValueError(
                "must provide exactly one of file_id or file_name"
            )
        if file_name is not None:
            matches = self._find_by_name(file_name)
            if len(matches) > 1:
                raise KeyError("non-unique file name")
            if not matches:
                raise KeyError("file '%s' not found" % file_name)
            file_id = matches[0]['id']
        return self.drive.files().get_media(fileId=file_id).execute()

    def upload(self, file_path, destination):
        """Upload a local file to ``name`` or ``folder/name``.

        Args:
            file_path: Path of the local file, handed to the client as
                ``media_body``.
            destination: Either ``"name"`` or ``"folder/name"``. An empty
                folder part (``"/name"``) is treated as no folder.

        Raises:
            NotImplementedError: If *destination* has more than one ``/``.
            IndexError: If the named folder is not found (a message is
                printed first).
        """
        dest_pieces = destination.split('/')
        if len(dest_pieces) == 1:
            name, = dest_pieces
            folder = None
        elif len(dest_pieces) == 2:
            folder, name = dest_pieces
        else:
            raise NotImplementedError('no nested folders')
        metadata = {"name": name}
        if folder:
            try:
                folder_id = self._find_by_name(folder)[0]['id']
            except IndexError:
                # Only an empty result means "not found"; API and transport
                # errors propagate untouched instead of being mislabelled.
                print("folder '%s' not found" % folder)
                raise
            metadata['parents'] = [folder_id]
        self.drive.files().create(
            body=metadata,
            media_body=file_path
        ).execute()
def upload(drive, args):
    """Upload every file named in the list file ``args.file_list``.

    Each non-blank line of the list file must read ``local_path,destination``.
    Blank lines are skipped; a line with any other number of comma-separated
    fields raises ``ValueError`` when it is reached.

    Args:
        drive: Object exposing ``upload(file_path, destination)``.
        args: Parsed arguments carrying ``file_list``, the list file's path.
    """
    with open(args.file_list, 'r') as file_list:
        upload_args_list = [
            line.strip().split(',')
            for line in file_list
            if line.strip()  # tolerate blank / trailing empty lines
        ]
    for file_path, destination in upload_args_list:
        # Single-argument call form behaves the same on Python 2 and 3.
        print("uploading %s to %s" % (file_path, destination))
        drive.upload(file_path, destination)
def delete(drive, args):
    """Delete the file whose id is given in ``args.file_id``.

    Args:
        drive: Object exposing ``delete(file_id)``.
        args: Parsed arguments carrying ``file_id``.
    """
    target_id = args.file_id
    drive.delete(target_id)
def listdir(drive, args):
    """Print the contents of the folder ``args.folder_name``, one per line.

    Each entry returned by ``drive.listdir`` is rendered with ``str``.

    Args:
        drive: Object exposing ``listdir(folder)``.
        args: Parsed arguments carrying ``folder_name``.
    """
    entries = drive.listdir(args.folder_name)
    # Single-argument call form behaves the same on Python 2 and 3.
    print("\n".join(map(str, entries)))
def download(drive, args):
    """Write the content of the selected file to standard output.

    Args:
        drive: Object exposing ``download(file_id=..., file_name=...)``.
        args: Parsed arguments carrying ``file_id`` and ``file_name``.
    """
    import sys

    result = drive.download(
        file_id=args.file_id,
        file_name=args.file_name
    )
    if isinstance(result, bytes):
        # A text-mode stdout rejects bytes; use its binary layer when it has
        # one, otherwise (no ``buffer`` attribute) write to stdout itself.
        out = getattr(sys.stdout, 'buffer', sys.stdout)
    else:
        out = sys.stdout
    out.write(result)
if __name__ == "__main__":
    # Command line: <cred_file> {upload,listdir,delete,download} ...
    # Each subcommand stores its handler in ``func``; the handler is called
    # with the Drive client and the parsed arguments.
    parser = argparse.ArgumentParser()
    parser.add_argument("cred_file")

    # Without ``required`` a missing subcommand is accepted on Python 3 and
    # the run then dies on the absent ``args.func``. ``dest`` gives argparse
    # a name to use in the resulting usage error.
    subparsers = parser.add_subparsers(dest="command")
    subparsers.required = True

    upload_parser = subparsers.add_parser("upload")
    upload_parser.add_argument("file_list")
    upload_parser.set_defaults(func=upload)

    listdir_parser = subparsers.add_parser("listdir")
    listdir_parser.add_argument("folder_name")
    listdir_parser.set_defaults(func=listdir)

    delete_parser = subparsers.add_parser("delete")
    delete_parser.add_argument("file_id")
    delete_parser.set_defaults(func=delete)

    download_parser = subparsers.add_parser("download")
    # Exactly one selector is needed; let argparse report violations as a
    # usage error instead of a traceback from deeper in the call.
    download_target = download_parser.add_mutually_exclusive_group(
        required=True
    )
    download_target.add_argument("--file_id")
    download_target.add_argument("--file_name")
    download_parser.set_defaults(func=download)

    args = parser.parse_args()
    drive = Drive(args.cred_file)
    args.func(drive, args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment