Skip to content

Instantly share code, notes, and snippets.

@srhinos
Last active March 14, 2022 08:09
Show Gist options
  • Save srhinos/03edc73cbcf0bdd70f5c6e78457f7546 to your computer and use it in GitHub Desktop.
Save srhinos/03edc73cbcf0bdd70f5c6e78457f7546 to your computer and use it in GitHub Desktop.
import asyncio
import functools
import logging
import os
from concurrent.futures import ThreadPoolExecutor
import youtube_dl
from .digital_ocean import DropletManager
# NOTE(review): not read anywhere in this section; presumably a module-level
# toggle consulted elsewhere — confirm before removing.
allow_requests = True
log = logging.getLogger(__name__)

# Base YoutubeDL options from which Downloader builds its YoutubeDL instances.
# "ignoreerrors" is False here; "safe" downloaders override it to True.
ytdl_format_options = {
    "format": "bestaudio/best",
    "outtmpl": "%(extractor)s-%(id)s-%(title)s.%(ext)s",
    "restrictfilenames": True,
    "noplaylist": True,
    "nocheckcertificate": True,
    "ignoreerrors": False,
    "logtostderr": False,
    "quiet": True,
    "no_warnings": True,
    "default_search": "auto",
    "source_address": "0.0.0.0",
    "forceipv4": True,
    "proxy": None,
    "usenetrc": True,
}

# Silence youtube-dl's "please report this issue" suffix: it adds two link
# embeds to error messages and confuses users. The replacement accepts any
# arguments because some youtube-dl versions call it with a keyword argument
# (e.g. ``before=``). Rebinding the module attribute only affects code that
# looks the name up via ``youtube_dl.utils`` at call time; modules that
# imported the function by name keep the original.
youtube_dl.utils.bug_reports_message = lambda *args, **kwargs: ""

# Why there are several YoutubeDL objects: to surface youtube-dl's useful
# error details, errors must be raised (``ignoreerrors`` off); to avoid
# failing on videos youtube-dl cannot handle (rental videos, etc.), errors
# must be ignored (``ignoreerrors`` on). Toggling that option on one shared
# instance is unsafe while async extractions are in flight, so separate
# "safe" (ignoreerrors on) and "unsafe" (ignoreerrors off) downloaders exist.
class Downloader:
    """Run youtube-dl extractions in a thread pool, routed through droplet IPs.

    Each extraction builds a fresh ``YoutubeDL``. Its options come from
    ``ytdl_format_options`` passed through ``DropletManager.affix_ip`` for a
    randomly chosen droplet. With ``build_clean=True``, the plain module
    options are used instead.

    When an extraction on a droplet-backed downloader fails with an error
    that looks like an IP block, that droplet is deleted and the extraction
    is retried on a freshly built downloader.
    """

    # Substrings of youtube-dl error messages treated as "this IP is blocked".
    _BLOCKING_ERROR_MARKERS = ("Unable to extract", "429", "403", "Errno 111")

    def __init__(self, download_folder=None):
        self.thread_pool = ThreadPoolExecutor(max_workers=2)
        self.droplet_manager = DropletManager(self.thread_pool)
        self.download_folder = download_folder
        self.last_safe_ytdl = None
        self.initialized = False
        self.initializing = False

    @property
    def ytdl(self):
        """The most recently built "safe" (``ignoreerrors=True``) downloader, or None."""
        return self.last_safe_ytdl

    async def initialize(self):
        """Initialize the droplet manager and build the first safe downloader.

        If initialization raises (or is cancelled), ``initializing`` is reset
        so a later caller can retry instead of polling forever.
        """
        self.initializing = True
        try:
            await self.droplet_manager.initialize_manager()
            self.last_safe_ytdl, _ = await self.build_ytdl_downloader("safe")
            self.initialized = True
        finally:
            if not self.initialized:
                self.initializing = False

    def _apply_download_folder(self, ytdl_downloader):
        """Prefix the downloader's ``outtmpl`` with ``download_folder`` if one is set."""
        if self.download_folder:
            ytdl_downloader.params["outtmpl"] = os.path.join(
                self.download_folder, ytdl_downloader.params["outtmpl"]
            )

    async def build_ytdl_downloader(self, downloader_type):
        """Build a downloader whose options are bound to a random droplet.

        Args:
            downloader_type: ``"safe"`` sets ``ignoreerrors=True`` and records
                the downloader as ``last_safe_ytdl``. Any other value leaves
                youtube-dl errors enabled.

        Returns:
            ``(downloader, droplet_id)``.
        """
        current_droplet_id = await self.droplet_manager.get_random_droplet()
        # Hand affix_ip a copy so droplet-specific settings cannot leak into
        # the shared module-level options.
        # NOTE(review): whether affix_ip mutates its argument is not visible
        # here; the copy is defensive.
        params = await self.droplet_manager.affix_ip(
            dict(ytdl_format_options), current_droplet_id
        )
        ytdl_downloader = youtube_dl.YoutubeDL(params)
        self._apply_download_folder(ytdl_downloader)
        if downloader_type == "safe":
            ytdl_downloader.params["ignoreerrors"] = True
            self.last_safe_ytdl = ytdl_downloader
        return ytdl_downloader, current_droplet_id

    async def build_clean_ytdl_downloader(self):
        """Build a downloader from the plain module options (no droplet IP affixed)."""
        ytdl_downloader = youtube_dl.YoutubeDL(dict(ytdl_format_options))
        self._apply_download_folder(ytdl_downloader)
        return ytdl_downloader

    async def _wait_until_initialized(self):
        """Initialize on first use; while another caller initializes, poll every 5 s."""
        while not self.initialized:
            if self.initializing:
                await asyncio.sleep(5)
            else:
                await self.initialize()

    async def _build_extract_downloader(self, build_clean):
        """Return ``(downloader, droplet_id)`` for one ``extract_info`` attempt.

        ``droplet_id`` is None for a clean downloader, which has no droplet
        that could be rotated away from.
        """
        if build_clean:
            return await self.build_clean_ytdl_downloader(), None
        ytdl_downloader, droplet_id = await self.build_ytdl_downloader("unsafe")
        droplet_info = await self.droplet_manager.get_droplet_info(droplet_id)
        log.info(
            'downloader built on region %s using IP "%s"',
            droplet_info.get("region"),
            droplet_info.get("ip_address"),
        )
        return ytdl_downloader, droplet_id

    @staticmethod
    def _run_extract(ytdl_downloader, *args, **kwargs):
        """Call ``ytdl_downloader.extract_info``; runs on a pool thread.

        Before re-raising, an exception is tagged with ``is_error = True``. If
        it has no ``msg`` attribute, ``msg = str(exc)`` is added, so
        ``on_error`` callbacks can rely on ``exc.msg``.
        """
        try:
            result = ytdl_downloader.extract_info(*args, **kwargs)
        except Exception as e:
            log.debug("extract_info raised: %s", e)
            e.is_error = True
            if not hasattr(e, "msg"):
                e.msg = str(e)
            raise
        log.debug("extract_info returned: %.25s", result)
        return result

    async def _run_in_pool(self, loop, ytdl_downloader, *args, **kwargs):
        """Run ``_run_extract`` in ``self.thread_pool`` and await its result."""
        return await loop.run_in_executor(
            self.thread_pool,
            functools.partial(self._run_extract, ytdl_downloader, *args, **kwargs),
        )

    @classmethod
    def _is_blocking_error(cls, exc):
        """Return True if *exc*'s message contains one of ``_BLOCKING_ERROR_MARKERS``."""
        msg = getattr(exc, "msg", None)
        if not isinstance(msg, str):
            msg = str(exc)
        return any(marker in msg for marker in cls._BLOCKING_ERROR_MARKERS)

    @staticmethod
    def _notify_error(loop, on_error, error):
        """Schedule ``on_error(error)`` on *loop* without awaiting it."""
        if asyncio.iscoroutinefunction(on_error):
            asyncio.ensure_future(on_error(error), loop=loop)
        else:
            loop.call_soon_threadsafe(on_error, error)

    async def extract_info(
        self,
        loop,
        *args,
        on_error=None,
        retry_on_error=False,
        build_clean=False,
        **kwargs
    ):
        """Run ``YoutubeDL.extract_info`` in the thread pool and return its result.

        IP-block errors (see ``_BLOCKING_ERROR_MARKERS``) on a droplet-backed
        downloader delete that droplet and retry on a freshly built downloader.
        Clean downloaders (``build_clean=True``) have no droplet to rotate, so
        their block errors are handled like any other error.

        Any other extraction error is handled as follows:

        * If ``on_error`` is callable, the error is passed to it, scheduled on
          *loop* and not awaited. The method then returns
          ``safe_extract_info(...)`` if ``retry_on_error`` is set, else None.
        * Otherwise the error propagates to the caller.

        Args:
            loop: event loop used for the executor and for ``on_error``.
            *args, **kwargs: forwarded to ``YoutubeDL.extract_info``.
            on_error: a callable or coroutine function receiving the exception.
            retry_on_error: after notifying ``on_error``, retry with a safe
                (``ignoreerrors=True``) downloader.
            build_clean: use the plain module options instead of a droplet IP.
        """
        await self._wait_until_initialized()
        # NOTE(review): block retries are unbounded, as before; consider a cap.
        while True:
            ytdl_downloader, droplet_id = await self._build_extract_downloader(
                build_clean
            )
            try:
                return await self._run_in_pool(loop, ytdl_downloader, *args, **kwargs)
            except Exception as e:
                if droplet_id is not None and self._is_blocking_error(e):
                    log.warning(
                        "Bullshit blockage error in downloader, attempting to fetch new ip..."
                    )
                    await self.droplet_manager.delete_droplet(droplet_id)
                    continue
                if not callable(on_error):
                    raise
                # (youtube_dl.utils.ExtractorError, youtube_dl.utils.DownloadError)
                # I hope I don't have to deal with ContentTooShortError's
                self._notify_error(loop, on_error, e)
                if retry_on_error:
                    return await self.safe_extract_info(loop, *args, **kwargs)
                return None

    async def safe_extract_info(self, loop, *args, **kwargs):
        """Run ``extract_info`` with a safe (``ignoreerrors=True``) droplet downloader.

        IP-block errors delete the droplet and retry on a new downloader. Any
        other error propagates.
        """
        while True:
            ytdl_downloader, droplet_id = await self.build_ytdl_downloader("safe")
            try:
                return await self._run_in_pool(loop, ytdl_downloader, *args, **kwargs)
            except Exception as e:
                if not self._is_blocking_error(e):
                    raise
                log.warning(
                    "Bullshit blockage error in downloader, attempting to fetch new ip..."
                )
                await self.droplet_manager.delete_droplet(droplet_id)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment