Created
November 30, 2011 13:23
-
-
Save photofroggy/1409041 to your computer and use it in GitHub Desktop.
Experimenting with subcommands. The code is really icky at the moment and needs plenty of revision if it is ever going to be a reasonable option.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' Sub commands

Hackish experiment in supporting sub commands, using
reflex as a base. Thrown together pretty quickly.

TODO:
    - clean up?

Based on the command handling found in Terra.
'''
import copy | |
import traceback | |
from functools import wraps | |
from reflex.data import Event | |
from reflex.data import Binding | |
from reflex.base import Ruleset | |
class ExistingSubcommand(Exception):
    """Raised when a sub command is added under a name that is already taken."""
class SubcommandNotFound(Exception):
    """Raised when a sub command to be removed cannot be located."""
class InvalidCommand(Exception):
    """Error type reserved for invalid commands."""
class CommandRuleset(Ruleset):
    """Ruleset that routes ``command`` events to commands and sub commands.

    Top-level command bindings live in ``self.mapref['command']``.
    ``self.index`` maps each lower-cased top-level command name to the
    position of its binding in that list. Sub commands are stored on the
    parent binding itself (see ``CommandBinding.subs``).
    """

    def init(self):
        """Create the empty command-name -> list-position index."""
        self.index = {}

    def bind(self, meth, event, **options):
        """ Creates a command binding.

        This method attempts to create a command binding for the given
        method. If successful, this method returns the command binding.
        On failure, ``None`` is returned.

        The ``cmd`` option names the command. If its first word is already
        a registered command, the remaining words are bound as a (nested)
        sub command of it; otherwise a new top-level command is created.

        Note that ``CommandBinding`` raises ``ValueError`` when a multi-word
        name is given whose first word is not yet a registered command.
        Only string names are bound; any other ``cmd`` value yields ``None``.
        """
        # ``.get`` so that options lacking 'cmd' fail softly, not with KeyError.
        name = options.get('cmd')
        if not name:
            return None

        if 'command' not in self.mapref:
            self.mapref['command'] = []

        # Check if the command binding already exists. This must be an
        # equality test: an identity test on freshly built tuples never matches.
        for binding in self.mapref['command']:
            if binding.call == meth and binding.options == options:
                return None

        if not isinstance(name, str):
            return None

        words = name.split(' ')
        head = words.pop(0)

        if head.lower() in self.index:
            new_cmd = self.bind_sub(meth, head, words, options)
        else:
            new_cmd = CommandBinding(meth, options)

        self.index_binding(new_cmd, name)
        return new_cmd

    def bind_sub(self, meth, cmd, rcmd, options):
        """Bind ``meth`` as a sub command of the existing command ``cmd``.

        ``rcmd`` is the list of words following the command name. Returns
        the new binding, or ``None`` when ``rcmd`` is empty or the sub
        command already exists (the latter is reported through
        ``self.debug``).
        """
        if not rcmd:
            return None

        parent = self.mapref['command'][self.index[cmd.lower()]]
        try:
            # Hand over a copy: ``add`` consumes the list it is given, and
            # ``rcmd`` is still needed intact for the message below.
            return parent.add(meth, list(rcmd), options)
        except ExistingSubcommand:
            self.debug(
                '>> ExistingSubcommand: Subcommand \'{0} {1}\' already exists'.format(
                    cmd, ' '.join(rcmd)))
            return None

    def index_binding(self, binding, cmd):
        """Store a top-level binding and record its position under ``cmd``.

        Does nothing for ``None`` or for sub command bindings, which are
        kept on their parent binding instead.
        """
        # binding.set_privs(self.user.groups)  -- privilege lookup is disabled
        if binding is None or binding.sub:
            return

        bindings = self.mapref['command']
        bindings.append(binding)
        self.index[cmd.lower()] = len(bindings) - 1

    def unbind(self, meth, event, **options):
        """ Remove a command binding.

        This method attempts to remove a command binding for the given
        method. If successful, ``True`` is returned. On failure, ``False``
        is returned.

        NOTE(review): removing a top-level command does not check that
        ``meth`` is the method bound to it, whereas sub command removal
        does -- confirm whether that asymmetry is intended.
        """
        name = options.get('cmd')
        if not name or not isinstance(name, str):
            return False

        words = name.lower().split()
        if not words:
            return False
        head = words.pop(0)

        if 'command' not in self.mapref or head not in self.index:
            return False

        if words:
            try:
                return self.mapref['command'][self.index[head]].remove(
                    meth, list(words), options)
            except SubcommandNotFound:
                self.debug(
                    '>> SubcommandNotFound: Subcommand \'{0} {1}\' not found'.format(
                        head, ' '.join(words)))
                return False

        del self.mapref['command'][self.index[head]]
        if not self.mapref['command']:
            del self.mapref['command']

        # Positions after the removed entry have shifted; rebuild the index.
        self.sort_index()
        return True

    def sort_index(self):
        """Rebuild ``self.index`` so each name maps to its current position.

        Only names that were indexed before are kept. Names whose binding
        no longer exists are dropped. Tolerates ``mapref['command']``
        having been deleted, which happens when the last command is unbound.
        """
        previous = self.index
        self.index = {}

        bindings = self.mapref['command'] if 'command' in self.mapref else []
        for position, binding in enumerate(bindings):
            name = binding.options['cmd'].lower()
            if name in previous:
                self.index[name] = position

    def trigger(self, event, *args):
        """Trigger a command.

        Looks up ``event.trigger`` (case-insensitively) and runs the
        matching binding. Unknown or invalid commands are reported through
        ``self.debug``. Always returns ``None``.
        """
        try:
            name = event.trigger.lower()
        except AttributeError:
            self.debug('>> Invalid command provided')
            return None

        if name not in self.index:
            self.debug('>> No such command as \'{0}\''.format(event.trigger))
            return None

        # ``index`` and ``last`` are passed explicitly so that the extra
        # positional ``args`` cannot be swallowed by those two parameters.
        return self.run(self.mapref['command'][self.index[name]], event, 1, None, *args)

    def run(self, binding, event, index=1, last=None, *args):
        """Attempt to run a command's event binding.

        ``index`` is the position, within the event's message, of the word
        following the command resolved so far. ``last`` is the lower-cased
        name of that command (defaults to ``event.trigger``). If the word
        at ``index`` names a sub command of ``binding``, the call recurses
        into it. Otherwise every truthy binding option other than ``cmd``,
        ``priv`` and ``channel`` must match the same-named event attribute,
        or the command is not run. Always returns ``None``.
        """
        cmd = event.arguments(index).lower()
        last = last or event.trigger.lower()

        for key, value in binding.options.items():
            if not value:
                continue

            if key == 'cmd' and value.lower() == last:
                if cmd and cmd in binding.subs:
                    return self.run(binding.subs[cmd], event, index + 1, cmd, *args)
                continue

            if key == 'priv':
                # ``level`` is only set by CommandBinding.set_privs, which
                # is currently never called, hence the getattr default.
                level = getattr(binding, 'level', None)
                if not self.privd(event.user, level, event.trigger):
                    return None
                continue

            if key == 'channel':
                # Channel filtering is disabled. The original check was:
                #   if dAmn.format_ns(str(option)).lower() == str(data.ns).lower():
                #       continue
                #   return None
                continue

            # Process event item
            try:
                item = getattr(event, key)
            except AttributeError:
                return None

            # Try to match as a string. Deliberately best-effort: if either
            # value cannot be stringified, fall back to the strict check.
            try:
                if str(item).lower() == str(value).lower():
                    continue
            except Exception:
                pass

            # Otherwise they must be the same type and hold the same value.
            if type(item) is type(value) and item == value:
                continue
            return None

        # A '?' after the resolved (sub)command requests help. Replying is
        # disabled (the dAmn.say calls were commented out), so the command
        # simply is not run. binding.options.get('help') would hold the text.
        if cmd == '?':
            return None

        try:
            binding.call(event, *args)
        except Exception:
            # A failing handler must not take the ruleset down: log it.
            log = self.debug
            log('>> Failed to execute command "{0}"!'.format(event.trigger))
            log('>> Error:')
            for line in traceback.format_exc().splitlines():
                log('>> {0}'.format(line))

        return None

    def privd(self, user, level, cmd):
        """Privilege check stub: every user may currently run every command."""
        return True
class CommandBinding(Binding):
    """Command Binding class.

    Holds the method bound to one command word, plus ``subs``: a dict
    mapping lower-cased sub command names to their own ``CommandBinding``.
    ``sub`` is true when this binding is itself a sub command.
    """

    def __init__(self, method, options, sub=False):
        """Create the binding.

        ``options['cmd']`` is either the command name or a sequence whose
        first item is the name. Raises ``ValueError`` when that name
        contains a space, i.e. is not a single word.
        """
        super(CommandBinding, self).__init__(method, 'command', options)
        self.subs = {}
        self.sub = sub

        cmd = self.options['cmd']
        if not isinstance(cmd, str):
            cmd = cmd[0]

        if len(cmd.split(' ')) != 1:
            raise ValueError(
                'Command name {0!r} must be a single word'.format(cmd))

        self.type = '<event[\'command:{0}\'].binding>'.format(cmd)

    def set_privs(self, groups):
        """Resolve the ``priv`` option into ``self.group`` and ``self.level``.

        Falls back to the ``'Guests'`` group when no ``priv`` option is set
        or when ``groups.find(name, True)`` returns ``None``.
        NOTE(review): the contract of ``groups.find`` is not visible in
        this file -- confirm against its definition.
        """
        grp = self.options.get('priv', 'Guests')

        if grp != 'Guests':
            grp = groups.find(grp, True)

        if grp is None:
            grp = 'Guests'

        self.group = grp
        self.level = groups.find(grp)

    def add(self, method, scmd, options):
        """ Add a sub command to this command.

        ``scmd`` is the list of remaining words; its first item is consumed
        as the sub command name. When that sub command exists and more
        words follow, the call recurses into it. Returns the new binding,
        or ``None`` when ``scmd`` is empty. Raises ``ExistingSubcommand``
        when the named sub command is already bound.

        NOTE(review): when the first word is not yet a sub command, it is
        bound to ``method`` and any remaining words are ignored.
        """
        try:
            cmd = scmd.pop(0)
        except IndexError:
            # Raise a custom exception here?
            return None

        key = cmd.lower()

        if key in self.subs:
            if not scmd:
                raise ExistingSubcommand(
                    'Subcommand \'{0}\' already exists'.format(cmd))
            try:
                return self.subs[key].add(method, list(scmd), options)
            except ExistingSubcommand:
                # Re-raise with the path as seen from this level.
                raise ExistingSubcommand(
                    'Subcommand \'{0} {1}\' already exists'.format(
                        cmd, ' '.join(scmd)))

        # Work on a copy so the caller's options dict is left untouched.
        sub_options = dict(options)
        sub_options['cmd'] = cmd

        binding = CommandBinding(method, sub_options, True)
        self.subs[key] = binding
        return binding

    def remove(self, method, scmd, options):
        """ Remove a sub command from this command.

        ``scmd`` is the list of remaining words; its first item is consumed
        as the sub command name. Returns ``True`` once the sub command has
        been removed. Raises ``SubcommandNotFound`` when ``scmd`` is empty,
        when no such sub command exists, or when it is bound to a method
        other than ``method``.
        """
        try:
            cmd = scmd.pop(0).lower()
        except IndexError:
            raise SubcommandNotFound('Subcommand not found')

        if cmd in self.subs:
            if scmd:
                return self.subs[cmd].remove(method, list(scmd), options)

            if self.subs[cmd].call == method:
                del self.subs[cmd]
                return True

        raise SubcommandNotFound(
            'Subcommand \'{0} {1}\' not found'.format(cmd, ' '.join(scmd)))
def arguments(data=False, start=0, finish=False, separator=' ', sequence=False):
    """ This method extracts specific arguments from a message.

    ``data`` is split on ``separator`` and a slice of the pieces is
    returned, selected as follows:

    - ``finish`` false (``False``/``None``): only the piece at ``start``.
    - ``finish`` is ``True``: every piece from ``start`` onwards.
    - ``finish`` is an int: the pieces in ``[start:finish]``.

    The result is a list when ``sequence`` is true, otherwise the pieces
    joined back together with ``separator``. When ``data`` is empty or
    missing, or has no piece at ``start``, the result is ``''`` (or
    ``['']`` when ``sequence`` is true).
    """
    empty = [''] if sequence else ''

    # Also covers the False default and None, which have no ``split``.
    if not data:
        return empty

    parts = data.split(separator)
    if len(parts) <= start:
        return empty

    if finish is None or isinstance(finish, bool):
        parts = parts[start:] if finish else [parts[start]]
    else:
        # Slicing already clamps a ``finish`` beyond the end of the list.
        parts = parts[start:finish]

    return parts if sequence else separator.join(parts)
class Command(Event):
    """Command event class."""

    def init(self, event, data):
        """Expose ``arguments`` as a callable bound to this event's message."""
        self.arguments = self._args_wrapper()

    def __str__(self):
        prefix, suffix = "<event['command:", "']>"
        return prefix + self.trigger + suffix

    def _args_wrapper(self):
        """Build a version of ``arguments`` that reads ``self.message``.

        The message is looked up on every call, so later changes to
        ``self.message`` are picked up.
        """
        def from_message(start=0, finish=False, separator=' ', sequence=False):
            return arguments(self.message, start, finish, separator, sequence)

        return wraps(arguments)(from_message)
# EOF |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' Testing out the subcmd stuff.

This is only a brief, rough example.
'''
import sys | |
from reflex.base import Reactor | |
from reflex.control import EventManager | |
from subcmd import CommandRuleset | |
from subcmd import Command | |
class Example(Reactor):
    """Demo reactor that binds a command with nested sub commands."""

    def init(self):
        """Register the demo bindings.

        Some calls are repeated on purpose, presumably to exercise the
        'already exists' and 'not found' paths of the command ruleset.
        """
        self.bind(self.cmd, 'command', cmd='cmd')
        self.bind(self.scmd, 'command', cmd='cmd sub')
        self.bind(self.multi, 'command', cmd='cmd sub lol')
        # Deliberate duplicate of the binding above.
        self.bind(self.multi, 'command', cmd='cmd sub lol')
        self.bind(self.multi, 'command', cmd='cmd sub wot')
        self.unbind(self.multi, 'command', cmd='cmd sub wot')
        # Deliberate second removal of an already removed sub command.
        self.unbind(self.multi, 'command', cmd='cmd sub wot')

    # Handlers accept extra positional arguments as well as keywords.
    # Single-argument print(...) behaves the same on Python 2 and 3.

    def cmd(self, event, *args, **kwargs):
        """Handler for the top-level 'cmd' command."""
        print('hi')

    def scmd(self, event, *args, **kwargs):
        """Handler for the 'cmd sub' sub command."""
        print('sub lol')

    def multi(self, event, *args, **kwargs):
        """Handler for the nested 'cmd sub lol' / 'cmd sub wot' sub commands."""
        print('subsub?')
def writeout(msg=''):
    """Write ``msg`` plus a newline to standard output and flush it."""
    stream = sys.stdout
    stream.write('{0}\n'.format(msg))
    stream.flush()
# Demo driver: wire the command ruleset into an event manager, list what
# got bound, then fire a few commands. Each print(...) call takes a single
# argument so the output is identical on Python 2 and Python 3.
events = EventManager(stddebug=writeout)
events.define_rules('command', CommandRuleset)
e = Example(events)

print('bound commands:')
for item in events.map['command']:
    root = item.options['cmd']
    print('> {0}'.format(root))
    # Walk two levels of sub commands; empty dicts simply yield nothing.
    for name in item.subs:
        print('> {0} {1}'.format(root, name))
        for sname in item.subs[name].subs:
            print('> {0} {1} {2}'.format(root, name, sname))

cmd = Command('command', [
    ('trigger', 'cmd'),
    ('channel', 'chat:Botdom'),
    ('user', 'photofroggy'),
    ('message', 'cmd sub lol'),
    ('priv', 'Guests')])

print('-------------------------')
print('firing commands')

print('> expecting \'subsub?\'')
events.trigger(cmd)

# 'foo' is not a sub command of 'cmd sub', so 'cmd sub' itself should run.
cmd.message = 'cmd sub foo'
print('> expecting \'sub lol\'')
events.trigger(cmd)

# The message has no word after the trigger, so the top-level command runs.
cmd.message = 'foo'
print('> expecting \'hi\'')
events.trigger(cmd)

cmd.trigger = 'foo'
print('> expecting command not found')
events.trigger(cmd)
# EOF |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment