Last active
February 27, 2019 11:45
-
-
Save CalvT/cc7e8feded92168447b2eb656008d513 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import subprocess
import time
from datetime import datetime, timedelta

import BotpySE as bp
import regex
import requests

import cbenv
# Bot variables
# Chat login credentials come from the ``cbenv`` module rather than being
# hard-coded in this file.
email = cbenv.email
password = cbenv.password

# Chat host handed to the bot constructor.
site = "stackexchange.com"

# Prefix placed in front of every queued report message.
botHeader = "[ [CommentSmoker](https://github.com/CalvT/CommentSmoker) ] "

# Chat room IDs handed to the bot constructor.
rooms = [57773]
# Bot commands
class CommandAlive(bp.Command):
    """Chat command that confirms the bot is running."""

    @staticmethod
    def usage():
        """Return the command phrases for this command."""
        phrases = ["alive", "status"]
        return phrases

    def run(self):
        """Reply "Yes" to the message that invoked the command."""
        self.reply("Yes")
# The base class is qualified as ``bp.Command``: the file imports the
# framework only as ``bp``, so a bare ``Command`` is an unresolved name.
class CommandAmiprivileged(bp.Command):
    """Chat command that tells the caller which privilege they hold."""

    # ``usage`` takes no ``self``; it must be a staticmethod so it can be
    # called on command instances as well as on the class.
    @staticmethod
    def usage():
        """Return the command phrases for this command."""
        return ["amiprivileged", "doihaveprivs", "privileges"]

    def run(self):
        """Reply with the caller's privilege name, or say they have none."""
        user_privs = self.message.user.get_privilege_type()
        if user_privs is None:
            self.reply("You do not have any privileges.")
            return
        self.reply("You have the privilege: " + user_privs.name)
# The base class is qualified as ``bp.Command``: the file imports the
# framework only as ``bp``, so a bare ``Command`` is an unresolved name.
class CommandListPrivilegedUsers(bp.Command):
    """Chat command that posts a table of the room's privileged users."""

    # ``usage`` takes no ``self``; it must be a staticmethod so it can be
    # called on command instances as well as on the class.
    @staticmethod
    def usage():
        """Return the command phrases for this command."""
        return ["membership", "privileged", "listprivileged"]

    def run(self):
        """Post one table row (user ID, privilege name) per privileged user."""
        # NOTE(review): ``tabulate`` is a third-party package that the visible
        # import block never imports; it must be imported for this command to
        # run. Confirm and add it to the file's imports.
        privilege_list = []
        for each_user in self.message.room.get_users():
            # Look the privilege up once and reuse it for the test and the row.
            privilege = each_user.get_privilege_type()
            if privilege is not None:
                privilege_list.append([each_user.id, privilege.name])
        table = tabulate.tabulate(
            privilege_list,
            headers=["User ID", "Privilege level"],
            tablefmt="orgtbl",
        )
        # Prefix every table line with the same leading indent.
        self.post(" " + table.replace("\n", "\n "))
# The base class is qualified as ``bp.Command``: the file imports the
# framework only as ``bp``, so a bare ``Command`` is an unresolved name.
class CommandListRunningCommands(bp.Command):
    """Chat command that posts a table of the commands currently running."""

    @staticmethod
    def usage():
        """Return the command phrases for this command."""
        return ["running commands", "rc"]

    def run(self):
        """Post one table row (user name, invoked phrase) per running command."""
        # NOTE(review): ``tabulate`` is a third-party package that the visible
        # import block never imports; it must be imported for this command to
        # run. Confirm and add it to the file's imports.
        command_list = [
            [
                each_command.message.user.name,
                # The usage phrase that was actually matched for this command.
                each_command.usage()[each_command.usage_index],
            ]
            for each_command, _ in self.command_manager.running_commands
        ]
        table = tabulate.tabulate(
            command_list,
            headers=["User", "Command"],
            tablefmt="orgtbl",
        )
        # Plain str.replace does the literal newline substitution; the previous
        # re.sub call relied on ``re``, which this file never imports.
        self.post(" " + table.replace("\n", "\n "))
# The base class is qualified as ``bp.Command``: the file imports the
# framework only as ``bp``, so a bare ``Command`` is an unresolved name.
class CommandPrivilegeUser(bp.Command):
    """Chat command that grants a named privilege to a user.

    Arguments: the target user's ID, then the privilege name.
    """

    # ``usage`` takes no ``self``; it must be a staticmethod so it can be
    # called on command instances as well as on the class.
    @staticmethod
    def usage():
        """Return the command phrases for this command."""
        return ["privilege * *", "addpriv * *"]

    def privileges(self):
        """Return 0.

        NOTE(review): presumably the privilege level required to run this
        command; 0 would let anyone grant privileges. Confirm this is
        intended.
        """
        return 0

    def run(self):
        """Grant the privilege unless it is unknown or already held."""
        user_id = int(self.arguments[0])
        privilege_name = self.arguments[1]
        room = self.message.room

        privilege_type = room.get_privilege_type_by_name(privilege_name)
        if privilege_type is None:
            self.reply("Please give a valid privilege type")
            return

        if room.is_user_privileged(user_id, privilege_type.level):
            self.reply("The user specified already has the required privileges.")
            return

        room.change_privilege_level(user_id, privilege_type)
        self.reply("The user specified has been given the privileges.")
# The base class is qualified as ``bp.Command``: the file imports the
# framework only as ``bp``, so a bare ``Command`` is an unresolved name.
class CommandUnprivilegeUser(bp.Command):
    """Chat command that removes a user's privileges.

    Argument: the target user's ID.
    """

    @staticmethod
    def usage():
        """Return the command phrases for this command."""
        return ["unprivilege *", "unprivilege user *", "remove privileges *"]

    def run(self):
        """Reset the user's privilege level and confirm in chat."""
        user_id = int(self.arguments[0])
        # Called without a privilege type, unlike the granting command.
        self.message.room.change_privilege_level(user_id)
        self.reply("The user specified has been stripped of all privileges.")
# The base class is qualified as ``bp.Command``: the file imports the
# framework only as ``bp``, so a bare ``Command`` is an unresolved name.
class CommandReboot(bp.Command):
    """Chat command that asks the bot to restart."""

    @staticmethod
    def usage():
        """Return the command phrases for this command."""
        return ['reboot', 'restart']

    def run(self):
        """Acknowledge in chat, then raise the reboot flag."""
        self.reply("Rebooting...")
        # NOTE(review): ``Utilities`` is not imported anywhere in the visible
        # file; presumably it is BotpySE's Utilities module. Confirm and
        # import or qualify it, otherwise this line raises NameError.
        Utilities.StopReason.reboot = True
# The base class is qualified as ``bp.Command``: the file imports the
# framework only as ``bp``, so a bare ``Command`` is an unresolved name.
class CommandStop(bp.Command):
    """Chat command that asks the bot to shut down."""

    @staticmethod
    def usage():
        """Return the command phrases for this command."""
        return ['stop', 'shutdown']

    def run(self):
        """Acknowledge in chat, then raise the shutdown flag."""
        self.reply("Shutting down...")
        # NOTE(review): ``Utilities`` is not imported anywhere in the visible
        # file; presumably it is BotpySE's Utilities module. Confirm and
        # import or qualify it, otherwise this line raises NameError.
        Utilities.StopReason.shutdown = True
# Command classes handed to the bot constructor (original order preserved).
commands = [
    CommandAlive,
    CommandListRunningCommands,
    CommandPrivilegeUser,
    CommandStop,
    CommandUnprivilegeUser,
    CommandAmiprivileged,
    CommandListPrivilegedUsers,
    CommandReboot,
]
# Bot startup
bot = bp.Bot(
    'CharlieB',
    commands,
    rooms,
    [],  # NOTE(review): meaning of this positional argument is not visible here
    site,
    email,
    password,
)
bot.start()

# Register the two privilege levels after the bot has started.
for level, label in ((1, "regular_user"), (2, "owner")):
    bot.add_privilege_type(level, label)
bot.set_room_owner_privs_max()
# Bot Message System
# Pending chat messages, keyed by the time they were queued. Dicts keep
# insertion order, so the first key is always the oldest message.
cbmQueue = {}


def cbmGenerator(msg):
    """Queue ``msg`` for posting, prefixed with the bot header.

    Args:
        msg: Message text to queue.
    """
    stamp = datetime.now()
    # Two messages queued at the same timestamp would share a key and the
    # earlier one would be silently overwritten; nudge the key until it is
    # unused.
    while stamp in cbmQueue:
        stamp += timedelta(microseconds=1)
    cbmQueue[stamp] = botHeader + msg
def cbm():
    """Post every queued message, oldest first, pausing 2 seconds after each."""
    while cbmQueue:
        oldest = next(iter(cbmQueue))
        bot.post_global_message(cbmQueue[oldest])
        # Removed only after posting, so a failed post leaves it in the queue.
        del cbmQueue[oldest]
        time.sleep(2)
# Regex Generation
def _nonblank(lines):
    """Return ``lines`` without empty or whitespace-only entries.

    A blank entry would become an empty regex alternative, which matches
    every comment.
    """
    return [line for line in lines if line.strip()]


def _fetch_patterns(url):
    """Download a pattern list and return its non-blank lines."""
    response = requests.get(url, timeout=30)
    # Fail loudly instead of compiling an HTTP error page into a regex.
    response.raise_for_status()
    return _nonblank(response.text.splitlines())


def _read_patterns(path):
    """Read a local pattern file and return its non-blank lines."""
    # ``with`` closes the file handle as soon as the read finishes.
    with open(path) as handle:
        return _nonblank(handle.read().splitlines())


# Blacklists maintained in the Charcoal-SE SmokeDetector repository.
chqGH = 'https://raw.githubusercontent.com/Charcoal-SE/SmokeDetector/master/'
chqWebsites = _fetch_patterns(chqGH + 'blacklisted_websites.txt')
chqWR = r'(?i)({})'.format('|'.join(chqWebsites))
chqKeywords = _fetch_patterns(chqGH + 'bad_keywords.txt')
chqKR = r'(?is)(?:^|\b|(?w:\b))(?:{})'.format('|'.join(chqKeywords))

# Local lists: a link whitelist and a keyword blacklist.
wWebsites = _read_patterns('websiteWhitelist.txt')
wWR = r'.*<a href=\"http(s):\/\/(?!(www\.|)(' + '|'.join(wWebsites) + '))'
bKeywords = _read_patterns('keywordBlacklist.txt')
bKR = r'(' + ')|('.join(bKeywords) + ')'
# Comment Scanner
def scanner(scan):
    """Classify a comment body against the blacklist patterns.

    Args:
        scan: Comment text to test.

    Returns:
        3 for a Charcoal website hit, 4 for a Charcoal keyword hit, 2 for a
        local keyword hit, 0 when nothing matches. Code 1 (the whitelist
        check built on ``wWR``) is currently disabled.
    """
    # Checked in priority order; the first pattern that hits decides the code.
    checks = (
        (chqWR, 3),
        (chqKR, 4),
        (bKR, 2),
    )
    for pattern, code in checks:
        if regex.search(pattern, scan):
            return code
    return 0
# Report templates keyed by scanner result code. Each takes three format
# arguments: site name, comment link, comment text.
_COMMENT_PART = ' | [Comment: {}]({}): `{}`'
_PING = ' @CalvT'
messages = {
    1: 'Website Detected' + _COMMENT_PART,
    2: 'Keyword Detected' + _COMMENT_PART,
    3: 'Charcoal Website Detected' + _COMMENT_PART + _PING,
    4: 'Charcoal Keyword Detected' + _COMMENT_PART + _PING,
}
# Get Comments
def fetcher(site):
    """Fetch the 75 most recently created comments for one site.

    Args:
        site: Stack Exchange API site name, e.g. ``'stackoverflow'``.

    Returns:
        The ``items`` list from the API response, or an empty list when the
        request fails or the response carries no ``items``.
    """
    # Passing the query as ``params`` lets requests URL-encode every value,
    # including ``site``.
    query = {
        'page': 1,
        'pagesize': 75,
        'key': 'IAkbitmze4B8KpacUfLqkw((',
        'order': 'desc',
        'sort': 'creation',
        'site': site,
        'filter': '!SWK9z*gpvuT.wQS8A.',
    }
    try:
        response = requests.get(
            'https://api.stackexchange.com/2.2/comments',
            params=query,
            timeout=30,  # never let one stuck request hang the polling loop
        )
        payload = response.json()
    except (requests.RequestException, ValueError) as err:
        # Network failure or non-JSON body: report it and let the caller
        # carry on with nothing to scan.
        print('{} Site: {} | Fetch failed: {}'.format(datetime.now(), site, err))
        return []

    items = payload.get('items') if isinstance(payload, dict) else None
    if items is None:
        # Responses without ``items`` previously raised KeyError here.
        print('{} Site: {} | Unexpected API response: {}'
              .format(datetime.now(), site, payload))
        return []
    return items
# Connect All Functions
def smokedetector(site):
    """Fetch, scan and report the latest comments for one site.

    Each comment not seen before is scanned; a non-zero scan result queues a
    chat report. The number of already-seen comments is appended to ``cRT``
    and a summary line is printed.

    Args:
        site: Stack Exchange API site name.
    """
    items = fetcher(site)
    matched = seen = 0
    for data in items:
        # Comment IDs are only unique within a site, so the site name is part
        # of the de-duplication key.
        comment_key = (site, data['comment_id'])
        if comment_key in cIDs:
            seen += 1
            continue
        code = scanner(data['body'])
        cIDs.add(comment_key)
        if code > 0:
            report = messages[code].format(
                site, data['link'], data['body'][:300])
            cbmGenerator(report)
            matched += 1
    cRT.append(seen)
    # Only the last ten entries are ever read; drop older ones so the list
    # does not grow for the lifetime of the process.
    del cRT[:-10]
    print(
        '{} Site: {} | Scanned: {} | New Matched: {} | Previously seen: {}'
        .format(datetime.now(), site, len(items), matched, seen))
# Identifiers of comments that have already been scanned.
cIDs = set()
# Recent "previously seen" counts recorded per fetch; seeded with ten 30s so
# the first average computed over the last ten entries is 30.
cRT = [30] * 10
def runtime():
    """Poll both sites forever, flushing queued reports after each pass.

    The pause between passes is 40 seconds minus the time the pass took,
    plus the average of the last ten "previously seen" counts in ``cRT``,
    so more overlap between fetches means a longer wait.
    """
    while True:
        # The monotonic clock is unaffected by wall-clock adjustments.
        started = time.monotonic()
        smokedetector('stackoverflow')
        smokedetector('stackapps')
        cbm()
        elapsed = time.monotonic() - started
        d = sum(cRT[-10:]) / 10
        # A slow pass can push the delay below zero, and time.sleep raises
        # ValueError for negative values, so clamp it.
        s = max(0.0, 40 - elapsed + d)
        print(str(s) + " | " + str(d))
        time.sleep(s)


# Run Scanner
runtime()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment