@jonbarrow
Created November 14, 2019 04:01
Python scraper for wonderfulsubs.com
from pyee import BaseEventEmitter # Event emitter class base
import requests # request module
import asyncio # async utils
import concurrent.futures # futures (like a JS promise)
# constants
URL_BASE = "https://www.wonderfulsubs.com/api/media"
SEARCH_URL = "%s/search?q=" % URL_BASE
SERIES_URL = "%s/series?series=" % URL_BASE
STREAM_URL = "%s/stream?code=" % URL_BASE
HEADERS = {
    "referer": "https://www.wonderfulsubs.com"
}
# scraper class extends the pyee event emitter
class WonderfulSubs(BaseEventEmitter):
    def __init__(self):
        super().__init__() # set up BaseEventEmitter
        self.streams = [] # streams will populate here as they are found and emitted
        self.session = requests.Session() # reuse a requests session to speed up requests
        self.session.trust_env = False # speeds up the session by not checking for environment proxies
        # to speed up requests further, the requests made with this session will have proxies set to None
        # and stream enabled. Disabling proxies stops requests from checking for them, and enabling
        # stream means the response body is not downloaded until it is accessed (e.g. via .json())
    # method to start scraping
    def scrape(self, kitsu_details, episode_number=1):
        # search for the correct title
        titles = kitsu_details.get("attributes", {}).get("titles", {})
        title_english = titles.get("en", None)
        title_japanese_english = titles.get("en_jp", None)
        title = title_english or title_japanese_english # WS prefers English titles over Japanese-English titles
        if not title:
            raise ValueError("Could not find a valid title")
        # request the search results
        response = self.session.get("%s%s" % (SEARCH_URL, title), headers=HEADERS, proxies=None, stream=True)
        body = response.json()
        search_results = body["json"]["series"]
        # generator to loop over the search results and return the first match. Default value is None
        anime = next((anime for anime in search_results if anime["kitsu_id"] == kitsu_details["id"]), None)
        if not anime:
            raise ValueError("Could not find anime")
        # the anime ID is the end of the URL. It can be grabbed by splitting the string and taking the last element of the list
        anime_id = anime["url"].split("/")[-1]
        # get the series data
        response = self.session.get("%s%s" % (SERIES_URL, anime_id), headers=HEADERS, proxies=None, stream=True)
        body = response.json()
        seasons = body["json"]["seasons"]["ws"]["media"]
        # generator to loop over the seasons and return the first match. Default value is None
        season = next((season for season in seasons if season["type"] == "episodes" and (season["title"] == title or season["japanese_title"] == title)), None)
        if not season:
            raise ValueError("Could not find anime season")
        episodes = season["episodes"]
        # generator to loop over the episodes and return the first match. Default value is None
        episode = next((episode for episode in episodes if episode["episode_number"] == episode_number), None)
        if not episode:
            raise ValueError("Could not find episode")
        sources = episode["sources"]
        urls = [] # will be populated with complete URLs instead of relative paths
        # map the URLs since they come as relative paths only
        for source in sources:
            source_type = type(source["retrieve_url"])
            # some results are lists of URLs, some are single URLs. Account for both
            if source_type is str:
                urls.append("%s%s" % (STREAM_URL, source["retrieve_url"]))
            elif source_type is list:
                for url in source["retrieve_url"]:
                    urls.append("%s%s" % (STREAM_URL, url))
        # get the event loop and start parsing each URL
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.parse_sources(urls))
        # when completely done, alert the client
        self.emit("finished", self.streams)
    # sets up asyncio tasks and a thread pool
    async def parse_sources(self, urls):
        # create a worker pool with 20 max worker threads
        with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
            tasks = [] # request tasks
            # create a new task in the event loop and add it to the task list
            loop = asyncio.get_event_loop()
            for url in urls:
                tasks.append(loop.run_in_executor(executor, self.get_source_streams, url))
            # use asyncio to run the tasks and wait for them all to finish
            # (the results are ignored since get_source_streams doesn't return anything)
            await asyncio.gather(*tasks)
    # gets the actual streams
    def get_source_streams(self, url):
        # request stream data
        response = self.session.get(url, headers=HEADERS)
        body = response.json()
        # if the response was a 404 then there was no data (usually means a dead embed host)
        if body["status"] != 404:
            streams = body["urls"] # list of streams
            # loop over each stream, format the response, and emit it to the client
            for stream in streams:
                meta_dict = {
                    "provider": 'WS',
                    "provider_full": 'Wonderful Subs',
                    "file": stream["src"]
                }
                if body.get("label", None) == 'Auto (HLS)':
                    meta_dict["m3u8"] = True
                if stream["captions"]:
                    meta_dict["subtitles_file"] = stream["captions"]["src"]
                self.emit("stream", meta_dict)
                self.streams.append(meta_dict) # append to the master list
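
# --- Illustration only: not part of the original gist and never called by the scraper. ---
# parse_sources above runs the blocking requests calls in a thread pool and awaits them with
# asyncio.gather. This is the same pattern reduced to a minimal standalone sketch; the
# fetch_status and fetch_all_statuses names are hypothetical.
def fetch_status(url):
    # blocking HTTP call; runs inside one of the executor's worker threads
    return requests.get(url, headers=HEADERS).status_code

async def fetch_all_statuses(urls):
    loop = asyncio.get_event_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        # schedule one worker-thread task per URL
        tasks = [loop.run_in_executor(executor, fetch_status, url) for url in urls]
        # wait for every task to finish and collect the results in order
        return await asyncio.gather(*tasks)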
#############################
#          TESTING          #
#############################
# fake response from Kitsu
kitsu_details = {
    "id": "12",
    "attributes": {
        "titles": {
            "en": "One Piece",
            "en_jp": "One Piece",
        }
    }
}
# create a new instance of the scraper
scraper = WonderfulSubs()
# set up an event handler for the "stream" events emitted by the class
@scraper.on('stream')
def handler(stream):
    print(stream)
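# Not in the original gist: scrape() also emits a "finished" event carrying the full list of
# collected streams (self.emit("finished", self.streams) above), so a handler for it can be
# registered the same way before scraping starts. A minimal example:
@scraper.on('finished')
def finished_handler(streams):
    print("finished with %d stream(s)" % len(streams))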
# start scraping
scraper.scrape(kitsu_details)