Skip to content

Instantly share code, notes, and snippets.

@tokibito
Created December 14, 2016 15:23
Show Gist options
  • Save tokibito/eb38535282244d184471d66fe6f09cce to your computer and use it in GitHub Desktop.
Save tokibito/eb38535282244d184471d66fe6f09cce to your computer and use it in GitHub Desktop.
from multiprocessing.reduction import sendfds, recvfds
import socket
import time
import os
import sys
import tempfile
from multiprocessing.reduction import sendfds, recvfds
import unittest
from unittest import TestCase
SOCKET_FILE = '/tmp/fd_pass.sock'
class Server:
def __init__(self, fds):
self.fds = fds
def run(self):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(SOCKET_FILE)
sock.listen(1)
conn, addr = sock.accept()
input_file = open('/tmp/data', 'rb', buffering=0)
try:
data = input_file.read(3)
print('server: %s' % data)
fd = input_file.fileno()
sendfds(conn, [fd])
print('server: sent fd=%s' % fd)
input_file.close()
sendfds(conn, self.fds)
finally:
conn.close()
sock.close()
class Client:
def connect(self):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(SOCKET_FILE)
try:
fd = recvfds(sock, 1)[0]
print('client: fd=%s' % fd)
input_file = os.fdopen(fd, 'rb')
data = input_file.read()
print('client: %s' % data)
input_file.close()
return recvfds(sock, 10)
finally:
sock.close()
class SendIoTest(TestCase):
def setUp(self):
if os.path.exists(SOCKET_FILE):
os.remove(SOCKET_FILE)
with open('/tmp/data', 'w') as datafile:
datafile.write("Hello, World!\n")
def test(self):
fds = self.newFDs(10)
pid = os.fork()
if pid:
# parent
server = Server(fds)
server.run()
sys.stderr.close() # to quiet test result
os.waitpid(pid, 0)
else:
# child
time.sleep(1)
client = Client()
received_fds = client.connect()
print(received_fds)
self.checkFDs(received_fds)
def newFDs(self, n):
fds = []
for i in range(n):
fd, path = tempfile.mkstemp()
self.addCleanup(os.close, fd)
os.write(fd, str(i).encode())
fds.append(fd)
return fds
def checkFDs(self, fds):
for n, fd in enumerate(fds):
os.lseek(fd, 0, os.SEEK_SET)
self.assertEqual(os.read(fd, 1024), str(n).encode())
if __name__ == '__main__':
unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment