Skip to content

Instantly share code, notes, and snippets.

@datlife
Last active September 30, 2020 23:54
Show Gist options
  • Save datlife/a76c90e2a2c2e80e97ed3ffc5ed5bc3c to your computer and use it in GitHub Desktop.
Save datlife/a76c90e2a2c2e80e97ed3ffc5ed5bc3c to your computer and use it in GitHub Desktop.
Pub Sub pattern using pure `socket` programming on Python. No extra dependencies :) 🍻

Problem statement

I have a fast producer (client) and slow subscriber (server) due to slow Internet connection. How to decouple the connection such that server won't affect client performance?

  • The server will not send any data back to client once the TCP connection is established. In my case, it is a data sink.

Solution

In order to avoid drag down client IO, e.g. waiting for ACK from server, there are a few solutions:

  • Leverage a message queue (Redis, RabbitMQ)
  • Pure socket programming. The high level idea is to quickly send ACK back to client "I got your message", then forward packets to server later. This ensures to reduce IO cost on client side.

Use case

  • I host a server on my laptop and a cloud service on AWS. I need a way to stream the data to my server without affect the service on the cloud.
client -----> Extra TCP Server (middleman)-------- Packet Handler with non-blocking IO (separate thread) ------> server

Result

Can't be happier! The client is no longer blocked by server :)

""" Rather than directly sending packets to server, client would send to middle man
"""
import sys
import socket
import threading
import traceback
if sys.version_info[0] > 2:
from socketserver import ThreadingTCPServer, BaseRequestHandler
else:
from SocketServer import ThreadingTCPServer, BaseRequestHandler
MIDDLE_MAN = ("localhost", 8443)
SERVER = ("localhost", 8009)
class Handler(BaseRequestHandler, object):
""" Handller will stream data to server without blocking client"""
def __init__(self, *args, **kwargs):
self.data_sink = None # my actual server
super(Handler, self).__init__(*args, **kwargs)
def handle(self):
if self.data_sink is None:
self.data_sink = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.data_sink.connect()
print("Init new connection to Graphite")
print("Thread Name:{}".format(threading.current_thread().name))
try:
data = self.request.recv(4096)
self.data_sink.sendall(data)
except Exception as e:
print(e)
print(traceback.print_exc())
finally:
self.data_sink.close()
self.data_sink = None
if __name__ == "__main__":
server = ThreadingTCPServer(MIDDLE_MAN, Handler)
print("Listen at %s, forward data to %s" % (MIDDLE_MAN, SERVER))
server.serve_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment