Skip to content

Instantly share code, notes, and snippets.

@MatthewScholefield
Last active March 28, 2019 08:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MatthewScholefield/b45c299a125b9792fd6ab7a4354ba2c6 to your computer and use it in GitHub Desktop.
Save MatthewScholefield/b45c299a125b9792fd6ab7a4354ba2c6 to your computer and use it in GitHub Desktop.
Example Python code for interacting with a sockets server
import random
import socket
from time import sleep
from threading import Thread
address = ('127.0.0.1', 8000)
delay_sec_min = 1.0
delay_sec_max = 3.0
max_num_messages = 10
max_num_clients = 15
def delay_random():
sleep(delay_sec_min + (delay_sec_max - delay_sec_min) * random.random())
class DemoServer:
def __init__(self, ip, port):
self.socket = socket.socket()
self.socket.connect((ip, port))
def send(self, msg: str):
len_bytes = len(msg).to_bytes(4, 'little')
print('Sending message: {}'.format(msg))
self.socket.sendall(len_bytes + msg.encode())
def run_demo_client(user_id):
s = DemoServer(*address)
s.send("Hello, I am user {}.".format(user_id))
for i in range(random.randint(1, max_num_messages)):
s.send("Message {} from user {}.".format(i, user_id))
delay_random()
s.send("Signing off, user {}.".format(user_id))
def main():
s = DemoServer(*address)
s.send("First connection has connected!")
random.seed()
clients = []
user_id = 0
while True:
while len(clients) < max_num_clients:
t = Thread(target=run_demo_client, args=[user_id], daemon=True)
t.start()
clients.append(t)
delay_random()
user_id += 1
sleep(0.1)
clients = [i for i in clients if i.is_alive()]
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment