Last active
July 15, 2020 00:45
-
-
Save EkremDincel/a26230ddfe0dc4dd80b6fcbb56df1102 to your computer and use it in GitHub Desktop.
A little TCP socket interface for Python.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import socket | |
from struct import Struct, calcsize | |
from select import select | |
__all__ = ("Server", "Client", "localhostname", "localhost", "LAN", "WAN")

# Every JSON payload is preceded by a fixed-size length prefix.
_int = "H"  # unsigned short: 0 <= number <= 65535 == 2 ** 16 - 1
_struct = Struct("!" + _int)  # "!" selects network byte order and standard sizes
# Take the prefix size from the compiled struct itself so it always matches
# what pack()/unpack() really use (calcsize(_int) would report the native size).
_size = _struct.size
del Struct, calcsize  # clear namespace

localhostname = socket.gethostname()
try:
    localhost = socket.gethostbyname(localhostname)
except OSError:
    # socket.gaierror (an OSError subclass) is raised when the local host name
    # cannot be resolved; fall back to the loopback address so that importing
    # this module never fails because of name resolution.
    localhost = "127.0.0.1"

LAN = "localhost"  # local area network
WAN = ""  # wide area network
class BaseSocket():  # open to naming suggestions
    """Base class that owns a socket and controls its lifetime.

    ``__init__`` creates a fresh ``socket.socket()`` with default arguments
    and stores it as ``self.socket``.  Instances can be used as context
    managers: the socket is closed when the ``with`` block exits.
    """

    def __init__(self):
        self.socket = socket.socket()

    def __enter__(self):
        """Return the instance itself so it can be bound by ``with ... as``."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the socket; any exception from the block keeps propagating."""
        self.close()

    def close(self):
        """Closes socket."""
        self.socket.close()
class SelectWrapper():
    """Mixin with non-blocking readiness checks for ``self.socket``.

    The class using this mixin must provide a ``socket`` attribute.  Every
    check calls ``select`` with a timeout of 0, so it polls and never blocks.
    """

    def isreadable(self):
        """Return True if the socket is ready for reading, else False."""
        readable, _, _ = select((self.socket,), (), (), 0)
        return bool(readable)

    def iswritable(self):
        """Return True if the socket is ready for writing, else False."""
        _, writable, _ = select((), (self.socket,), (), 0)
        return bool(writable)

    def iserror(self):
        """Return True if the socket has an exceptional condition pending."""
        _, _, exceptional = select((), (), (self.socket,), 0)
        return bool(exceptional)

    def select(self):
        """Poll all three conditions in a single call.

        Returns the ``(readable, writable, exceptional)`` triple of lists
        produced by ``select``; each list holds the socket if that condition
        is currently met and is empty otherwise.
        """
        s = (self.socket,)
        return select(s, s, s, 0)
class Server(BaseSocket, SelectWrapper):
    """Server class for accepting clients."""

    def bind(self, address):
        """Bind the server's socket to *address*."""
        sock = self.socket
        sock.bind(address)

    def listen(self, n):
        """Start listening; *n* is the number of unaccepted connections the
        system will allow before refusing new connections."""
        sock = self.socket
        sock.listen(n)

    def accept(self):
        """Accept one pending connection and return it as an AcceptedClient."""
        connection, peer_address = self.socket.accept()
        return AcceptedClient(connection, peer_address)

    # A listening socket reports "readable" when a connection is waiting,
    # so the readability check doubles as the "can accept" check.
    canaccept = SelectWrapper.isreadable  # alias
class BaseClient():
    """Mixin that exchanges length-prefixed JSON messages over ``self.socket``.

    Each message on the wire is a fixed-size length prefix (packed with the
    module-level ``_struct``) followed by that many bytes of encoded JSON.
    The class using this mixin must provide a connected ``socket`` attribute.
    """

    def _safe_send(self, msg):
        """Send every byte of *msg*, retrying after partial sends.

        Raises:
            ConnectionError: if ``send`` reports that zero bytes were written.
        """
        # A memoryview lets each retry resume at the unsent tail without
        # copying the remaining bytes the way ``msg[totalsent:]`` would.
        view = memoryview(msg)
        totalsent = 0
        while totalsent < len(view):
            sent = self.socket.send(view[totalsent:])
            if not sent:
                raise ConnectionError("Socket connection is broken.")
            totalsent += sent

    def _safe_recv(self, lenght):
        """Receive exactly *lenght* bytes and return them as ``bytes``.

        Raises:
            ConnectionError: if ``recv`` returns no data before *lenght* bytes
                have arrived (the peer closed the connection).
        """
        # Collect the pieces and join once at the end instead of growing a
        # bytes object with ``+=`` on every iteration.
        chunks = []
        bytes_recd = 0
        while bytes_recd < lenght:
            chunk = self.socket.recv(lenght - bytes_recd)
            if not chunk:
                raise ConnectionError("Socket connection is broken.")
            chunks.append(chunk)
            bytes_recd += len(chunk)
        return b"".join(chunks)

    def send_obj(self, obj):
        """Send an object to the other socket.

        *obj* is serialized with ``json.dumps`` and encoded to bytes; the
        length prefix and the payload are sent together in a single
        ``_safe_send`` call.  The payload length must fit in the prefix
        format, otherwise ``_struct.pack`` raises ``struct.error``.
        """
        bytes_to_send = json.dumps(obj).encode()
        self._safe_send(_struct.pack(len(bytes_to_send)) + bytes_to_send)

    def recv_obj(self):
        """Receive an object from the other socket.

        Reads the length prefix first, then exactly that many payload bytes,
        and returns the result of decoding them with ``json.loads``.
        """
        lenght = _struct.unpack(self._safe_recv(_size))[0]
        return json.loads(self._safe_recv(lenght).decode())
class AcceptedClient(BaseClient, SelectWrapper, BaseSocket):
    """Client object wrapping an already-established connection.

    ``socket`` is stored as ``self.socket`` and ``addr`` as ``self.addr``.
    """

    def __init__(self, socket, addr):
        # BaseSocket.__init__ is not called here: the socket passed in is
        # used as-is instead of creating a new one.
        self.socket, self.addr = socket, addr
class Client(BaseClient, SelectWrapper, BaseSocket):
    """Client class for connecting to the server."""

    def connect(self, address):
        """Connect this client's socket to the server at *address*."""
        sock = self.socket
        sock.connect(address)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment