Created
January 23, 2019 22:00
-
-
Save callistabee/36e703a7a5d6e7eb26618f90a23e4f41 to your computer and use it in GitHub Desktop.
super simple utility for interacting with google drive api
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from google.oauth2 import service_account | |
from googleapiclient.discovery import build | |
import argparse | |
class Drive: | |
def __init__(self, cred_path): | |
credentials = ( | |
service_account | |
.Credentials | |
.from_service_account_file( | |
cred_path, | |
scopes=["https://www.googleapis.com/auth/drive"] | |
) | |
) | |
self.drive = build("drive", "v3", credentials=credentials) | |
def listdir(self, folder): | |
try: | |
folder_id = ( | |
self | |
.drive | |
.files() | |
.list(q="name='%s'" % folder) | |
.execute() | |
)['files'][0]['id'] | |
except IndexError: | |
raise KeyError("folder '%s' not found" % folder) | |
results = ( | |
self | |
.drive | |
.files() | |
.list(q="'%s' in parents" % folder_id) | |
.execute() | |
) | |
return [ | |
(f['name'], f['id']) | |
for f in results['files'] | |
] | |
def delete(self, file_id): | |
self.drive.files().delete(fileId=file_id).execute() | |
def download(self, file_id=None, file_name=None): | |
if not ((file_id is None) ^ (file_name is None)): | |
raise ValueError( | |
"must provide exactly one of file_id or file_name" | |
) | |
if file_name is not None: | |
matches = ( | |
self | |
.drive | |
.files() | |
.list(q="name='%s'" % file_name) | |
.execute() | |
)['files'] | |
if len(matches) > 1: | |
raise KeyError("non-unique file name") | |
elif len(matches) == 0: | |
raise KeyError("file '%s' not found" % file_name) | |
file_id = matches[0]['id'] | |
result = self.drive.files().get_media(fileId=file_id).execute() | |
return result | |
def upload(self, file_path, destination): | |
dest_pieces = destination.split('/') | |
if len(dest_pieces) == 1: | |
name, = dest_pieces | |
folder = None | |
elif len(dest_pieces) == 2: | |
folder, name = dest_pieces | |
else: | |
raise NotImplementedError('no nested folders') | |
metadata = {"name": name} | |
if folder: | |
try: | |
folder_id = ( | |
self | |
.drive | |
.files() | |
.list(q="name='%s'" % folder) | |
.execute() | |
)['files'][0]['id'] | |
metadata['parents'] = [folder_id] | |
except: | |
print "folder '%s' not found" % folder | |
raise | |
self.drive.files().create( | |
body=metadata, | |
media_body=file_path | |
).execute() | |
def upload(drive, args): | |
with open(args.file_list, 'r') as file_list: | |
upload_args_list = [ | |
line.strip().split(',') | |
for line in file_list | |
] | |
for file_path, destination in upload_args_list: | |
print "uploading %s to %s" % (file_path, destination) | |
drive.upload(file_path, destination) | |
def delete(drive, args): | |
drive.delete(args.file_id) | |
def listdir(drive, args): | |
print "\n".join(map(str, drive.listdir(args.folder_name))) | |
def download(drive, args): | |
result = drive.download( | |
file_id=args.file_id, | |
file_name=args.file_name | |
) | |
import sys | |
sys.stdout.write(result) | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser() | |
parser.add_argument("cred_file") | |
subparsers = parser.add_subparsers() | |
upload_parser = subparsers.add_parser("upload") | |
upload_parser.add_argument("file_list") | |
upload_parser.set_defaults(func=upload) | |
listdir_parser = subparsers.add_parser("listdir") | |
listdir_parser.add_argument("folder_name") | |
listdir_parser.set_defaults(func=listdir) | |
delete_parser = subparsers.add_parser("delete") | |
delete_parser.add_argument("file_id") | |
delete_parser.set_defaults(func=delete) | |
download_parser = subparsers.add_parser("download") | |
download_parser.add_argument("--file_id") | |
download_parser.add_argument("--file_name") | |
download_parser.set_defaults(func=download) | |
args = parser.parse_args() | |
drive = Drive(args.cred_file) | |
args.func(drive, args) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment