Created
March 17, 2012 01:33
-
-
Save leeclemens/2054194 to your computer and use it in GitHub Desktop.
Test case for an issue with using Popen with pipes from multiple threads when daemonized
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import logging | |
import os | |
import subprocess | |
import sys | |
from threading import Event, Thread | |
# Only used to sleep between sends | |
#noinspection PyUnresolvedReferences | |
#from time import sleep | |
# | |
# A small test case to reproduce a threading issue, | |
# only experienced when double-fork daemonization process occurs | |
# This can be verified by changing the value of DAEMONIZE | |
# | |
# The behavior when daemonized manifests as broken pipes for each subprocess | |
# But they do not occur consistently, | |
# sometimes even between subsequent executions | |
# | |
# One time, ELF> and binary data was seen in /tmp/tts_log! | |
# If the os.dup2 calls (below "Created daemon successfully") are commented out | |
# the test is successful. | |
# Although at least one time, the real app encountered broken pipe errors | |
# My best guess is that subprocess is not thread-safe | |
# And at least while daemonized, pipes are being crossed/shared somehow | |
# The basic layout is MainThread being called, | |
# which daemonizes based upon the value of DAEMONIZE. | |
# This main thread:
# 1) Starts the MainRecvThread,
# 2) Waits for the startedEvent
# 3) Starts MainSendThread
# 4) Joins each of them
# MainRecvThread starts as many child threads
# as determined by MainThread.__count, one per port
# Each WorkerRecvThread uses their own instance of Command to start Netcat | |
# listening on their specified port, piping into a temporary file | |
# MainSendThread starts as many children threads as | |
# determined by MainThread.__count, one per port | |
# Each WorkerSendThread uses their own instance of Command to start Netcat, | |
# sending the contents of MainThread.__sourceFile | |
# Both MainRecvThread and MainSendThread wait for each spawned child to finish | |
# Flip between these two assignments to test each case
DAEMONIZE = True
#DAEMONIZE = False
# More optionally changeable options are set in MainThread.__init__(),
# and the netcat syntax may need to be altered for your version.

# A file logger is used to verify that no errors occur while daemonized,
# because stdout/stderr are redirected to /dev/null once the process detaches.
target = "/tmp/tts_log"
# Touch the log file so it exists before the handler is attached to it.
open(target, "a").close()

fmt = logging.Formatter(
    "%(asctime)s %(name)-10s: %(levelname)-8s: %(message)s"
)
hdlr = logging.FileHandler(target)
hdlr.setFormatter(fmt)

logSys = logging.getLogger("test")
# Numeric level 4 sits below logging.DEBUG (10), so DEBUG and every higher
# level are emitted.
logSys.setLevel(4)
logSys.addHandler(hdlr)
class MainThread:
    """
    Main Thread, called from command line.

    When it daemonizes, unexpected events occur relating to the
    subprocesses' pipes. When it does not daemonize, everything works as
    expected.

    In this example, the issue manifests as partially copied files: if 1 KB
    is copied, it is validly the first 1 KB of the source file. The size and
    number of partially copied files also varies between threads and between
    separate executions of this test app.

    The settings default to the values this test case was written with and
    may be overridden through the constructor.
    """

    def __init__(self, sourceFile="/bin/awk", outputFilePrefix="/tmp/tts_",
                 basePort=64000, count=10):
        """
        :param sourceFile: file every sender transmits (a non-ASCII/binary
            file by default)
        :param outputFilePrefix: prefix of each receiver's output file; the
            port number is appended to this name
        :param basePort: first port used; pair ``i`` uses ``basePort + i``
        :param count: number of sender/receiver pairs, one per port
        """
        # Not read by any code visible in this file.
        self.__running = True
        self.__sourceFile = sourceFile
        self.__outputFilePrefix = outputFilePrefix
        self.__basePort = basePort
        self.__count = count

    def runMainThread(self):
        """
        Optionally daemonize, then run all receivers and senders to completion.

        Daemonizing is controlled by the module-level DAEMONIZE flag.

        :return: True on success, False if an exception was raised (it is
            logged). The intermediate processes created while daemonizing
            leave through ``sys.exit`` and never return from here.
        """
        try:
            # This is what makes the difference
            # between working and pipe/buffer/thread issues
            if DAEMONIZE:
                self.__createDaemon()
            startedEvent = Event()
            mainRecvThread = MainRecvThread(
                self.__outputFilePrefix,
                self.__basePort,
                self.__count,
                startedEvent
            )
            mainRecvThread.start()
            # Make sure each listener is ready
            # although the behavior shows the first chunk of the file,
            # and the sender will fail immediately if nothing is listening.
            # NOTE: receivers signal before launching nc, so this does not
            # guarantee that every port is already listening.
            startedEvent.wait()
            mainSendThread = MainSendThread(
                self.__sourceFile,
                self.__basePort,
                self.__count,
            )
            mainSendThread.start()
            mainSendThread.join()
            mainRecvThread.join()
            return True
        except Exception as e:
            logSys.exception(e)
            logSys.error("Failure running MainThread")
            return False

    def __exitParentAfterFork(self, failureFormat):
        """
        Fork once: the parent exits with status 0 and the child returns.

        :param failureFormat: message written to stderr (formatted with errno
            and strerror) before exiting with status 1 if fork() fails
        """
        # Flush first so output still buffered in this process is not
        # written twice: once by the exiting parent, once later by the child.
        sys.stdout.flush()
        sys.stderr.flush()
        try:
            pid = os.fork()
        except OSError as e:
            sys.stderr.write(failureFormat % (e.errno, e.strerror))
            sys.exit(1)
        if pid > 0:
            sys.exit(0)

    def __createDaemon(self):
        """
        Detach from the terminal with the classic UNIX double fork.

        Adapted from
        http://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/

        Only the grandchild returns; both intermediate parents exit via
        ``sys.exit(0)``. On return the process runs in its own session with
        cwd ``/``, umask 0, and stdin/stdout/stderr redirected to /dev/null.
        """
        self.__exitParentAfterFork("Fork #1 failed: %d (%s)\n")
        os.chdir("/")
        os.setsid()
        os.umask(0)
        self.__exitParentAfterFork("fork #2 failed: %d (%s)\n")
        logSys.info("Created daemon successfully, pid %s" % os.getpid())
        # Specifically, commenting only these out yields successful results
        sys.stdout.flush()
        sys.stderr.flush()
        # One read/write descriptor on /dev/null serves all three standard
        # streams; os.open avoids the Python-2-only file() builtin and the
        # unbuffered-text-mode error open(..., 0) raises on Python 3.
        devnull = os.open(os.devnull, os.O_RDWR)
        standardFds = (sys.stdin.fileno(),
                       sys.stdout.fileno(),
                       sys.stderr.fileno())
        for fd in standardFds:
            os.dup2(devnull, fd)
        # Keep the descriptor if it already is one of the standard ones.
        if devnull not in standardFds:
            os.close(devnull)
        # NOTE(review): children started later by subprocess.Popen without a
        # stdin argument inherit this stdin, so the `nc -l` listeners now read
        # /dev/null (immediate EOF) instead of the terminal. Some netcat
        # builds close the connection on stdin EOF, which would match the
        # truncated copies described above -- TODO confirm for the nc in use.
class Command:
    """
    Class that each child thread obtains an instance of for executing a
    subprocess.

    This is its own class because, in the real app, a handle is needed in
    order to kill each thread's running subprocess; the most recently
    started process is kept in ``_subProcess``.
    """

    def __init__(self, caller):
        """
        :param caller: the object (a worker thread in this file) that owns
            this instance; it is stored but not otherwise used here
        """
        self.__caller = caller
        self._subProcess = None

    def executeCommand(self, command):
        """
        Run *command* through the shell and wait for it to finish.

        :param command: shell command line, run with ``shell=True``; for a
            pipeline the exit code is that of its last stage
        :return: ``(stdout, exitCode)`` when the command succeeds; otherwise
            the failure is logged and ``("<stdout>::<stderr>", exitCode)``
            is returned
        """
        self._subProcess = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            shell=True,
            # Several threads call this concurrently. Python 2 defaults to
            # close_fds=False, so a child forked by one thread can inherit the
            # pipe descriptors another thread's Popen has just created and keep
            # those pipes open, delaying EOF for the thread that owns them.
            # (Python 3.2+ already defaults to True on POSIX.)
            close_fds=True,
        )
        stdout, stderr = self._subProcess.communicate()
        exitCode = self._subProcess.returncode
        if exitCode:
            logSys.error("Failure executing command: %r" % command)
            return "%s::%s" % (stdout, stderr), exitCode
        return stdout, exitCode
class MainRecvThread(Thread):
    """
    Thread to start and manage each Receiver thread.

    Starts ``count`` WorkerRecvThreads on ports ``basePort`` to
    ``basePort + count - 1``, waiting after each start until that worker
    signals that its run() has begun. It then sets ``startedEvent`` so the
    MainThread can launch the senders, and finally joins every worker.
    """

    def __init__(self,
                 outputFilePrefix,
                 basePort,
                 count,
                 startedEvent):
        """
        :param outputFilePrefix: prefix of each receiver's output file
        :param basePort: port of the first receiver
        :param count: number of receivers to start
        :param startedEvent: Event set once the receivers have been started,
            or once starting them has failed, so a waiter is never left
            blocked
        """
        Thread.__init__(self)
        self.__outputFilePrefix = outputFilePrefix
        self.__basePort = basePort
        self.__count = count
        self.__startedEvent = startedEvent
        self.__recvThreads = []

    def run(self):
        # Only threads that were actually started may be joined.
        started = []
        try:
            try:
                mainRecvEvent = Event()
                for i in range(self.__count):
                    self.__recvThreads.append(
                        WorkerRecvThread(
                            (self.__basePort + i),
                            mainRecvEvent,
                            self.__outputFilePrefix
                        )
                    )
                for recvThread in self.__recvThreads:
                    recvThread.start()
                    started.append(recvThread)
                    # Wait for each child to start
                    mainRecvEvent.wait()
                    mainRecvEvent.clear()
            finally:
                # Tell MainThread we're ready. This is set even on failure:
                # MainThread waits on this event with no timeout, so leaving
                # it unset would hang the whole program.
                self.__startedEvent.set()
            for recvThread in started:
                recvThread.join()
        except Exception as e:
            logSys.exception(e)
            logSys.error("Fatal Error in Main Recv Thread")
class WorkerRecvThread(Thread):
    """
    Receiver thread: runs a netcat listener on one port and writes whatever
    arrives to ``<outputFilePrefix><port>``.
    """

    def __init__(self, port, mainRecvEvent, outputFilePrefix):
        """
        :param port: port the netcat listener binds to
        :param mainRecvEvent: Event set as soon as run() begins, letting the
            parent thread move on to the next receiver
        :param outputFilePrefix: output file name prefix; the port is appended
        """
        Thread.__init__(self)
        self.__port = port
        self.__mainRecvEvent = mainRecvEvent
        self.__outputFilePrefix = outputFilePrefix
        self.__command = Command(self)

    def run(self):
        try:
            # Signalled before netcat is launched, so "started" does not yet
            # mean "listening".
            self.__mainRecvEvent.set()
            port = self.__port
            destination = "%s%s" % (self.__outputFilePrefix, port)
            # You may need to change netcat's command to listen.
            # NOTE(review): the shell reports the status of the last pipeline
            # stage (/bin/cat), so a failing nc is not reflected in exitCode.
            # nc also inherits this process's stdin, which is /dev/null once
            # daemonized -- confirm how your nc reacts to stdin EOF.
            command = "/usr/bin/nc -l 127.0.0.1 %s | /bin/cat > %s" % (
                port, destination)
            output, exitCode = self.__command.executeCommand(command)
            if exitCode:
                logSys.error("Failure with command %r: [%s]: %r",
                             command, exitCode, output)
            elif output:
                logSys.info("Receive on port %s Output: %r", port, output)
            else:
                logSys.debug(
                    "Receive on port %s finished without error or output",
                    port)
        except Exception as e:
            logSys.exception(e)
            logSys.error("Fatal Error in Worker Recv Thread")
class MainSendThread(Thread):
    """
    Thread that creates one WorkerSendThread per port, starts them all, and
    waits for every one of them to finish.
    """

    def __init__(self, sourceFile, basePort, count):
        """
        :param sourceFile: file each sender transmits
        :param basePort: port of the first sender; sender ``i`` uses
            ``basePort + i``
        :param count: number of senders to create
        """
        Thread.__init__(self)
        self.__sourceFile = sourceFile
        self.__basePort = basePort
        self.__count = count
        self.__sendThreads = []

    def run(self):
        try:
            senders = self.__sendThreads
            senders.extend(
                WorkerSendThread(self.__sourceFile, self.__basePort + offset)
                for offset in range(self.__count)
            )
            # Launch every sender before waiting on any, so they all run
            # concurrently.
            for sender in senders:
                sender.start()
            for sender in senders:
                sender.join()
        except Exception as e:
            logSys.exception(e)
            logSys.error("Fatal Error in Main Send Thread")
class WorkerSendThread(Thread):
    """
    Sender thread: streams one file with netcat to the receiver listening on
    the same port.
    """

    def __init__(self, sourceFile, port):
        """
        :param sourceFile: file piped into netcat
        :param port: local port of the matching receiver
        """
        Thread.__init__(self)
        self.__sourceFile = sourceFile
        self.__port = port
        self.__command = Command(self)

    def run(self):
        try:
            port = self.__port
            # NOTE(review): the shell reports nc's status (the last stage), so
            # an error from /bin/cat alone is not reflected in exitCode.
            command = "/bin/cat %s | /usr/bin/nc 127.0.0.1 %s" % (
                self.__sourceFile, port)
            output, exitCode = self.__command.executeCommand(command)
            if exitCode:
                logSys.error("Failure command %r: [%s]: %r",
                             command, exitCode, output)
            elif output:
                logSys.info("Send on port %s Output: %r", port, output)
            else:
                logSys.debug(
                    "Sender on port %s finished without error or output",
                    port)
        except Exception as e:
            logSys.exception(e)
            logSys.error("Fatal Error in Worker Send Thread")
if __name__ == "__main__":
    # runMainThread() returns True on success and False on failure.
    succeeded = MainThread().runMainThread()
    if not succeeded:
        logSys.error("Failure: %s" % succeeded)
        # -1 is reported to the shell as 255 on POSIX.
        sys.exit(-1)
    logSys.info("Success")
    sys.exit(0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment