Last active
September 19, 2018 18:31
-
-
Save ghassani/778eb68f7b99820426d68d5616e9cbf4 to your computer and use it in GitHub Desktop.
YouTube channel video downloader and cataloger, plus a website generator for the catalogued videos.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sqlite3 | |
import argparse | |
from slugify import slugify | |
import os | |
from jinja2 import Environment, FileSystemLoader | |
#import youtube_dl | |
def init_database(path):
    """Open the SQLite database at ``path`` and return the connection.

    No schema is created here; the query helpers in this script assume the
    ``channel`` and ``video`` tables already exist in the database.
    """
    # The original also created a cursor that was never used; it is dropped.
    return sqlite3.connect(path)
def db_find_videos(dbconn, channelId):
    """Return all ``video`` rows whose ``channel_id`` equals ``channelId``.

    Each row is turned into a dict by column position, so the result keys
    are id, channel_id, name, video_id, published_at, is_favorite and
    output_file.  An empty list is returned when nothing matches.
    """
    columns = ("id", "channel_id", "name", "video_id", "published_at", "is_favorite", "output_file")
    cur = dbconn.cursor()
    cur.execute('''SELECT * FROM video WHERE channel_id = ?''', (channelId, ))
    # Index each record explicitly so a short row still raises IndexError.
    return [
        {key: record[position] for position, key in enumerate(columns)}
        for record in cur.fetchall()
    ]
def db_find_channel(dbconn, channelId):
    """Fetch the ``channel`` row whose ``channel_id`` equals ``channelId``.

    Returns a dict with the keys id, channel_id and name (taken from the
    first three columns of the row), or None when no row matches.
    """
    cur = dbconn.cursor()
    cur.execute('''SELECT * FROM channel WHERE channel_id = ?''', (channelId,))
    record = cur.fetchone()
    if record is None:
        return None
    return {"id": record[0], "channel_id": record[1], "name": record[2]}
def main():
    """Render a website for one catalogued YouTube channel.

    Looks the channel up in the SQLite database, loads every video stored
    for it, and renders the ``home.html`` and ``video.html`` templates from
    the template directory.  The home page is written to
    ``<output>/index.html`` and one page per video is written to
    ``<output>/video/<slug>.html``.

    Returns:
        0 on success, 1 when the channel ID is not present in the database.
    """
    parser = argparse.ArgumentParser(description='YouTube Channel Website Generator')
    parser.add_argument("channel_id", help="The YouTube channel ID")
    parser.add_argument("-t", "--templates", help="Path to template files", default='templates')
    parser.add_argument("-o", "--output", help="Path to write the generated website to", default='website-out')
    parser.add_argument("-d", "--database", help="Path to database file", default='database.sqlite3')
    parser.add_argument("-p", "--prependuri", help="Prepend this to the URI", default='')
    parser.add_argument("-c", "--prependurichannel", help="Prepend the channel name to the URI", action='store_true', default=False)
    args = parser.parse_args()

    # All rows are read into memory up front, so the connection can be
    # released before rendering, including on the "not found" early return.
    db = init_database(args.database)
    try:
        db_channel = db_find_channel(db, args.channel_id)
        if db_channel is None:
            print('YouTube Channel ID %s Not Found' % args.channel_id)
            return 1
        videos = db_find_videos(db, db_channel['id'])
    finally:
        db.close()

    env = Environment(
        loader=FileSystemLoader(args.templates, followlinks=True)
    )

    # An explicit --prependuri wins over --prependurichannel.
    if args.prependuri:
        slug_prefix = args.prependuri
    elif args.prependurichannel:
        slug_prefix = db_channel['name']
    else:
        slug_prefix = None

    # Slugs are attached to every video before any rendering because the
    # full video list (with slugs) is passed to both templates.
    for video in videos:
        if slug_prefix is None:
            slug_source = '%s-%s' % (video['name'], video['video_id'])
        else:
            slug_source = '%s-%s-%s' % (slug_prefix, video['name'], video['video_id'])
        video['url_slug'] = slugify(slug_source)

    homepage_template = env.get_template('home.html')
    video_template = env.get_template('video.html')

    # Create <output>/video (and <output> itself) so a first run does not
    # fail when opening the files below.
    video_dir = os.path.join(args.output, 'video')
    if not os.path.isdir(video_dir):
        os.makedirs(video_dir)

    # Pages are encoded to UTF-8 bytes, so the files are opened in binary
    # mode; this works on both Python 2 and Python 3.
    homepage_result = homepage_template.render(videos=videos).encode('utf-8').strip()
    with open(os.path.join(args.output, 'index.html'), 'wb') as f:
        f.write(homepage_result)

    for video in videos:
        video_result = video_template.render(videos=videos, video=video).encode('utf-8').strip()
        # Reuse the slug computed above so file names always match the
        # url_slug values exposed to the templates.
        video_path = os.path.join(video_dir, '%s.html' % video['url_slug'])
        with open(video_path, 'wb') as f:
            f.write(video_result)

    return 0
if __name__ == '__main__':
    # Propagate main()'s return value (0 or 1) as the process exit status;
    # calling main() alone would always exit with status 0.
    raise SystemExit(main())
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sqlite3 | |
from apiclient.discovery import build | |
import argparse | |
from subprocess import call | |
import os | |
#import youtube_dl | |
# Developer key passed to the YouTube API client. It is empty in the source;
# presumably a valid key has to be filled in before running -- TODO confirm.
api_key = ''


def init_youtube():
    """Build and return a YouTube v3 API client using ``api_key``."""
    youtube_client = build('youtube', 'v3', developerKey=api_key)
    return youtube_client
def init_database(path):
    """Open the SQLite database at ``path`` and make sure the schema exists.

    Creates the ``channel`` and ``video`` tables when they are missing and
    returns the open connection.
    """
    schema = (
        'CREATE TABLE IF NOT EXISTS channel (id INTEGER PRIMARY KEY AUTOINCREMENT, channel_id TEXT, name VARCHAR(70))',
        'CREATE TABLE IF NOT EXISTS video (id INTEGER PRIMARY KEY AUTOINCREMENT, channel_id INTEGER, name VARCHAR(70), video_id TEXT, published_at TEXT, is_favorite TINYINT(1), output_file TEXT)',
    )
    conn = sqlite3.connect(path)
    cur = conn.cursor()
    for statement in schema:
        cur.execute(statement)
    return conn
def youtube_lookup_channel(youtube, channelId):
    """Search YouTube for the channel with the given ID.

    Issues a single ``search().list`` request restricted to channels and
    returns the first result item whose ``id.channelId`` equals
    ``channelId``, or None when the response contains no such item.
    """
    request = youtube.search().list(
        part='id,snippet',
        type='channel',
        channelId=channelId,
        maxResults=1,
        order='date'
    )
    items = request.execute().get('items', [])
    matches = (
        item for item in items
        if 'channelId' in item['id'] and item['id']['channelId'] == channelId
    )
    return next(matches, None)
def youtube_lookup_channel_videos(youtube, channelId, nextPageToken = None):
    """Fetch one page of a channel's videos from the YouTube search API.

    Requests up to 50 video results ordered by date.  When
    ``nextPageToken`` is given it is sent as ``pageToken`` to continue from
    a previous page; otherwise the parameter is omitted.  Returns the
    executed response unchanged.
    """
    params = {
        'part': 'id,snippet',
        'type': 'video',
        'channelId': channelId,
        'maxResults': 50,
        'order': 'date',
    }
    if nextPageToken is not None:
        params['pageToken'] = nextPageToken
    return youtube.search().list(**params).execute()
def db_find_channel(dbconn, channelId):
    """Return the stored channel whose ``channel_id`` equals ``channelId``.

    The result is a dict with the keys id, channel_id and name, read from
    the first three columns of the row.  None is returned when the channel
    is not in the database.
    """
    cur = dbconn.cursor()
    cur.execute('''SELECT * FROM channel WHERE channel_id = ?''', (channelId,))
    found = cur.fetchone()
    if found is None:
        return None
    row_id, stored_channel_id, channel_name = found[0], found[1], found[2]
    return {"id": row_id, "channel_id": stored_channel_id, "name": channel_name}
def db_create_channel(dbconn, channelId, channelName):
    """Insert a new ``channel`` row and commit it.

    Returns a dict describing the new row: its generated ``id`` plus the
    ``channel_id`` and ``name`` values that were stored.
    """
    cur = dbconn.cursor()
    cur.execute('''INSERT INTO channel VALUES(NULL,?,?)''', (channelId, channelName))
    created = dict(id=cur.lastrowid, channel_id=channelId, name=channelName)
    dbconn.commit()
    return created
def db_find_video(dbconn, channelId, videoId):
    """Return the stored video matching both ``channelId`` and ``videoId``.

    ``channelId`` is compared with ``video.channel_id`` and ``videoId`` with
    ``video.video_id``.  The row is returned as a dict keyed by id,
    channel_id, name, video_id, published_at, is_favorite and output_file,
    or None when there is no such row.
    """
    columns = ("id", "channel_id", "name", "video_id", "published_at", "is_favorite", "output_file")
    cur = dbconn.cursor()
    cur.execute('''SELECT * FROM video WHERE channel_id = ? AND video_id = ?''', (channelId, videoId, ))
    record = cur.fetchone()
    if record is None:
        return None
    return {key: record[position] for position, key in enumerate(columns)}
def db_find_videos(dbconn, channelId):
    """List every stored video belonging to a channel.

    Selects the ``video`` rows whose ``channel_id`` equals ``channelId`` and
    returns them as dicts keyed by id, channel_id, name, video_id,
    published_at, is_favorite and output_file.  The list is empty when the
    channel has no videos.
    """
    field_names = ("id", "channel_id", "name", "video_id", "published_at", "is_favorite", "output_file")
    cur = dbconn.cursor()
    cur.execute('''SELECT * FROM video WHERE channel_id = ?''', (channelId, ))
    videos = []
    for record in cur.fetchall():
        videos.append({name: record[index] for index, name in enumerate(field_names)})
    return videos
def db_create_video(dbconn, channelId, videoName, videoId, publishedAt, isFavorite = 0, outputFile = ''):
    """Insert a new ``video`` row and commit it.

    Returns a dict describing the new row: the generated ``id`` together
    with the channel_id, name, video_id, published_at, is_favorite and
    output_file values that were stored.
    """
    values = (channelId, videoName, videoId, publishedAt, isFavorite, outputFile)
    cur = dbconn.cursor()
    cur.execute('''INSERT INTO video VALUES(NULL,?,?,?,?,?,?)''', values)
    created = dict(
        id=cur.lastrowid,
        channel_id=channelId,
        name=videoName,
        video_id=videoId,
        published_at=publishedAt,
        is_favorite=isFavorite,
        output_file=outputFile,
    )
    dbconn.commit()
    return created
def db_update_video(dbconn, video):
    """Write every field of ``video`` back to its row and commit.

    ``video`` is a dict with the keys channel_id, name, video_id,
    published_at, is_favorite, output_file and id; the row to update is
    selected by ``video['id']``.  The same dict is returned.
    """
    updated_fields = ('channel_id', 'name', 'video_id', 'published_at', 'is_favorite', 'output_file')
    params = tuple(video[field] for field in updated_fields) + (video['id'],)
    cur = dbconn.cursor()
    cur.execute('''UPDATE video SET channel_id = ?, name = ?, video_id = ?, published_at = ?, is_favorite = ?, output_file = ? WHERE id = ?''', params)
    dbconn.commit()
    return video
def process_videos(dbconn, db_channel, item_results):
    """Store every video from a page of search results that is not yet known.

    Args:
        dbconn: open database connection.
        db_channel: channel dict as returned by db_find_channel /
            db_create_channel; its ``id`` is used as the video's channel.
        item_results: the ``items`` list of a YouTube search response.

    Items without an ``id.videoId`` are skipped, as are videos already
    stored for this channel.
    """
    for result in item_results:
        if 'videoId' not in result['id']:
            continue

        video_id = result['id']['videoId']
        if db_find_video(dbconn, db_channel['id'], video_id) is not None:
            continue

        snippet = result['snippet']
        # The original passed the video ID and title in the opposite order,
        # so the ID was printed under "Video" and the title under "ID".
        print('Creating Video %s ID %s Channel %d Created On %s' % (snippet['title'], video_id, db_channel['id'], snippet['publishedAt']))
        db_create_video(dbconn, db_channel['id'], snippet['title'], video_id, snippet['publishedAt'])
def main():
    """Catalogue and download every video of a YouTube channel.

    Steps:
      1. Look the channel up through the YouTube API and make sure it has a
         row in the database.
      2. Page through the channel's videos and store any that are new.
      3. Run ``youtube-dl`` for each stored video and record the name of
         the downloaded file in the video's ``output_file`` column.

    Returns:
        0 on success, 1 when the channel cannot be found on YouTube.
    """
    parser = argparse.ArgumentParser(description='YouTube Channel Video Downloader')
    parser.add_argument("channel_id", help="The YouTube channel ID")
    parser.add_argument("-o", "--output", help="Path to download files to", default='')
    parser.add_argument("-d", "--database", help="Path to database file", default='database.sqlite3')
    args = parser.parse_args()

    youtube = init_youtube()
    db = init_database(args.database)
    try:
        ytchannel = youtube_lookup_channel(youtube, args.channel_id)
        if ytchannel is None:
            print('YouTube Channel ID %s Not Found' % args.channel_id)
            return 1

        yt_channel_id = ytchannel['id']['channelId']
        db_channel = db_find_channel(db, yt_channel_id)
        if db_channel is None:
            print("Creating Channel with ID %s - %s" % (yt_channel_id, ytchannel['snippet']['title']))
            db_channel = db_create_channel(db, yt_channel_id, ytchannel['snippet']['title'])
        # Show the channel record that the rest of the run works with.
        print(db_channel)

        # Walk the result pages until the API stops returning a
        # nextPageToken; the first request is made without a token.
        page_token = None
        while True:
            response = youtube_lookup_channel_videos(youtube, db_channel['channel_id'], page_token)
            process_videos(db, db_channel, response.get('items', []))
            page_token = response.get('nextPageToken', None)
            if page_token is None:
                break

        # Resolve the download directory once. The original built the
        # youtube-dl template from the empty --output value when the option
        # was omitted, producing '/%(id)s.%(ext)s' (the filesystem root)
        # while looking for the downloaded files in './'.
        out_dir = args.output if args.output else './'
        output_template = os.path.join(out_dir, '%(id)s.%(ext)s')
        # os.listdir() below needs the directory to exist even if a
        # download fails before youtube-dl creates it.
        if not os.path.isdir(out_dir):
            os.makedirs(out_dir)

        for video in db_find_videos(db, db_channel['id']):
            call(["youtube-dl", '-o', output_template, 'https://www.youtube.com/watch?v=%s' % video['video_id']])
            # The file extension is chosen by youtube-dl, so find the file
            # by looking for the video ID in the directory listing and
            # ignoring partial (".part") downloads.
            for filename in os.listdir(out_dir):
                if video['video_id'] in filename and not filename.endswith(".part"):
                    print('Found File %s' % filename)
                    video['output_file'] = filename
                    db_update_video(db, video)
    finally:
        # Close the connection on every path, including the early return.
        db.close()
    return 0
if __name__ == '__main__':
    # Propagate main()'s return value (0 or 1) as the process exit status;
    # calling main() alone would always exit with status 0.
    raise SystemExit(main())
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
youtube-dl | |
google-api-python-client | |
python-slugify | |
jinja2 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment