Skip to content

Instantly share code, notes, and snippets.

@gfranxman
Created March 1, 2012 17:04
Show Gist options
  • Save gfranxman/1951430 to your computer and use it in GitHub Desktop.
Save gfranxman/1951430 to your computer and use it in GitHub Desktop.
twisted q
from twisted.internet import reactor, protocol
from twisted.protocols import policies
from twisted.protocols import basic
import queue
WAITING = 'waiting'
COLLECTING = 'collecting'
IDLE = 'idle'
singletonQueue = queue.Queue()
class QueueProtocol( basic.LineReceiver, policies.TimeoutMixin ):
def connectionMade( self ):
self.setTimeout( 30 )
self.mode = 'cmd'
self.data = ''
self.reserved_items = list()
def lineReceived( self, line ):
self.resetTimeout()
if self.mode == 'cmd':
if line == 'next':
self.do_NEXT()
elif line == 'reservenext':
self.do_RESERVENEXT()
elif line.startswith( "dispose:" ):
item_id = line[ len('dispose:'):]
self.do_DISPOSE( item_id )
elif line.startswith( 'requeue:' ):
item_id = line[ len('requeue:'):]
self.do_REQUEUE( item_id )
elif line.startswith( 'enqueue:' ):
payload = line[ len('enqueue:'):]
self.do_ENQUEUE( payload )
elif line.startswith( 'menqueue:' ):
self.data = line[ len('menqueue:'):]
self.mode = 'data'
elif line == 'peek':
self.do_PEEK()
else:
self.sendLine( "unknown cmd" )
self.transport.loseConnection()
elif self.mode == 'data':
if line == '.':
self.do_MULTILINE_ENQUEUE( self.data )
self.mode = 'cmd'
self.data = ''
else:
self.data += line + "\n"
def connectionLost( self, reason ):
print "my connection was lost", reason
# put anything reserved back
for i in self.reserved_items:
if i.isReserved():
print "returning %s to the queue" % i._id
i.clear_reservation()
self.mode = 'cmd'
self.data = ''
def timeoutConnection( self ):
self.sendLine( "Macht schnell!" )
for i in self.reserved_items:
if i.isReserved():
self.sendLine( "returning %s to the queue" % i._id )
self.transport.loseConnection()
def _send_item( self, item, status ):
self.sendLine( "ID:%s %s" % ( item._id, status ) )
self.sendLine( "---------- HEAD:%s ----------" % item._id )
self.sendLine( item.contents )
self.sendLine( "---------- TAIL:%s ----------" % item._id )
def do_PEEK( self ):
for item in singletonQueue._items:
self._send_item( item, item.isReserved() )
def do_NEXT( self ):
try:
item = singletonQueue.next()
item.dispose()
self._send_item( item, "REMOVED" )
except StopIteration, empty:
from time import sleep
sleep( 3 )
self.sendLine( "try again?" )
def do_RESERVENEXT( self ):
try:
item = singletonQueue.next()
self.reserved_items.append( item ) # remember the item. if we loose the connection, we'll clear the reservation
self.sendLine( "#here ya go ( don't forget to dispose or requeue it )" )
self._send_item( item, "RESERVED" )
except StopIteration, empty:
from time import sleep
sleep( 3 )
self.sendLine( "try again?" )
def do_DISPOSE( self, item_id ):
for n, i in enumerate( self.reserved_items ):
if i._id == item_id:
del self.reserved_items[ n ]
i.dispose()
self.sendLine( "disposed ID:%s" % i._id)
return
self.sendLine( "could not dispose ID:%s" % item_id )
def do_REQUEUE( self, item_id ):
for n, i in enumerate( self.reserved_items ):
if i._id == item_id:
del self.reserved_items[n]
item = singletonQueue.requeue( item_id )
self.sendLine( "requeued ID:%s" % item._id)
return
self.sendLine( "could not requeue ID:%s" % item_id )
def do_ENQUEUE( self, first_line_of_payload ):
pass
id = singletonQueue.enqueue( first_line_of_payload )
self.sendLine( "queued ID:%s" % id )
def do_MULTILINE_ENQUEUE( self, data ):
id = singletonQueue.enqueue( data )
self.sendLine( "queued ID:%s" % id )
class QueueServerFactory( protocol.ServerFactory ):
protocol = QueueProtocol
if __name__ == '__main__':
port = 50001
reactor.listenTCP( port, QueueServerFactory() )
reactor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment