Skip to content

Instantly share code, notes, and snippets.

@zTrix
Created Nov 9, 2013
Embed
What would you like to do?
pyuv segfault demo code
import os, sys, random, threading
def execute(host, port=21712):
    """Connect to ``host`` forever, exchanging one message per connection.

    Each iteration opens a new ``zio`` connection to ``(host, port)``, waits
    for the literal ``READY``, sends a fixed payload and closes the
    connection.  The function never returns.

    :param host: address of the server to connect to.
    :param port: TCP port of the server (defaults to the original 21712).
    """
    while True:
        # The original body referenced an undefined global ``ip`` here
        # instead of the ``host`` argument it was given.
        conn = zio((host, port))
        conn.read_until('READY')
        conn.write('something something')
        conn.close()
# Start 256 daemon threads that all run ``execute`` concurrently.
# NOTE(review): ``ip`` is not defined anywhere in the visible source; it has
# to be assigned before this loop runs, otherwise this raises NameError.
for i in range(256):
    t = threading.Thread(target=execute, args=(ip,))
    # Daemon threads do not keep the interpreter alive on their own.
    t.daemon = True
    t.start()
import struct, socket, os, sys, tempfile, select, pyuv, subprocess, signal, threading, pty, time, re
from StringIO import StringIO
try:
    from termcolor import colored
except ImportError:
    # Only a missing ``termcolor`` should trigger the fallback; a bare
    # ``except`` would also hide unrelated failures (even KeyboardInterrupt).
    def colored(text, color=None, on_color=None, attrs=None):
        """Fallback used when ``termcolor`` is unavailable: return ``text`` unchanged."""
        return text
def stdout(s, color=None, on_color=None, attrs=None):
    """Write ``s`` to standard output and flush immediately.

    :param s: text to write; no newline is appended.
    :param color: when truthy, ``s`` is passed through ``colored`` together
        with ``on_color`` and ``attrs`` before being written.
    :param on_color: forwarded to ``colored``.
    :param attrs: forwarded to ``colored``.
    """
    if color:
        # The original called the misspelled ``colorde`` here, which raised
        # NameError as soon as a colour was requested.
        s = colored(s, color, on_color, attrs)
    sys.stdout.write(s)
    sys.stdout.flush()
def log(s, color=None, on_color=None, attrs=None, new_line=True):
    """Write ``str(s)`` to standard error and flush.

    :param s: object to log; converted with ``str``.
    :param color: when truthy, the text is passed through ``colored``
        together with ``on_color`` and ``attrs``.
    :param on_color: forwarded to ``colored``.
    :param attrs: forwarded to ``colored``.
    :param new_line: terminate the message with a newline when true;
        otherwise terminate it with a single space so that the next message
        continues on the same line.
    """
    text = str(s)
    if color:
        text = colored(text, color, on_color, attrs)
    # The original used ``print >> sys.stderr, text,`` and relied on the
    # print statement's hidden "softspace" state to separate a message
    # logged with new_line=False from the next one.  Writing the separator
    # explicitly keeps that spacing without the Python 2-only statement.
    sys.stderr.write(text)
    sys.stderr.write('\n' if new_line else ' ')
    sys.stderr.flush()
def l16(i):
    """Pack ``i`` as an unsigned 16-bit integer, little-endian."""
    packed = struct.pack('<H', i)
    return packed
def b16(i):
    """Pack ``i`` as an unsigned 16-bit integer, big-endian."""
    packed = struct.pack('>H', i)
    return packed
def l32(i):
    """Pack ``i`` as an unsigned 32-bit integer, little-endian."""
    packed = struct.pack('<I', i)
    return packed
def b32(i):
    """Pack ``i`` as an unsigned 32-bit integer, big-endian."""
    packed = struct.pack('>I', i)
    return packed
def l64(i):
    """Pack ``i`` as an unsigned 64-bit integer, little-endian."""
    packed = struct.pack('<Q', i)
    return packed
def b64(i):
    """Pack ``i`` as an unsigned 64-bit integer, big-endian."""
    packed = struct.pack('>Q', i)
    return packed
class zio(threading.Thread):
    """Scripted I/O against a TCP endpoint or a spawned child process.

    ``cmdline`` selects the mode: a ``(host, port)`` tuple opens a TCP
    connection, while a string or a list is spawned as a child process whose
    stdout is the slave side of a pty.  Everything that is read is appended
    to the in-memory buffer ``outbuf``; the ``read*`` methods consume that
    buffer from its current position and wait on ``lock`` until enough data
    (or EOF) has arrived.  The instance is a daemon thread that runs the
    pyuv event loop; the constructor starts it.

    The code targets Python 2 (``StringIO``, ``raw_input``).

    NOTE(review): every instance uses the process-wide default pyuv loop and
    runs it from its own thread, and handles are written to from caller
    threads.  libuv documents its loops as not thread-safe, so this sharing
    is a likely cause of the crash this demo reproduces -- confirm before
    relying on this class with more than one instance.
    """

    # this useless TIMEOUT is for condition.wait(TIMEOUT)
    # to bypass python issue http://bugs.python.org/issue8296
    # if no timeout provided, Ctrl-C cannot kill the waiting thread
    TIMEOUT = 1 << 30

    # ``long`` only exists on Python 2; fall back to ``int`` alone elsewhere
    # so the integer checks below cannot raise NameError.
    try:
        _INTEGER_TYPES = (int, long)
    except NameError:
        _INTEGER_TYPES = (int,)

    def __init__(self, cmdline, print_read=True, print_write=True, print_log=True):
        """Connect or spawn according to ``cmdline`` and start the loop thread.

        :param cmdline: ``(host, port)`` tuple for a TCP connection, a string
            naming an executable, or a list ``[executable, arg, ...]``.
        :param print_read: echo received data through ``stdout``.
        :param print_write: echo written data through ``stdout``.
        :param print_log: emit diagnostic messages through ``log``.
        :raises Exception: if ``cmdline`` is empty or of an unsupported type.
        """
        if not cmdline:
            raise Exception('cmdline or socket not provided for zio')

        is_socket = (isinstance(cmdline, tuple) and len(cmdline) == 2
                     and isinstance(cmdline[1], self._INTEGER_TYPES))
        if not is_socket:
            # Validate before any pty / pipe is allocated so that a bad
            # argument does not leak file descriptors.
            if isinstance(cmdline, str):
                executable, args = cmdline, []
            elif isinstance(cmdline, list):
                executable, args = cmdline[0], cmdline[1:]
            else:
                raise Exception('Bad cmdline param')

        threading.Thread.__init__(self)
        self.lock = threading.Condition()
        self.print_read = print_read
        self.print_write = print_write
        self.print_log = print_log
        self.outbuf = StringIO()
        self.loop = pyuv.Loop.default_loop()
        self.uvout = pyuv.Pipe(self.loop)
        self.eof = False
        self.is_socket = is_socket

        if self.is_socket:
            # consider as remote socket connection
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                self.sock.connect(cmdline)
            except Exception:
                # Do not leak the descriptor when the connection fails.
                self.sock.close()
                raise
            self.uvout.open(self.sock.fileno())
        else:
            self._spawn(executable, args)

        self.uvout.start_read(self._on_uv_read)
        self.daemon = True
        self.start()

    def _on_uv_read(self, handle, data, error):
        """pyuv read callback: append ``data`` to the buffer or record EOF."""
        # print >> sys.stderr, 'on_uv_read', repr(handle), repr(data), repr(error)
        with self.lock:
            try:
                if data is None:
                    if self.print_log:
                        log('process stdout EOF', 'cyan')
                    # Record EOF first so waiters see it even if closing fails.
                    self.eof = True
                    if self.is_socket:
                        self.sock.close()
                    elif not self.proc.stdin.closed:
                        self.proc.stdin.close()
                else:
                    if self.print_read:
                        stdout(data)
                    # be careful here, since we use the pos of the StringIO
                    # object, so we should not move the position after
                    # writing the buffer
                    pos = self.outbuf.tell()
                    self.outbuf.seek(0, os.SEEK_END)
                    self.outbuf.write(data)
                    self.outbuf.seek(pos)
            finally:
                self.lock.notify()

    def _spawn(self, executable, args):
        """Spawn ``executable`` with ``args``, reading its stdout through a pty."""
        master_fd, slave_fd = pty.openpty()

        def on_proc_exit(proc, exit_status, term_signal):
            with self.lock:
                if self.print_log:
                    log('process exit, status = %d, signal = %d' % (exit_status, term_signal), 'cyan')
                self.proc.exit_status = exit_status
                # Closing the slave side is what makes the reader see EOF.
                os.close(slave_fd)
                self.lock.notify()

        self.uvout.open(master_fd)
        self.proc = pyuv.Process(self.loop)
        self.proc.stdin = pyuv.Pipe(self.loop)
        self.proc.stdout = pyuv.Pipe(self.loop)
        # if we use the default PIPE in pyuv, then it would be block buffer for process
        # so we have to use pty to solve this issue, but we cannot get EOF in on_uv_read
        # the work around solution is to close slave_fd when process exit, and it works like a charm
        # and the downside of using pty is, \n will be replaced to \r\n, so be careful
        stdio = [
            pyuv.StdIO(stream=self.proc.stdin, flags=pyuv.UV_CREATE_PIPE | pyuv.UV_READABLE_PIPE),
            pyuv.StdIO(fd=slave_fd, flags=pyuv.UV_INHERIT_FD),
        ]
        self.proc.exit_status = None
        self.proc.spawn(file=executable, args=args, exit_callback=on_proc_exit, stdio=stdio)

    def _buffered_len(self):
        """Return the total number of characters received so far."""
        return len(self.outbuf.getvalue())

    def stop_read(self):
        """Stop the output pipe handle."""
        self.uvout.stop()

    def close(self):
        '''
        close writing channel

        NOTE(review): in socket mode this calls shutdown(SHUT_RDWR), which
        shuts down both directions, not only the writing one.
        '''
        if self.is_socket:
            self.sock.shutdown(socket.SHUT_RDWR)
        else:
            self.proc.stdin.close()

    def run(self):
        """Thread body: run the pyuv loop until it reports no more work."""
        if not self.loop:
            raise Exception('ctor failed to init variable')
        if self.print_log:
            log('begins reading event loop', 'cyan')
        while self.loop.run():
            pass
        if self.print_log:
            log('ended reading event loop', 'cyan')

    def write(self, s):
        """Send ``s`` to the peer (socket) or to the child's stdin.

        :raises Exception: in process mode, if the child can no longer be
            written to.
        """
        if not self.is_socket and not self.writable():
            raise Exception('subprocess stdin not writable')
        # ``with`` guarantees the lock is released even when sending fails.
        with self.lock:
            if self.print_write:
                stdout(s)
            if self.is_socket:
                self.sock.sendall(s)
            else:
                self.proc.stdin.write(s)
                # self.proc.stdin.flush()

    def read_before(self, pivot, length):
        """Not implemented."""
        raise NotImplementedError('Not Implemented')

    def read_after(self, pivot, length):
        """Not implemented."""
        raise NotImplementedError('Not Implemented')

    def read_between(self, pivot, end):
        """Not implemented."""
        raise NotImplementedError('Not Implemented')

    def read_range(self, begin, end):
        """Not implemented."""
        raise NotImplementedError('Not Implemented')

    def wait_eof(self):
        """Block until EOF has been seen on the input."""
        with self.lock:
            while not self.eof:
                self.lock.wait(zio.TIMEOUT)

    def _search(self, s, skip):
        """Look for ``s`` in the unread data; return the absolute end offset or None."""
        pos = self.outbuf.tell()
        cur = self.outbuf.getvalue()
        if isinstance(s, str):
            off = pos
            sk = skip
            while True:
                off = cur.find(s, off)
                if off < 0:
                    return None
                if sk:
                    off += len(s)
                    sk -= 1
                else:
                    return off + len(s)
        elif hasattr(s, 'search'):
            sk = skip
            for mo in re.finditer(s, cur[pos:]):
                if sk == 0:
                    # ``mo.end()`` is already one past the match, relative to
                    # ``pos`` (the original added a stray ``+ 1`` and released
                    # the lock a second time here).
                    return pos + mo.end()
                sk -= 1
            return None
        elif callable(s):
            # The callable sees only the unread data, so its result is taken
            # as an offset relative to the current position.
            rs = s(cur[pos:])
            if rs is not None and rs > -1:
                return pos + rs
            return None
        raise Exception('invalid parameter s', s)

    def read_until(self, s, skip=0):
        '''
        s could be string, re, or function

        Advance the read position to just after the match and return that
        absolute buffer offset.  ``skip`` is the number of earlier matches to
        ignore (string and regex targets only).  A function receives the
        unread data and returns how many characters to consume, or a negative
        value / None when there is no match yet.

        Blocks until a match arrives; returns None if EOF is reached first.
        Raises TypeError for a non-integer ``skip`` and Exception for an
        unsupported ``s``.
        '''
        if not isinstance(skip, self._INTEGER_TYPES):
            raise TypeError("skip must be an integer")
        with self.lock:
            while True:
                found = self._search(s, skip)
                if found is not None:
                    self.outbuf.seek(found, os.SEEK_SET)
                    return found
                if self.eof:
                    # No more data can arrive; waiting would hang forever.
                    return None
                if self.print_log:
                    log('target string `%s` not found, wait for more input, total recved = %d bytes, tell = %d' % (s, self._buffered_len(), self.outbuf.tell()), 'yellow')
                self.lock.wait(zio.TIMEOUT)

    def readline(self):
        """Return the next line, blocking until a newline or EOF is available.

        After EOF with nothing left to read this returns an empty string.
        """
        with self.lock:
            while True:
                pos = self.outbuf.tell()
                if self.eof or self.outbuf.getvalue().find('\n', pos) > -1:
                    # Returning unconditionally avoids spinning on the empty
                    # string that readline() yields at EOF.
                    return self.outbuf.readline()
                self.lock.wait(zio.TIMEOUT)

    read_line = readline

    def read(self, size=None):
        """Read ``size`` characters, or everything up to EOF when ``size`` is None.

        Blocks until ``size`` characters are buffered or EOF is seen; at EOF
        fewer characters than requested may be returned.
        """
        with self.lock:
            if size is None:
                while not self.eof:
                    self.lock.wait(zio.TIMEOUT)
                return self.outbuf.read()
            while self.outbuf.tell() + size > self._buffered_len() and not self.eof:
                if self.print_log:
                    log('target len `%d` not satisfied, waiting for more input, total recved = %d bytes, tell = %d' % (size, self._buffered_len(), self.outbuf.tell()), 'yellow')
                self.lock.wait(zio.TIMEOUT)
            return self.outbuf.read(size)

    def pin(self, offset=None):
        """Move the read position.

        With an integer ``offset``, wait until that offset has been received
        (or EOF) and seek to it; otherwise seek to the end of the data
        received so far.
        """
        with self.lock:
            if isinstance(offset, self._INTEGER_TYPES):
                while offset >= self._buffered_len() and not self.eof:
                    self.lock.wait(zio.TIMEOUT)
                self.outbuf.seek(offset, os.SEEK_SET)
            else:
                self.outbuf.seek(0, os.SEEK_END)

    def writable(self):
        """Return False once EOF was seen or the child process has exited."""
        if self.eof:
            return False
        if not self.is_socket and self.proc.exit_status is not None:
            return False
        return True

    def interact(self):
        """Forward lines typed by the user to the peer until it is unwritable.

        Ctrl-C and end-of-input on the terminal both end the interaction.
        """
        try:
            while self.writable():
                getline = raw_input()
                if not self.writable():
                    break
                if self.is_socket:
                    self.sock.sendall(getline + '\n')
                else:
                    self.proc.stdin.write(getline + '\n')
                    # self.proc.stdin.flush()
        except KeyboardInterrupt:
            if self.print_log:
                log('received Ctrl-C,', 'yellow', new_line=False)
        except EOFError:
            # The user's input reached end-of-file; nothing more to forward.
            pass
        if self.print_log:
            log('quit interact mode', 'yellow')
# Running this module as a script performs no additional work.
if __name__ == '__main__':
    pass
Copy link

ghost commented Nov 9, 2013

return struct.pack('<H', i)

Copy link

ghost commented Nov 9, 2013

return struct.pack('<H', i)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment