Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save EncodeTheCode/bd3d80b03e7263da0f1b5ff7c496a297 to your computer and use it in GitHub Desktop.
Save EncodeTheCode/bd3d80b03e7263da0f1b5ff7c496a297 to your computer and use it in GitHub Desktop.
import socket
import time
import threading
import geoip2.database
# Configuration
HOST = "0.0.0.0"
PORT = 12345
GEOIP_DB_PATH = "GeoLite2-City.mmdb" # Ensure you have this file
RATE_LIMIT_MS = 1000 # Minimum time interval between executions in milliseconds
# Load GeoIP2 database once
reader = geoip2.database.Reader(GEOIP_DB_PATH)
# Last execution timestamp (thread-safe)
last_execution_time = time.perf_counter() * 1000
lock = threading.Lock()
def get_country(ip: str) -> str:
"""Get country name from IP address using GeoIP2."""
try:
response = reader.city(ip)
return response.country.name
except Exception:
return "Unknown"
def measure_latency(client_socket: socket.socket) -> float:
"""Measure connection latency by sending a ping and waiting for a response."""
try:
start_time = time.perf_counter()
client_socket.sendall(b"ping")
client_socket.recv(1024) # Expect response
return (time.perf_counter() - start_time) * 1000 # Convert to ms
except socket.error:
return -1 # Indicate failure
def handle_client(client_socket: socket.socket, addr: tuple):
"""Handle incoming client connection."""
global last_execution_time
with client_socket:
country = get_country(addr[0])
if country != "South Africa":
return # Ignore non-South African connections
latency = measure_latency(client_socket)
if latency == -1:
return # Skip if latency measurement fails
# Rate-limiting logic
with lock:
current_time = time.perf_counter() * 1000 # Convert to ms
if current_time - last_execution_time < RATE_LIMIT_MS:
return # Skip if time interval is too short
last_execution_time = current_time # Update last execution time
print(f"South Africa connection detected from {addr[0]} with {latency:.2f} ms latency")
perform_action(latency)
def perform_action(latency: float):
"""Action to execute when a valid connection is detected."""
print(f"Performing action for latency {latency:.2f}ms")
def server():
"""Start a multi-threaded TCP server to listen for incoming connections."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
server_socket.bind((HOST, PORT))
server_socket.listen()
print(f"Listening on {HOST}:{PORT}")
while True:
client_socket, addr = server_socket.accept()
client_thread = threading.Thread(target=handle_client, args=(client_socket, addr), daemon=True)
client_thread.start()
if __name__ == "__main__":
server()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment