Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@Nachtalb
Last active October 25, 2021 16:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Nachtalb/991411b7c59846162099c93d43e5ed2b to your computer and use it in GitHub Desktop.
Save Nachtalb/991411b7c59846162099c93d43e5ed2b to your computer and use it in GitHub Desktop.
IRC ZNC module for posting danbooru pics on demand by any user with configurable commands for each IRC channel
from contextlib import contextmanager
from datetime import datetime
from random import choice
from requests import Session
from yarl import URL
import inspect
import json
import os
import pprint
import sys
import traceback
import znc
# Shared pretty-printer instance.
# NOTE(review): nothing in the visible code references `pp` (debug_hook calls
# pprint.pformat directly) -- confirm before removing.
pp = pprint.PrettyPrinter()
def _is_self(*args):
if len(args) > 1 and type(args[0]) == danbooru:
return args[0]
return None
def catchfail(return_code):
    """
    Decorator that swallows exceptions raised by a module method and reports them.

    Usable in two forms:

    * ``@catchfail`` -- the decorated function returns ``None`` on failure.
    * ``@catchfail(value)`` -- the decorated function returns ``value`` on failure.

    When the first positional argument of the wrapped call is a ``danbooru``
    instance (see ``_is_self``), the exception message and its traceback are
    sent to the user via ``PutModule`` and written to the module's
    ``internal_log``. Otherwise the exception is dropped without a report.
    """
    # Local import: functools is not part of this file's module-level imports.
    from functools import wraps

    # Bare usage passes the function itself as `return_code`.
    func = return_code if callable(return_code) else None
    return_code = None if func else return_code

    def outer_wrapper(fn):
        @wraps(fn)  # keep the hook's name/docstring on the wrapper
        def wrapper(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            except Exception as e:
                s = _is_self(*args)
                if s:
                    s.PutModule('Failed with %s' % (e,))
                    lines = traceback.format_exception(type(e), e, e.__traceback__)
                    s.internal_log.error(*lines)
                    # A formatted traceback entry can span several physical
                    # lines; send each one as its own message.
                    for line in ''.join(lines).splitlines():
                        s.PutModule(line)
                return return_code
        return wrapper

    if func:
        return outer_wrapper(func)
    return outer_wrapper
class danbooru(znc.Module):
    """
    ZNC network module that answers configured chat aliases with danbooru posts.

    Aliases map a trigger word to a danbooru tag query and a list of channels
    in which the trigger is active. The configuration is stored as JSON in the
    module's ``nv`` storage, keyed by the ZNC username. When a message starts
    with a known alias, the original message is passed on and a link to a
    randomly chosen matching post is sent to the same target.
    """

    _base_url = URL('https://danbooru.donmai.us/')
    _session = None
    # Seconds to wait for danbooru before giving up, so that a stalled HTTP
    # request cannot block the caller indefinitely.
    _request_timeout = 30
    # NOTE(review): not referenced anywhere in the visible code.
    loaded_aliases = []

    description = 'Auto posts danbooru content by defined aliases'
    has_args = True
    args_help_text = 'API Key and Login space separated in this order'
    hook_debugging = False
    module_types = [znc.CModInfo.NetworkModule]

    def OnLoad(self, args: str, message):
        """
        Create the internal log, then run ``setup``.

        Any exception raised by ``setup`` is written to the error log and
        re-raised unchanged.
        """
        self.internal_log = InternalLog(self.GetSavePath())
        self.debug_hook()
        try:
            return self.setup(args, message)
        except Exception as error:
            self.internal_log.error(error)
            raise

    def setup(self, args, message):
        """
        Load stored credentials and create the HTTP session.

        ``args`` is the module argument string. When no credentials are stored
        yet and at least two whitespace separated tokens are given, the first
        two are taken as API key and login (in that order) and persisted.
        """
        self._api_key = self.nv.get('api_key')
        self._login = self.nv.get('login')
        self._session = Session()

        arguments = args.strip().split()
        # Count tokens, not characters: a single long token must not be
        # unpacked into two names.
        if len(arguments) >= 2 and not self._api_key and not self._login:
            self._api_key, self._login = arguments[:2]
            self.nv['api_key'], self.nv['login'] = self._api_key, self._login
        return znc.CONTINUE

    def _request(self, endpoint, params=None):
        """
        GET ``endpoint`` below the base URL and return the decoded JSON body.

        ``params`` are merged over the stored credentials; entries whose value
        is ``None`` are left out of the query string. On a non-200 response the
        error is logged through ``_request_error`` and ``None`` is returned.
        """
        data = {
            'api_key': self._api_key,
            'login': self._login,
        }
        data.update(params or {})
        # Build a filtered copy instead of deleting while iterating, which
        # raises RuntimeError as soon as one value is None.
        query = {key: value for key, value in data.items() if value is not None}

        url = self._base_url / endpoint % query
        response = self._session.get(str(url), timeout=self._request_timeout)
        if response.status_code == 200:
            return response.json()
        self._request_error(response)
        return None

    def _request_error(self, response):
        """
        Write a description of a failed HTTP response to the error log.

        Prefers the ``message`` field of a JSON object body, then the text of
        the exception raised by ``raise_for_status``, then ``response.reason``.
        """
        error_msg = response.reason
        try:
            data = response.json()
        except ValueError:
            # JSON decode errors are ValueError subclasses; the body is not
            # JSON, so fall back to the HTTP error description.
            try:
                response.raise_for_status()
            except Exception as error:
                error_msg = ', '.join(str(arg) for arg in error.args) or error_msg
        else:
            if isinstance(data, dict):
                error_msg = data.get('message', error_msg)
        self.internal_log.error(error_msg)

    def posts(self, tags=None, limit=None):
        """
        Query ``posts.json`` for ``tags``; ``limit`` defaults to 200 when falsy.
        """
        return self._request('posts.json', {'tags': tags, 'limit': limit if limit else 200})

    @property
    def config_key(self):
        """
        Key under which the configuration is stored: the ZNC username.
        """
        return self.GetUser().GetUsername()

    def get_client_options(self, clientname=None):
        """
        Return the stored configuration dict for ``clientname``.

        Defaults to the current user's configuration. The result always is a
        dict containing an ``'alias'`` mapping, also when nothing (or an empty
        value) is stored.
        """
        clientname = clientname or self.config_key
        raw = self.nv.get(clientname)
        config = json.loads(raw) if raw else None
        if not isinstance(config, dict):
            config = {}
        config.setdefault('alias', {})
        return config

    def set_client_options(self, config, clientname=None):
        """
        Persist ``config`` as JSON for ``clientname`` (default: current user).
        """
        clientname = clientname or self.config_key
        self.nv[clientname] = json.dumps(config)

    def add_alias(self, alias, tags, channels, clientname=None):
        """
        Store ``alias`` with its tag string and channel list.

        Returns ``True`` when an existing alias was replaced, ``False`` when a
        new one was created.
        """
        config = self.get_client_options(clientname)
        updated = alias in config['alias']
        config['alias'][alias] = {
            'tags': tags,
            'channels': channels,
        }
        self.set_client_options(config, clientname)
        return updated

    def get_alias(self, alias, clientname=None):
        """
        Return the stored settings of ``alias`` or ``None`` if it is unknown.
        """
        config = self.get_client_options(clientname)
        return config['alias'].get(alias)

    def remove_alias(self, alias, clientname=None):
        """
        Delete ``alias``; return ``True`` if it existed, ``False`` otherwise.
        """
        config = self.get_client_options(clientname)
        if alias not in config['alias']:
            return False
        del config['alias'][alias]
        self.set_client_options(config, clientname)
        return True

    def add_alias_command(self, args):
        """
        Handle ``add``/``update``: ``args`` is ``[NAME, TAGS..., CHANNELS...]``.

        Items starting with ``#`` are channels, everything else is a tag. If
        only tags or only channels are given, the missing part is taken from
        the existing alias and the given part is merged with the stored one.
        """
        self.debug_hook()
        name = args[0]
        channels = [item for item in args[1:] if item.startswith('#')]
        tags = [item for item in args[1:] if not item.startswith('#')]

        if not tags and not channels:
            # Only a name was given: storing that would wipe an existing alias.
            self.inform_user('You have to at least specify one tag and one channel', True)
            return

        if not tags or not channels:
            current_config = self.get_alias(name)
            if not current_config:
                self.inform_user('You have to at least specify one tag and one channel', True)
                return
            if not tags:
                tags = current_config['tags'].split()
                channels += current_config['channels']
            else:
                tags += current_config['tags'].split()
                channels = current_config['channels']

        channels = sorted(set(channels))
        tags = ' '.join(sorted(set(tags)))
        updated = self.add_alias(name, tags, list(channels))

        if not channels:
            self.inform_user('No channels defined. This alias is not available anywhere!', notice=True)
        if updated:
            self.inform_user(f'Updated {name}: {tags}, {" ".join(channels)}')
        else:
            self.inform_user(f'Added {name}: {tags}, {" ".join(channels)}')

    def add_channel_to_all_command(self, args):
        """
        Handle ``add-channel``: add every ``#channel`` in ``args`` to all aliases.
        """
        channels = [c for c in args if c.startswith('#')]
        if not channels:
            self.inform_user('You have to specify at least one channel', True)
            return

        client_config = self.get_client_options()
        if not client_config['alias']:
            self.inform_user('No aliases configured', True)
            return

        for options in client_config['alias'].values():
            options['channels'] = sorted(set(options['channels'] + channels))

        self.set_client_options(client_config)
        self.inform_user('All aliases have been update')
        self.list_config_command()

    def remove_channel_from_all(self, args):
        """
        Handle ``remove-channel``: drop every ``#channel`` in ``args`` from all aliases.
        """
        channels = {c for c in args if c.startswith('#')}
        if not channels:
            self.inform_user('You have to specify at least one channel', True)
            return

        client_config = self.get_client_options()
        if not client_config['alias']:
            self.inform_user('No aliases configured', True)
            return

        for options in client_config['alias'].values():
            options['channels'] = sorted(set(options['channels']) - channels)

        self.set_client_options(client_config)
        self.inform_user('All aliases have been update')
        self.list_config_command()

    def remove_alias_command(self, args):
        """
        Handle ``remove``: delete each alias named in ``args`` and report the result.
        """
        for name in args:
            if self.remove_alias(name):
                self.inform_user(f'Removed {name}')
            else:
                self.inform_user(f'{name} does not exist')

    def list_config_command(self):
        """
        Handle ``list``: show all aliases with their tags and channels as a table.
        """
        config = self.get_client_options()
        if not config['alias']:
            self.inform_user('There are no aliases configured')
            # Nothing to tabulate; do not send an empty table afterwards.
            return

        table_data = [(alias, v['tags'], ', '.join(v['channels'])) for alias, v in config['alias'].items()]
        table = self.create_table(['alias', 'tags', 'channels'], table_data)
        self.inform_user(table)

    def create_table(self, header, rows):
        """
        Build a ``znc.CTable`` with ``header`` as column titles and one row per
        entry of ``rows`` (values are matched to columns by position).
        """
        table = znc.CTable()
        for title in header:
            table.AddColumn(title)
        for row in rows:
            table.AddRow()
            for title, value in zip(header, row):
                table.SetCell(title, value)
        return table

    def show_usage_command(self):
        """
        Send the table of available module commands to the user.
        """
        self.inform_user(self.create_table(
            ['command', 'arguments', 'function'],
            [
                ['help', '', 'Show list of commands'],
                ['add/update', 'NAME TAGS... CHANNELS...', 'Add / Update an alias. "{args}" will be replaced with the users input. The triggers will only work in the defined channels.'],
                ['add-channel', 'CHANNELS...', 'Add channel to all aliases'],
                ['remove-channel', 'CHANNELS...', 'Remove channel from all aliases'],
                ['set-help', 'CHANNELS...', '!!help command for any users in those channels'],
                ['list-help', '', 'Show where !!help is enabled'],
                ['remove', 'NAME...', 'Remove the given aliases'],
                ['list', '', 'List all aliases'],
            ]
        ))

    def send_both_ways(self, message, target):
        """
        Send ``message`` to ``target`` on IRC and echo it to the attached clients.
        """
        self.debug_hook()
        self.send_to_irc(message, target)
        self.send_to_client(message, target)

    def pass_through(self, message):
        """
        Forward a message that this module is about to halt.

        If a client is attached to the current call (``GetClient()`` is
        truthy) the text is sent to IRC and echoed to the other clients.
        Otherwise it is relayed to the clients with the original sender's
        host mask.
        """
        self.debug_hook()
        if self.GetClient():
            self.send_to_irc(message.GetText(), message.GetTarget())
            self.send_to_client(message.GetText(), message.GetTarget(), current=False)
        else:
            self.send_to_client(message.GetText(), message.GetTarget(), from_mask=message.GetNick().GetHostMask())

    def send_to_irc(self, message, target):
        """
        Send ``message`` as a PRIVMSG to ``target`` on IRC.
        """
        self.PutIRC(f'PRIVMSG {target} :{message}')

    def send_to_client(self, message, target, current=True, from_mask=None):
        """
        Deliver a PRIVMSG line to the user's clients on the current network.

        ``current=False`` skips the client that triggered the call.
        ``from_mask`` is the sender prefix; when omitted each client's own
        nick mask is used.
        """
        self.debug_hook()
        current_client = self.GetClient()
        current_network = self.GetNetwork()
        for client in self.GetUser().GetAllClients():
            if (not current and current_client == client) or current_network != client.GetNetwork():
                continue
            # Resolve the mask per client rather than reusing the first
            # client's mask for all following ones.
            mask = from_mask or client.GetNickMask()
            client.PutClient(f':{mask} PRIVMSG {target} :{message}')

    def inform_user(self, message, notice=False):
        """
        Send ``message`` to the user as module message, or as notice if requested.
        """
        if notice:
            self.PutModNotice(message)
        else:
            self.PutModule(message)

    def send_random(self, tags, target):
        """
        Send a link to a randomly chosen post matching ``tags`` to ``target``.

        Returns ``True`` when a post was sent. If the query yields nothing
        usable, a "Didn't find anything" message is sent instead and ``None``
        is returned.
        """
        posts = self.posts(tags)
        candidates = list(posts) if isinstance(posts, list) else []
        # NOTE(review): for these tags the direct file URL is linked instead of
        # the post page -- presumably the post page is not viewable for every
        # reader; confirm.
        use_file_url = 'loli' in tags or 'shota' in tags

        # Every candidate is tried at most once, so the loop always ends even
        # if no post carries a usable URL.
        while candidates:
            post = choice(candidates)
            candidates.remove(post)
            if not post:
                continue
            post_id = post.get('id')
            if post_id is None:
                continue
            if use_file_url:
                url = post.get('file_url')
            else:
                url = self._base_url / 'posts' / str(post_id) % {'q': tags}
            if not url:
                continue
            self.send_both_ways(f'Post: {post_id}: {url}', target)
            return True

        self.send_both_ways(f'Didn\'t find anything for the tags: {tags}', target)
        return None

    @contextmanager
    def check_and_send(self, message):
        """
        Context manager checking whether ``message`` triggers an alias.

        Yields ``True`` when the first word is an alias that is active for the
        target (channel targets must be listed for the alias), otherwise
        yields ``None``. After the ``with`` body has run for a match, a random
        post for the alias' tags is sent; ``{args}`` in the tags is replaced
        by the remaining words of the message.
        """
        target = message.GetTarget()
        msg_args = message.GetText().split()
        # Empty / whitespace-only text cannot name an alias.
        alias = self.get_alias(msg_args[0].lower()) if msg_args else None
        if not alias:
            yield
            return

        if target.startswith('#') and target.lower() not in alias['channels']:
            yield
            return

        tags = alias['tags']
        if '{args}' in tags:
            tags = tags.replace('{args}', ' '.join(msg_args[1:]))

        try:
            yield True
        finally:
            self.send_random(tags, target)

    @contextmanager
    def user_help(self, message):
        """
        Context manager handling the public ``!!help`` command.

        Yields ``True`` when the message starts with ``!!help`` and the target
        is one of the channels stored under ``help_in_channels``; otherwise
        yields ``None``. After the ``with`` body has run for a match, either
        the list of aliases or the details of the requested alias are sent.
        """
        args = list(filter(None, message.GetText().split()))
        target = message.GetTarget()
        if len(args) == 0 or not args[0].startswith('!!help'):
            yield
            return

        config = self.get_client_options()
        allowed_channels = config.get('help_in_channels')
        if not allowed_channels or target.lower() not in allowed_channels:
            yield
            return

        text = ''
        if len(args) > 1:
            # Aliases are stored lowercased (OnModCommand lowercases its
            # input), so look them up the same way check_and_send does.
            alias = config['alias'].get(args[1].lower())
            if not alias:
                text = f'Could not find alisa {args[1]}, use "!!help" to list all available aliases'
            else:
                text = f'The alias "{args[1]}" is available in the channels: {", ".join(alias["channels"])} and uses these tags: "{alias["tags"]}"'
        else:
            if not config['alias']:
                text = 'No aliases are configured atm'
            else:
                text = 'Available aliases: %s' % ', '.join(config['alias'])

        try:
            yield True
        finally:
            self.send_both_ways(text, target)

    def message_hook(self, message):
        """
        Run the message hooks in order; the first one that matches wins.

        Returns ``True`` when a hook handled the message (the message has then
        already been passed through), ``False`` otherwise.
        """
        for hook in (self.user_help, self.check_and_send):
            with hook(message) as action:
                if action:
                    self.pass_through(message)
                    return True
        return False

    def _report_help_channels(self, channels):
        """
        Tell the user in which channels ``!!help`` is enabled.
        """
        if not channels:
            self.inform_user('!!help: Disabled')
        else:
            self.inform_user(f'!!help: Enabled for: {", ".join(channels)}')

    def update_help_in_channels_command(self, args):
        """
        Handle ``set-help``: enable ``!!help`` for exactly the ``#channels`` in
        ``args`` (an empty list disables it).
        """
        channels = sorted({c.lower() for c in args if c.startswith('#')})
        config = self.get_client_options()
        config['help_in_channels'] = channels
        self.set_client_options(config)
        self._report_help_channels(channels)

    def show_help_channels(self):
        """
        Handle ``list-help``: report where ``!!help`` is enabled.
        """
        config = self.get_client_options()
        self._report_help_channels(config.get('help_in_channels'))

    @catchfail
    def OnModCommand(self, command):
        """
        Dispatch a module command; unknown or incomplete commands show the usage.

        The whole command line is lowercased before it is split into words.
        """
        self.debug_hook()
        args = command.lower().split()
        if not args:
            # Empty command line: there is no command word to inspect.
            self.show_usage_command()
            return znc.CONTINUE

        if args[0] in ['add', 'update'] and len(args) > 1:
            self.add_alias_command(args[1:])
        elif args[0] == 'add-channel' and len(args) > 1:
            self.add_channel_to_all_command(args[1:])
        elif args[0] == 'remove-channel' and len(args) > 1:
            self.remove_channel_from_all(args[1:])
        elif args[0] == 'set-help':
            self.update_help_in_channels_command(args[1:])
        elif args[0] == 'list-help':
            self.show_help_channels()
        elif args[0] == 'remove' and len(args) > 1:
            self.remove_alias_command(args[1:])
        elif args[0] == 'list':
            self.list_config_command()
        else:
            self.show_usage_command()
        return znc.CONTINUE

    @catchfail(znc.CONTINUE)
    def OnUserTextMessage(self, message):
        """
        This module hook is called when a user sends a PRIVMSG message.

        Returns ``znc.HALT`` when the message was handled by ``message_hook``.
        """
        self.debug_hook()
        return znc.HALT if self.message_hook(message) else znc.CONTINUE

    @catchfail(znc.CONTINUE)
    def OnChanTextMessage(self, message):
        """
        Called when we receive a channel PRIVMSG message from IRC.

        Returns ``znc.HALT`` when the message was handled by ``message_hook``.
        """
        self.debug_hook()
        return znc.HALT if self.message_hook(message) else znc.CONTINUE

    # DEBUGGING HOOKS
    # ===============

    def debug_hook(self):
        """
        Dumps parent calling method name and its arguments to debug logfile.

        Does nothing unless ``hook_debugging`` is exactly ``True``.
        """
        if self.hook_debugging is not True:
            return

        # stack()[1] is the frame of the method that called debug_hook().
        frameinfo = inspect.stack()[1]
        argvals = frameinfo.frame.f_locals

        messages = ['Called method: ' + frameinfo.function + '()']
        for argname in argvals:
            if argname == 'self':
                continue
            messages.append('    ' + argname + ' -> ' + pprint.pformat(argvals[argname]))
        messages.append('')
        self.internal_log.debug(*messages)
class InternalLog:
    """
    Minimal append-only file logger.

    Each log level is written to ``<save_path>/<level>.log``. Every call to
    ``debug``/``error`` opens the file, writes a "Log opened at" header with
    the current UTC time, then the given entries, and closes the file again.
    """

    def __init__(self, save_path: str):
        # Directory in which the ``*.log`` files are created.
        self.save_path = save_path

    def _write_to(self, file, *text):
        """
        Append every item of ``text`` as one line to the log named ``file``.

        Items are converted with ``str()``; trailing newlines are normalised
        to exactly one.
        """
        with self.open(file) as handle:
            handle.writelines(str(item).rstrip('\n') + '\n' for item in text)

    def debug(self, *text):
        """
        Append ``text`` to ``debug.log``.
        """
        self._write_to('debug', *text)

    def error(self, *text):
        """
        Append ``text`` to ``error.log``.
        """
        self._write_to('error', *text)

    def open(self, level: str):
        """
        Open ``<level>.log`` for appending, write the header and return the file.

        The caller is responsible for closing the returned file object.
        """
        # Local import: only ``datetime`` itself is imported at module level.
        from datetime import timezone

        # Explicit UTF-8 so logged non-ASCII text does not depend on the
        # locale's default encoding.
        target = open(os.path.join(self.save_path, level + '.log'), 'a', encoding='utf-8')
        try:
            # Same naive-UTC rendering as the deprecated datetime.utcnow().
            now = datetime.now(timezone.utc).replace(tzinfo=None)
            line = 'Log opened at: {} UTC\n'.format(now)
            target.write(line)
            target.write('=' * len(line) + '\n\n')
        except Exception:
            # Do not leak the handle when writing the header fails.
            target.close()
            raise
        return target
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment