-
-
Save changemyview/7cb2fec154d709cf584a to your computer and use it in GitHub Desktop.
The script for /r/changemyview's DeltaBot
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# encoding: utf-8 | |
from __future__ import division | |
import time | |
import logging | |
import re | |
import praw | |
import pprint | |
import urllib2 | |
# change this as necessary | |
SUBREDDIT_NAME = 'changemyview' | |
BOT_ACCOUNT = ['deltabot', 'PASSWORD'] | |
# 30 seconds for now, change to 60*60 for 1 hour | |
PERIOD_SCAN = 60*30 | |
# these can optionally be changed | |
TOKENS = [u'∆', u'∆'] | |
MSG_CONFIRM = 'Confirmed: 1 delta awarded to /u/%s' | |
# unnecessary now that we have wiki solution | |
#TRACKER_URL = "http://www.reddit.com/r/snorrrlax/comments/1adxhd/deltabots_delta_tracker/" | |
# ignore everything else | |
########################### | |
TOKEN_REGEX = u'(?<!["\'])%s(?!["\'])' | |
# set up logging | |
logging.basicConfig(level=logging.DEBUG) | |
logging.getLogger('requests').setLevel(logging.ERROR) # hide messages from requests | |
#strings for table updating | |
TABLE_HEAD = '\n\n| Rank | Username | Deltas |\n| :------: | ------ | ------: |' | |
TABLE_LEADER_ENTRY = "\n| 1 | **/u/%s** | [%s](// \"deltas received\") |" | |
TABLE_ENTRY = '\n| %s | /u/%s | [%s](// "deltas received") |' | |
class Bot(object): | |
def __init__(self): | |
logging.info('connecting to reddit') | |
self.reddit = praw.Reddit(SUBREDDIT_NAME + ' bot') | |
self.reddit.login(*BOT_ACCOUNT) | |
self.subreddit = self.reddit.get_subreddit(SUBREDDIT_NAME) | |
def add_points(self, redditor, num_points=None): | |
"""Recalculate a user's delta and update flair.""" | |
if num_points == None: | |
num_points = 1 | |
old_flair = self.subreddit.get_flair(redditor.name) | |
if 'flair_css_class' not in old_flair or not old_flair['flair_css_class']: | |
old_flair['flair_css_class'] = '' | |
if not old_flair['flair_text']: | |
old_flair['flair_text'] = '' | |
old_points = 0 | |
else: | |
try: | |
old_points = int(old_flair['flair_text']) | |
except ValueError: | |
logging.debug("Old flair wasn't numeric.") | |
return False | |
new_flair_str = '%d' % (old_points + num_points) | |
if 'points' not in old_flair['flair_css_class']: | |
new_flair_class = 'points ' + old_flair['flair_css_class'] | |
else: | |
new_flair_class = old_flair['flair_css_class'] | |
self.subreddit.set_flair(redditor.name, new_flair_str, new_flair_class) | |
return True | |
#TODO: Seems slow. Can we streamline this? | |
def find_delta(self, comment, check_confirmed=True): | |
"""Search a comment to see whether or not there's a delta token""" | |
if comment.is_root: | |
logging.debug('not a reply') | |
return False | |
# skip deleted submissions | |
if not comment.submission.author: | |
logging.debug('author deleted') | |
return False | |
# strip any blockquotes so we don't count deltas twice | |
comment.body = self.strip_quotations(comment.body) | |
# search for token | |
for token in TOKENS: | |
REGEX = TOKEN_REGEX % token | |
if re.search(REGEX, comment.body): | |
# see if bot already confirmed | |
replyers = [c.author.name.lower() for c in comment.replies if c.author] | |
if self.reddit.get_info(thing_id=comment.parent_id).author == comment.author: | |
logging.debug('commentor responded to self with delta') | |
return False | |
if check_confirmed: | |
if BOT_ACCOUNT[0].lower() in replyers: | |
logging.debug('already confirmed') | |
return False | |
# get parent | |
parent = self.reddit.get_info(thing_id=comment.parent_id) | |
if parent.author is None: | |
return False | |
else: | |
return parent | |
logging.debug('delta token not found') | |
return False | |
def strip_quotations(self, comment_body): | |
"""Delete quotations from reddit posts (by > token)""" | |
# TODO: this will strip ANY paragraph with ANY > symbol in it, | |
# not just >s that indicate blockquotes. Needs refinement. | |
split_comment = comment_body.split("\n") | |
stripped_comment = "" | |
for paragraph in split_comment: | |
if paragraph.find(">") == -1: | |
stripped_comment = stripped_comment+ paragraph | |
return stripped_comment | |
#TODO: This function is way, way too big and can be modularized. Do that. | |
def scan(self, before_id=None): | |
if before_id is None: | |
limit = 500 | |
else: | |
limit = None | |
newest_comment = None | |
logging.debug('scanning comments newer than %s' % str(before_id)) | |
for comment in self.subreddit.get_comments(params={'before':before_id}, limit=limit): | |
new_comments = None | |
if type(comment) is praw.objects.MoreComments: | |
new_comments = comments.replies | |
print "scanning a MoreComments object" | |
scan(new_comments[0].name) | |
if comment == None: | |
logging.debug('This comment was deleted.') | |
continue | |
if newest_comment is None: | |
newest_comment = comment | |
#print "Has name is " | |
#print hasattr(comment, 'name') | |
#print "Has author is " | |
#print hasattr(comment, 'author') | |
#print "Has name AND author is " | |
#print (hasattr(comment, 'name') and hasattr(comment, 'author')) | |
#print "Is the author None?" | |
#print (comment.author == None) | |
if comment.name == None or comment.author == None: | |
print "Author or comment has been deleted.\n" | |
continue | |
else: | |
logging.debug('scanning comment %s by %s' % (comment.name, comment.author.name)) | |
# see if there's a delta reply | |
parent = self.find_delta(comment) | |
if not parent: | |
#logging.debug('skipping') | |
continue | |
logging.info('new delta comment %s by %s to %s found' % (comment.name, comment.author.name, parent.author.name)) | |
if parent.author.name.lower() == BOT_ACCOUNT[0].lower(): | |
logging.debug('reply to bot detected, awarding no points') | |
continue | |
if self.is_parents_thread(comment): | |
logging.debug("Submisson's author can't get delta in own thread.") | |
continue | |
if self.multiple_deltas_thread(comment): | |
logging.debug("Disallowing comment: same user, multiple deltas, same thread") | |
continue | |
# add points | |
okay = self.add_points(parent.author) | |
if okay: | |
# send confirmation message | |
self.update_delta_tracker(comment) | |
comment_confirm = comment.reply(MSG_CONFIRM % parent.author) | |
comment_confirm.distinguish() | |
logging.debug('confirmation message sent') | |
else: | |
logging.warn('non-numeric flair for user %s, skipping adding points' % comment.author.name) | |
if newest_comment is None: | |
logging.debug('no new comments') | |
return before_id | |
# write newest comment id to cache file | |
self.write_previous_comment_id(newest_comment.name) | |
return newest_comment.name | |
def get_top_ten_deltas(self): | |
"""Get a list of the top 10 delta earners""" | |
flair_list = [flair for flair in self.subreddit.get_flair_list(limit=None)] | |
flair_list = sorted(flair_list, key = self.get_flair_number) | |
flair_list.reverse() | |
while len(flair_list)<10: | |
flair_list.append({u'user' : u'none', u'flair_text' : u'no deltas'}) | |
return flair_list[0:10] | |
def update_top_ten_list(self): | |
"""Update the top 10 list with highest delta earners.""" | |
top_deltas = self.get_top_ten_deltas() | |
delta_table = [] | |
delta_table.append("\n\n**Top Ten Viewchangers**") | |
delta_table.append(TABLE_HEAD) | |
delta_table.append(TABLE_LEADER_ENTRY%((top_deltas[0][u'user'], top_deltas[0][u'flair_text']))) | |
for i in range(9): | |
delta_table.append(TABLE_ENTRY%((i+2, top_deltas[i+1][u'user'], top_deltas[i+1][u'flair_text']))) | |
settings = self.subreddit.get_settings() | |
old_desc = settings[u'description'] | |
# IMPORTANT: this splits the description on the _____ token. Don't use said | |
# token for anything other than dividing sections or else this breaks. | |
split_desc = old_desc.split("_____") | |
split_desc[len(split_desc)-1] = "".join(delta_table) | |
new_desc = "" | |
for x in split_desc: | |
#logging.info(x) | |
if x != split_desc[0]: | |
new_desc = new_desc + "_____" + x.replace("&", "&") | |
self.subreddit.update_settings(description=new_desc) | |
logging.info("Updated top 10 list.") | |
return | |
def get_flair_number(self, dic): | |
"""Get numeric value from flair""" | |
num = 0 | |
try: | |
num = int(dic[u'flair_text']) | |
except: | |
num = None | |
print "Viva la infinity!" | |
return num | |
def get_previous_comment_id(self): | |
"""Get the last comment's ID from file.""" | |
try: | |
id_file = open('prev_id.txt', 'r') | |
current = id_file.readline() | |
if current == "None": | |
current = None | |
id_file.close() | |
#current_arrayify = ['%s' % current] | |
return current | |
except IOError: | |
logging.info("\n\nNo ID file. Create it.") | |
id_file = open('prev_id.txt', 'w+') | |
id_file.write("None") | |
id_file.close() | |
return None | |
def get_root_comment(self, comment): | |
"""Find and return the root comment for this thread""" | |
if comment.is_root: | |
logging.debug("Comment IS root comment.") | |
return comment | |
submission = comment.submission #sub object | |
parent = False | |
while not parent: | |
for c in submission.comments: | |
if self.comment_in_path(c, comment): | |
logging.info("Found root comment") | |
logging.info("Root comment is:") | |
logging.info(c.body) | |
return c | |
return None | |
def multiple_deltas_thread(self, orig_comment): | |
"""Did this poster give > 1 delta in this thread? """ | |
logging.info("Checking for multiple deltas.") | |
comments = self.get_thread_comments(orig_comment) | |
more_comments = None | |
if type(comments) is praw.objects.MoreComments: | |
more_comments = comments.comments | |
comments = more_comments | |
users = {} | |
for comment in comments: | |
if comment.author != None: | |
if comment.author.name not in users: | |
users[comment.author.name] = 0 | |
if self.find_delta(comment, False): | |
users[comment.author.name] += 1 | |
if users[orig_comment.author.name] > 1: | |
logging.info("more than 0 comments in thread") | |
return True | |
logging.info("somehow got here") | |
return False | |
def comment_in_path(self, root, comment): | |
"""figure out whether or not comment is in child path of root""" | |
stack = [root] | |
while stack: | |
item = stack.pop(0) | |
try: | |
if not hasattr(item, 'replies'): | |
print "No attribute .replies, probably a MoreComment" | |
#this is a morecomments object | |
comments = item.comments() | |
for reply in comments: | |
stack.append(reply) | |
if reply == comment: | |
return True | |
for reply in item.replies: | |
stack.append(reply) | |
if reply == comment: | |
return True | |
except AttributeError: | |
logging.debug("Probably a MoreComments object.") | |
return False | |
def get_thread_comments(self, comment): | |
"""Returns a list of all comments in a comment's thread. | |
Implements a stack to gather a list of all comments in a | |
thread in which the original comment is involved. | |
""" | |
logging.info("Getting all comments in thread.") | |
root = self.get_root_comment(comment) | |
stack = [root] | |
retval = [] | |
while stack != []: | |
item = stack.pop(0) | |
if type(item) is praw.objects.MoreComments: | |
for comment in item.comments(): | |
stack.append(comment) | |
continue | |
for reply in item.replies: | |
new_comments = None | |
if type(reply) is praw.objects.MoreComments: | |
new_comments = reply.comments() | |
for comment in new_comments: | |
stack.append(comment) | |
stack.append(reply) | |
retval.append(item) | |
return retval | |
def write_previous_comment_id(self, the_id): | |
"""Write the previous comment's ID to file.""" | |
id_file = open('prev_id.txt', 'w') | |
id_file.write(the_id) | |
id_file.close() | |
return | |
def is_parents_thread(self, comment): | |
"""Does the thread belong to the comment's parent's author?""" | |
parent = self.reddit.get_info(thing_id=comment.parent_id) | |
if parent.author == comment.submission.author: | |
return True | |
else: | |
return False | |
def update_delta_tracker(self, comment): | |
comment_submission = comment.submission | |
comment_submission_title = comment_submission.title | |
comment_submission_url = comment_submission.url | |
parent = self.reddit.get_info(thing_id=comment.parent_id) | |
parent_author = parent.author.name.lower() | |
try: | |
user_wiki_page = self.reddit.get_wiki_page(SUBREDDIT_NAME, parent_author) | |
if user_wiki_page.page == parent_author: | |
#this is the parent's wiki page, update it. | |
add_link = "\n%s -- %s" % (comment_submission_title, comment_submission_url) | |
new_content = user_wiki_page.content_md + add_link | |
self.reddit.edit_wiki_page(SUBREDDIT_NAME, user_wiki_page.page, new_content, | |
"Updated delta links.") | |
logging.info("Updated delta tracker page for %s" % parent_author) | |
except: | |
#page doesn't exist, create it | |
initial_text = "User %s received deltas in the following threads:\n\n" % parent_author | |
add_link = "\n%s -- %s" % (comment_submission_title, comment_submission_url) | |
full_update = initial_text + add_link | |
self.reddit.edit_wiki_page(SUBREDDIT_NAME, parent_author, full_update, | |
"Created user's delta links page.") | |
logging.info("Created tracker page for %s" % parent_author) | |
#Now add link to user's thread to the main page. | |
delta_tracker_page = self.reddit.get_wiki_page(SUBREDDIT_NAME, "delta_tracker") | |
delta_tracker_page_body = delta_tracker_page.content_md | |
authors_page = "http://www.reddit.com/r/%s/wiki/%s" % (SUBREDDIT_NAME, | |
parent_author) | |
new_link = "\n%s -- %s" % (parent_author, authors_page) | |
new_content = delta_tracker_page_body + new_link | |
self.reddit.edit_wiki_page(SUBREDDIT_NAME, "delta_tracker", new_content, | |
"Updated tracker page.") | |
logging.info("Updated delta_tracker to link to %s's page" % parent_author) | |
def go(self): | |
"""Start DeltaBot""" | |
logging.info('starting with %0.1fs scan period\n\n' % PERIOD_SCAN) | |
before_id = self.get_previous_comment_id() | |
#before_id = None | |
logging.info("We're starting from this ID: %s\n\n" % before_id) | |
temp_id = before_id | |
logging.info("Temp id: %s\n Before id: %s" % (temp_id, before_id)) | |
while True: | |
start_time = time.time() | |
before_id = self.scan(before_id) | |
#NEW -- UPDATE TOP 10 LIST | |
self.update_top_ten_list() | |
# was there a new comment? if so, write to cache | |
if before_id != temp_id: | |
self.write_previous_comment_id(before_id) | |
temp_id = before_id | |
sleep_time = max(0, PERIOD_SCAN - (time.time() - start_time)) | |
logging.debug('sleeping %0.1fs' % sleep_time) | |
logging.debug("Current ID is %s\n\n" % before_id) | |
time.sleep(sleep_time) | |
def sandbox(): | |
#FOR TESTING. Put whatever here. | |
bot = Bot() | |
bot.test_recursive_tree() | |
def main(): | |
bot = Bot() | |
bot.go() | |
if __name__ == '__main__': | |
main() | |
#sandbox() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment