Created
November 9, 2013 15:32
-
-
Save zTrix/7386625 to your computer and use it in GitHub Desktop.
pyuv segfault demo code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os, sys, random, threading | |
def execute(host): | |
while True: | |
io = zio((ip, 21712)) | |
io.read_until('READY') | |
io.write('something something') | |
io.close() | |
for i in range(256): | |
t = threading.Thread(target = execute, args = (ip, )) | |
t.daemon = True | |
t.start() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import struct, socket, os, sys, tempfile, select, pyuv, subprocess, signal, threading, pty, time, re | |
from StringIO import StringIO | |
try: | |
from termcolor import colored | |
except: | |
def colored(text, color=None, on_color=None, attrs=None): | |
return text | |
def stdout(s, color = None, on_color = None, attrs = None): | |
if not color: | |
sys.stdout.write(s) | |
else: | |
sys.stdout.write(colorde(s, color, on_color, attrs)) | |
sys.stdout.flush() | |
def log(s, color = None, on_color = None, attrs = None, new_line = True): | |
if not color: | |
print >> sys.stderr, str(s), | |
else: | |
print >> sys.stderr, colored(str(s), color, on_color, attrs), | |
if new_line: | |
sys.stderr.write('\n') | |
sys.stderr.flush() | |
def l16(i): | |
return struct.pack('<H', i) | |
def b16(i): | |
return struct.pack('>H', i) | |
def l32(i): | |
return struct.pack('<I', i) | |
def b32(i): | |
return struct.pack('>I', i) | |
def l64(i): | |
return struct.pack('<Q', i) | |
def b64(i): | |
return struct.pack('>Q', i) | |
class zio(threading.Thread): | |
# this useless TIMEOUT is for condition.wait(TIMEOUT) | |
# to bypass python issue http://bugs.python.org/issue8296 | |
# if no timeout provided, Ctrl-C cannot kill the waiting thread | |
TIMEOUT = 1<<30 | |
def __init__(self, cmdline, print_read = True, print_write = True, print_log = True): | |
if not cmdline: | |
raise Exception('cmdline or socket not provided for zio') | |
threading.Thread.__init__(self) | |
self.lock = threading.Condition() | |
self.print_read = print_read | |
self.print_write = print_write | |
self.print_log = print_log | |
self.outbuf = StringIO() | |
self.loop = pyuv.Loop.default_loop() | |
self.uvout = pyuv.Pipe(self.loop) | |
self.eof = False | |
self.is_socket = type(cmdline) == tuple and len(cmdline) == 2 and isinstance(cmdline[1], (int, long)) | |
def on_uv_read(handle, data, error): | |
# print >> sys.stderr, 'on_uv_read', repr(handle), repr(data), repr(error) | |
self.lock.acquire() | |
if data is None: | |
if self.print_log: log('process stdout EOF', 'cyan') | |
if self.is_socket: | |
self.sock.close() | |
else: | |
if not self.proc.stdin.closed: | |
self.proc.stdin.close() | |
# self.proc.stdout.close() | |
self.eof = True | |
# self.uvout.stop() | |
else: | |
if self.print_read: | |
stdout(data) | |
# be careful here, since we use the pos of the StringIO object, so we should not move the position after writing the buffer | |
pos = self.outbuf.tell() | |
self.outbuf.seek(0, os.SEEK_END) | |
self.outbuf.write(data) | |
self.outbuf.seek(pos) | |
self.lock.notify() | |
self.lock.release() | |
if self.is_socket: | |
# consider as remote socket connection | |
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
self.sock.connect(cmdline) | |
self.uvout.open(self.sock.fileno()) | |
self.uvout.start_read(on_uv_read) | |
else: | |
master_fd, slave_fd = pty.openpty() | |
def on_proc_exit(proc, exit_status, term_signal): | |
self.lock.acquire() | |
if self.print_log: log('process exit, status = %d, signal = %d' % (exit_status, term_signal), 'cyan') | |
self.proc.exit_status = exit_status | |
os.close(slave_fd) | |
self.lock.notify() | |
self.lock.release() | |
self.uvout.open(master_fd) | |
self.proc = pyuv.Process(self.loop) | |
self.proc.stdin = pyuv.Pipe(self.loop) | |
self.proc.stdout = pyuv.Pipe(self.loop) | |
# if we use the default PIPE in pyuv, then it would be block buffer for process | |
# so we have to use pty to solve this issue, but we cannot get EOF in on_uv_read | |
# the work around solution is to close slave_fd when process exit, and it works like a charm | |
# and the downside of using pty is, \n will be replaced to \r\n, so be careful | |
stdio = [pyuv.StdIO(stream = self.proc.stdin, flags = pyuv.UV_CREATE_PIPE | pyuv.UV_READABLE_PIPE), pyuv.StdIO(fd = slave_fd, flags = pyuv.UV_INHERIT_FD)] | |
if type(cmdline) == str: | |
executable = cmdline | |
args = [] | |
elif type(cmdline) == list: | |
executable = cmdline[0] | |
args = cmdline[1:] | |
else: | |
raise Exception('Bad cmdline param') | |
self.proc.exit_status = None | |
self.proc.spawn(file = executable, args = args, exit_callback = on_proc_exit, stdio = stdio) | |
# self.proc.stdout.start_read(on_uv_read) | |
self.uvout.start_read(on_uv_read) | |
self.daemon = True | |
self.start() | |
def stop_read(self): | |
self.uvout.stop() | |
def close(self): | |
''' | |
close writing channel | |
''' | |
if self.is_socket: | |
self.sock.shutdown(socket.SHUT_RDWR) | |
else: | |
self.proc.stdin.close() | |
def run(self): | |
if not self.loop: | |
raise Exception('ctor failed to init variable') | |
if self.print_log: log('begins reading event loop', 'cyan') | |
while self.loop.run(): pass | |
if self.print_log: log('ended reading event loop', 'cyan') | |
def write(self, s): | |
if self.is_socket: | |
self.lock.acquire() | |
if self.print_write: | |
stdout(s) | |
self.sock.sendall(s) | |
self.lock.release() | |
else: | |
if not self.writable(): | |
raise Exception('subprocess stdin not writable') | |
self.lock.acquire() | |
if self.print_write: | |
stdout(s) | |
self.proc.stdin.write(s) | |
# self.proc.stdin.flush() | |
self.lock.release() | |
def read_before(self, pivot, length): | |
raise Exception('Not Implemented') | |
def read_after(self, pivot, length): | |
raise Exception('Not Implemented') | |
def read_between(self, pivot, end): | |
raise Exception('Not Implemented') | |
def read_range(self, begin, end): | |
raise Exception('Not Implemented') | |
def wait_eof(self): | |
self.lock.acquire() | |
while not self.eof: | |
self.lock.wait(zio.TIMEOUT) | |
self.lock.release() | |
def read_until(self, s, skip = 0): | |
''' | |
s could be string, re, or function | |
''' | |
if not isinstance(skip, (int, long)): | |
raise TypeError("skip must be an integer") | |
self.lock.acquire() | |
found = None | |
while found is None: | |
pos = self.outbuf.tell() | |
cur = self.outbuf.getvalue() | |
if type(s) == str: | |
off = pos | |
sk = skip | |
while True: | |
off = cur.find(s, off) | |
if off < 0: | |
break | |
if sk: | |
off += len(s) | |
sk -= 1 | |
else: | |
found = off + len(s) | |
self.outbuf.seek(found, os.SEEK_SET) | |
break | |
elif hasattr(s, 'search'): | |
sk = skip | |
for mo in re.finditer(s, cur[pos:]): | |
if sk == 0: | |
self.lock.release() | |
found = pos + mo.end() + 1 | |
self.outbuf.seek(found) | |
break | |
else: | |
sk -= 1 | |
elif callable(s): | |
rs = s(cur[offset:]) | |
if rs > -1: | |
found = rs | |
self.outbuf.seek(found) | |
break | |
else: | |
raise Exception('invalid parameter s', s) | |
if found is None: | |
if self.print_log: log('target string `%s` not found, wait for more input, total recved = %d bytes, tell = %d' % (s, len(cur), self.outbuf.tell()), 'yellow') | |
self.lock.wait(zio.TIMEOUT) | |
self.lock.release() | |
return found | |
def readline(self): | |
self.lock.acquire() | |
found = None | |
while not found: | |
pos = self.outbuf.tell() | |
if self.outbuf.getvalue().find('\n', pos) > -1 or self.eof: | |
found = self.outbuf.readline() | |
else: | |
self.lock.wait(zio.TIMEOUT) | |
self.lock.release() | |
return found | |
read_line = readline | |
def read(self, size = None): | |
if size is None: | |
self.lock.acquire() | |
while not self.eof: | |
self.lock.wait(zio.TIMEOUT) | |
ret = self.outbuf.read() | |
self.lock.release() | |
return ret | |
self.lock.acquire() | |
while self.outbuf.tell() + size > self.outbuf.len and not self.eof: | |
if self.print_log: log('target len `%d` not satisfied, waiting for more input, total recved = %d bytes, tell = %d' % (size, self.outbuf.len, self.outbuf.tell()), 'yellow') | |
self.lock.wait(zio.TIMEOUT) | |
ret = self.outbuf.read(size) | |
self.lock.release() | |
return ret | |
def pin(self, offset = None): | |
self.lock.acquire() | |
if isinstance(offset, (int, long)): | |
while offset >= self.outbuf.len and not self.eof: | |
self.lock.wait(zio.TIMEOUT) | |
self.outbuf.seek(offset, os.SEEK_SET) | |
else: | |
self.outbuf.seek(0, os.SEEK_END) | |
self.lock.release() | |
def writable(self): | |
if self.eof: | |
return False | |
if not self.is_socket and self.proc.exit_status is not None: | |
return False | |
return True | |
def interact(self): | |
try: | |
if self.is_socket: | |
while self.writable(): | |
getline = raw_input() | |
if not self.writable(): break | |
self.sock.sendall(getline + '\n') | |
else: | |
while self.writable(): | |
getline = raw_input() | |
if not self.writable(): break | |
self.proc.stdin.write(getline + '\n') | |
# self.proc.stdin.flush() | |
except KeyboardInterrupt, ex: | |
if self.print_log: log('received Ctrl-C,', 'yellow', new_line = False) | |
pass | |
if self.print_log: log('quit interact mode', 'yellow') | |
if __name__ == '__main__': | |
pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
vissza struct. csomag ('<H', i)