Skip to content

Instantly share code, notes, and snippets.

@anarcie
Created October 30, 2017 21:26
Show Gist options
  • Save anarcie/2677f2a2244dbf4938b176c3d4aa1aa3 to your computer and use it in GitHub Desktop.
Save anarcie/2677f2a2244dbf4938b176c3d4aa1aa3 to your computer and use it in GitHub Desktop.
4Chan Radio
from __future__ import unicode_literals
import requests,youtube_dl,os,shutil, isodate, random, re, string
import json
import time
import re, sys
import subprocess
import datetime
#Globals
# Time of the most recent 4chan request; sendGet() compares against it to throttle.
global_Last_Request = time.time()
# Minimum spacing between two requests, in seconds.
global_Request_Min_Delay = 1.2
# Video IDs that getCatagory() has already looked up.
global_ignore_list = []
# isMusic() rejects videos longer than this many minutes.
global_max_song_length_min = 5
# Options handed to youtube_dl.YoutubeDL by downloadAudio().
global_ydl_opts = {
    'format': 'bestaudio/best',
    'quiet': True,
    'outtmpl': './songs/%(title)s.%(ext)s',
    'postprocessors': [
        {
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'wav',
            'preferredquality': '192',
        },
    ],
}
def clearSongs(folder='./songs'):
    """Delete every regular file directly inside *folder*.

    Sub-directories are left untouched. A missing folder is treated as
    already empty, so a run that starts without it does not fail.

    Args:
        folder: Directory to empty; defaults to the songs directory.
    """
    if not os.path.isdir(folder):
        return
    for entry in os.listdir(folder):
        file_path = os.path.join(folder, entry)
        if not os.path.isfile(file_path):
            continue
        try:
            os.unlink(file_path)
        except OSError as e:
            # Best effort: report the failure and keep deleting the rest.
            print(e)
def clearSongsWavs(folder='./songs'):
    """Delete the ``.wav`` files directly inside *folder*.

    Only names that END in ``.wav`` are removed; a name that merely contains
    that text (for example ``track.wav.mp3``) is kept. A missing folder is
    treated as already empty.

    Args:
        folder: Directory to clean; defaults to the songs directory.
    """
    if not os.path.isdir(folder):
        return
    for entry in os.listdir(folder):
        if not entry.endswith('.wav'):
            continue
        file_path = os.path.join(folder, entry)
        if not os.path.isfile(file_path):
            continue
        try:
            os.unlink(file_path)
        except OSError as e:
            # Best effort: report the failure and keep deleting the rest.
            print(e)
#Sends out a get request to 4chan
#Requests are rate limited to 1 request a second
#as per api docs
def sendGet(arg1):
    """Fetch ``http://a.4cdn.org/<arg1>`` and return the response body.

    Consecutive requests are spaced at least ``global_Request_Min_Delay``
    seconds apart, measured from the end of the previous attempt (successful
    or not).

    Args:
        arg1: Path below ``http://a.4cdn.org/``.

    Returns:
        The response text, or False when the request fails or the server
        answers with an HTTP error status.
    """
    global global_Last_Request
    # Sleep for exactly the time still owed to the rate limit; loop in case
    # the sleep returns early.
    while True:
        remaining = global_Request_Min_Delay - (time.time() - global_Last_Request)
        if remaining <= 0:
            break
        time.sleep(remaining)
    print('Requesting {}'.format(arg1))
    try:
        r = requests.get('http://a.4cdn.org/{}'.format(arg1), timeout=2.00)
        # An error status (e.g. 404) carries a body that is not the expected
        # JSON; report it as a failure instead of handing it to the parsers.
        r.raise_for_status()
        return r.text
    except requests.RequestException as e:
        print('Request for {} failed: {}'.format(arg1, e))
        return False
    finally:
        # Failed attempts count towards the rate limit as well.
        global_Last_Request = time.time()
def isMusic(cat, tags, duration, max_length_min=None):
    """Decide whether a video should be treated as a song.

    A video qualifies when it is not longer than the length limit and either
    its category is '10' or one of its tags is 'music' or 'song'
    (case-insensitive). The decision is also printed.

    Args:
        cat: Category ID; compared as a string.
        tags: Iterable of tag strings; None is treated as "no tags".
        duration: Length of the video in seconds.
        max_length_min: Length limit in minutes. Defaults to
            ``global_max_song_length_min`` when omitted.

    Returns:
        True when the video qualifies, otherwise False.
    """
    if max_length_min is None:
        max_length_min = global_max_song_length_min
    if duration > max_length_min * 60:
        print ('Song Too Long, Skipping: {} Seconds'.format(duration))
        return False
    if str(cat) == '10':
        print ('Music: Catagory')
        return True
    if any(tag.lower() in ('music', 'song') for tag in tags or ()):
        print ('Music: Tags')
        return True
    print ('Music: Nope')
    return False
def getCatagory(videoID):
    """Look up a YouTube video and report whether it counts as music.

    The video's category, tags and duration are fetched from the YouTube
    Data API and passed to isMusic(). IDs that were looked up successfully
    are remembered in ``global_ignore_list`` and skipped on later calls.

    Args:
        videoID: YouTube video ID.

    Returns:
        The result of isMusic() for the video; False when the ID was already
        scanned or the lookup failed.
    """
    if videoID in global_ignore_list:
        print ('Skipping previously scanned video ID {}'.format(videoID))
        return False
    try:
        print ('Lookup up video ID {}'.format(videoID))
        apiKey = 'INSERT YOUR OWN DAMN KEY'
        apiURL = 'https://www.googleapis.com/youtube/v3/videos?id={}&part=snippet,statistics,contentDetails&key={}'
        youtubeURL = apiURL.format(videoID, apiKey)
        # A timeout keeps one stalled lookup from hanging the whole run.
        r = requests.get(youtubeURL, timeout=10)
        info = json.loads(r.text)
        item = info['items'][0]
        snippet = item['snippet']
        catagory = snippet['categoryId']
        # 'tags' may be absent from the snippet; treat that as "no tags"
        # rather than as a failed lookup.
        tags = snippet.get('tags', [])
        duration = isodate.parse_duration(item['contentDetails']['duration'])
        duration = duration.total_seconds()
        global_ignore_list.append(videoID)
        return isMusic(catagory, tags, duration)
    except Exception as e:
        # Any failure (network, unexpected payload, unknown ID) just means
        # "do not download this one".
        print ('Error processing video ID {}'.format(videoID))
        print (e)
        return False
def downloadAudio(videoURL):
    """Download one video with youtube_dl using ``global_ydl_opts``.

    Any error is printed and otherwise ignored.
    """
    try:
        downloader = youtube_dl.YoutubeDL(global_ydl_opts)
        with downloader as ydl:
            ydl.download([videoURL])
    except Exception as err:
        print (err)
#Get list of threads on a given board
def getBoard(shortName):
    """Request ``<shortName>/threads.json`` and hand the response to parseBoard()."""
    listing = sendGet('{}/threads.json'.format(shortName))
    parseBoard(listing, shortName)
#Get the thread based on board and Thread ID
def getThread(board, threadNumber):
    """Request ``<board>/thread/<threadNumber>.json`` and hand the response to parseThread()."""
    path = '{}/thread/{}.json'.format(board, threadNumber)
    response = sendGet(path)
    parseThread(response)
#Parse board, getting all threads
def parseBoard(resp, shortName):
    """Walk a board's thread list and process every thread in it.

    Args:
        resp: JSON text of the thread list (a list of pages, each with a
            'threads' list), or a falsy value when the request failed.
        shortName: Board name, passed on to getThread().
    """
    if not resp:
        return
    try:
        pages = json.loads(resp)
    except ValueError as e:
        # The body was not JSON (e.g. an error page); skip the board.
        print ('Could not parse thread list for {}: {}'.format(shortName, e))
        return
    # Count the threads on all pages up front so the progress line shows the
    # real total.
    total_threads = sum(len(page['threads']) for page in pages)
    curr_thread = 0
    for page in pages:
        for thread in page['threads']:
            curr_thread += 1
            print ('Processing Thread: {} of {}'.format(str(curr_thread), str(total_threads)))
            getThread(shortName, thread['no'])
#Parse a thread for Youtube links
def parseThread(resp):
    """Pass the comment text of every post in a thread to parseComment().

    Args:
        resp: JSON text of a thread (an object with a 'posts' list), or a
            falsy value when the request failed.
    """
    if not resp:
        return
    try:
        thread = json.loads(resp)
    except ValueError as e:
        # The body was not JSON (e.g. an error page); skip the thread
        # instead of aborting the run.
        print ('Could not parse thread: {}'.format(e))
        return
    for post in thread.get('posts', []):
        # Posts without text have no 'com' field.
        if 'com' in post:
            parseComment(post['com'])
# Matches youtube.com/watch?v=<id> and youtu.be/<id> links. Group 1 is the
# video ID; one trailing "&..." / "&amp;..." parameter is included in the match.
_YOUTUBE_LINK_RE = re.compile(
    r'https?://(?:www\.)?youtu(?:be\.com/watch\?v=|\.be/)([\w\-]+)'
    r'(?:&(?:amp;)?[\w?=]*)?',
    re.IGNORECASE,
)


def findYoutubeLinks(com):
    """Return a list of (video ID, matched URL) pairs found in *com*.

    ``<wbr>`` markers are removed first so that a link broken up by them is
    matched as a whole.
    """
    text = com.replace('<wbr>', '')
    return [(m.group(1), m.group()) for m in _YOUTUBE_LINK_RE.finditer(text)]


def parseComment(com):
    """Download the audio of every YouTube link in *com* that counts as music.

    Both ``youtube.com/watch?v=`` and ``youtu.be/`` links are considered.

    Args:
        com: Comment text of one post.
    """
    for VideoID, VideoURL in findYoutubeLinks(com):
        if getCatagory(VideoID):
            print (VideoURL)
            # The bare video ID, not the matched URL, is handed to the downloader.
            downloadAudio(VideoID)
def normalizeName(name):
    """Return *name* without the characters whose code point is 128 or higher."""
    ascii_chars = [ch for ch in name if ord(ch) < 128]
    return "".join(ascii_chars)
def NormalizeAll():
    """Rename every .mp3 under ./songs so that its file name is ASCII-only.

    Only the file name is passed through normalizeName(); the directory part
    is kept as it is, so the renamed file always stays in its own directory.
    """
    for dirpath, _, filenames in os.walk('./songs'):
        for filename in filenames:
            if not filename.endswith('.mp3'):
                continue
            print ("Normalizing File: {}".format(filename))
            asciiName = normalizeName(filename)
            if asciiName == filename:
                # Nothing to strip, so no rename is needed.
                continue
            # NOTE(review): two names that differ only in non-ASCII
            # characters map to the same target — confirm whether that
            # collision needs handling.
            os.rename(os.path.join(dirpath, filename),
                      os.path.join(dirpath, asciiName))
def convertAudio():
    """Convert every .wav under ./songs to MP3 with ffmpeg, then delete the .wav.

    The MP3 is written next to its source, with the ``.wav`` suffix replaced
    by ``.mp3`` and apostrophes removed from the name.

    Raises:
        RuntimeError: if ffmpeg cannot be started or exits with a non-zero
            status.
    """
    for dirpath, _, filenames in os.walk('./songs'):
        for filename in filenames:
            if not filename.endswith('.wav'):
                continue
            print ("Converting {} to MP3 format...".format(filename))
            source = os.path.join(dirpath, filename)
            # Strip only the trailing ".wav"; the rest of the name is kept.
            stem = filename[:-len('.wav')].replace("'", "")
            target = os.path.join(dirpath, stem + '.mp3')
            # File names come from video titles, so the command is passed as
            # an argument list: nothing in a title is interpreted by a shell.
            command = ['ffmpeg', '-i', source, '-vn', '-ar', '44100',
                       '-ac', '2', '-ab', '192k', '-f', 'mp3', target]
            try:
                subprocess.check_output(command, stderr=subprocess.STDOUT)
            except subprocess.CalledProcessError as e:
                raise RuntimeError("command '{}' return with error (code {}): {}".format(e.cmd, e.returncode, e.output))
            except OSError as e:
                raise RuntimeError("could not run ffmpeg: {}".format(e))
            # Remove the source under its real name on disk.
            os.unlink(source)
def buildRadioList():
    """Write radioList.txt: one ``file '<absolute path>'`` line per .mp3 under ./songs.

    The files of each directory are listed in random order. For every .mp3
    the expected on-disk name is derived by dropping apostrophes and
    non-ASCII characters from the file name; the entry is written only if
    that file exists. The list file is closed before returning.
    """
    with open('radioList.txt', 'w') as RadioList:
        for dirpath, _, filenames in os.walk('./songs'):
            random.shuffle(filenames)
            for filename in filenames:
                if not filename.endswith('.mp3'):
                    continue
                # Only the file name is cleaned; the directory part is used
                # unchanged.
                cleanName = normalizeName(filename.replace("'", ""))
                fullPath = os.path.abspath(os.path.join(dirpath, cleanName))
                if os.path.exists(fullPath):
                    RadioList.write('file \'' + fullPath + '\'\n')
def buildBroadcast(board):
    """Concatenate the files listed in radioList.txt into one broadcast MP3.

    The result is written to ``./Broadcasts/4Chan Radio <board>.mp3``; the
    output directory is created when it does not exist yet.

    Args:
        board: Board name used in the output file name.

    Raises:
        RuntimeError: if ffmpeg cannot be started or exits with a non-zero
            status.
    """
    output = './Broadcasts/4Chan Radio {}.mp3'.format(board)
    # Shell-style rendering, used only for logging and error messages.
    command = 'ffmpeg -y -f concat -safe 0 -i radioList.txt -c copy "{}"'.format(output)
    print(command)
    outDir = os.path.dirname(output)
    if not os.path.isdir(outDir):
        os.makedirs(outDir)
    # Run ffmpeg from an argument list so the board name is never parsed by a shell.
    args = ['ffmpeg', '-y', '-f', 'concat', '-safe', '0',
            '-i', 'radioList.txt', '-c', 'copy', output]
    try:
        subprocess.check_output(args, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        raise RuntimeError("command '{}' return with error (code {}): {}".format(command, e.returncode, e.output))
    except OSError as e:
        raise RuntimeError("could not run ffmpeg: {}".format(e))
def processBoard(board):
    """Run the full pipeline for one board.

    Steps, in order: empty the songs folder, scan the board, convert the
    audio, normalize the file names, write the radio list, build the
    broadcast.
    """
    clearSongs()
    getBoard(board)
    # These three steps take no arguments and run strictly in this order.
    for step in (convertAudio, NormalizeAll, buildRadioList):
        step()
    buildBroadcast(board)
#MAIN
# Boards currently switched off: r9k, pol, bant, b
for board in ('mu', 'fit', 'a', 'c'):
    processBoard(board)
print ("Complete")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment