Skip to content

Instantly share code, notes, and snippets.

@cramforce
Created July 17, 2017 04:04
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cramforce/f67911d3ff3cc5fa70044384419c07df to your computer and use it in GitHub Desktop.
Save cramforce/f67911d3ff3cc5fa70044384419c07df to your computer and use it in GitHub Desktop.
Google App Engine ADT Pulse alert email to IFTT webhook adapter
import logging
import re
# Receives alert emails from ADT Pulse and expects them to have
# an extra text of the form 'action=YOUR_ACTION' in the body.
# Calls IFTT with your key and the given action to trigger a webhook.
from google.appengine.ext.webapp.mail_handlers import InboundMailHandler
from google.appengine.api import urlfetch
import webapp2
class LogSenderHandler(InboundMailHandler):
def receive(self, mail_message):
logging.info("Received a message from: " + mail_message.sender)
ok_sender = re.compile('adtpulse\.com|YOUR_TEST_EMAIL', re.IGNORECASE)
if not ok_sender.search(mail_message.sender):
logging.info("Skipping message: " + mail_message.sender)
return
action = re.compile('action=(\w+)')
plaintext_bodies = mail_message.bodies('text/plain')
for content_type, body in plaintext_bodies:
plaintext = body.decode()
logging.info("Plain text body of length %d.", len(plaintext))
m = action.search(plaintext)
if m:
action = m.group(1);
logging.info("Action %s.", action)
url = "https://maker.ifttt.com/trigger/" + action + "/with/key/YOUR_KEY"
logging.info("URL %s.", url)
result = urlfetch.fetch(
url=url,
method=urlfetch.POST)
if result.status_code == 200:
logging.info("IFTT is happy: %s", result.content)
else:
logging.info("IFTT status: %s", result.status_code)
logging.info("IFTT response: %s", result.content)
app = webapp2.WSGIApplication([LogSenderHandler.mapping()], debug=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment