Skip to content

Instantly share code, notes, and snippets.

@DonnchaC
Last active December 17, 2015 22:46
Show Gist options
  • Save DonnchaC/ee6ef622067af3dde91e to your computer and use it in GitHub Desktop.
Save DonnchaC/ee6ef622067af3dde91e to your computer and use it in GitHub Desktop.
Simple script to monitor a Tor relay and report any warning via email.
# -*- coding: utf-8 -*-
"""
Monitors Tor control port and sends email notifications for
log events NOTICE, WARN, and ERROR.
"""
import os
import time
import datetime
import argparse
import logging
import sys
import smtplib
from datetime import datetime
from email.mime.text import MIMEText
import stem
from stem.control import Controller, EventType
from stem.response.events import Event
class Application(dict):
    """
    Container for the shared application state.

    The rest of the script attaches state (logger, parsed options, the Tor
    controller, pending log lines, ...) as plain instance attributes; the
    ``dict`` behaviour inherited from the base class is not used in this file.
    """
def initialize_smtp_connection(app):
    """
    Open an SMTP connection configured from the application state.

    Reads ``use_starttls``, ``smtp_host``, ``smtp_port``, ``smtp_user`` and
    ``smtp_password`` from ``app``. When no port is configured, 587 is used
    with STARTTLS and 25 otherwise. The connection is authenticated only if a
    user name or a password is set.

    Returns the connected ``smtplib.SMTP`` object; the caller is responsible
    for closing it.

    Any exception raised while upgrading to TLS or logging in is propagated
    after the half-initialised connection has been closed.
    """
    default_port = 587 if app.use_starttls else 25
    server = smtplib.SMTP(app.smtp_host, app.smtp_port or default_port)

    try:
        if app.use_starttls:
            server.starttls()

        if app.smtp_user or app.smtp_password:
            server.login(app.smtp_user, app.smtp_password)
        else:
            # The logger lives on ``app.logger``; the previous ``app.log``
            # attribute is never set, so this branch raised AttributeError.
            app.logger.debug("Not using SMTP authentication.")
    except Exception:
        # Do not leak the socket when STARTTLS or authentication fails.
        server.close()
        raise

    return server
def send_email_notification(unreported_errors):
    """
    Email the collected Tor log lines to the configured recipient.

    ``unreported_errors`` is a list of already formatted log lines; they are
    joined with newlines into the message body. Connection settings, the
    recipient (``app.email``), the sender (``app.source_address``) and the
    relay fingerprint used in the subject are read from the module-level
    ``app`` object.

    Failures to connect or to send are logged and swallowed so that a mail
    problem does not propagate to the caller. Always returns ``None``.
    """
    try:
        smtp = initialize_smtp_connection(app)
    except (smtplib.SMTPException, OSError):
        # OSError covers socket-level failures (connection refused, DNS
        # errors, ...) which are not SMTPException on every Python version.
        app.logger.exception("Could not connect to the SMTP server")
        return None

    message = MIMEText(
        "Tor emitted {} error, warning or notice messages: \n\n"
        "{}".format(len(unreported_errors), '\n'.join(unreported_errors))
    )
    message['Subject'] = 'Tor Relay Error {}'.format(app.relay_fingerprint)
    message['To'] = app.email
    message['From'] = 'Tor Relay Monitor <{}>'.format(app.source_address)
    message['List-Id'] = "TorRelayMonitor <{}>".format(app.source_address)

    try:
        smtp.sendmail(app.source_address, app.email, message.as_string())
    except smtplib.SMTPException:
        app.logger.exception("Error sending notification email")
    else:
        # Only claim success when sendmail() did not raise.
        app.logger.info("Sent notification email: %d lines",
                        len(unreported_errors))
    finally:
        # Always release the connection, even after a failed send.
        try:
            smtp.quit()
        except smtplib.SMTPException:
            # The server may already have dropped the connection.
            app.logger.debug("SMTP connection was already closed")
def new_log_event(event):
    """
    Handle a Tor log event received via the control port.

    Events at error, warning or notice level are timestamped, queued in
    ``app.unreported_errors`` and logged at INFO level. The queue is emailed
    straight away when no email has been sent yet, or when more than
    ``app.email_limit`` seconds have passed since ``app.last_email``;
    otherwise the lines stay queued until a later event triggers a send.
    Events at any other runlevel are only logged at DEBUG level.

    All state is read from and written to the module-level ``app`` object.
    """
    # Stem reports runlevels as 'ERR' and 'WARN'; the long spellings that were
    # checked before never matched, so errors and warnings were dropped. The
    # long forms are kept so nothing that matched previously stops matching.
    reportable = ('ERR', 'ERROR', 'WARN', 'WARNING', 'NOTICE')
    if event.runlevel not in reportable:
        app.logger.debug(event)
        return

    now = datetime.now()
    app.unreported_errors.append('[{}]: {}'.format(now, str(event)))
    app.logger.info(str(event))

    # Check that it has been long enough since our last email.
    if app.last_email:
        time_since_mail = (now - app.last_email).total_seconds()
        if time_since_mail <= app.email_limit:
            return

    # Limit expired (or nothing sent yet), we can send another email. Swap
    # the queue out first so the lines being sent are not reported twice.
    unreported_errors = app.unreported_errors
    app.unreported_errors = []
    send_email_notification(unreported_errors)
    app.last_email = datetime.now()
def parse_cmd_args():
    """
    Build the command line interface and parse ``sys.argv``.

    Returns the ``argparse.Namespace`` holding the Tor controller location,
    the SMTP connection settings, the email recipient options and the
    verbosity flag.
    """
    cli = argparse.ArgumentParser(description=__doc__)

    # Where to find the Tor control port.
    cli.add_argument(
        "--address", type=str, default="127.0.0.1",
        help="Tor controller host",
    )
    cli.add_argument(
        "-p", "--port", type=int, default=9051,
        help="Tor controller port",
    )

    # How to reach the outgoing mail server.
    smtp_opts = cli.add_argument_group(title="SMTP Connection")
    smtp_opts.add_argument(
        "--smtp-host", type=str, default="localhost",
        help="SMTP server host (default: localhost)",
    )
    smtp_opts.add_argument(
        "--smtp-port", type=int, default=None,
        help="SMTP port (defaults to 587 when using STARTTLS)",
    )
    smtp_opts.add_argument("--use-starttls", action='store_true')
    smtp_opts.add_argument(
        "--smtp-user", type=str,
        help="Username to authentication to SMTP server",
    )
    smtp_opts.add_argument(
        "--smtp-password", type=str,
        help="SMTP server password",
    )

    # Who receives the alerts and how often.
    recipients = cli.add_argument_group(title="Email Recipients")
    recipients.add_argument(
        "--email", type=str, default=None,
        help="Address to receive email alerts",
    )
    recipients.add_argument(
        "--source-address", type=str,
        default="tor-relay-monitor@torrelay.test",
        help="Source for email notifications",
    )
    recipients.add_argument(
        '--email-limit', type=int, default=60,
        help="Minimum delay between email notifications (default: 60 mins)",
    )

    cli.add_argument("-v", "--verbose", action='store_true')

    options = cli.parse_args()
    return options
def initialize_control_connection(address='127.0.0.1', port=9051):
    """
    Connect and authenticate to the Tor control port.

    Returns the authenticated controller. If the socket connection or the
    authentication fails, the error is logged through the module-level
    ``app.logger`` and the process exits with status 1.
    """
    # Step 1: open the control socket.
    try:
        tor = Controller.from_port(address=address, port=port)
    except stem.SocketError as error:
        app.logger.error("Unable to connect to Tor control port: %s", error)
        sys.exit(1)
    app.logger.debug("Successfully connected to the Tor control port.")

    # Step 2: authenticate on the freshly opened connection.
    try:
        tor.authenticate()
    except stem.connection.AuthenticationFailure as error:
        app.logger.error("Unable to authenticate to Tor control port: %s",
                         error)
        sys.exit(1)
    app.logger.debug("Successfully authenticated to the Tor control port.")

    return tor
def get_logger():
    """
    Return the ``tormonitor`` logger, configured to write to the console.

    The logger level is set to DEBUG and records are formatted as
    ``<time> [<level>]: <message>`` on a ``logging.StreamHandler``.

    ``logging.getLogger`` returns the same object for the same name, so the
    handler is attached only when the logger has none yet; otherwise every
    additional call would add one more handler and duplicate each log line.
    """
    logger = logging.getLogger('tormonitor')

    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter(fmt="%(asctime)s [%(levelname)s]: %(message)s")
        )
        logger.addHandler(handler)

    logger.setLevel(logging.DEBUG)
    return logger
def main(app):
    """
    Set up the monitor and wait for Tor log events until interrupted.

    Creates the logger, parses the command line and copies every option onto
    ``app``, connects to the Tor control port, initialises the notification
    state and registers ``new_log_event`` for ERR, WARN and NOTICE events.
    It then sleeps in a loop (events are delivered through the registered
    listener) until a keyboard interrupt, and finally closes the controller.
    """
    app.logger = get_logger()
    args = parse_cmd_args()

    # Store arguments in the application context
    for name, value in vars(args).items():
        setattr(app, name, value)

    # get_logger() returns a DEBUG-level logger, so without lowering the level
    # here the --verbose flag would have no effect.
    app.logger.setLevel(logging.DEBUG if args.verbose else logging.INFO)

    if not app.email:
        app.logger.warning("No --email address given, notification emails "
                           "cannot be delivered.")

    # Create connection to the Tor control port
    app.controller = initialize_control_connection(address=args.address,
                                                   port=args.port)

    try:
        # Everything new_log_event() reads must exist before the listeners
        # are registered: an event may arrive as soon as one is added.
        app.email_limit = args.email_limit * 60  # Store email limit in seconds
        app.unreported_errors = []
        app.last_email = None

        try:
            app.relay_fingerprint = app.controller.get_info('fingerprint')
        except stem.ControllerError:
            app.logger.error("Error retrieving the relay's fingerprint, are "
                             "you running a relay?")
            app.relay_fingerprint = "[UNKNOWN]"

        # Monitor err, warn and notice level messages from Tor
        for event_type in (EventType.ERR, EventType.WARN, EventType.NOTICE):
            app.controller.add_event_listener(new_log_event, event_type)

        app.logger.info("Beginning event loop")
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            app.logger.info("Keyboard interrupt, finishing.")
    finally:
        # Release the control connection on every exit path.
        app.controller.close()
if __name__ == "__main__":
    # ``app`` has to be bound at module level under exactly this name:
    # send_email_notification(), new_log_event() and
    # initialize_control_connection() read it as a global instead of taking
    # it as a parameter.
    app = Application()
    main(app)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment