Skip to content

Instantly share code, notes, and snippets.

from twisted.internet import main, reactor
from twisted.internet.abstract import FileDescriptor
from twisted.internet.error import ConnectionDone
from twisted.internet.fdesc import writeToFD
from twisted.internet.interfaces import IReadDescriptor, IWriteDescriptor
from twisted.internet.protocol import Protocol
from twisted.python import log
from twisted.python.filepath import FilePath
from zope.interface import implements
import errno
@shylent
shylent / gist:721292
Created November 30, 2010 07:21
fifo listener
from twisted.internet import main, reactor
from twisted.internet.abstract import FileDescriptor
from twisted.internet.error import ConnectionDone
from twisted.internet.fdesc import setNonBlocking
from twisted.internet.interfaces import IReadDescriptor
from twisted.internet.protocol import Protocol
from twisted.python import log
from twisted.python.failure import Failure
from twisted.python.filepath import FilePath
from zope.interface import implements
'''
@author: shylent
'''
from twisted.internet import reactor
from twisted.python import log
class Spent(Exception):
pass
class ExpectTelnetWrapper(object):
implements(ITelnetProtocol)
def __init__(self, expect):
"""
@param expect: Instance of L{Expect} class, that we are wrapping
@type expect: L{Expect}
"""
self.expect = expect
self.expect.transport = self
def handle_foo(request):
format_handlers = {
'xml':handle_foo_xml
}
format = request.GET.get('format', None)
if format is not None and format in format_handlers:
return format_handlers[format](request)
...
def handle_foo_xml(request):
from twisted.trial import unittest
from twisted.python import log
class DummyObserver(object):
def start(self):
log.addObserver(self.emit)
def stop(self):
log.removeObserver(self.emit)