Skip to content

Instantly share code, notes, and snippets.

@notesbot
Last active February 16, 2024 18:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save notesbot/5c2e823d10633bc419273127d1361728 to your computer and use it in GitHub Desktop.
Save notesbot/5c2e823d10633bc419273127d1361728 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# r/ask_historians.py
import praw
import functools
import re
import time
import json
from logger_config import setup_logger
import os
import prawcore.exceptions
from dotenv import load_dotenv
from datetime import datetime, timedelta
import math
# Set up logging; the logger is named after this script's file name without its extension.
script_name = os.path.splitext(os.path.basename(__file__))[0]
logger = setup_logger(script_name)
# Load credentials from an env file into the process environment.
load_dotenv()
# NOTE(review): `env` is not referenced anywhere else in the visible source.
env = os.getenv('ENV')
# Authenticated Reddit client; every credential is read from environment variables.
reddit = praw.Reddit(
client_id=os.getenv('CLIENT_ID'),
client_secret=os.getenv('CLIENT_SECRET'),
username=os.getenv('BOT_USERNAME'),
password=os.getenv('BOT_PASSWORD'),
user_agent=os.getenv('USER_AGENT')
)
subreddit = reddit.subreddit('askhistorians')
# reddit.user.me() is evaluated here, while the module is loading.
logger.info(f'Logged in as {reddit.user.me()} on r/{subreddit}')
# Usernames left out of the moderator list and whose comments are never
# treated as removal commands.
BLACKLIST = ["AutoModerator"]
# State files (written and read as JSON text); the paths are relative.
COOLDOWN_LIST_FILE_PATH = 'cooldown_list.txt'
USER_SCORES_FILE_PATH = 'user_scores.txt'
# Template for the message sent to a user who lacks permission to remove a comment.
DENY_PERMS_REPLY = "u/{comment_author}, I'm sorry you don't have the permissions required to remove this comment."
# Shared error handling for every function that talks to the Reddit API.
def reddit_error_handler(func):
    """Decorate ``func`` so PRAW/prawcore errors are logged instead of raised.

    The wrapped function returns whatever ``func`` returns. If ``func`` raises,
    the error is logged and ``None`` is returned. The wrapper sleeps 180s
    after a ``RedditAPIException``, 60s after a prawcore ``RequestException``
    and 30s after any other unexpected exception; ``ClientException`` and
    ``ResponseException`` are logged without a pause.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            outcome = func(*args, **kwargs)
        except praw.exceptions.RedditAPIException as api_exc:
            logger.error(f'Reddit API Error: {api_exc}')
            time.sleep(180)
        except praw.exceptions.ClientException as client_exc:
            logger.error(f'Client Error: {client_exc}')
        except prawcore.exceptions.RequestException as request_exc:
            logger.error(f'Prawcore Request Exception: {request_exc}')
            time.sleep(60)
        except prawcore.exceptions.ResponseException as response_exc:
            logger.error(f'Prawcore Response Exception: {response_exc}')
        except Exception as e:
            logger.error(f'General Error: {e}')
            time.sleep(30)
        else:
            return outcome
        # Reached only after one of the handlers above has run.
        return None

    return wrapper
# Reddit API call helpers; each one is wrapped with the shared error handling.
@reddit_error_handler
def fetch_moderator_list(subreddit):
    """Return the subreddit's moderators as strings, leaving out BLACKLIST entries."""
    names = []
    for mod in subreddit.moderator():
        if mod in BLACKLIST:
            continue
        names.append(str(mod))
    return names
def is_super_user(username):
    """Return True when ``username`` is in the module-level moderator list.

    The comparison ignores case: callers in this file pass both original-case
    and lower-cased names, while the moderator list keeps the names as
    returned by Reddit.

    ``moderators_list`` is assigned in the ``__main__`` guard, so it can be
    missing (module imported instead of run) or ``None`` (the fetch failed).
    Both cases are treated as "no moderators known" rather than raising.
    """
    known_mods = globals().get("moderators_list") or ()
    if not username:
        return False
    wanted = str(username).lower()
    return any(str(mod).lower() == wanted for mod in known_mods)
@reddit_error_handler
def save_comment(comment):
    """Log the comment id, then save the comment on the bot's account."""
    logger.info(f"Saving comment: {comment.id}")
    comment.save()
@reddit_error_handler
def remove_comment(comment):
    """Log the comment id, then remove the comment through its moderation interface."""
    logger.info(f"Removing comment: {comment.id}")
    comment.mod.remove()
@reddit_error_handler
def send_message_to_user(username, message, subject="test subject"):
    """Send ``message`` to ``username`` as a private message.

    Args:
        username: Recipient's Reddit username. Nothing is sent (and nothing is
            logged) when it is ``None`` or empty.
        message: Body of the message.
        subject: Subject line. Defaults to the value this function has always
            used, so existing callers are unaffected.
    """
    # Guard first so the log never claims a message went to "None".
    if not username:
        return
    logger.info(f'Sending message to {username}')
    reddit.redditor(str(username)).message(subject=subject, message=message)
@reddit_error_handler
def reply_to_message(message, reply_text):
    """Reply to an inbox message with ``reply_text``; do nothing when ``message`` is None."""
    if message is None:
        return
    logger.info('Replying...')
    message.reply(reply_text)
@reddit_error_handler
def mark_as_read(message):
    """Mark an inbox message as read; do nothing when ``message`` is None."""
    if message is None:
        return
    logger.info("Marking message as read.")
    message.mark_read()
def get_cooldown_list():
    """Retrieve the cooldown list from the local text file.

    Returns:
        dict: The decoded JSON object stored in COOLDOWN_LIST_FILE_PATH. An
        empty dict is returned when the file is missing, empty, unreadable,
        not valid JSON, or valid JSON that is not an object.
    """
    try:
        with open(COOLDOWN_LIST_FILE_PATH, 'r') as file:
            content = file.read()
    except FileNotFoundError:
        # No cooldowns have been recorded yet; this is not an error.
        return {}
    except (OSError, ValueError) as e:
        logger.error(f"Failed to retrieve cooldown list: {e}")
        return {}

    if not content.strip():
        return {}

    try:
        cooldown_list = json.loads(content)
    except ValueError as e:
        logger.error(f"Failed to retrieve cooldown list: {e}")
        return {}

    # Callers index the result by username, so anything but an object is unusable.
    if not isinstance(cooldown_list, dict):
        logger.error(
            f"Failed to retrieve cooldown list: expected a JSON object, got {type(cooldown_list).__name__}"
        )
        return {}

    logger.info("Cooldown list successful. ")
    return cooldown_list
def _format_remaining_time(remaining_seconds):
    """Render a remaining cooldown, in seconds, as a short human-readable string."""
    if remaining_seconds < 3600:
        minutes = math.ceil(remaining_seconds / 60)
        return f"{minutes} minute" if minutes == 1 else f"{minutes} minutes"
    hours = int(remaining_seconds // 3600)
    leftover_minutes = remaining_seconds % 3600 // 60
    # Durations of an hour or more are rounded down to the nearest half hour.
    if leftover_minutes >= 30:
        return f"{hours}.5 hours"
    return f"{hours} hour" if hours == 1 else f"{hours} hours"


def author_cooldown_list_check(comment, cooldown_period, super_user=True):
    """Check whether the comment's author may issue a command right now.

    Args:
        comment: Comment whose ``author`` is checked.
        cooldown_period: Length of the cooldown, in minutes.
        super_user: Unused; kept so existing callers keep working.

    Returns:
        bool: False when the author is still inside their cooldown window (a
        message stating the remaining time is sent to them). True otherwise,
        including when the comment has no author, the author is a moderator,
        or the author has no usable cooldown entry.
    """
    if not comment.author:
        return True

    author_name = comment.author.name
    logger.info(f"Checking cooldown for: {author_name}")
    if is_super_user(author_name):
        logger.info(f"u/{author_name} is a moderator, bypassing.")
        return True

    cooldown_list = get_cooldown_list()
    if author_name not in cooldown_list:
        return True

    try:
        last_command_time = datetime.strptime(cooldown_list[author_name], "%Y-%m-%d %H:%M:%S")
    except (TypeError, ValueError) as e:
        # A corrupt entry must not stop comment processing; treat it as "no cooldown".
        logger.error(f"Ignoring unreadable cooldown entry for {author_name}: {e}")
        return True

    # Sample the clock once so the comparison and the reported time agree.
    now = datetime.now()
    remaining = timedelta(minutes=cooldown_period) - (now - last_command_time)
    remaining_seconds = remaining.total_seconds()
    if remaining_seconds <= 0:
        return True

    logger.info(f"Remaining seconds: {remaining_seconds}")
    remaining_time_str = _format_remaining_time(remaining_seconds)
    cooldown_msg = f"Cooldown message: You're doing that too much. Please try again in {remaining_time_str}."
    send_message_to_user(author_name, cooldown_msg)
    logger.info(f"Rate limit msg sent, was {remaining_time_str}.")
    return False
def update_cooldown_list(name, log_time):
    """Record ``log_time`` for ``name`` in the cooldown file.

    The current list is loaded, the entry for ``name`` is set to ``log_time``
    formatted as ``YYYY-MM-DD HH:MM:SS``, and the whole list is written back
    as JSON. Any failure is logged rather than raised.
    """
    name = str(name)
    try:
        entries = get_cooldown_list()
        entries[name] = log_time.strftime("%Y-%m-%d %H:%M:%S")
        with open(COOLDOWN_LIST_FILE_PATH, 'w') as handle:
            handle.write(json.dumps(entries))
        logger.info(f"Cooldown list updated for user {name}.")
    except Exception as e:
        logger.error(f"Failed to update cooldown list for name {name} - received error {e}")
def save_user_scores(user_scores):
    """Write ``user_scores`` to the scores file as indented JSON.

    Serialisation happens before the file is opened; errors while opening or
    writing the file are logged rather than raised.
    """
    # indent=4 keeps the file readable when inspected by hand.
    serialized = json.dumps(user_scores, indent=4)
    try:
        with open(USER_SCORES_FILE_PATH, 'w') as handle:
            handle.write(serialized)
        logger.info("User scores updated successfully.")
    except Exception as e:
        logger.error(f"Error: Something went wrong saving user scores: [{e}]")
def load_user_scores():
    """Retrieve user scores from the local text file.

    Returns:
        dict: The decoded JSON object stored in USER_SCORES_FILE_PATH. An empty
        dict is returned when the file is missing, empty, unreadable, not valid
        JSON, or valid JSON that is not an object.
    """
    try:
        with open(USER_SCORES_FILE_PATH, 'r') as file:
            content = file.read()
    except FileNotFoundError:
        # Nothing has been scored yet; this is not an error.
        return {}
    except (OSError, ValueError) as e:
        logger.error(f'Error loading user scores: {e}')
        return {}

    if not content.strip():
        return {}

    try:
        user_scores = json.loads(content)
    except ValueError as e:
        logger.error(f'Error loading user scores: {e}')
        return {}

    # Callers read and assign keys on the result, so only an object is usable.
    if not isinstance(user_scores, dict):
        logger.error(
            f'Error loading user scores: expected a JSON object, got {type(user_scores).__name__}'
        )
        return {}
    return user_scores
def user_has_permissions(comment_author, required_perm_level):
    """Return True when the user is a moderator or has non-empty flair text.

    Args:
        comment_author: Username to check.
        required_perm_level: Accepted but not consulted by this function.
    """
    if is_super_user(comment_author):
        logger.info(f"u/{comment_author} is a moderator with full perms.")
        return True

    # Any flair text at all counts as permission.
    try:
        flair = next(subreddit.flair(redditor=comment_author))
    except StopIteration:
        return False
    return bool(flair.get('flair_text'))
def update_user_scores(author_name=None, recipient=None, recipient_action=None, author_action=None):
    """Update the scoreboard file and bump the processed-comment counter.

    The scores are loaded, "Total Comments Processed" is incremented, an entry
    is ensured for ``author_name`` (and for ``recipient`` when a
    ``recipient_action`` is given), the named action counters are incremented,
    and the result is saved. Usernames are stored lower-cased.
    """
    logger.info("Loading user scores")
    user_scores = load_user_scores()

    # Global tally of calls, created on first use.
    user_scores["Total Comments Processed"] = user_scores.get("Total Comments Processed", 0) + 1

    def bump(username, action=None):
        """Ensure a per-user record exists and increment ``action`` if it is a known counter."""
        key = username.lower()
        users = user_scores.setdefault("Users", {})
        record = users.setdefault(key, {
            "Removals": 0,
            "Comments Removed": 0
        })
        if action and action in record:
            record[action] += 1

    if author_name:
        bump(author_name, action=author_action)
    if recipient and recipient_action:
        bump(recipient, action=recipient_action)

    save_user_scores(user_scores)
def process_comment_removal(comment, submission_id):
    """Treat ``comment`` as a removal command aimed at its parent comment.

    The command is ignored when the comment is already saved, already removed,
    has no author (deleted account), was written by a BLACKLIST user, is not in
    the submission ``submission_id``, or its author is on cooldown. A top-level
    comment is itself removed, since there is no parent comment to act on.

    Otherwise the parent comment is removed when its author is neither flaired
    nor a moderator and the requester is flaired or a moderator. The requester
    is messaged with the outcome in every case.

    Args:
        comment: The command comment taken from the subreddit stream.
        submission_id: Base-36 id of the submission where commands are accepted.
    """
    # Skip comments already handled (saved) or already removed by a moderator.
    if comment.saved or comment.banned_by:
        return
    # A deleted account leaves author as None: nobody to act for or to notify.
    if comment.author is None:
        return
    requester = comment.author.name
    if requester in BLACKLIST:
        return
    # Only act inside the designated submission.
    if comment.link_id.split('_')[1] != submission_id:
        return
    # One removal per 10 minutes for non-moderators.
    if not author_cooldown_list_check(comment, 10):
        return
    # A top-level comment has no parent comment to remove; clean it up instead.
    if comment.is_root:
        send_message_to_user(requester, "I'm sorry, you can't remove a submission, reply to a comment instead.")
        save_comment(comment)
        remove_comment(comment)
        return

    logger.info(f"New comment removal attempt from u/{requester}.")
    parent_comment = reddit.comment(id=comment.parent_id.split('_')[1])
    # The parent's author is None when that account was deleted.
    parent_author = parent_comment.author.name if parent_comment.author else None
    parent_label = parent_author or "[deleted]"

    # Comments by flaired users and moderators are protected.
    if parent_author and user_has_permissions(parent_author, "any"):
        logger.info(f"Skipping removal. u/{parent_author} has flair or is a moderator.")
        send_message_to_user(requester, "The comment you attempted to remove belongs to a flaired user or moderator. No action was taken.")
        return

    # Pass the name in its original case; the moderator list keeps Reddit's casing.
    if not user_has_permissions(requester, "any"):
        send_message_to_user(requester, DENY_PERMS_REPLY.format(comment_author=requester))
        time.sleep(2)
        return

    try:
        # NOTE: remove_comment/save_comment log API errors and return None
        # instead of raising, so a failed removal is not detected here.
        remove_comment(parent_comment)
        save_comment(parent_comment)
        # Mark the command itself as handled so it is skipped if seen again.
        save_comment(comment)
        # Start the requester's cooldown window.
        update_cooldown_list(requester, datetime.now())
        update_user_scores(requester, recipient=parent_label, author_action="Removals", recipient_action="Comments Removed")
        send_message_to_user(
            requester,
            f"You successfully removed the following comment by u/{parent_label}\n\n"
            f"> [{parent_comment.body}]({parent_comment.permalink}) with a removal reason of\n\n"
            f"> [{comment.body}]({parent_comment.permalink})",
        )
    except Exception as e:
        logger.error(f"Failed to remove comment {parent_comment.id}: {e}")
        send_message_to_user(requester, f"I was unable to remove this comment [{parent_comment.id}]({parent_comment.permalink}) due to an error: ```{e}```")
    time.sleep(2)
def check_inbox_for_removal_submission_id():
    """Look at the newest unread inbox item for a new removals thread.

    An item whose subject contains "removals" (any case) is searched for a
    reddit.com comments link. When one is found the sender gets a confirmation
    reply, the item is marked read and the submission id is returned. When no
    link is found the sender is told so and the item is marked read.

    Returns:
        str | None: The submission id from the link, or None when there is
        nothing to act on.
    """
    # Accepts www./old./new. or the bare domain, with or without a trailing slash.
    pattern = r'https?://(?:(?:www|old|new)\.)?reddit\.com/r/[^/\s]+/comments/([A-Za-z0-9]+)'
    # NOTE(review): the sender is not checked against the moderator list, so any
    # account can redirect the bot to another thread - confirm this is intended.
    for message in reddit.inbox.unread(limit=1):  # Only the latest unread item
        subject = getattr(message, "subject", None) or ""
        if "removals" not in subject.lower():
            continue
        try:
            logger.info('Message found, extracting link...')
            match = re.search(pattern, message.body)
            if match:
                submission_id = match.group(1)
                logger.info(f"Found submission ID for removals: {submission_id}")
                reply_to_message(message, f'Thank you, I have updated the thread for comment removals to http://redd.it/{submission_id}.')
                mark_as_read(message)
                return submission_id
            logger.info("No submission ID found in the message.")
            # Echo what was received; no submission id exists on this path.
            reply_to_message(message, f'I was not able to process the submission id from your link. This is what I have ```{message.body}```')
            mark_as_read(message)
        except Exception as e:
            logger.error(f"Error extracting submission ID: {e}")
            mark_as_read(message)
    return None
def main(default_submission_id="180fdpl"):
    """Run the bot: follow the subreddit comment stream and process removal commands.

    The inbox is checked once at start-up and again before every streamed
    comment, so the monitored thread can be switched while the bot runs. The
    loop does not return; unexpected errors are logged and the loop continues.

    Args:
        default_submission_id: Submission id used until an inbox message
            supplies another one.
    """
    submission_id = check_inbox_for_removal_submission_id()
    if not submission_id:
        logger.info("No new submission ID found for removals. Using the existing ID...")
        submission_id = default_submission_id

    while True:
        try:
            for comment in subreddit.stream.comments():
                try:
                    # Pick up a thread change before handling this comment.
                    new_submission_id = check_inbox_for_removal_submission_id()
                    if new_submission_id:
                        submission_id = new_submission_id
                        logger.info(f"Updated submission ID to: {submission_id}")
                    if not submission_id:
                        logger.info("No submission ID found; skipping processing for this comment.")
                        continue
                    process_comment_removal(comment, submission_id)
                except Exception as e:
                    # One bad comment must not tear down the whole stream.
                    logger.error(f"Error while processing comment {getattr(comment, 'id', '?')}: {e}")
        except Exception as e:
            # The stream itself failed (e.g. network or API outage); pause, then reopen it.
            logger.error(f"Comment stream interrupted: {e}. Restarting in 60 seconds.")
            time.sleep(60)
if __name__ == "__main__":
    # fetch_moderator_list returns None when the API call fails; fall back to an
    # empty list so later membership checks do not raise.
    moderators_list = fetch_moderator_list(subreddit) or []
    if not moderators_list:
        logger.error("Moderator list is empty; no user will be treated as a moderator.")
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment