Skip to content

Instantly share code, notes, and snippets.

@craigderington
Created December 13, 2018 19:19
Show Gist options
  • Save craigderington/b1fb5b07942d99a04a1751b2325d551b to your computer and use it in GitHub Desktop.
Save craigderington/b1fb5b07942d99a04a1751b2325d551b to your computer and use it in GitHub Desktop.
Flask REST Mailgun Webhook v2
def _mg_json_response(payload, status):
    """Serialize *payload* to JSON and wrap it in a Response with *status*."""
    return Response(
        json.dumps(payload), status=status, mimetype='application/json'
    )


@app.route('/api/v1/wh/mg/lead/email/delivered', methods=['POST'])
def lead_email_delivered():
    """
    The lead email delivered webhook.

    Reads the posted form fields, checks the token/timestamp/signature
    triple with ``verify`` and, when a Lead matches the recipient address,
    marks its follow-up email as delivered.

    Responses:
        202 -- lead found and updated
        404 -- no lead matches the recipient address
        405 -- request method is not POST
        409 -- token/timestamp/signature missing or rejected by ``verify``
        500 -- database error (the session is rolled back)

    :return: json
    """
    # The route only registers POST, so this guard is purely defensive.
    if request.method != 'POST':
        return _mg_json_response({"Message": "Method Not Allowed"}, 405)

    form_data = {
        "message_id": request.form.get('Message-Id', None),
        "x_mail_gun_sid": request.form.get('X-Mailgun-Sid', None),
        "domain": request.form.get('domain', 'email.com'),
        "event": request.form.get('event', 'delivered'),
        "timestamp": request.form.get('timestamp', None),
        "recipient": request.form.get('recipient', None),
        "signature": request.form.get('signature', None),
        "token": request.form.get('token', None),
    }

    token = form_data['token']
    timestamp = form_data['timestamp']
    signature = form_data['signature']
    mg_recipient = form_data['recipient']
    event = form_data['event']

    # A request missing any signing field cannot be verified; treat it as a
    # verification failure instead of crashing on None.encode().
    signed = None not in (token, timestamp, signature)

    # verify the mailgun token and signature with the api_key
    if not signed or not verify(
        mailgun_api_key,
        token.encode('utf-8'),
        timestamp.encode('utf-8'),
        signature.encode('utf-8'),
    ):
        # signature and token verification failed
        return _mg_json_response(
            {"Signature": signature, "Token": token}, 409
        )

    try:
        lead = db_session.query(Lead).filter(
            Lead.email_addr == mg_recipient
        ).first()

        if not lead:
            # return 404: no email for recipient email address
            return _mg_json_response(
                {"Error": "Unable to resolve the recipient email address..."},
                404,
            )

        # Read the values needed for the response before committing.
        # NOTE(review): the query filters on Lead.email_addr but this reads
        # lead.email -- confirm both attributes exist on the model.
        email = lead.email
        lead_id = lead.id

        # set the delivered flags in the database
        lead.followup_email_delivered = 1
        lead.followup_email_status = event
        lead.webhook_last_updated = datetime.now()
        db_session.commit()

        # return a successful response
        return jsonify({
            "l_id": lead_id,
            "email": email,
            "event": event,
            "status": 'success'}), 202

    # database exception
    except exc.SQLAlchemyError as db_err:
        # Roll back so the session is usable for subsequent requests.
        db_session.rollback()
        return _mg_json_response({"Database Error": str(db_err)}, 500)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment