Skip to content

Instantly share code, notes, and snippets.

@DonnchaC
Created May 7, 2016 17:24
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save DonnchaC/86f8dc382f2382c41ba136b04ad00cc6 to your computer and use it in GitHub Desktop.
Rough script for reading Tor CIRC and a custom CELL control events and keep tracking of Tor's circuits and cells
# -*- coding: utf-8 -*-
"""
Monitors Tor control port for descriptor lookups and INTRODUCE1
requests.
"""
import os
import time
import datetime
import argparse
import logging
import sys
from operator import attrgetter
import stem
from stem.util import tor_tools, str_tools
from stem.control import Controller, EventType
from stem.response.events import Event
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(fmt="%(asctime)s [%(levelname)s]: "
"%(message)s"))
logger = logging.getLogger("monitor")
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
# Store all the received circuits
circuits = {}
descriptor_lookups = {}
class CellEvent(Event):
"""
Stem parser for the custom CELL control port event
"""
_KEYWORD_ARGS = {
'COMMAND': 'command',
'CIRC': 'circuit_id',
'CIRC_ID': 'unique_circuit_id',
'DIRECTION': 'direction',
'TIME': 'timestamp',
}
def _parse(self):
if self.timestamp is not None:
try:
self.timestamp = str_tools._parse_iso_timestamp(self.timestamp)
except ValueError as exc:
raise stem.ProtocolError('Unable to parse create date (%s): %s' % (exc, self))
for circ_id in [self.circuit_id, self.unique_circuit_id]:
if not tor_tools.is_valid_circuit_id(circ_id):
raise stem.ProtocolError("Circuit IDs must be one to sixteen alphanumeric characters, got '%s': %s" % (self.id, self))
class Circuit(object):
"""
Track all cells sent or received on a circuit
"""
def __init__(self, circuit_id, unique_id=None):
self.id = int(circuit_id)
self.unique_id = int(unique_id) if unique_id else None
self.purpose = None
self.path = None
self.time_created = None
self.time_closed = None
self.last_activity = None
self.cells = []
self.events = []
def add_event(self, event):
"""
Add a new circuit event to this circuit
"""
self.time_created = self.time_created or event.created
if event.status == "BUILT":
self.path = event.path
self.purpose = event.purpose
# If there is a descriptor request to this relay, link the circuit
last_relay = self.path[-1][0]
if descriptor_lookups.get(last_relay):
self.purpose = "HS_DESC"
self.hs_address = descriptor_lookups[last_relay]
del descriptor_lookups[last_relay]
if event.status in ['FAILED', 'CLOSED']:
self.time_closed = datetime.datetime.utcnow()
self.events.append(event)
def add_cell(self, cell):
"""
Add a cell to the cell list for this circuit
"""
self.unique_id = self.unique_id
self.last_activity = cell.timestamp
self.cells.append(cell)
def __repr__(self):
return '<Circuit (%s, %s)>' % (self.id, self.unique_id)
class Cell(object):
"""
Represent a cell seen on the wire
"""
def __init__(self, command, direction, timestamp):
self.command = command.upper()
self.direction = direction
self.timestamp = timestamp
def __repr__(self):
return '<Cell %s %s %s>' % (self.command, self.direction, self.timestamp)
def new_cell_event(event):
"""
Process and log the processed CELL event
"""
new_cell = Cell(command=event.command, direction=event.direction, timestamp=event.timestamp)
# Quit if the cell is not linked with a circuit (probably a padding cell)
if not int(event.circuit_id):
return
circuit = circuits.get(int(event.circuit_id))
if not circuit:
# Create a new circuit if it hasn't been seen before
circuit = Circuit(event.circuit_id, event.unique_circuit_id)
circuits[circuit.id] = circuit
# Add cell to the new or existing circuit
circuit.add_cell(new_cell)
def new_circ_event(event):
"""
Add a new circuit event to the circuit
"""
circuit = circuits.get(int(event.id))
if not circuit:
# Create a new circuit if it hasn't been seen before
circuit = Circuit(event.id)
circuits[circuit.id] = circuit
circuit.add_event(event)
def new_hs_desc_event(event):
"""
Monitor for new HS_DESC events
"""
if event.action == 'REQUESTED':
descriptor_lookups[event.directory_fingerprint] = event.address
def parse_cmd_args():
"""
Parses and returns command line arguments.
"""
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--address", type=str, default="127.0.0.1",
help="Tor controller host")
parser.add_argument("-p", "--port", type=int, default=9051,
help="Tor controller port")
return parser.parse_args()
def initialize_control_connection(address='127.0.0.1', port=9051):
"""
Create a connection to the Tor control port
"""
try:
controller = Controller.from_port(address=address, port=port)
except stem.SocketError as exc:
logger.error("Unable to connect to Tor control port: %s", exc)
sys.exit(1)
else:
logger.debug("Successfully connected to the Tor control port.")
try:
controller.authenticate()
except stem.connection.AuthenticationFailure as exc:
logger.error("Unable to authenticate to Tor control port: %s", exc)
sys.exit(1)
else:
logger.debug("Successfully authenticated to the Tor control port.")
return controller
def print_circuit_data(circuits):
"""
Display the circuit and cell objects
"""
os.system("clear")
for circuit in sorted(circuits.values(), key=attrgetter('last_activity')):
purpose = circuit.purpose or 'UNKNOWN'
state = 'CLOSED' if circuit.time_closed else 'OPEN'
address = getattr(circuit, 'hs_address', "")
print("\nCircuit {} - {} {} [{}]".format(circuit.id, purpose, address, state))
print("Circuit events:")
if circuit.events:
for event in circuit.events:
path = [relay[0] for relay in event.path]
print(" - {} {}".format(event.status, path))
else:
print(" [NONE]")
print("Cells:")
if circuit.cells:
for cell in circuit.cells:
print(" - {}".format(cell))
else:
print(" [NONE]")
def main():
"""
Begin main event loop waiting for control events
"""
args = parse_cmd_args()
# Create connection to the Tor control port and listen for HS_DESC event
controller = initialize_control_connection(address=args.address,
port=args.port)
# Add a custom event handler for CELL events
stem.response.events.EVENT_TYPE_TO_CLASS['CELL'] = CellEvent
controller.add_event_listener(new_cell_event, 'CELL')
controller.add_event_listener(new_circ_event, EventType.CIRC)
controller.add_event_listener(new_hs_desc_event, EventType.HS_DESC)
logger.info("Beginning event loop")
try:
while True:
import time
if time.time() % 10 == 0:
print_circuit_data(circuits)
continue
except KeyboardInterrupt:
logger.info("Keyboard interrupt, finishing.")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment