Skip to content

Instantly share code, notes, and snippets.

@topia
Created April 18, 2009 02:27
Show Gist options
  • Save topia/97396 to your computer and use it in GitHub Desktop.
Save topia/97396 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# $URL$
# $Id$
"""Tiny Python Console for TwitterIrcGateway.
This script requires the following pure-Python standard library modules,
as bundled with IronPython 2.0 or CPython 2.5 (not 2.6):
codeop, traceback, __future__, os, os.path (and ntpath), linecache,
stat, errno, copy_reg, types, UserDict
"""
__docformat__ = 'reStructuredText'
__author__ = 'Topia <topia@clovery.jp>'
__version__ = '$Revision$'
__date__ = '$Date$'
__credits__ = """copyright (C) 2009 Topia <topia@clovery.jp>. all rights reserved."""
import sys
import clr
from Misuzilla.Net.Irc import *
sys.path.append(r"Libraries\IronPython")
from codeop import CommandCompiler
import traceback
## TODO:
## * better multiline handling
class IRCWriter(object):
    """File-like writer (``write``/``flush``) that emits text as IRC messages.

    Text is buffered until a newline arrives; every complete line is then
    sent through ``session.Send`` as one message whose ``Sender`` is
    ``prefix`` and whose receiver is ``target``.
    """

    def __init__(self, session, prefix, target, notice=True):
        """Set up the writer.

        :param session: object whose ``Send`` method delivers messages.
        :param prefix: value stored in each message's ``Sender`` attribute.
        :param target: receiver passed to the message constructor.
        :param notice: when true lines are sent as ``NoticeMessage``,
            otherwise as ``PrivMsgMessage``.
        """
        self.buffer = ''
        self.target = target
        self.msgclass = NoticeMessage if notice else PrivMsgMessage
        self.prefix = prefix
        self.session = session

    def write(self, text):
        """Append ``text`` to the buffer and send every completed line.

        Trailing whitespace is stripped from each line before sending.
        Whatever follows the last newline stays buffered for the next call.
        """
        pending = self.buffer + text
        try:
            while "\n" in pending:
                head, pending = pending.split("\n", 1)
                message = self.msgclass(self.target, head.rstrip())
                message.Sender = self.prefix
                self.session.Send(message)
        finally:
            # Keep the unsent remainder even if sending raised.
            self.buffer = pending

    def flush(self):
        """Send any buffered partial line by terminating it with a newline."""
        if not self.buffer:
            return
        self.write("\n")
class ConsoleHandler(object):
channel_name = '#pyconsole'
def __init__(self, locals=None):
self.compile = CommandCompiler()
self.resetbuffer()
if locals is None:
locals = {"__name__": "__console__", "__doc__": None,
"console": self}
self.locals = locals
def attach(self, session=Session, server=Server):
self.server = server
self.session = session
self.session.PreMessageReceived += self.onPreMessageReceived
self.session.PostMessageReceived += self.onPostMessageReceived
chname = self.channel_name
self.userhost = 'twitter@' + server.ServerName
self.stdout = IRCWriter(session, 'stdout!' + self.userhost, chname)
self.stderr = IRCWriter(session, 'stderr!' + self.userhost, chname)
def detach(self):
self.session.PreMessageReceived -= self.onPreMessageReceived
self.session.PostMessageReceived -= self.onPostMessageReceived
self.session = self.server = None
self.stdout = self.stderr = None
def send_notify(self, nick, msg):
ircmsg = NoticeMessage(self.channel_name, msg)
ircmsg.SenderHost = self.userhost
ircmsg.SenderNick = nick
self.session.Send(ircmsg)
def onPreMessageReceived(self, sender, eventargs):
msg = eventargs.Message
if (not isinstance(msg, (PrivMsgMessage, NoticeMessage)) or
msg.Receiver != self.channel_name):
return
eventargs.Cancel = True;
self.process_line(msg.Content)
def onPostMessageReceived(self, sender, eventargs):
msg = eventargs.Message
if (not isinstance(msg, JoinMessage) or
msg.Channel != self.channel_name):
return
Session.Groups[self.channel_name].IsSpecial = True;
def resetbuffer(self):
self.buffer = []
self.inmultiline = False
def process_line(self, content):
if not content.strip():
content = ''
more = self.push(content)
if more and not self.inmultiline:
self.send_notify('...', 'when insert blank (normally last) line, enter space only line')
self.inmultiline = True
def push(self, content):
self.buffer.append(content)
source = "\n".join(self.buffer)
more = self.runsource(source)
if not more:
self.resetbuffer()
return more
def runsource(self, source):
if source[:1] in [' ', '\t']:
source = 'if 1:\n%s' % source
try:
code = self.compile(source)
except (OverflowError, SyntaxError, ValueError, TypeError):
self.send_notify('failure-source', repr(source))
for line in traceback.format_exc().splitlines():
self.send_notify('failure', line)
return None
if code is None:
return True
self.runcode(code)
return False
def runcode(self, code):
__builtins__['_'] = None
iobackup = sys.stdin, sys.stdout, sys.stderr
try:
sys.stdin = None
sys.stdout = self.stdout
sys.stderr = self.stderr
try:
try:
exec code in self.locals
finally:
if sys.stdout:
sys.stdout.flush()
if sys.stderr:
sys.stderr.flush()
except Exception, e:
for line in traceback.format_exc().splitlines():
self.send_notify('failure', line)
else:
if _ is None and self.inmultiline:
self.send_notify('success', '(accepted)')
finally:
sys.stdin, sys.stdout, sys.stderr = iobackup
# Namespace in which console input is executed; the handler keeps a
# reference to this dict, so entries added below are visible to it.
console_locals = dict(__name__="__console__", __doc__=None)
console = ConsoleHandler(console_locals)
# Expose the handler and the host objects to console code.
console_locals.update(console=console, Session=Session, Server=Server)
# Make every module loaded so far reachable by name (applied last, so a
# module name that collides with an entry above takes precedence).
console_locals.update(sys.modules)
console.attach(Session, Server)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment