Skip to content

Instantly share code, notes, and snippets.

@ghassani
Last active September 19, 2018 18:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ghassani/778eb68f7b99820426d68d5616e9cbf4 to your computer and use it in GitHub Desktop.
Save ghassani/778eb68f7b99820426d68d5616e9cbf4 to your computer and use it in GitHub Desktop.
YouTube Channel Video Downloader and Cataloger. Also Website Generator
import sqlite3
import argparse
from slugify import slugify
import os
from jinja2 import Environment, FileSystemLoader
#import youtube_dl
def init_database(path):
connection = sqlite3.connect(path)
cursor = connection.cursor()
return connection
def db_find_videos(dbconn, channelId):
cursor = dbconn.cursor()
cursor.execute('''SELECT * FROM video WHERE channel_id = ?''', (channelId, ))
ret = []
for row in cursor.fetchall():
ret.append({ "id" : row[0], "channel_id" : row[1], "name" : row[2], "video_id" : row[3], "published_at" : row[4], "is_favorite" : row[5], "output_file" : row[6] })
return ret
def db_find_channel(dbconn, channelId):
cursor = dbconn.cursor()
cursor.execute('''SELECT * FROM channel WHERE channel_id = ?''', (channelId,))
row = cursor.fetchone()
if row is not None:
return { "id" : row[0], "channel_id" : row[1], "name" : row[2]}
return
def main():
parser = argparse.ArgumentParser(description='YouTube Channel Video Downloader')
parser.add_argument("channel_id", help="The YouTube channel ID")
parser.add_argument("-t", "--templates", help="Path to template files", default='templates')
parser.add_argument("-o", "--output", help="Path to download files to", default='website-out')
parser.add_argument("-d", "--database", help="Path to database file", default='database.sqlite3')
parser.add_argument("-p", "--prependuri", help="Prepend this to the URI", default='')
parser.add_argument("-c", "--prependurichannel", help="Prepend the channel name to the URI", action='store_true', default=False)
args = parser.parse_args()
db = init_database(args.database);
db_channel = db_find_channel(db, args.channel_id);
if db_channel is None:
print 'YouTube Channel ID %s Not Found' % args.channel_id
return 1
videos = db_find_videos(db, db_channel['id'])
env = Environment(
loader=FileSystemLoader(args.templates, followlinks=True)
)
for video in videos:
if args.prependuri:
video['url_slug'] = slugify('%s-%s-%s' % (args.prependuri, video['name'],video['video_id']))
elif args.prependurichannel:
video['url_slug'] = slugify('%s-%s-%s' % (db_channel['name'], video['name'],video['video_id']))
else:
video['url_slug'] = slugify('%s-%s' % (video['name'],video['video_id']))
#video['published_at_obj'] = parser.parse(video['published_at'].encode('utf-8').strip())
homepage_template = env.get_template('home.html')
video_template = env.get_template('video.html')
homepage_result = homepage_template.render(videos=videos).encode('utf-8').strip()
f = open('%s/index.html' % (args.output), "w")
f.write(homepage_result)
f.close()
for video in videos:
video_result = video_template.render(videos=videos, video=video).encode('utf-8').strip()
if args.prependuri:
uri = slugify('%s-%s-%s' % (args.prependuri, video['name'],video['video_id']))
elif args.prependurichannel:
uri = slugify('%s-%s-%s' % (db_channel['name'], video['name'],video['video_id']))
else:
uri = slugify('%s-%s' % (video['name'],video['video_id']))
f = open('%s/video/%s.html' % (args.output, uri), "w")
f.write(video_result)
f.close()
db.close()
return 0
if __name__ == '__main__':
main()
import sqlite3
from apiclient.discovery import build
import argparse
from subprocess import call
import os
#import youtube_dl
api_key = ''
def init_youtube():
return build('youtube', 'v3', developerKey=api_key)
def init_database(path):
connection = sqlite3.connect(path)
cursor = connection.cursor()
cursor.execute('CREATE TABLE IF NOT EXISTS channel (id INTEGER PRIMARY KEY AUTOINCREMENT, channel_id TEXT, name VARCHAR(70))')
cursor.execute('CREATE TABLE IF NOT EXISTS video (id INTEGER PRIMARY KEY AUTOINCREMENT, channel_id INTEGER, name VARCHAR(70), video_id TEXT, published_at TEXT, is_favorite TINYINT(1), output_file TEXT)')
return connection
def youtube_lookup_channel(youtube, channelId):
response = youtube.search().list(
part='id,snippet',
type='channel',
channelId=channelId,
maxResults=1,
order='date'
).execute()
for search_result in response.get('items', []):
if 'channelId' in search_result['id'] and search_result['id']['channelId'] == channelId:
return search_result
return None
def youtube_lookup_channel_videos(youtube, channelId, nextPageToken = None):
if nextPageToken is not None:
search_response = youtube.search().list(
part='id,snippet',
type='video',
pageToken=nextPageToken,
channelId=channelId,
maxResults=50,
order='date'
).execute()
else:
search_response = youtube.search().list(
part='id,snippet',
type='video',
channelId=channelId,
maxResults=50,
order='date'
).execute()
return search_response
def db_find_channel(dbconn, channelId):
cursor = dbconn.cursor()
cursor.execute('''SELECT * FROM channel WHERE channel_id = ?''', (channelId,))
row = cursor.fetchone()
if row is not None:
return { "id" : row[0], "channel_id" : row[1], "name" : row[2]}
return
def db_create_channel(dbconn, channelId, channelName):
cursor = dbconn.cursor()
cursor.execute('''INSERT INTO channel VALUES(NULL,?,?)''', (channelId,channelName,))
ret = { "id" : cursor.lastrowid, "channel_id" : channelId, "name" : channelName }
dbconn.commit()
return ret
def db_find_video(dbconn, channelId, videoId):
cursor = dbconn.cursor()
cursor.execute('''SELECT * FROM video WHERE channel_id = ? AND video_id = ?''', (channelId, videoId, ))
row = cursor.fetchone()
if row is not None:
return { "id" : row[0], "channel_id" : row[1], "name" : row[2], "video_id" : row[3], "published_at" : row[4], "is_favorite" : row[5], "output_file" : row[6] }
return
def db_find_videos(dbconn, channelId):
cursor = dbconn.cursor()
cursor.execute('''SELECT * FROM video WHERE channel_id = ?''', (channelId, ))
ret = []
for row in cursor.fetchall():
ret.append({ "id" : row[0], "channel_id" : row[1], "name" : row[2], "video_id" : row[3], "published_at" : row[4], "is_favorite" : row[5], "output_file" : row[6] })
return ret
def db_create_video(dbconn, channelId, videoName, videoId, publishedAt, isFavorite = 0, outputFile = ''):
cursor = dbconn.cursor()
cursor.execute('''INSERT INTO video VALUES(NULL,?,?,?,?,?,?)''', (channelId,videoName,videoId,publishedAt,isFavorite,outputFile))
ret = { "id" : cursor.lastrowid, "channel_id" : channelId, "name" : videoName, "video_id" : videoId, "published_at" : publishedAt, "is_favorite" : isFavorite, "output_file" : outputFile }
dbconn.commit()
return ret
def db_update_video(dbconn, video):
cursor = dbconn.cursor()
cursor.execute('''UPDATE video SET channel_id = ?, name = ?, video_id = ?, published_at = ?, is_favorite = ?, output_file = ? WHERE id = ?''', (video['channel_id'],video['name'],video['video_id'], video['published_at'], video['is_favorite'], video['output_file'], video['id']))
dbconn.commit()
return video
def process_videos(dbconn, db_channel, item_results):
for result in item_results:
if 'videoId' in result['id']:
db_video = db_find_video(dbconn, db_channel['id'], result['id']['videoId'])
if db_video is None:
print 'Creating Video %s ID %s Channel %d Created On %s' % ( result['id']['videoId'], result['snippet']['title'], db_channel['id'], result['snippet']['publishedAt'])
db_video = db_create_video(dbconn, db_channel['id'], result['snippet']['title'], result['id']['videoId'], result['snippet']['publishedAt'])
def main():
parser = argparse.ArgumentParser(description='YouTube Channel Video Downloader')
parser.add_argument("channel_id", help="The YouTube channel ID")
parser.add_argument("-o", "--output", help="Path to download files to", default='')
parser.add_argument("-d", "--database", help="Path to database file", default='database.sqlite3')
args = parser.parse_args()
youtube = init_youtube();
db = init_database(args.database);
ytchannel = youtube_lookup_channel(youtube, args.channel_id)
if ytchannel is None:
print 'YouTube Channel ID %s Not Found' % args.channel_id
return 1
db_channel = db_find_channel(db, ytchannel['id']['channelId']);
if db_channel is None:
print "Creating Channel with ID %s - %s" % ( ytchannel['id']['channelId'], ytchannel['snippet']['title'] )
db_channel = db_create_channel(db, ytchannel['id']['channelId'], ytchannel['snippet']['title'])
print db_channel
response = youtube_lookup_channel_videos(youtube, db_channel['channel_id'])
process_videos(db, db_channel, response.get('items', []))
nextPageToken = response.get('nextPageToken', None)
while nextPageToken is not None:
response = youtube_lookup_channel_videos(youtube, db_channel['channel_id'], nextPageToken)
process_videos(db, db_channel, response.get('items', []))
nextPageToken = response.get('nextPageToken', None)
for video in db_find_videos(db, db_channel['id']):
if args.output:
out_dir = args.output
output_template = '%s/%s' % ( args.output, '%(id)s.%(ext)s' )
else:
out_dir = './'
output_template = '%s/%s' % ( args.output, '%(id)s.%(ext)s' )
call(["youtube-dl", '-o', output_template, 'https://www.youtube.com/watch?v=%s' % video['video_id']])
for file in os.listdir(out_dir):
if video['video_id'] in file and not file.endswith(".part"):
print 'Found File %s' % file
video['output_file'] = file
db_update_video(db, video)
db.close()
return 0
if __name__ == '__main__':
main()
youtube-dl
google-api-python-client
python-slugify
jinja2
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment