multiprocessing.reduction socket server: the parent process accepts connections and passes them to child worker processes
#!/usr/bin/env python
"""
@author Sunil Mallya
Sample code showing a parent-child process communication model: the parent listens on a port and passes the pickled file descriptor
to the child process, which reads the bytes off the socket. Communication in this snippet is via a Queue, which is thread- and process-safe.
Just to be clear, the parent process still accepts the connection; we are sending a live fd to the child.
"""
import os
import sys
import SocketServer
import Queue
import time
import socket
import multiprocessing
from multiprocessing.reduction import reduce_handle
from multiprocessing.reduction import rebuild_handle
#Refer for more multiprocessing info
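class MultiprocessWorker(multiprocessing.Process):
    def __init__(self, sq):
        # base class initialization
        multiprocessing.Process.__init__(self)
        # job management stuff
        self.socket_queue = sq
        self.kill_received = False

    def run(self):
        while not self.kill_received:
            try:
                # If you used a pipe, receive the handle from it here;
                # else dequeue
                h = self.socket_queue.get_nowait()
                # Rebuild the descriptor in this process and wrap it in a socket
                fd = rebuild_handle(h)
                client_socket = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
                os.close(fd)  # fromfd() duplicates fd, so release the original
                received = client_socket.recv(1024)
                print "Received on client: ", received
                client_socket.close()
            except Queue.Empty:
                # Dummy timer: nothing queued yet, wait before polling again
                # (the 1 second interval is an arbitrary choice)
                time.sleep(1)
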
class MyTCPHandler(SocketServer.BaseRequestHandler):
    """
    The RequestHandler class for our server.
    It is instantiated once per connection to the server, and must
    override the handle() method to implement communication to the
    client.
    """
    def handle(self):
        # self.request is the TCP socket connected to the client
        #self.data = self.request.recv(1024).strip()
        #print "{} wrote:".format(self.client_address[0])
        # just send back the same data, but upper-cased
        #self.request.sendall(self.data.upper())
        #Either pipe it to worker directly like this
        #pipe_to_worker.send(h) #instanceofmultiprocessing.Pipe
        #or use a Queue :)
        h = reduce_handle(self.request.fileno())
        socket_queue.put(h)

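if __name__ == "__main__":
    address = ('localhost', 8082)
    server = SocketServer.TCPServer(address, MyTCPHandler)
    socket_queue = multiprocessing.Queue()
    for i in range(2):
        worker = MultiprocessWorker(socket_queue)
        worker.daemon = True  # let the workers exit together with the parent
        worker.start()
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        sys.exit(0)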
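
The script above is Python 2 code (`SocketServer`, `Queue`, the `print` statement). As a rough illustration of the same idea, handing a live connection's descriptor to another party, here is a minimal Python 3 sketch. It is not the author's method: it swaps `reduce_handle`/`rebuild_handle` for `socket.send_fds` and `socket.recv_fds` (Unix only, Python 3.9+), and it keeps both "parent" and "child" roles in one process so it stays short. The function name `pass_connection_fd` is hypothetical.

```python
import socket


def pass_connection_fd():
    # Unix-domain pair standing in for the parent <-> child channel.
    parent_end, child_end = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)

    # TCP listener on an ephemeral port, plus one client connection.
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))
    listener.listen(1)
    client = socket.create_connection(listener.getsockname())
    conn, _ = listener.accept()

    # "Parent" role: ship the accepted connection's descriptor,
    # then close the local copy.
    socket.send_fds(parent_end, [b"fd"], [conn.fileno()])
    conn.close()

    # "Child" role: receive a new descriptor for the same connection
    # and wrap it in a socket object.
    _, fds, _, _ = socket.recv_fds(child_end, 16, 1)
    worker_sock = socket.socket(fileno=fds[0])

    # The client's bytes are now readable through the received descriptor.
    client.sendall(b"hello")
    received = worker_sock.recv(1024)

    for s in (worker_sock, client, listener, parent_end, child_end):
        s.close()
    return received
```

In a real parent/child setup, `child_end` would live in the worker process; the single-process layout here is only to keep the example self-contained.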

Hello. I tried your script, but when I connect to it, it immediately closes the connection... I can't understand why. Can you help me?

dmaple commented May 15, 2016

The connection closes because the TCPServer closes it as soon as the handle() method returns.
You can see this by adding a time.sleep(90) after the socket_queue.put(h) call.
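
One possible way around this, sketched here in Python 3 rather than the gist's Python 2: `TCPServer.shutdown_request()` calls `request.shutdown(socket.SHUT_WR)` before closing, and a shutdown affects the connection itself, not just the server's descriptor. Overriding it to close only the server's own copy leaves a duplicated descriptor usable. In this sketch a thread and a `queue.Queue` stand in for the child process and the `multiprocessing.Queue`; the names `HandoffHandler`, `HandoffServer`, `worker`, and `demo` are hypothetical.

```python
import queue
import socket
import socketserver
import threading

handoff = queue.Queue()  # stand-in for the multiprocessing.Queue in the script


class HandoffHandler(socketserver.BaseRequestHandler):
    def handle(self):
        # Hand a duplicate of the connection to the worker and return at once.
        handoff.put(self.request.dup())


class HandoffServer(socketserver.TCPServer):
    allow_reuse_address = True

    def shutdown_request(self, request):
        # The default implementation first calls request.shutdown(SHUT_WR),
        # which shuts down the write side of the connection for every
        # descriptor that refers to it. Closing only the server's copy
        # keeps the worker's duplicate usable.
        self.close_request(request)


def worker():
    # Take one handed-off connection, echo its data upper-cased, and close it.
    conn = handoff.get()
    data = conn.recv(1024)
    conn.sendall(data.upper())
    conn.close()


def demo():
    server = HandoffServer(("127.0.0.1", 0), HandoffHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    threading.Thread(target=worker, daemon=True).start()
    with socket.create_connection(server.server_address) as client:
        client.sendall(b"ping")
        reply = client.recv(1024)
    server.shutdown()
    server.server_close()
    return reply
```

Calling `demo()` sends `b"ping"` and returns whatever the worker wrote back on the handed-off connection. The same `shutdown_request()` hook also exists in Python 2.7's `SocketServer.TCPServer`, so the override should carry over to the gist's script, though that is untested here.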
