Last active
June 15, 2021 21:40
-
-
Save eduardomazolini/ee9829ee5f30a2b86e283e159d7f15bd to your computer and use it in GitHub Desktop.
Cria um serviço pra receber notificações WebHook de COMMIT do Bitbucket e envia para o MandeUmZap (M1Z)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!./venv/bin/python3 | |
# -*- coding: utf-8 -*- | |
''' | |
Cria um serviço pra receber notificações WebHook de COMMIT do Bitbucket e envia para o MandeUmZap (M1Z) | |
''' | |
import os | |
import asyncio | |
import pprint | |
import aiohttp | |
from aiohttp import web | |
from aiojobs.aiohttp import setup, spawn, get_scheduler | |
#from aiohttp_basicauth import BasicAuthMiddleware | |
from jinja2 import Environment, FileSystemLoader | |
ENV = Environment(loader=FileSystemLoader("templates")) | |
PP = pprint.PrettyPrinter(indent=4) | |
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__)) | |
M1Z = { | |
#"url_messages" : "https://webhook.site/wwwwwwww-wwww-wwww-wwww-wwwwwwwwwwww", | |
"url_messages" : "https://empresa-api.mandeumzap.com.br/v1/messages", | |
"headers" : { | |
"Authorization": "Bearer xxxxxxxxxxx", | |
"Content-Type" : "application/json" | |
}, | |
"body":{ | |
"text" : "teste", | |
"type" : "chat", | |
"doNotOpenTicket" : True, | |
"contactId" : "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" | |
} | |
} | |
async def envia_mensagem(data): | |
''' | |
Envia a mensagm para o Mande1Zap | |
''' | |
payload = M1Z["body"] | |
msg = "O desenvolvedor " + data["actor"]["display_name"] | |
msg += " no código " + data["repository"]["name"] | |
msg += " alterou:" + '\r\n' | |
for commit in data["push"]["changes"][0]["commits"]: | |
msg += " - " + commit["message"] | |
payload.update({"text" : msg}) | |
PP.pprint(payload) | |
async with aiohttp.ClientSession(headers=M1Z["headers"]) as client: | |
async with client.post(M1Z["url_messages"], | |
json=payload) as response: | |
print(await response.text()) | |
#PP.pprint(data) | |
def web_get_scheduler(request): | |
''' | |
Retorna o numero de processos jobs executando | |
''' | |
scheduler = get_scheduler(request) | |
return aiohttp.web.Response(text=str(scheduler.active_count)) | |
async def get_json(request): #pylint: disable=unused-argument | |
''' | |
Responde ao get do '/' retorna json | |
''' | |
headers = {} | |
headers['Access-Control-Allow-Origin'] = '*' | |
json_body = {} | |
return aiohttp.web.json_response(json_body, headers=headers) | |
async def post_upload(request): | |
''' | |
Recebe a notificação do Bitbucket | |
''' | |
data = await request.json() | |
await spawn(request, envia_mensagem(data)) | |
return aiohttp.web.Response(body="", content_type="text/html") | |
def web_server(): | |
''' | |
Sobe o serviço Web | |
''' | |
#auth = BasicAuthMiddleware(username='user', password='password') | |
#app = web.Application(middlewares=[auth]) | |
app = web.Application() | |
setup(app) | |
#app.router.add_get('/get_scheduler', web_get_scheduler, allow_head=False) | |
#app.router.add_get('/json', get_json, allow_head=False) | |
app.router.add_post('/notifica', post_upload) | |
#app.router.add_static('/', | |
# path=PROJECT_ROOT + '/static', | |
# name='static', show_index=False) | |
web.run_app(app, port=8082) | |
if __name__ == "__main__": | |
LOOP = asyncio.get_event_loop() | |
LOOP.run_until_complete(web_server()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment