@krzysztofantczak
Last active March 6, 2024 07:17
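A usage note, not part of the original gist: the module docstring below sketches how the script appears intended to be launched. The filename latency_exporter.py and the example peer names and ports are assumptions; the PEERS format and the metrics-port offset follow from get_peer_address, connect_to_peers, and start in the code.

"""Peer-to-peer latency exporter for Prometheus (usage sketch; see note above).

Each instance binds the host:port listed for its own name in the PEERS
environment variable, dials every other peer in that list, exchanges
ping/pong messages over TCP, and serves Prometheus metrics on its own
port + 1.

Example (illustrative values only; filename is assumed):

    export PEERS="peer1=127.0.0.1:5000;peer2=127.0.0.1:6000"
    python latency_exporter.py peer1   # metrics at http://127.0.0.1:5001/metrics
    python latency_exporter.py peer2   # metrics at http://127.0.0.1:6001/metrics
"""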
import os
import signal
import socket
import sys
import threading
import time
from prometheus_client import start_http_server, Gauge, Counter


class Peer:
    def __init__(self, name):
        self.name = name
        self.connections = {}          # Dictionary to store client connections
        self.latencies = {}            # Dictionary to store latency measurements
        self.packet_loss_rates = {}    # Dictionary to store packet loss rates
        self.sent_pings = {}           # Dictionary to store the number of sent ping messages
        self.received_pongs = {}       # Dictionary to store the number of received pong messages
        self.lock = threading.Lock()   # Lock to synchronize access to shared resources
        self.server_socket = None      # Server socket for accepting incoming connections
        self.running = True            # Flag to control the main loop

        # Prometheus metrics to expose latency measurements
        self.latency_seconds_gauge = Gauge('peer_latency_seconds', 'Latency to peers in seconds', ['peer'])
        self.latency_milliseconds_gauge = Gauge('peer_latency_milliseconds', 'Latency to peers in milliseconds',
                                                ['peer'])
        self.latency_microseconds_gauge = Gauge('peer_latency_microseconds', 'Latency to peers in microseconds',
                                                ['peer'])

        # Prometheus metric to expose packet loss rates
        self.packet_loss_rate_gauge = Gauge('peer_packet_loss_rate', 'Packet loss rate to peers', ['peer'])

        # Prometheus counter to track the number of times a peer goes offline
        self.peer_offline_counter = Counter('peer_offline_counter', 'Number of times a peer goes offline', ['peer'])

    def start(self):
        # Start server and background threads
        threading.Thread(target=self.start_server).start()
        threading.Thread(target=self.connect_to_peers).start()
        threading.Thread(target=self.send_ping).start()

        # Start Prometheus HTTP server on a port incremented by 1 from the peer's port
        port = self.get_peer_address(self.name)[1] + 1
        print(f"Starting metrics server on port {port}")
        start_http_server(port)

    def start_server(self):
        # Bind server socket to the peer's address and listen for incoming connections
        host, port = self.get_peer_address(self.name)
        while True:
            try:
                self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.server_socket.bind((host, port))
                self.server_socket.listen(5)
                print(f"{self.name} is listening on {host}:{port}")
                break
            except OSError as e:
                if e.errno == 98:
                    print(f"Port {port} is already in use. Retrying in 5 seconds...")
                    time.sleep(5)
                else:
                    raise

        while self.running:
            try:
                # Accept incoming connections and handle them in separate threads
                client_socket, addr = self.server_socket.accept()
                threading.Thread(target=self.handle_client, args=(client_socket, addr)).start()
            except (KeyboardInterrupt, OSError):
                # OSError is raised when the server socket is closed during shutdown
                self.running = False
                break

    def handle_client(self, client_socket, addr):
        # Handle communication with a client
        with self.lock:
            # Store client connection and initialize latency and packet loss rate to None and 0, respectively
            self.connections[addr] = client_socket
            self.latencies[addr] = None
            self.packet_loss_rates[addr] = 0.0
        print(f"New connection from {addr}")

        peer_name = None          # Learned from the first ping received on this connection
        previously_online = True  # Track if the peer was previously online

        while self.running:
            try:
                data = client_socket.recv(1024).decode()
                if not data:
                    break
                print(f"Received message from {addr}: {data}")

                # Ping/Pong mechanism: messages look like "ping from <name> at <timestamp>"
                if data.startswith("ping"):
                    parts = data.split()
                    if len(parts) == 5 and parts[3] == 'at':
                        peer_name, timestamp = parts[2], float(parts[4])
                        pong_message = f"pong from {self.name} at {time.time()}"
                        client_socket.send(pong_message.encode())
                        latency = time.time() - timestamp
                        with self.lock:
                            self.latencies[addr] = latency
                            self.latency_seconds_gauge.labels(peer=peer_name).set(latency)
                            self.latency_milliseconds_gauge.labels(peer=peer_name).set(
                                latency * 1000)     # Convert to milliseconds
                            self.latency_microseconds_gauge.labels(peer=peer_name).set(
                                latency * 1000000)  # Convert to microseconds

                            # Calculate packet loss rate
                            sent_pings = self.sent_pings.get(addr, 0)
                            received_pongs = self.received_pongs.get(addr, 0)
                            if sent_pings > 0:
                                packet_loss_rate = (sent_pings - received_pongs) / sent_pings * 100.0
                                self.packet_loss_rates[addr] = packet_loss_rate
                                self.packet_loss_rate_gauge.labels(peer=peer_name).set(packet_loss_rate)

                            # Check if latency switches from non-zero to -1
                            if previously_online and latency == -1:
                                self.peer_offline_counter.labels(peer=peer_name).inc()
                                previously_online = False
                            elif not previously_online and latency > 0:
                                previously_online = True

                # Pong replies to our own pings: count them so the packet loss calculation has data
                elif data.startswith("pong"):
                    with self.lock:
                        self.received_pongs[addr] = self.received_pongs.get(addr, 0) + 1
            except Exception as e:
                print(f"An error occurred while handling client {addr}: {e}")
                break

        # When the client disconnects, set the latency metric to -1 and clean up
        with self.lock:
            if addr in self.connections:
                del self.connections[addr]
            if addr in self.latencies:
                if peer_name is not None:
                    self.latency_milliseconds_gauge.labels(peer=peer_name).set(-1)  # Set latency to -1
                    self.latency_microseconds_gauge.labels(peer=peer_name).set(-1)  # Set latency to -1
                del self.latencies[addr]
            if previously_online and peer_name is not None:
                self.peer_offline_counter.labels(peer=peer_name).inc()
        print(f"Connection with {addr} closed")
        try:
            client_socket.close()
        except OSError:
            print(f"Error: Connection with {addr} already closed.")

    def close_connection(self, addr):
        # Close connection and clean up resources
        with self.lock:
            sock = self.connections.pop(addr, None)
            if addr in self.latencies:
                del self.latencies[addr]
        print(f"Connection with {addr} closed")
        if sock is not None:
            try:
                sock.close()
            except OSError:
                pass
        else:
            print(f"Error: Connection with {addr} already closed.")

    def handle_reconnect(self, addr):
        # Attempt to reconnect to a peer
        host, port = addr
        retry_delay = 1
        while self.running:
            try:
                client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                client_socket.connect((host, port))
                print(f"Reconnected to {addr}")
                self.handle_client(client_socket, addr)
                break
            except ConnectionRefusedError:
                print(f"Connection to {addr} refused. Peer may be unavailable. Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
                retry_delay = min(retry_delay * 2, 30)
            except Exception as e:
                print(f"An error occurred while reconnecting to {addr}: {e}")
                break

    def connect_to_peer(self, peer_name, host, port):
        # Connect to a peer
        retry_delay = 1
        while self.running:
            try:
                client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                client_socket.connect((host, port))
                print(f"Connected to {peer_name} at {host}:{port}")
                self.handle_client(client_socket, (host, port))
                return
            except ConnectionRefusedError:
                print(f"Connection to {peer_name} at {host}:{port} refused. "
                      f"Peer may be unavailable. Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
                retry_delay = min(retry_delay * 2, 30)
            except Exception as e:
                print(f"An error occurred while connecting to {peer_name}: {e}")
                break

    def connect_to_peers(self):
        # Connect to all peers listed in the PEERS environment variable
        peers = os.getenv("PEERS", "").split(";")
        threads = []
        for peer_info in peers:
            if not peer_info.strip():
                continue  # Skip empty entries (e.g. a trailing semicolon)
            peer_name, peer_address = peer_info.split("=")
            if peer_name == self.name:
                continue
            host, port = peer_address.split(":")
            port = int(port)
            thread = threading.Thread(target=self.connect_to_peer, args=(peer_name, host, port))
            threads.append(thread)
            thread.start()

        # Wait for all threads to finish
        for thread in threads:
            thread.join()

    def send_ping(self):
        # Send ping messages to all connected peers
        while self.running:
            with self.lock:
                for addr, client_socket in list(self.connections.items()):
                    try:
                        start_time = time.time()
                        client_socket.send(f"ping from {self.name} at {start_time}".encode())
                        # Update sent pings count
                        self.sent_pings[addr] = self.sent_pings.get(addr, 0) + 1
                    except Exception as e:
                        print(f"Error sending ping to {addr}: {e}")
            time.sleep(5)

    def get_peer_address(self, name):
        # Get address of a peer by name
        peers = os.getenv("PEERS", "").split(";")
        for peer_info in peers:
            if not peer_info.strip():
                continue  # Skip empty entries (e.g. a trailing semicolon)
            peer_name, peer_address = peer_info.split("=")
            if peer_name == name:
                host, port = peer_address.split(":")
                return host, int(port)
        raise ValueError(f"No peer found with name {name}")

    def get_latency_metrics(self):
        # Generator function to yield latency metrics
        with self.lock:
            for addr, latency in self.latencies.items():
                yield addr, latency


def signal_handler(sig, frame):
    # Signal handler to gracefully exit the program
    global peer
    print('Exiting...')
    peer.running = False
    if peer.server_socket:
        peer.server_socket.close()
    # Copy the keys so close_connection can mutate the dict while we iterate
    for addr in list(peer.connections.keys()):
        peer.close_connection(addr)
    sys.exit(0)


if __name__ == "__main__":
    # Main entry point of the program
    import argparse

    parser = argparse.ArgumentParser(description='Latency Exporter')
    parser.add_argument('name', type=str, help='Name of the current exporter instance')
    args = parser.parse_args()

    signal.signal(signal.SIGINT, signal_handler)

    peer = Peer(args.name)
    peer.start()
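
Once two instances are exchanging pings, each metrics endpoint serves the gauges and counter defined in __init__ in the standard Prometheus text format. The lines below are an illustrative sketch only: the metric names and help text come from the code, but the peer label value and the numbers are made up, and recent prometheus_client versions expose the Counter with a _total suffix.

    # HELP peer_latency_milliseconds Latency to peers in milliseconds
    # TYPE peer_latency_milliseconds gauge
    peer_latency_milliseconds{peer="peer2"} 0.83
    peer_latency_microseconds{peer="peer2"} 831.7
    peer_packet_loss_rate{peer="peer2"} 0.0
    peer_offline_counter_total{peer="peer2"} 1.0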