Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
# Simple Hunchly data forwarding script that downloads any YouTube video a user browses to.
# pip install flask pytube3
import hashlib
import json
import logging
import os
import shutil
import threading
import urllib
import urllib.parse

from flask import Flask, request
from pytube import YouTube
# Address the Flask server binds to (passed to app.run() in the __main__
# guard). 127.0.0.1 is the loopback interface; change it to listen elsewhere.
LISTEN_IP = '127.0.0.1'
# Flask application object; the data-forwarder route is registered on it.
app = Flask(__name__)
# Create each output folder independently. Checking only "videodata" before
# creating both would crash with FileExistsError when "videos" already exists,
# and would never create "videos" when only "videodata" is present.
os.makedirs("videos", exist_ok=True)
os.makedirs("videodata", exist_ok=True)
#
# Helper function to hash the video in blocks.
#
def sum_video(filename):
    """Return the SHA-256 hex digest of the file at *filename*.

    The file is opened in binary mode and read in fixed-size chunks, so the
    whole file is never held in memory at once.

    Args:
        filename: Path of the file to hash.

    Returns:
        The SHA-256 digest as a hexadecimal string.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    h = hashlib.sha256()
    with open(filename, 'rb') as f:
        # iter() with a sentinel keeps calling read() until it returns b''
        # (end of file).
        for chunk in iter(lambda: f.read(128 * 8196), b''):
            h.update(chunk)
    return h.hexdigest()
#
# This function does the heavy lifting for the YouTube retrieval.
#
_log = logging.getLogger(__name__)

# Hosts treated as YouTube; any subdomain of these (www., m., ...) also matches.
_YOUTUBE_HOSTS = ("youtube.com", "youtu.be")


def _is_youtube_url(url):
    """Return True when the host part of *url* is a YouTube domain."""
    if not isinstance(url, str):
        return False
    try:
        host = urllib.parse.urlparse(url).hostname
    except ValueError:
        # urlparse rejects some malformed URLs (e.g. a broken IPv6 literal).
        return False
    if not host:
        return False
    # Compare whole host labels; a plain substring test would also accept
    # look-alike hosts such as "notyoutube.com.example.org".
    return any(host == known or host.endswith("." + known)
               for known in _YOUTUBE_HOSTS)


def _archive_video(url):
    """Download the video at *url* and write its metadata and captions."""
    yt = YouTube(url)
    ys = yt.streams.get_highest_resolution()
    # download it to disk
    ys.download(output_path="videos")
    video_path = os.path.join("videos", ys.default_filename)
    # hash the video
    video_hash = sum_video(video_path)
    row = {
        'URL': url,
        'Author': yt.author,
        'Date': str(yt.publish_date),
        'ChannelID': yt.channel_id,
        # Flatten to a single line; guard against a missing description.
        'Description': (yt.description or "").replace("\r", "").replace("\n", ""),
        'Keywords': yt.keywords,
        'Filesize': ys.filesize,
        'Hash': video_hash,
    }
    # store the JSON on disk for loading into BigQuery / Elasticsearch
    metadata_path = os.path.join("videodata", "{}-metadata.json".format(video_hash))
    with open(metadata_path, "w", encoding="utf-8") as fd:
        json.dump(row, fd)
    # move the file to be named by hash, keeping its extension
    extension = os.path.splitext(ys.default_filename)[1]
    shutil.move(video_path, os.path.join("videos", video_hash + extension))
    # store the closed captioning for searching later; captions can contain
    # non-ASCII text, so the encoding is pinned instead of platform-dependent
    captions_path = os.path.join("videodata", "{}-captions.txt".format(video_hash))
    with open(captions_path, "w", encoding="utf-8") as fd:
        for caption in yt.caption_tracks:
            fd.write(caption.generate_srt_captions())


def process_url(hunchly_page):
    """Archive the YouTube video behind a forwarded Hunchly page, if any.

    When the page URL is on a YouTube host, the highest-resolution stream is
    downloaded into ``videos/`` and renamed to its SHA-256 hash, and a
    ``<hash>-metadata.json`` and ``<hash>-captions.txt`` file are written to
    ``videodata/``. Pages on other hosts are ignored.

    Args:
        hunchly_page: Decoded JSON payload from the data forwarder; the URL is
            read from ``hunchly_page['page']['url']``. May be None.

    Returns:
        "OK" when *hunchly_page* is None, otherwise None.

    This is best-effort work run on a background thread: download and
    processing failures are logged, never raised.
    """
    if hunchly_page is None:
        return "OK"

    try:
        url = hunchly_page['page']['url']
    except (KeyError, TypeError):
        _log.warning("Ignoring payload without a page URL")
        return None

    # check the URL for YouTube
    if not _is_youtube_url(url):
        return None

    try:
        _archive_video(url)
    except Exception:
        # Nobody waits on this thread, so record the failure rather than
        # swallowing it silently.
        _log.exception("Failed to archive %s", url)
    return None
#
# Define the route for the data forwarder to hit.
#
@app.route('/', methods=["POST", "GET"])
def newpage():
    """Accept a page record from the Hunchly data forwarder.

    The JSON body is handed to process_url() on a separate thread so the
    response is sent without waiting for the download.

    Returns:
        The string "OK" in every case.
    """
    # silent=True gives None for a missing or non-JSON body (e.g. a plain
    # GET) instead of aborting the request with an error response.
    hunchly_page = request.get_json(silent=True)
    if hunchly_page is not None:
        # we spin up a separate thread so the Hunchly UI updates more quickly
        t = threading.Thread(target=process_url, args=(hunchly_page,))
        t.start()
    return "OK"
if __name__ == "__main__":
    # NOTE(review): WINGDB_ACTIVE is presumably set by the Wing IDE debugger;
    # Flask's own debug mode is switched off when it is present - confirm.
    if 'WINGDB_ACTIVE' in os.environ:
        app.debug = False
    app.run(host=LISTEN_IP)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment