Skip to content

Instantly share code, notes, and snippets.

@dbr
Created October 30, 2012 11:16
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dbr/3979658 to your computer and use it in GitHub Desktop.
Save dbr/3979658 to your computer and use it in GitHub Desktop.
"""Broken-but-maybe-informative version of code from
http://stackoverflow.com/questions/13124235/python-process-communications-via-pipes-race-condition#13124235
"""
import os
import string
def safe_write(*args, **kwargs):
while True:
try:
return os.write(*args, **kwargs)
except OSError as e:
if e.errno == 35:
import time
print(".")
time.sleep(0.5)
else:
raise
class Pipe:
"""
there are a bunch of constants set up here. I dont think it would be useful to include them. Just think like this: Pipe.WHATEVER = 'WHATEVER'
"""
LINE_START = "LINE_START"
MESSAGE_START = "MESSAGE_START"
MESSAGE_END = "MESSAGE_END"
LINE_END = "LINE_END"
KEY = "KEY"
VALUE = "VALUE"
READLINE_FAIL = "READLINE_FAIL"
def __init__(self,sPath):
"""
create the fifo. if it already exists just associate with it
"""
self.sPath = sPath
if not os.path.exists(sPath):
os.mkfifo(sPath)
self.iFH = os.open(sPath,os.O_RDWR | os.O_NONBLOCK)
self.iFHBlocking = os.open(sPath,os.O_RDWR)
def write(self,dMessage):
"""
write the dict to the fifo
if dMessage is not a dictionary then there will be an exception here. There never is
"""
self.writeln(Pipe.MESSAGE_START)
for k in dMessage:
self.writeln(Pipe.KEY)
self.writeln(k)
self.writeln(Pipe.VALUE)
self.writeln(dMessage[k])
self.writeln(Pipe.MESSAGE_END)
def writeln(self, s):
safe_write(self.iFH, bytes('{0} : {1}\n'.format(Pipe.LINE_START, len(s)+1), 'utf-8'))
os.fsync(self.iFH)
safe_write(self.iFH, bytes('{0}\n'.format(s), 'utf-8'))
os.fsync(self.iFH)
safe_write(self.iFH, bytes(Pipe.LINE_END+'\n','utf-8'))
os.fsync(self.iFH)
def readln(self):
"""
look for LINE_START, get line size
read until LINE_END
clean up
return string
"""
#os.fsync(self.iFH)
iLineStartBaseLength = len(self.LINE_START)+3 #'{0} : '
s = os.read(self.iFH, iLineStartBaseLength).decode('utf-8')
print(s)
print(len(s), iLineStartBaseLength)
assert len(s) == iLineStartBaseLength
if Pipe.LINE_START in s:
#get the length of the line
sLineLen = ''
while True:
sCurrent = os.read(self.iFH,1).decode('utf-8')
if sCurrent == '\n':
break
sLineLen += sCurrent
try:
iLineLen = int(sLineLen.strip(string.punctuation+string.whitespace))
except:
raise Exception('Not a valid line length: "{0}"'.format(sLineLen))
#read the line
# raise Exception(sLineLen)
sLine = os.read(self.iFHBlocking,iLineLen).decode('utf-8')
#read the line terminator
sTerm = os.read(self.iFH,len(Pipe.LINE_END+'\n')).decode('utf-8')
if sTerm == Pipe.LINE_END+'\n':
return sLine
return Pipe.READLINE_FAIL
else:
return Pipe.READLINE_FAIL
def read(self):
"""
read from the fifo, make a dict
"""
dRet = {}
sKey = ''
sValue = ''
sCurrent = None
def value_flush():
nonlocal dRet, sKey, sValue, sCurrent
if sKey:
dRet[sKey.strip()] = sValue.strip()
sKey = ''
sValue = ''
sCurrent = ''
if True or self.message_start(): #FIXME: hacky
while True:
try:
sLine = self.readln()
except OSError:
continue # Try again
if sLine == Pipe.READLINE_FAIL:
continue # try again again
self.running_log.append(sLine)
if Pipe.MESSAGE_END in sLine:
value_flush()
return dRet
elif Pipe.KEY in sLine:
value_flush()
sCurrent = Pipe.KEY
elif Pipe.VALUE in sLine:
sCurrent = Pipe.VALUE
else:
if sCurrent == Pipe.VALUE:
sValue += sLine
elif sCurrent == Pipe.KEY:
sKey += sLine
else:
return Pipe.NO_MESSAGE
TEST_VALUE = {'a':'a1','b':'b2','c':'c3','d':'d4','e':'e5'}
from piping import Pipe, TEST_VALUE
oP = Pipe("/tmp/pipey")
while 1:
oP.write(TEST_VALUE)
from piping import Pipe, TEST_VALUE
oP = Pipe("/tmp/pipey")
while 1:
oP.running_log = []
line = oP.read()
if line == Pipe.READLINE_FAIL:
pass
elif line != TEST_VALUE:
print(line)
print(oP.running_log)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment