Skip to content

Instantly share code, notes, and snippets.

@curzona
Last active April 12, 2016 04:00
Show Gist options
  • Save curzona/7881941c222529e3058f2bf3bdd9a641 to your computer and use it in GitHub Desktop.
Save curzona/7881941c222529e3058f2bf3bdd9a641 to your computer and use it in GitHub Desktop.
curzona@curzona-desktop:~/Desktop/hello_pytest_xdist$ py.test -v -d --tx socket=xxx.xxx.xxx.xxx:8888 --rsyncdir=.
========================================== test session starts ===========================================
platform linux2 -- Python 2.7.9, pytest-2.9.0, py-1.4.31, pluggy-0.3.1 -- /usr/bin/python
cachedir: .cache
rootdir: /home/curzona/Desktop/hello_pytest_xdist, inifile: pytest.ini
plugins: repeater-0.1.0, cov-2.1.0, xdist-1.12, bdd-2.16.0
[gw0] win32 Python 2.7.11 cwd: C:\Users\temp\Desktop\pyexecnetcache
gw0 CINTERNALERROR> Traceback (most recent call last):
INTERNALERROR> File "/usr/local/lib/python2.7/dist-packages/_pytest/main.py", line 92, in wrap_session
INTERNALERROR> config.hook.pytest_sessionstart(session=session)
INTERNALERROR> File "/usr/local/lib/python2.7/dist-packages/_pytest/vendored_packages/pluggy.py", line 724, in __call__
INTERNALERROR> return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
INTERNALERROR> File "/usr/local/lib/python2.7/dist-packages/_pytest/vendored_packages/pluggy.py", line 338, in _hookexec
INTERNALERROR> return self._inner_hookexec(hook, methods, kwargs)
INTERNALERROR> File "/usr/local/lib/python2.7/dist-packages/_pytest/vendored_packages/pluggy.py", line 333, in <lambda>
INTERNALERROR> _MultiCall(methods, kwargs, hook.spec_opts).execute()
INTERNALERROR> File "/usr/local/lib/python2.7/dist-packages/_pytest/vendored_packages/pluggy.py", line 596, in execute
INTERNALERROR> res = hook_impl.function(*args)
INTERNALERROR> File "/usr/local/lib/python2.7/dist-packages/xdist/dsession.py", line 489, in pytest_sessionstart
INTERNALERROR> nodes = self.nodemanager.setup_nodes(putevent=self.queue.put)
INTERNALERROR> File "/usr/local/lib/python2.7/dist-packages/xdist/slavemanage.py", line 45, in setup_nodes
INTERNALERROR> nodes.append(self.setup_node(spec, putevent))
INTERNALERROR> File "/usr/local/lib/python2.7/dist-packages/xdist/slavemanage.py", line 51, in setup_node
INTERNALERROR> self.rsync_roots(gw)
INTERNALERROR> File "/usr/local/lib/python2.7/dist-packages/xdist/slavemanage.py", line 37, in rsync_roots
INTERNALERROR> self.rsync(gateway, root, **self.rsyncoptions)
INTERNALERROR> File "/usr/local/lib/python2.7/dist-packages/xdist/slavemanage.py", line 136, in rsync
INTERNALERROR> rsync.send()
INTERNALERROR> File "/usr/local/lib/python2.7/dist-packages/execnet/rsync.py", line 127, in send
INTERNALERROR> self._end_of_channel(channel)
INTERNALERROR> File "/usr/local/lib/python2.7/dist-packages/execnet/rsync.py", line 44, in _end_of_channel
INTERNALERROR> channel.waitclose()
INTERNALERROR> File "/usr/local/lib/python2.7/dist-packages/execnet/gateway_base.py", line 670, in waitclose
INTERNALERROR> raise error
INTERNALERROR> RemoteError: Traceback (most recent call last):
INTERNALERROR> File """"
INTERNALERROR> base execnet gateway code send to the other side for bootstrapping.
INTERNALERROR>
INTERNALERROR> NOTE: aims to be compatible to Python 2.5-3.X, Jython and IronPython
INTERNALERROR>
INTERNALERROR> (C) 2004-2013 Holger Krekel, Armin Rigo, Benjamin Peterson, Ronny Pfannschmidt and others
INTERNALERROR> """
INTERNALERROR> from __future__ import with_statement
INTERNALERROR> import sys, os, weakref
INTERNALERROR> import traceback, struct
INTERNALERROR>
INTERNALERROR> # NOTE that we want to avoid try/except style importing
INTERNALERROR> # to avoid setting sys.exc_info() during import
INTERNALERROR> #
INTERNALERROR>
INTERNALERROR> ISPY3 = sys.version_info >= (3, 0)
INTERNALERROR> if ISPY3:
INTERNALERROR> from io import BytesIO
INTERNALERROR> exec("def do_exec(co, loc): exec(co, loc)\n"
INTERNALERROR> "def reraise(cls, val, tb): raise val\n")
INTERNALERROR> unicode = str
INTERNALERROR> _long_type = int
INTERNALERROR> from _thread import interrupt_main
INTERNALERROR> else:
INTERNALERROR> from StringIO import StringIO as BytesIO
INTERNALERROR> exec("def do_exec(co, loc): exec co in loc\n"
INTERNALERROR> "def reraise(cls, val, tb): raise cls, val, tb\n")
INTERNALERROR> bytes = str
INTERNALERROR> _long_type = long
INTERNALERROR> try:
INTERNALERROR> from thread import interrupt_main
INTERNALERROR> except ImportError:
INTERNALERROR> interrupt_main = None
INTERNALERROR>
INTERNALERROR> #f = open("/tmp/execnet-%s" % os.getpid(), "w")
INTERNALERROR> #def log_extra(*msg):
INTERNALERROR> # f.write(" ".join([str(x) for x in msg]) + "\n")
INTERNALERROR>
INTERNALERROR>
INTERNALERROR> class EmptySemaphore:
INTERNALERROR> acquire = release = lambda self: None
INTERNALERROR>
INTERNALERROR> def get_execmodel(backend):
INTERNALERROR> if hasattr(backend, "backend"):
INTERNALERROR> return backend
INTERNALERROR> if backend == "thread":
INTERNALERROR> importdef = {
INTERNALERROR> 'get_ident': ['thread::get_ident', '_thread::get_ident'],
INTERNALERROR> '_start_new_thread': ['thread::start_new_thread',
INTERNALERROR> '_thread::start_new_thread'],
INTERNALERROR> 'threading': ["threading",],
INTERNALERROR> 'queue': ["queue", "Queue"],
INTERNALERROR> 'sleep': ['time::sleep'],
INTERNALERROR> 'subprocess': ['subprocess'],
INTERNALERROR> 'socket': ['socket'],
INTERNALERROR> '_fdopen': ['os::fdopen'],
INTERNALERROR> '_lock': ['threading'],
INTERNALERROR> '_event': ['threading'],
INTERNALERROR> }
INTERNALERROR> def exec_start(self, func, args=()):
INTERNALERROR> self._start_new_thread(func, args)
INTERNALERROR>
INTERNALERROR> elif backend == "eventlet":
INTERNALERROR> importdef = {
INTERNALERROR> 'get_ident': ['eventlet.green.thread::get_ident'],
INTERNALERROR> '_spawn_n': ['eventlet::spawn_n'],
INTERNALERROR> 'threading': ['eventlet.green.threading'],
INTERNALERROR> 'queue': ["eventlet.queue"],
INTERNALERROR> 'sleep': ['eventlet::sleep'],
INTERNALERROR> 'subprocess': ['eventlet.green.subprocess'],
INTERNALERROR> 'socket': ['eventlet.green.socket'],
INTERNALERROR> '_fdopen': ['eventlet.green.os::fdopen'],
INTERNALERROR> '_lock': ['eventlet.green.threading'],
INTERNALERROR> '_event': ['eventlet.green.threading'],
INTERNALERROR> }
INTERNALERROR> def exec_start(self, func, args=()):
INTERNALERROR> self._spawn_n(func, *args)
INTERNALERROR> elif backend == "gevent":
INTERNALERROR> importdef = {
INTERNALERROR> 'get_ident': ['gevent.thread::get_ident'],
INTERNALERROR> '_spawn_n': ['gevent::spawn'],
INTERNALERROR> 'threading': ['threading'],
INTERNALERROR> 'queue': ["gevent.queue"],
INTERNALERROR> 'sleep': ['gevent::sleep'],
INTERNALERROR> 'subprocess': ['gevent.subprocess'],
INTERNALERROR> 'socket': ['gevent.socket'],
INTERNALERROR> # XXX
INTERNALERROR> '_fdopen': ['gevent.fileobject::FileObjectThread'],
INTERNALERROR> '_lock': ['gevent.lock'],
INTERNALERROR> '_event': ['gevent.event'],
INTERNALERROR> }
INTERNALERROR> def exec_start(self, func, args=()):
INTERNALERROR> self._spawn_n(func, *args)
INTERNALERROR> else:
INTERNALERROR> raise ValueError("unknown execmodel %r" %(backend,))
INTERNALERROR>
INTERNALERROR> class ExecModel:
INTERNALERROR> def __init__(self, name):
INTERNALERROR> self._importdef = importdef
INTERNALERROR> self.backend = name
INTERNALERROR> self._count = 0
INTERNALERROR>
INTERNALERROR> def __repr__(self):
INTERNALERROR> return "<ExecModel %r>" % self.backend
INTERNALERROR>
INTERNALERROR> def __getattr__(self, name):
INTERNALERROR> locs = self._importdef.get(name)
INTERNALERROR> if locs is None:
INTERNALERROR> raise AttributeError(name)
INTERNALERROR> for loc in locs:
INTERNALERROR> parts = loc.split("::")
INTERNALERROR> loc = parts.pop(0)
INTERNALERROR> try:
INTERNALERROR> mod = __import__(loc, None, None, "__doc__")
INTERNALERROR> except ImportError:
INTERNALERROR> pass
INTERNALERROR> else:
INTERNALERROR> if parts:
INTERNALERROR> mod = getattr(mod, parts[0])
INTERNALERROR> setattr(self, name, mod)
INTERNALERROR> return mod
INTERNALERROR> raise AttributeError(name)
INTERNALERROR>
INTERNALERROR> start = exec_start
INTERNALERROR>
INTERNALERROR> def fdopen(self, fd, mode, bufsize=1):
INTERNALERROR> return self._fdopen(fd, mode, bufsize)
INTERNALERROR>
INTERNALERROR> def WorkerPool(self, hasprimary=False):
INTERNALERROR> return WorkerPool(self, hasprimary=hasprimary)
INTERNALERROR>
INTERNALERROR> def Semaphore(self, size=None):
INTERNALERROR> if size is None:
INTERNALERROR> return EmptySemaphore()
INTERNALERROR> return self._lock.Semaphore(size)
INTERNALERROR>
INTERNALERROR> def Lock(self):
INTERNALERROR> return self._lock.RLock()
INTERNALERROR>
INTERNALERROR> def RLock(self):
INTERNALERROR> return self._lock.RLock()
INTERNALERROR>
INTERNALERROR> def Event(self):
INTERNALERROR> event = self._event.Event()
INTERNALERROR> if sys.version_info < (2,7):
INTERNALERROR> # patch wait function to return event state instead of None
INTERNALERROR> real_wait = event.wait
INTERNALERROR> def wait(timeout=None):
INTERNALERROR> real_wait(timeout=timeout)
INTERNALERROR> return event.isSet()
INTERNALERROR> event.wait = wait
INTERNALERROR> return event
INTERNALERROR>
INTERNALERROR> def PopenPiped(self, args):
INTERNALERROR> PIPE = self.subprocess.PIPE
INTERNALERROR> return self.subprocess.Popen(args, stdout=PIPE, stdin=PIPE)
INTERNALERROR>
INTERNALERROR>
INTERNALERROR> return ExecModel(backend)
INTERNALERROR>
INTERNALERROR>
INTERNALERROR> class Reply(object):
INTERNALERROR> """ reply instances provide access to the result
INTERNALERROR> of a function execution that got dispatched
INTERNALERROR> through WorkerPool.spawn()
INTERNALERROR> """
INTERNALERROR> def __init__(self, task, threadmodel):
INTERNALERROR> self.task = task
INTERNALERROR> self._result_ready = threadmodel.Event()
INTERNALERROR> self.running = True
INTERNALERROR>
INTERNALERROR> def get(self, timeout=None):
INTERNALERROR> """ get the result object from an asynchronous function execution.
INTERNALERROR> if the function execution raised an exception,
INTERNALERROR> then calling get() will reraise that exception
INTERNALERROR> including its traceback.
INTERNALERROR> """
INTERNALERROR> self.waitfinish(timeout)
INTERNALERROR> try:
INTERNALERROR> return self._result
INTERNALERROR> except AttributeError:
INTERNALERROR> reraise(*(self._excinfo[:3])) # noqa
INTERNALERROR>
INTERNALERROR> def waitfinish(self, timeout=None):
INTERNALERROR> if not self._result_ready.wait(timeout):
INTERNALERROR> raise IOError("timeout waiting for %r" %(self.task, ))
INTERNALERROR>
INTERNALERROR> def run(self):
INTERNALERROR> func, args, kwargs = self.task
INTERNALERROR> try:
INTERNALERROR> try:
INTERNALERROR> self._result = func(*args, **kwargs)
INTERNALERROR> except:
INTERNALERROR> self._excinfo = sys.exc_info()
INTERNALERROR> finally:
INTERNALERROR> self._result_ready.set()
INTERNALERROR> self.running = False
INTERNALERROR>
INTERNALERROR>
INTERNALERROR> class WorkerPool(object):
INTERNALERROR> """ A WorkerPool allows to spawn function executions
INTERNALERROR> to threads, returning a reply object on which you
INTERNALERROR> can ask for the result (and get exceptions reraised).
INTERNALERROR>
INTERNALERROR> This implementation allows the main thread to integrate
INTERNALERROR> itself into performing function execution through
INTERNALERROR> calling integrate_as_primary_thread() which will return
INTERNALERROR> when the pool received a trigger_shutdown().
INTERNALERROR> """
INTERNALERROR> def __init__(self, execmodel, hasprimary=False):
INTERNALERROR> """ by default allow unlimited number of spawns. """
INTERNALERROR> self.execmodel = execmodel
INTERNALERROR> self._running_lock = self.execmodel.Lock()
INTERNALERROR> self._running = set()
INTERNALERROR> self._shuttingdown = False
INTERNALERROR> self._waitall_events = []
INTERNALERROR> if hasprimary:
INTERNALERROR> if self.execmodel.backend != "thread":
INTERNALERROR> raise ValueError("hasprimary=True requires thread model")
INTERNALERROR> self._primary_thread_task_ready = self.execmodel.Event()
INTERNALERROR> else:
INTERNALERROR> self._primary_thread_task_ready = None
INTERNALERROR>
INTERNALERROR> def integrate_as_primary_thread(self):
INTERNALERROR> """ integrate the thread with which we are called as a primary
INTERNALERROR> thread for executing functions triggered with spawn().
INTERNALERROR> """
INTERNALERROR> assert self.execmodel.backend == "thread", self.execmodel
INTERNALERROR> primary_thread_task_ready = self._primary_thread_task_ready
INTERNALERROR> # interacts with code at REF1
INTERNALERROR> while 1:
INTERNALERROR> primary_thread_task_ready.wait()
INTERNALERROR> reply = self._primary_thread_task
INTERNALERROR> if reply is None: # trigger_shutdown() woke us up
INTERNALERROR> break
INTERNALERROR> self._perform_spawn(reply)
INTERNALERROR> # we are concurrent with trigger_shutdown and spawn
INTERNALERROR> with self._running_lock:
INTERNALERROR> if self._shuttingdown:
INTERNALERROR> break
INTERNALERROR> primary_thread_task_ready.clear()
INTERNALERROR>
INTERNALERROR> def trigger_shutdown(self):
INTERNALERROR> with self._running_lock:
INTERNALERROR> self._shuttingdown = True
INTERNALERROR> if self._primary_thread_task_ready is not None:
INTERNALERROR> self._primary_thread_task = None
INTERNALERROR> self._primary_thread_task_ready.set()
INTERNALERROR>
INTERNALERROR> def active_count(self):
INTERNALERROR> return len(self._running)
INTERNALERROR>
INTERNALERROR> def _perform_spawn(self, reply):
INTERNALERROR> reply.run()
INTERNALERROR> with self._running_lock:
INTERNALERROR> self._running.remove(reply)
INTERNALERROR> if not self._running:
INTERNALERROR> while self._waitall_events:
INTERNALERROR> waitall_event = self._waitall_events.pop()
INTERNALERROR> waitall_event.set()
INTERNALERROR>
INTERNALERROR> def _try_send_to_primary_thread(self, reply):
INTERNALERROR> # REF1 in 'thread' model we give priority to running in main thread
INTERNALERROR> # note that we should be called with _running_lock hold
INTERNALERROR> primary_thread_task_ready = self._primary_thread_task_ready
INTERNALERROR> if primary_thread_task_ready is not None:
INTERNALERROR> if not primary_thread_task_ready.isSet():
INTERNALERROR> self._primary_thread_task = reply
INTERNALERROR> # wake up primary thread
INTERNALERROR> primary_thread_task_ready.set()
INTERNALERROR> return True
INTERNALERROR> return False
INTERNALERROR>
INTERNALERROR> def spawn(self, func, *args, **kwargs):
INTERNALERROR> """ return Reply object for the asynchronous dispatch
INTERNALERROR> of the given func(*args, **kwargs).
INTERNALERROR> """
INTERNALERROR> reply = Reply((func, args, kwargs), self.execmodel)
INTERNALERROR> with self._running_lock:
INTERNALERROR> if self._shuttingdown:
INTERNALERROR> raise ValueError("pool is shutting down")
INTERNALERROR> self._running.add(reply)
INTERNALERROR> if not self._try_send_to_primary_thread(reply):
INTERNALERROR> self.execmodel.start(self._perform_spawn, (reply,))
INTERNALERROR> return reply
INTERNALERROR>
INTERNALERROR> def terminate(self, timeout=None):
INTERNALERROR> """ trigger shutdown and wait for completion of all executions. """
INTERNALERROR> self.trigger_shutdown()
INTERNALERROR> return self.waitall(timeout=timeout)
INTERNALERROR>
INTERNALERROR> def waitall(self, timeout=None):
INTERNALERROR> """ wait until all active spawns have finished executing. """
INTERNALERROR> with self._running_lock:
INTERNALERROR> if not self._running:
INTERNALERROR> return True
INTERNALERROR> # if a Reply still runs, we let run_and_release
INTERNALERROR> # signal us -- note that we are still holding the
INTERNALERROR> # _running_lock to avoid race conditions
INTERNALERROR> my_waitall_event = self.execmodel.Event()
INTERNALERROR> self._waitall_events.append(my_waitall_event)
INTERNALERROR> return my_waitall_event.wait(timeout=timeout)
INTERNALERROR>
INTERNALERROR>
INTERNALERROR> sysex = (KeyboardInterrupt, SystemExit)
INTERNALERROR>
INTERNALERROR>
INTERNALERROR> DEBUG = os.environ.get('EXECNET_DEBUG')
INTERNALERROR> pid = os.getpid()
INTERNALERROR> if DEBUG == '2':
INTERNALERROR> def trace(*msg):
INTERNALERROR> try:
INTERNALERROR> line = " ".join(map(str, msg))
INTERNALERROR> sys.stderr.write("[%s] %s\n" % (pid, line))
INTERNALERROR> sys.stderr.flush()
INTERNALERROR> except Exception:
INTERNALERROR> pass # nothing we can do, likely interpreter-shutdown
INTERNALERROR> elif DEBUG:
INTERNALERROR> import tempfile, os.path
INTERNALERROR> fn = os.path.join(tempfile.gettempdir(), 'execnet-debug-%d' % pid)
INTERNALERROR> #sys.stderr.write("execnet-debug at %r" %(fn,))
INTERNALERROR> debugfile = open(fn, 'w')
INTERNALERROR> def trace(*msg):
INTERNALERROR> try:
INTERNALERROR> line = " ".join(map(str, msg))
INTERNALERROR> debugfile.write(line + "\n")
INTERNALERROR> debugfile.flush()
INTERNALERROR> except Exception:
INTERNALERROR> try:
INTERNALERROR> v = sys.exc_info()[1]
INTERNALERROR> sys.stderr.write(
INTERNALERROR> "[%s] exception during tracing: %r\n" % (pid, v))
INTERNALERROR> except Exception:
INTERNALERROR> pass # nothing we can do, likely interpreter-shutdown
INTERNALERROR> else:
INTERNALERROR> notrace = trace = lambda *msg: None
INTERNALERROR>
INTERNALERROR> class Popen2IO:
INTERNALERROR> error = (IOError, OSError, EOFError)
INTERNALERROR>
INTERNALERROR> def __init__(self, outfile, infile, execmodel):
INTERNALERROR> # we need raw byte streams
INTERNALERROR> self.outfile, self.infile = outfile, infile
INTERNALERROR> if sys.platform == "win32":
INTERNALERROR> import msvcrt
INTERNALERROR> try:
INTERNALERROR> msvcrt.setmode(infile.fileno(), os.O_BINARY)
INTERNALERROR> msvcrt.setmode(outfile.fileno(), os.O_BINARY)
INTERNALERROR> except (AttributeError, IOError):
INTERNALERROR> pass
INTERNALERROR> self._read = getattr(infile, "buffer", infile).read
INTERNALERROR> self._write = getattr(outfile, "buffer", outfile).write
INTERNALERROR> self.execmodel = execmodel
INTERNALERROR>
INTERNALERROR> def read(self, numbytes):
INTERNALERROR> """Read exactly 'numbytes' bytes from the pipe. """
INTERNALERROR> # a file in non-blocking mode may return less bytes, so we loop
INTERNALERROR> buf = bytes()
INTERNALERROR> while numbytes > len(buf):
INTERNALERROR> data = self._read(numbytes-len(buf))
INTERNALERROR> if not data:
INTERNALERROR> raise EOFError("expected %d bytes, got %d" %(numbytes, len(buf)))
INTERNALERROR> buf += data
INTERNALERROR> return buf
INTERNALERROR>
INTERNALERROR> def write(self, data):
INTERNALERROR> """write out all data bytes. """
INTERNALERROR> assert isinstance(data, bytes)
INTERNALERROR> self._write(data)
INTERNALERROR> self.outfile.flush()
INTERNALERROR>
INTERNALERROR> def close_read(self):
INTERNALERROR> self.infile.close()
INTERNALERROR>
INTERNALERROR> def close_write(self):
INTERNALERROR> self.outfile.close()
INTERNALERROR>
INTERNALERROR> class Message:
INTERNALERROR> """ encapsulates Messages and their wire protocol. """
INTERNALERROR> _types = []
INTERNALERROR>
INTERNALERROR> def __init__(self, msgcode, channelid=0, data=''):
INTERNALERROR> self.msgcode = msgcode
INTERNALERROR> self.channelid = channelid
INTERNALERROR> self.data = data
INTERNALERROR>
INTERNALERROR> @staticmethod
INTERNALERROR> def from_io(io):
INTERNALERROR> try:
INTERNALERROR> header = io.read(9) # type 1, channel 4, payload 4
INTERNALERROR> if not header:
INTERNALERROR> raise EOFError("empty read")
INTERNALERROR> except EOFError:
INTERNALERROR> e = sys.exc_info()[1]
INTERNALERROR> raise EOFError('couldnt load message header, ' + e.args[0])
INTERNALERROR> msgtype, channel, payload = struct.unpack('!bii', header)
INTERNALERROR> return Message(msgtype, channel, io.read(payload))
INTERNALERROR>
INTERNALERROR> def to_io(self, io):
INTERNALERROR> header = struct.pack('!bii', self.msgcode, self.channelid,
INTERNALERROR> len(self.data))
INTERNALERROR> io.write(header+self.data)
INTERNALERROR>
INTERNALERROR> def received(self, gateway):
INTERNALERROR> self._types[self.msgcode](self, gateway)
INTERNALERROR>
INTERNALERROR> def __repr__(self):
INTERNALERROR> name = self._types[self.msgcode].__name__.upper()
INTERNALERROR> return "<Message %s channel=%s lendata=%s>" %(
INTERNALERROR> name, self.channelid, len(self.data))
INTERNALERROR>
INTERNALERROR> class GatewayReceivedTerminate(Exception):
INTERNALERROR> """ Receiverthread got termination message. """
INTERNALERROR>
INTERNALERROR> def _setupmessages():
INTERNALERROR> def status(message, gateway):
INTERNALERROR> # we use the channelid to send back information
INTERNALERROR> # but don't instantiate a channel object
INTERNALERROR> d = {'numchannels': len(gateway._channelfactory._channels),
INTERNALERROR> 'numexecuting': gateway._execpool.active_count(),
INTERNALERROR> 'execmodel': gateway.execmodel.backend,
INTERNALERROR> }
INTERNALERROR> gateway._send(Message.CHANNEL_DATA, message.channelid,
INTERNALERROR> dumps_internal(d))
INTERNALERROR> gateway._send(Message.CHANNEL_CLOSE, message.channelid)
INTERNALERROR>
INTERNALERROR> def channel_exec(message, gateway):
INTERNALERROR> channel = gateway._channelfactory.new(message.channelid)
INTERNALERROR> gateway._local_schedulexec(channel=channel,sourcetask=message.data)
INTERNALERROR>
INTERNALERROR> def channel_data(message, gateway):
INTERNALERROR> gateway._channelfactory._local_receive(message.channelid, message.data)
INTERNALERROR>
INTERNALERROR> def channel_close(message, gateway):
INTERNALERROR> gateway._channelfactory._local_close(message.channelid)
INTERNALERROR>
INTERNALERROR> def channel_close_error(message, gateway):
INTERNALERROR> remote_error = RemoteError(loads_internal(message.data))
INTERNALERROR> gateway._channelfactory._local_close(message.channelid, remote_error)
INTERNALERROR>
INTERNALERROR> def channel_last_message(message, gateway):
INTERNALERROR> gateway._channelfactory._local_close(message.channelid, sendonly=True)
INTERNALERROR>
INTERNALERROR> def gateway_terminate(message, gateway):
INTERNALERROR> raise GatewayReceivedTerminate(gateway)
INTERNALERROR>
INTERNALERROR> def reconfigure(message, gateway):
INTERNALERROR> if message.channelid == 0:
INTERNALERROR> target = gateway
INTERNALERROR> else:
INTERNALERROR> target = gateway._channelfactory.new(message.channelid)
INTERNALERROR> target._strconfig = loads_internal(message.data, gateway)
INTERNALERROR>
INTERNALERROR> types = [
INTERNALERROR> status, reconfigure, gateway_terminate,
INTERNALERROR> channel_exec, channel_data, channel_close,
INTERNALERROR> channel_close_error, channel_last_message,
INTERNALERROR> ]
INTERNALERROR> for i, handler in enumerate(types):
INTERNALERROR> Message._types.append(handler)
INTERNALERROR> setattr(Message, handler.__name__.upper(), i)
INTERNALERROR>
INTERNALERROR> _setupmessages()
INTERNALERROR>
INTERNALERROR> def geterrortext(excinfo,
INTERNALERROR> format_exception=traceback.format_exception, sysex=sysex):
INTERNALERROR> try:
INTERNALERROR> l = format_exception(*excinfo)
INTERNALERROR> errortext = "".join(l)
INTERNALERROR> except sysex:
INTERNALERROR> raise
INTERNALERROR> except:
INTERNALERROR> errortext = '%s: %s' % (excinfo[0].__name__,
INTERNALERROR> excinfo[1])
INTERNALERROR> return errortext
INTERNALERROR>
INTERNALERROR> class RemoteError(Exception):
INTERNALERROR> """ Exception containing a stringified error from the other side. """
INTERNALERROR> def __init__(self, formatted):
INTERNALERROR> self.formatted = formatted
INTERNALERROR> Exception.__init__(self)
INTERNALERROR>
INTERNALERROR> def __str__(self):
INTERNALERROR> return self.formatted
INTERNALERROR>
INTERNALERROR> def __repr__(self):
INTERNALERROR> return "%s: %s" %(self.__class__.__name__, self.formatted)
INTERNALERROR>
INTERNALERROR> def warn(self):
INTERNALERROR> if self.formatted != INTERRUPT_TEXT:
INTERNALERROR> # XXX do this better
INTERNALERROR> sys.stderr.write("[%s] Warning: unhandled %r\n"
INTERNALERROR> % (os.getpid(), self,))
INTERNALERROR>
INTERNALERROR> class TimeoutError(IOError):
INTERNALERROR> """ Exception indicating that a timeout was reached. """
INTERNALERROR>
INTERNALERROR>
INTERNALERROR> NO_ENDMARKER_WANTED = object()
INTERNALERROR>
INTERNALERROR> class Channel(object):
INTERNALERROR> """Communication channel between two Python Interpreter execution points."""
INTERNALERROR> RemoteError = RemoteError
INTERNALERROR> TimeoutError = TimeoutError
INTERNALERROR> _INTERNALWAKEUP = 1000
INTERNALERROR> _executing = False
INTERNALERROR>
INTERNALERROR> def __init__(self, gateway, id):
INTERNALERROR> assert isinstance(id, int)
INTERNALERROR> self.gateway = gateway
INTERNALERROR> #XXX: defaults copied from Unserializer
INTERNALERROR> self._strconfig = getattr(gateway, '_strconfig', (True, False))
INTERNALERROR> self.id = id
INTERNALERROR> self._items = self.gateway.execmodel.queue.Queue()
INTERNALERROR> self._closed = False
INTERNALERROR> self._receiveclosed = self.gateway.execmodel.Event()
INTERNALERROR> self._remoteerrors = []
INTERNALERROR>
INTERNALERROR> def _trace(self, *msg):
INTERNALERROR> self.gateway._trace(self.id, *msg)
INTERNALERROR>
INTERNALERROR> def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED):
INTERNALERROR> """ set a callback function for receiving items.
INTERNALERROR>
INTERNALERROR> All already queued items will immediately trigger the callback.
INTERNALERROR> Afterwards the callback will execute in the receiver thread
INTERNALERROR> for each received data item and calls to ``receive()`` will
INTERNALERROR> raise an error.
INTERNALERROR> If an endmarker is specified the callback will eventually
INTERNALERROR> be called with the endmarker when the channel closes.
INTERNALERROR> """
INTERNALERROR> _callbacks = self.gateway._channelfactory._callbacks
INTERNALERROR> with self.gateway._receivelock:
INTERNALERROR> if self._items is None:
INTERNALERROR> raise IOError("%r has callback already registered" %(self,))
INTERNALERROR> items = self._items
INTERNALERROR> self._items = None
INTERNALERROR> while 1:
INTERNALERROR> try:
INTERNALERROR> olditem = items.get(block=False)
INTERNALERROR> except self.gateway.execmodel.queue.Empty:
INTERNALERROR> if not (self._closed or self._receiveclosed.isSet()):
INTERNALERROR> _callbacks[self.id] = (
INTERNALERROR> callback,
INTERNALERROR> endmarker,
INTERNALERROR> self._strconfig,
INTERNALERROR> )
INTERNALERROR> break
INTERNALERROR> else:
INTERNALERROR> if olditem is ENDMARKER:
INTERNALERROR> items.put(olditem) # for other receivers
INTERNALERROR> if endmarker is not NO_ENDMARKER_WANTED:
INTERNALERROR> callback(endmarker)
INTERNALERROR> break
INTERNALERROR> else:
INTERNALERROR> callback(olditem)
INTERNALERROR>
INTERNALERROR> def __repr__(self):
INTERNALERROR> flag = self.isclosed() and "closed" or "open"
INTERNALERROR> return "<Channel id=%d %s>" % (self.id, flag)
INTERNALERROR>
INTERNALERROR> def __del__(self):
INTERNALERROR> if self.gateway is None: # can be None in tests
INTERNALERROR> return
INTERNALERROR> self._trace("channel.__del__")
INTERNALERROR> # no multithreading issues here, because we have the last ref to 'self'
INTERNALERROR> if self._closed:
INTERNALERROR> # state transition "closed" --> "deleted"
INTERNALERROR> for error in self._remoteerrors:
INTERNALERROR> error.warn()
INTERNALERROR> elif self._receiveclosed.isSet():
INTERNALERROR> # state transition "sendonly" --> "deleted"
INTERNALERROR> # the remote channel is already in "deleted" state, nothing to do
INTERNALERROR> pass
INTERNALERROR> else:
INTERNALERROR> # state transition "opened" --> "deleted"
INTERNALERROR> # check if we are in the middle of interpreter shutdown
INTERNALERROR> # in which case the process will go away and we probably
INTERNALERROR> # don't need to try to send a closing or last message
INTERNALERROR> # (and often it won't work anymore to send things out)
INTERNALERROR> if Message is not None:
INTERNALERROR> if self._items is None: # has_callback
INTERNALERROR> msgcode = Message.CHANNEL_LAST_MESSAGE
INTERNALERROR> else:
INTERNALERROR> msgcode = Message.CHANNEL_CLOSE
INTERNALERROR> try:
INTERNALERROR> self.gateway._send(msgcode, self.id)
INTERNALERROR> except (IOError, ValueError): # ignore problems with sending
INTERNALERROR> pass
INTERNALERROR>
INTERNALERROR> def _getremoteerror(self):
INTERNALERROR> try:
INTERNALERROR> return self._remoteerrors.pop(0)
INTERNALERROR> except IndexError:
INTERNALERROR> try:
INTERNALERROR> return self.gateway._error
INTERNALERROR> except AttributeError:
INTERNALERROR> pass
INTERNALERROR> return None
INTERNALERROR>
INTERNALERROR> #
INTERNALERROR> # public API for channel objects
INTERNALERROR> #
INTERNALERROR> def isclosed(self):
INTERNALERROR> """ return True if the channel is closed. A closed
INTERNALERROR> channel may still hold items.
INTERNALERROR> """
INTERNALERROR> return self._closed
INTERNALERROR>
INTERNALERROR> def makefile(self, mode='w', proxyclose=False):
INTERNALERROR> """ return a file-like object.
INTERNALERROR> mode can be 'w' or 'r' for writeable/readable files.
INTERNALERROR> if proxyclose is true file.close() will also close the channel.
INTERNALERROR> """
INTERNALERROR> if mode == "w":
INTERNALERROR> return ChannelFileWrite(channel=self, proxyclose=proxyclose)
INTERNALERROR> elif mode == "r":
INTERNALERROR> return ChannelFileRead(channel=self, proxyclose=proxyclose)
INTERNALERROR> raise ValueError("mode %r not availabe" %(mode,))
INTERNALERROR>
INTERNALERROR> def close(self, error=None):
INTERNALERROR> """ close down this channel with an optional error message.
INTERNALERROR> Note that closing of a channel tied to remote_exec happens
INTERNALERROR> automatically at the end of execution and cannot
INTERNALERROR> be done explicitely.
INTERNALERROR> """
INTERNALERROR> if self._executing:
INTERNALERROR> raise IOError("cannot explicitly close channel within remote_exec")
INTERNALERROR> if self._closed:
INTERNALERROR> self.gateway._trace(self, "ignoring redundant call to close()")
INTERNALERROR> if not self._closed:
INTERNALERROR> # state transition "opened/sendonly" --> "closed"
INTERNALERROR> # threads warning: the channel might be closed under our feet,
INTERNALERROR> # but it's never damaging to send too many CHANNEL_CLOSE messages
INTERNALERROR> # however, if the other side triggered a close already, we
INTERNALERROR> # do not send back a closed message.
INTERNALERROR> if not self._receiveclosed.isSet():
INTERNALERROR> put = self.gateway._send
INTERNALERROR> if error is not None:
INTERNALERROR> put(Message.CHANNEL_CLOSE_ERROR, self.id,
INTERNALERROR> dumps_internal(error))
INTERNALERROR> else:
INTERNALERROR> put(Message.CHANNEL_CLOSE, self.id)
INTERNALERROR> self._trace("sent channel close message")
INTERNALERROR> if isinstance(error, RemoteError):
INTERNALERROR> self._remoteerrors.append(error)
INTERNALERROR> self._closed = True # --> "closed"
INTERNALERROR> self._receiveclosed.set()
INTERNALERROR> queue = self._items
INTERNALERROR> if queue is not None:
INTERNALERROR> queue.put(ENDMARKER)
INTERNALERROR> self.gateway._channelfactory._no_longer_opened(self.id)
INTERNALERROR>
INTERNALERROR> def waitclose(self, timeout=None):
INTERNALERROR> """ wait until this channel is closed (or the remote side
INTERNALERROR> otherwise signalled that no more data was being sent).
INTERNALERROR> The channel may still hold receiveable items, but not receive
INTERNALERROR> any more after waitclose() has returned. Exceptions from executing
INTERNALERROR> code on the other side are reraised as local channel.RemoteErrors.
INTERNALERROR> EOFError is raised if the reading-connection was prematurely closed,
INTERNALERROR> which often indicates a dying process.
INTERNALERROR> self.TimeoutError is raised after the specified number of seconds
INTERNALERROR> (default is None, i.e. wait indefinitely).
INTERNALERROR> """
INTERNALERROR> self._receiveclosed.wait(timeout=timeout) # wait for non-"opened" state
INTERNALERROR> if not self._receiveclosed.isSet():
INTERNALERROR> raise self.TimeoutError("Timeout after %r seconds" % timeout)
INTERNALERROR> error = self._getremoteerror()
INTERNALERROR> if error:
INTERNALERROR> raise error
INTERNALERROR>
INTERNALERROR> def send(self, item):
INTERNALERROR> """sends the given item to the other side of the channel,
INTERNALERROR> possibly blocking if the sender queue is full.
INTERNALERROR> The item must be a simple python type and will be
INTERNALERROR> copied to the other side by value. IOError is
INTERNALERROR> raised if the write pipe was prematurely closed.
INTERNALERROR> """
INTERNALERROR> if self.isclosed():
INTERNALERROR> raise IOError("cannot send to %r" %(self,))
INTERNALERROR> self.gateway._send(Message.CHANNEL_DATA, self.id, dumps_internal(item))
INTERNALERROR>
INTERNALERROR> def receive(self, timeout=None):
INTERNALERROR> """receive a data item that was sent from the other side.
INTERNALERROR> timeout: None [default] blocked waiting. A positive number
INTERNALERROR> indicates the number of seconds after which a channel.TimeoutError
INTERNALERROR> exception will be raised if no item was received.
INTERNALERROR> Note that exceptions from the remotely executing code will be
INTERNALERROR> reraised as channel.RemoteError exceptions containing
INTERNALERROR> a textual representation of the remote traceback.
INTERNALERROR> """
INTERNALERROR> itemqueue = self._items
INTERNALERROR> if itemqueue is None:
INTERNALERROR> raise IOError("cannot receive(), channel has receiver callback")
INTERNALERROR> try:
INTERNALERROR> x = itemqueue.get(timeout=timeout)
INTERNALERROR> except self.gateway.execmodel.queue.Empty:
INTERNALERROR> raise self.TimeoutError("no item after %r seconds" %(timeout))
INTERNALERROR> if x is ENDMARKER:
INTERNALERROR> itemqueue.put(x) # for other receivers
INTERNALERROR> raise self._getremoteerror() or EOFError()
INTERNALERROR> else:
INTERNALERROR> return x
INTERNALERROR>
INTERNALERROR> def __iter__(self):
INTERNALERROR> return self
INTERNALERROR>
INTERNALERROR> def next(self):
INTERNALERROR> try:
INTERNALERROR> return self.receive()
INTERNALERROR> except EOFError:
INTERNALERROR> raise StopIteration
INTERNALERROR> __next__ = next
INTERNALERROR>
INTERNALERROR>
INTERNALERROR> def reconfigure(self, py2str_as_py3str=True, py3str_as_py2str=False):
INTERNALERROR> """
INTERNALERROR> set the string coercion for this channel
INTERNALERROR> the default is to try to convert py2 str as py3 str,
INTERNALERROR> but not to try and convert py3 str to py2 str
INTERNALERROR> """
INTERNALERROR> self._strconfig = (py2str_as_py3str, py3str_as_py2str)
INTERNALERROR> data = dumps_internal(self._strconfig)
INTERNALERROR> self.gateway._send(Message.RECONFIGURE, self.id, data=data)
INTERNALERROR>
INTERNALERROR> ENDMARKER = object()
INTERNALERROR> INTERRUPT_TEXT = "keyboard-interrupted"
INTERNALERROR>
INTERNALERROR> class ChannelFactory(object):
INTERNALERROR> def __init__(self, gateway, startcount=1):
INTERNALERROR> self._channels = weakref.WeakValueDictionary()
INTERNALERROR> self._callbacks = {}
INTERNALERROR> self._writelock = gateway.execmodel.Lock()
INTERNALERROR> self.gateway = gateway
INTERNALERROR> self.count = startcount
INTERNALERROR> self.finished = False
INTERNALERROR> self._list = list # needed during interp-shutdown
INTERNALERROR>
INTERNALERROR> def new(self, id=None):
INTERNALERROR> """ create a new Channel with 'id' (or create new id if None). """
INTERNALERROR> with self._writelock:
INTERNALERROR> if self.finished:
INTERNALERROR> raise IOError("connexion already closed: %s" % (self.gateway,))
INTERNALERROR> if id is None:
INTERNALERROR> id = self.count
INTERNALERROR> self.count += 2
INTERNALERROR> try:
INTERNALERROR> channel = self._channels[id]
INTERNALERROR> except KeyError:
INTERNALERROR> channel = self._channels[id] = Channel(self.gateway, id)
INTERNALERROR> return channel
INTERNALERROR>
INTERNALERROR> def channels(self):
INTERNALERROR> return self._list(self._channels.values())
INTERNALERROR>
INTERNALERROR> #
INTERNALERROR> # internal methods, called from the receiver thread
INTERNALERROR> #
INTERNALERROR> def _no_longer_opened(self, id):
INTERNALERROR> try:
INTERNALERROR> del self._channels[id]
INTERNALERROR> except KeyError:
INTERNALERROR> pass
INTERNALERROR> try:
INTERNALERROR> callback, endmarker, strconfig = self._callbacks.pop(id)
INTERNALERROR> except KeyError:
INTERNALERROR> pass
INTERNALERROR> else:
INTERNALERROR> if endmarker is not NO_ENDMARKER_WANTED:
INTERNALERROR> callback(endmarker)
INTERNALERROR>
INTERNALERROR> def _local_close(self, id, remoteerror=None, sendonly=False):
INTERNALERROR> channel = self._channels.get(id)
INTERNALERROR> if channel is None:
INTERNALERROR> # channel already in "deleted" state
INTERNALERROR> if remoteerror:
INTERNALERROR> remoteerror.warn()
INTERNALERROR> self._no_longer_opened(id)
INTERNALERROR> else:
INTERNALERROR> # state transition to "closed" state
INTERNALERROR> if remoteerror:
INTERNALERROR> channel._remoteerrors.append(remoteerror)
INTERNALERROR> queue = channel._items
INTERNALERROR> if queue is not None:
INTERNALERROR> queue.put(ENDMARKER)
INTERNALERROR> self._no_longer_opened(id)
INTERNALERROR> if not sendonly: # otherwise #--> "sendonly"
INTERNALERROR> channel._closed = True # --> "closed"
INTERNALERROR> channel._receiveclosed.set()
INTERNALERROR>
INTERNALERROR> def _local_receive(self, id, data):
INTERNALERROR> # executes in receiver thread
INTERNALERROR> channel = self._channels.get(id)
INTERNALERROR> try:
INTERNALERROR> callback, endmarker, strconfig = self._callbacks[id]
INTERNALERROR> except KeyError:
INTERNALERROR> queue = channel and channel._items
INTERNALERROR> if queue is None:
INTERNALERROR> pass # drop data
INTERNALERROR> else:
INTERNALERROR> item = loads_internal(data, channel)
INTERNALERROR> queue.put(item)
INTERNALERROR> else:
INTERNALERROR> try:
INTERNALERROR> data = loads_internal(data, channel, strconfig)
INTERNALERROR> callback(data) # even if channel may be already closed
INTERNALERROR> except Exception:
INTERNALERROR> excinfo = sys.exc_info()
INTERNALERROR> self.gateway._trace("exception during callback: %s" %
INTERNALERROR> excinfo[1])
INTERNALERROR> errortext = self.gateway._geterrortext(excinfo)
INTERNALERROR> self.gateway._send(Message.CHANNEL_CLOSE_ERROR,
INTERNALERROR> id, dumps_internal(errortext))
INTERNALERROR> self._local_close(id, errortext)
INTERNALERROR>
INTERNALERROR> def _finished_receiving(self):
INTERNALERROR> with self._writelock:
INTERNALERROR> self.finished = True
INTERNALERROR> for id in self._list(self._channels):
INTERNALERROR> self._local_close(id, sendonly=True)
INTERNALERROR> for id in self._list(self._callbacks):
INTERNALERROR> self._no_longer_opened(id)
INTERNALERROR>
INTERNALERROR> class ChannelFile(object):
INTERNALERROR> def __init__(self, channel, proxyclose=True):
INTERNALERROR> self.channel = channel
INTERNALERROR> self._proxyclose = proxyclose
INTERNALERROR>
INTERNALERROR> def isatty(self):
INTERNALERROR> return False
INTERNALERROR>
INTERNALERROR> def close(self):
INTERNALERROR> if self._proxyclose:
INTERNALERROR> self.channel.close()
INTERNALERROR>
INTERNALERROR> def __repr__(self):
INTERNALERROR> state = self.channel.isclosed() and 'closed' or 'open'
INTERNALERROR> return '<ChannelFile %d %s>' %(self.channel.id, state)
INTERNALERROR>
INTERNALERROR> class ChannelFileWrite(ChannelFile):
INTERNALERROR> def write(self, out):
INTERNALERROR> self.channel.send(out)
INTERNALERROR>
INTERNALERROR> def flush(self):
INTERNALERROR> pass
INTERNALERROR>
INTERNALERROR> class ChannelFileRead(ChannelFile):
INTERNALERROR> def __init__(self, channel, proxyclose=True):
INTERNALERROR> super(ChannelFileRead, self).__init__(channel, proxyclose)
INTERNALERROR> self._buffer = None
INTERNALERROR>
INTERNALERROR> def read(self, n):
INTERNALERROR> try:
INTERNALERROR> if self._buffer is None:
INTERNALERROR> self._buffer = self.channel.receive()
INTERNALERROR> while len(self._buffer) < n:
INTERNALERROR> self._buffer += self.channel.receive()
INTERNALERROR> except EOFError:
INTERNALERROR> self.close()
INTERNALERROR> if self._buffer is None:
INTERNALERROR> ret = ""
INTERNALERROR> else:
INTERNALERROR> ret = self._buffer[:n]
INTERNALERROR> self._buffer = self._buffer[n:]
INTERNALERROR> return ret
INTERNALERROR>
INTERNALERROR> def readline(self):
INTERNALERROR> if self._buffer is not None:
INTERNALERROR> i = self._buffer.find("\n")
INTERNALERROR> if i != -1:
INTERNALERROR> return self.read(i+1)
INTERNALERROR> line = self.read(len(self._buffer)+1)
INTERNALERROR> else:
INTERNALERROR> line = self.read(1)
INTERNALERROR> while line and line[-1] != "\n":
INTERNALERROR> c = self.read(1)
INTERNALERROR> if not c:
INTERNALERROR> break
INTERNALERROR> line += c
INTERNALERROR> return line
INTERNALERROR>
INTERNALERROR> class BaseGateway(object):
INTERNALERROR> exc_info = sys.exc_info
INTERNALERROR> _sysex = sysex
INTERNALERROR> id = "<slave>"
INTERNALERROR>
INTERNALERROR> def __init__(self, io, id, _startcount=2):
INTERNALERROR> self.execmodel = io.execmodel
INTERNALERROR> self._io = io
INTERNALERROR> self.id = id
INTERNALERROR> self._strconfig = (Unserializer.py2str_as_py3str,
INTERNALERROR> Unserializer.py3str_as_py2str)
INTERNALERROR> self._channelfactory = ChannelFactory(self, _startcount)
INTERNALERROR> self._receivelock = self.execmodel.RLock()
INTERNALERROR> # globals may be NONE at process-termination
INTERNALERROR> self.__trace = trace
INTERNALERROR> self._geterrortext = geterrortext
INTERNALERROR> self._receivepool = self.execmodel.WorkerPool()
INTERNALERROR>
INTERNALERROR> def _trace(self, *msg):
INTERNALERROR> self.__trace(self.id, *msg)
INTERNALERROR>
INTERNALERROR> def _initreceive(self):
INTERNALERROR> self._receivepool.spawn(self._thread_receiver)
INTERNALERROR>
INTERNALERROR> def _thread_receiver(self):
INTERNALERROR> def log(*msg):
INTERNALERROR> self._trace("[receiver-thread]", *msg)
INTERNALERROR>
INTERNALERROR> log("RECEIVERTHREAD: starting to run")
INTERNALERROR> io = self._io
INTERNALERROR> try:
INTERNALERROR> while 1:
INTERNALERROR> msg = Message.from_io(io)
INTERNALERROR> log("received", msg)
INTERNALERROR> with self._receivelock:
INTERNALERROR> msg.received(self)
INTERNALERROR> del msg
INTERNALERROR> except (KeyboardInterrupt, GatewayReceivedTerminate):
INTERNALERROR> pass
INTERNALERROR> except EOFError:
INTERNALERROR> log("EOF without prior gateway termination message")
INTERNALERROR> self._error = self.exc_info()[1]
INTERNALERROR> except Exception:
INTERNALERROR> log(self._geterrortext(self.exc_info()))
INTERNALERROR> log('finishing receiving thread')
INTERNALERROR> # wake up and terminate any execution waiting to receive
INTERNALERROR> self._channelfactory._finished_receiving()
INTERNALERROR> log('terminating execution')
INTERNALERROR> self._terminate_execution()
INTERNALERROR> log('closing read')
INTERNALERROR> self._io.close_read()
INTERNALERROR> log('closing write')
INTERNALERROR> self._io.close_write()
INTERNALERROR> log('terminating our receive pseudo pool')
INTERNALERROR> self._receivepool.trigger_shutdown()
INTERNALERROR>
INTERNALERROR> def _terminate_execution(self):
INTERNALERROR> pass
INTERNALERROR>
INTERNALERROR> def _send(self, msgcode, channelid=0, data=bytes()):
INTERNALERROR> message = Message(msgcode, channelid, data)
INTERNALERROR> try:
INTERNALERROR> message.to_io(self._io)
INTERNALERROR> self._trace('sent', message)
INTERNALERROR> except (IOError, ValueError):
INTERNALERROR> e = sys.exc_info()[1]
INTERNALERROR> self._trace('failed to send', message, e)
INTERNALERROR> # ValueError might be because the IO is already closed
INTERNALERROR> raise IOError("cannot send (already closed?)")
INTERNALERROR>
INTERNALERROR> def _local_schedulexec(self, channel, sourcetask):
INTERNALERROR> channel.close("execution disallowed")
INTERNALERROR>
INTERNALERROR> # _____________________________________________________________________
INTERNALERROR> #
INTERNALERROR> # High Level Interface
INTERNALERROR> # _____________________________________________________________________
INTERNALERROR> #
INTERNALERROR> def newchannel(self):
INTERNALERROR> """ return a new independent channel. """
INTERNALERROR> return self._channelfactory.new()
INTERNALERROR>
INTERNALERROR> def join(self, timeout=None):
INTERNALERROR> """ Wait for receiverthread to terminate. """
INTERNALERROR> self._trace("waiting for receiver thread to finish")
INTERNALERROR> self._receivepool.waitall()
INTERNALERROR>
INTERNALERROR> class SlaveGateway(BaseGateway):
INTERNALERROR>
INTERNALERROR> def _local_schedulexec(self, channel, sourcetask):
INTERNALERROR> sourcetask = loads_internal(sourcetask)
INTERNALERROR> self._execpool.spawn(self.executetask, ((channel, sourcetask)))
INTERNALERROR>
INTERNALERROR> def _terminate_execution(self):
INTERNALERROR> # called from receiverthread
INTERNALERROR> self._trace("shutting down execution pool")
INTERNALERROR> self._execpool.trigger_shutdown()
INTERNALERROR> if not self._execpool.waitall(5.0):
INTERNALERROR> self._trace("execution ongoing after 5 secs, trying interrupt_main")
INTERNALERROR> # We try hard to terminate execution based on the assumption
INTERNALERROR> # that there is only one gateway object running per-process.
INTERNALERROR> if sys.platform != "win32":
INTERNALERROR> self._trace("sending ourselves a SIGINT")
INTERNALERROR> os.kill(os.getpid(), 2) # send ourselves a SIGINT
INTERNALERROR> elif interrupt_main is not None:
INTERNALERROR> self._trace("calling interrupt_main()")
INTERNALERROR> interrupt_main()
INTERNALERROR> if not self._execpool.waitall(10.0):
INTERNALERROR> self._trace("execution did not finish in another 10 secs, "
INTERNALERROR> "calling os._exit()")
INTERNALERROR> os._exit(1)
INTERNALERROR>
INTERNALERROR> def serve(self):
INTERNALERROR> trace = lambda msg: self._trace("[serve] " + msg)
INTERNALERROR> hasprimary = self.execmodel.backend == "thread"
INTERNALERROR> self._execpool = self.execmodel.WorkerPool(hasprimary=hasprimary)
INTERNALERROR> trace("spawning receiver thread")
INTERNALERROR> self._initreceive()
INTERNALERROR> try:
INTERNALERROR> if hasprimary:
INTERNALERROR> # this will return when we are in shutdown
INTERNALERROR> trace("integrating as primary thread")
INTERNALERROR> self._execpool.integrate_as_primary_thread()
INTERNALERROR> trace("joining receiver thread")
INTERNALERROR> self.join()
INTERNALERROR> except KeyboardInterrupt:
INTERNALERROR> # in the slave we can't really do anything sensible
INTERNALERROR> trace("swallowing keyboardinterrupt, serve finished")
INTERNALERROR>
INTERNALERROR> def executetask(self, item):
INTERNALERROR> try:
INTERNALERROR> channel, (source, call_name, kwargs) = item
INTERNALERROR> if not ISPY3 and kwargs:
INTERNALERROR> # some python2 versions do not accept unicode keyword params
INTERNALERROR> # note: Unserializer generally turns py2-str to py3-str objects
INTERNALERROR> newkwargs = {}
INTERNALERROR> for name, value in kwargs.items():
INTERNALERROR> if isinstance(name, unicode):
INTERNALERROR> name = name.encode('ascii')
INTERNALERROR> newkwargs[name] = value
INTERNALERROR> kwargs = newkwargs
INTERNALERROR> loc = {'channel' : channel, '__name__': '__channelexec__'}
INTERNALERROR> self._trace("execution starts[%s]: %s" %
INTERNALERROR> (channel.id, repr(source)[:50]))
INTERNALERROR> channel._executing = True
INTERNALERROR> try:
INTERNALERROR> co = compile(source+'\n', '<remote exec>', 'exec')
INTERNALERROR> do_exec(co, loc) # noqa
INTERNALERROR> if call_name:
INTERNALERROR> self._trace('calling %s(**%60r)' % (call_name, kwargs))
INTERNALERROR> function = loc[call_name]
INTERNALERROR> function(channel, **kwargs)
INTERNALERROR> finally:
INTERNALERROR> channel._executing = False
INTERNALERROR> self._trace("execution finished")
INTERNALERROR> except KeyboardInterrupt:
INTERNALERROR> channel.close(INTERRUPT_TEXT)
INTERNALERROR> raise
INTERNALERROR> except:
INTERNALERROR> excinfo = self.exc_info()
INTERNALERROR> if not isinstance(excinfo[1], EOFError):
INTERNALERROR> if not channel.gateway._channelfactory.finished:
INTERNALERROR> self._trace("got exception: %r" % (excinfo[1],))
INTERNALERROR> errortext = self._geterrortext(excinfo)
INTERNALERROR> channel.close(errortext)
INTERNALERROR> return
INTERNALERROR> self._trace("ignoring EOFError because receiving finished")
INTERNALERROR> channel.close()
INTERNALERROR>
INTERNALERROR> #
INTERNALERROR> # Cross-Python pickling code, tested from test_serializer.py
INTERNALERROR> #
INTERNALERROR>
INTERNALERROR> class DataFormatError(Exception):
INTERNALERROR> pass
INTERNALERROR>
INTERNALERROR> class DumpError(DataFormatError):
INTERNALERROR> """Error while serializing an object."""
INTERNALERROR>
INTERNALERROR> class LoadError(DataFormatError):
INTERNALERROR> """Error while unserializing an object."""
INTERNALERROR>
INTERNALERROR> if ISPY3:
INTERNALERROR> def bchr(n):
INTERNALERROR> return bytes([n])
INTERNALERROR> else:
INTERNALERROR> bchr = chr
INTERNALERROR>
INTERNALERROR> DUMPFORMAT_VERSION = bchr(1)
INTERNALERROR>
INTERNALERROR> FOUR_BYTE_INT_MAX = 2147483647
INTERNALERROR>
INTERNALERROR> FLOAT_FORMAT = "!d"
INTERNALERROR> FLOAT_FORMAT_SIZE = struct.calcsize(FLOAT_FORMAT)
INTERNALERROR>
INTERNALERROR> class _Stop(Exception):
INTERNALERROR> pass
INTERNALERROR>
INTERNALERROR> class Unserializer(object):
INTERNALERROR> num2func = {} # is filled after this class definition
INTERNALERROR> py2str_as_py3str = True # True
INTERNALERROR> py3str_as_py2str = False # false means py2 will get unicode
INTERNALERROR>
INTERNALERROR> def __init__(self, stream, channel_or_gateway=None, strconfig=None):
INTERNALERROR> gateway = getattr(channel_or_gateway, 'gateway', channel_or_gateway)
INTERNALERROR> strconfig = getattr(channel_or_gateway, '_strconfig', strconfig)
INTERNALERROR> if strconfig:
INTERNALERROR> self.py2str_as_py3str, self.py3str_as_py2str = strconfig
INTERNALERROR> self.stream = stream
INTERNALERROR> self.channelfactory = getattr(gateway, '_channelfactory', gateway)
INTERNALERROR>
INTERNALERROR> def load(self, versioned=False):
INTERNALERROR> if versioned:
INTERNALERROR> ver = self.stream.read(1)
INTERNALERROR> if ver != DUMPFORMAT_VERSION:
INTERNALERROR> raise LoadError("wrong dumpformat version")
INTERNALERROR> self.stack = []
INTERNALERROR> try:
INTERNALERROR> while True:
INTERNALERROR> opcode = self.stream.read(1)
INTERNALERROR> if not opcode:
INTERNALERROR> raise EOFError
INTERNALERROR> try:
INTERNALERROR> loader = self.num2func[opcode]
INTERNALERROR> except KeyError:
INTERNALERROR> raise LoadError("unkown opcode %r - "
INTERNALERROR> "wire protocol corruption?" % (opcode,))
INTERNALERROR> loader(self)
INTERNALERROR> except _Stop:
INTERNALERROR> if len(self.stack) != 1:
INTERNALERROR> raise LoadError("internal unserialization error")
INTERNALERROR> return self.stack.pop(0)
INTERNALERROR> else:
INTERNALERROR> raise LoadError("didn't get STOP")
INTERNALERROR>
INTERNALERROR> def load_none(self):
INTERNALERROR> self.stack.append(None)
INTERNALERROR>
INTERNALERROR> def load_true(self):
INTERNALERROR> self.stack.append(True)
INTERNALERROR>
INTERNALERROR> def load_false(self):
INTERNALERROR> self.stack.append(False)
INTERNALERROR>
INTERNALERROR> def load_int(self):
INTERNALERROR> i = self._read_int4()
INTERNALERROR> self.stack.append(i)
INTERNALERROR>
INTERNALERROR> def load_longint(self):
INTERNALERROR> s = self._read_byte_string()
INTERNALERROR> self.stack.append(int(s))
INTERNALERROR>
INTERNALERROR> if ISPY3:
INTERNALERROR> load_long = load_int
INTERNALERROR> load_longlong = load_longint
INTERNALERROR> else:
INTERNALERROR> def load_long(self):
INTERNALERROR> i = self._read_int4()
INTERNALERROR> self.stack.append(long(i))
INTERNALERROR>
INTERNALERROR> def load_longlong(self):
INTERNALERROR> l = self._read_byte_string()
INTERNALERROR> self.stack.append(long(l))
INTERNALERROR>
INTERNALERROR> def load_float(self):
INTERNALERROR> binary = self.stream.read(FLOAT_FORMAT_SIZE)
INTERNALERROR> self.stack.append(struct.unpack(FLOAT_FORMAT, binary)[0])
INTERNALERROR>
INTERNALERROR> def _read_int4(self):
INTERNALERROR> return struct.unpack("!i", self.stream.read(4))[0]
INTERNALERROR>
INTERNALERROR> def _read_byte_string(self):
INTERNALERROR> length = self._read_int4()
INTERNALERROR> as_bytes = self.stream.read(length)
INTERNALERROR> return as_bytes
INTERNALERROR>
INTERNALERROR> def load_py3string(self):
INTERNALERROR> as_bytes = self._read_byte_string()
INTERNALERROR> if not ISPY3 and self.py3str_as_py2str:
INTERNALERROR> # XXX Should we try to decode into latin-1?
INTERNALERROR> self.stack.append(as_bytes)
INTERNALERROR> else:
INTERNALERROR> self.stack.append(as_bytes.decode("utf-8"))
INTERNALERROR>
INTERNALERROR> def load_py2string(self):
INTERNALERROR> as_bytes = self._read_byte_string()
INTERNALERROR> if ISPY3 and self.py2str_as_py3str:
INTERNALERROR> s = as_bytes.decode("latin-1")
INTERNALERROR> else:
INTERNALERROR> s = as_bytes
INTERNALERROR> self.stack.append(s)
INTERNALERROR>
INTERNALERROR> def load_bytes(self):
INTERNALERROR> s = self._read_byte_string()
INTERNALERROR> self.stack.append(s)
INTERNALERROR>
INTERNALERROR> def load_unicode(self):
INTERNALERROR> self.stack.append(self._read_byte_string().decode("utf-8"))
INTERNALERROR>
INTERNALERROR> def load_newlist(self):
INTERNALERROR> length = self._read_int4()
INTERNALERROR> self.stack.append([None] * length)
INTERNALERROR>
INTERNALERROR> def load_setitem(self):
INTERNALERROR> if len(self.stack) < 3:
INTERNALERROR> raise LoadError("not enough items for setitem")
INTERNALERROR> value = self.stack.pop()
INTERNALERROR> key = self.stack.pop()
INTERNALERROR> self.stack[-1][key] = value
INTERNALERROR>
INTERNALERROR> def load_newdict(self):
INTERNALERROR> self.stack.append({})
INTERNALERROR>
INTERNALERROR> def _load_collection(self, type_):
INTERNALERROR> length = self._read_int4()
INTERNALERROR> if length:
INTERNALERROR> res = type_(self.stack[-length:])
INTERNALERROR> del self.stack[-length:]
INTERNALERROR> self.stack.append(res)
INTERNALERROR> else:
INTERNALERROR> self.stack.append(type_())
INTERNALERROR>
INTERNALERROR> def load_buildtuple(self):
INTERNALERROR> self._load_collection(tuple)
INTERNALERROR>
INTERNALERROR> def load_set(self):
INTERNALERROR> self._load_collection(set)
INTERNALERROR>
INTERNALERROR> def load_frozenset(self):
INTERNALERROR> self._load_collection(frozenset)
INTERNALERROR>
INTERNALERROR> def load_stop(self):
INTERNALERROR> raise _Stop
INTERNALERROR>
INTERNALERROR> def load_channel(self):
INTERNALERROR> id = self._read_int4()
INTERNALERROR> newchannel = self.channelfactory.new(id)
INTERNALERROR> self.stack.append(newchannel)
INTERNALERROR>
INTERNALERROR> # automatically build opcodes and byte-encoding
INTERNALERROR>
INTERNALERROR> class opcode:
INTERNALERROR> """ container for name -> num mappings. """
INTERNALERROR>
INTERNALERROR> def _buildopcodes():
INTERNALERROR> l = []
INTERNALERROR> for name, func in Unserializer.__dict__.items():
INTERNALERROR> if name.startswith("load_"):
INTERNALERROR> opname = name[5:].upper()
INTERNALERROR> l.append((opname, func))
INTERNALERROR> l.sort()
INTERNALERROR> for i,(opname, func) in enumerate(l):
INTERNALERROR> assert i < 26, "xxx"
INTERNALERROR> i = bchr(64+i)
INTERNALERROR> Unserializer.num2func[i] = func
INTERNALERROR> setattr(opcode, opname, i)
INTERNALERROR>
INTERNALERROR> _buildopcodes()
INTERNALERROR>
INTERNALERROR> def dumps(obj):
INTERNALERROR> """ return a serialized bytestring of the given obj.
INTERNALERROR>
INTERNALERROR> The obj and all contained objects must be of a builtin
INTERNALERROR> python type (so nested dicts, sets, etc. are all ok but
INTERNALERROR> not user-level instances).
INTERNALERROR> """
INTERNALERROR> return _Serializer().save(obj, versioned=True)
INTERNALERROR>
INTERNALERROR> def dump(byteio, obj):
INTERNALERROR> """ write a serialized bytestring of the given obj to the given stream. """
INTERNALERROR> _Serializer(write=byteio.write).save(obj, versioned=True)
INTERNALERROR>
INTERNALERROR> def loads(bytestring, py2str_as_py3str=False, py3str_as_py2str=False):
INTERNALERROR> """ return the object as deserialized from the given bytestring.
INTERNALERROR>
INTERNALERROR> py2str_as_py3str: if true then string (str) objects previously
INTERNALERROR> dumped on Python2 will be loaded as Python3
INTERNALERROR> strings which really are text objects.
INTERNALERROR> py3str_as_py2str: if true then string (str) objects previously
INTERNALERROR> dumped on Python3 will be loaded as Python2
INTERNALERROR> strings instead of unicode objects.
INTERNALERROR>
INTERNALERROR> if the bytestring was dumped with an incompatible protocol
INTERNALERROR> version or if the bytestring is corrupted, the
INTERNALERROR> ``execnet.DataFormatError`` will be raised.
INTERNALERROR> """
INTERNALERROR> io = BytesIO(bytestring)
INTERNALERROR> return load(io, py2str_as_py3str=py2str_as_py3str,
INTERNALERROR> py3str_as_py2str=py3str_as_py2str)
INTERNALERROR>
INTERNALERROR> def load(io, py2str_as_py3str=False, py3str_as_py2str=False):
INTERNALERROR> """ derserialize an object form the specified stream.
INTERNALERROR>
INTERNALERROR> Behaviour and parameters are otherwise the same as with ``loads``
INTERNALERROR> """
INTERNALERROR> strconfig=(py2str_as_py3str, py3str_as_py2str)
INTERNALERROR> return Unserializer(io, strconfig=strconfig).load(versioned=True)
INTERNALERROR>
INTERNALERROR> def loads_internal(bytestring, channelfactory=None, strconfig=None):
INTERNALERROR> io = BytesIO(bytestring)
INTERNALERROR> return Unserializer(io, channelfactory, strconfig).load()
INTERNALERROR>
INTERNALERROR> def dumps_internal(obj):
INTERNALERROR> return _Serializer().save(obj)
INTERNALERROR>
INTERNALERROR>
INTERNALERROR> class _Serializer(object):
INTERNALERROR> _dispatch = {}
INTERNALERROR>
INTERNALERROR> def __init__(self, write=None):
INTERNALERROR> if write is None:
INTERNALERROR> self._streamlist = []
INTERNALERROR> write = self._streamlist.append
INTERNALERROR> self._write = write
INTERNALERROR>
INTERNALERROR> def save(self, obj, versioned=False):
INTERNALERROR> # calling here is not re-entrant but multiple instances
INTERNALERROR> # may write to the same stream because of the common platform
INTERNALERROR> # atomic-write guaruantee (concurrent writes each happen atomicly)
INTERNALERROR> if versioned:
INTERNALERROR> self._write(DUMPFORMAT_VERSION)
INTERNALERROR> self._save(obj)
INTERNALERROR> self._write(opcode.STOP)
INTERNALERROR> try:
INTERNALERROR> streamlist = self._streamlist
INTERNALERROR> except AttributeError:
INTERNALERROR> return None
INTERNALERROR> return type(streamlist[0])().join(streamlist)
INTERNALERROR>
INTERNALERROR> def _save(self, obj):
INTERNALERROR> tp = type(obj)
INTERNALERROR> try:
INTERNALERROR> dispatch = self._dispatch[tp]
INTERNALERROR> except KeyError:
INTERNALERROR> methodname = 'save_' + tp.__name__
INTERNALERROR> meth = getattr(self.__class__, methodname, None)
INTERNALERROR> if meth is None:
INTERNALERROR> raise DumpError("can't serialize %s" % (tp,))
INTERNALERROR> dispatch = self._dispatch[tp] = meth
INTERNALERROR> dispatch(self, obj)
INTERNALERROR>
INTERNALERROR> def save_NoneType(self, non):
INTERNALERROR> self._write(opcode.NONE)
INTERNALERROR>
INTERNALERROR> def save_bool(self, boolean):
INTERNALERROR> if boolean:
INTERNALERROR> self._write(opcode.TRUE)
INTERNALERROR> else:
INTERNALERROR> self._write(opcode.FALSE)
INTERNALERROR>
INTERNALERROR> def save_bytes(self, bytes_):
INTERNALERROR> self._write(opcode.BYTES)
INTERNALERROR> self._write_byte_sequence(bytes_)
INTERNALERROR>
INTERNALERROR> if ISPY3:
INTERNALERROR> def save_str(self, s):
INTERNALERROR> self._write(opcode.PY3STRING)
INTERNALERROR> self._write_unicode_string(s)
INTERNALERROR> else:
INTERNALERROR> def save_str(self, s):
INTERNALERROR> self._write(opcode.PY2STRING)
INTERNALERROR> self._write_byte_sequence(s)
INTERNALERROR>
INTERNALERROR> def save_unicode(self, s):
INTERNALERROR> self._write(opcode.UNICODE)
INTERNALERROR> self._write_unicode_string(s)
INTERNALERROR>
INTERNALERROR> def _write_unicode_string(self, s):
INTERNALERROR> try:
INTERNALERROR> as_bytes = s.encode("utf-8")
INTERNALERROR> except UnicodeEncodeError:
INTERNALERROR> raise DumpError("strings must be utf-8 encodable")
INTERNALERROR> self._write_byte_sequence(as_bytes)
INTERNALERROR>
INTERNALERROR> def _write_byte_sequence(self, bytes_):
INTERNALERROR> self._write_int4(len(bytes_), "string is too long")
INTERNALERROR> self._write(bytes_)
INTERNALERROR>
INTERNALERROR> def _save_integral(self, i, short_op, long_op):
INTERNALERROR> if i <= FOUR_BYTE_INT_MAX:
INTERNALERROR> self._write(short_op)
INTERNALERROR> self._write_int4(i)
INTERNALERROR> else:
INTERNALERROR> self._write(long_op)
INTERNALERROR> self._write_byte_sequence(str(i).rstrip("L").encode("ascii"))
INTERNALERROR>
INTERNALERROR> def save_int(self, i):
INTERNALERROR> self._save_integral(i, opcode.INT, opcode.LONGINT)
INTERNALERROR>
INTERNALERROR> def save_long(self, l):
INTERNALERROR> self._save_integral(l, opcode.LONG, opcode.LONGLONG)
INTERNALERROR>
INTERNALERROR> def save_float(self, flt):
INTERNALERROR> self._write(opcode.FLOAT)
INTERNALERROR> self._write(struct.pack(FLOAT_FORMAT, flt))
INTERNALERROR>
INTERNALERROR> def _write_int4(self, i, error="int must be less than %i" %
INTERNALERROR> (FOUR_BYTE_INT_MAX,)):
INTERNALERROR> if i > FOUR_BYTE_INT_MAX:
INTERNALERROR> raise DumpError(error)
INTERNALERROR> self._write(struct.pack("!i", i))
INTERNALERROR>
INTERNALERROR> def save_list(self, L):
INTERNALERROR> self._write(opcode.NEWLIST)
INTERNALERROR> self._write_int4(len(L), "list is too long")
INTERNALERROR> for i, item in enumerate(L):
INTERNALERROR> self._write_setitem(i, item)
INTERNALERROR>
INTERNALERROR> def _write_setitem(self, key, value):
INTERNALERROR> self._save(key)
INTERNALERROR> self._save(value)
INTERNALERROR> self._write(opcode.SETITEM)
INTERNALERROR>
INTERNALERROR> def save_dict(self, d):
INTERNALERROR> self._write(opcode.NEWDICT)
INTERNALERROR> for key, value in d.items():
INTERNALERROR> self._write_setitem(key, value)
INTERNALERROR>
INTERNALERROR> def save_tuple(self, tup):
INTERNALERROR> for item in tup:
INTERNALERROR> self._save(item)
INTERNALERROR> self._write(opcode.BUILDTUPLE)
INTERNALERROR> self._write_int4(len(tup), "tuple is too long")
INTERNALERROR>
INTERNALERROR> def _write_set(self, s, op):
INTERNALERROR> for item in s:
INTERNALERROR> self._save(item)
INTERNALERROR> self._write(op)
INTERNALERROR> self._write_int4(len(s), "set is too long")
INTERNALERROR>
INTERNALERROR> def save_set(self, s):
INTERNALERROR> self._write_set(s, opcode.SET)
INTERNALERROR>
INTERNALERROR> def save_frozenset(self, s):
INTERNALERROR> self._write_set(s, opcode.FROZENSET)
INTERNALERROR>
INTERNALERROR> def save_Channel(self, channel):
INTERNALERROR> self._write(opcode.CHANNEL)
INTERNALERROR> self._write_int4(channel.id)
INTERNALERROR>
INTERNALERROR> def init_popen_io(execmodel):
INTERNALERROR> if not hasattr(os, 'dup'): # jython
INTERNALERROR> io = Popen2IO(sys.stdout, sys.stdin, execmodel)
INTERNALERROR> import tempfile
INTERNALERROR> sys.stdin = tempfile.TemporaryFile('r')
INTERNALERROR> sys.stdout = tempfile.TemporaryFile('w')
INTERNALERROR> else:
INTERNALERROR> try:
INTERNALERROR> devnull = os.devnull
INTERNALERROR> except AttributeError:
INTERNALERROR> if os.name == 'nt':
INTERNALERROR> devnull = 'NUL'
INTERNALERROR> else:
INTERNALERROR> devnull = '/dev/null'
INTERNALERROR> # stdin
INTERNALERROR> stdin = execmodel.fdopen(os.dup(0), 'r', 1)
INTERNALERROR> fd = os.open(devnull, os.O_RDONLY)
INTERNALERROR> os.dup2(fd, 0)
INTERNALERROR> os.close(fd)
INTERNALERROR>
INTERNALERROR> # stdout
INTERNALERROR> stdout = execmodel.fdopen(os.dup(1), 'w', 1)
INTERNALERROR> fd = os.open(devnull, os.O_WRONLY)
INTERNALERROR> os.dup2(fd, 1)
INTERNALERROR>
INTERNALERROR> # stderr for win32
INTERNALERROR> if os.name == 'nt':
INTERNALERROR> sys.stderr = execmodel.fdopen(os.dup(2), 'w', 1)
INTERNALERROR> os.dup2(fd, 2)
INTERNALERROR> os.close(fd)
INTERNALERROR> io = Popen2IO(stdout, stdin, execmodel)
INTERNALERROR> sys.stdin = execmodel.fdopen(0, 'r', 1)
INTERNALERROR> sys.stdout = execmodel.fdopen(1, 'w', 1)
INTERNALERROR> return io
INTERNALERROR>
INTERNALERROR> def serve(io, id):
INTERNALERROR> trace("creating slavegateway on %r" %(io,))
INTERNALERROR> SlaveGateway(io=io, id=id, _startcount=2).serve()
INTERNALERROR>
INTERNALERROR> import socket
INTERNALERROR> class SocketIO:
INTERNALERROR> def __init__(self, sock, execmodel):
INTERNALERROR> self.sock = sock
INTERNALERROR> self.execmodel = execmodel
INTERNALERROR> socket = execmodel.socket
INTERNALERROR> try:
INTERNALERROR> sock.setsockopt(socket.SOL_IP, socket.IP_TOS, 0x10)# IPTOS_LOWDELAY
INTERNALERROR> sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
INTERNALERROR> except (AttributeError, socket.error):
INTERNALERROR> sys.stderr.write("WARNING: cannot set socketoption")
INTERNALERROR>
INTERNALERROR> def read(self, numbytes):
INTERNALERROR> "Read exactly 'bytes' bytes from the socket."
INTERNALERROR> buf = bytes()
INTERNALERROR> while len(buf) < numbytes:
INTERNALERROR> t = self.sock.recv(numbytes - len(buf))
INTERNALERROR> if not t:
INTERNALERROR> raise EOFError
INTERNALERROR> buf += t
INTERNALERROR> return buf
INTERNALERROR>
INTERNALERROR> def write(self, data):
INTERNALERROR> self.sock.sendall(data)
INTERNALERROR>
INTERNALERROR> def close_read(self):
INTERNALERROR> try:
INTERNALERROR> self.sock.shutdown(0)
INTERNALERROR> except self.execmodel.socket.error:
INTERNALERROR> pass
INTERNALERROR> def close_write(self):
INTERNALERROR> try:
INTERNALERROR> self.sock.shutdown(1)
INTERNALERROR> except self.execmodel.socket.error:
INTERNALERROR> pass
INTERNALERROR>
INTERNALERROR> def wait(self):
INTERNALERROR> pass
INTERNALERROR>
INTERNALERROR> def kill(self):
INTERNALERROR> pass
INTERNALERROR>
INTERNALERROR> try: execmodel
INTERNALERROR> except NameError:
INTERNALERROR> execmodel = get_execmodel('thread')
INTERNALERROR> io = SocketIO(clientsock, execmodel)
INTERNALERROR> io.write('1'.encode('ascii'))
INTERNALERROR> serve(io, id='socket=192.168.1.108:8888-slave')", line 1029, in executetask
INTERNALERROR> File "<string>", line 1, in do_exec
INTERNALERROR> File "<remote exec>", line 109, in <module>
INTERNALERROR> File "<remote exec>", line 104, in serve_rsync
INTERNALERROR> AttributeError: 'module' object has no attribute 'symlink'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment