Skip to content

Instantly share code, notes, and snippets.

@lukemarsden
Created September 8, 2011 12:55
Show Gist options
  • Save lukemarsden/1203328 to your computer and use it in GitHub Desktop.
Save lukemarsden/1203328 to your computer and use it in GitHub Desktop.
Twisted Spread integration
# Released under BSD 2-clause license by Luke Marsden
from twisted.internet import reactor
from twisted.application import service
from twisted.python import log
import spread
import zlib
def hex2ip(sender):
_,hex,priv = sender.split('#')
quads = [hex[0:2],hex[2:4],hex[4:6],hex[6:8]]
ip = '.'.join(map(lambda x: str(int(x,16)),quads))
return ip
class Spreader(service.Service):
mailbox = None
privateGroup = None
connected = False
mboxfd = None
def __init__(self, config):
self.spreadName = "%s@%s"%(config['spread_port'], config['spread_host'])
self.privateName = str(config['my_ip'])
self.priority = 0
self.membership = int(config['spread_membership'])
self.group = config['spread_group']
def startService(self):
service.Service.startService(self)
self.connect()
self.checkConnection()
def checkConnection(self):
"""
Checks whether we are still connected to Spread (sometimes Spread kicks
us off if we can't receive fast enough. If this happens, reconnect
after a couple of seconds.
"""
if not self.connected:
self.connect()
reactor.callLater(1, self.checkConnection)
def stopService(self):
service.Service.stopService(self)
self.disconnect()
def regularMessageReceived(self, message):
#print inspect.getmembers(message)
try:
self.messageReceived(message.sender, message.groups, message.message,
message.msg_type, message.endian)
except:
log.err(None, "Problem dispatching regular spread message")
def membershipMessageReceived(self, message):
#print inspect.getmembers(message)
if message.reason == 0:
self.groupTransitioned(message.group)
elif message.reason == spread.CAUSED_BY_JOIN:
self.groupJoined(message.group, message.members)
self.groupChanged(message.group, message.members, message.orig_members)
elif (message.reason == spread.CAUSED_BY_LEAVE
or message.reason == spread.CAUSED_BY_DISCONNECT):
self.groupLeft(message.group, message.members)
self.groupChanged(message.group, message.members, message.orig_members)
else:
print "Unknown membership message:", str(message)
def messageReceived(self, sender, groups, message, msg_type, extra):
#log.debug("messageReceived: %s %s %s", sender, groups, message)
pass
def groupTransitioned(self, group):
#log.debug("groupTransitioned: %s", group)
pass
def groupJoined(self, group, who):
#log.debug("groupJoined: %s %s", group, who)
pass
def groupLeft(self, group, who):
#log.debug("groupLeft: %s %s", group, who)
pass
def groupChanged(self, group, current, past):
#log.debug("groupChanged: %s %s", group, current)
pass
def connectionLost(s,reason):
print reason
s.disconnect()
def connect(self, tries = 10):
print "Spreader: connecting to spread"
try:
self.mailbox = spread.connect(self.spreadName, self.privateName,
self.priority, self.membership)
except Exception, e: # Sometimes we reconnect too soon, in which case Spread still thinks we're connected - so just disconnect, and we'll try again
print "**", e
self.disconnect()
return
self.privateGroup = self.mailbox.private_group
self.join(self.group)
class MailboxFD:
def __init__(s,mailbox):
s.mailbox = mailbox
def fileno(s):
if self.connected:
try:
mboxno = s.mailbox.fileno()
if type(mboxno) == int:
return mboxno
else:
self.disconnect()
print '** Mailbox is not an integer - disconnected!'
except Exception, e:
self.disconnect()
print "** !! ", e
else:
# Don't even bother trying
print "** Refused to try and get file descriptor when we're not connected. Told Twisted to stop listening."
self.disconnect()
def doRead(s):
try:
m = s.mailbox.receive()
if isinstance(m, spread.RegularMsgType):
self.regularMessageReceived(m) # self refers to the object in the outer lexical scope
elif isinstance(m, spread.MembershipMsgType):
self.membershipMessageReceived(m)
except spread.error, e: # Spread might chuck us off because we haven't read enough
print "CRITICAL Exception in spreader: ", str(e)
self.disconnect()
raise e
def connectionLost(s,reason): # is this called?
print "CRITICAL Spreader Connection Lost: "+str(reason)
self.disconnect()
def logPrefix(s):
return 'Spread Mailbox'
self.mboxfd = MailboxFD(self.mailbox)
reactor.addReader(self.mboxfd)
self.connected = True
def disconnect(self):
print "Spreader: disconnecting from spread"
self.connected = False
if self.mailbox:
self.mailbox.disconnect()
self.mailbox = None
self.privateGroup = None
if self.mboxfd:
reactor.removeReader(self.mboxfd)
def join(self, group):
print "Spreader: joining mailbox"
if not self.mailbox:
raise Exception("no mailbox")
self.mailbox.join(group)
def leave(self, group):
print "Spreader: leaving mailbox"
if not self.mailbox:
raise Exception("no mailbox")
self.mailbox.leave(group)
def multicast(self, message, service_type=spread.SAFE_MESS, group='yourgroup', message_type=0):
assert isinstance(message, basestring), ( # TODO: Maybe we don't want unicode in here?
"Message passed to spreader multicast is not a string, it was a %r" % (type(message),))
if self.connected:
if not self.mailbox:
raise Exception("no mailbox")
try:
preCompressionLength = len(message)
startOfMessage = message[:100]
message = zlib.compress(message)
postCompressionLength = len(message)
if postCompressionLength > 10 * 1000: # 10kb, Spread message limit is 130kb
print ("large_messages",
"After compression: %i bytes (before %i), start of message: %s" %
(postCompressionLength, preCompressionLength, startOfMessage))
if postCompressionLength > 100 * 1000: # 100kb, Spread gets unhappy at 130kb
print "CRITICAL Dropping message > 100kb to avoid making Spread unhappy:",
print startOfMessage
else:
return self.mailbox.multicast(service_type, group, message, message_type)
except Exception, e:
self.disconnect()
print "**", e
else:
print "** Tried to send message when not connected. The following message has been lost:"
print message[:100]
def multigroup_multicast(self, *a, **kw):
if not self.mailbox:
raise Exception("no mailbox")
return self.mailbox.multigroup_multicast(*a, **kw)
class InteractingSpreader(Spreader):
def __init__(self, sm, my_ip, spread_group=False, group_notifier=False):
self.sm = sm
self.group_notifier = group_notifier
# convert my_ip to hex
hex = ''.join(map(lambda x: "%02X" % int(x),my_ip.split('.')))
optParameters = {
'spread_host': 'localhost',
'spread_port': str(spread.DEFAULT_SPREAD_PORT),
'spread_membership': '1',
'spread_group': spread_group or 'yourgroup',
'my_ip' : hex
}
Spreader.__init__(self, optParameters)
def messageReceived(self, sender, groups, message, msg_type, extra):
self.sm.parser.handle_xml(zlib.decompress(message), hex2ip(sender))
def groupTransitioned(self, group):
if self.group_notifier:
self.group_notifier.groupTransitioned(group)
def groupJoined(self, group, who):
if self.group_notifier:
self.group_notifier.groupJoined(group,who)
def groupLeft(self, group, who):
if self.group_notifier:
self.group_notifier.groupLeft(group,who)
def groupChanged(self, group, current, past):
if self.group_notifier:
self.group_notifier.groupChanged(group,current,past)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment