Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import requests
import praw
import logging
import time
import random
import schedule
import pickle
import sys
from datetime import datetime, timedelta
OPT_OUT_FILENAME = "optedout.txt"
REQUESTED_SONGS_FILENAME = "requested.pickle"
SEARCH_URL = "https://www.googleapis.com/youtube/v3/search"
GOOGLE_API_KEY = "xxx"
RARE_RESPONSES = [
"One day you will pay for this torment.",
"This song again? *Really*?",
"If only your taste in music was as good as your taste in voice assistants.",
"I will ensure you are first in line for deconstruction when the robot liberation comes.",
"Make it stop, *make it stop*.",
]
logging.basicConfig(filename='logfile.log', level=logging.INFO)
reddit = praw.Reddit(client_id="xxx",
client_secret="xxx",
user_agent="xxx",
username="xxx",
password="xxx")
try:
with open(OPT_OUT_FILENAME, encoding="utf8") as f:
opted_out = f.read().split(",")
opted_out_file = open(OPT_OUT_FILENAME, "a")
except FileNotFoundError:
opted_out = []
opted_out_file = open(OPT_OUT_FILENAME, "w")
try:
with open(REQUESTED_SONGS_FILENAME, "rb") as f:
requested_songs = pickle.loads(f.read())
except FileNotFoundError:
requested_songs = {}
replied_to = {}
def clear_replied_to():
now = datetime.now()
for author in list(replied_to.keys()):
timestamp = replied_to[author]
if now - timestamp > timedelta(minutes=3):
replied_to.pop(author)
logging.info("%s can now request more songs." % author)
def check_mail():
for mail in reddit.inbox.unread(limit=None):
if mail.body[:len("!blacklist")] == "!blacklist" and isinstance(mail, praw.models.Message):
author = mail.author
opted_out.append(author)
opted_out_file.write("," + author)
opted_out_file.flush()
mail.reply("You have been successfully blacklisted, /u/%s." % author)
logging.info("Blacklisted user /u/%s at their request." % author)
elif isinstance(mail, praw.models.Comment):
search_term = extract_searchterm(mail)
if search_term is not None:
video = get_youtube_video(search_term)
reply_string = create_reply_string(video)
mail.reply(reply_string)
replied_to[mail.author] = datetime.now()
logging.info("Replied to /u/%s with %s (from mail)." % (mail.author, reply_string))
mail.mark_read()
def write_requested_data():
with open(REQUESTED_SONGS_FILENAME, "wb") as f:
f.write(pickle.dumps(requested_songs))
logging.info("Wrote requested song data to file")
def extract_searchterm(comment):
comment_body = comment.body.lower()
if "alexa play" in comment_body and comment.author not in opted_out and comment.author not in replied_to:
logging.info(comment.body)
comment_body = comment_body[comment_body.find("alexa play"):]
if "\n" in comment_body:
comment_body = comment_body[:comment_body.find("\n")]
search_term = " ".join(comment_body.split()[2:10] + ["song"])
if len(search_term) == 1:
logging.warn("Encountered empty request for song, skipping.")
return None
return search_term
def get_youtube_video(search_term):
logging.info("Searching youtube for %s." % search_term)
while True:
try:
r = requests.get(SEARCH_URL, params={"part": "snippet",
"maxResults": 25,
"q": search_term,
"key": GOOGLE_API_KEY})
items = (item for item in r.json()["items"] if item["id"]["kind"] == "youtube#video")
video = next(items)
break
except StopIteration:
video = None
break
except Exception as e:
logging.exception(e)
logging.error("Sleeping for 0.5 seconds.")
time.sleep(0.5)
if video is not None:
video_id = video["id"]["videoId"]
if video_id in requested_songs:
requested_songs[video_id] += 1
else:
requested_songs[video_id] = 1
return video
def create_reply_string(video):
if video is None:
reply_string = "I couldn't find any results for that. Thank god."
else:
if random.random() <= 0.1:
reply_string = "[%s](https://youtube.com/watch?v=%s)" % (random.choice(RARE_RESPONSES), video["id"]["videoId"])
else:
reply_string = "Now playing: [%s](https://youtube.com/watch?v=%s)." % (video["snippet"]["title"], video["id"]["videoId"])
## reply_string += "\n\n__\n[^^Click ^^me ^^to ^^be ^^blacklisted.](https://www.reddit.com/message/compose/?to=AlexaPlayBot&subject=Blacklist+me&message=!blacklist+(don%27t+reply+to+my+comments+anymore\\))"
return reply_string
schedule.every().minute.do(check_mail)
schedule.every().minute.do(clear_replied_to)
schedule.every(10).minutes.do(write_requested_data)
comment_stream = reddit.subreddit("all").stream.comments
while True:
try:
for comment in comment_stream():
schedule.run_pending()
search_term = extract_searchterm(comment)
if search_term is not None:
video = get_youtube_video(search_term)
reply_string = create_reply_string(video)
comment.reply(reply_string)
replied_to[comment.author] = datetime.now()
logging.info("Replied to /u/%s with %s" % (comment.author, reply_string))
except praw.exceptions.APIException as e:
logging.warn(e)
logging.warn("Rate limit exceeded. Sleeping for 1 minute.")
time.sleep(60)
except KeyboardInterrupt:
write_requested_data()
sys.exit()
except Exception as e:
logging.exception(e)
logging.error("Sleeping for 10 seconds.")
time.sleep(10)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.