Created
May 7, 2016 17:24
Star
You must be signed in to star a gist
Rough script for reading Tor CIRC events and a custom CELL control event, and keeping track of Tor's circuits and cells
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
Monitors Tor control port for descriptor lookups and INTRODUCE1 | |
requests. | |
""" | |
import os | |
import time | |
import datetime | |
import argparse | |
import logging | |
import sys | |
from operator import attrgetter | |
import stem | |
from stem.util import tor_tools, str_tools | |
from stem.control import Controller, EventType | |
from stem.response.events import Event | |
# Dedicated "monitor" logger that writes timestamped, level-tagged lines
# through its own stream handler.
logger = logging.getLogger("monitor")
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter(fmt="%(asctime)s [%(levelname)s]: %(message)s"))
logger.addHandler(handler)

# Every circuit seen so far, keyed by its integer circuit ID
circuits = {}
# Pending descriptor requests: directory fingerprint -> requested address
descriptor_lookups = {}
class CellEvent(Event):
    """
    Stem parser for the custom CELL control port event

    The keyword arguments of the event line are mapped onto attributes
    using the table below (for example ``CIRC`` becomes ``circuit_id`` and
    ``CIRC_ID`` becomes ``unique_circuit_id``).
    """

    _KEYWORD_ARGS = {
        'COMMAND': 'command',
        'CIRC': 'circuit_id',
        'CIRC_ID': 'unique_circuit_id',
        'DIRECTION': 'direction',
        'TIME': 'timestamp',
    }

    def _parse(self):
        """
        Convert the timestamp and validate both circuit identifiers

        :raises: :class:`stem.ProtocolError` if the timestamp cannot be
          parsed or if either circuit ID is not a valid circuit ID
        """
        if self.timestamp is not None:
            try:
                self.timestamp = str_tools._parse_iso_timestamp(self.timestamp)
            except ValueError as exc:
                raise stem.ProtocolError('Unable to parse cell timestamp (%s): %s' % (exc, self))

        for circ_id in (self.circuit_id, self.unique_circuit_id):
            if not tor_tools.is_valid_circuit_id(circ_id):
                # Report the ID that failed validation; this event has no
                # ``id`` attribute in its keyword mapping.
                raise stem.ProtocolError("Circuit IDs must be one to sixteen alphanumeric characters, got '%s': %s" % (circ_id, self))
class Circuit(object):
    """
    Track all cells sent or received on a circuit

    Besides the cells, the circuit keeps every circuit event it was given,
    the path and purpose reported when it was built, and creation, close
    and last-activity times. ``hs_address`` is only set once a BUILT event
    matches a pending descriptor lookup.
    """

    def __init__(self, circuit_id, unique_id=None):
        """
        :param circuit_id: circuit identifier, converted with ``int()``
        :param unique_id: optional second identifier, converted with
          ``int()`` when given and stored as None otherwise
        """
        self.id = int(circuit_id)
        self.unique_id = int(unique_id) if unique_id else None
        self.purpose = None
        self.path = None
        self.time_created = None
        self.time_closed = None
        self.last_activity = None
        self.cells = []
        self.events = []

    def add_event(self, event):
        """
        Add a new circuit event to this circuit

        :param event: circuit event providing ``status``, ``created``,
          ``path`` and ``purpose``
        """
        # Keep the first creation time that was reported
        self.time_created = self.time_created or event.created

        if event.status == "BUILT":
            self.path = event.path
            self.purpose = event.purpose

            # If there is a descriptor request to this relay, link the
            # circuit. An empty path has no last relay to look up.
            if self.path:
                last_relay = self.path[-1][0]
                hs_address = descriptor_lookups.get(last_relay)
                if hs_address:
                    self.purpose = "HS_DESC"
                    self.hs_address = hs_address
                    # The pending lookup is consumed by this circuit
                    del descriptor_lookups[last_relay]

        if event.status in ('FAILED', 'CLOSED'):
            self.time_closed = datetime.datetime.utcnow()

        self.events.append(event)

    def add_cell(self, cell):
        """
        Add a cell to the cell list for this circuit

        The cell's timestamp becomes the circuit's last activity time.
        """
        self.last_activity = cell.timestamp
        self.cells.append(cell)

    def __repr__(self):
        return '<Circuit (%s, %s)>' % (self.id, self.unique_id)
class Cell(object):
    """
    One cell seen on the wire: its command (stored upper-cased), its
    direction and the time it was observed
    """

    def __init__(self, command, direction, timestamp):
        self.timestamp = timestamp
        self.direction = direction
        self.command = command.upper()

    def __repr__(self):
        fields = (self.command, self.direction, self.timestamp)
        return '<Cell %s %s %s>' % fields
def new_cell_event(event):
    """
    Process a CELL event and record the cell on its circuit

    Events whose circuit ID is zero are ignored. A circuit that has not
    been seen before is created and registered in ``circuits``.

    :param event: parsed CELL event providing ``command``, ``direction``,
      ``timestamp``, ``circuit_id`` and ``unique_circuit_id``
    """
    circuit_key = int(event.circuit_id)

    # Quit if the cell is not linked with a circuit (probably a padding cell)
    if not circuit_key:
        return

    circuit = circuits.get(circuit_key)
    if circuit is None:
        # Create a new circuit if it hasn't been seen before
        circuit = Circuit(event.circuit_id, event.unique_circuit_id)
        circuits[circuit.id] = circuit
    elif circuit.unique_id is None and event.unique_circuit_id:
        # Circuits first created by new_circ_event() carry no unique ID;
        # fill it in from the first cell that provides one.
        circuit.unique_id = int(event.unique_circuit_id)

    # Add cell to the new or existing circuit
    circuit.add_cell(Cell(command=event.command,
                          direction=event.direction,
                          timestamp=event.timestamp))
def new_circ_event(event):
    """
    Record a CIRC event on the circuit it belongs to, registering the
    circuit first when its ID has not been seen yet
    """
    circuit_key = int(event.id)

    if circuit_key not in circuits:
        # First sighting of this circuit: start tracking it
        circuits[circuit_key] = Circuit(event.id)

    circuits[circuit_key].add_event(event)
def new_hs_desc_event(event):
    """
    Remember which address was requested from which directory

    Only HS_DESC events whose action is REQUESTED are recorded; the
    directory fingerprint is the key and the address the value.
    """
    if event.action != 'REQUESTED':
        return

    descriptor_lookups[event.directory_fingerprint] = event.address
def parse_cmd_args():
    """
    Build the command line parser and return the parsed arguments

    Recognised options are ``--address`` (default 127.0.0.1) and
    ``-p``/``--port`` (default 9051).
    """
    option_table = (
        (("--address",),
         dict(type=str, default="127.0.0.1", help="Tor controller host")),
        (("-p", "--port"),
         dict(type=int, default=9051, help="Tor controller port")),
    )

    arg_parser = argparse.ArgumentParser(description=__doc__)
    for flags, settings in option_table:
        arg_parser.add_argument(*flags, **settings)

    return arg_parser.parse_args()
def initialize_control_connection(address='127.0.0.1', port=9051):
    """
    Connect and authenticate to the Tor control port

    Returns the authenticated controller. If either the connection or the
    authentication fails, the error is logged and the process exits with
    status 1.
    """
    try:
        controller = Controller.from_port(address=address, port=port)
    except stem.SocketError as err:
        logger.error("Unable to connect to Tor control port: %s", err)
        sys.exit(1)
    logger.debug("Successfully connected to the Tor control port.")

    try:
        controller.authenticate()
    except stem.connection.AuthenticationFailure as err:
        logger.error("Unable to authenticate to Tor control port: %s", err)
        sys.exit(1)
    logger.debug("Successfully authenticated to the Tor control port.")

    return controller
def _activity_sort_key(circuit):
    """
    Sort key that orders circuits without any cell before all others
    """
    last_activity = circuit.last_activity
    # None cannot be ordered against timestamps on Python 3, so compare a
    # "has activity" flag first and only then the activity time itself.
    if last_activity is None:
        return (False, 0)
    return (True, last_activity)


def print_circuit_data(circuits):
    """
    Display the circuit and cell objects

    The screen is cleared and every circuit is printed with its purpose,
    hidden service address (if any), open/closed state, circuit events and
    cells. Circuits are ordered by last activity; circuits that have not
    seen a cell yet come first.

    :param circuits: mapping whose values are the circuits to display
    """
    os.system("clear")

    for circuit in sorted(circuits.values(), key=_activity_sort_key):
        purpose = circuit.purpose or 'UNKNOWN'
        state = 'CLOSED' if circuit.time_closed else 'OPEN'
        # hs_address only exists on circuits linked to a descriptor lookup
        address = getattr(circuit, 'hs_address', "")

        print("\nCircuit {} - {} {} [{}]".format(circuit.id, purpose, address, state))

        print("Circuit events:")
        if circuit.events:
            for event in circuit.events:
                # Show only the first element of each path entry
                path = [relay[0] for relay in event.path]
                print(" - {} {}".format(event.status, path))
        else:
            print(" [NONE]")

        print("Cells:")
        if circuit.cells:
            for cell in circuit.cells:
                print(" - {}".format(cell))
        else:
            print(" [NONE]")
def main(refresh_interval=10):
    """
    Register the control event listeners and refresh the display forever

    The registered listeners update the shared circuit state; this loop
    only redraws it periodically until interrupted with Ctrl-C.

    :param refresh_interval: seconds to wait between two screen refreshes
    """
    args = parse_cmd_args()

    # Create connection to the Tor control port
    controller = initialize_control_connection(address=args.address,
                                               port=args.port)

    try:
        # Add a custom event handler for CELL events
        stem.response.events.EVENT_TYPE_TO_CLASS['CELL'] = CellEvent
        controller.add_event_listener(new_cell_event, 'CELL')
        controller.add_event_listener(new_circ_event, EventType.CIRC)
        controller.add_event_listener(new_hs_desc_event, EventType.HS_DESC)

        logger.info("Beginning event loop")
        while True:
            # Sleep instead of polling the clock: testing a float timestamp
            # for an exact multiple of ten almost never succeeds and keeps
            # the CPU spinning.
            time.sleep(refresh_interval)
            print_circuit_data(circuits)
    except KeyboardInterrupt:
        logger.info("Keyboard interrupt, finishing.")
    finally:
        # Release the control connection on every exit path
        controller.close()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment