| import requests | |
| import praw | |
| import logging | |
| import time | |
| import random | |
| import schedule | |
| import pickle | |
| import sys | |
| from datetime import datetime, timedelta | |
# Text file holding the comma-separated names of users who opted out.
OPT_OUT_FILENAME = "optedout.txt"
# Pickle file holding the per-video request counters.
REQUESTED_SONGS_FILENAME = "requested.pickle"

# YouTube Data API v3 search endpoint and the key sent with every query.
SEARCH_URL = "https://www.googleapis.com/youtube/v3/search"
GOOGLE_API_KEY = "xxx"

# Alternative link texts that create_reply_string() picks from at random
# instead of the video title.
RARE_RESPONSES = [
    "One day you will pay for this torment.",
    "This song again? *Really*?",
    "If only your taste in music was as good as your taste in voice assistants.",
    "I will ensure you are first in line for deconstruction when the robot liberation comes.",
    "Make it stop, *make it stop*.",
]

# Everything the bot logs goes to logfile.log at INFO level and above.
logging.basicConfig(filename='logfile.log', level=logging.INFO)

# Authenticated Reddit client shared by every function in this script.
reddit = praw.Reddit(
    client_id="xxx",
    client_secret="xxx",
    user_agent="xxx",
    username="xxx",
    password="xxx",
)
# Opt-out list: names are stored comma-separated in OPT_OUT_FILENAME.
# `opted_out` is the in-memory copy; `opted_out_file` stays open for the
# lifetime of the process so check_mail() can append new names.
# NOTE(review): `opted_out` is searched with PRAW author objects; keep it a
# list unless hashing of those objects is confirmed to match plain strings.
try:
    with open(OPT_OUT_FILENAME, encoding="utf8") as f:
        opted_out = f.read().split(",")
except FileNotFoundError:
    # First run: start with nobody opted out and create the file.
    opted_out = []
    opted_out_file = open(OPT_OUT_FILENAME, "w")
else:
    opted_out_file = open(OPT_OUT_FILENAME, "a")

# Request counters keyed by YouTube video id, restored from the last run
# when the pickle file exists.
try:
    with open(REQUESTED_SONGS_FILENAME, "rb") as f:
        requested_songs = pickle.loads(f.read())
except FileNotFoundError:
    requested_songs = {}

# Maps an author to the time the bot last replied to them; entries are
# removed again by clear_replied_to().
replied_to = {}
def clear_replied_to():
    """Forget authors whose last reply is more than three minutes old.

    Removing an author from ``replied_to`` lets extract_searchterm() accept
    requests from them again. Each removal is logged.
    """
    cooldown = timedelta(minutes=3)
    current_time = datetime.now()
    # Collect first so the dict is never mutated while it is being iterated.
    expired = [
        user
        for user, last_reply in replied_to.items()
        if current_time - last_reply > cooldown
    ]
    for user in expired:
        del replied_to[user]
        logging.info("%s can now request more songs." % user)
def check_mail():
    """Handle every unread inbox item and mark it as read.

    Two kinds of items are acted on:

    * A private message whose body starts with ``!blacklist`` adds the
      sender to ``opted_out`` (in memory and in the opt-out file) and is
      answered with a confirmation.
    * A comment (for example a reply or mention) is treated like a comment
      from the stream: if extract_searchterm() yields a term, the bot
      replies with the matching video and records the author in
      ``replied_to``.

    Anything else is only marked as read.
    """
    for mail in reddit.inbox.unread(limit=None):
        if isinstance(mail, praw.models.Message) and mail.body.startswith("!blacklist"):
            # mail.author is a PRAW object, not a str; concatenating it to
            # "," raises TypeError, so work with the plain user name.
            # NOTE(review): membership tests against `opted_out` rely on
            # PRAW authors comparing equal to their name string - confirm.
            author = str(mail.author)
            opted_out.append(author)
            opted_out_file.write("," + author)
            # Flush immediately so the opt-out survives an abrupt shutdown.
            opted_out_file.flush()
            mail.reply("You have been successfully blacklisted, /u/%s." % author)
            logging.info("Blacklisted user /u/%s at their request." % author)
        elif isinstance(mail, praw.models.Comment):
            search_term = extract_searchterm(mail)
            if search_term is not None:
                video = get_youtube_video(search_term)
                reply_string = create_reply_string(video)
                mail.reply(reply_string)
                replied_to[mail.author] = datetime.now()
                logging.info("Replied to /u/%s with %s (from mail)." % (mail.author, reply_string))
        # Mark every processed item as read so it is not handled again on
        # the next poll.
        mail.mark_read()
def write_requested_data():
    """Pickle ``requested_songs`` into REQUESTED_SONGS_FILENAME and log it.

    The file is opened in binary write mode, so its previous contents are
    replaced.
    """
    with open(REQUESTED_SONGS_FILENAME, "wb") as counts_file:
        serialized = pickle.dumps(requested_songs)
        counts_file.write(serialized)
    logging.info("Wrote requested song data to file")
def extract_searchterm(comment):
    """Return the YouTube search term requested in *comment*, or ``None``.

    A comment counts as a request when its lower-cased body contains
    "alexa play" and its author is neither in ``opted_out`` nor in
    ``replied_to``. The term consists of up to eight words that follow the
    trigger on the same line, with the word "song" appended.

    Args:
        comment: Object exposing ``body`` (str) and ``author``.

    Returns:
        The search term, or ``None`` when the comment is not a request, the
        author must not be answered, or no words follow the trigger.
    """
    trigger = "alexa play"
    comment_body = comment.body.lower()
    if trigger not in comment_body:
        return None
    if comment.author in opted_out or comment.author in replied_to:
        return None

    logging.info(comment.body)
    # Only the rest of the line that contains the trigger is considered.
    request_line = comment_body[comment_body.find(trigger):].split("\n", 1)[0]
    # The first two tokens are the trigger itself ("alexa", "play...").
    words = request_line.split()[2:10]
    if not words:
        # Test the word list, not the joined string: with "song" appended
        # the string is never short enough for a length check to catch an
        # empty request.
        logging.warning("Encountered empty request for song, skipping.")
        return None
    return " ".join(words + ["song"])
def get_youtube_video(search_term, max_attempts=5, timeout=10):
    """Search YouTube for *search_term* and return the first video result.

    The search endpoint is queried for up to 25 results and the first item
    whose kind is ``youtube#video`` is chosen. A successful lookup also
    increments that video's counter in ``requested_songs``.

    Args:
        search_term: Query string sent to the search endpoint.
        max_attempts: How many times the request is tried before giving up.
            Bounded so a persistent failure (bad key, exhausted quota,
            outage) cannot trap the bot in an endless retry loop.
        timeout: Seconds to wait for the HTTP response; without a timeout
            a stalled connection could block forever.

    Returns:
        The search result item (a dict) of the first video, or ``None``
        when the search had no video result or every attempt failed.
    """
    logging.info("Searching youtube for %s." % search_term)
    query = {
        "part": "snippet",
        "maxResults": 25,
        "q": search_term,
        "key": GOOGLE_API_KEY,
    }

    video = None
    for attempt in range(1, max_attempts + 1):
        try:
            response = requests.get(SEARCH_URL, params=query, timeout=timeout)
            # Surface HTTP errors directly instead of as a KeyError on "items".
            response.raise_for_status()
            video = next(
                (
                    item
                    for item in response.json()["items"]
                    if item["id"]["kind"] == "youtube#video"
                ),
                None,
            )
            break
        except Exception as e:
            logging.exception(e)
            if attempt < max_attempts:
                logging.error("Sleeping for 0.5 seconds.")
                time.sleep(0.5)
    else:
        # No attempt succeeded; callers already handle a None result.
        logging.error("Giving up on youtube search after %d attempts.", max_attempts)
        return None

    if video is not None:
        video_id = video["id"]["videoId"]
        requested_songs[video_id] = requested_songs.get(video_id, 0) + 1
    return video
def create_reply_string(video):
    """Build the Markdown reply text for a search result.

    Args:
        video: A search result item with ``["id"]["videoId"]`` and
            ``["snippet"]["title"]``, or ``None`` when nothing was found.

    Returns:
        A fixed apology when *video* is ``None``; otherwise a Markdown link
        to the video. When ``random.random()`` is at most 0.1 the link text
        is a random entry of ``RARE_RESPONSES``; otherwise the reply is
        "Now playing:" followed by the video title as link text.
    """
    if video is None:
        return "I couldn't find any results for that. Thank god."

    if random.random() <= 0.1:
        link_text = random.choice(RARE_RESPONSES)
        template = "[%s](https://youtube.com/watch?v=%s)"
    else:
        link_text = video["snippet"]["title"]
        template = "Now playing: [%s](https://youtube.com/watch?v=%s)."
    reply_string = template % (link_text, video["id"]["videoId"])
    # Disabled blacklist footer, kept for reference:
    ## reply_string += "\n\n__\n[^^Click ^^me ^^to ^^be ^^blacklisted.](https://www.reddit.com/message/compose/?to=AlexaPlayBot&subject=Blacklist+me&message=!blacklist+(don%27t+reply+to+my+comments+anymore\\))"
    return reply_string
# Periodic jobs; they only run when schedule.run_pending() is called from
# the comment loop below.
schedule.every().minute.do(check_mail)
schedule.every().minute.do(clear_replied_to)
schedule.every(10).minutes.do(write_requested_data)

# Factory for the live stream of new comments across all subreddits.
comment_stream = reddit.subreddit("all").stream.comments

# The KeyboardInterrupt handler wraps the whole loop so that Ctrl-C is also
# honoured while one of the error handlers is sleeping; request counters are
# saved before exiting.
try:
    while True:
        try:
            for comment in comment_stream():
                schedule.run_pending()
                search_term = extract_searchterm(comment)
                if search_term is None:
                    continue
                video = get_youtube_video(search_term)
                reply_string = create_reply_string(video)
                comment.reply(reply_string)
                # Start the per-author cooldown.
                replied_to[comment.author] = datetime.now()
                logging.info("Replied to /u/%s with %s", comment.author, reply_string)
        except praw.exceptions.APIException as e:
            logging.warning(e)
            logging.warning("Rate limit exceeded. Sleeping for 1 minute.")
            time.sleep(60)
        except Exception as e:
            # Any other failure: log it, pause briefly, then reopen the stream.
            logging.exception(e)
            logging.error("Sleeping for 10 seconds.")
            time.sleep(10)
except KeyboardInterrupt:
    write_requested_data()
    sys.exit()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment