@csm10495
Created December 11, 2021 22:28
Quick/Dirty YouTube video id crawler
'''
Small script that tries to recursively find youtube video ids starting from given urls.
MIT License - Charles Machalow
'''
import time
import re
from datetime import timedelta, datetime
from typing import Set
from requests_html import HTMLSession
import requests
import random
import pathlib
import pickle
import getpass
import gzip
from concurrent.futures import ThreadPoolExecutor

WATCH_IDS_CACHE_FILE = pathlib.Path('watch_ids')

class Crawler:
    def __init__(self):
        self.url = random.choice([
            'https://www.youtube.com/',
        ])
        # HTMLSession is nice since it tries to render javascript too
        self.session = HTMLSession()
        # despite the name, the url can also just be the video id
        self.already_done_urls = set()

    def crawl(self, td: timedelta = timedelta(seconds=10)) -> Set[str]:
        '''
        Crawls starting at self.url till the given time delta is up
        '''
        watch_ids = set()
        self.death_time = time.time() + td.total_seconds()
        while time.time() < self.death_time:
            # print(f"Current url: {self.url}")
            new_watch_ids = self.get_watch_ids_from_response(self.get(self.url))
            if not new_watch_ids:
                break
            # this is not the most efficient way to do this:
            while self.url in self.already_done_urls and time.time() < self.death_time:
                self.url = random.choice(list(new_watch_ids))
            watch_ids |= new_watch_ids
            # while self.url in self.already_done_urls:
            #     self.url = next(iter(watch_ids))
        print(f"Number of watch ids found: {len(watch_ids)}")
        return watch_ids

    def get(self, url: str) -> requests.Response:
        self.already_done_urls.add(url)
        # if its just a watch id... turn it into a watch url and follow it!
        if not url.startswith(('https:', 'http:')):
            url = f'https://www.youtube.com/watch?v={url}'
        for i in range(10):
            try:
                return self.session.get(url, allow_redirects=True)
            except requests.exceptions.ConnectionError:
                # back off quadratically before retrying; note that requests
                # raises its own ConnectionError, not the builtin one
                time.sleep(i ** 2)
        raise RuntimeError(f"Unable to GET {url} after 10 attempts")

    def get_watch_ids_from_response(self, response: requests.Response) -> Set[str]:
        if response.ok:
            ret = set([a.split('&')[0] for a in re.findall(r'watch\?v=(.+?)\"', response.text) if '\\' not in a and '...' not in a])
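            # worked example (hedged; the sample anchor tag below is illustrative,
            # not from a real response):
            #   re.findall(r'watch\?v=(.+?)\"', '<a href="/watch?v=dQw4w9WgXcQ&t=42s">')
            #   returns ['dQw4w9WgXcQ&t=42s']; split('&')[0] then trims it to 'dQw4w9WgXcQ'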
# print(f"GET({response.url}) found {len(ret)} watch ids")
return ret
else:
print(f"Warning: Error in response for GET({response.url}): {response.reason}")
return []
def multi_crawl(num=8, crawl_td: timedelta = timedelta(seconds=60), use_cache=True):
    '''
    Crawls using the given number of thread workers
    '''
    results = []
    with ThreadPoolExecutor(max_workers=num) as pool:
        for _ in range(num):
            results.append(pool.submit(Crawler().crawl, td=crawl_td))

    final_out = set()
    for result in results:
        final_out |= result.result()

    print(f"Total Number of watch ids found: {len(final_out)}")

    if use_cache:
        if not WATCH_IDS_CACHE_FILE.is_file():
            WATCH_IDS_CACHE_FILE.write_bytes(pickle.dumps(set()))
        s = pickle.loads(WATCH_IDS_CACHE_FILE.read_bytes())
        s |= final_out
        WATCH_IDS_CACHE_FILE.write_bytes(pickle.dumps(s))
        print(f"Total from cache now: {len(s)}")
    else:
        s = final_out

    return s

def add_to_cache_from_file(f):
    '''
    Takes a file with youtube links on each line and adds them to the cache
    '''
    txt = pathlib.Path(f).read_text()
    ids = set()
    for line in txt.splitlines():
        if 'youtu.be' in line:
            video_id = line.split('/')[-1]
        elif 'youtube' in line:
            video_id = line.split('v=', 1)[-1].split('&')[0]
        else:
            continue

        # youtube video ids are 11 characters long; skip anything that isn't
        if len(video_id) == 11:
            ids.add(video_id)

    s = pickle.loads(WATCH_IDS_CACHE_FILE.read_bytes())
    s |= ids
    WATCH_IDS_CACHE_FILE.write_bytes(pickle.dumps(s))

def cache_to_gz():
    '''
    Takes the cache and makes a txt.gz file
    '''
    s = pickle.loads(WATCH_IDS_CACHE_FILE.read_bytes())
    txt = '\n'.join(s)
    compressed_value = gzip.compress(bytes(txt, 'utf-8'))
    now = datetime.now()
    pathlib.Path(f'{getpass.getuser()}_{now.year}_{now.month}_{now.day}.txt.gz').write_bytes(compressed_value)

if __name__ == '__main__':
    c = Crawler()
    print(c.crawl())
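
Usage sketch (hedged): assuming the gist is saved as crawler.py, something like the following should exercise the crawl, cache, and export helpers. The module name and 'links.txt' are assumptions for illustration, not part of the gist.

from datetime import timedelta
from crawler import multi_crawl, add_to_cache_from_file, cache_to_gz

# crawl with 4 worker threads for ~30 seconds each; use_cache=True merges
# the results into the pickled 'watch_ids' cache file
ids = multi_crawl(num=4, crawl_td=timedelta(seconds=30), use_cache=True)

# optionally seed the cache from a file of youtube links, one per line
# ('links.txt' is a hypothetical file name)
add_to_cache_from_file('links.txt')

# snapshot the cache to <user>_<year>_<month>_<day>.txt.gz
cache_to_gz()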