Skip to content

Instantly share code, notes, and snippets.

@photofroggy
Created November 30, 2011 13:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save photofroggy/1409041 to your computer and use it in GitHub Desktop.
Save photofroggy/1409041 to your computer and use it in GitHub Desktop.
Experimenting with subcommands. Really icky code atm. Needs plenty of revision if it is ever going to be a reasonable option.
''' Sub commands
Hackish experiment in supporting sub commands using
reflex as a base. Thrown together pretty quickly.
TODO:
- clean up?
Based on the command stuff found in Terra.
'''
import copy
import traceback
from functools import wraps
from reflex.data import Event
from reflex.data import Binding
from reflex.base import Ruleset
class ExistingSubcommand(Exception):
pass
class SubcommandNotFound(Exception):
pass
class InvalidCommand(Exception):
pass
class CommandRuleset(Ruleset):
def init(self):
self.index = {}
def bind(self, meth, event, **options):
""" Creates a command binding.
This method attempts to create a command binding for the given
method. If successful, this method returns the command binding.
On failure, ``None`` is returned.
"""
if not options:
return None
if not options['cmd']:
return None
if not 'command' in self.mapref:
self.mapref['command'] = []
# Check if the command binding already exists.
for binding in self.mapref['command']:
if (binding.call, binding.options) is (meth, options):
return None
new_cmd = None
# Try adding a command or sub command when given a single string for the
# command name.
if isinstance(options['cmd'], str):
rcmd = options['cmd'].split(' ')
cmd = rcmd.pop(0)
if cmd.lower() in self.index:
new_cmd = self.bind_sub(meth, cmd, rcmd, options)
else:
new_cmd = CommandBinding(meth, options)
self.index_binding(new_cmd, options['cmd'])
return new_cmd
def bind_sub(self, meth, cmd, rcmd, options):
if len(rcmd) == 0:
return None
try:
return self.mapref['command'][self.index[cmd.lower()]].add(meth, copy.copy(rcmd), options)
except ExistingSubcommand as e:
self.debug('>> ExistingSubcommand: Subcommand \'{0} {1}\' already exists'.format(cmd, ' '.join(rcmd)))
def index_binding(self, binding, cmd):
if binding is None:
return
# binding.set_privs(self.user.groups)
if binding.sub:
return
self.mapref['command'].append(binding)
self.index[cmd.lower()] = len(self.mapref['command']) - 1
def unbind(self, meth, event, **options):
""" Remove a command binding.
This method attempts to remove a command binding for the given
method. If successful, ``True`` is returned. On failure, ``False``
is returned.
"""
if not options:
return False
scmd = options['cmd'].lower().split()
cmd = scmd.pop(0)
if 'command' in self.mapref.keys():
if not cmd in self.index.keys():
return False
if len(scmd) > 0:
try:
return self.mapref['command'][self.index[cmd]].remove(meth, copy.copy(scmd), options)
except SubcommandNotFound as e:
self.debug('>> SubcommandNotFound: Subcommand \'{0} {1}\' not found'.format(cmd, ' '.join(scmd)))
return False
del self.mapref['command'][self.index[cmd]]
if len(self.mapref['command']) == 0:
del self.mapref['command']
self.sort_index()
return True
def sort_index(self):
temp = self.index
self.index = {}
for name in temp:
for key, binding in enumerate(self.mapref['command']):
if name != binding.options['cmd'].lower():
continue
self.index[name] = key
continue
def trigger(self, event, *args):
"""Trigger a command."""
try:
if not event.trigger.lower() in self.index.keys():
self.debug('>> No such command as \'{0}\''.format(event.trigger))
return None
except AttributeError:
self.debug('>> Invalid command provided')
return None
return self.run(self.mapref['command'][self.index[event.trigger.lower()]], event, *args)
def run(self, binding, event, index=1, last=None, *args):
"""Attempt to run a command's event binding."""
cmd = event.arguments(index).lower()
last = last or event.trigger.lower()
for key, value in binding.options.items():
if not value:
continue
if key == 'cmd' and value.lower() == last:
if len(cmd) > 0 and cmd in binding.subs:
return self.run(binding.subs[cmd], event, index + 1, cmd, *args)
continue
if key == 'priv':
if not self.privd(data.user, binding.level, data.trigger):
return None
continue
if key == 'channel':
'''if dAmn.format_ns(str(option)).lower() == str(data.ns).lower():
continue
return None'''
continue
try:
# Process event item
item = getattr(event, key)
except AttributeError:
return None
# Try to match as a string
try:
if str(item).lower() == str(value).lower():
continue
except Exception:
pass
# Are they the same type?
if type(item) != type(value):
return None
# Do they hold the same value?
if item == value:
continue
return None
sub = event.arguments(1)
if sub == '?':
if binding.options and binding.options['help']:
#dAmn.say(event.target, ': '.join([str(event.user), binding.options['help']]))
return None
#dAmn.say(data.target, event.user+': There is no information for this command.')
return None
'''if self.debug:
self._write('** Running command \''+data.trigger+'\' for '+str(data.user)+'.')'''
try:
binding.call(event, *args)
except Exception as e:
log = self.debug
log('>> Failed to execute command "{0}"!'.format(event.trigger))
log('>> Error:')
tb = traceback.format_exc().splitlines()
for line in tb:
log('>> {0}'.format(line))
return None
def privd(self, user, level, cmd):
return True
class CommandBinding(Binding):
"""Command Binding class."""
def __init__(self, method, options, sub=False):
super(CommandBinding, self).__init__(method, 'command', options)
self.subs = {}
self.sub = sub
cmd = self.options['cmd'] if isinstance(self.options['cmd'], str) else self.options['cmd'][0]
scmd = cmd.split(' ')
if len(scmd) != 1:
raise ValueError
self.type = '<event[\'command:{0}\'].binding>'.format(cmd)
def set_privs(self, groups):
grp = 'Guests'
if len(self.options) > 1:
grp = self.options.get('priv', 'Guests')
if grp != 'Guests':
grp = groups.find(grp, True)
if grp is None:
grp = 'Guests'
self.group = grp
self.level = groups.find(grp)
def add(self, method, scmd, options):
""" Add a sub command to this command. """
try:
cmd = scmd.pop(0)
except IndexError as e:
# Raise a custom exception here?
return None
if cmd.lower() in self.subs:
# Raise an exception here?
# - also sub-sub commands?
if len(scmd) == 0:
raise ExistingSubcommand('Subcommand \'{0}\' already exists'.format(cmd))
try:
return self.subs[cmd.lower()].add(method, copy.copy(scmd), options)
except ExistingSubcommand as e:
raise ExistingSubcommand('Subcommand \'{0} {1}\' already exists'.format(cmd, ' '.join(scmd)))
options['cmd'] = cmd
binding = CommandBinding(method, options, True)
self.subs[cmd.lower()] = binding
return binding
def remove(self, method, scmd, options):
""" Remove a sub command from this command. """
try:
cmd = scmd.pop(0).lower()
except IndexError as e:
# Raise a custom exception here?
raise SubcommandNotFound('Subcommand not found')
if cmd in self.subs:
if len(scmd) > 0:
return self.subs[cmd].remove(method, copy.copy(scmd), options)
if self.subs[cmd].call == method:
del self.subs[cmd]
return True
raise SubcommandNotFound('Subcommand \'{0} {1}\' not found'.format(cmd, ' '.join(scmd)))
def arguments(data=False, start=0, finish=False, separator=' ', sequence=False):
""" This method extracts specific arguments from a message. """
if data == '':
return '' if not sequence else ['']
data = data.split(separator)
if len(data) <= start:
return '' if not sequence else ['']
if isinstance(finish, bool) or finish == None:
data = data[start:] if finish else [data[start]]
else:
data = data[start:] if len(data) <= finish else data[start:finish]
return data if sequence else separator.join(data)
class Command(Event):
"""Command event class."""
def init(self, event, data):
self.arguments = self._args_wrapper()
def __str__(self):
return '<event[\'command:'+self.trigger+'\']>'
def _args_wrapper(self):
@wraps(arguments)
def wrapper(start=0, finish=False, separator=' ', sequence=False):
return arguments(self.message, start, finish, separator, sequence)
return wrapper
# EOF
''' Testing out the subcmd stuff.
Only a crappy brief example and stuffs.
'''
import sys
from reflex.base import Reactor
from reflex.control import EventManager
from subcmd import CommandRuleset
from subcmd import Command
class Example(Reactor):
def init(self):
self.bind(self.cmd, 'command', cmd='cmd')
self.bind(self.scmd, 'command', cmd='cmd sub')
self.bind(self.multi, 'command', cmd='cmd sub lol')
self.bind(self.multi, 'command', cmd='cmd sub lol')
self.bind(self.multi, 'command', cmd='cmd sub wot')
self.unbind(self.multi, 'command', cmd='cmd sub wot')
self.unbind(self.multi, 'command', cmd='cmd sub wot')
def cmd(self, event, **args):
print 'hi'
def scmd(self, event, **args):
print 'sub lol'
def multi(self, event, **args):
print 'subsub?'
def writeout(msg=''):
sys.stdout.write('{0}\n'.format(msg))
sys.stdout.flush()
events = EventManager(stddebug=writeout)
events.define_rules('command', CommandRuleset)
e = Example(events)
print 'bound commands:'
for item in events.map['command']:
print '>', item.options['cmd']
if len(item.subs) > 0:
for name in item.subs:
print '> {0} {1}'.format(item.options['cmd'], name)
if len(item.subs[name].subs) > 0:
for sname in item.subs[name].subs:
print '> {0} {1} {2}'.format(item.options['cmd'], name, sname)
cmd = Command('command', [
('trigger', 'cmd'),
('channel', 'chat:Botdom'),
('user', 'photofroggy'),
('message', 'cmd sub lol'),
('priv', 'Guests')])
print '-------------------------'
print 'firing commands'
print '> expecting \'subsub?\''
events.trigger(cmd)
cmd.message = 'cmd sub foo'
print '> expecting \'sub lol\''
events.trigger(cmd)
cmd.message = 'foo'
print '> expecting \'hi\''
events.trigger(cmd)
cmd.trigger = 'foo'
print '> expecting command not found'
events.trigger(cmd)
# EOF
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment