# sneakpeekbot source
import praw
import prawcore.exceptions
import json
import regex as re
import os
from time import sleep, time
from collections import OrderedDict
import signal
import traceback
# Dictionary where the key is the subreddit being linked from and the value is a list of subreddits that should not be peeked from it
ignore_links_to_from = {}
# List of subreddits that do not want any NSFW links
no_nsfw_comments = []
# Dictionary where key is post ID and value is a list of subreddits processed in that post
submissions = {}
# Unwanted regex patterns; if any of these match the comment, that comment is ignored
# All of these are self-explanatory except the last one, which matches when the subreddit link sits inside a reddit quote block (see the illustration after the list)
patterns = ["((?<!top posts)\sover (?:to|in|at) /?r/)",
"((?<!top post)\sover (?:to|in|at) /?r/)",
"also,? check out /r/",
"you can check /r/",
"ask (?:this\s)?(?:in\s|at\s)?/?r/",
"ask the [a-z]+ (?:on|in) /?r/",
"/?r/\w+ has a [a-z]{3,}?ly",
"OP in /?r/"
"go to /?r/\w+ and search",
"asking in /?r/",
"I asked in /?r/",
"try asking (?:this\s)?on /?r/",
"try /r/\w+\?",
"/?r/\w+'s sidebar",
"asking (?:this\s)?over at /?r/",
"your question to /?r/",
"post this in /?r/",
"post it in /?r/",
"posted to /?r/",
"repost to /?r/",
"(?:she|he) posted on /?r/",
"try posting (?:this\s)?in /?r/",
"have you tried /?r/",
"mod(?:erator)?s? (?:of|in|on|for) /?r/",
"/?r/\w+ is (?:a\s)shit",
"I'm not subbed to /?r/",
"I am not subbed to /?r/",
"unsubscribe from /?r/",
"I hate /?r/",
"(?:run|go) back to /?r/",
"(?:deleted|banned) from /?r/",
"selling in /?r/",
"~~/r/\w+~~",
"(?:^\s*>|\s*>)[^\\\n]+/r/\w+[^\\\n\\\n]+"]
def check_scores():
    # This function should be called periodically as part of the main scan; that periodic call has been removed from this file
limit = 500
threshold = 0
for my_comment in bot_profile.comments.new(limit=limit):
if my_comment.score < threshold:
# If you need the URL of the parent comment logged:
# url = my_comment.permalink().replace(my_comment.id, my_comment.parent_id[3:])
my_comment.delete()
def check_comments():
# All of the individual checks in if conditions are split up for logging purposes but the specific logging
# has been removed from this file
for comment in r_all.stream.comments():
# Could also search the comment's text instead of the html but this way we let reddit handle subreddit links
# that aren't really links
found_subs = re.findall("<a href=\"/r/(\w+)\">/?r/", comment.body_html)
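        # For reference, reddit's body_html renders a real mention like "/r/python" roughly as
        #     <a href="/r/python">/r/python</a>
        # (illustrative markup), which is exactly the shape the pattern above captures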
# No subreddit links in this batch of comments, stream some more
if not found_subs:
continue
# Only handle the first match
subreddit_name = found_subs[0].lower()
current_subreddit_name = str(comment.subreddit).lower()
        # If summoned in a comment, handle multiple subreddits and then move on to the next comment
if re.search("\+/?u/sneakpeekbot", comment.body):
if current_subreddit_name not in banned and current_subreddit_name not in custom_blacklist:
subreddit_scan(subreddit_name, comment, current_subreddit_name, True)
continue
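        # A summon is just a comment containing "+/u/sneakpeekbot" plus one or more subreddit
        # mentions, e.g. (illustrative text only):
        #     "+/u/sneakpeekbot have a look at /r/python and /r/learnprogramming"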
# Conditions that would stop the comment from being processed
conditions = [subreddit_name in top500subs,
subreddit_name in memes,
subreddit_name in bot_subreddits,
subreddit_name in custom_ignore_link,
comment.author in bot_users,
comment.author in custom_blacklist_users,
current_subreddit_name == subreddit_name,
current_subreddit_name in banned,
current_subreddit_name in custom_blacklist,
comment.parent() in posted_comments_id,
comment.is_root,
len(set(found_subs)) > 2]
if any(conditions):
continue
if str(comment.submission) in submissions:
if subreddit_name in submissions[str(comment.submission)]:
# Subreddit has already been processed in this post
continue
elif len(submissions[str(comment.submission)]) >= 3:
# Limit of 3 sneak peeks per post
continue
        # Conditions that require an extra API request are checked separately so requests aren't wasted on comments already filtered out
# Used to have more conditions but they were temporarily removed
conditions_api = [comment.author in comment.subreddit.moderator]
if any(conditions_api):
continue
        # Joining the patterns into a single (...|...) alternation lets one findall call check every unwanted pattern at once
if re.findall("(" + "|".join(patterns) + ").*", comment.body, flags=re.IGNORECASE):
# Unwanted pattern was matched, ignore this comment
continue
if current_subreddit_name in ignore_links_to_from:
if subreddit_name in ignore_links_to_from[current_subreddit_name]:
# This particular link shouldn't be processed in this particular subreddit
continue
# Process the comment now
try:
subreddit_scan(subreddit_name, comment, current_subreddit_name, False)
except praw.exceptions.APIException as e_API:
# Posting too much in this subreddit
# Handle the error
pass
except (prawcore.exceptions.Redirect, prawcore.exceptions.NotFound) as e_404:
# Linked subreddit does not exist
# Handle the error
pass
except prawcore.exceptions.Forbidden as e_403:
# Either the bot is banned from the subreddit or the linked subreddit is banned/quarantined
# Handle the error
pass
def subreddit_scan(subreddit_name, comment, current_subreddit, summon):
if summon:
        subreddits_all = re.findall("/?r/(\w+)", comment.body)
        # Remove duplicate subreddits while preserving their order
        subreddits = list(OrderedDict.fromkeys(subreddits_all).keys())
        # If only one subreddit was mentioned it can be handled by the usual single-subreddit code below
        # TODO: Remove code reuse
if len(subreddits) > 1:
in_nsfw_subreddit = comment.subreddit.over18
string_pattern = "**{}{}:**\n\n\#1: {} \n\#2: {} \n\#3: {}\n\n----\n\n"
post_strings = []
for subreddit_multi in subreddits:
subreddit = reddit.subreddit(subreddit_multi)
if subreddit.over18 and not in_nsfw_subreddit:
# NSFW subreddit links ignored when the bot is being summoned
continue
posts_multi = []
for submission in subreddit.top(limit=3):
nsfw_post_string = "[NSFW] " if submission.over_18 and not subreddit.over18 else ""
# Handle special characters in the title so reddit's markup isn't broken
title = str(submission.title).replace("[", "\[").replace("]", "\]")
if title.endswith("\\"):
title = title[:-1] + "\ "
post_url = submission.url.replace("//www.reddit.com", "//np.reddit.com")\
.replace("(", "%28").replace(")", "%29")
# Separate comments link if not self post. Ternary operators used just because
comments_link_string = " | [{} comment{}](https://np.reddit.com{})".format(
submission.num_comments, "" if submission.num_comments == 1 else "s",
submission.permalink) if not submission.is_self else ""
comment_format = "[{}{}]({}){}".format(nsfw_post_string, title, post_url,
comments_link_string)
posts_multi.append(comment_format)
                if len(posts_multi) < 3:
                    # Fewer than 3 posts available, skip this subreddit to avoid an IndexError below
                    continue
                subreddit_np = "[/r/{}](https://np.reddit.com/r/{})".format(subreddit.display_name,
                                                                            subreddit.display_name)
nsfw_string = " [NSFW]" if subreddit.over18 else ""
post_strings.append(string_pattern.format(
subreddit_np, nsfw_string, posts_multi[0], posts_multi[1], posts_multi[2]))
comment_reply_string = "**Hi, here's a sneak peek of those subreddits using the top posts of all time!**\n\n"
footer_string = "^^I'm ^^a ^^bot, ^^beep ^^boop ^^| [^^Contact ^^me](https://www.reddit.com/message/compose/?to=sneakpeekbot) ^^| [^^Info](https://np.reddit.com/r/sneakpeekbot/) ^^| [^^Opt-out](https://np.reddit.com/r/sneakpeekbot/comments/5lveo6/blacklist/)"
for index, subreddit_posts in enumerate(post_strings):
if index == 3:
comment_reply_string += "^^I'm ^^only ^^showing ^^you ^^the ^^first ^^3 ^^subreddits ^^\(out ^^of ^^the ^^{} ^^possible) ^^to ^^avoid ^^spamming ^^the ^^page \n".format(
len(post_strings))
break
comment_reply_string += subreddit_posts
my_comment_id = str(comment.reply(comment_reply_string + footer_string))
save_ids(comment.id, my_comment_id, str(comment.submission), subreddits)
return
else:
            # Fall through to the default single-subreddit handling below
pass
subreddit = reddit.subreddit(subreddit_name)
posts = []
in_nsfw_subreddit = comment.subreddit.over18
if summon and subreddit.over18 and not in_nsfw_subreddit:
# NSFW subreddit links ignored when the bot is being summoned
return
if subreddit.over18:
if current_subreddit in no_nsfw_comments:
# NSFW subreddit link in a subreddit where the mods have requested SFW only peeks
return
nsfw_string = " [NSFW]"
else:
nsfw_string = ""
    # If the subreddit is more than 3 years old, peek the top posts of the year instead of all time
if (time() - subreddit.created_utc) / (60 * 60 * 24) > (3 * 365):
time_filter = "year"
time_filter_string = "the year"
top_posts_link = "https://np.reddit.com/r/" + subreddit.display_name + "/top/?sort=top&t=year"
else:
time_filter = "all"
time_filter_string = "all time"
top_posts_link = "https://np.reddit.com/r/" + subreddit.display_name + "/top/?sort=top&t=all"
for submission in subreddit.top(time_filter=time_filter, limit=3):
nsfw_post_string = "[NSFW] " if submission.over_18 and not subreddit.over18 else ""
# Handle special characters in the title so reddit's markup isn't broken
title = str(submission.title).replace("[", "\[").replace("]", "\]")
post_url = submission.url.replace("//www.reddit.com", "//np.reddit.com")\
.replace("(", "%28").replace(")", "%29")
# Separate comments link if not self post. Ternary operators used just because
comments_link_string = " | [{} comment{}](https://np.reddit.com{})".format(
submission.num_comments, "" if submission.num_comments == 1 else "s", submission.permalink)\
if not submission.is_self else ""
comment_format = "[{}{}]({}){}".format(nsfw_post_string, title, post_url, comments_link_string)
posts.append(comment_format)
if len(posts) < 3:
        # Fewer than 3 posts to show, refuse to peek this subreddit
return
subreddit_np = "[/r/{}](https://np.reddit.com/r/{})".format(subreddit.display_name, subreddit.display_name)
message = "**Here's a sneak peek of {}{} using the [top posts]({}) of {}!**\n\n\#1: {} \n\#2: {} \n\#3: {}\n\n----\n" \
"^^I'm ^^a ^^bot, ^^beep ^^boop ^^| ^^Downvote ^^to ^^remove ^^| [^^Contact ^^me](https://www.reddit.com/message/compose/?to=sneakpeekbot)" \
" ^^| [^^Info](https://np.reddit.com/r/sneakpeekbot/) ^^| [^^Opt-out](https://np.reddit.com/r/sneakpeekbot/comments/5lveo6/blacklist/)" \
.format(subreddit_np, nsfw_string, top_posts_link, time_filter_string, posts[0], posts[1], posts[2])
my_comment_id = str(comment.reply(message))
save_ids(comment.id, my_comment_id, str(comment.submission), subreddit.display_name)
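# For orientation, a rendered single-subreddit reply comes out roughly like the following
# reddit markdown (illustrative subreddit, titles and counts only):
#
#     **Here's a sneak peek of [/r/python](https://np.reddit.com/r/python) using the
#     [top posts](https://np.reddit.com/r/python/top/?sort=top&t=year) of the year!**
#
#     \#1: [Example post title](https://np.reddit.com/r/python/comments/xxxxxx/) | [42 comments](https://np.reddit.com/r/python/comments/xxxxxx/)
#     \#2: ...
#     \#3: ...
#
#     followed by the beep-boop footer with the contact/info/opt-out links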
def save_ids(comment_id, my_comment_id, submission_id, linked_subreddit):
posted_comments_id.append(comment_id)
posted_comments_id.append(my_comment_id)
with open("lists/comments_replied.txt", "w") as c_file:
        for saved_id in posted_comments_id:
            c_file.write(saved_id + "\n")
# Insert the processed subreddit inside the submissions dictionary
if isinstance(linked_subreddit, list):
# Multiple links, bot was summoned
for subreddit in linked_subreddit:
if submission_id in submissions:
submissions[submission_id].append(subreddit.lower())
else:
submissions[submission_id] = [subreddit.lower()]
else:
if submission_id in submissions: # TODO: clean this
submissions[submission_id].append(linked_subreddit.lower())
else:
submissions[submission_id] = [linked_subreddit.lower()]
with open("lists/submissions.txt", "w") as s_file:
json.dump(submissions, s_file, sort_keys=True)
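# For reference, lists/submissions.txt ends up as plain JSON mapping a submission id to the
# subreddits already peeked in that thread, e.g. (illustrative ids and names only):
#     {"5abc12": ["python", "learnprogramming"], "5def34": ["askscience"]}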
def txt_to_list(file_path):
return_list = []
if os.path.isfile(file_path):
with open(file_path, "r") as text_file:
return_list = text_file.read()
return_list = return_list.split("\n")
return_list = list(filter(None, return_list))
return return_list
def signal_handler(var1, var2):
    # Useful for backing up files from an ephemeral server
quit()
if __name__ == "__main__":
# signal.signal(signal.SIGTERM, signal_handler)
    # All of the following files are plain .txt files with one value per line
top500subs = txt_to_list("lists/top500subs.txt")
# Comments replied to already
posted_comments_id = txt_to_list("lists/comments_replied.txt")
# Certain subreddits that are only linked as a meme/hashtag, not all inclusive...
memes = txt_to_list("lists/memes.txt")
# Subreddits that have opted out
custom_blacklist = txt_to_list("lists/custom_blacklist.txt")
# Bots shouldn't hang around with other bots, what would Jesus think??
bot_users = txt_to_list("lists/bot_users.txt")
# Subreddits that do not want to be peeked
custom_ignore_link = txt_to_list("lists/custom_ignore_link.txt")
bot_subreddits = txt_to_list("lists/bot_subreddits.txt")
    # Similar to banned, except these are the biggest subreddits in those lists and are excluded from the /r/all stream below
filter_list = txt_to_list("lists/filter_list.txt")
# Users that have opted out
custom_blacklist_users = txt_to_list("lists/custom_blacklist_users.txt")
banned = txt_to_list("lists/banned.txt")
# API credentials stored in praw.ini file in same folder as script
# See https://praw.readthedocs.io/en/latest/getting_started/configuration/prawini.html for more information
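    # The 'sneakpeekbot' site section of that praw.ini might look roughly like this
    # (placeholder values, not real credentials):
    #
    #     [sneakpeekbot]
    #     client_id=REPLACE_ME
    #     client_secret=REPLACE_ME
    #     username=sneakpeekbot
    #     password=REPLACE_ME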
reddit = praw.Reddit('sneakpeekbot', user_agent='USERAGENT')
# "Filter" /r/all by removing the subreddits we don't want
r_all = reddit.subreddit("all-" + "-".join(filter_list))
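    # e.g. with filter_list = ["funny", "pics"] this becomes reddit.subreddit("all-funny-pics"),
    # reddit's syntax for /r/all minus those subreddits (illustrative names only)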
bot_profile = reddit.redditor("sneakpeekbot")
if os.path.isfile("lists/submissions.txt"):
with open("lists/submissions.txt") as f:
submissions = json.load(f)
check_scores()
    # While loop used so that a recoverable error doesn't kill the process; the stream just restarts after a short sleep
while True:
try:
check_comments()
except (prawcore.exceptions.Forbidden, prawcore.exceptions.ServerError) as e:
sleep(10)