Skip to content

Instantly share code, notes, and snippets.

@safeith
Last active October 3, 2020 07:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save safeith/fc2b5b32d941f8ea46a8081278045d83 to your computer and use it in GitHub Desktop.
Save safeith/fc2b5b32d941f8ea46a8081278045d83 to your computer and use it in GitHub Desktop.
Simple TCP socket class in Python that accepts URL(s) as input and returns their HTTP status codes
#!/usr/bin/env python3
import socket
import requests
class SimpleTcpSocket:
    """
    A small TCP server that checks URLs on behalf of its clients.

    A client sends newline-separated URLs. The server echoes the received
    bytes back and then answers with one line per URL, reporting either the
    HTTP status code or that the URL is not reachable. Results are also
    appended to the status and error log files.
    """

    def __init__(self, listening_ip=None, listening_port=None, **kwargs):
        """
        Args:
            listening_ip (str): The binding IP (default '127.0.0.1')
            listening_port (int): The binding port (default 30080)
            kwargs:
                status_log_file (str): The status log file (default 'status.log')
                error_log_file (str): The error log file (default 'error.log')
                buffer_size (int): The buffer size (default 1024)
                request_timeout (float): Seconds to wait for each URL before
                    treating it as not reachable (default 10)
        """
        self.listening_ip = '127.0.0.1' if listening_ip is None else listening_ip
        self.listening_port = 30080 if listening_port is None else listening_port
        self.status_log_file = kwargs.get('status_log_file', 'status.log')
        self.error_log_file = kwargs.get('error_log_file', 'error.log')
        self.buffer_size = kwargs.get('buffer_size', 1024)
        # Without a timeout a single stalled URL would block the
        # single-threaded server loop forever.
        self.request_timeout = kwargs.get('request_timeout', 10)

    def __str__(self):
        return (
            f'TCP socket is running on {self.listening_ip}:{self.listening_port}'
            f', status_log_file: {self.status_log_file}, error_log_file: {self.error_log_file}'
            f', buffer_size: {self.buffer_size}'
        )

    def server(self):
        """
        Run the accept loop forever, serving one client at a time.

        For every connection a single chunk of at most ``buffer_size`` bytes
        is read, echoed back, and interpreted as newline-separated URLs; input
        beyond that first chunk is not read. Each URL is fetched with an HTTP
        GET and the outcome is sent to the client and appended to the log
        files.

        This method only returns by raising (for example on
        KeyboardInterrupt); the listening socket is closed in that case.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.bind((self.listening_ip, self.listening_port))
            sock.listen(5)
            while True:
                connection, address = sock.accept()
                print(f'incoming connection from {address[0]}')
                # The with-block closes the client socket even if handling
                # the request raises.
                with connection:
                    try:
                        self._handle_connection(connection)
                    except ConnectionError as e:
                        # A client that disconnects mid-reply must not take
                        # the whole server down; record it and keep serving.
                        self._append_line(
                            self.error_log_file, f'{address[0]}, {e}\n')

    def _handle_connection(self, connection):
        """Read one batch of URLs from the client and reply once per URL."""
        buffer = connection.recv(self.buffer_size)
        # sendall() keeps writing until every byte is sent; send() may stop
        # after a partial write.
        connection.sendall(buffer)
        # Bytes that are not valid UTF-8 are replaced instead of raising, so
        # malformed input ends up reported as an unreachable URL rather than
        # crashing the server.
        text = buffer.decode(errors='replace')
        for line in text.split("\n"):
            # Stripping per line also removes the '\r' of CRLF line endings.
            url = line.strip()
            if not url:
                # Blank lines (or an empty read) carry no URL to check.
                continue
            connection.sendall(self._check_url(url).encode())

    def _check_url(self, url):
        """Fetch ``url``, log the outcome, and return the reply line for the client."""
        try:
            response = requests.get(url, timeout=self.request_timeout)
        except Exception as e:
            # Deliberately broad: any failure to fetch (bad scheme, DNS error,
            # timeout, malformed host, ...) is reported as "not reachable"
            # and logged, never allowed to stop the server.
            self._append_line(self.error_log_file, f'{url}, {e}\n')
            return f' => {url} is not reachable\n'
        self._append_line(
            self.status_log_file, f'{url},{response.status_code}\n')
        return f' => {url} status code is {response.status_code}\n'

    @staticmethod
    def _append_line(path, line):
        """Append ``line`` to the log file at ``path``."""
        # An explicit encoding keeps logging independent of the platform
        # default, which may be unable to represent every received character.
        with open(path, 'a', encoding='utf-8') as log:
            log.write(line)
if __name__ == '__main__':
    # Start the server only when this file is executed as a script, so that
    # importing the module to reuse SimpleTcpSocket does not block in the
    # accept loop.
    charlie = SimpleTcpSocket()
    print(charlie)
    charlie.server()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment