|
import click |
|
import json |
|
import pickle |
|
import os |
|
import pandas |
|
from datetime import datetime |
|
from googleapiclient.errors import HttpError |
|
from google_auth_oauthlib.flow import InstalledAppFlow |
|
from googleapiclient.discovery import build |
|
from google.auth.transport.requests import Request |
|
import logging |
|
|
|
class YouTubeAPI:
    """Wrapper around the YouTube Data API client used by the export commands.

    The authorised client is stored on the class (:attr:`SERVICE`), so all
    instances share the same service object.
    """

    # Shared API client; populated by createService().
    SERVICE = None
    API_NAME = 'youtube'
    API_VERSION = 'v3'
    SCOPES = ['https://www.googleapis.com/auth/youtube']
    # OAuth client secrets file handed to InstalledAppFlow.
    CLIENT_SECRET = 'client-secret.json'
    # Value sent as maxResults and used as the batch size for videos().list.
    _PAGE_SIZE = 50

    def __init__(self, max_attempts=3):
        """
        Creates the shared service, retrying a bounded number of times.

        Args:
            max_attempts (int): How many times :meth:`createService` is tried before giving up.

        Raises:
            RuntimeError: No attempt managed to create the service.
        """
        for _ in range(max_attempts):
            if YouTubeAPI.createService():
                return
        raise RuntimeError(
            f'Unable to create {self.API_NAME} API service after {max_attempts} attempt(s)'
        )

    @staticmethod
    def _errorMessage(error):
        """
        Returns the message carried in an API error body, or ``str(error)`` when the body cannot be parsed.
        """
        try:
            return json.loads(error.content)['error']['message']
        except (AttributeError, KeyError, TypeError, ValueError):
            return str(error)

    @classmethod
    def createService(cls, prefix=''):
        """
        Creates a service instance to be referenced by :class:`YouTubeAPI`. Function automatically runs on object creation, rerunning if service creation fails.

        Credentials are cached as a pickle inside the ``token files`` folder of the
        current working directory; the cached file is deleted when creation fails.

        Args:
            prefix (str): Text appended to the token file name, before the extension.

        Returns:
            True: The service was built and stored in :attr:`SERVICE`.
            False: Service creation failed; the error has been logged.
        """
        cred = None
        token_dir = os.path.join(os.getcwd(), 'token files')
        token_path = os.path.join(
            token_dir, f'token_{cls.API_NAME}_{cls.API_VERSION}{prefix}.pickle'
        )

        # Make sure the token folder exists before reading or writing the cache.
        os.makedirs(token_dir, exist_ok=True)

        try:
            if os.path.exists(token_path):
                # NOTE: unpickling can execute arbitrary code; only load token
                # files that this program wrote itself.
                with open(token_path, 'rb') as token:
                    cred = pickle.load(token)

            if not cred or not cred.valid:
                if cred and cred.expired and cred.refresh_token:
                    cred.refresh(Request())
                else:
                    flow = InstalledAppFlow.from_client_secrets_file(cls.CLIENT_SECRET, cls.SCOPES)
                    cred = flow.run_local_server()

                with open(token_path, 'wb') as token:
                    pickle.dump(cred, token)

            cls.SERVICE = build(cls.API_NAME, cls.API_VERSION, credentials=cred)
            logging.info(f'{cls.API_NAME} {cls.API_VERSION} service created successfully')
            return True
        except Exception as e:
            logging.error(f"Error: {e}")
            logging.warning(f'Failed to create {cls.API_NAME} API service instance. Retrying...')
            # The token may never have been written (e.g. the OAuth flow itself
            # failed), so only delete it when it is actually there.
            if os.path.exists(token_path):
                os.remove(token_path)
            return False

    @classmethod
    def listPlaylistItems(cls, playlistId):
        """
        Creates a modifiable list of playlist objects from a requested playlist ID.

        Args:
            playlistId (str): Valid playlist ID linking to user owned playlist.

        Returns:
            export (list): Constructed list of playlist objects. If playlistId is invalid
            or any request fails, list will be empty.
        """
        export = []
        request_args = {
            'part': 'contentDetails',
            'playlistId': playlistId,
            'maxResults': cls._PAGE_SIZE,
        }

        try:
            # One loop serves the first page and every following page, so a
            # playlist that fits on a single page is still returned.
            while True:
                response = cls.SERVICE.playlistItems().list(**request_args).execute()
                export.extend(response.get('items', []))

                next_page_token = response.get('nextPageToken')
                logging.info(f'Listing token {next_page_token}')
                if not next_page_token:
                    break
                request_args['pageToken'] = next_page_token
        except HttpError as e:
            logging.error(f"HTTP Error: {cls._errorMessage(e)}")
            return []
        except Exception as e:
            # Non-HTTP failures carry no JSON body, so log the exception itself.
            logging.error(f"Error: {e}")
            return []

        logging.info("Playlist object created!")
        return export

    def playlistVideoExtract(self, playlistItems):
        """
        Looks up the video behind each playlist object within a list

        Args:
            playlistItems (list): List of playlist objects created from :meth:`listPlaylistItems`.

        Returns:
            export (list): Items returned by ``videos().list`` (requested with the
            snippet and contentDetails parts). Empty when playlistItems is empty or None.
        """
        if not playlistItems:
            return []

        video_ids = [item['contentDetails']['videoId'] for item in playlistItems]
        export = []

        # Request the videos in batches of _PAGE_SIZE ids per call.
        for start in range(0, len(video_ids), self._PAGE_SIZE):
            batch = video_ids[start:start + self._PAGE_SIZE]
            response = self.service().videos().list(
                id=','.join(batch),
                part='snippet, contentDetails',
                maxResults=self._PAGE_SIZE,
            ).execute()
            export.extend(response['items'])
        return export

    def playlistClear(self, playlistItems, queue):
        """
        Sends API calls to delete playlist item objects on YouTube's end.

        Args:
            playlistItems (list): List of playlist objects created from :meth:`listPlaylistItems`.
            queue (int): Amount of playlist item objects to process. ``None`` processes all of them.

        Returns:
            True: Method execution was successful.
            False: An HTTP error occurred during clearing. Deletions sent before the error are not undone.
        """
        try:
            for item in (playlistItems or [])[:queue]:
                self.service().playlistItems().delete(id=item['id']).execute()
            return True
        except HttpError as e:
            logging.error(f"HTTP Error: {self._errorMessage(e)}")
            return False

    @classmethod
    def listPlaylists(cls, channelId):
        """
        Placeholder: not implemented yet, currently does nothing and returns None.
        """
        pass

    @classmethod
    def service(cls):
        """
        Pass-through for the class service object
        """
        return cls.SERVICE
|
|
|
def filename(playlistId, fileType):
    """
    Builds the export file name ``YTPLExport_<playlistId>_<YYYYMMDD-HHMM>.<fileType>``.

    Args:
        playlistId (str): Playlist ID embedded in the name.
        fileType (str): File extension, without the leading dot.

    Returns:
        str: File name stamped with the current local date and time.
    """
    stamp = format(datetime.now(), "%Y%m%d-%H%M")
    return ''.join(('YTPLExport_', playlistId, '_', stamp, '.', fileType))
|
|
|
|
|
|
|
@click.group()
@click.version_option('0.2.2')
@click.option('--verbose', is_flag=True, help="Sets logging level to INFO for debugging")
def main(verbose):
    """
    === YouTube PlayList Export (YTPLE) ===
    """
    # Root command group; the docstring above is what click shows as help text.
    # Logging is only configured when --verbose is given.
    if not verbose:
        return
    logging.basicConfig(level=logging.INFO)
|
|
|
@main.command()
@click.option('--playlistId', default='LL', help='Specifies playlist ID')
@click.option('--length', type=int, default=None, help='Sets the max playlist item count')
@click.option('--clear', is_flag=True, help='Clears the playlist after export')
def spreadsheetExport(playlistid, length, clear):
    """
    Generates ODS spreadsheet from specified playlist ID
    """
    # The docstring above doubles as the command's help text, so extra notes live here:
    #   playlistid: playlist to export.
    #   length:     maximum number of videos written (None exports all).
    #   clear:      when set, playlist items are deleted after the export.
    apiCall = YouTubeAPI()
    print("Gathering playlist information...")
    # Normalise a falsy listing to an empty list so the calls below always get a list.
    playlistItems = apiCall.listPlaylistItems(playlistid) or []
    videos_info = apiCall.playlistVideoExtract(playlistItems)

    if videos_info:
        print(f'--Exporting {playlistid} as ODS spreadsheet--')
        # The last column holds the link, so it gets its own header instead of a second 'Video ID'.
        columns = ['Video ID', 'Video Title', 'Published', 'Video URL']
        rows = [
            [
                video['id'],
                video['snippet']['title'],
                # Drops the final character of the timestamp (presumably the trailing 'Z' - confirm).
                video['snippet']['publishedAt'][:-1],
                f'https://youtu.be/{video["id"]}',
            ]
            for video in videos_info[:length]
        ]

        df = pandas.DataFrame(data=rows, columns=columns)
        # The context manager saves and closes the file; ExcelWriter.save() no longer exists in pandas 2.x.
        with pandas.ExcelWriter(filename(playlistid, 'ods')) as excelWriter:
            df.to_excel(excelWriter, sheet_name='Liked Videos', index=False)
        print('Spreadsheet Export Created!')
    else:
        print("No videos detected in playlist!")

    if clear:
        print("Clearing playlist...")
        if apiCall.playlistClear(playlistItems, length):
            print("Done!")
        else:
            print("Unable to clear playlist! May have to try again tomorrow!")
|
|
|
@main.command()
@click.option('--playlistId', default='LL', help='Specifies playlist ID')
@click.option('--length', type=int, default=None, help='Sets the max playlist item count')
@click.option('--clear', is_flag=True, help='Clears the playlist after export')
def bookmarkExport(playlistid, length, clear):
    """
    Generates HTML bookmark from specified playlist ID
    """
    # The docstring above doubles as the command's help text, so extra notes live here:
    #   playlistid: playlist to export.
    #   length:     maximum number of videos written (None exports all).
    #   clear:      when set, playlist items are deleted after the export.
    # Imported locally so this command does not depend on a new file-level import.
    from html import escape

    apiCall = YouTubeAPI()
    print("Gathering playlist information...")
    # Normalise a falsy listing to an empty list so the calls below always get a list.
    playlistItems = apiCall.listPlaylistItems(playlistid) or []
    videos_info = apiCall.playlistVideoExtract(playlistItems)

    if videos_info:
        print(f'--Exporting {playlistid} as HTML bookmark--')
        header = '<!DOCTYPE NETSCAPE-Bookmark-file-1><META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=UTF-8"><TITLE>Bookmarks</TITLE><H1>Bookmarks Menu</H1><DL><p><DT><H3>YouTube Export</H3><DL><p>'
        footer = '</DL><p></DL>'

        entries = []
        for video in videos_info[:length]:
            snippet = video['snippet']
            # Titles are free text; escape them so quotes, '<' or '&' cannot break the markup.
            channel = escape(snippet['channelTitle'])
            title = escape(snippet['title'])
            entries.append(
                f'<DT><A HREF="https://www.youtube.com/watch?v={video["id"]}" TAGS="{channel}">{title}</A>'
            )

        # The with-block closes the file even if the write fails.
        with open(filename(playlistid, 'html'), "w", encoding='utf-8') as htmlWriter:
            htmlWriter.write(header + ''.join(entries) + footer)
        print("Bookmark Export Completed!")
    else:
        print("No videos detected in playlist!")

    if clear:
        print("Clearing playlist...")
        if apiCall.playlistClear(playlistItems, length):
            print("Done!")
        else:
            print("Unable to clear playlist! May have to try again tomorrow!")
|
|
|
|
|
|
|
# Run the click command group only when this file is executed as a script.
if __name__ == "__main__":
    main()