transcribersofreddit beta 0.1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import time | |
import sys | |
# noinspection PyUnresolvedReferences | |
import better_exceptions | |
from praw import Reddit | |
from redis import StrictRedis | |
from helpers import clean_id | |
from helpers import get_wiki_page | |
from helpers import log_header | |
from helpers import flair_post | |
from helpers import get_parent_post_id | |
from strings import reddit_url | |
from strings import id_already_handled_in_db | |
from strings import summoned_submit_title | |
from strings import discovered_submit_title | |
from strings import rules_comment | |
from strings import bot_footer | |
from strings import rules_comment_unknown_format | |
from strings import claim_success | |
from strings import claim_already_complete | |
from strings import already_claimed | |
from strings import done_still_unclaimed | |
from strings import done_completed_transcript | |
from strings import done_cannot_find_transcript | |
class Context(object): | |
''' | |
global support object for bot | |
''' | |
video_domains = [] | |
audio_domains = [] | |
image_domains = [] | |
video_formatting = '' | |
audio_formatting = '' | |
image_formatting = '' | |
subreddits_to_check = [] | |
def _(message): | |
""" | |
Message formatter. Returns the message and the disclaimer for the | |
footer. | |
:param message: string. The message to be displayed. | |
:return: string. The original message plus the footer. | |
""" | |
return bot_footer.format(message) | |
def populate_formatting(): | |
Context.audio_formatting = get_wiki_page('format/audio', tor=tor) | |
Context.video_formatting = get_wiki_page('format/video', tor=tor) | |
Context.image_formatting = get_wiki_page('format/images', tor=tor) | |
def populate_domain_lists(): | |
''' | |
Loads the approved content domains into memory from the wiki page. | |
:return: None | |
''' | |
domains = get_wiki_page('domains', tor=tor) | |
domains = ''.join(domains.splitlines()).split('---') | |
for domainset in domains: | |
domain_list = domainset[domainset.index('['):].strip('[]').split(', ') | |
current_domain_list = [] | |
if domainset.startswith('video'): | |
current_domain_list = Context.video_domains | |
elif domainset.startswith('audio'): | |
current_domain_list = Context.audio_domains | |
elif domainset.startswith('images'): | |
current_domain_list = Context.image_domains | |
[current_domain_list.append(x) for x in domain_list] | |
logging.debug('Domain list populated: {}'.format(current_domain_list)) | |
def populate_subreddit_list(): | |
''' | |
Gets the list of subreddits to monitor and loads it into memory. | |
:return: None. | |
''' | |
Context.subreddits_to_check = get_wiki_page('subreddits', tor=tor).split('\r\n') | |
logging.debug( | |
'Created list of subreddits from wiki: {}'.format( | |
Context.subreddits_to_check | |
) | |
) | |
def configure_logging(): | |
logging.basicConfig( | |
level=logging.INFO, | |
format='[%(asctime)s] - [%(levelname)s] - [%(funcName)s] - %(message)s', | |
datefmt='%m/%d/%Y %I:%M:%S %p', | |
filename='transcribersofreddit.log' | |
) | |
console = logging.StreamHandler() | |
console.setLevel(logging.INFO) | |
formatter = logging.Formatter('[%(asctime)s] - [%(funcName)s] - %(message)s') | |
# tell the handler to use this format | |
console.setFormatter(formatter) | |
# add the handler to the root logger | |
logging.getLogger('').addHandler(console) | |
log_header('Starting!') | |
def is_valid(post_id): | |
""" | |
Returns true or false based on whether the parent id is in a set of IDs. | |
It determines this by attempting to insert the value into the DB and | |
returning the result. Sure, we could use SISMEMBER and actually try to | |
look it up first, but that takes all the fun out of it. | |
:param post_id: string. The comment / post ID. | |
:return: True if the ID is successfully inserted into the set; False if | |
it's already there. | |
""" | |
result = redis_server.sadd("post_ids", post_id) | |
return True if result == 1 else False | |
def respond_to_thanks(mention): | |
logging.info( | |
'Responding to a Thank You comment, ID {}'.format(mention) | |
) | |
mention.reply(_('You\'re very welcome! I\'m just doing my job!')) | |
def process_mention(mention): | |
# We have to do this entire parent / parent_permalink thing twice because | |
# the method for calling a permalink changes for each object. Laaaame. | |
if not mention.is_root: | |
# this comment is in reply to something. Let's grab a comment object. | |
parent = r.comment(id=clean_id(mention.parent_id)) | |
parent_permalink = parent.permalink() | |
else: | |
# this is a post. | |
parent = r.submission(id=clean_id(mention.parent_id)) | |
parent_permalink = parent.permalink | |
logging.info( | |
'Posting call for transcription on ID {}'.format(mention.parent_id) | |
) | |
# noinspection PyBroadException | |
try: | |
result = tor.submit( | |
title=summoned_submit_title.format( | |
sub=mention.subreddit.display_name, | |
commentorpost=parent.__class__.__name__.lower(), | |
title=parent.title | |
), | |
url=reddit_url.format(parent_permalink) | |
) | |
result.reply(_(rules_comment_unknown_format)) | |
logging.info( | |
'Posting success message in response to caller, u/{}'.format(mention.author) | |
) | |
mention.reply(_( | |
'The transcribers have been summoned! Please be patient ' | |
'and we\'ll be along as quickly as we can.') | |
) | |
# I need to figure out what errors can happen here | |
except Exception as e: | |
logging.error(e) | |
logging.error( | |
'Posting failure message in response to caller, u/{}'.format(mention.author) | |
) | |
mention.reply(_( | |
'Something appears to have gone wrong. Please message the ' | |
'moderators of r/TranscribersOfReddit to have them look at ' | |
'this. Thanks!') | |
) | |
def process_post(new_post): | |
if not is_valid(new_post.fullname): | |
logging.info(id_already_handled_in_db.format(new_post.fullname)) | |
return | |
logging.info( | |
'Posting call for transcription on ID {} posted by {}'.format( | |
new_post.fullname, new_post.author.name | |
) | |
) | |
if new_post.domain in Context.image_domains: | |
content_type = 'image' | |
content_format = Context.image_formatting | |
elif new_post.domain in Context.audio_domains: | |
content_type = 'audio' | |
content_format = Context.audio_formatting | |
elif new_post.domain in Context.video_domains: | |
content_type = 'video' | |
content_format = Context.video_formatting | |
else: | |
# how could we get here without fulfilling one of the above | |
# criteria? Just remember: the users will find a way. | |
content_type = 'Unknown' | |
content_format = 'Formatting? I think something went wrong here...' | |
# noinspection PyBroadException | |
try: | |
result = tor.submit( | |
title=discovered_submit_title.format( | |
sub=new_post.subreddit.display_name, | |
type=content_type.title(), | |
title=new_post.title | |
), | |
url=reddit_url.format(new_post.permalink) | |
) | |
result.reply( | |
_( | |
rules_comment.format( | |
post_type=content_type, formatting=content_format | |
) | |
) | |
) | |
# Flair looks like this: | |
# { | |
# 'flair_css_class': 'unclaimed-flair', | |
# 'flair_template_id': 'fe9d6950-142a-11e7-901e-0ecc947f9ff4', | |
# 'flair_text_editable': False, | |
# 'flair_position': 'left', | |
# 'flair_text': 'Unclaimed' | |
# } | |
flair_post(result, 'Unclaimed') | |
# I need to figure out what errors can happen here | |
except Exception as e: | |
logging.error(e) | |
logging.error( | |
'Something went wrong; unable to post content.\n' | |
'ID: {id}\n' | |
'Title: {title}\n' | |
'Subreddit: {sub}'.format( | |
id=new_post.fullname, | |
title=new_post.title, | |
sub=new_post.subreddit.display_name | |
) | |
) | |
def check_inbox(): | |
# first we do mentions, then comment replies | |
mentions = [] | |
# grab all of our messages and filter | |
for item in r.inbox.unread(limit=None): | |
if item.subject == 'username mention': | |
mentions.append(item) | |
item.mark_read() | |
# sort them and create posts where necessary | |
for mention in mentions: | |
logging.info('Received mention! ID {}'.format(mention)) | |
if not is_valid(mention.parent_id): | |
# Do our check here to make sure we can actually work on this one and | |
# that we haven't already posted about it. We use the full ID here | |
# instead of the cleaned one, just in case. | |
logging.info(id_already_handled_in_db.format(mention.parent_id)) | |
continue | |
process_mention(mention) | |
# comment replies | |
replies = [] | |
for item in r.inbox.unread(limit=None): | |
if item.subject == 'comment reply': | |
replies.append(item) | |
item.mark_read() | |
for reply in replies: | |
if 'thank' in reply.body.lower(): | |
respond_to_thanks(reply) | |
continue | |
if 'claim' in reply.body: | |
process_claim(reply) | |
if 'done' in reply.body: | |
process_done(reply) | |
def verified_posted_transcript(post): | |
top_parent = get_parent_post_id(post, r) | |
# get source link, check all comments, look for root level comment | |
# by the author of the post. Return True if found, False if not. | |
linked_resource = r.submission(top_parent.id_from_url(top_parent.url)) | |
for top_level_comment in linked_resource.comments: | |
if post.author == top_level_comment.author: | |
return True | |
return False | |
def process_done(post): | |
top_parent = get_parent_post_id(post, r) | |
if 'Unclaimed' in top_parent.link_flair_text: | |
post.reply(_(done_still_unclaimed)) | |
elif top_parent.link_flair_text == 'In Progress': | |
if verified_posted_transcript(post): | |
# we need to double-check these things to keep people | |
# from gaming the system | |
post.reply(_(done_completed_transcript)) | |
flair_post(top_parent, 'Completed!') | |
else: | |
post.reply(_(done_cannot_find_transcript)) | |
def process_claim(post): | |
top_parent = get_parent_post_id(post, r) | |
if 'Unclaimed' in top_parent.link_flair_text: | |
# need to get that "Summoned - Unclaimed" in there too | |
post.reply(_(claim_success)) | |
flair_post(top_parent, 'In Progress') | |
# can't claim something that's already claimed | |
elif top_parent.link_flair_text == 'In Progress': | |
post.reply(_(already_claimed)) | |
elif top_parent.link_flair_text == 'Completed!': | |
post.reply(_(claim_already_complete)) | |
pass | |
def check_submissions(subreddit): | |
for post in r.subreddit(subreddit).new(limit=100): | |
if ( | |
post.domain in Context.image_domains or | |
post.domain in Context.audio_domains or | |
post.domain in Context.video_domains | |
): | |
process_post(post) | |
# for testing, we'll just do one | |
return | |
def initialize(): | |
populate_domain_lists() | |
logging.info('Domains loaded.') | |
populate_subreddit_list() | |
logging.info('Subreddits loaded.') | |
populate_formatting() | |
logging.info('Formatting loaded.') | |
if __name__ == '__main__': | |
r = Reddit('bot') | |
configure_logging() | |
try: | |
redis_server = StrictRedis(host='localhost', port=6379, db=0) | |
redis_server.ping() | |
except: | |
logging.error("Redis server is not running! Exiting!") | |
sys.exit(1) | |
tor = r.subreddit('transcribersofreddit') | |
initialize() | |
try: | |
# primary loop | |
while True: | |
check_inbox() | |
for sub in Context.subreddits_to_check: | |
check_submissions(sub) | |
logging.info('Sleeping!') | |
time.sleep(60) | |
except KeyboardInterrupt: | |
logging.error('User triggered shutdown. Shutting down.') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment