Last active
October 3, 2019 01:21
-
-
Save aryan-gupta/ceb39dd573f2d76dc54731cb0e91f5dd to your computer and use it in GitHub Desktop.
Slackbot server for Mosaic
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/env python | |
import os | |
from slackeventsapi import SlackEventAdapter | |
from slack import WebClient | |
import string | |
from collections import OrderedDict | |
import unicodedata | |
import json | |
import re | |
import requests | |
from random import randint | |
import sys | |
# Files read at runtime.
OAUTH_FILE = 'OAuth.txt'
FIRED_FILE = 'fired.json'

# Channel ids and the ticket URL template start empty; main() overwrites the
# chat ids and BASE_URL from the config it is given.
TA_CHAT = ''
TEST_CHAT = ''
ADMIN_CHAT = ''
RAND_CHAT = ''
ADMIN_USERS = ''  # consulted by admin_user()
BASE_URL = ''

# Formatted twice: main() fills `{api_key}` once, which also collapses the
# doubled braces into a `{search}` placeholder that search() fills per query.
GIPHY_SEARCH = 'https://api.giphy.com/v1/gifs/search?q={{search}}&api_key={api_key}&limit=50'

# Template for a Slack message consisting of a single image block.
GIF_MSG_FORMAT = [
    dict(
        type="image",
        title=dict(type="plain_text", text="gif", emoji=True),
        image_url="null",
        alt_text="gif",
    ),
]
def strip_punc(message):
    """Return `message` with every ASCII punctuation character removed."""
    punctuation = set(string.punctuation)
    return ''.join(ch for ch in message if ch not in punctuation)
def send_message(client, channel, message):
    """Post `message` as plain text to `channel` through `client`."""
    payload = {'channel': channel, 'text': message}
    client.chat_postMessage(**payload)
def strip_whitespace(message):
    """Return `message` with every ASCII whitespace character removed."""
    blanks = set(string.whitespace)
    return ''.join(ch for ch in message if ch not in blanks)
def normalize(message, t):
    """Apply Unicode normalization form `t` to `message` and drop non-ASCII.

    `t` is a form name accepted by `unicodedata.normalize` (e.g. 'NFKD').
    Characters that still cannot be encoded as ASCII afterwards are discarded.
    """
    normalized = unicodedata.normalize(t, message)
    ascii_bytes = normalized.encode('ascii', 'ignore')
    return ascii_bytes.decode()
def adj_uniq(strng):
    """Collapse every run of identical adjacent characters to one character."""
    kept = []
    for symbol in strng:
        # Only keep a character when it differs from the one just kept.
        if not kept or kept[-1] != symbol:
            kept.append(symbol)
    return ''.join(kept)
# ====================================================================================================== | |
# cherwell.py | |
def check_ticket_num(message, start, length=6):
    """Check that `message` holds `length` digits beginning at `start`.

    Args:
        message: Text to inspect.
        start: Index at which the ticket number is expected to begin.
        length: Number of digits in a ticket number (cherwell tickets are 6
            digits long).

    Returns:
        `start` when `message[start:start + length]` is all digits, else -1.
        A message too short to hold the full number yields -1 instead of
        raising IndexError.
    """
    digits = message[start:start + length]
    if len(digits) == length and digits.isdigit():
        return start
    return -1
def check_cherwell_ticket(message):
    """Locate a cherwell ticket number mentioned in `message`.

    A ticket reference is the word 'ticket', optionally followed by a single
    space, followed by six digits of which the first is '1'.

    Returns:
        The index of the first digit of the ticket number, or -1 when the
        message contains no ticket reference.
    """
    # The number must directly follow the word 'ticket'. Anchoring on the word
    # keeps messages without 'ticket' from ever matching, and the fixed-width
    # pattern cannot read past the end of the message.
    found = re.search(r'ticket ?(1[0-9]{5})', message)
    if found is None:
        return -1
    return found.start(1)
def get_ticket_if_exists(message):
    """Return the six-digit ticket number found in `message`, or '' if none."""
    idx = check_cherwell_ticket(message)
    # Compare by value: `is not -1` tests object identity, which is not a
    # reliable way to compare integers.
    if idx != -1:
        return message[idx:idx + 6]
    return ''
def get_cherwell_link(message):
    """Return the cherwell URL for the ticket mentioned in `message`.

    Returns:
        `BASE_URL` formatted with the ticket number, or None when `message`
        mentions no ticket.
    """
    ticket_num = get_ticket_if_exists(message)
    # An empty string means "no ticket"; test by value, not by identity.
    if not ticket_num:
        return None
    return BASE_URL.format(ticket_num=ticket_num)
def get_cherwell_short_link(message):
    """Return a Slack-formatted link for the ticket mentioned in `message`.

    Returns:
        A string of the form '<URL|Ticket NNNNNN>', or None when `message`
        mentions no ticket.
    """
    ticket_num = get_ticket_if_exists(message)
    # An empty string means "no ticket"; test by value, not by identity.
    if not ticket_num:
        return None
    url = BASE_URL.format(ticket_num=ticket_num)
    return '<' + url + '|Ticket ' + str(ticket_num) + '>'
# ====================================================================================================== | |
# gif.py | |
def search(regex, index=-1):
    """Query Giphy for `regex` and pick one result.

    Args:
        regex: The search phrase. It is URL-encoded before being placed in
            the query string, so spaces, '&', '#' and similar are safe.
        index: Position of the wanted result. Any value outside the range of
            returned results (including the default -1) selects a random one.

    Returns:
        A tuple `(url, index, total)`: the 'downsized' image URL, the index
        that was used, and the number of results. `(None, 0, 0)` when the
        response holds no results.
    """
    # Local import: the module header does not import urllib.
    from urllib.parse import quote_plus

    link = GIPHY_SEARCH.format(search=quote_plus(regex))
    # Without a timeout a stalled connection would block the event handler
    # indefinitely.
    reply = requests.get(link, timeout=10)
    results = json.loads(reply.text).get('data', [])
    total = len(results)
    if total == 0:
        return None, 0, 0
    if not 0 <= index < total:
        index = randint(0, total - 1)
    return results[index]['images']['downsized']['url'], index, total
def get_gif_block(regex):
    """Build the Slack image blocks for a gif matching `regex`.

    Falls back to a search for 'nope' when `regex` yields no result. The title
    and alt text are '<regex> (<index> of <size>)'.

    Returns:
        A fresh list of block dicts. The shared `GIF_MSG_FORMAT` template is
        copied rather than edited in place, so successive calls (and any
        caller holding an earlier result) never see each other's values.
    """
    # Local import: the module header does not import copy.
    import copy

    gif = copy.deepcopy(GIF_MSG_FORMAT)
    link, index, size = search(regex)
    if link is None:
        link, index, size = search('nope')
    gif[0]['image_url'] = link
    gif[0]['alt_text'] = gif[0]['title']['text'] = f"{regex} ({index} of {size})"
    return gif
# ====================================================================================================== | |
# joke.py | |
# Knock-knock jokes as (setup, punchline) pairs. The setup is the answer to
# "who's there?"; the punchline answers "<setup> who?".
jokes = [
    ("Cow says", "No, a cow says mooooo!"),
    ("A little old lady", "All this time, I had no idea you could yodel"),
    ("Europe", "No Im not!"),
    ("Etch", "Bless you, friend."),
    ("Robin", "Robin you, now hand over the cash."),
    ("Cash", "No thanks, Ill have some peanuts."),
    ("Mustache", "I mustache you a question, but Ill shave it for later."),
    ("Tank", "Youre welcome."),
    ("I smell mop", "Ew."),
    ("I eat mop", "Thats revolting."),
    ("Ya", "Yahoo! Im just as psyched to see you!"),
    ("Voodoo", "Voodoo you think you are, asking me so many questions?"),
    ("Spell", "Okay, okay: W. H. O."),
    ("Candice", "Candice door open, or what?"),
    ("Boo", "No need to cry, its only a joke"),
    ("Stopwatch", "Stopwatch youre doing and pay attention!"),
    ("To", "Its to whom."),
    ("Dewey", "Dewey have to use a condom every time?"),
    ("Honey bee", "Honey bee a dear and open up, would you?"),
    ("Lettuce", "Lettuce in, its cold out here!"),
    ("Mikey", "Mikey doesnt work so help me out, would you?"),
    ("Dwayne", "Dwayne the bathtub already. Im drowning!"),
    ("Kanga", "Actually, it's kangaroo"),
    ("Razor", "Razor hands, this is a stick up!"),
    ("Kevin", "What do you mean? You can't hide from big brother"),
    ("Micaela", "It's the order of the Snail Queen, open up!"),
    ("Aryan", "Just open it already"),
    ("Norman", "?????? ??????"),
    ("Trash", "Sorry, no, its TRASHamsa BINdari"),
    ("Anthony", "Abre la puerta ahora!"),
    ("Robert", "Do you have a minute to talk about our lord and savior: Warframe?"),
    ("Adam", ""),
]
# Per-user state of a bot-told joke: user -> (step, index into `jokes`).
knock_knock = {}


def manage_kk_jokes(client, user, message, channel):
    """Advance a knock-knock joke that the bot tells to `user`.

    The exchange is: the user says 'joke', the bot says 'Knock Knock.', the
    user says "who's there", the bot gives the setup, the user replies with
    anything containing 'who', and the bot gives the punchline.

    Args:
        client: Slack client used to post the replies.
        user: Id of the user who sent `message`.
        message: The incoming message text.
        channel: Channel to reply in.

    Returns:
        True when the message was consumed as part of a joke, else False.
    """
    if message == 'joke':
        # randint is inclusive at both ends, so the upper bound must be the
        # last valid index; len(jokes) itself would overrun the list.
        knock_knock[user] = 0, randint(0, len(jokes) - 1)
    elif user not in knock_knock:
        return False

    step, jk_num = knock_knock[user]
    if step == 0:
        send_message(client, channel, 'Knock Knock.')
        knock_knock[user] = 1, jk_num
        return True
    if step == 1 and strip_punc(message) == 'whos there':
        send_message(client, channel, jokes[jk_num][0])
        knock_knock[user] = 2, jk_num
        return True
    if step == 2 and 'who' in message:
        send_message(client, channel, jokes[jk_num][1])
        knock_knock.pop(user, None)
        return True
    # Unrelated message in the middle of a joke: leave the state untouched.
    return False
# Per-user state of a user-told joke: 0 = waiting for the setup,
# 1 = waiting for the punchline.
kk_client = {}


def manage_kk_client_jokes(client, user, message, channel):
    """Play along with a knock-knock joke that `user` tells the bot.

    Returns True when the message was consumed as part of a joke, else False.
    """
    if message == 'knock knock':
        reply, next_state = "Who's there?", 0
    else:
        state = kk_client.get(user)
        if state == 0:
            reply, next_state = message + ' who?', 1
        elif state == 1:
            reply, next_state = 'Haha, very funny', None
        else:
            return False

    send_message(client, channel, reply)
    # The state is only updated once the reply has been sent.
    if next_state is None:
        kk_client.pop(user, None)
    else:
        kk_client[user] = next_state
    return True
# ====================================================================================================== | |
# variations.py | |
# For each plain letter, fragments of Unicode character names that identify a
# look-alike of that letter. Entries containing a space are matched as
# substrings of the character name, the others as whole words (see
# unicode_check).
match_dict = {
    'c': [
        'es', 'ca', 'tli', 'ke', 'she', 'open o', 'copyright', 'cent',
        'ideograph-531a', 'ta', 'sigma', 'celsius', 'hundred', 'sima',
        'chee', 'calx', 'ninety', 'nine', 'carian letter d', 'colon', 'twa',
    ],
    'u': [
        'v', 'n', 'iu', 'te', 'twe', 'tte', 'ideograph-722a', 'upsilon',
        'seh', 'pu', 'union', 'ha', 'wa', 'sa', 'inverted ohm', 'lhu', 'tlhu',
    ],
    'm': [
        'turned w', 'ideograph-722a', 'go', 'thousand', 'mu', 'san', 'mi',
        'em', 'lu', 'ehwaz eh e', 'ma', 'carian letter s',
        'she',  # This is a duplicate from c: OLD ITALIC LETTER SHE
        'bath of mary', 'three', 'tlo',
    ],
    's': [],
    'e': [],
    'n': [],
    'j': [],
    'i': [],
    'z': [],
}
def find_all(reg, word):
    """Return the start index of every non-overlapping match of `reg` in `word`."""
    positions = []
    for hit in re.finditer(reg, word):
        positions.append(hit.start())
    return positions
def get_unicode_name(message):
    """Spell out `message` as the Unicode names of its characters.

    Each character becomes '(<lower-cased name>) '. Characters that have no
    Unicode name (a newline, for example) are rendered by code point, such as
    '(u+000a) ', instead of raising ValueError.
    """
    return ''.join(
        '(' + unicodedata.name(ch, f'U+{ord(ch):04X}').lower() + ') '
        for ch in message
    )
def is_word(actual, character):
    """Return True when `actual` occurs anywhere in the Unicode name of `character`.

    The name is lower-cased before the comparison. A character without a
    Unicode name is treated as having an empty name rather than raising
    ValueError.
    """
    return actual in unicodedata.name(character, '').lower()
def is_letter(actual, character):
    """Return True when `actual` is a whole word of `character`'s Unicode name.

    The lower-cased name must contain `actual` delimited by spaces: in the
    middle, as the last word, or as the first word. A name that consists of
    `actual` alone does not count.

    `actual` is compared as literal text, so characters that are special in
    regular expressions ('.', '(', ...) are matched literally. A character
    without a Unicode name never matches instead of raising ValueError.
    """
    name = unicodedata.name(character, '').lower()
    return (
        f' {actual} ' in name            # word in the middle
        or name.endswith(' ' + actual)   # last word
        or name.startswith(actual + ' ')  # first word
    )
def has(start, end, reg, message, lorw=is_letter):
    """Find the first position in `message[start:end]` whose character matches `reg`.

    `lorw` is the predicate applied as `lorw(reg, character)`; it defaults to
    `is_letter`. Returns the matching index, or -1 when nothing matches.
    """
    hits = (pos for pos in range(start, end) if lorw(reg, message[pos]))
    return next(hits, -1)
def unicode_check(word, message, start=0, end=-1, depth=0):
    """Check whether `message` spells `word` using look-alike characters.

    The first letter of `word` may match anywhere in `message`; each following
    letter must match the character immediately after the previous match. A
    character matches a letter when its Unicode name contains the letter
    itself or one of the fragments listed for it in `match_dict`.

    Args:
        word: The plain word to look for.
        message: The text to scan.
        start: First index to examine (used by the recursion).
        end: One past the last index to examine; -1 means the whole message.
        depth: Index into `word` of the letter being matched.

    Returns:
        True when every letter of `word` was matched, else False.
    """
    if end == -1:
        end = len(message)  # first iteration
    if len(word) <= depth:
        return True  # last iteration: every letter matched
    if end > len(message):
        return False  # ran off the end of the message

    letter = word[depth]
    # Build a new list: appending to the list stored in match_dict would grow
    # the shared table on every call. Letters without an entry fall back to
    # matching only themselves.
    candidates = list(match_dict.get(letter, ())) + [letter]
    for reg in candidates:
        reg = reg.lower()
        # Multi-word fragments are matched as substrings, single words as
        # whole words.
        matcher = is_word if ' ' in reg else is_letter
        h = has(start, end, reg, message, matcher)
        if h != -1:
            # NOTE(review): the first hit is final; there is no backtracking
            # to later candidates or positions if the rest fails.
            return unicode_check(word, message, h + 1, h + 2, depth=depth + 1)
    return False
def check_word(word, message_raw):
    """Return True when `message_raw` contains `word`, plainly or disguised.

    Punctuation and whitespace are removed first. The word is then looked for
    in the stripped text, in the text with repeated characters dropped, in its
    ASCII form under each Unicode normalization, and finally via
    `unicode_check`.
    """
    message = strip_whitespace(strip_punc(message_raw))
    # Keeps only the first occurrence of each character, in order.
    deduped = ''.join(OrderedDict.fromkeys(message))
    if word in message or word in deduped:
        return True

    forms = ('NFKD', 'NFC', 'NFKC', 'NFD')
    if any(word in normalize(message, form) for form in forms):
        return True

    return bool(unicode_check(word, message))
# ====================================================================================================== | |
# | |
def get_message_details(event_data):
    """Extract the text, channel and user from a Slack message event.

    Args:
        event_data: Event payload; the fields are read from
            `event_data['event']`.

    Returns:
        A tuple `(message, channel, user)`. `message` is lower-cased and is
        the empty string when the event has no 'text' (or it is None), so
        callers can always use string methods on it. `channel` and `user` are
        None when absent.
    """
    event = event_data['event']
    message = event.get('text') or ''
    return message.lower(), event.get('channel'), event.get('user')
def get_message_type(event_data):
    """Return the event's 'subtype', or None when the event has none."""
    event = event_data['event']
    try:
        return event['subtype']
    except KeyError:
        return None
def admin_user(user):
    """Return True when `user` is listed in `ADMIN_USERS`.

    A missing or empty user id is never an admin. Without this guard a None
    user raises TypeError against a string `ADMIN_USERS`, and an empty id is
    "contained" in every string.
    """
    if not isinstance(user, str) or not user:
        return False
    # NOTE(review): when ADMIN_USERS is a string this is a substring test, so
    # a prefix of an admin id also passes - confirm the intended format.
    return user in ADMIN_USERS
def get_fire_text(message, user, type='fire'):
    """Record a "fire" command and return the bot's reply.

    `message` is expected to look like 'fire <@UXXXXXXXX>' with an optional
    ' xN' multiplier after it; the nine characters after 'fire <@' are taken
    as the target's id.

    Args:
        message: The lower-cased command text.
        user: Id of the user issuing the command; must pass `admin_user`.
        type: 'fire' adds to the target's count, anything else subtracts.

    Returns:
        The reply text. Non-admins get a refusal and nothing is recorded.
    """
    if not admin_user(user):
        return "You can't fire anyone. You do not have the power."

    person = message[7:16].upper()
    num_fired = 1
    multiplier = re.match(r'x(\d+)', message[18:])
    if multiplier:
        num_fired = int(multiplier.group(1))

    # A missing tally file means nobody has been fired yet.
    try:
        with open(FIRED_FILE, 'r') as fired_file:
            fired_list = json.load(fired_file)
    except FileNotFoundError:
        fired_list = {}

    # The multiplier applies to first-time entries as well as existing ones.
    delta = num_fired if type == 'fire' else -num_fired
    fired_list[person] = fired_list.get(person, 0) + delta

    with open(FIRED_FILE, 'w') as fired_file:
        json.dump(fired_list, fired_file)

    num_times_fired = fired_list[person]
    return f"Fired <@{person}>. You have been fired {num_times_fired} times"
def get_config(filename):
    """Parse a 'key :: value' configuration file into a dict.

    Each non-blank line is split at the first '::'; key and value are stripped
    of surrounding whitespace. Blank lines are ignored.

    Raises:
        ValueError: a non-blank line has no '::' separator. The message names
            the file and line number but not the line content, since the file
            may hold secrets.
    """
    parsed_file = {}
    with open(filename, 'r') as file:
        for line_no, line in enumerate(file, start=1):
            if not line.strip():
                continue
            key, separator, value = line.partition('::')
            if not separator:
                raise ValueError(f"{filename}:{line_no}: expected 'key :: value'")
            parsed_file[key.strip()] = value.strip()
    return parsed_file
def handle_ta_chat(client, event_data):
    """Respond to a message posted in the TA chat.

    Deleted-message events are ignored. Knock-knock jokes get first refusal;
    a 'gif <term>' request posts a gif; then at most one canned reply is
    posted, chosen from exact phrases, disguised-word checks and keywords, in
    that order.
    """
    if get_message_type(event_data) == 'message_deleted':
        return

    message, channel, user = get_message_details(event_data)

    if manage_kk_jokes(client, user, message, channel):
        return
    if manage_kk_client_jokes(client, user, message, channel):
        return

    if message.startswith('gif ') and len(message) > 4:
        client.chat_postMessage(channel=channel, blocks=get_gif_block(message[4:]))

    message = strip_punc(message)

    exact_replies = {
        'where am i': 'You are in the TA chat. Have fun in this chat without the admins watching!',
        'are you there': 'No',
        'how are you': "I'm doing swell",
        'hello': 'Hello to you too',
        'im dirty dan': "No, I'm Dirty Dan",
        'youre dirty dan': 'Yep, Thats correct',
        'cum in': 'me',
        'cum on': 'everyone',
    }
    reply = exact_replies.get(message)

    if reply is None:
        # any() stops at the first hit, so the words are tried in order.
        if any(check_word(word, message) for word in ('cum', 'semen', 'jizz', 'jiz')):
            reply = 'cum'
        elif 'terminator' in message:
            reply = "Hello, I'm here"
        elif 'prash' in message:
            reply = 'Remember, All trash goes in the Prashcan'
        elif 'coco' in message or 'mica' in message:
            reply = 'All hail the Snail Queen :snail_queen:'
        elif 'robert' in message:
            reply = "It's time to ride or die"

    if reply is not None:
        client.chat_postMessage(channel=channel, text=reply)
def handle_normal_chat(client, event_data):
    """Respond to a message in any channel without a dedicated handler.

    Deleted-message events are ignored. Punctuation is stripped before the
    knock-knock handlers and the 'where am i' check see the text.
    """
    if get_message_type(event_data) == 'message_deleted':
        return

    raw_text, channel, user = get_message_details(event_data)
    message = strip_punc(raw_text)

    joke_handlers = (manage_kk_jokes, manage_kk_client_jokes)
    for handler in joke_handlers:
        if handler(client, user, message, channel):
            return

    if message == 'where am i':
        client.chat_postMessage(
            channel=channel,
            text="I'm not sure where you are, this is a generic chat response",
        )
def handle_test_chat(client, event_data):
    """Respond to a message posted in the test chat.

    Deleted-message events are ignored. Handles, in order: the 'fire <@...'
    command, cherwell ticket links, 'gif <term>' requests, and the canned
    'where am i' / 'at me' replies.
    """
    if get_message_type(event_data) == 'message_deleted':
        return

    def say(text):
        client.chat_postMessage(channel=channel, text=text)

    message, channel, user = get_message_details(event_data)

    # The fire command is checked before punctuation is stripped because it
    # relies on the '<@' mention markup.
    if message.startswith('fire <@') and len(message) > 7:
        say(get_fire_text(message, user))

    message = strip_punc(message)

    link = get_cherwell_short_link(message)
    if link is not None:
        say(link)

    if message.startswith('gif ') and len(message) > 4:
        client.chat_postMessage(channel=channel, blocks=get_gif_block(message[4:]))

    canned = {
        'where am i': 'You are in the test chat. Do you business, test out things, then leave :)',
        'at me': '@UJ50N0V28',
    }
    if message in canned:
        say(canned[message])
def handle_admin_chat(client, event_data):
    """Respond to a message posted in the admin chat.

    Deleted-message events are skipped. Posts a link for any cherwell ticket
    mentioned, then answers 'where am i' or 'are you there'.
    """
    if get_message_type(event_data) == 'message_deleted':
        return  # nothing to answer for a deletion

    raw_text, channel, _ = get_message_details(event_data)
    message = strip_punc(raw_text)

    link = get_cherwell_short_link(message)
    if link is not None:
        client.chat_postMessage(channel=channel, text=link)

    canned = {
        'where am i': 'You are in the Admin chat, for important admin business only!',
        'are you there': 'Yes, I am here',
    }
    if message in canned:
        client.chat_postMessage(channel=channel, text=canned[message])
def handle_random_chat(client, event_data):
    """Respond to a message posted in the random chat.

    Deleted-message events are ignored. Handles, in order: the 'fire <@...'
    command, cherwell ticket links, 'gif <term>' requests, and the canned
    'where am i' reply.
    """
    if get_message_type(event_data) == 'message_deleted':
        return

    message, channel, user = get_message_details(event_data)

    # Require something after the 7-character 'fire <@' prefix, matching the
    # test-chat handler; a bare prefix would record a firing for an empty id.
    if message.startswith('fire <@') and len(message) > 7:
        client.chat_postMessage(channel=channel, text=get_fire_text(message, user))

    message = strip_punc(message)

    link = get_cherwell_short_link(message)
    if link is not None:
        client.chat_postMessage(channel=channel, text=link)

    if message.startswith('gif ') and len(message) > 4:
        client.chat_postMessage(channel=channel, blocks=get_gif_block(message[4:]))

    if message == 'where am i':
        client.chat_postMessage(channel=channel, text='You are in the rando chat, have fun')
def main():
    """Load the configuration, register the Slack event handlers and serve.

    The configuration is a base64-encoded JSON object passed as the first
    command-line argument. It must provide GIPHY_API_KEY,
    SLACK_SIGNING_SECRET, SLACK_BOT_TOKEN, SLACK_TOKEN, IP, PORT, TA_CHAT,
    TEST_CHAT, ADMIN_CHAT, RAND_CHAT and CHERWELL_BASE.
    """
    global GIPHY_SEARCH, TA_CHAT, TEST_CHAT, ADMIN_CHAT, RAND_CHAT, BASE_URL, ADMIN_USERS

    # Local import: the module header does not import base64.
    import base64

    if len(sys.argv) < 2:
        sys.exit('usage: pass the base64-encoded JSON config as the first argument')
    config = json.loads(base64.b64decode(sys.argv[1]))  # get_config(OAUTH_FILE)

    GIPHY_SEARCH = GIPHY_SEARCH.format(api_key=config['GIPHY_API_KEY'])
    adaptor = SlackEventAdapter(config['SLACK_SIGNING_SECRET'], endpoint='/slack/terminator')
    client = WebClient(config['SLACK_BOT_TOKEN'])  # Used to send messages as the bot
    access = WebClient(config['SLACK_TOKEN'])  # Used to access various settings of a channel
    IP = config['IP']
    PORT = int(config['PORT'])
    TA_CHAT = config['TA_CHAT']
    TEST_CHAT = config['TEST_CHAT']
    ADMIN_CHAT = config['ADMIN_CHAT']
    RAND_CHAT = config['RAND_CHAT']
    BASE_URL = config['CHERWELL_BASE']
    # Optional: without it ADMIN_USERS keeps its module default.
    # NOTE(review): the key name 'ADMIN_USERS' is an assumption - confirm
    # against the deployed config.
    ADMIN_USERS = config.get('ADMIN_USERS', ADMIN_USERS)

    @adaptor.on("reaction_added")
    def reaction_added(event_data):
        """Call back for reaction added events
        """
        emoji = event_data['event']['reaction']
        message_id = event_data['event']['item']['ts']
        channel = event_data['event']['item']['channel']
        if channel == TA_CHAT and emoji == 'slackbot':
            # if the reaction was added to a message in the ta chat, get the
            # message then reply with the unicode names.
            # conversations.history replaces the retired groups.history method.
            messages = access.conversations_history(
                channel=channel, latest=message_id, limit=1, inclusive=True
            )
            found = messages['messages']
            if not found:
                return
            message = found[0].get('text', '')
            if len(message) > 6:
                return  # only short messages are spelled out
            client.chat_postMessage(channel=channel, text=get_unicode_name(message))

    @adaptor.on("message")
    def message(event_data):
        """Call back function for message events
        """
        if get_message_type(event_data) == 'bot_message':
            return  # skip bot messages
        channel = event_data['event']['channel']
        if channel == TA_CHAT:
            handle_ta_chat(client, event_data)
        elif channel == TEST_CHAT:
            handle_test_chat(client, event_data)
        elif channel == ADMIN_CHAT:
            handle_admin_chat(client, event_data)
        elif channel == RAND_CHAT:
            handle_random_chat(client, event_data)
        else:
            handle_normal_chat(client, event_data)

    # NOTE(review): debug=True is kept from the original; confirm it is
    # intended for a server reachable from outside.
    adaptor.start(host=IP, port=PORT, debug=True)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment