Created
August 30, 2018 07:59
-
-
Save aurimasniekis/a0f1c42ab10d2b7d047cbe15101963e8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Copyright © 2017 Kristof Toth <mrtoth@strongds.hu> | |
# This program is free software. It comes without any warranty, to the extent | |
# permitted by applicable law. You can redistribute it and/or modify it under the | |
# terms of the Do What The Fuck You Want To Public License, Version 2, | |
# as published by Sam Hocevar. See http://www.wtfpl.net/ for more details. | |
from subprocess import call, Popen, PIPE, DEVNULL | |
from os import listdir, remove | |
from os.path import splitext, exists, join, split | |
from re import match | |
from enum import Enum | |
from datetime import timedelta | |
from math import floor | |
from argparse import ArgumentParser | |
from signal import signal, SIGINT | |
from sys import exit | |
from copy import deepcopy | |
from collections import namedtuple | |
from functools import wraps | |
from tempfile import mkdtemp | |
from shutil import rmtree | |
from subprocess import check_output, CalledProcessError | |
from sys import exit | |
# Module-wide verbosity switch consulted by print_opt(); the __main__ block
# turns it off when -nv/--nonverbose is given.
VERBOSE = True
def call_verbose(before_message='', after_message='Done!'):
    """Build a decorator that brackets a call with optional progress messages.

    ``before_message`` is printed (no trailing newline, flushed) before the
    wrapped callable runs and ``after_message`` once it has returned. Both go
    through ``print_opt``, so they only appear when verbose output is enabled.
    If the callable raises, ``after_message`` is not printed.
    """
    def tag(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            print_opt(before_message, end='', flush=True)
            # Hand the wrapped callable's result back to the caller instead
            # of silently dropping it.
            result = f(*args, **kwargs)
            print_opt(after_message)
            return result
        return wrapper
    return tag
def print_opt(*args, **kwargs):
    """Forward the arguments to ``print`` unless verbose output is disabled."""
    if not VERBOSE:
        return
    print(*args, **kwargs)
def yes_no_question(question, default):
    """Ask a yes/no question on stdin until a recognised answer is given.

    ``default`` must be "yes", "no" or None; anything else raises ValueError.
    An empty reply selects the default when there is one. Returns a bool.
    """
    answers = {"yes": True, "y": True, "ye": True,
               "no": False, "n": False}

    # Reject an unusable default before prompting.
    if default is not None and not (default == "yes" or default == "no"):
        raise ValueError("Invalid default answer: {}!".format(default))

    # The capital letter in the prompt marks which answer Enter selects.
    if default is None:
        prompt = " [y/n] "
    else:
        prompt = " [Y/n] " if default == "yes" else " [y/N] "

    while True:
        print(question + prompt)
        reply = input().lower()
        if reply == '' and default is not None:
            return answers[default]
        if reply in answers:
            return answers[reply]
        print("Please respond with 'yes'(y) or 'no'(n)!")
# tempfile.TemporaryDirectory replacement to provide backwards compatibility | |
class temporary_directory:
    """Context manager that creates a scratch directory and deletes it on exit."""

    def __enter__(self):
        # The path is kept on the instance so __exit__ can remove it; it is
        # also the value bound by ``with ... as``.
        created = mkdtemp()
        self.name = created
        return created

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returns None, so an exception raised inside the block propagates.
        rmtree(self.name)
def get_output(*args, **kwargs):
    """Run a command through ``check_output`` and return its stdout as text.

    The captured bytes are decoded with the default codec and trailing
    newline characters are removed.
    """
    raw = check_output(*args, **kwargs)
    text = raw.decode()
    return text.rstrip('\n')
@call_verbose(before_message='Checking your system for dependencies... ', after_message='Found all!')
def check_dependencies(check_for):
    """Exit with an error naming every command in ``check_for`` that cannot be run.

    ``check_for`` is an iterable of command tuples (executable first). A
    command counts as missing when running it raises CalledProcessError or
    FileNotFoundError.
    """
    not_found = []
    for command in check_for:
        try:
            check_output(command)
        except (CalledProcessError, FileNotFoundError):
            not_found.append(command[0])
    if not_found:
        exit('\nMissing dependencies: {}'.format(', '.join(not_found)))
class Stream(Enum):
    """Keys identifying the two downloaded media streams in the files dict."""
    AUDIO = 1
    VIDEO = 2
class File(Enum):
    """Keys identifying the helper and output files in the files dict."""
    LIST = 1      # concat list written for ffmpeg
    LOOP = 2      # only receives an extension in calculate_loops; otherwise unused in this file
    FRACTION = 3  # clip holding the final partial repetition of the looped stream
    OUTPUT = 4    # final muxed result
class DownloadFailure(RuntimeError):
    """Raised by coub_dl.check_downloads when a downloaded stream file is missing."""
    pass
class coub_dl:
    """Download a coub's audio and video streams and mux them into one file.

    An instance is called once (``coub_dl(url, files)()``). It shells out to
    youtube-dl, ffprobe and ffmpeg, and it mutates the ``files_dict`` it was
    given so that the dict always holds the paths actually in use.
    """

    # Base names of every file the pipeline touches, keyed by Stream/File.
    default_files = {Stream.AUDIO: 'audio', Stream.VIDEO: 'video',
                     File.LIST: 'list.txt', File.LOOP: 'loop', File.FRACTION: 'fraction',
                     File.OUTPUT: 'out'}
    # Keys that run() leaves out when relocating paths into the temp directory.
    output_files = {File.OUTPUT}

    def __init__(self, url, files_dict):
        self._url = url
        self._files_dict = files_dict
        # NOTE: this stores the namedtuple *class*, not an instance; later
        # code assigns base/fraction/time/file on it as plain attributes.
        self._loopdata = namedtuple('loopdata', ('base', 'fraction', 'time', 'file'))

    def __call__(self):
        """Run the whole pipeline; raises DownloadFailure if a stream is missing."""
        # download streams and update FILE dict with extensions
        self.download_audio_stream()
        self.download_video_stream()
        self.read_extensions()
        self.check_downloads()
        self.fix_video_stream()

        # write concat helper file for ffmpeg
        self.calculate_loops()
        self.write_concat_helper()

        # loop & mux streams
        self.loop_shorter_stream()
        self.mux_streams()

    def download_audio_stream(self):
        """Fetch the 'html5-audio-high' format with youtube-dl (output silenced)."""
        call(('youtube-dl', '--ignore-config',
              '-f', 'html5-audio-high',
              '--output', '{}.%(ext)s'.format(self._files_dict[Stream.AUDIO]),
              self._url),
             stdout=DEVNULL, stderr=DEVNULL)

    def download_video_stream(self):
        """Fetch youtube-dl's default format as the video stream (output silenced)."""
        call(('youtube-dl', '--ignore-config',
              '--output', '{}.%(ext)s'.format(self._files_dict[Stream.VIDEO]),
              self._url),
             stdout=DEVNULL, stderr=DEVNULL)

    def read_extensions(self):
        """Replace the extension-less stream paths with the files found on disk."""
        for stream in (Stream.AUDIO, Stream.VIDEO):
            sdir, sfile = split(self._files_dict[stream])
            # The downloads are saved as "<name>.<ext>". A plain prefix test
            # finds them without interpreting the name as a regular expression.
            prefix = sfile + '.'
            for file in listdir(sdir):
                if file.startswith(prefix) and len(file) > len(prefix):
                    self._files_dict[stream] = join(sdir, file)

    def check_downloads(self):
        """Raise DownloadFailure unless both stream files exist."""
        check = (Stream.VIDEO, Stream.AUDIO)
        if not all(exists(self._files_dict[item]) for item in check):
            raise DownloadFailure()

    def fix_video_stream(self):
        """ magic fix for videos served by coub. see https://github.com/rg3/youtube-dl/issues/13754 """
        # Overwrites the first two bytes of the video file with zeros.
        with open(self._files_dict[Stream.VIDEO], 'r+b') as f:
            f.seek(0)
            f.write(bytes(2))

    def calculate_loops(self):
        """Work out how many times the shorter stream fits into the longer one.

        Stores the whole repetition count (``base``), the remaining
        ``fraction``, the shorter stream's length (``time``) and its path
        (``file``) on ``self._loopdata``, and appends that file's extension
        to the LOOP and FRACTION paths.
        """
        audioLen = coub_dl.get_length(self._files_dict[Stream.AUDIO])
        videoLen = coub_dl.get_length(self._files_dict[Stream.VIDEO])
        audio_is_shorter = audioLen < videoLen

        longer = audioLen if audioLen > videoLen else videoLen
        self._loopdata.time = audioLen if audio_is_shorter else videoLen
        self._loopdata.file = self._files_dict[Stream.AUDIO if audio_is_shorter else Stream.VIDEO]

        extension = splitext(self._loopdata.file)[1]
        self._files_dict[File.LOOP] += extension
        self._files_dict[File.FRACTION] += extension

        times = longer.total_seconds() / self._loopdata.time.total_seconds()
        self._loopdata.base = int(floor(times))
        self._loopdata.fraction = times % 1

    def write_concat_helper(self):
        """Write the concat list: ``base`` full repetitions, then the fraction clip."""
        with open(self._files_dict[File.LIST], 'w') as f:
            for _ in range(self._loopdata.base):
                f.write("file '{}'\n".format(self._loopdata.file))
            f.write("file '{}'\n".format(self._files_dict[File.FRACTION]))

    def loop_shorter_stream(self):
        """Cut the final partial repetition of the shorter stream with ffmpeg."""
        # prepare last fractional loop
        fraction_seconds = self._loopdata.fraction * self._loopdata.time.total_seconds()
        call(('ffmpeg', '-i', self._loopdata.file, '-t', str(fraction_seconds),
              self._files_dict[File.FRACTION]),
             stdout=DEVNULL, stderr=DEVNULL)

    def mux_streams(self):
        """Combine the concat list and the audio file into the output file."""
        # NOTE(review): input 0 is the concat list of the shorter stream, and
        # '-map 0:v:0' presumably assumes that stream is the video. Confirm
        # the behaviour when the audio is the shorter one.
        call(('ffmpeg', '-safe', '0', '-f', 'concat', '-i', self._files_dict[File.LIST],
              '-i', self._files_dict[Stream.AUDIO],
              '-c:v', 'copy', '-c:a', 'aac', '-strict', 'experimental',
              '-map', '0:v:0', '-map', '1:a:0',
              '-shortest', '-movflags', 'faststart',
              self._files_dict[File.OUTPUT]),
             stdout=DEVNULL, stderr=DEVNULL)

    @staticmethod
    def get_length(file):
        """Return the duration ffprobe reports for ``file`` as a timedelta."""
        data = coub_dl.get_duration(coub_dl.get_command_stderr(('ffprobe', file))).split(':')
        return timedelta(hours=float(data[0]), minutes=float(data[1]), seconds=float(data[2]))

    @staticmethod
    def get_command_stderr(command):
        """Run ``command`` and return whatever it wrote to stderr (bytes)."""
        process = Popen(command, stderr=PIPE, stdout=PIPE)
        _, err = process.communicate()
        return err

    @staticmethod
    def get_duration(ffprobe_output):
        """Extract the text between 'Duration: ' and ', start' from ffprobe output.

        Accepts bytes or str. Raises ValueError when no such field is present.
        """
        # Decode real bytes instead of parsing their repr().
        if isinstance(ffprobe_output, (bytes, bytearray)):
            text = ffprobe_output.decode(errors='replace')
        else:
            text = str(ffprobe_output)
        # (?s) lets the leading .*? skip over the lines before the Duration
        # field; the capture stops at the first comma so it cannot run on.
        found = match(r'(?s).*?Duration:\s([^,\n]+),\sstart', text)
        if not found:
            raise ValueError('Cannot process ffprobe output!')
        return found.group(1)

    @staticmethod
    def get_title(url):
        """Return the title youtube-dl reports for ``url``."""
        return get_output(('youtube-dl', '--get-title', url))
def run(URL, output, extension):
    """Build the output video for one URL.

    Works out the output file name, asks before overwriting an existing file
    (declining exits the program), then runs ``coub_dl`` with every
    non-output path placed inside a temporary directory.
    """
    # create dict that contains files used
    files = deepcopy(coub_dl.default_files)
    determine_output_filename(URL, output, extension, files)
    target = files[File.OUTPUT]

    # ask what to do if output exists
    if exists(target):
        overwrite = yes_no_question('A file named "{}" already exists! Overwrite?'.format(target),
                                    default='no')
        if overwrite:
            remove(target)
        else:
            print_opt('Exiting!')
            exit()

    # create temporary directory to work in
    with temporary_directory() as tempdir:
        # update temporary file locations in the files dict
        working_keys = [key for key in files if key not in coub_dl.output_files]
        for key in working_keys:
            files[key] = join(tempdir, files[key])
        coub_dl(URL, files)()
def determine_output_filename(url, user_supplied, extension, files_dict):
    """Store the output file name (base name plus ``extension``) in ``files_dict``.

    The base name is ``user_supplied`` when given; otherwise it is the title
    reported for ``url``. Path separators in a looked-up title are replaced
    with '_' so the title cannot point into another directory. A name
    supplied by the user is used unchanged.
    """
    from os import sep, altsep

    if user_supplied is None:
        name = coub_dl.get_title(url)
        # altsep is None on platforms that have a single separator.
        for separator in (sep, altsep):
            if separator:
                name = name.replace(separator, '_')
    else:
        name = user_supplied
    files_dict[File.OUTPUT] = name + extension
def parse_cmd_arguments():
    """Parse the command line and return the argparse namespace.

    ``extension`` is normalised to carry exactly one leading dot, so both
    ``-e mkv`` and ``-e .mkv`` give '.mkv'.
    """
    parser = ArgumentParser(description='Download player-looped videos with youtube-dl & ffmpeg.')
    parser.add_argument('-nv', '--nonverbose', action='store_true',
                        help='Turn off non-critical messages to user')
    parser.add_argument('-o', '--output', default=None,
                        help='Specify name of the output file (use -e for extension)')
    parser.add_argument('-e', '--extension', default='mp4',
                        help='Set the container to use for the output')
    parser.add_argument('URLs', type=str, nargs='+',
                        help='The URLs of the sites containing the videos to download')
    args = parser.parse_args()
    # Strip any dot the user already typed before adding ours.
    args.extension = '.' + args.extension.lstrip('.')
    return args
def decorate_coubdl_uimsgs(*args):
    """Wrap ``coub_dl`` methods with progress messages.

    Each positional argument is a ``(method_name, call_verbose_kwargs)`` pair;
    the named method on ``coub_dl`` is replaced by its decorated version.
    """
    for item in args:
        method_name = item[0]
        message_kwargs = item[1]
        decorated = call_verbose(**message_kwargs)(getattr(coub_dl, method_name))
        setattr(coub_dl, method_name, decorated)
if __name__ == '__main__':
    # Turn Ctrl-C into a short exit message.
    signal(SIGINT, lambda a, b: exit('\nExiting!'))

    args = parse_cmd_arguments()
    VERBOSE = not args.nonverbose

    # Attach the progress messages to the pipeline steps.
    decorate_coubdl_uimsgs(('download_audio_stream', {'before_message': 'Downloading audio stream... '}),
                           ('download_video_stream', {'before_message': 'Downloading video stream... '}),
                           ('loop_shorter_stream', {'before_message': 'Looping shorter stream... '}),
                           ('mux_streams', {'before_message': 'Muxing streams... '}))
    check_dependencies((('youtube-dl', '--version'), ('ffmpeg', '-version')))

    # Skip duplicate URLs but keep the order given on the command line
    # (iterating a set would process them in arbitrary order).
    seen = set()
    for url in args.URLs:
        if url in seen:
            continue
        seen.add(url)
        print_opt('\nCreating video from {}'.format(url))
        try:
            run(url, args.output, args.extension)
        except DownloadFailure:
            exit('Failed to download streams! This usually happens when Coub changes something.')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Copyright © 2017 Kristof Toth <mrtoth@strongds.hu> | |
# This program is free software. It comes without any warranty, to the extent | |
# permitted by applicable law. You can redistribute it and/or modify it under the | |
# terms of the Do What The Fuck You Want To Public License, Version 2, | |
# as published by Sam Hocevar. See http://www.wtfpl.net/ for more details. | |
from subprocess import call, Popen, PIPE, DEVNULL | |
from os import listdir, remove | |
from os.path import splitext, exists, join, split | |
from re import match | |
from enum import Enum | |
from datetime import timedelta | |
from math import floor | |
from argparse import ArgumentParser | |
from signal import signal, SIGINT | |
from sys import exit | |
from copy import deepcopy | |
from collections import namedtuple | |
from functools import wraps | |
from tempfile import mkdtemp | |
from shutil import rmtree | |
from subprocess import check_output, CalledProcessError | |
from sys import exit | |
# Module-wide verbosity switch consulted by print_opt(); the __main__ block
# turns it off when -nv/--nonverbose is given.
VERBOSE = True
def call_verbose(before_message='', after_message='Done!'):
    """Build a decorator that brackets a call with optional progress messages.

    ``before_message`` is printed (no trailing newline, flushed) before the
    wrapped callable runs and ``after_message`` once it has returned. Both go
    through ``print_opt``, so they only appear when verbose output is enabled.
    If the callable raises, ``after_message`` is not printed.
    """
    def tag(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            print_opt(before_message, end='', flush=True)
            # Hand the wrapped callable's result back to the caller instead
            # of silently dropping it.
            result = f(*args, **kwargs)
            print_opt(after_message)
            return result
        return wrapper
    return tag
def print_opt(*args, **kwargs):
    """Forward the arguments to ``print`` unless verbose output is disabled."""
    if not VERBOSE:
        return
    print(*args, **kwargs)
def yes_no_question(question, default):
    """Ask a yes/no question on stdin until a recognised answer is given.

    ``default`` must be "yes", "no" or None; anything else raises ValueError.
    An empty reply selects the default when there is one. Returns a bool.
    """
    answers = {"yes": True, "y": True, "ye": True,
               "no": False, "n": False}

    # Reject an unusable default before prompting.
    if default is not None and not (default == "yes" or default == "no"):
        raise ValueError("Invalid default answer: {}!".format(default))

    # The capital letter in the prompt marks which answer Enter selects.
    if default is None:
        prompt = " [y/n] "
    else:
        prompt = " [Y/n] " if default == "yes" else " [y/N] "

    while True:
        print(question + prompt)
        reply = input().lower()
        if reply == '' and default is not None:
            return answers[default]
        if reply in answers:
            return answers[reply]
        print("Please respond with 'yes'(y) or 'no'(n)!")
# tempfile.TemporaryDirectory replacement to provide backwards compatibility | |
class temporary_directory:
    """Context manager that creates a scratch directory and deletes it on exit."""

    def __enter__(self):
        # The path is kept on the instance so __exit__ can remove it; it is
        # also the value bound by ``with ... as``.
        created = mkdtemp()
        self.name = created
        return created

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returns None, so an exception raised inside the block propagates.
        rmtree(self.name)
def get_output(*args, **kwargs):
    """Run a command through ``check_output`` and return its stdout as text.

    The captured bytes are decoded with the default codec and trailing
    newline characters are removed.
    """
    raw = check_output(*args, **kwargs)
    text = raw.decode()
    return text.rstrip('\n')
@call_verbose(before_message='Checking your system for dependencies... ', after_message='Found all!')
def check_dependencies(check_for):
    """Exit with an error naming every command in ``check_for`` that cannot be run.

    ``check_for`` is an iterable of command tuples (executable first). A
    command counts as missing when running it raises CalledProcessError or
    FileNotFoundError.
    """
    not_found = []
    for command in check_for:
        try:
            check_output(command)
        except (CalledProcessError, FileNotFoundError):
            not_found.append(command[0])
    if not_found:
        exit('\nMissing dependencies: {}'.format(', '.join(not_found)))
class Stream(Enum):
    """Keys identifying the two downloaded media streams in the files dict."""
    AUDIO = 1
    VIDEO = 2
class File(Enum):
    """Keys identifying the helper and output files in the files dict."""
    LIST = 1      # concat list written for ffmpeg
    LOOP = 2      # only receives an extension in calculate_loops; otherwise unused in this file
    FRACTION = 3  # clip holding the final partial repetition of the looped stream
    OUTPUT = 4    # final muxed result
class DownloadFailure(RuntimeError):
    """Raised by coub_dl.check_downloads when a downloaded stream file is missing."""
    pass
class coub_dl:
    """Download a coub's audio and video streams and mux them into one file.

    An instance is called once (``coub_dl(url, files)()``). It shells out to
    youtube-dl, ffprobe and ffmpeg, and it mutates the ``files_dict`` it was
    given so that the dict always holds the paths actually in use.
    """

    # Base names of every file the pipeline touches, keyed by Stream/File.
    default_files = {Stream.AUDIO: 'audio', Stream.VIDEO: 'video',
                     File.LIST: 'list.txt', File.LOOP: 'loop', File.FRACTION: 'fraction',
                     File.OUTPUT: 'out'}
    # Keys that run() leaves out when relocating paths into the temp directory.
    output_files = {File.OUTPUT}

    def __init__(self, url, files_dict):
        self._url = url
        self._files_dict = files_dict
        # NOTE: this stores the namedtuple *class*, not an instance; later
        # code assigns base/fraction/time/file on it as plain attributes.
        self._loopdata = namedtuple('loopdata', ('base', 'fraction', 'time', 'file'))

    def __call__(self):
        """Run the whole pipeline; raises DownloadFailure if a stream is missing."""
        # download streams and update FILE dict with extensions
        self.download_audio_stream()
        self.download_video_stream()
        self.read_extensions()
        self.check_downloads()
        self.fix_video_stream()

        # write concat helper file for ffmpeg
        self.calculate_loops()
        self.write_concat_helper()

        # loop & mux streams
        self.loop_shorter_stream()
        self.mux_streams()

    def download_audio_stream(self):
        """Fetch the 'html5-audio-high' format with youtube-dl (output silenced)."""
        call(('youtube-dl', '--ignore-config',
              '-f', 'html5-audio-high',
              '--output', '{}.%(ext)s'.format(self._files_dict[Stream.AUDIO]),
              self._url),
             stdout=DEVNULL, stderr=DEVNULL)

    def download_video_stream(self):
        """Fetch youtube-dl's default format as the video stream (output silenced)."""
        call(('youtube-dl', '--ignore-config',
              '--output', '{}.%(ext)s'.format(self._files_dict[Stream.VIDEO]),
              self._url),
             stdout=DEVNULL, stderr=DEVNULL)

    def read_extensions(self):
        """Replace the extension-less stream paths with the files found on disk."""
        for stream in (Stream.AUDIO, Stream.VIDEO):
            sdir, sfile = split(self._files_dict[stream])
            # The downloads are saved as "<name>.<ext>". A plain prefix test
            # finds them without interpreting the name as a regular expression.
            prefix = sfile + '.'
            for file in listdir(sdir):
                if file.startswith(prefix) and len(file) > len(prefix):
                    self._files_dict[stream] = join(sdir, file)

    def check_downloads(self):
        """Raise DownloadFailure unless both stream files exist."""
        check = (Stream.VIDEO, Stream.AUDIO)
        if not all(exists(self._files_dict[item]) for item in check):
            raise DownloadFailure()

    def fix_video_stream(self):
        """ magic fix for videos served by coub. see https://github.com/rg3/youtube-dl/issues/13754 """
        # Overwrites the first two bytes of the video file with zeros.
        with open(self._files_dict[Stream.VIDEO], 'r+b') as f:
            f.seek(0)
            f.write(bytes(2))

    def calculate_loops(self):
        """Fill ``self._loopdata`` and add extensions to the LOOP/FRACTION paths.

        Stores the repetition count (``base``), the remaining ``fraction``,
        the reference length (``time``) and the path of the shorter stream
        (``file``) on ``self._loopdata``.
        """
        audioLen = coub_dl.get_length(self._files_dict[Stream.AUDIO])
        videoLen = coub_dl.get_length(self._files_dict[Stream.VIDEO])

        # NOTE(review): both lengths used for the ratio are the video length,
        # so ``times`` is always 1.0 (base 1, fraction 0.0). This looks
        # deliberate in this revision; confirm.
        longer = videoLen
        self._loopdata.time = videoLen
        shorter_stream = Stream.AUDIO if audioLen < videoLen else Stream.VIDEO
        self._loopdata.file = self._files_dict[shorter_stream]

        extension = splitext(self._loopdata.file)[1]
        self._files_dict[File.LOOP] += extension
        self._files_dict[File.FRACTION] += extension

        times = longer.total_seconds() / self._loopdata.time.total_seconds()
        self._loopdata.base = int(floor(times))
        self._loopdata.fraction = times % 1

    def write_concat_helper(self):
        """Write the concat list: ``base`` full repetitions, then the fraction clip."""
        with open(self._files_dict[File.LIST], 'w') as f:
            for _ in range(self._loopdata.base):
                f.write("file '{}'\n".format(self._loopdata.file))
            f.write("file '{}'\n".format(self._files_dict[File.FRACTION]))

    def loop_shorter_stream(self):
        """Cut the final partial repetition of the looped stream with ffmpeg."""
        # prepare last fractional loop
        fraction_seconds = self._loopdata.fraction * self._loopdata.time.total_seconds()
        call(('ffmpeg', '-i', self._loopdata.file, '-t', str(fraction_seconds),
              self._files_dict[File.FRACTION]),
             stdout=DEVNULL, stderr=DEVNULL)

    def mux_streams(self):
        """Combine the concat list and the audio file into the output file."""
        # NOTE(review): input 0 is the concat list built from
        # ``_loopdata.file``, and '-map 0:v:0' presumably assumes that file
        # is the video. Confirm the behaviour when the audio is shorter.
        call(('ffmpeg', '-safe', '0', '-f', 'concat', '-i', self._files_dict[File.LIST],
              '-i', self._files_dict[Stream.AUDIO],
              '-c:v', 'copy', '-c:a', 'aac', '-strict', 'experimental',
              '-map', '0:v:0', '-map', '1:a:0',
              '-shortest', '-movflags', 'faststart',
              self._files_dict[File.OUTPUT]),
             stdout=DEVNULL, stderr=DEVNULL)

    @staticmethod
    def get_length(file):
        """Return the duration ffprobe reports for ``file`` as a timedelta."""
        data = coub_dl.get_duration(coub_dl.get_command_stderr(('ffprobe', file))).split(':')
        return timedelta(hours=float(data[0]), minutes=float(data[1]), seconds=float(data[2]))

    @staticmethod
    def get_command_stderr(command):
        """Run ``command`` and return whatever it wrote to stderr (bytes)."""
        process = Popen(command, stderr=PIPE, stdout=PIPE)
        _, err = process.communicate()
        return err

    @staticmethod
    def get_duration(ffprobe_output):
        """Extract the text between 'Duration: ' and ', start' from ffprobe output.

        Accepts bytes or str. Raises ValueError when no such field is present.
        """
        # Decode real bytes instead of parsing their repr().
        if isinstance(ffprobe_output, (bytes, bytearray)):
            text = ffprobe_output.decode(errors='replace')
        else:
            text = str(ffprobe_output)
        # (?s) lets the leading .*? skip over the lines before the Duration
        # field; the capture stops at the first comma so it cannot run on.
        found = match(r'(?s).*?Duration:\s([^,\n]+),\sstart', text)
        if not found:
            raise ValueError('Cannot process ffprobe output!')
        return found.group(1)

    @staticmethod
    def get_title(url):
        """Return the title youtube-dl reports for ``url``."""
        return get_output(('youtube-dl', '--get-title', url))
def run(URL, output, extension):
    """Build the output video for one URL.

    Works out the output file name, asks before overwriting an existing file
    (declining exits the program), then runs ``coub_dl`` with every
    non-output path placed inside a temporary directory.
    """
    # create dict that contains files used
    files = deepcopy(coub_dl.default_files)
    determine_output_filename(URL, output, extension, files)
    target = files[File.OUTPUT]

    # ask what to do if output exists
    if exists(target):
        overwrite = yes_no_question('A file named "{}" already exists! Overwrite?'.format(target),
                                    default='no')
        if overwrite:
            remove(target)
        else:
            print_opt('Exiting!')
            exit()

    # create temporary directory to work in
    with temporary_directory() as tempdir:
        # update temporary file locations in the files dict
        working_keys = [key for key in files if key not in coub_dl.output_files]
        for key in working_keys:
            files[key] = join(tempdir, files[key])
        coub_dl(URL, files)()
def determine_output_filename(url, user_supplied, extension, files_dict):
    """Store the output file name (base name plus ``extension``) in ``files_dict``.

    The base name is ``user_supplied`` when given; otherwise it is the title
    reported for ``url``. Path separators in a looked-up title are replaced
    with '_' so the title cannot point into another directory. A name
    supplied by the user is used unchanged.
    """
    from os import sep, altsep

    if user_supplied is None:
        name = coub_dl.get_title(url)
        # altsep is None on platforms that have a single separator.
        for separator in (sep, altsep):
            if separator:
                name = name.replace(separator, '_')
    else:
        name = user_supplied
    files_dict[File.OUTPUT] = name + extension
def parse_cmd_arguments():
    """Parse the command line and return the argparse namespace.

    ``extension`` is normalised to carry exactly one leading dot, so both
    ``-e mkv`` and ``-e .mkv`` give '.mkv'.
    """
    parser = ArgumentParser(description='Download player-looped videos with youtube-dl & ffmpeg.')
    parser.add_argument('-nv', '--nonverbose', action='store_true',
                        help='Turn off non-critical messages to user')
    parser.add_argument('-o', '--output', default=None,
                        help='Specify name of the output file (use -e for extension)')
    parser.add_argument('-e', '--extension', default='mp4',
                        help='Set the container to use for the output')
    parser.add_argument('URLs', type=str, nargs='+',
                        help='The URLs of the sites containing the videos to download')
    args = parser.parse_args()
    # Strip any dot the user already typed before adding ours.
    args.extension = '.' + args.extension.lstrip('.')
    return args
def decorate_coubdl_uimsgs(*args):
    """Wrap ``coub_dl`` methods with progress messages.

    Each positional argument is a ``(method_name, call_verbose_kwargs)`` pair;
    the named method on ``coub_dl`` is replaced by its decorated version.
    """
    for item in args:
        method_name = item[0]
        message_kwargs = item[1]
        decorated = call_verbose(**message_kwargs)(getattr(coub_dl, method_name))
        setattr(coub_dl, method_name, decorated)
if __name__ == '__main__':
    # Turn Ctrl-C into a short exit message.
    signal(SIGINT, lambda a, b: exit('\nExiting!'))

    args = parse_cmd_arguments()
    VERBOSE = not args.nonverbose

    # Attach the progress messages to the pipeline steps.
    decorate_coubdl_uimsgs(('download_audio_stream', {'before_message': 'Downloading audio stream... '}),
                           ('download_video_stream', {'before_message': 'Downloading video stream... '}),
                           ('loop_shorter_stream', {'before_message': 'Looping shorter stream... '}),
                           ('mux_streams', {'before_message': 'Muxing streams... '}))
    check_dependencies((('youtube-dl', '--version'), ('ffmpeg', '-version')))

    # Skip duplicate URLs but keep the order given on the command line
    # (iterating a set would process them in arbitrary order).
    seen = set()
    for url in args.URLs:
        if url in seen:
            continue
        seen.add(url)
        print_opt('\nCreating video from {}'.format(url))
        try:
            run(url, args.output, args.extension)
        except DownloadFailure:
            exit('Failed to download streams! This usually happens when Coub changes something.')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment