Skip to content

Instantly share code, notes, and snippets.

@saivenkat
Created March 10, 2010 12:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save saivenkat/327825 to your computer and use it in GitHub Desktop.
Save saivenkat/327825 to your computer and use it in GitHub Desktop.
MessagePub notification for buildbot build
'''
Example Usage:
from buildbot.status import messagepub
c['status'].append(messagepub.Notifier(recipient="a@b.c", key="Your messagepub key"))
'''
import urllib, base64
from zope.interface import implements
from twisted.internet import defer
from twisted.web import client, error
from twisted.python import log
from buildbot import interfaces
from buildbot.status import base
from buildbot.status.builder import FAILURE, SUCCESS, Results
class Payload:
def __init__(self, recipient, channel="email", escalation=1, subject="", body=""):
self.recipient = recipient
self.channel = channel
self.escalation = str(escalation)
self.subject = subject
self.body = body
def to_s(self):
return '''<notification><body><![CDATA[%s]]></body><subject><![CDATA[%s]]></subject><escalation>%s</escalation><recipients><recipient><position>1</position><channel>%s</channel><address>%s</address></recipient></recipients></notification>''' % (self.body, self.subject, self.escalation, self.channel, self.recipient)
class Message:
def __init__(self, name, build, results, master_status):
self.name = name
self.build = build
self.results = results
self.master_status = master_status
def subject(self):
return "buildbot %(result) in %(projectName) on %(builder)" % {'result' : Results[self.results],
'projectName' : self.master_status.getProjectName(),
'builder' : self.name}
def body(self):
result = Results[self.results]
text = "The Buildbot has finished a build"
text += "Awesome devs responsible: %s\n" % ",".join(self.build.getResponsibleUsers())
text += "\n"
t = self.build.getText()
if t:
t = ": " + " ".join(t)
else:
t = ""
if result == 'success':
text += "Build succeeded!\n"
elif result == 'warnings':
text += "Build Had Warnings%s\n" % t
else:
text += "BUILD FAILED%s\n" % t
text += "\n"
text += "Love,\n"
text += " -The Buildbot\n"
text += "\n"
return text
class Notifier(base.StatusReceiverMultiService):
def __init__(self, recipient='', key=''):
base.StatusReceiverMultiService.__init__(self)
self.mode = "all"
self.recipient = recipient
self.subject = "buildbot %(result) in %(projectName) on %(builder)"
self.master_status = None
self.key = key
self.message_pub = "http://messagepub.com/notifications.xml"
def _transmit(self, message):
basic_auth = base64.encodestring("%s:%s" %(self.key, 'password'))
auth_header = "Basic " + basic_auth.strip()
driver = client.getPage(self.message_pub,
method='POST',
postdata= message,
headers={'Content-Type' : 'text/xml',
'Authorization' : auth_header})
driver.addCallback(lambda r: log.msg(r))
driver.addErrback(lambda e: log.err(e))
def builderAdded(self, builderName, builder):
pass
def builderRemoved(self, builderName, builder):
pass
def buildStarted(self, builderName, builder):
pass
def buildFinished(self, builderName, build, results):
message = Message(builderName, build, results, self.master_status)
payload = Payload(self.recipient, subject=message.subject(), body=message.body())
self._transmit(payload.to_s())
def stepStarted(self, build, step):
pass
def stepFinished(self, build, step, results):
pass
def setServiceParent(self, parent):
base.StatusReceiverMultiService.setServiceParent(self, parent)
self.master_status = self.parent.getStatus()
self.master_status.subscribe(self)
def disownServiceParent(self):
self.master_status.unsubscribe(self)
return base.StatusReceiverMultiService.disownServiceParent(self)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment