Skip to content

Instantly share code, notes, and snippets.

@amon-ra
Created May 6, 2014 11:39
Show Gist options
  • Save amon-ra/3e6a3789e6d6543522ac to your computer and use it in GitHub Desktop.
Save amon-ra/3e6a3789e6d6543522ac to your computer and use it in GitHub Desktop.
Python non-blocking read-write
class StdAsyncIOt():
"""Implementes Non blocking readads compatible with windows"""
def _worker_reader_t(self):
#print ("Hilo")
while self.run and not self.err:
try:
data=os.read(self.fd, self.BUFSIZE)
#print (self.BUFSIZE,data)
#if data == '' and self.process.poll() != None: raise "Null Data Readed"
self.q.put(data)
except Exception as e:
self.err=True
self.run=False
print("worker_reader_t error:",e)
print ("worker_reader_t finish")
def _worker_writer_t(self):
#print ("Hilo")
while not self.err and (not self.q.empty() or self.run):
try:
try:
data=self.q.get(timeout=1)
except:
continue
data=os.write(self.fd, data)
# print ("Size:",self.q.qsize())
self.q.task_done()
# print ("Write Done")
#print (self.BUFSIZE,data)
#if data == '' and self.process.poll() != None: raise "Null Data Readed"
except Exception as e:
self.err=True
self.run=False
print("worker_writer_t error:",e)
print ("worker_writer_t finish")
def __init__(self,mode='read',SIZE=8192):
self.run=False
if sys.platform.startswith('win'):
print ("New Async Thread")
self.BUFSIZE=SIZE
self.MAXSIZE=SIZE
if mode == 'read':
self.MAXSIZE=0
self.q=queue.Queue(self.MAXSIZE)
self.err=False
# self.fd=r
# self.run=True
# self.t=threading.Thread(target=self.worker_t)
# self.t.start()
self.mode=mode
if mode == 'read':
self.t=threading.Thread(target=self._worker_reader_t)
else:
self.t=threading.Thread(target=self._worker_writer_t)
def start(self,fd):
if not self.t.isAlive():
print ("r start")
self.fd = fd
# self.q=Queue()
self.err=False
self.run=True
self.t.start()
def stop(self):
self.run=False
print ("stopping",self.mode)
if self.mode == 'write':
self.t.join()
def is_read(self):
return not self.q.empty()
def is_write(self):
if self.MAXSIZE == 0:
return True
if self.q.qsize() < self.MAXSIZE-1:
return True
print ("MAXSIZE")
return False
def read_t(self,stdout_fd, BUFSIZE):
if sys.platform.startswith('win'):
try:
data=self.q.get_nowait()
self.q.task_done()
return data
except Exception as e:
print ("read_t",e)
#self.run=False
pass
else:
return os.read(stdout_fd, BUFSIZE)
def write_t(self,stdin_fd,data):
if sys.platform.startswith('win'):
try:
self.q.put(data)
# self.q.task_done()
return len(data)
except Exception as e:
print ("write_t",e)
#self.run=False
pass
else:
return os.write(stdin_fd, data)
class StdAsyncIO:
def __init__(self,BUFSIZE=4096):
self.BUFSIZE=BUFSIZE
self.std_list=dict()
def stop(self):
for key,elem in self.std_list.items():
elem.stop()
def select(self,r,w,x,timeout):
if sys.platform.startswith('win'):
ret_r=[]
ret_w=[]
ret_x=[]
wait=False
for elem in r:
if elem in self.std_list:
if self.std_list[elem].err:
ret_x.append(elem)
elif self.std_list[elem].is_read():
ret_r.append(elem)
else:
self.std_list[elem].start(elem)
wait=self.std_list[elem].t
# self.std_list[elem].t.join(timeout)
else:
self.std_list[elem]=StdAsyncIOt('read',self.BUFSIZE)
self.std_list[elem].start(elem)
wait=self.std_list[elem].t
# self.std_list[elem].t.join(timeout)
for elem in w:
if elem in self.std_list:
if self.std_list[elem].err:
ret_x.append(elem)
elif self.std_list[elem].is_write():
ret_w.append(elem)
else:
self.std_list[elem].start(elem)
else:
self.std_list[elem]=StdAsyncIOt('write')
self.std_list[elem].start(elem)
if len(ret_w) <1 and len(ret_x) <1 and wait:
wait.join(timeout)
return ret_r,ret_w,ret_x
else:
return select.select(r,w,x,timeout)
def read_t(self,stdout_fd, BUFSIZE):
return self.std_list[stdout_fd].read_t(stdout_fd, BUFSIZE)
def write_t(self,stdin_fd,data):
return self.std_list[stdin_fd].write_t(stdin_fd,data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment