sneakpeekbot.py

    import praw
    import prawcore.exceptions
    import json
    import regex as re
    import os
    from time import sleep, time
    from collections import OrderedDict
    import signal
    import traceback


    # Dictionary where key is the subreddit linking from and the value is the subreddit being linked to
    ignore_links_to_from = {}
    # List of subreddits that do not want any NSFW links
    no_nsfw_comments = []
    # Dictionary where key is post ID and value is a list of subreddits processed in that post
    submissions = {}
    # Unwanted regex patterns, if any of these are present in the comment then that comment is ignored
    # All of these are self-explanatory except the last one which matches if the subreddit is in a reddit quote block
    patterns = ["((?<!top posts)\sover (?:to|in|at) /?r/)",
                "((?<!top post)\sover (?:to|in|at) /?r/)",
                "also,? check out /r/",
                "you can check /r/",
                "ask (?:this\s)?(?:in\s|at\s)?/?r/",
                "ask the [a-z]+ (?:on|in) /?r/",
                "/?r/\w+ has a [a-z]{3,}?ly",
                "OP in /?r/",
                "go to /?r/\w+ and search",
                "asking in /?r/",
                "I asked in /?r/",
                "try asking (?:this\s)?on /?r/",
                "try /r/\w+\?",
                "/?r/\w+'s sidebar",
                "asking (?:this\s)?over at /?r/",
                "your question to /?r/",
                "post this in /?r/",
                "post it in /?r/",
                "posted to /?r/",
                "repost to /?r/",
                "(?:she|he) posted on /?r/",
                "try posting (?:this\s)?in /?r/",
                "have you tried /?r/",
                "mod(?:erator)?s? (?:of|in|on|for) /?r/",
                "/?r/\w+ is (?:a\s)shit",
                "I'm not subbed to /?r/",
                "I am not subbed to /?r/",
                "unsubscribe from /?r/",
                "I hate /?r/",
                "(?:run|go) back to /?r/",
                "(?:deleted|banned) from /?r/",
                "selling in /?r/",
                "~~/r/\w+~~",
                "(?:^\s*>|\s*>)[^\\\n]+/r/\w+[^\\\n\\\n]+"]

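    # Illustrative note (added for clarity, not part of the original gist): check_comments()
    # below joins these into one pattern, so a comment such as
    #   "you should ask over in /r/learnpython"
    # matches the first two entries above and is skipped instead of being peeked.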

    def check_scores():
        # This function should be called periodically as part of the main scan. This was removed from the code.
        limit = 500
        threshold = 0

        for my_comment in bot_profile.comments.new(limit=limit):
            if my_comment.score < threshold:
                # If you need the URL of the parent comment logged:
                # url = my_comment.permalink().replace(my_comment.id, my_comment.parent_id[3:])
                my_comment.delete()
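

    # --- Illustrative sketch, not part of the original gist ---
    # check_scores() is meant to run periodically as part of the main scan; that scheduling
    # code was removed. A minimal way to do it, assuming a module-level timestamp and a
    # hypothetical 15-minute interval:
    last_score_check = time()


    def maybe_check_scores(interval=15 * 60):
        # Run check_scores() at most once per interval; cheap enough to call for every
        # streamed comment inside check_comments().
        global last_score_check
        if time() - last_score_check > interval:
            check_scores()
            last_score_check = time()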


    def check_comments():
        # All of the individual checks in if conditions are split up for logging purposes but the specific logging
        # has been removed from this file
        for comment in r_all.stream.comments():
            # Could also search the comment's text instead of the html but this way we let reddit handle subreddit links
            # that aren't really links
            found_subs = re.findall("<a href=\"/r/(\w+)\">/?r/", comment.body_html)

            # No subreddit links in this batch of comments, stream some more
            if not found_subs:
                continue

            # Only handle the first match
            subreddit_name = found_subs[0].lower()
            current_subreddit_name = str(comment.subreddit).lower()

            # If summoned in a comment, handle multiple subreddits and then go to the next loop
            if re.search("\+/?u/sneakpeekbot", comment.body):
                if current_subreddit_name not in banned and current_subreddit_name not in custom_blacklist:
                    subreddit_scan(subreddit_name, comment, current_subreddit_name, True)
                continue

            # Conditions that would stop the comment from being processed
            conditions = [subreddit_name in top500subs,
                          subreddit_name in memes,
                          subreddit_name in bot_subreddits,
                          subreddit_name in custom_ignore_link,
                          comment.author in bot_users,
                          comment.author in custom_blacklist_users,
                          current_subreddit_name == subreddit_name,
                          current_subreddit_name in banned,
                          current_subreddit_name in custom_blacklist,
                          comment.parent() in posted_comments_id,
                          comment.is_root,
                          len(set(found_subs)) > 2]

            if any(conditions):
                continue

            if str(comment.submission) in submissions:
                if subreddit_name in submissions[str(comment.submission)]:
                    # Subreddit has already been processed in this post
                    continue
                elif len(submissions[str(comment.submission)]) >= 3:
                    # Limit of 3 sneak peeks per post
                    continue

            # Conditions dependent on an API request are handled separately to avoid waste
            # Used to have more conditions but they were temporarily removed
            conditions_api = [comment.author in comment.subreddit.moderator]

            if any(conditions_api):
                continue

            # The (...|...).* is a better way of joining multiple regex statements and returning when any are matched
            if re.findall("(" + "|".join(patterns) + ").*", comment.body, flags=re.IGNORECASE):
                # Unwanted pattern was matched, ignore this comment
                continue

            if current_subreddit_name in ignore_links_to_from:
                if subreddit_name in ignore_links_to_from[current_subreddit_name]:
                    # This particular link shouldn't be processed in this particular subreddit
                    continue

            # Process the comment now
            try:
                subreddit_scan(subreddit_name, comment, current_subreddit_name, False)
            except praw.exceptions.APIException as e_API:
                # Posting too much in this subreddit
                # Handle the error
                pass
            except (prawcore.exceptions.Redirect, prawcore.exceptions.NotFound) as e_404:
                # Linked subreddit does not exist
                # Handle the error
                pass
            except prawcore.exceptions.Forbidden as e_403:
                # Either the bot is banned from the subreddit or the linked subreddit is banned/quarantined
                # Handle the error
                pass


    def subreddit_scan(subreddit_name, comment, current_subreddit, summon):
        if summon:
            subreddits_all = re.findall("/?r/(\w+)+", comment.body)
            # Get unique subreddits/Remove duplicates before processing
            subreddits = list(OrderedDict.fromkeys(subreddits_all).keys())

            # If not multiple subreddits then can be handled the usual way
            # TODO: Remove code reuse
            if len(subreddits) > 1:
                in_nsfw_subreddit = comment.subreddit.over18
                string_pattern = "**{}{}:**\n\n\#1: {} \n\#2: {} \n\#3: {}\n\n----\n\n"
                post_strings = []
                for subreddit_multi in subreddits:
                    subreddit = reddit.subreddit(subreddit_multi)
                    if subreddit.over18 and not in_nsfw_subreddit:
                        # NSFW subreddit links ignored when the bot is being summoned
                        continue
                    posts_multi = []

                    for submission in subreddit.top(limit=3):
                        nsfw_post_string = "[NSFW] " if submission.over_18 and not subreddit.over18 else ""
                        # Handle special characters in the title so reddit's markup isn't broken
                        title = str(submission.title).replace("[", "\[").replace("]", "\]")
                        if title.endswith("\\"):
                            title = title[:-1] + "\ "
                        post_url = submission.url.replace("//www.reddit.com", "//np.reddit.com")\
                            .replace("(", "%28").replace(")", "%29")
                        # Separate comments link if not self post. Ternary operators used just because
                        comments_link_string = " | [{} comment{}](https://np.reddit.com{})".format(
                            submission.num_comments, "" if submission.num_comments == 1 else "s",
                            submission.permalink) if not submission.is_self else ""
                        comment_format = "[{}{}]({}){}".format(
                            nsfw_post_string, title, post_url, comments_link_string)
                        posts_multi.append(comment_format)

                    subreddit_np = "[/r/{}](https://np.reddit.com/r/{})".format(
                        subreddit.display_name, subreddit.display_name)
                    nsfw_string = " [NSFW]" if subreddit.over18 else ""

                    post_strings.append(string_pattern.format(
                        subreddit_np, nsfw_string, posts_multi[0], posts_multi[1], posts_multi[2]))
                comment_reply_string = "**Hi, here's a sneak peek of those subreddits using the top posts of all time!**\n\n"
                footer_string = "^^I'm ^^a ^^bot, ^^beep ^^boop ^^| [^^Contact ^^me](https://www.reddit.com/message/compose/?to=sneakpeekbot) ^^| [^^Info](https://np.reddit.com/r/sneakpeekbot/) ^^| [^^Opt-out](https://np.reddit.com/r/sneakpeekbot/comments/5lveo6/blacklist/)"

                for index, subreddit_posts in enumerate(post_strings):
                    if index == 3:
                        comment_reply_string += "^^I'm ^^only ^^showing ^^you ^^the ^^first ^^3 ^^subreddits ^^\(out ^^of ^^the ^^{} ^^possible) ^^to ^^avoid ^^spamming ^^the ^^page \n".format(
                            len(post_strings))
                        break
                    comment_reply_string += subreddit_posts

                my_comment_id = str(comment.reply(comment_reply_string + footer_string))
                save_ids(comment.id, my_comment_id, str(comment.submission), subreddits)
                return
            else:
                # Makes it more explicit that we are passing to the default code
                pass

        subreddit = reddit.subreddit(subreddit_name)
        posts = []
        in_nsfw_subreddit = comment.subreddit.over18

        if summon and subreddit.over18 and not in_nsfw_subreddit:
            # NSFW subreddit links ignored when the bot is being summoned
            return

        if subreddit.over18:
            if current_subreddit in no_nsfw_comments:
                # NSFW subreddit link in a subreddit where the mods have requested SFW only peeks
                return
            nsfw_string = " [NSFW]"
        else:
            nsfw_string = ""

        # Subreddit more than 3 years old
        if (time() - subreddit.created_utc) / (60 * 60 * 24) > (3 * 365):
            time_filter = "year"
            time_filter_string = "the year"
            top_posts_link = "https://np.reddit.com/r/" + subreddit.display_name + "/top/?sort=top&t=year"
        else:
            time_filter = "all"
            time_filter_string = "all time"
            top_posts_link = "https://np.reddit.com/r/" + subreddit.display_name + "/top/?sort=top&t=all"

        for submission in subreddit.top(time_filter=time_filter, limit=3):
            nsfw_post_string = "[NSFW] " if submission.over_18 and not subreddit.over18 else ""
            # Handle special characters in the title so reddit's markup isn't broken
            title = str(submission.title).replace("[", "\[").replace("]", "\]")
            post_url = submission.url.replace("//www.reddit.com", "//np.reddit.com")\
                .replace("(", "%28").replace(")", "%29")
            # Separate comments link if not self post. Ternary operators used just because
            comments_link_string = " | [{} comment{}](https://np.reddit.com{})".format(
                submission.num_comments, "" if submission.num_comments == 1 else "s", submission.permalink)\
                if not submission.is_self else ""
            comment_format = "[{}{}]({}){}".format(nsfw_post_string, title, post_url, comments_link_string)

            posts.append(comment_format)

        if len(posts) < 3:
            # Subreddit has less than 3 total posts, refuse to peek this
            return

        subreddit_np = "[/r/{}](https://np.reddit.com/r/{})".format(subreddit.display_name, subreddit.display_name)

        message = "**Here's a sneak peek of {}{} using the [top posts]({}) of {}!**\n\n\#1: {} \n\#2: {} \n\#3: {}\n\n----\n" \
                  "^^I'm ^^a ^^bot, ^^beep ^^boop ^^| ^^Downvote ^^to ^^remove ^^| [^^Contact ^^me](https://www.reddit.com/message/compose/?to=sneakpeekbot)" \
                  " ^^| [^^Info](https://np.reddit.com/r/sneakpeekbot/) ^^| [^^Opt-out](https://np.reddit.com/r/sneakpeekbot/comments/5lveo6/blacklist/)" \
            .format(subreddit_np, nsfw_string, top_posts_link, time_filter_string, posts[0], posts[1], posts[2])

        my_comment_id = str(comment.reply(message))
        save_ids(comment.id, my_comment_id, str(comment.submission), subreddit.display_name)


    def save_ids(comment_id, my_comment_id, submission_id, linked_subreddit):
        posted_comments_id.append(comment_id)
        posted_comments_id.append(my_comment_id)

        with open("lists/comments_replied.txt", "w") as c_file:
            for comment_id in posted_comments_id:
                c_file.write(comment_id + "\n")

        # Insert the processed subreddit inside the submissions dictionary
        if isinstance(linked_subreddit, list):
            # Multiple links, bot was summoned
            for subreddit in linked_subreddit:
                if submission_id in submissions:
                    submissions[submission_id].append(subreddit.lower())
                else:
                    submissions[submission_id] = [subreddit.lower()]
        else:
            if submission_id in submissions:  # TODO: clean this
                submissions[submission_id].append(linked_subreddit.lower())
            else:
                submissions[submission_id] = [linked_subreddit.lower()]

        with open("lists/submissions.txt", "w") as s_file:
            json.dump(submissions, s_file, sort_keys=True)
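        # Illustrative shape of lists/submissions.txt after a couple of replies
        # (hypothetical post IDs, not from the original gist):
        #   {"5x1abc": ["python", "learnpython"], "5x2def": ["askscience"]}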


    def txt_to_list(file_path):
        return_list = []
        if os.path.isfile(file_path):
            with open(file_path, "r") as text_file:
                return_list = text_file.read()
            return_list = return_list.split("\n")
            return_list = list(filter(None, return_list))
        return return_list


    def signal_handler(var1, var2):
        # Useful for backing up files from an ephemeral server
        quit()


    if __name__ == "__main__":
        # signal.signal(signal.SIGTERM, signal_handler)
        # All of the following files are .txt files with one value per line
        top500subs = txt_to_list("lists/top500subs.txt")
        # Comments replied to already
        posted_comments_id = txt_to_list("lists/comments_replied.txt")
        # Certain subreddits that are only linked as a meme/hashtag, not all inclusive...
        memes = txt_to_list("lists/memes.txt")
        # Subreddits that have opted out
        custom_blacklist = txt_to_list("lists/custom_blacklist.txt")
        # Bots shouldn't hang around with other bots, what would Jesus think??
        bot_users = txt_to_list("lists/bot_users.txt")
        # Subreddits that do not want to be peeked
        custom_ignore_link = txt_to_list("lists/custom_ignore_link.txt")
        bot_subreddits = txt_to_list("lists/bot_subreddits.txt")
        # Similar to banned, except these are the biggest subreddits in those lists; they are excluded from the /r/all stream instead
        filter_list = txt_to_list("lists/filter_list.txt")
        # Users that have opted out
        custom_blacklist_users = txt_to_list("lists/custom_blacklist_users.txt")
        banned = txt_to_list("lists/banned.txt")

        # API credentials are stored in a praw.ini file in the same folder as the script
        # See https://praw.readthedocs.io/en/latest/getting_started/configuration/prawini.html for more information
        reddit = praw.Reddit('sneakpeekbot', user_agent='USERAGENT')
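        # Illustrative praw.ini site entry for 'sneakpeekbot' as used above
        # (placeholder values, not real credentials; not part of the original gist):
        #   [sneakpeekbot]
        #   client_id=PLACEHOLDER_ID
        #   client_secret=PLACEHOLDER_SECRET
        #   username=sneakpeekbot
        #   password=PLACEHOLDER_PASSWORD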
        # "Filter" /r/all by removing the subreddits we don't want
        r_all = reddit.subreddit("all-" + "-".join(filter_list))
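        # For example, if filter_list contained ["askreddit", "pics"] (hypothetical values),
        # r_all above would stream from the filtered multireddit /r/all-askreddit-pics.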
        bot_profile = reddit.redditor("sneakpeekbot")

        if os.path.isfile("lists/submissions.txt"):
            with open("lists/submissions.txt") as f:
                submissions = json.load(f)

        check_scores()

        # While loop used so that any error is logged and then the process continues
        while True:
            try:
                check_comments()
            except (prawcore.exceptions.Forbidden, prawcore.exceptions.ServerError) as e:
                sleep(10)