Created
June 9, 2017 23:26
-
-
Save anonymous/fff90607eaef99c23c3e70d4cdcf0ff3 to your computer and use it in GitHub Desktop.
sneakpeekbot source
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import praw | |
import prawcore.exceptions | |
import json | |
import regex as re | |
import os | |
from time import sleep, time | |
from collections import OrderedDict | |
import signal | |
import traceback | |
# Dictionary where key is the subreddit linking from and the value is the subreddit being linked to
ignore_links_to_from = {}
# List of subreddits that do not want any NSFW links
no_nsfw_comments = []
# Dictionary where key is post ID and value is a list of subreddits processed in that post
submissions = {}
# Unwanted regex patterns, if any of these are present in the comment then that comment is ignored
# All of these are self-explanatory except the last one which matches if the subreddit is in a reddit quote block
patterns = ["((?<!top posts)\sover (?:to|in|at) /?r/)",
            "((?<!top post)\sover (?:to|in|at) /?r/)",
            "also,? check out /r/",
            "you can check /r/",
            "ask (?:this\s)?(?:in\s|at\s)?/?r/",
            "ask the [a-z]+ (?:on|in) /?r/",
            "/?r/\w+ has a [a-z]{3,}?ly",
            # BUGFIX: this entry was missing its trailing comma, so Python's implicit
            # string concatenation merged it with the next entry into one dead pattern
            # ("OP in /?r/go to /?r/\w+ and search") that could never match.
            "OP in /?r/",
            "go to /?r/\w+ and search",
            "asking in /?r/",
            "I asked in /?r/",
            "try asking (?:this\s)?on /?r/",
            "try /r/\w+\?",
            "/?r/\w+'s sidebar",
            "asking (?:this\s)?over at /?r/",
            "your question to /?r/",
            "post this in /?r/",
            "post it in /?r/",
            "posted to /?r/",
            "repost to /?r/",
            "(?:she|he) posted on /?r/",
            "try posting (?:this\s)?in /?r/",
            "have you tried /?r/",
            "mod(?:erator)?s? (?:of|in|on|for) /?r/",
            "/?r/\w+ is (?:a\s)shit",
            "I'm not subbed to /?r/",
            "I am not subbed to /?r/",
            "unsubscribe from /?r/",
            "I hate /?r/",
            "(?:run|go) back to /?r/",
            "(?:deleted|banned) from /?r/",
            "selling in /?r/",
            "~~/r/\w+~~",
            "(?:^\s*>|\s*>)[^\\\n]+/r/\w+[^\\\n\\\n]+"]
def check_scores():
    """Delete any of the bot's recent comments whose score fell below the threshold."""
    # This function should be called periodically as part of the main scan. This was removed from the code.
    scan_limit = 500
    score_threshold = 0
    for own_comment in bot_profile.comments.new(limit=scan_limit):
        if own_comment.score >= score_threshold:
            continue
        # If you need the URL of the parent comment logged:
        # url = own_comment.permalink().replace(own_comment.id, own_comment.parent_id[3:])
        own_comment.delete()
def check_comments():
    """Stream comments from the filtered /r/all and post a sneak peek for the first
    subreddit linked in each comment, unless an exclusion rule applies.

    Relies on module-level state created in __main__: r_all, banned, custom_blacklist,
    top500subs, memes, bot_subreddits, custom_ignore_link, bot_users,
    custom_blacklist_users, posted_comments_id and submissions, plus the global
    patterns list and ignore_links_to_from dictionary.
    """
    # All of the individual checks in if conditions are split up for logging purposes but the specific logging
    # has been removed from this file
    for comment in r_all.stream.comments():
        # Could also search the comment's text instead of the html but this way we let reddit handle subreddit links
        # that aren't really links
        found_subs = re.findall("<a href=\"/r/(\w+)\">/?r/", comment.body_html)
        # No subreddit links in this batch of comments, stream some more
        if not found_subs:
            continue
        # Only handle the first match
        subreddit_name = found_subs[0].lower()
        current_subreddit_name = str(comment.subreddit).lower()
        # If summoned in a comment, handle multiple subreddits and then go to the next loop
        if re.search("\+/?u/sneakpeekbot", comment.body):
            # Summons are honored unless the current subreddit banned the bot or opted out
            if current_subreddit_name not in banned and current_subreddit_name not in custom_blacklist:
                subreddit_scan(subreddit_name, comment, current_subreddit_name, True)
            continue
        # Conditions that would stop the comment from being processed.
        # These are all cheap checks on already-fetched data; any single True skips the comment.
        conditions = [subreddit_name in top500subs,
                      subreddit_name in memes,
                      subreddit_name in bot_subreddits,
                      subreddit_name in custom_ignore_link,
                      comment.author in bot_users,
                      comment.author in custom_blacklist_users,
                      current_subreddit_name == subreddit_name,
                      current_subreddit_name in banned,
                      current_subreddit_name in custom_blacklist,
                      comment.parent() in posted_comments_id,
                      comment.is_root,
                      len(set(found_subs)) > 2]
        if any(conditions):
            continue
        if str(comment.submission) in submissions:
            if subreddit_name in submissions[str(comment.submission)]:
                # Subreddit has already been processed in this post
                continue
            elif len(submissions[str(comment.submission)]) >= 3:
                # Limit of 3 sneak peeks per post
                continue
        # Conditions dependent on an API request are handled separately to avoid waste
        # Used to have more conditions but they were temporarily removed
        # NOTE(review): `comment.subreddit.moderator` is used without calling it here —
        # confirm the installed PRAW version supports a membership test on the
        # relationship object itself; newer PRAW typically needs `subreddit.moderator()`.
        conditions_api = [comment.author in comment.subreddit.moderator]
        if any(conditions_api):
            continue
        # The (...|...).* is a better way of joining multiple regex statements and returning when any are matched
        if re.findall("(" + "|".join(patterns) + ").*", comment.body, flags=re.IGNORECASE):
            # Unwanted pattern was matched, ignore this comment
            continue
        if current_subreddit_name in ignore_links_to_from:
            if subreddit_name in ignore_links_to_from[current_subreddit_name]:
                # This particular link shouldn't be processed in this particular subreddit
                continue
        # Process the comment now
        try:
            subreddit_scan(subreddit_name, comment, current_subreddit_name, False)
        except praw.exceptions.APIException as e_API:
            # Posting too much in this subreddit
            # Handle the error
            pass
        except (prawcore.exceptions.Redirect, prawcore.exceptions.NotFound) as e_404:
            # Linked subreddit does not exist
            # Handle the error
            pass
        except prawcore.exceptions.Forbidden as e_403:
            # Either the bot is banned from the subreddit or the linked subreddit is banned/quarantined
            # Handle the error
            pass
def subreddit_scan(subreddit_name, comment, current_subreddit, summon):
    """Build and post a "sneak peek" reply showing the top posts of the linked subreddit(s).

    Parameters:
        subreddit_name    -- lower-cased name of the first subreddit linked in the comment
        comment           -- the praw Comment object that triggered the scan
        current_subreddit -- lower-cased name of the subreddit the comment was posted in
        summon            -- True when the bot was explicitly summoned (+/u/sneakpeekbot);
                             a summon may peek several subreddits in a single reply

    May raise praw.exceptions.APIException, prawcore.exceptions.Redirect/NotFound and
    prawcore.exceptions.Forbidden; those are handled by the caller (check_comments).
    """
    if summon:
        subreddits_all = re.findall("/?r/(\w+)+", comment.body)
        # Get unique subreddits/Remove duplicates before processing
        subreddits = list(OrderedDict.fromkeys(subreddits_all).keys())
        # If there is only a single subreddit it falls through to the default code below
        # TODO: Remove code reuse
        if len(subreddits) > 1:
            in_nsfw_subreddit = comment.subreddit.over18
            string_pattern = "**{}{}:**\n\n\#1: {} \n\#2: {} \n\#3: {}\n\n----\n\n"
            post_strings = []
            for subreddit_multi in subreddits:
                subreddit = reddit.subreddit(subreddit_multi)
                if subreddit.over18 and not in_nsfw_subreddit:
                    # NSFW subreddit links ignored when the bot is being summoned
                    continue
                posts_multi = []
                for submission in subreddit.top(limit=3):
                    nsfw_post_string = "[NSFW] " if submission.over_18 and not subreddit.over18 else ""
                    # Handle special characters in the title so reddit's markup isn't broken
                    title = str(submission.title).replace("[", "\[").replace("]", "\]")
                    if title.endswith("\\"):
                        title = title[:-1] + "\ "
                    post_url = submission.url.replace("//www.reddit.com", "//np.reddit.com")\
                        .replace("(", "%28").replace(")", "%29")
                    # Separate comments link if not self post. Ternary operators used just because
                    comments_link_string = " | [{} comment{}](https://np.reddit.com{})".format(
                        submission.num_comments, "" if submission.num_comments == 1 else "s",
                        submission.permalink) if not submission.is_self else ""
                    comment_format = "[{}{}]({}){}".format(nsfw_post_string, title, post_url,
                                                           comments_link_string)
                    posts_multi.append(comment_format)
                # BUGFIX: a subreddit with fewer than 3 total posts used to raise
                # IndexError at posts_multi[2] below; skip it, mirroring the
                # len(posts) < 3 guard in the single-subreddit path.
                if len(posts_multi) < 3:
                    continue
                subreddit_np = "[/r/{}](https://np.reddit.com/r/{})".format(subreddit.display_name,
                                                                            subreddit.display_name)
                nsfw_string = " [NSFW]" if subreddit.over18 else ""
                post_strings.append(string_pattern.format(
                    subreddit_np, nsfw_string, posts_multi[0], posts_multi[1], posts_multi[2]))
            comment_reply_string = "**Hi, here's a sneak peek of those subreddits using the top posts of all time!**\n\n"
            footer_string = "^^I'm ^^a ^^bot, ^^beep ^^boop ^^| [^^Contact ^^me](https://www.reddit.com/message/compose/?to=sneakpeekbot) ^^| [^^Info](https://np.reddit.com/r/sneakpeekbot/) ^^| [^^Opt-out](https://np.reddit.com/r/sneakpeekbot/comments/5lveo6/blacklist/)"
            for index, subreddit_posts in enumerate(post_strings):
                if index == 3:
                    # Cap the reply at 3 subreddits to avoid spamming the page
                    comment_reply_string += "^^I'm ^^only ^^showing ^^you ^^the ^^first ^^3 ^^subreddits ^^\(out ^^of ^^the ^^{} ^^possible) ^^to ^^avoid ^^spamming ^^the ^^page \n".format(
                        len(post_strings))
                    break
                comment_reply_string += subreddit_posts
            my_comment_id = str(comment.reply(comment_reply_string + footer_string))
            save_ids(comment.id, my_comment_id, str(comment.submission), subreddits)
            return
    # Default path: a single linked subreddit (also used for single-link summons)
    subreddit = reddit.subreddit(subreddit_name)
    posts = []
    in_nsfw_subreddit = comment.subreddit.over18
    if summon and subreddit.over18 and not in_nsfw_subreddit:
        # NSFW subreddit links ignored when the bot is being summoned
        return
    if subreddit.over18:
        if current_subreddit in no_nsfw_comments:
            # NSFW subreddit link in a subreddit where the mods have requested SFW only peeks
            return
        nsfw_string = " [NSFW]"
    else:
        nsfw_string = ""
    # Subreddit more than 3 years old: peek the top of the last year instead of all time
    if (time() - subreddit.created_utc) / (60 * 60 * 24) > (3 * 365):
        time_filter = "year"
        time_filter_string = "the year"
        top_posts_link = "https://np.reddit.com/r/" + subreddit.display_name + "/top/?sort=top&t=year"
    else:
        time_filter = "all"
        time_filter_string = "all time"
        top_posts_link = "https://np.reddit.com/r/" + subreddit.display_name + "/top/?sort=top&t=all"
    for submission in subreddit.top(time_filter=time_filter, limit=3):
        nsfw_post_string = "[NSFW] " if submission.over_18 and not subreddit.over18 else ""
        # Handle special characters in the title so reddit's markup isn't broken
        title = str(submission.title).replace("[", "\[").replace("]", "\]")
        # Consistency with the summoned path above: a trailing backslash would
        # otherwise escape the closing bracket of the markdown link
        if title.endswith("\\"):
            title = title[:-1] + "\ "
        post_url = submission.url.replace("//www.reddit.com", "//np.reddit.com")\
            .replace("(", "%28").replace(")", "%29")
        # Separate comments link if not self post. Ternary operators used just because
        comments_link_string = " | [{} comment{}](https://np.reddit.com{})".format(
            submission.num_comments, "" if submission.num_comments == 1 else "s", submission.permalink)\
            if not submission.is_self else ""
        comment_format = "[{}{}]({}){}".format(nsfw_post_string, title, post_url, comments_link_string)
        posts.append(comment_format)
    if len(posts) < 3:
        # Subreddit has less than 3 total posts, refuse to peek this
        return
    subreddit_np = "[/r/{}](https://np.reddit.com/r/{})".format(subreddit.display_name, subreddit.display_name)
    message = "**Here's a sneak peek of {}{} using the [top posts]({}) of {}!**\n\n\#1: {} \n\#2: {} \n\#3: {}\n\n----\n" \
              "^^I'm ^^a ^^bot, ^^beep ^^boop ^^| ^^Downvote ^^to ^^remove ^^| [^^Contact ^^me](https://www.reddit.com/message/compose/?to=sneakpeekbot)" \
              " ^^| [^^Info](https://np.reddit.com/r/sneakpeekbot/) ^^| [^^Opt-out](https://np.reddit.com/r/sneakpeekbot/comments/5lveo6/blacklist/)" \
              .format(subreddit_np, nsfw_string, top_posts_link, time_filter_string, posts[0], posts[1], posts[2])
    my_comment_id = str(comment.reply(message))
    save_ids(comment.id, my_comment_id, str(comment.submission), subreddit.display_name)
def save_ids(comment_id, my_comment_id, submission_id, linked_subreddit):
    """Persist the replied-to/own comment IDs and record which subreddit(s) were
    peeked in the given submission.

    Parameters:
        comment_id       -- id of the comment the bot replied to
        my_comment_id    -- id of the bot's own reply
        submission_id    -- id of the submission both comments belong to
        linked_subreddit -- a single subreddit name, or a list of names when the
                            bot was summoned with multiple links

    Appends to the module-level posted_comments_id list and submissions dict and
    rewrites lists/comments_replied.txt and lists/submissions.txt.
    """
    posted_comments_id.append(comment_id)
    posted_comments_id.append(my_comment_id)
    # Rewrite the whole replied-comments file. A distinct loop variable is used so
    # the comment_id parameter is not shadowed (the original loop reused the name).
    with open("lists/comments_replied.txt", "w") as c_file:
        for saved_id in posted_comments_id:
            c_file.write(saved_id + "\n")
    # Normalize to a list so the summoned (multi-link) and single-link calls share
    # one code path instead of two duplicated branches.
    if not isinstance(linked_subreddit, list):
        linked_subreddit = [linked_subreddit]
    for subreddit in linked_subreddit:
        submissions.setdefault(submission_id, []).append(subreddit.lower())
    with open("lists/submissions.txt", "w") as s_file:
        json.dump(submissions, s_file, sort_keys=True)
def txt_to_list(file_path):
    """Read a newline-separated text file into a list of non-empty lines.

    Returns an empty list when the file does not exist.
    """
    if not os.path.isfile(file_path):
        return []
    with open(file_path, "r") as text_file:
        contents = text_file.read()
    return [line for line in contents.split("\n") if line]
def signal_handler(signum, frame):
    """Exit the process cleanly when a signal is received.

    Intended for signal.signal(signal.SIGTERM, signal_handler) — useful for backing
    up files from an ephemeral server before shutdown. Raises SystemExit directly
    instead of calling quit(): quit() is a site-module convenience that the Python
    docs say should not be used in programs (it is absent when run with -S).
    """
    raise SystemExit
if __name__ == "__main__":
    # signal.signal(signal.SIGTERM, signal_handler)
    # All of the following files are .txt files with the format of a value on separate lines
    top500subs = txt_to_list("lists/top500subs.txt")
    # Comments replied to already
    posted_comments_id = txt_to_list("lists/comments_replied.txt")
    # Certain subreddits that are only linked as a meme/hashtag, not all inclusive...
    memes = txt_to_list("lists/memes.txt")
    # Subreddits that have opted out
    custom_blacklist = txt_to_list("lists/custom_blacklist.txt")
    # Bots shouldn't hang around with other bots, what would Jesus think??
    bot_users = txt_to_list("lists/bot_users.txt")
    # Subreddits that do not want to be peeked
    custom_ignore_link = txt_to_list("lists/custom_ignore_link.txt")
    bot_subreddits = txt_to_list("lists/bot_subreddits.txt")
    # Similar to banned except these are the biggest subreddits in those lists and are removed from the subreddit scan
    filter_list = txt_to_list("lists/filter_list.txt")
    # Users that have opted out
    custom_blacklist_users = txt_to_list("lists/custom_blacklist_users.txt")
    banned = txt_to_list("lists/banned.txt")
    # API credentials stored in praw.ini file in same folder as script
    # See https://praw.readthedocs.io/en/latest/getting_started/configuration/prawini.html for more information
    reddit = praw.Reddit('sneakpeekbot', user_agent='USERAGENT')
    # "Filter" /r/all by removing the subreddits we don't want
    r_all = reddit.subreddit("all-" + "-".join(filter_list))
    bot_profile = reddit.redditor("sneakpeekbot")
    # Restore the per-submission peek history from the previous run, if present
    if os.path.isfile("lists/submissions.txt"):
        with open("lists/submissions.txt") as f:
            submissions = json.load(f)
    # NOTE(review): check_scores is only run once at startup here, although its own
    # comment says it should be called periodically as part of the main scan.
    check_scores()
    # While loop used so that any error is logged and then the process continues
    # NOTE(review): only Forbidden and ServerError are retried; any other exception
    # escaping check_comments will still terminate the process.
    while True:
        try:
            check_comments()
        except (prawcore.exceptions.Forbidden, prawcore.exceptions.ServerError) as e:
            sleep(10)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment