/ask_historians.py Secret
Last active
July 18, 2024 04:28
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# r/ask_historians.py | |
import praw | |
import functools | |
import re | |
import time | |
import json | |
from logger_config import setup_logger | |
import os | |
import prawcore.exceptions | |
from dotenv import load_dotenv | |
from datetime import datetime, timedelta | |
import math | |
# Name the logger after this script's file name, minus the extension.
script_name = os.path.splitext(os.path.basename(__file__))[0]
logger = setup_logger(script_name)

# Accounts that are left out of the moderator list and whose comments are
# never processed.
BLACKLIST = ["AutoModerator"]

# Local JSON files holding the per-user cooldown timestamps and scoreboard.
COOLDOWN_LIST_FILE_PATH = 'cooldown_list.txt'
USER_SCORES_FILE_PATH = 'user_scores.txt'

# Template for the message sent to users who are not allowed to remove comments.
DENY_PERMS_REPLY = "u/{comment_author}, I'm sorry you don't have the permissions required to remove this comment."

# Load credentials and settings from a .env file into the environment.
load_dotenv()
env = os.getenv('ENV')

# Log in to Reddit with the credentials read from the environment.
reddit = praw.Reddit(
    client_id=os.getenv('CLIENT_ID'),
    client_secret=os.getenv('CLIENT_SECRET'),
    username=os.getenv('BOT_USERNAME'),
    password=os.getenv('BOT_PASSWORD'),
    user_agent=os.getenv('USER_AGENT'),
)
subreddit = reddit.subreddit('askhistorians')
logger.info(f'Logged in as {reddit.user.me()} on r/{subreddit}')
# One shared place for dealing with Reddit/PRAW failures.
def reddit_error_handler(func):
    """Decorate *func* so that exceptions are logged instead of propagating.

    On success the wrapped callable returns whatever *func* returned. When an
    exception is caught it is logged and ``None`` is returned. Some error
    families also pause before returning: 180 s for Reddit API errors, 60 s
    for prawcore request errors and 30 s for any other unclassified
    exception. Client and response errors do not pause.
    """
    @functools.wraps(func)
    def guarded_call(*call_args, **call_kwargs):
        try:
            outcome = func(*call_args, **call_kwargs)
        except praw.exceptions.RedditAPIException as api_exc:
            logger.error(f'Reddit API Error: {api_exc}')
            time.sleep(180)
        except praw.exceptions.ClientException as client_exc:
            logger.error(f'Client Error: {client_exc}')
        except prawcore.exceptions.RequestException as request_exc:
            logger.error(f'Prawcore Request Exception: {request_exc}')
            time.sleep(60)
        except prawcore.exceptions.ResponseException as response_exc:
            logger.error(f'Prawcore Response Exception: {response_exc}')
        except Exception as e:
            logger.error(f'General Error: {e}')
            time.sleep(30)
        else:
            return outcome
        # Reached only after one of the handlers above ran.
        return None

    return guarded_call
# The Reddit API helpers below all run under reddit_error_handler.
@reddit_error_handler
def fetch_moderator_list(subreddit):
    """Return the names of the subreddit's moderators, minus BLACKLIST entries.

    Each moderator yielded by ``subreddit.moderator()`` is tested against
    BLACKLIST as-is and converted to ``str`` only when kept.
    """
    names = []
    for mod in subreddit.moderator():
        if mod in BLACKLIST:
            continue
        names.append(str(mod))
    return names
def is_super_user(username, moderators=None):
    """Return True when *username* is one of the subreddit's moderators.

    Args:
        username: Reddit account name to look up. ``None`` or an empty name
            is never a super user.
        moderators: Optional iterable of moderator names to check against.
            Defaults to the module-level ``moderators_list``.

    Returns:
        bool: whether the name matches a moderator, ignoring letter case.
        Callers in this file pass both original-case and lower-cased names,
        so an exact-case comparison would miss moderators.
    """
    if not username:
        return False
    if moderators is None:
        # ``moderators_list`` is only assigned when the file runs as a script,
        # and the fetch that fills it yields None after an API error. Treat
        # both situations as "no known moderators" instead of raising.
        moderators = globals().get("moderators_list") or []
    wanted = str(username).casefold()
    return any(str(mod).casefold() == wanted for mod in moderators)
@reddit_error_handler
def save_comment(comment):
    """Log the comment's id, then call ``comment.save()`` on it.

    process_comment_removal skips any comment whose ``saved`` attribute is
    set, so a saved comment is treated as already handled.
    """
    logger.info(f"Saving comment: {comment.id}")
    comment.save()
    return None
@reddit_error_handler
def remove_comment(comment):
    """Log the comment's id, then remove it through ``comment.mod.remove()``."""
    logger.info(f"Removing comment: {comment.id}")
    comment.mod.remove()
    return None
@reddit_error_handler
def send_message_to_user(username, message, subject="test subject"):
    """Send a private message to *username* from the bot account.

    Args:
        username: Recipient's Reddit name. ``None`` is skipped without
            contacting Reddit.
        message: Body of the private message.
        subject: Subject line. The default is the value this function has
            always sent, so existing callers behave as before.
    """
    if username is None:
        # Log the skip explicitly rather than claiming a message is being sent.
        logger.info('No recipient given, message not sent.')
        return
    logger.info(f'Sending message to {username}')
    reddit.redditor(str(username)).message(subject=subject, message=message)
@reddit_error_handler
def reply_to_message(message, reply_text):
    """Reply to *message* with *reply_text*; do nothing when *message* is None."""
    if message is None:
        return
    logger.info('Replying...')
    message.reply(reply_text)
@reddit_error_handler
def mark_as_read(message):
    """Mark *message* as read; do nothing when *message* is None."""
    if message is None:
        return
    logger.info("Marking message as read.")
    message.mark_read()
def get_cooldown_list():
    """Load the cooldown list from the local file at COOLDOWN_LIST_FILE_PATH.

    Returns the parsed JSON content of the file. A missing file, an empty
    file, or any error while reading or parsing yields an empty dict; errors
    are logged rather than raised.
    """
    try:
        # Nothing has been recorded yet when the file is absent.
        if not os.path.exists(COOLDOWN_LIST_FILE_PATH):
            return {}
        with open(COOLDOWN_LIST_FILE_PATH, 'r') as handle:
            raw_text = handle.read()
        if raw_text:
            entries = json.loads(raw_text)
        else:
            entries = {}
        logger.info("Cooldown list successful. ")
        return entries
    except Exception as e:
        logger.error(f"Failed to retrieve cooldown list: {e}")
        return {}
def _format_remaining_time(remaining_seconds):
    """Render a wait in seconds as whole minutes, or as hours in half-hour steps."""
    if remaining_seconds < 3600:
        minutes = math.ceil(remaining_seconds / 60)
        return f"{minutes} minute" if minutes == 1 else f"{minutes} minutes"
    hours = int(remaining_seconds // 3600)
    # Past the half hour, report "N.5 hours"; otherwise round down to N.
    if remaining_seconds % 3600 // 60 >= 30:
        return f"{hours}.5 hours"
    return f"{hours} hour" if hours == 1 else f"{hours} hours"


def author_cooldown_list_check(comment, cooldown_period, super_user=True):
    """Decide whether the comment's author may act, based on the cooldown list.

    Args:
        comment: Comment whose author is being rate limited.
        cooldown_period: Length of the cooldown window, in minutes.
        super_user: Not consulted; kept so existing callers keep working.

    Returns:
        True when the author may proceed: the comment has no author, the
        author is a moderator, there is no cooldown entry, the entry cannot
        be parsed, or the window has elapsed. False when the author is still
        cooling down; in that case they are sent a message saying how long
        remains.
    """
    if not comment.author:
        return True

    author_name = comment.author.name
    logger.info(f"Checking cooldown for: {author_name}")
    if is_super_user(author_name):
        logger.info(f"u/{author_name} is a moderator, bypassing.")
        return True

    cooldown_list = get_cooldown_list()
    if author_name not in cooldown_list:
        return True

    try:
        last_command_time = datetime.strptime(cooldown_list[author_name], "%Y-%m-%d %H:%M:%S")
    except (TypeError, ValueError) as exc:
        # A hand-edited or corrupted entry must not take the whole bot down.
        logger.error(f"Ignoring unreadable cooldown entry for {author_name}: {exc}")
        return True

    # Sample the clock once so the decision and the reported wait agree.
    remaining = timedelta(minutes=cooldown_period) - (datetime.now() - last_command_time)
    remaining_seconds = remaining.total_seconds()
    if remaining_seconds <= 0:
        return True

    logger.info(f"Remaining seconds: {remaining_seconds}")
    remaining_time_str = _format_remaining_time(remaining_seconds)
    cooldown_msg = f"Cooldown message: You're doing that too much. Please try again in {remaining_time_str}."
    send_message_to_user(author_name, cooldown_msg)
    logger.info(f"Rate limit msg sent, was {remaining_time_str}.")
    return False
def update_cooldown_list(name, log_time):
    """Record *log_time* for *name* in the cooldown file.

    *name* is coerced to ``str`` and mapped to *log_time* formatted as
    ``%Y-%m-%d %H:%M:%S``. The current list is loaded, updated, and written
    back to COOLDOWN_LIST_FILE_PATH as JSON. Failures are logged, not raised.
    """
    name = str(name)
    try:
        entries = get_cooldown_list()
        entries[name] = log_time.strftime("%Y-%m-%d %H:%M:%S")
        with open(COOLDOWN_LIST_FILE_PATH, 'w') as handle:
            handle.write(json.dumps(entries))
        logger.info(f"Cooldown list updated for user {name}.")
    except Exception as e:
        logger.error(f"Failed to update cooldown list for name {name} - received error {e}")
def save_user_scores(user_scores):
    """Write *user_scores* to USER_SCORES_FILE_PATH as indented JSON.

    Serialisation happens before the file is opened, so a serialisation
    error propagates to the caller. Errors while writing are logged instead.
    """
    # Indented output keeps the scoreboard file easy to read by hand.
    serialized = json.dumps(user_scores, indent=4)
    try:
        with open(USER_SCORES_FILE_PATH, 'w') as handle:
            handle.write(serialized)
        logger.info("User scores updated successfully.")
    except Exception as e:
        logger.error(f"Error: Something went wrong saving user scores: [{e}]")
def load_user_scores():
    """Load the user scoreboard from USER_SCORES_FILE_PATH.

    Returns the parsed JSON content of the file. A missing file, an empty
    file, or any error while reading or parsing yields an empty dict; errors
    are logged rather than raised.
    """
    try:
        # No scoreboard has been saved yet when the file is absent.
        if not os.path.exists(USER_SCORES_FILE_PATH):
            return {}
        with open(USER_SCORES_FILE_PATH, 'r') as handle:
            raw_text = handle.read()
        if not raw_text:
            return {}
        return json.loads(raw_text)
    except Exception as e:
        logger.error(f'Error loading user scores: {e}')
        return {}
def user_has_permissions(comment_author, required_perm_level):
    """Return True when the user is a moderator or has any flair text.

    Args:
        comment_author: Reddit account name to check.
        required_perm_level: Accepted but not consulted by this function.
    """
    if is_super_user(comment_author):
        logger.info(f"u/{comment_author} is a moderator with full perms.")
        return True

    # Any non-empty flair text on this subreddit counts as permission.
    flair_entry = next(subreddit.flair(redditor=comment_author), None)
    if flair_entry is None:
        return False
    return bool(flair_entry.get('flair_text'))
def update_user_scores(author_name=None, recipient=None, recipient_action=None, author_action=None):
    """Update the scoreboard file and count one more processed comment.

    Args:
        author_name: User who issued the command; given a scoreboard row when
            truthy.
        recipient: User on the receiving end; only handled when
            *recipient_action* is also truthy.
        recipient_action: Counter to increment on the recipient's row.
        author_action: Counter to increment on the author's row.

    Rows are keyed by the lower-cased user name. Only counters that already
    exist on a row ("Removals", "Comments Removed") are incremented.
    """
    logger.info("Loading user scores")
    user_scores = load_user_scores()

    # Every call counts as one processed comment.
    user_scores["Total Comments Processed"] = user_scores.get("Total Comments Processed", 0) + 1

    def bump(username, action=None):
        """Make sure *username* has a row, then add one to *action* if it is a known counter."""
        rows = user_scores.setdefault("Users", {})
        row = rows.setdefault(username.lower(), {"Removals": 0, "Comments Removed": 0})
        if action and action in row:
            row[action] += 1

    if author_name:
        bump(author_name, action=author_action)
    if recipient and recipient_action:
        bump(recipient, action=recipient_action)

    # Persist the updated scoreboard.
    save_user_scores(user_scores)
def process_comment_removal(comment, submission_id):
    """Remove the parent of a command comment when every precondition holds.

    Args:
        comment: Comment taken from the subreddit stream. Its body is quoted
            back to the requester as the removal reason.
        submission_id: Id of the only submission in which commands are acted
            on.

    Steps, in order: skip comments that are saved, already removed
    (``banned_by`` set) or written by a BLACKLIST account; skip comments in
    other submissions; enforce the 10 minute cooldown; remove top-level and
    author-less comments; refuse to touch comments by moderators or flaired
    users; then, if the requester is a moderator or flaired, remove the
    parent comment, update the scoreboard and start the requester's cooldown.
    """
    requester = comment.author  # May be None; guarded below before use.

    if comment.saved or comment.banned_by:
        return
    if requester is not None and requester.name in BLACKLIST:
        return
    # Check if the comment is in the right submission.
    if comment.link_id.split('_')[1] != submission_id:
        return
    # Only allows one removal every 10 minutes per user.
    if not author_cooldown_list_check(comment, 10):
        return

    # Top-level and author-less comments cannot be commands: clean them up.
    if comment.is_root or requester is None:
        # send_message_to_user ignores a None recipient.
        send_message_to_user(requester.name if requester else None, "I'm sorry, you can't remove a submission, reply to a comment instead.")
        save_comment(comment)
        remove_comment(comment)
        return

    # Keep the original letter case for the permission check and the
    # cooldown key; the scoreboard lower-cases names itself.
    requester_name = requester.name
    logger.info(f"New comment removal attempt from u/{requester_name}.")

    parent_comment = reddit.comment(id=comment.parent_id.split('_')[1])
    parent_author = parent_comment.author
    parent_name = parent_author.name if parent_author else None
    parent_label = parent_name or "[deleted]"

    # Comments by moderators or flaired users are protected. With no parent
    # author there is nobody to protect, and a None name must not reach the
    # flair lookup.
    if parent_name and user_has_permissions(parent_name, "any"):
        logger.info(f"Skipping removal. u/{parent_name} has flair or is a moderator.")
        send_message_to_user(requester_name, "The comment you attempted to remove belongs to a flaired user or moderator. No action was taken.")
        return

    if not user_has_permissions(requester_name, "any"):
        # Send failure message due to lack of permissions.
        send_message_to_user(requester_name, DENY_PERMS_REPLY.format(comment_author=requester_name))
        time.sleep(2)
        return

    try:
        # NOTE: remove_comment and save_comment run under reddit_error_handler,
        # which logs API errors and returns None, so the except branch below
        # only sees failures raised by the other calls.
        remove_comment(parent_comment)
        save_comment(parent_comment)
        update_user_scores(requester_name, recipient=parent_name, author_action="Removals", recipient_action="Comments Removed")
        # Start the cooldown that author_cooldown_list_check enforces; it
        # looks entries up by the author's original-case name.
        update_cooldown_list(requester_name, datetime.now())
        # Send confirmation message to the replying user.
        send_message_to_user(requester_name, f"You successfully removed the following comment by u/{parent_label}\n\n> [{parent_comment.body}]({parent_comment.permalink}) with a removal reason of\n\n> [{comment.body}]({parent_comment.permalink})")
        time.sleep(2)
    except Exception as e:
        logger.error(f"Failed to remove comment {parent_comment.id}: {e}")
        # Send failure message to user.
        send_message_to_user(requester_name, f"I was unable to remove this comment [{comment.id}]({parent_comment.permalink}) due to an error: ```{e}```")
        time.sleep(2)
# Matches a reddit.com comments link, with or without a www/old/new
# subdomain and with or without anything after the id, and captures the
# submission id that follows "/comments/".
_SUBMISSION_LINK_PATTERN = re.compile(
    r'https?://(?:(?:www|old|new)\.)?reddit\.com/r/[^/\s]+/comments/([A-Za-z0-9]+)'
)


def check_inbox_for_removal_submission_id():
    """Look for a new removals-thread link in the unread inbox.

    At most one unread item is fetched (``limit=1``). It is handled only when
    its subject contains "removals" (case-insensitive); other items are left
    unread and untouched.

    Returns:
        The submission id extracted from the first reddit.com comments link
        in the message body, or None when there is no matching message, no
        link, or an error occurred. A handled message is replied to and
        marked as read.
    """
    # NOTE(review): the sender is not verified here, so any account that can
    # message the bot can change the removals thread. Confirm that this is
    # intended.
    for message in reddit.inbox.unread(limit=1):
        if "removals" not in message.subject.lower():
            continue
        try:
            logger.info('Message found, extracting link...')
            match = _SUBMISSION_LINK_PATTERN.search(message.body)
            if match:
                submission_id = match.group(1)
                logger.info(f"Found submission ID for removals: {submission_id}")
                reply_to_message(message, f'Thank you, I have updated the thread for comment removals to http://redd.it/{submission_id}.')
                mark_as_read(message)
                return submission_id
            logger.info("No submission ID found in the message.")
            # Quote back what was received; no id exists in this branch.
            reply_to_message(message, f'I was not able to process the submission id from your link. This is what I have ```{message.body}```')
            mark_as_read(message)
        except Exception as e:
            logger.error(f"Error extracting submission ID: {e}")
            mark_as_read(message)
    return None
def main():
    """Stream new subreddit comments and treat each as a possible removal command.

    The removals thread id comes from the inbox when a matching message is
    waiting, otherwise from a built-in default. The inbox is polled again
    before every streamed comment so the thread can be changed while the bot
    runs.
    """
    # Initial check for a submission ID from the inbox, or use the default.
    submission_id = check_inbox_for_removal_submission_id()
    if not submission_id:
        logger.info("No new submission ID found for removals. Using the existing ID...")
        # Default submission ID as a fallback.
        submission_id = "180fdpl"

    while True:
        for comment in subreddit.stream.comments():
            # submission_id is always set from here on, so only a newly
            # received id changes it.
            new_submission_id = check_inbox_for_removal_submission_id()
            if new_submission_id:
                submission_id = new_submission_id
                logger.info(f"Updated submission ID to: {submission_id}")
            process_comment_removal(comment, submission_id)
if __name__ == "__main__":
    # fetch_moderator_list runs under reddit_error_handler and returns None
    # when the API call fails. Fall back to an empty list so membership tests
    # against moderators_list do not raise TypeError.
    moderators_list = fetch_moderator_list(subreddit) or []
    if not moderators_list:
        logger.error("Moderator list is empty; no user will be treated as a moderator.")
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment