Skip to content

Instantly share code, notes, and snippets.

@zopieux
Last active August 31, 2022 23:57
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zopieux/8185409 to your computer and use it in GitHub Desktop.
Save zopieux/8185409 to your computer and use it in GitHub Desktop.
Bitbucket webhooks to IRC broker
#!/usr/bin/env python3
# coding: utf-8
# This launches a webserver listening on HTTP_HOST and an IRC client to display the messages.
# Configuration using capital variables below.
# Dependencies: Python 3, pypeul (Python 3 branch)
from http.server import BaseHTTPRequestHandler, HTTPServer
from pypeul import IRC, Tags
import json
import signal
import threading
from urllib.parse import parse_qs, unquote
# Address the webhook HTTP server is created with in main(): (interface, port).
HTTP_HOST = ('0.0.0.0', 8085)
# IRC server settings; unpacked into the bot's connect() call in main().
SERVER = ('irc.freenode.net', 6667, False) # host, port, SSL
# Arguments unpacked into the bot's ident() call in main()
# (presumably the nickname, going by the name -- confirm against pypeul).
NICK = ('somenick',)
# Channel the bot joins in on_ready() and sends every message to.
CHAN = '#somechan'
import logging
# Configure the root logger at DEBUG level for the whole process.
logging.basicConfig(level=logging.DEBUG)
class Bot(IRC):
    """IRC client bound to the single channel named by ``CHAN``."""

    def on_ready(self):
        """Join ``CHAN``.

        NOTE(review): presumably invoked by pypeul once the connection is
        ready, going by the hook name -- confirm against the library.
        """
        self.join(CHAN)

    def say(self, what):
        """Send *what* as a message to ``CHAN``."""
        self.message(CHAN, what)
# {"payload"=>"{"repository": {"website": "", "fork": false, "name": "MyRepoName", "scm": "git", "absolute_url": "MyRepoUrl", "owner": "Owner", "slug": "MyRepoSlug", "is_private": true}, "commits": [{"node": "d1403d93e9fb", "files": [{"type": "added", "file": "test_hook"}], "branch": "master", "utctimeistamp": "2012-03-27 15:12:59+00:00", "author": "alissonsales", "timestamp": "2012-03-27 17:12:59", "raw_node": "d1403d93e9fb4d5e2f59db154b55d2609b628f28", "parents": ["c23fcd11dcae"], "raw_author": "Alisson Sales <email@domain.com>", "message": "Test bitbucket hook\\n", "size": -1, "revision": null}], "canon_url": "https://bitbucket.org", "user": "alissonsales"}", "token"=>"MyRepoToken"}
class MyHandler(BaseHTTPRequestHandler):
    """Receive Bitbucket POST-hook deliveries and announce each commit on IRC.

    The request body is expected to be form-encoded with a ``payload`` field
    holding a JSON document that carries ``repository`` and ``commits``
    entries (see the sample delivery in the comment above this class).
    """

    def do_POST(self):
        """Relay the commits of one webhook delivery to the module-level ``bot``.

        Replies 400, and sends nothing to IRC, when the request length is
        invalid or the body cannot be decoded, parsed or formatted.
        Replies 204 once every commit line has been handed to ``bot.say``.
        """
        body = b''
        try:
            length = int(self.headers.get('content-length', '0'))
            if length < 0:
                # rfile.read() with a negative size would wait for EOF.
                raise ValueError('negative content-length')
            body = self.rfile.read(length)
            data = self._parse_payload(body)
            # Format every line before sending anything, so a malformed
            # commit entry cannot leave a half-announced push on IRC.
            lines = [
                self._format_commit(data['repository'], commit)
                for commit in data['commits']
            ]
        except (ValueError, KeyError, IndexError, TypeError):
            # ValueError also covers UnicodeDecodeError and JSON decode errors.
            print("Received shit")
            print(body)
            self._reply(400)
            return
        for line in lines:
            bot.say(line)
        self._reply(204)

    def _reply(self, status):
        """Send a header-only response carrying *status*."""
        self.send_response(status)
        self.end_headers()

    @staticmethod
    def _parse_payload(body):
        """Return the JSON document stored in the ``payload`` form field.

        ``parse_qs`` already percent-decodes the values; decoding the whole
        body beforehand would turn an encoded ``&``, ``=`` or ``+`` inside
        the JSON into a form separator and corrupt the payload.

        Raises ValueError for non-ASCII or non-JSON input and KeyError when
        the ``payload`` field is missing.
        """
        form = parse_qs(body.decode('ascii'))
        return json.loads(form['payload'][0])

    @staticmethod
    def _format_commit(repository, commit):
        """Build the IRC line for one commit: repo, author, subject and URL."""
        # Only the first line of the commit message is announced.
        subject = commit['message'].strip().split('\n')[0].strip()
        url = "https://bitbucket.org%scommits/%s" % (
            repository['absolute_url'],
            commit['node'],
        )
        return "[{repo}] {author}: {msg} - {url}".format(
            repo=Tags.grey(repository['name']),
            author=Tags.red(commit['author']),
            msg=subject,
            url=Tags.blue(url),
        )
def main():
    """Start the webhook HTTP server in a thread and run the IRC bot.

    Publishes the ``bot`` and ``server`` module globals (``MyHandler`` reads
    ``bot``), serves HTTP from a background thread, and calls ``bot.run()``
    in the calling thread. SIGINT quits the bot and stops the HTTP server.
    """
    global bot, server

    def cleanup():
        """Quit the IRC session, then stop and close the HTTP server."""
        try:
            bot.quit('killed')
        finally:
            # shutdown() makes serve_forever() return in its own thread;
            # closing the socket under a running loop would not stop it.
            server.shutdown()
            server.server_close()

    server = HTTPServer(HTTP_HOST, MyHandler)
    bot = Bot()
    bot.connect(*SERVER)
    bot.ident(*NICK)
    t = threading.Thread(target=server.serve_forever)
    t.start()
    # Installed only after both globals exist and the server thread has
    # started, so the handler never sees an unset name.
    signal.signal(signal.SIGINT, lambda signum, frame: cleanup())
    bot.run()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment