#!/usr/bin/env python
# Test case for issue with multithreading Popen with pipes when daemonized
# (GitHub gist leeclemens/2054194, created March 17, 2012)
import logging
import os
import subprocess
import sys
from threading import Event, Thread
# Only used to sleep between sends
#noinspection PyUnresolvedReferences
#from time import sleep
#
# A small test case to reproduce a threading issue that is
# only experienced when the double-fork daemonization process occurs.
# This can be verified by changing the value of DAEMONIZE.
#
# When daemonized, the behavior manifests as broken pipes for each subprocess,
# but they do not occur consistently;
# the results sometimes differ even between subsequent executions.
#
# One time, "ELF>" and binary data were seen in /tmp/tts_log!
# If the os.dup2 calls (below "Created daemon successfully") are commented out,
# the test is successful,
# although, at least one time, the real app encountered broken pipe errors.
# My best guess is that subprocess is not thread-safe
# and, at least while daemonized, pipes are being crossed/shared somehow.
# The basic layout is MainThread being called,
# which daemonizes based upon the value of DAEMONIZE.
# This main thread:
# 1) Starts the MainRecvThread
# 2) Waits for the startedEvent
# 3) Starts the MainSendThread
# 4) Joins each of them
# MainRecvThread starts as many child threads
# as determined by MainThread.__count, one per port.
# Each WorkerRecvThread uses its own instance of Command to start Netcat
# listening on its specified port, piping into a temporary file.
# MainSendThread starts as many child threads as
# determined by MainThread.__count, one per port.
# Each WorkerSendThread uses its own instance of Command to start Netcat,
# sending the contents of MainThread.__sourceFile.
# Both MainRecvThread and MainSendThread wait for each spawned child to finish.
# Switch between the two DAEMONIZE assignments below to test each case
DAEMONIZE = True
#DAEMONIZE = False
# Further tunable options are set in MainThread.__init__(),
# and the netcat syntax may need to be altered for your version.

# Everything is reported through a file logger so that errors can still be
# verified while daemonized.
target = "/tmp/tts_log"
fmt = logging.Formatter(
    "%(asctime)s %(name)-10s: %(levelname)-8s: %(message)s"
)
# Make sure the log file exists before the handler is attached.
with open(target, "a"):
    pass
hdlr = logging.FileHandler(target)
hdlr.setFormatter(fmt)
logSys = logging.getLogger("test")
# Level 4 is below logging.DEBUG (10), so no record is filtered out.
logSys.setLevel(4)
logSys.addHandler(hdlr)
class MainThread:
    """
    Main Thread, called from command line

    When it daemonizes:
        Unexpected events occur relating to the subprocess's pipes
    When it does not daemonize, everything works as expected.

    In this example, the issue manifests as partially copied files.
    If 1 KB is copied, it is validly the first 1 KB of the source file.
    The size and number of partially copied files also varies between
    each thread and between separate executions of this test app.
    """

    def __init__(self):
        # Not read anywhere in the visible code.
        self.__running = True
        # non-ascii/binary file
        self.__sourceFile = "/bin/awk"
        # The port number will be appended to this name
        self.__outputFilePrefix = "/tmp/tts_"
        # First port used; worker i uses __basePort + i
        self.__basePort = 64000
        # Number of receiver/sender pairs
        self.__count = 10

    def runMainThread(self):
        """
        Optionally daemonize, then run the receiver and sender thread groups.

        Returns True once both groups have been started and joined, and
        False if an exception was raised on the way (it is logged). Failures
        inside the worker threads are only logged by those threads and do
        not change the return value.
        """
        try:
            # This is what makes the difference
            # between working and pipe/buffer/thread issues
            if DAEMONIZE:
                self.__createDaemon()
            startedEvent = Event()
            mainRecvThread = MainRecvThread(
                self.__outputFilePrefix,
                self.__basePort,
                self.__count,
                startedEvent
            )
            mainRecvThread.start()
            # Make sure each listener is ready
            # although the behavior shows the first chunk of the file,
            # and the sender will fail immediately if nothing is listening
            startedEvent.wait()
            mainSendThread = MainSendThread(
                self.__sourceFile,
                self.__basePort,
                self.__count,
            )
            mainSendThread.start()
            mainSendThread.join()
            mainRecvThread.join()
            return True
        except Exception as e:
            logSys.exception(e)
            logSys.error("Failure running MainThread")
            return False

    def __createDaemon(self):
        """
        Detach the process with the double-fork recipe and point
        stdin/stdout/stderr at /dev/null.

        Both intermediate parents leave via sys.exit(0); only the final
        child returns from this method. A failed fork writes a message to
        stderr and exits with status 1.
        """
        # From
        # http://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/
        try:
            pid = os.fork()
            if pid > 0:
                # First parent is done
                sys.exit(0)
        except OSError as e:
            sys.stderr.write("Fork #1 failed: %d (%s)\n" %
                             (e.errno, e.strerror))
            sys.exit(1)
        # Decouple from the parent environment
        os.chdir("/")
        os.setsid()
        os.umask(0)
        try:
            pid = os.fork()
            if pid > 0:
                # Second parent is done
                sys.exit(0)
        except OSError as e:
            sys.stderr.write("fork #2 failed: %d (%s)\n" %
                             (e.errno, e.strerror))
            sys.exit(1)
        logSys.info("Created daemon successfully, pid %s", os.getpid())
        # Specifically, commenting only these out yields successful results
        sys.stdout.flush()
        sys.stderr.flush()
        # Each /dev/null handle is only needed until it has been duplicated
        # onto the standard descriptor, so it is closed right afterwards
        # instead of being left for the garbage collector.
        with open("/dev/null", "r") as si:
            os.dup2(si.fileno(), sys.stdin.fileno())
        with open("/dev/null", "a+") as so:
            os.dup2(so.fileno(), sys.stdout.fileno())
        with open("/dev/null", "a+") as se:
            os.dup2(se.fileno(), sys.stderr.fileno())
class Command:
    """
    Runs one shell command on behalf of a worker thread.

    Each child thread obtains its own instance for executing subprocess.
    This is its own class, because in the real app,
    a handle is needed in order to kill each thread's running subprocess.
    """

    def __init__(self, caller):
        # The owning thread; stored but not otherwise used by this class.
        self.__caller = caller
        # Popen object of the most recent command (None until one is run).
        self._subProcess = None

    def executeCommand(self, command):
        """
        Run `command` through the shell and block until it has finished.

        Returns a tuple (output, exitCode). When the exit code is zero,
        output is the captured stdout. Otherwise the failure is logged and
        output is "<stdout>::<stderr>".
        """
        # Many worker threads call Popen at the same time. With Python 2's
        # default of close_fds=False, a child forked by one thread inherits
        # the pipe ends another thread has just created, which keeps those
        # pipes open beyond the lifetime of the process they belong to.
        # close_fds=True stops that (it is already the default on 3.2+).
        self._subProcess = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            shell=True,
            close_fds=True
        )
        stdout, stderr = self._subProcess.communicate()
        exitCode = self._subProcess.returncode
        if exitCode:
            logSys.error("Failure executing command: %r", command)
            return "%s::%s" % (stdout, stderr), exitCode
        return stdout, exitCode
class MainRecvThread(Thread):
    """
    Thread to start and manage each Receiver thread
    """

    def __init__(self,
                 outputFilePrefix,
                 basePort,
                 count,
                 startedEvent):
        """
        outputFilePrefix -- passed through to every WorkerRecvThread
        basePort         -- worker i listens on basePort + i
        count            -- number of receiver threads to start
        startedEvent     -- Event set once the startup phase is over
        """
        Thread.__init__(self)
        self.__outputFilePrefix = outputFilePrefix
        self.__basePort = basePort
        self.__count = count
        self.__startedEvent = startedEvent
        self.__recvThreads = []

    def run(self):
        """
        Start one WorkerRecvThread per port, signal startedEvent once every
        worker has reported in, then wait for all of them to finish.

        Any exception is logged rather than propagated.
        """
        try:
            try:
                # One Event per worker: a single shared Event would already
                # be set by whichever worker happens to run first (and would
                # never be set at all when count is 0).
                readyEvents = []
                for i in range(self.__count):
                    workerReady = Event()
                    readyEvents.append(workerReady)
                    self.__recvThreads.append(
                        WorkerRecvThread(
                            (self.__basePort + i),
                            workerReady,
                            self.__outputFilePrefix
                        )
                    )
                for recvThread in self.__recvThreads:
                    recvThread.start()
                # Wait for each child to start
                for workerReady in readyEvents:
                    workerReady.wait()
            finally:
                # Tell MainThread we're ready. This is also done when startup
                # failed, so the waiter is never left blocked forever.
                self.__startedEvent.set()
            for recvThread in self.__recvThreads:
                recvThread.join()
        except Exception as e:
            logSys.exception(e)
            logSys.error("Fatal Error in Main Recv Thread")
class WorkerRecvThread(Thread):
    """
    The Receiver Thread will use netcat to listen on a port
    """

    def __init__(self,
                 port,
                 mainRecvEvent,
                 outputFilePrefix):
        """
        port             -- port netcat listens on; also the output suffix
        mainRecvEvent    -- Event set as soon as this thread starts running
        outputFilePrefix -- received data is written to <prefix><port>
        """
        Thread.__init__(self)
        self.__port = port
        self.__mainRecvEvent = mainRecvEvent
        self.__outputFilePrefix = outputFilePrefix
        self.__command = Command(self)

    def run(self):
        """
        Signal that the thread has started, run the netcat listener until it
        exits, and log the outcome. Any exception is logged, not propagated.
        """
        try:
            # Set before netcat is launched, so it means "thread started",
            # not "port is already listening".
            self.__mainRecvEvent.set()
            # You may need to change netcat's command to listen
            # NOTE(review): no stdin is passed to Popen, so when daemonized
            # this pipeline inherits /dev/null as stdin. Some netcat variants
            # presumably stop once stdin hits EOF, which might be related to
            # the truncated files - confirm against the installed nc.
            command = "/usr/bin/nc -l 127.0.0.1 %s | /bin/cat > %s%s" % (
                self.__port,
                self.__outputFilePrefix,
                self.__port
            )
            output, exitCode = self.__command.executeCommand(command)
            if exitCode:
                logSys.error("Failure with command %r: [%s]: %r",
                             command, exitCode, output)
            elif output:
                logSys.info("Receive on port %s Output: %r",
                            self.__port, output)
            else:
                logSys.debug(
                    "Receive on port %s finished without error or output",
                    self.__port
                )
        except Exception as e:
            logSys.exception(e)
            logSys.error("Fatal Error in Worker Recv Thread")
class MainSendThread(Thread):
    """
    Thread to start and manage the Sender Threads
    """

    def __init__(self,
                 sourceFile,
                 basePort,
                 count):
        """
        sourceFile -- file every sender transmits
        basePort   -- sender i connects to basePort + i
        count      -- number of sender threads to start
        """
        Thread.__init__(self)
        self.__sourceFile = sourceFile
        self.__basePort = basePort
        self.__count = count
        self.__sendThreads = []

    def run(self):
        """
        Create one WorkerSendThread per port, start them all and wait for
        each to finish. Any exception is logged rather than propagated.
        """
        try:
            self.__sendThreads.extend(
                WorkerSendThread(self.__sourceFile, self.__basePort + offset)
                for offset in range(self.__count)
            )
            # Start them all
            for sendThread in self.__sendThreads:
                sendThread.start()
            # Wait for each to finish
            for sendThread in self.__sendThreads:
                sendThread.join()
        except Exception as e:
            logSys.exception(e)
            logSys.error("Fatal Error in Main Send Thread")
class WorkerSendThread(Thread):
    """
    Sender Threads to send data to a corresponding Receiver thread's port
    """

    def __init__(self,
                 sourceFile,
                 port):
        """
        sourceFile -- file whose contents are piped into netcat
        port       -- local port netcat connects to
        """
        Thread.__init__(self)
        self.__sourceFile = sourceFile
        self.__port = port
        self.__command = Command(self)

    def run(self):
        """
        Pipe the source file into netcat for this thread's port and log the
        outcome. Any exception is logged rather than propagated.
        """
        try:
            command = "/bin/cat %s | /usr/bin/nc 127.0.0.1 %s" % (
                self.__sourceFile,
                self.__port
            )
            output, exitCode = self.__command.executeCommand(command)
            if exitCode:
                logSys.error("Failure command %r: [%s]: %r",
                             command, exitCode, output)
            elif output:
                logSys.info("Send on port %s Output: %r",
                            self.__port, output)
            else:
                logSys.debug(
                    "Sender on port %s finished without error or output",
                    self.__port
                )
        except Exception as e:
            logSys.exception(e)
            logSys.error("Fatal Error in Worker Send Thread")
if __name__ == "__main__":
    # runMainThread() reports success as a boolean; map it onto the
    # process exit status.
    succeeded = MainThread().runMainThread()
    if not succeeded:
        logSys.error("Failure: %s" % succeeded)
        sys.exit(-1)
    logSys.info("Success")
    sys.exit(0)
# End of gist.