Skip to content

Instantly share code, notes, and snippets.

@salut1618
Last active September 28, 2022 02:40
Show Gist options
  • Save salut1618/a786c7921f930ac5e330f80ade280de6 to your computer and use it in GitHub Desktop.
Save salut1618/a786c7921f930ac5e330f80ade280de6 to your computer and use it in GitHub Desktop.
drive api python wrapper, Oauth2 client credentials required as credentials.json file
# google-api-python-client
# google-auth-httplib2
# google-auth-oauthlib
# requests
import asyncio
from dataclasses import dataclass
from pathlib import Path
from typing import Union
from googleapiclient.errors import HttpError
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload, MediaIoBaseUpload, MediaIoBaseDownload
from io import BytesIO
from json import loads, dumps
import os, mimetypes
import requests as r
mimetypes.init()
SCOPES = ["https://www.googleapis.com/auth/drive",
"https://www.googleapis.com/auth/drive.appdata",
"https://www.googleapis.com/auth/drive.file",
"https://www.googleapis.com/auth/drive.metadata"]
@dataclass
class Item:
name: str
id: str
is_directory: bool
parent_id: str
size: int
def isURL(url):
try:
r.head(url)
return True
except:
return False
class Drivebase:
def __init__(self) -> None:
self.creds = None
self.service = None
def init(self, credentials: dict = None):
if credentials:
creds = Credentials.from_authorized_user_info(credentials)
else:
creds = None
if os.path.exists('token.json'):
creds = Credentials.from_authorized_user_file('token.json', SCOPES)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
'credentials.json', SCOPES,
redirect_uri='urn:ietf:wg:oauth:2.0:oob')
flow.credentials
auth_url, _ = flow.authorization_url(prompt='consent')
print('Please go to this URL: {}'.format(auth_url))
# The user will get an authorization code. This code is used to get the
# access token.
code = input('Enter the authorization code: ')
flow.fetch_token(code=code)
creds = flow.credentials
# Save the credentials for the next run
with open('token.json', 'w') as token:
token.write(creds.to_json())
self.creds = creds
self.service = build('drive', 'v3', credentials=self.creds)
def __get_mime(self, name, default=None):
mime, e = mimetypes.guess_type(name)
if mime is None: mime = default
return mime
def search(self, search_query):
"""uses q paramter to search items with google api
Args:
search_query (str): search query [https://developers.google.com/drive/api/guides/ref-search-terms]
Yields:
Item: search items
"""
nextPageToken = None
while True:
# pylint: disable=maybe-no-member
results = self.service.files().list(q=search_query,
fields="nextPageToken, files(id, name, mimeType, size)",
pageToken=nextPageToken or "").execute()
items = results.get('files', [])
for item in items:
yield Item(item['name'], item['id'], item['mimeType']=="application/vnd.google-apps.folder", id, int(item.get('size', '0')))
if "nextPageToken" in results:
nextPageToken = results['nextPageToken']
else:
break
def get_children(self, id):
for e in self.search(f"'{id}' in parents and trashed=false"):
yield e
def upload(self, content: Union[str, bytes, BytesIO], name: str, parent_id: str) -> Item:
"""Uploads file and returns Item Object
Args:
content (Union[str, bytes]): either a URL, path or a bytes object
name (str): name of the file to be stored as
parent_id (str): parent id (aka directory id)
Returns:
Item: the uploaded file
"""
file_metadata = {"name": name, 'parents': [parent_id]}
if isinstance(content, str):
if isURL(content):
res = r.get(content)
# pylint: disable=maybe-no-member
file = self.service.files().create(
body=file_metadata,
media_body=MediaIoBaseUpload(BytesIO(res.content),
res.headers['content-type'],
resumable=True
),
fields='id'
).execute()
else:
# pylint: disable=maybe-no-member
file = self.service.files().create(
body=file_metadata,
media_body=MediaFileUpload(content, resumable=True),
fields='id'
).execute()
else:
mime = self.__get_mime(name, "application/octet-stream")
# pylint: disable=maybe-no-member
file = self.service.files().create(
body=file_metadata,
media_body=MediaIoBaseUpload(content if isinstance(content, BytesIO) else BytesIO(content),
mime,
resumable=True),
fields='id, size'
).execute()
return Item(name, file.get('id'), False, parent_id, int(file.get('size', '0')))
def create_path(self, path: str) -> Union[Item, None]:
"""creates a path and returns an item object if succesfull otherwise None
Args:
path (str): the path to create e.g. "foo/bar/"
Returns:
Union[Item, None]:
"""
item = None
last_parent_id = "root"
for (i, name) in enumerate([x for x in path.split('/') if x]):
item = None
for child in self.search(f"'{last_parent_id}' in parents and name='{name}' and trashed=false"):
last_parent_id = child.id
item = child
break
if item is None:
file_metadata = {
'name': name,
'parents': [last_parent_id],
'mimeType': 'application/vnd.google-apps.folder'
}
# pylint: disable=maybe-no-member
file = self.service.files().create(body=file_metadata, fields='id'
).execute()
item = Item(name, file['id'], True, last_parent_id, 0)
last_parent_id = item.id
if i == len([x for x in path.split('/') if x]) - 1:
return item
def get(self, item_id, out_type: Union[str, bytes, BytesIO, dict]) -> Union[str, bytes, BytesIO, dict]:
"""Retrieves an item from drive and returns as string, bytes, bytesio or json
Args:
item_id (str): id of item
out_type (Union[str, bytes, BytesIO, dict]): return type
Returns:
Union[str, bytes, BytesIO, dict]: downlaoded content
"""
try:
# pylint: disable=maybe-no-member
request = self.service.files().get_media(fileId=item_id)
file = BytesIO()
downloader = MediaIoBaseDownload(file, request)
done = False
while done is False:
status, done = downloader.next_chunk()
except HttpError as error:
print(F'An error occurred: {error}')
file = None
return None
if out_type == str:
return file.getvalue().decode()
elif out_type == BytesIO:
return file
elif out_type == bytes:
return file.getvalue()
elif out_type == dict:
return loads(file.getvalue().decode())
def get_item_by_id(self, item_id) -> Item:
"""Retrieve Item object by querying its ID
Returns:
Item: The Item representing the id
"""
try:
res = self.service.files().get(fileId=item_id, fields='parents,name,mimeType').execute()
return Item(res.get('name'), item_id, res.get('mimeType') == "application/vnd.google-apps.folder", res.get('parents', [item_id])[0], int(res.get('size', '0')))
except HttpError as error:
print(F'An error occurred: {error}')
return None
def get_item_by_path(self, path) -> Union[Item, None]:
last_parent_id = "root"
for (i, name) in enumerate([x for x in path.split('/') if x]):
item = None
for child in self.search(f"'{last_parent_id}' in parents and name='{name}' and trashed=false"):
if child.name == name:
last_parent_id = child.id
item = child
break
if item is None:
return None
if i == len([x for x in path.split('/') if x]) - 1:
return item
def delete(self, file_id) -> bool:
"""Deletes an item by a given id
"""
try:
self.service.files().delete(fileId=file_id, fields='parents,name,mimeType').execute()
return True
except HttpError as error:
print(F'An error occurred: {error}')
return False
def rename(self, file_id: str, new_name: str):
"""Renames an item by a given id to a given name
"""
try:
name = new_name
_i = self.get_item_by_id(file_id)
body = {'name':name}
if not _i.is_directory:
if self.__get_mime(name) is None:
name += "." + _i.name.split('.')[-1]
else:
body['mimeType'] = self.__get_mime(name, "application/octet-stream")
self.service.files().update(fileId=file_id, body=body).execute()
return True
except HttpError as error:
print(F'An error occurred: {error}')
return False
def move(self, item: Item, new_parent_id: str) -> Union[Item,None]:
try:
res = self.service.files().update(fileId=item.id, addParents=new_parent_id,removeParents=item.parent_id).execute()
item.parent_id = new_parent_id
return item
except HttpError as error:
print(F'An error occurred: {error}')
return None
def copy(self, item_id, new_name, new_parent_id) -> Union[Item,None]:
try:
file_metadata = {'parents': [new_parent_id], "name": new_name}
res = self.service.files().copy(fileId=item_id, body=file_metadata).execute()
return Item(res.get('name'), item_id, res.get('mimeType') == "application/vnd.google-apps.folder", new_parent_id, int(res.get('size', '0')))
except HttpError as error:
print(F'An error occurred: {error}')
return None
class AsyncDrivebase(Drivebase):
def __init__(self) -> None:
super().__init__()
async def init(self, credentials: dict = None):
return await asyncio.to_thread(super().init, *(credentials,))
async def create_path(self, path: str) -> Union[Item, None]:
return await asyncio.to_thread( super().create_path, *(path,))
async def async_search(self, search_query):
"""creates a path and returns an item object if succesfull otherwise None
Args:
path (str): the path to create e.g. "foo/bar/"
Returns:
Union[Item, None]:
"""
nextPageToken = None
while True:
# pylint: disable=maybe-no-member
results = await asyncio.to_thread(self.service.files().list(q=search_query,
fields="nextPageToken, files(id, name, mimeType, size)",
pageToken=nextPageToken or "").execute)
items = results.get('files', [])
for item in items:
yield Item(item['name'], item['id'], item['mimeType']=="application/vnd.google-apps.folder", id, int(item.get('size', '0')))
if "nextPageToken" in results:
nextPageToken = results['nextPageToken']
else:
break
async def get_children(self, id):
async for e in self.async_search(f"'{id}' in parents and trashed=false"):
yield e
async def upload(self, content: Union[str, bytes, BytesIO], name: str, parent_id: str) -> Item:
return await asyncio.to_thread( super().upload, *((content, name, parent_id,)))
async def get(self, item_id, out_type: Union[str, bytes, BytesIO, dict]) -> Union[str, bytes, BytesIO, dict]:
return await asyncio.to_thread(super().get, *(item_id, out_type,))
async def get_item_by_id(self, item_id) -> Item:
return await asyncio.to_thread(super().get_item_by_id, *(item_id,))
async def get_item_by_path(self, path) -> Union[Item, None]:
return await asyncio.to_thread( super().get_item_by_path, *(path,))
async def delete(self, file_id) -> bool:
return await asyncio.to_thread(super().delete, *(file_id,))
async def rename(self, file_id: str, new_name: str):
return await asyncio.to_thread(super().rename, *(file_id, new_name,))
async def move(self, item: Item, new_parent_id: str) -> Union[Item, None]:
return await asyncio.to_thread(super().move, *(item, new_parent_id,))
async def copy(self, item_id, new_name, new_parent_id) -> Union[Item, None]:
return await asyncio.to_thread(super().copy, *(item_id, new_name, new_parent_id,))
def main():
db = Drivebase()
db.init()
# create a folder
parent = db.create_path('tests/1')
# create a text file with hello world
item = db.upload(b'hello world', "hello.txt", parent.id)
# delete it
input(f"Click Enter to delete the newly created file\n")
db.delete(item.id)
#print all children of root
input("Click enter to print children of root")
for child in db.get_children('root'):
print(f"{child.name}")
# db.get_item_by_id('ID')
# db.get_item_by_path('PATH/TO/ITEM')
# for res in db.search("name='THENAME.EXT'"):
# ...
# db.rename("ID", "NEW NAME")
if __name__ == '__main__':
main()
# test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment