Created
March 18, 2025 18:45
-
-
Save EncodeTheCode/bd3d80b03e7263da0f1b5ff7c496a297 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import time | |
import threading | |
import geoip2.database | |
# --- Server settings -------------------------------------------------------
HOST = "0.0.0.0"  # bind address handed to socket.bind()
PORT = 12345  # TCP port the server listens on
GEOIP_DB_PATH = "GeoLite2-City.mmdb"  # GeoLite2 City database; must exist on disk
RATE_LIMIT_MS = 1000  # minimum gap, in milliseconds, between two executed actions

# Open the GeoIP2 database a single time at import; every lookup reuses it.
reader = geoip2.database.Reader(GEOIP_DB_PATH)

# Shared rate-limiter state. `lock` must be held while reading or writing
# `last_execution_time` (a perf_counter reading expressed in milliseconds).
lock = threading.Lock()
last_execution_time = 1000 * time.perf_counter()
def get_country(ip: str) -> str:
    """Return the country name for *ip*, or ``"Unknown"`` if it cannot be resolved.

    The address is looked up through the module-level GeoIP2 ``reader``.

    Args:
        ip: IP address to look up, as a string.

    Returns:
        The country name reported by the database, or ``"Unknown"`` when the
        lookup raises or the matching record carries no country name.
    """
    try:
        name = reader.city(ip).country.name
    except Exception:
        # Deliberate best-effort: any lookup failure (address not in the
        # database, malformed address, reader problems) maps to "Unknown"
        # so a bad peer address never crashes the handler thread.
        return "Unknown"
    # A record may exist without a country name; keep the ``str`` contract
    # instead of leaking ``None`` to callers.
    return name if name is not None else "Unknown"
def measure_latency(client_socket: socket.socket) -> float:
    """Measure round-trip latency by sending a ping and waiting for a reply.

    Sends ``b"ping"`` on *client_socket* and waits for the peer to send any
    data back (up to 1024 bytes are read).

    Args:
        client_socket: Connected socket to probe.

    Returns:
        Elapsed time in milliseconds between sending the ping and receiving
        the reply, or ``-1`` if a socket operation fails or the peer closes
        the connection without replying.
    """
    try:
        start_time = time.perf_counter()
        client_socket.sendall(b"ping")
        reply = client_socket.recv(1024)
    except OSError:  # socket.error is an alias of OSError
        return -1.0  # Indicate failure
    elapsed_ms = (time.perf_counter() - start_time) * 1000
    if not reply:
        # recv() returning b"" means the peer shut the connection down;
        # no reply was received, so this is not a valid measurement.
        return -1.0
    return elapsed_ms
def handle_client(client_socket: socket.socket, addr: tuple):
    """Serve one accepted connection and trigger the action when it qualifies.

    A connection qualifies when its address geolocates to South Africa, the
    latency probe succeeds, and at least RATE_LIMIT_MS milliseconds have
    elapsed since the last accepted connection. The socket is closed when
    this function returns.

    Args:
        client_socket: The accepted client socket.
        addr: Peer address as returned by ``accept()``; ``addr[0]`` is the IP.
    """
    global last_execution_time

    with client_socket:
        peer_ip = addr[0]

        # Only South African peers are of interest.
        if get_country(peer_ip) != "South Africa":
            return

        latency = measure_latency(client_socket)
        if latency == -1:
            return  # the probe failed; nothing to report

        # Rate limiting: check and update the shared timestamp under the lock.
        with lock:
            now_ms = time.perf_counter() * 1000
            too_soon = now_ms - last_execution_time < RATE_LIMIT_MS
            if not too_soon:
                last_execution_time = now_ms
        if too_soon:
            return

        print(f"South Africa connection detected from {peer_ip} with {latency:.2f} ms latency")
        perform_action(latency)
def perform_action(latency: float):
    """Run the action for a qualifying connection.

    Currently this only prints the measured latency.

    Args:
        latency: Measured round-trip latency in milliseconds.
    """
    message = f"Performing action for latency {latency:.2f}ms"
    print(message)
def server():
    """Run the TCP accept loop, handing each connection to its own thread.

    Binds to (HOST, PORT), listens, and for every accepted connection starts
    a daemon thread running ``handle_client``. Loops until the process is
    interrupted.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with listener:
        listener.bind((HOST, PORT))
        listener.listen()
        print(f"Listening on {HOST}:{PORT}")

        while True:
            conn, peer = listener.accept()
            # Daemon threads do not keep the process alive on shutdown.
            worker = threading.Thread(
                target=handle_client,
                args=(conn, peer),
                daemon=True,
            )
            worker.start()
if __name__ == "__main__":
    # Start the server only when run as a script, not when imported.
    server()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment