Last active
September 28, 2022 02:40
-
-
Save salut1618/a786c7921f930ac5e330f80ade280de6 to your computer and use it in GitHub Desktop.
drive api python wrapper, Oauth2 client credentials required as credentials.json file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# google-api-python-client | |
# google-auth-httplib2 | |
# google-auth-oauthlib | |
# requests | |
import asyncio | |
from dataclasses import dataclass | |
from pathlib import Path | |
from typing import Union | |
from googleapiclient.errors import HttpError | |
from google.oauth2.credentials import Credentials | |
from google.auth.transport.requests import Request | |
from google_auth_oauthlib.flow import InstalledAppFlow | |
from googleapiclient.discovery import build | |
from googleapiclient.http import MediaFileUpload, MediaIoBaseUpload, MediaIoBaseDownload | |
from io import BytesIO | |
from json import loads, dumps | |
import os, mimetypes | |
import requests as r | |
mimetypes.init() | |
SCOPES = ["https://www.googleapis.com/auth/drive", | |
"https://www.googleapis.com/auth/drive.appdata", | |
"https://www.googleapis.com/auth/drive.file", | |
"https://www.googleapis.com/auth/drive.metadata"] | |
@dataclass | |
class Item: | |
name: str | |
id: str | |
is_directory: bool | |
parent_id: str | |
size: int | |
def isURL(url): | |
try: | |
r.head(url) | |
return True | |
except: | |
return False | |
class Drivebase: | |
def __init__(self) -> None: | |
self.creds = None | |
self.service = None | |
def init(self, credentials: dict = None): | |
if credentials: | |
creds = Credentials.from_authorized_user_info(credentials) | |
else: | |
creds = None | |
if os.path.exists('token.json'): | |
creds = Credentials.from_authorized_user_file('token.json', SCOPES) | |
if not creds or not creds.valid: | |
if creds and creds.expired and creds.refresh_token: | |
creds.refresh(Request()) | |
else: | |
flow = InstalledAppFlow.from_client_secrets_file( | |
'credentials.json', SCOPES, | |
redirect_uri='urn:ietf:wg:oauth:2.0:oob') | |
flow.credentials | |
auth_url, _ = flow.authorization_url(prompt='consent') | |
print('Please go to this URL: {}'.format(auth_url)) | |
# The user will get an authorization code. This code is used to get the | |
# access token. | |
code = input('Enter the authorization code: ') | |
flow.fetch_token(code=code) | |
creds = flow.credentials | |
# Save the credentials for the next run | |
with open('token.json', 'w') as token: | |
token.write(creds.to_json()) | |
self.creds = creds | |
self.service = build('drive', 'v3', credentials=self.creds) | |
def __get_mime(self, name, default=None): | |
mime, e = mimetypes.guess_type(name) | |
if mime is None: mime = default | |
return mime | |
def search(self, search_query): | |
"""uses q paramter to search items with google api | |
Args: | |
search_query (str): search query [https://developers.google.com/drive/api/guides/ref-search-terms] | |
Yields: | |
Item: search items | |
""" | |
nextPageToken = None | |
while True: | |
# pylint: disable=maybe-no-member | |
results = self.service.files().list(q=search_query, | |
fields="nextPageToken, files(id, name, mimeType, size)", | |
pageToken=nextPageToken or "").execute() | |
items = results.get('files', []) | |
for item in items: | |
yield Item(item['name'], item['id'], item['mimeType']=="application/vnd.google-apps.folder", id, int(item.get('size', '0'))) | |
if "nextPageToken" in results: | |
nextPageToken = results['nextPageToken'] | |
else: | |
break | |
def get_children(self, id): | |
for e in self.search(f"'{id}' in parents and trashed=false"): | |
yield e | |
def upload(self, content: Union[str, bytes, BytesIO], name: str, parent_id: str) -> Item: | |
"""Uploads file and returns Item Object | |
Args: | |
content (Union[str, bytes]): either a URL, path or a bytes object | |
name (str): name of the file to be stored as | |
parent_id (str): parent id (aka directory id) | |
Returns: | |
Item: the uploaded file | |
""" | |
file_metadata = {"name": name, 'parents': [parent_id]} | |
if isinstance(content, str): | |
if isURL(content): | |
res = r.get(content) | |
# pylint: disable=maybe-no-member | |
file = self.service.files().create( | |
body=file_metadata, | |
media_body=MediaIoBaseUpload(BytesIO(res.content), | |
res.headers['content-type'], | |
resumable=True | |
), | |
fields='id' | |
).execute() | |
else: | |
# pylint: disable=maybe-no-member | |
file = self.service.files().create( | |
body=file_metadata, | |
media_body=MediaFileUpload(content, resumable=True), | |
fields='id' | |
).execute() | |
else: | |
mime = self.__get_mime(name, "application/octet-stream") | |
# pylint: disable=maybe-no-member | |
file = self.service.files().create( | |
body=file_metadata, | |
media_body=MediaIoBaseUpload(content if isinstance(content, BytesIO) else BytesIO(content), | |
mime, | |
resumable=True), | |
fields='id, size' | |
).execute() | |
return Item(name, file.get('id'), False, parent_id, int(file.get('size', '0'))) | |
def create_path(self, path: str) -> Union[Item, None]: | |
"""creates a path and returns an item object if succesfull otherwise None | |
Args: | |
path (str): the path to create e.g. "foo/bar/" | |
Returns: | |
Union[Item, None]: | |
""" | |
item = None | |
last_parent_id = "root" | |
for (i, name) in enumerate([x for x in path.split('/') if x]): | |
item = None | |
for child in self.search(f"'{last_parent_id}' in parents and name='{name}' and trashed=false"): | |
last_parent_id = child.id | |
item = child | |
break | |
if item is None: | |
file_metadata = { | |
'name': name, | |
'parents': [last_parent_id], | |
'mimeType': 'application/vnd.google-apps.folder' | |
} | |
# pylint: disable=maybe-no-member | |
file = self.service.files().create(body=file_metadata, fields='id' | |
).execute() | |
item = Item(name, file['id'], True, last_parent_id, 0) | |
last_parent_id = item.id | |
if i == len([x for x in path.split('/') if x]) - 1: | |
return item | |
def get(self, item_id, out_type: Union[str, bytes, BytesIO, dict]) -> Union[str, bytes, BytesIO, dict]: | |
"""Retrieves an item from drive and returns as string, bytes, bytesio or json | |
Args: | |
item_id (str): id of item | |
out_type (Union[str, bytes, BytesIO, dict]): return type | |
Returns: | |
Union[str, bytes, BytesIO, dict]: downlaoded content | |
""" | |
try: | |
# pylint: disable=maybe-no-member | |
request = self.service.files().get_media(fileId=item_id) | |
file = BytesIO() | |
downloader = MediaIoBaseDownload(file, request) | |
done = False | |
while done is False: | |
status, done = downloader.next_chunk() | |
except HttpError as error: | |
print(F'An error occurred: {error}') | |
file = None | |
return None | |
if out_type == str: | |
return file.getvalue().decode() | |
elif out_type == BytesIO: | |
return file | |
elif out_type == bytes: | |
return file.getvalue() | |
elif out_type == dict: | |
return loads(file.getvalue().decode()) | |
def get_item_by_id(self, item_id) -> Item: | |
"""Retrieve Item object by querying its ID | |
Returns: | |
Item: The Item representing the id | |
""" | |
try: | |
res = self.service.files().get(fileId=item_id, fields='parents,name,mimeType').execute() | |
return Item(res.get('name'), item_id, res.get('mimeType') == "application/vnd.google-apps.folder", res.get('parents', [item_id])[0], int(res.get('size', '0'))) | |
except HttpError as error: | |
print(F'An error occurred: {error}') | |
return None | |
def get_item_by_path(self, path) -> Union[Item, None]: | |
last_parent_id = "root" | |
for (i, name) in enumerate([x for x in path.split('/') if x]): | |
item = None | |
for child in self.search(f"'{last_parent_id}' in parents and name='{name}' and trashed=false"): | |
if child.name == name: | |
last_parent_id = child.id | |
item = child | |
break | |
if item is None: | |
return None | |
if i == len([x for x in path.split('/') if x]) - 1: | |
return item | |
def delete(self, file_id) -> bool: | |
"""Deletes an item by a given id | |
""" | |
try: | |
self.service.files().delete(fileId=file_id, fields='parents,name,mimeType').execute() | |
return True | |
except HttpError as error: | |
print(F'An error occurred: {error}') | |
return False | |
def rename(self, file_id: str, new_name: str): | |
"""Renames an item by a given id to a given name | |
""" | |
try: | |
name = new_name | |
_i = self.get_item_by_id(file_id) | |
body = {'name':name} | |
if not _i.is_directory: | |
if self.__get_mime(name) is None: | |
name += "." + _i.name.split('.')[-1] | |
else: | |
body['mimeType'] = self.__get_mime(name, "application/octet-stream") | |
self.service.files().update(fileId=file_id, body=body).execute() | |
return True | |
except HttpError as error: | |
print(F'An error occurred: {error}') | |
return False | |
def move(self, item: Item, new_parent_id: str) -> Union[Item,None]: | |
try: | |
res = self.service.files().update(fileId=item.id, addParents=new_parent_id,removeParents=item.parent_id).execute() | |
item.parent_id = new_parent_id | |
return item | |
except HttpError as error: | |
print(F'An error occurred: {error}') | |
return None | |
def copy(self, item_id, new_name, new_parent_id) -> Union[Item,None]: | |
try: | |
file_metadata = {'parents': [new_parent_id], "name": new_name} | |
res = self.service.files().copy(fileId=item_id, body=file_metadata).execute() | |
return Item(res.get('name'), item_id, res.get('mimeType') == "application/vnd.google-apps.folder", new_parent_id, int(res.get('size', '0'))) | |
except HttpError as error: | |
print(F'An error occurred: {error}') | |
return None | |
class AsyncDrivebase(Drivebase): | |
def __init__(self) -> None: | |
super().__init__() | |
async def init(self, credentials: dict = None): | |
return await asyncio.to_thread(super().init, *(credentials,)) | |
async def create_path(self, path: str) -> Union[Item, None]: | |
return await asyncio.to_thread( super().create_path, *(path,)) | |
async def async_search(self, search_query): | |
"""creates a path and returns an item object if succesfull otherwise None | |
Args: | |
path (str): the path to create e.g. "foo/bar/" | |
Returns: | |
Union[Item, None]: | |
""" | |
nextPageToken = None | |
while True: | |
# pylint: disable=maybe-no-member | |
results = await asyncio.to_thread(self.service.files().list(q=search_query, | |
fields="nextPageToken, files(id, name, mimeType, size)", | |
pageToken=nextPageToken or "").execute) | |
items = results.get('files', []) | |
for item in items: | |
yield Item(item['name'], item['id'], item['mimeType']=="application/vnd.google-apps.folder", id, int(item.get('size', '0'))) | |
if "nextPageToken" in results: | |
nextPageToken = results['nextPageToken'] | |
else: | |
break | |
async def get_children(self, id): | |
async for e in self.async_search(f"'{id}' in parents and trashed=false"): | |
yield e | |
async def upload(self, content: Union[str, bytes, BytesIO], name: str, parent_id: str) -> Item: | |
return await asyncio.to_thread( super().upload, *((content, name, parent_id,))) | |
async def get(self, item_id, out_type: Union[str, bytes, BytesIO, dict]) -> Union[str, bytes, BytesIO, dict]: | |
return await asyncio.to_thread(super().get, *(item_id, out_type,)) | |
async def get_item_by_id(self, item_id) -> Item: | |
return await asyncio.to_thread(super().get_item_by_id, *(item_id,)) | |
async def get_item_by_path(self, path) -> Union[Item, None]: | |
return await asyncio.to_thread( super().get_item_by_path, *(path,)) | |
async def delete(self, file_id) -> bool: | |
return await asyncio.to_thread(super().delete, *(file_id,)) | |
async def rename(self, file_id: str, new_name: str): | |
return await asyncio.to_thread(super().rename, *(file_id, new_name,)) | |
async def move(self, item: Item, new_parent_id: str) -> Union[Item, None]: | |
return await asyncio.to_thread(super().move, *(item, new_parent_id,)) | |
async def copy(self, item_id, new_name, new_parent_id) -> Union[Item, None]: | |
return await asyncio.to_thread(super().copy, *(item_id, new_name, new_parent_id,)) | |
def main(): | |
db = Drivebase() | |
db.init() | |
# create a folder | |
parent = db.create_path('tests/1') | |
# create a text file with hello world | |
item = db.upload(b'hello world', "hello.txt", parent.id) | |
# delete it | |
input(f"Click Enter to delete the newly created file\n") | |
db.delete(item.id) | |
#print all children of root | |
input("Click enter to print children of root") | |
for child in db.get_children('root'): | |
print(f"{child.name}") | |
# db.get_item_by_id('ID') | |
# db.get_item_by_path('PATH/TO/ITEM') | |
# for res in db.search("name='THENAME.EXT'"): | |
# ... | |
# db.rename("ID", "NEW NAME") | |
if __name__ == '__main__': | |
main() | |
# test() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment