Async fetch
#!/usr/bin/env python3
import os
import sys

import aiohttp
import asyncio
import yaml
from bs4 import BeautifulSoup
from concurrent import futures
from urllib.parse import unquote
from urllib import request

URL_PREFIX, USER, PASS = None, None, None

class Video(object):
    """
    Video object container
    """
    def __init__(self, path=None, name=None, article=None, category=None):
        self.path = path
        self.name = name
        self.article = article
        self.category = category

    @staticmethod
    def download_videos(videos):
        """
        Download videos in parallel via URL
        """
        with futures.ProcessPoolExecutor(max_workers=8) as executor:
            future_to_url = {executor.submit(video.save_file): video.path
                             for video in videos}
            # Consume the futures results
            for future in futures.as_completed(future_to_url):
                try:
                    future.result()
                except Exception as exc:
                    print(exc)

    def save_file(self):
        """
        Do the HTTP request and save the response to a file
        """
        # Create the default folder
        default_path = '/tmp/videos/{}/{}/'.format(self.category, self.article)
        if not os.path.isdir(default_path):
            os.makedirs(default_path)

        # Do the HTTP request
        with request.urlopen(self.path) as conn:
            data = conn.read()

        # Save the response to a file
        file_path = "{}{}".format(default_path, self.name)
        print("Downloading at: {}".format(file_path))
        with open(file_path, "wb") as file:
            file.write(data)

    def __repr__(self):
        return "<Video: {}>".format(self.path)

class Article(object):
    """
    Article object container
    """
    def __init__(self, identifier=None, path=None, category=None):
        self.ident = identifier
        self.path = path
        self.category = category  # Category name, we have no reverse relation
        self.videos = []

    def __repr__(self):
        return "{} (course): path {}, ident {}".format(
            self.category, self.path, self.ident
        )

    def save_video(self, video_url, name):
        """
        Save a video object with the fetched URL
        """
        if not video_url:
            print("ERROR on: ", name)
            return
        video = Video(path=video_url, name=name, article=self.ident,
                      category=self.category)
        print("Fetching VIDEO: {} {}".format(video_url, name))
        self.videos.append(video)

    async def fill_videos(self, session, html):
        """
        Parse each lesson link on the article page and extract the
        video URL from the lesson's meta tags
        """
        text = BeautifulSoup(html, "html.parser")
        for el in text.find_all('h4'):
            try:
                async with session.get(el.a.attrs['href']) as resp:
                    bs = BeautifulSoup(await resp.read(), "html.parser")
                    meta = bs.find_all('meta')
                    video_url = meta[-2].attrs['content'].split('=')[-1]
                    name = meta[19].attrs['content']
                    # Save the video object on the videos list
                    self.save_video(unquote(video_url), name)
            except (IndexError, AttributeError):
                continue

    async def fetch_videos(self, session):
        """
        Fetch videos metadata
        """
        url = "{}/{}".format(URL_PREFIX, self.path)
        async with session.get(url) as resp:
            await self.fill_videos(session, await resp.read())

class Category(object):
    """ Defines a category, class should be used as a container """
    def __init__(self, path):
        self.path = path
        self.name = self.path.split('/')[-1]
        self.articles = []

    def __repr__(self):
        return "<Category: path {}, name {}>".format(self.path, self.name)

    def fill_articles(self, html):
        """ Create a list of courses """
        text = BeautifulSoup(html, "html.parser")
        for bs_article in text.find_all('a', 'link-overlay'):
            try:
                article_link = bs_article.attrs['href']
                article_name = bs_article.attrs['id']
                # Ignore the rest of the links
                if article_link.startswith('/courses/'):
                    article = Article(
                        identifier=article_name, path=article_link,
                        category=self.name
                    )
                    print(">> ARTICLE: ", article)
                    self.articles.append(article)
            except (AttributeError, KeyError):
                continue

    async def fetch_articles(self, session):
        url = "{}/{}".format(URL_PREFIX, self.path)
        print("> FETCHING COURSE {}".format(url))
        async with session.get(url) as resp:
            self.fill_articles(await resp.read())
        return self.articles

class Crawler(object):
    def __init__(self, url):
        self.client = aiohttp.ClientSession(loop=loop)
        self.url = url

    async def crawl(self):
        """
        Crawler main function
        """
        # Login on the client session
        result_status = await self.login()
        assert result_status == 200

        # Fetch all categories
        categories = await self.fetch_categories()
        tasks = [loop.create_task(category.fetch_articles(self.client))
                 for category in categories]
        await asyncio.gather(*tasks)

        # From the articles fill all the videos URLs
        all_articles, tasks = [], []
        for category in categories:
            for article in category.articles:
                all_articles.append(article)
                tasks.append(
                    loop.create_task(article.fetch_videos(self.client))
                )
        await asyncio.gather(*tasks)

        # Close the HTTP session before handing off to the process pool
        await self.client.close()

        # Launch the videos download process, go parallel here
        Video.download_videos(
            [video for article in all_articles for video in article.videos]
        )

    async def fetch_categories(self):
        """
        Fetch all categories from the website
        """
        print("> FETCHING ALL CATEGORIES")
        url = "{}/courses".format(URL_PREFIX)
        async with self.client.get(url) as resp:
            bs = BeautifulSoup(await resp.text(), "html.parser")
            links = bs.find_all('a', class_="anchor-to-technology")
            return [Category(el.attrs['href']) for el in links]

    async def login(self):
        """
        Login on the website and keep the cookie on the session
        """
        url = "{}/users/sign_in".format(URL_PREFIX)
        print("> FETCHING TOKEN DATA")
        async with self.client.get(url) as resp:
            try:
                xsrf = resp.headers["Set-Cookie"].split(';')[0].split('=')[1]
            except IndexError:
                print("ERROR: No cookie settings here")
                return

            # Fetch the authenticity_token from the login form
            token = BeautifulSoup(await resp.text(), "html.parser")
            token = token.find_all('input', type='hidden')[1]['value']

            # Set data and headers for the sign-in POST
            data, headers = {
                "authenticity_token": token,
                "user[email]": USER,
                "user[password]": PASS,
                "utf8": "x"
            }, {
                "Referer": "https://egghead.io/users/sign_in",
                "X-CSRF-Token": xsrf,
            }

        print("> LOGGING ON SITE")
        url = "{}/users/sign_in".format(URL_PREFIX)
        async with self.client.post(url, data=data, headers=headers) as resp:
            return resp.status

def main(loop):
    global URL_PREFIX, USER, PASS
    URL_PREFIX, USER, PASS = data.get('website'), \
        data.get('username'), data.get('password')
    crawler = Crawler(URL_PREFIX)
    loop.run_until_complete(crawler.crawl())


if __name__ == '__main__':
    # Read the configuration from YAML
    try:
        with open('.config.yml') as config_file:
            data = yaml.safe_load(config_file)
    except FileNotFoundError:
        print("Configuration file not found.")
        sys.exit(1)

    loop = asyncio.get_event_loop()
    main(loop)
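
For reference, main() reads its settings from a .config.yml file in the working directory, with website, username and password keys. A minimal sketch of that file follows; the values are placeholders, and the website value is only assumed from the Referer header used in login():

website: https://egghead.io
username: you@example.com
password: your-password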