Skip to content

Instantly share code, notes, and snippets.

@kquick
Created February 10, 2022 23:13
Show Gist options
  • Save kquick/371519aa6cb0bea16961cc100eb7cf3f to your computer and use it in GitHub Desktop.
Save kquick/371519aa6cb0bea16961cc100eb7cf3f to your computer and use it in GitHub Desktop.
"""Simple TCP sockets.
Each Actor has a TCP IPv4 port/socket that will accept incoming
connections for messages. Each connection from a remote Actor will
accept a single message per connection. The connection is dropped and
re-established for multiple messages; this is less efficient but has
more fairness.
This transport can be used within a process, between processes, and
even between processes on separate systems.
"""
# n.b. The core of this is very similar to asyncore/asynchat.
# Unfortunately, those modules are deprecated in Python 3.4 in favor
# of asyncio, which is powerful... and complex. Thespian aims to
# support Python 2.6 through 3.4 and beyond, and has more specific
# needs (undoubtably a subset of asyncio capabilities) that can be
# implemented more simply and directly here. In addition, this module
# should be extensible to support SSL, which asynchat is not (maybe).
# For Thespian, there are two classes of sockets:
# * the Actor's primary receive socket, and
# * transient outgoing send sockets.
# All steps of both types of sockets are handled asynchronously, very
# similarly to the asyncore channel.
#
# For the receive socket, it will listen for and accept incoming
# connections, and then accept a single message on that connection,
# closing the connection on completion (or error).
#
# For the transmit socket, it will connect, send, and close with a
# TransmitIntent.
# ----------------------------------------------------------------------
# TCP Buffering issues
#
# TCP is unique in that there are unusual buffering considerations to
# account for. Specifically, a sender can connect to a listener, send
# a message, and close the socket --- *WITHOUT the receiver even
# processing the accept! As a result, the transmitter must take
# additional steps to ensure that the message that was sent has been
# delivered.
#
# There are two ways that this confirmation can be handled:
#
# 1) confirmation sent back in the original connection
#
# 2) messages confirmed by a separate confirmation message with a
# unique message identifier for idempotency.
#
# Disadvantages of #1:
# * More complicated exchange between sender and receiver
# * There must be a header synchronization with a size indicator so
# that the receiver knows when the full message has been received
# and should be acknowledged.
# * The socket must exist for a potentially much longer period and
# retransmits must still be attempted on failure.
#
# Disadvantages of #2:
# * Doubles connection establishment requirements.
# * More complicated send queuing to ensure ordering of messages between
# sender and recipient. However, this really must exist for condition
# #1 as well.
#
# Could also do a hybrid of both. On send, start with header
# containing message ID (and size?) then wait a brief time after send
# for the ACK, then disconnect and wait for the separate ACK later.
#
# At this point, the connection establishment seems to be the
# overriding performance dominator, and the message header
# synchronization and size indication seem like good ideas anyhow to
# confirm that the entire message has been received by the recipient.
# This method is feasible because of the asynchronous handling of the
# transmit sequence (as opposed to a blocking transmit, which would
# consume the processing budget for highly active scenarios).
import logging
from thespian.system.utilis import (thesplog, fmap, partition)
from thespian.system.timing import (timePeriodSeconds, ExpirationTimer,
currentTime)
from thespian.actors import *
from thespian.system.transport import *
from thespian.system.transport.IPBase import (TCPv4ActorAddress)
from thespian.system.transport.streamBuffer import (toSendBuffer,
ReceiveBuffer,
ackMsg, ackPacket,
ackDataErrMsg,
isControlMessage)
from thespian.system.transport.asyncTransportBase import (asyncTransportBase,
exclusive_processing)
from thespian.system.transport.wakeupTransportBase import wakeupTransportBase
from thespian.system.transport.errmgmt import *
from thespian.system.messages.convention import CONV_ADDR_IPV4_CAPABILITY
from thespian.system.messages.multiproc import ChildMayHaveDied
from thespian.system.addressManager import ActorLocalAddress
import socket
import select
from datetime import timedelta
try:
import cPickle as pickle
except Exception:
import pickle # type: ignore
import errno
from contextlib import closing
DEFAULT_ADMIN_PORT = 1900
serializer = pickle
# json cannot be used because Messages are often structures, which
# cannot be converted to JSON.
# max # of listens to sign up for at a time
LISTEN_DEPTH = 100
# max time to hold open an incoming socket
MAX_INCOMING_SOCKET_PERIOD = timedelta(minutes=7)
MAX_CONSECUTIVE_READ_FAILURES = 20
# close idle sockets after this amount of time
MAX_IDLE_SOCKET_PERIOD = timedelta(minutes=20)
# if true, keep sockets open for multiple messages
REUSE_SOCKETS = True
class TCPEndpoint(TransportInit__Base):
def __init__(self, *args): self.args = args
@property
def addrInst(self): return self.args[0]
def _safeSocketShutdown(sock):
if sock:
sock = getattr(sock, 'socket', sock)
if sock and hasattr(sock, 'shutdown'):
try:
sock.shutdown(socket.SHUT_RDWR)
except socket.error as ex:
if ex.errno != errno.ENOTCONN:
thesplog('Error during shutdown of socket %s: %s', sock, ex)
sock.close()
class TCPIncoming_Common(PauseWithBackoff):
def __init__(self, rmtAddr, baseSock, rcvBuf=None):
super(TCPIncoming_Common, self).__init__()
self._openSock = baseSock
# _rmtAddr may be None until a message is rcvd with
# identification
self._rmtAddr = rmtAddr
self._rData = rcvBuf or ReceiveBuffer(serializer.loads)
self._expires = ExpirationTimer(MAX_INCOMING_SOCKET_PERIOD)
self.failCount = 0
@property
def socket(self):
return self._openSock
@property
def fromAddress(self):
return self._rmtAddr
@fromAddress.setter
def fromAddress(self, newAddr):
self._rmtAddr = newAddr
def delay(self, current_time = None):
ct = current_time or currentTime()
# n.b. include _pauseUntil from PauseWithBackoff
return max(timedelta(seconds=0),
min(self._expires.view(ct).remaining(),
getattr(self, '_pauseUntil', self._expires).view(ct).remaining()))
def addData(self, newData): self._rData.addMore(newData)
def remainingSize(self): return self._rData.remainingAmount()
def receivedAllData(self): return self._rData.isDone()
@property
def data(self): return self._rData.completed()
def close(self):
_safeSocketShutdown(self)
self._openSock = None
def __str__(self):
return 'TCPInc(%s)<%s>' % (str(self._rmtAddr), str(self._rData))
class TCPIncoming(TCPIncoming_Common):
def __del__(self):
_safeSocketShutdown(self)
self._openSock = None
class TCPIncomingPersistent(TCPIncoming_Common):
pass
class IdleSocket(object):
def __init__(self, socket, addr):
self.socket = socket
self.rmtaddr = addr
# n.b. the remote may have bound an outbound connect socket to
# a different address, but rmtAddr represents the primary
# address of an Actor/Admin: the one it listens on.
# self.rmtAddr = rmtAddr
self.validity = ExpirationTimer(MAX_IDLE_SOCKET_PERIOD)
def expired(self):
return self.validity.view().expired()
def __str__(self):
return 'Idle-socket %s->%s (%s)' % (str(self.socket),
str(self.rmtaddr),
str(self.validity))
def shutdown(self, shtarg=socket.SHUT_RDWR):
self.socket.shutdown(shtarg)
def close(self):
self.socket.close()
def opsKey(addr):
return addr.addressDetails
# The definition of these two address types has moved to IPBase, but
# declare them here as well for backward compatibility with older
# running Thespian instances.
import thespian.system.transport.IPBase
class RoutedTCPv4ActorAddress(
thespian.system.transport.IPBase.RoutedTCPv4ActorAddress):
pass
class TXOnlyAdminTCPv4ActorAddress(
thespian.system.transport.IPBase.TXOnlyAdminTCPv4ActorAddress):
pass
class ExternalTransportCopy(object): pass
class TCPTransport(asyncTransportBase, wakeupTransportBase):
"A transport using TCP IPv4 sockets for communications."
def __init__(self, initType, *args):
super(TCPTransport, self).__init__()
if isinstance(initType, ExternalInterfaceTransportInit):
# External process that is going to talk "in". There is
# no parent, and the child is the systemAdmin.
capabilities, logDefs, concurrency_context = args
adminRouting = False
self.txOnly = False # communications from outside-in are always local and therefore not restricted.
convAddr = capabilities.get(CONV_ADDR_IPV4_CAPABILITY, '')
if isinstance(convAddr, list):
convAddr = convAddr[0]
if convAddr and type(convAddr) == type( (1,2) ):
externalAddr = convAddr
elif type(convAddr) == type("") and ':' in convAddr:
externalAddr = convAddr.split(':')
externalAddr = externalAddr[0], int(externalAddr[1])
else:
externalAddr = (convAddr, capabilities.get('Admin Port', DEFAULT_ADMIN_PORT))
templateAddr = ActorAddress(TCPv4ActorAddress(None, 0, external = externalAddr))
self._adminAddr = TCPTransport.getAdminAddr(capabilities)
self._parentAddr = None
isAdmin = False
elif isinstance(initType, TCPEndpoint):
instanceNum, assignedAddr, self._parentAddr, self._adminAddr, adminRouting, self.txOnly = initType.args
isAdmin = assignedAddr == self._adminAddr
templateAddr = assignedAddr or \
ActorAddress(
TCPv4ActorAddress(None, 0,
external=(self._parentAddr or
self._adminAddr or
True)))
elif isinstance(initType, ExternalTransportCopy):
self._adminAddr, self.txOnly, adminRouting = args
self._parentAddr = None
isAdmin = False
templateAddr = ActorAddress(
TCPv4ActorAddress(None, 0, self._adminAddr))
else:
thesplog('TCPTransport init of type %s unsupported', type(initType), level=logging.ERROR)
raise ActorSystemStartupFailure('Invalid TCPTransport init type (%s)'%type(initType))
self.socket = socket.socket(*templateAddr.addressDetails.socketArgs)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(*templateAddr.addressDetails.bindArgs)
self.socket.listen(LISTEN_DEPTH)
# N.B. myAddress is actually the address we will export for
# others to talk to us, not the bind address. The difference
# is that we bind to '0.0.0.0' (inaddr_any), but that's not a
# valid address for people to send stuff to us. The
# self.socket socket name is likely inaddr_any but has the
# valid port, whereas the templateAddr has our actual public
# address.
if isAdmin and self.txOnly:
# Must be the admin, and in txOnly mode
self.myAddress = ActorAddress(TXOnlyAdminTCPv4ActorAddress(
templateAddr.addressDetails.connectArgs[0][0],
self.socket.getsockname()[1],
external=True))
elif adminRouting:
self.myAddress = ActorAddress(RoutedTCPv4ActorAddress(
templateAddr.addressDetails.connectArgs[0][0],
self.socket.getsockname()[1],
self._adminAddr,
txOnly=self.txOnly,
external=True))
else:
self.myAddress = ActorAddress(TCPv4ActorAddress(
templateAddr.addressDetails.connectArgs[0][0],
self.socket.getsockname()[1],
external=True))
self._transmitIntents = {} # key = fd, value = tx intent
self._waitingTransmits = [] # list of intents without sockets
self._incomingSockets = {} # key = fd, value = TCP Incoming
self._incomingEnvelopes = []
self._finished_intents = []
self._watches = []
if REUSE_SOCKETS:
# key = opsKey(remote listen address), value=IdleSocket
self._openSockets = {}
self._checkChildren = False
self._shutdownSignalled = False
def close(self):
"""Releases all resources and terminates functionality. This is
better done deterministically by explicitly calling this
method (although __del__ will attempt to perform similar
operations), but it has the unfortunate side-effect of
making this object modal: after the close it can be
referenced but not successfully used anymore, so it
explicitly nullifies its contents.
"""
if hasattr(self, '_transmitIntents'):
for each in self._transmitIntents:
self._transmitIntents[each].tx_done(SendStatus.Failed)
delattr(self, '_transmitIntents')
if hasattr(self, '_waitingTransmits'):
for each in self._waitingTransmits:
each.tx_done(SendStatus.Failed)
delattr(self, '_waitingTransmits')
if hasattr(self, '_incomingSockets'):
for each in self._incomingSockets:
self._incomingSockets[each].close()
delattr(self, '_incomingSockets')
if hasattr(self, 'socket'):
self._safeSocketShutdown(getattr(self, 'socket', None))
delattr(self, 'socket')
def __del__(self):
self.close()
@staticmethod
def _safeSocketShutdown(sock):
# n.b. _safeSocketShutdown is a static method instead of a
# global because if __del__ calls close, the
# _safeSocketShutdown may already have been unbound. However,
# this still needs unusual protection to validate socket in
# case socket was already unloaded.
if sock and socket and isinstance(socket.error, Exception):
sock = getattr(sock, 'socket', sock)
try:
sock.shutdown(socket.SHUT_RDWR)
except socket.error as ex:
if ex.errno != errno.ENOTCONN:
thesplog('Error during shutdown of socket %s: %s', sock, ex)
sock.close()
def protectedFileNumList(self):
return (list(self._transmitIntents.keys()) +
list(filter(None, map(self._socketFile,
self._waitingTransmits))) +
list(self._incomingSockets.keys()) + [self.socket.fileno()])
def childResetFileNumList(self):
return self.protectedFileNumList() + \
[self._openSockets[S].socket.fileno() for S in getattr(self, '_openSockets', [])]
@staticmethod
def getAdminAddr(capabilities):
return ActorAddress(
(TXOnlyAdminTCPv4ActorAddress
if capabilities.get('Outbound Only', False) else
TCPv4ActorAddress)
(None, capabilities.get('Admin Port', DEFAULT_ADMIN_PORT),
external=(TCPTransport.getConventionAddress(capabilities) or
('', capabilities.get('Admin Port',
DEFAULT_ADMIN_PORT)) or
True)))
@staticmethod
def getAddressFromString(addrspec, adminRouting=False):
if isinstance(addrspec, tuple):
addrparts = addrspec
else:
addrparts = addrspec.split(':')
addrtype = (RoutedTCPv4ActorAddress if adminRouting else
TCPv4ActorAddress)
return ActorAddress(
addrtype(addrparts[0],
addrparts[1] if addrparts[1:] else DEFAULT_ADMIN_PORT,
external=True))
#TODO - Need to gracefully handle the scenario when host suffers catastrophic failure
@staticmethod
def getConventionAddress(capabilities):
convAddr = capabilities.get(CONV_ADDR_IPV4_CAPABILITY, None)
if not convAddr:
return None
try:
if isinstance(convAddr, list):
return [TCPTransport.getAddressFromString(a) for a in convAddr]
return [TCPTransport.getAddressFromString(convAddr)]
except Exception as ex:
thesplog('Invalid TCP convention address entry "%s": %s',
convAddr, ex,
level=logging.ERROR)
raise InvalidActorAddress(convAddr, str(ex))
def external_transport_clone(self):
# An external process wants a unique context for communicating
# with Actors.
return TCPTransport(ExternalTransportCopy(),
self._adminAddr,
self.txOnly,
isinstance(self.myAddress.addressDetails,
RoutedTCPv4ActorAddress))
def _updateStatusResponse(self, resp):
"""Called to update a Thespian_SystemStatus or Thespian_ActorStatus
with common information
"""
for each in self._transmitIntents.values():
resp.addPendingMessage(self.myAddress,
each.targetAddr,
str(each.message))
for each in self._waitingTransmits:
resp.addPendingMessage(self.myAddress,
each.targetAddr,
str(each.message))
for each in self._incomingEnvelopes:
resp.addReceivedMessage(each.sender,
self.myAddress,
str(each.message))
asyncTransportBase._updateStatusResponse(self, resp)
wakeupTransportBase._updateStatusResponse(self, resp)
if hasattr(self, '_openSockets'):
for num, each in enumerate(self._openSockets.values()):
resp.addKeyVal(str(each), 'sock#%d-fd%d' % (num, each.socket.fileno()))
@staticmethod
def probeAdmin(addr):
"""Called to see if there might be an admin running already at the
specified addr. This is called from the systemBase, so
simple blocking operations are fine. This only needs to
check for a responder; higher level logic will verify that
it's actually an ActorAdmin suitable for use.
"""
ss = socket.socket(*addr.addressDetails.socketArgs)
try:
ss.setsockopt(socket.SOL_SOCKET,
getattr(socket, 'SO_EXCLUSIVEADDRUSE',
socket.SO_REUSEADDR), 1)
try:
ss.bind(*addr.addressDetails.bindArgs)
# no other process bound
return False
except socket.error as ex:
if err_bind_inuse(ex.errno):
return True
# Some other error... not sure if that means an admin
# is running or not.
return False # assume not
finally:
ss.close()
def prepEndpoint(self, assignedLocalAddr, capabilities):
"""In the parent, prepare to establish a new communications endpoint
with a new Child Actor. The result of this call will be
passed to a created child process to use when initializing
the Transport object for that class; the result of this
call will also be kept by the parent to finalize the
communications after creation of the Child by calling
connectEndpoint() with this returned object.
"""
if isinstance(assignedLocalAddr.addressDetails, ActorLocalAddress):
a1, a2 = assignedLocalAddr.addressDetails.addressInstanceNum, None
else:
# assumed to be an actual TCPActorAddress-based address
# (e.g. admin)
a1, a2 = None, assignedLocalAddr
return TCPEndpoint(a1, a2,
self.myAddress,
self._adminAddr,
capabilities.get('Admin Routing', False) or
capabilities.get('Outbound Only', False),
capabilities.get('Outbound Only', False))
def connectEndpoint(self, endPoint):
pass
def deadAddress(self, addressManager, childAddr):
canceli, continuei = partition(lambda i: i[1].targetAddr == childAddr,
self._transmitIntents.items())
self._transmitIntents = dict(continuei)
for _, each in canceli:
each.socket.close()
delattr(each, 'socket')
self._finishIntent(each, SendStatus.DeadTarget)
canceli, continuei = partition(lambda i: i.targetAddr == childAddr,
self._waitingTransmits)
self._waitingTransmits = continuei
for each in canceli:
self._finishIntent(each, SendStatus.DeadTarget)
# No need to clean up self._incomingSockets entries: they will
# timeout naturally.
super(TCPTransport, self).deadAddress(addressManager, childAddr)
# Ports may be re-used, so do not set this address to dead in
# the address manager
def close_oldest_idle_sockets(self, num_to_close=1):
if hasattr(self, '_openSockets'):
aged_keys = sorted([(self._openSockets[K].validity, K)
for K in self._openSockets])
for _,oldkey in aged_keys[:num_to_close]:
_safeSocketShutdown(self._openSockets.pop(oldkey))
def new_socket(self, op, *args, **kw):
try:
return op(*args, **kw)
except IOError as ex:
if err_too_many_open_sockets(ex):
pass
else:
raise
self.close_oldest_idle_sockets(3)
return op(*args, **kw)
_XMITStepSendConnect = 1
_XMITStepSendData = 2
_XMITStepShutdownWrite = 3
_XMITStepWaitForAck = 4
_XMITStepFinishCleanup = 5
_XMITStepRetry = 6
def serializer(self, intent):
return toSendBuffer((self.myAddress, intent.message), serializer.dumps)
def lostRemote(self, rmtaddr):
"""[optional] Called by adminstrative levels (e.g. convention.py) to
indicate that the indicated remote address is no longer
accessible. This is customarily used only by the Admin in
"Admin Routing" scenarios when the remote is shutdown or
de-registered to allow the transport to cleanup (e.g. close
open sockets, etc.).
This does *not* do anything to remote TXOnly sockets: those
connections were initiated by the remote and should
therefore be dropped by the remote. Dropping those
connections at this point would be harmful, especially
because this is typically called when first reconnecting to
the remote.
"""
if isinstance(rmtaddr.addressDetails, TXOnlyAdminTCPv4ActorAddress):
return
if hasattr(self, '_openSockets'):
for rmvkey in [each
for each in self._openSockets
if rmtaddr.addressDetails.isSameSystem(
self._openSockets[each].rmtaddr)]:
_safeSocketShutdown(self._openSockets[rmvkey])
del self._openSockets[rmvkey]
for each in [i for i in self._transmitIntents
if rmtaddr.addressDetails.isSameSystem(
self._transmitIntents[i].targetAddr)]:
self._cancel_fd_ops(each)
for each in [i for i,v in self._incomingSockets.items()
if rmtaddr.addressDetails.isSameSystem(
v.fromAddress
if v.fromAddress.addressDetails else
v.socket)]:
self._cancel_fd_ops(each)
def _cancel_fd_ops(self, errfileno):
if errfileno == self.socket.fileno():
thesplog('SELECT FATAL ERROR ON MAIN LISTEN SOCKET',
level=logging.ERROR)
raise RuntimeError('Fatal listen socket error; aborting')
if errfileno in self._incomingSockets:
incoming = self._incomingSockets[errfileno]
del self._incomingSockets[errfileno]
incoming = self._handlePossibleIncoming(incoming, errfileno,
closed=True)
if incoming:
self._incomingSockets[incoming.socket.fileno()] = incoming
return
if self._processIntents(errfileno, closed=True):
return
if self._waitingTransmits:
W = self._waitingTransmits.pop(0)
if self._nextTransmitStepCheck(W, errfileno, closed=True):
self._waitingTransmits.append(W)
return
closed_openSocks = []
for I in getattr(self, '_openSockets', {}):
if self._socketFile(self._openSockets[I]) == errfileno:
closed_openSocks.append(I)
for each in closed_openSocks:
del self._openSockets[each]
def interrupt_wait(self,
signal_shutdown=False,
check_children=False):
self._shutdownSignalled |= signal_shutdown
self._checkChildren |= check_children
# Now generate a spurious connection to break out of the
# select.select loop. This is especially useful if a signal
# handler caused a message to be sent to myself: get the
# select loop to wakeup and process the message.
with closing(self.new_socket(
socket.socket,
*self.myAddress.addressDetails.socketArgs)) as ts:
try:
ts.setblocking(0)
ts.connect(*self.myAddress.addressDetails.connectArgs)
except Exception:
pass
def _scheduleTransmitActual(self, intent, has_exclusive_flag=False):
intent = self._forwardIfNeeded(intent)
if not intent:
return
if intent.targetAddr == self.myAddress:
self._processReceivedEnvelope(ReceiveEnvelope(intent.targetAddr,
intent.message),
has_exclusive_flag=has_exclusive_flag)
if not isinstance(intent.message, ForwardMessage):
self.interrupt_wait()
return self._finishIntent(intent)
intent.stage = self._XMITStepSendConnect
if self._nextTransmitStep(intent):
if hasattr(intent, 'socket'):
self._transmitIntents[intent.socket.fileno()] = intent
else:
self._waitingTransmits.append(intent)
def _finishIntent(self, intent, status=SendStatus.Sent):
if hasattr(intent, 'socket'):
if hasattr(self, '_openSockets'):
if not self._queue_intent_extra(intent):
if status == SendStatus.Sent:
opskey = opsKey(intent.targetAddr)
_safeSocketShutdown(self._openSockets.get(opskey, None))
self._openSockets[opskey] = IdleSocket(intent.socket,
intent.targetAddr)
# No need to restart a pending transmit for
# this target here; the main loop will check
# the waitingIntents and find/start the next one
# automatically.
else:
_safeSocketShutdown(intent)
# Here waiting intents need to be re-queued
# since otherwise they won't run until timeout
runnable, waiting = partition(
lambda I: I.targetAddr == intent.targetAddr,
self._waitingTransmits)
self._waitingTransmits = waiting
for R in runnable:
if status == SendStatus.DeadTarget:
self._finished_intents.append((R, status))
elif self._nextTransmitStep(R):
if hasattr(R, 'socket'):
thesplog('<S> waiting intent is now re-processing: %s', R.identify())
self._transmitIntents[R.socket.fileno()] = R
else:
self._waitingTransmits.append(R)
else:
_safeSocketShutdown(intent)
delattr(intent, 'socket')
self._finished_intents.append((intent, status))
return False # intent no longer needs to be attempted
def _queue_intent_extra(self, intent):
extraRead = getattr(intent, 'extraRead', None)
if not extraRead:
return False
incoming = TCPIncomingPersistent(intent.targetAddr,
intent.socket)
try:
incoming.addData(extraRead)
except Exception:
# Bad trailing data, so discard it.
thesplog('discarding bad trailing tx ack data')
return False
pendingIncoming = self._addedDataToIncoming(incoming)
if pendingIncoming:
self._incomingSockets[
pendingIncoming.socket.fileno()] = pendingIncoming
return True # socket is in-progress or was already handled
def _forwardIfNeeded(self, intent):
# Called when an intent is originally received to determine if
# the target address requires forwarding; if so, wrap the
# message in a ForwardMessage wrapper and set the routing
# path.
if intent.targetAddr == self.myAddress or \
isinstance(intent.message, ForwardMessage) or \
not isinstance(intent.targetAddr.addressDetails,
RoutedTCPv4ActorAddress):
return intent
# Replace None with our local admin and remove this actor
routing = [A or self._adminAddr
for A in intent.targetAddr.addressDetails.routing
if A != self.myAddress]
if self.txOnly and routing and \
(routing[0] != self._adminAddr) and \
self.myAddress != self._adminAddr:
routing.insert(0, self._adminAddr)
if not routing or routing == [intent.targetAddr]:
return intent
if intent.targetAddr.addressDetails.isLocalAddr():
return intent
newmsg = ForwardMessage(intent.message,
intent.targetAddr,
self.myAddress, routing)
newaddr = newmsg.fwdTargets[0]
if hasattr(self, '_addressMgr'):
newaddr, newmsg = self._addressMgr.prepMessageSend(newaddr, newmsg)
try:
isDead = newmsg == SendStatus.DeadTarget
except Exception:
isDead = False
if isDead:
# this is a DeadEnvelope or a ChildActorExited; drop
# it instead of recursing forever.
self._finishIntent(intent, SendStatus.Sent)
return None
# Changing the target addr to the next relay target for the
# transmit machinery, but the levels above may process
# completion based on the original target (e.g. systemCommon
# _checkNextTransmit), so add a completion operation that will
# reset the target back to the original (this occurs before
# other callbacks because callbacks are called in reverse
# order of addition).
intent.addCallback(lambda r, i, ta=intent.targetAddr:
i.changeTargetAddr(ta))
intent.changeMessage(newmsg)
intent.changeTargetAddr(newaddr)
intent.serMsg = self.serializer(intent)
return intent
def _nextTransmitStepCheck(self, intent, fileno, closed=False):
# Return True if this intent is still valid, False if it has
# been completed. If fileno is -1, this means check if there is
# time remaining still on this intent
if self._socketFile(intent) == fileno or \
(fileno == -1 and
intent.timeToRetry(hasattr(self, '_openSockets') and
opsKey(intent.targetAddr) in self._openSockets)):
if closed:
intent.stage = self._XMITStepRetry
return self._nextTransmitStep(intent)
if intent.expired():
# Transmit timed out (consider this permanent)
thesplog('Transmit attempt from %s to %s timed out'
', returning PoisonPacket',
self.myAddress, intent.targetAddr, level=logging.WARNING)
# self._incomingEnvelopes.append(ReceiveEnvelope(intent.targetAddr,
# PoisonPacket(intent.message)))
# Stop attempting this transmit
return self._finishIntent(intent, SendStatus.Failed)
# Continue to attempt this transmit
if not intent.delay():
return self._nextTransmitStep(intent)
return True
def _nextTransmitStep(self, intent):
# Return of True means keep waiting on this Transmit Intent;
# False means it is done
try:
return getattr(self, '_next_XMIT_%s' % intent.stage,
'_unknown_XMIT_step')(intent)
except Exception as ex:
import traceback
thesplog('xmit UNcaught exception %s; aborting intent.\n%s',
ex, traceback.format_exc(), level=logging.ERROR)
return False
def _next_XMIT_1(self, intent):
if hasattr(self, '_openSockets'):
opskey = opsKey(intent.targetAddr)
if opskey in self._openSockets:
intent.socket = self._openSockets[opskey].socket
# This intent takes the open socket; there should be only
# one intent per target but this "take" prevents an
# erroneous second target intent from causing corruption.
# The _finishIntent operation will return the socket to
# the _openSockets list. It's possible that both sides
# will simultaneously attempt to transmit, but this should
# be rare, and the effect will be that neither will get
# the expected ACK and will close the socket to be
# re-opened on the next retry period, which is a
# reasonable approach.
del self._openSockets[opskey]
intent.stage = self._XMITStepSendData
intent.amtSent = 0
return self._nextTransmitStep(intent)
# If there is an active or pending Intent for this target,
# just queue this one (by returning True)
if any(T for T in self._transmitIntents.values()
if T.targetAddr == intent.targetAddr and
hasattr(T, 'socket')):
intent.awaitingTXSlot()
return True
# Fall through to get a new Socket for this intent
if isinstance(intent.targetAddr.addressDetails,
TXOnlyAdminTCPv4ActorAddress) and \
intent.targetAddr != self._adminAddr:
# Cannot initiate outbound connection to this remote
# Admin; wait for incoming connection instead.
intent.backoffPause(True) # KWQ... not really
intent.stage = self._XMITStepRetry
return self._nextTransmitStep(intent)
intent.socket = self.new_socket(
socket.socket,
*intent.targetAddr .addressDetails.socketArgs)
intent.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
intent.socket.setblocking(0)
# Disable Nagle to transmit headers and acks asap; our sends
# are usually small
intent.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
try:
intent.socket.connect(*intent.targetAddr
.addressDetails.connectArgs)
intent.socket.setblocking(0)
except socket.error as err:
# EINPROGRESS means non-blocking socket connect is in progress...
if not err_inprogress(err.errno):
thesplog('Socket connect failure %s to %s on %s'
' (returning %s)',
err, intent.targetAddr, intent.socket,
intent.completionCallback,
level=logging.WARNING)
return self._finishIntent(intent,
SendStatus.DeadTarget
if err_conn_refused(err)
else SendStatus.Failed)
except Exception as ex:
thesplog('Unexpected TCP socket connect exception: %s', ex,
level=logging.ERROR)
return self._finishIntent(intent, SendStatus.BadPacket)
intent.stage = self._XMITStepSendData # When connect completes
intent.amtSent = 0
return True
def _next_XMIT_2(self, intent):
# Got connected, ready to send
if not hasattr(intent, 'socket'):
intent.stage = self._XMITStepRetry
return self._nextTransmitStep(intent)
try:
intent.amtSent += intent.socket.send(
intent.serMsg[intent.amtSent:])
except socket.error as err:
if err_send_inprogress(err.errno):
intent.backoffPause(True)
return True
if err_send_connrefused(err):
# in non-blocking, sometimes connection attempts are
# discovered here rather than for the actual connect
# request.
thesplog('ConnRefused to %s; declaring as DeadTarget.',
intent.targetAddr, level=logging.INFO)
return self._finishIntent(intent, SendStatus.DeadTarget)
thesplog('Socket error sending to %s on %s: %s / %s: %s',
intent.targetAddr, intent.socket, str(err), err.errno,
intent, level=logging.ERROR)
intent.stage = self._XMITStepRetry
return self._nextTransmitStep(intent)
except Exception:
import traceback
thesplog('Error sending: %s', traceback.format_exc(),
level=logging.ERROR)
intent.stage = self._XMITStepRetry
return self._nextTransmitStep(intent)
if intent.amtSent >= len(intent.serMsg):
# After data is sent, stop transmit
intent.stage = self._XMITStepShutdownWrite
return True
def _next_XMIT_3(self, intent):
try:
pass
# Original did a socket shutdown for writing, but actual
# socket implementations aren't so sophisticated and this
# tended to stop all socket communications in both
# directions.
# intent.socket.shutdown(socket.SHUT_WR)
except socket.error:
# No shutdown handling, just close
intent.stage = self._XMITStepFinishCleanup
return self._nextTransmitStep(intent)
intent.ackbuf = ReceiveBuffer(serializer.loads)
intent.stage = self._XMITStepWaitForAck
return True
def _next_XMIT_4(self, intent):
# Actually, select below waited on readable, not writeable
try:
rcv = intent.socket.recv(intent.ackbuf.remainingAmount())
except socket.error as err:
if err_recv_retry(err.errno):
intent.backoffPause(True)
return True
if err_recv_connreset(err):
thesplog('Remote %s closed connection before ack received'
' at %s for %s',
str(intent.targetAddr), str(self.myAddress),
intent.identify(),
level=logging.INFO)
else:
thesplog('Socket Error waiting for transmit ack from'
' %s to %s: %s',
str(intent.targetAddr), str(self.myAddress), err,
level=logging.ERROR, exc_info=True)
rcv = '' # Remote closed connection
except Exception as err:
thesplog('General error waiting for transmit ack from'
' %s to %s: %s',
str(intent.targetAddr), str(self.myAddress), err,
level=logging.ERROR, exc_info=True)
rcv = '' # Remote closed connection
if not rcv:
# Socket closed. Reschedule transmit.
intent.backoffPause(True)
intent.stage = self._XMITStepRetry
return self._nextTransmitStep(intent)
return self._check_XMIT_4_done(intent, rcv)
def _check_XMIT_4_done(self, intent, rcv):
intent.ackbuf.addMore(rcv)
if not intent.ackbuf.isDone():
# Continue waiting for ACK
return True
compl = intent.ackbuf.completed()
if not compl:
# ACK/NAK was corrupted; retry.
intent.backoffPause(True)
intent.stage = self._XMITStepRetry
return self._nextTransmitStep(intent)
ackmsg, intent.extraRead = compl
if isControlMessage(ackmsg):
intent.result = SendStatus.Sent if ackmsg == ackPacket \
else SendStatus.BadPacket
intent.stage = self._XMITStepFinishCleanup
return self._nextTransmitStep(intent)
# Must have received a transmit packet from the remote;
# process as a received incoming.
intent.ackbuf.removeExtra()
if self._addedDataToIncoming(TCPIncomingPersistent(intent.targetAddr,
intent.socket,
intent.ackbuf),
True):
# intent.ackbuf.completed() said ackmsg was a full receive
# packet, but _addedDataToIncoming disagreed. This should
# NEVER HAPPEN.
thesplog('<<< Should never happen: '
'not full receive while waiting for ACK.'
' Aborting socket.',
level=logging.CRITICAL)
intent.ackbuf = ReceiveBuffer(serializer.loads)
intent.backoffPause(True)
intent.stage = self._XMITStepRetry
return self._nextTransmitStep(intent)
intent.ackbuf = ReceiveBuffer(serializer.loads)
nxtrcv = intent.extraRead
intent.extraRead = ''
return self._check_XMIT_4_done(intent, nxtrcv)
def _next_XMIT_5(self, intent):
return self._finishIntent(intent, intent.result)
def _next_XMIT_6(self, intent):
if hasattr(intent, 'socket'):
_safeSocketShutdown(intent)
delattr(intent, 'socket')
if hasattr(intent, 'ackbuf'):
delattr(intent, 'ackbuf')
if intent.retry():
intent.stage = self._XMITStepSendConnect
# stage just set won't be executed until retry delay times out
return True
return self._finishIntent(intent, SendStatus.Failed)
def _processIntents(self, filedesc, closed=False):
if filedesc in self._transmitIntents:
intent = self._transmitIntents[filedesc]
del self._transmitIntents[filedesc]
if self._nextTransmitStepCheck(intent, filedesc):
if hasattr(intent, 'socket'):
self._transmitIntents[intent.socket.fileno()] = intent
else:
self._waitingTransmits.append(intent)
return True
return False
def _processIntentTimeouts(self):
procIntents = list(self._transmitIntents.values())
waitIntents = list(self._waitingTransmits)
self._transmitIntents = {}
self._waitingTransmits = []
for intent in procIntents:
if hasattr(intent, '_pauseUntil') and not intent.expired():
self._transmitIntents[intent.socket.fileno()] = intent
continue
if self._nextTransmitStepCheck(intent, -1):
if hasattr(intent, 'socket'):
self._transmitIntents[intent.socket.fileno()] = intent
else:
self._waitingTransmits.append(intent)
for intent in waitIntents:
if self._nextTransmitStepCheck(intent, -1):
if hasattr(intent, 'socket'):
self._transmitIntents[intent.socket.fileno()] = intent
else:
self._waitingTransmits.append(intent)
@staticmethod
def _waitForSendable(sendIntent):
return sendIntent.stage != TCPTransport._XMITStepWaitForAck
@staticmethod
def _socketFile(sendOrRecv):
return sendOrRecv.socket.fileno() \
if getattr(sendOrRecv, 'socket', None) else None
def set_watch(self, watchlist):
self._watches = watchlist
def _runWithExpiry(self, incomingHandler):
xmitOnly = incomingHandler == TransmitOnly or \
isinstance(incomingHandler, TransmitOnly)
if hasattr(self, '_aborting_run'):
delattr(self, '_aborting_run')
while (not hasattr(self, '_aborting_run') or
(self._aborting_run and
(len(self._transmitIntents) > 0 or
len(self._waitingTransmits) > 0))):
ct = currentTime()
if self.run_time.view(ct).expired():
break
if xmitOnly:
if not self._transmitIntents and not self._waitingTransmits:
return 0
else:
while self._incomingEnvelopes:
rEnv = self._incomingEnvelopes.pop(0)
if incomingHandler is None:
return rEnv
r = Thespian__Run_HandlerResult(incomingHandler(rEnv))
if not r:
return r
# Socket management can happen on other threads that are
# attempting transmits, so acquire exclusive processing
# flag. Busy-cycle to obtain the flag since this is the
# primary thread.
with exclusive_processing(self):
wsend, wrecv = fmap(
TCPTransport._socketFile,
partition(TCPTransport._waitForSendable,
filter(lambda T: not T.backoffPause(),
self._transmitIntents.values())))
wrecv = list(filter(None, wrecv))
wsend = list(filter(None, wsend))
wrecv.extend(list(
filter(lambda I: not self._incomingSockets[I].backoffPause(),
filter(None, self._incomingSockets))))
if hasattr(self, '_openSockets'):
wrecv.extend(list(map(lambda s: s.socket.fileno(),
self._openSockets.values())))
delays = list(filter(None,
[self.run_time.view(ct).remaining()] +
[self._transmitIntents[T].delay(ct)
for T in self._transmitIntents] +
[W.delay() for W in self._waitingTransmits] +
[self._incomingSockets[I].delay(ct)
for I in self._incomingSockets]))
# n.b. if a long period of time has elapsed (e.g. laptop
# sleeping) then delays could be negative.
delay = max(0, timePeriodSeconds(min(delays))) if delays else None
if not xmitOnly:
wrecv.extend([self.socket.fileno()])
else:
# Windows does not support calling select with three
# empty lists, so as a workaround, supply the main
# listener if everything else is pending delays (or
# completed but unrealized) here, and ensure the main
# listener does not accept any listens below.
if not wrecv and not wsend:
if not hasattr(self, 'dummySock'):
self.dummySock = socket.socket(socket.AF_INET,
socket.SOCK_DGRAM,
socket.IPPROTO_UDP)
wrecv.extend([self.dummySock.fileno()])
if self._watches:
wrecv.extend(self._watches)
rrecv, rsend, rerr = [], [], []
try:
rrecv, rsend, rerr = select.select(wrecv, wsend,
set(wsend+wrecv), delay)
except (OSError, select.error, ValueError) as ex:
errnum = errno.EBADF if isinstance(ex, ValueError) \
else getattr(ex, 'errno', ex.args[0])
if err_select_retry(errnum):
# probably a change in descriptors
thesplog('select retry on %s', ex, level=logging.ERROR)
self._check_indicators()
continue
if err_bad_fileno(errnum):
# One of the selected file descriptors was bad,
# but no indication which one. It should not be
# one of the ones locally managed by this
# transport, so it's likely one of the
# user-supplied "watched" file descriptors. Find
# and remove it, then carry on.
if errnum == errno.EBADF:
bad = []
for each in self._watches:
try:
_ = select.select([each], [], [], 0)
except Exception:
bad.append(each)
if not bad:
thesplog('bad internal file descriptor!')
try:
_ = select.select([self.socket.fileno()], [], [], 0)
except Exception:
thesplog('listen %s is bad', self.socket.fileno)
rerr.append(self.socket.fileno)
for each in wrecv:
try:
_ = select.select([each], [], [], 0)
except Exception:
thesplog('wrecv %s is bad', each)
rerr.append(each)
for each in wsend:
try:
select.select([each], [], [], 0)
except Exception:
thesplog('wsend %s is bad', each)
rerr.append(each)
else:
self._watches = [W for W in self._watches if W not in bad]
continue
# If it was a regular file descriptor, fall through to clean it up.
else:
raise
with exclusive_processing(self): # modifying internal structures...
if rerr:
for errfileno in rerr:
self._cancel_fd_ops(errfileno)
origPendingSends = len(self._transmitIntents) + \
len(self._waitingTransmits)
# Get idleSockets before checking incoming and
# transmit; those latter may modify _openSockets
# (including replacing the element) so ensure that
# only the sockets indicated by select are processed,
# and only once each.
idleSockets = list(getattr(self, '_openSockets', {}).values())
# Handle newly receivable data
for each in rrecv:
# n.b. ignore this if trying to quiesce; may have had
# to supply this fd to avoid calling select with three
# empty lists.
if each == self.socket.fileno() and not xmitOnly:
self._acceptNewIncoming()
continue
if each in self._incomingSockets:
incoming = self._incomingSockets[each]
del self._incomingSockets[each]
incoming = self._handlePossibleIncoming(incoming, each,
has_exclusive_flag=True)
if incoming:
self._incomingSockets[
incoming.socket.fileno()] = incoming
continue
if self._processIntents(each):
continue
for idle in idleSockets:
rmtaddr = idle.rmtaddr
curOpen = getattr(self, '_openSockets', dict()).get(opsKey(rmtaddr), None)
if curOpen and curOpen != idle:
# duplicate sockets to remote, and this one is
# no longer tracked, so close it and keep
# existing openSocket.
_safeSocketShutdown(idle)
else:
fnum = None
try:
fnum = idle.socket.fileno()
except IOError as ex:
if not err_bad_fileno(ex.errno):
raise
if fnum is None or fnum in rrecv:
if hasattr(self, '_openSockets') and opsKey(rmtaddr) in self._openSockets:
del self._openSockets[opsKey(rmtaddr)]
if fnum:
incoming = self._handlePossibleIncoming(
TCPIncomingPersistent(rmtaddr, idle.socket),
fnum,
has_exclusive_flag=True)
if incoming:
self._incomingSockets[
incoming.socket.fileno()] = incoming
elif idle.expired():
_safeSocketShutdown(idle)
if hasattr(self, '_openSockets') and opsKey(rmtaddr) in self._openSockets:
del self._openSockets[opsKey(rmtaddr)]
# Handle newly sendable data. Handle this after
# receives since receives might create outbound
# traffic that we can immediately start processing.
for eachs in rsend:
self._processIntents(eachs)
# Handle timeouts. Do this after transmits in case
# the transmit can be completed just at the timeout
# boundary: we prefer success to timeout.
self._processIntentTimeouts()
rmvIncoming = []
for I in self._incomingSockets:
newI = self._handlePossibleIncoming(self._incomingSockets[I],
-1,
has_exclusive_flag=True)
if newI:
# newI will possibly be new incoming data, but
# it's going to use the same socket
self._incomingSockets[I] = newI
else:
rmvIncoming.append(I)
for I in rmvIncoming:
del self._incomingSockets[I]
watchready = [W for W in self._watches if W in rrecv]
if watchready:
self._incomingEnvelopes.append(
ReceiveEnvelope(self.myAddress, WatchMessage(watchready)))
# Initiate completion operations for transmits (which may
# result in other transmit calls).
senddone = self._finished_intents
self._finished_intents = []
for intent,sts in senddone:
intent.tx_done(sts)
# Check if it's time to quit
if [] == rrecv and [] == rsend:
if [] == rerr and self.run_time.view().expired():
# Timeout, give up
return Thespian__Run_Expired()
continue
if xmitOnly:
remXmits = len(self._transmitIntents) + \
len(self._waitingTransmits)
if origPendingSends > remXmits or remXmits == 0:
return remXmits
# Handle queued internal "received" data
if not xmitOnly:
while self._incomingEnvelopes:
rEnv = self._incomingEnvelopes.pop(0)
if incomingHandler is None:
return rEnv
r = Thespian__Run_HandlerResult(incomingHandler(rEnv))
if not r:
return r
return Thespian__Run_Terminated() \
if hasattr(self, '_aborting_run') else \
Thespian__Run_Expired()
def _check_indicators(self):
thesplog('_check_indicators %s / %s', self._checkChildren, self._shutdownSignalled, level=logging.ERROR)
if self._checkChildren:
self._checkChildren = False
self._incomingEnvelopes.append(
ReceiveEnvelope(self.myAddress, ChildMayHaveDied()))
if self._shutdownSignalled:
self._shutdownSignalled = False
self._incomingEnvelopes.append(
ReceiveEnvelope(self.myAddress, ActorExitRequest()))
def _acceptNewIncoming(self):
accepted = False
try:
lsock, rmtTxAddr = self.new_socket(self.socket.accept)
accepted = True
except (OSError, socket.error) as ex:
thesplog('Error accepting incoming: %s', ex)
thesplog('_accept call check', level=logging.ERROR)
self._check_indicators()
if not accepted or rmtTxAddr == self.myAddress:
self._incomingEnvelopes.append(Thespian__UpdateWork())
if not accepted:
return
lsock.setblocking(0)
# Disable Nagle to transmit headers and acks asap
lsock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# Note that the TCPIncoming is initially None.
# Due to the way sockets work, the transmit comes from a
# system-selected port that is different from the port that
# the remote Actor (or admin) is listening on (and which
# represents it's official ActorAddress). Once a successful
# message has been received, the message will indicate the
# originating address and the TCPIncoming object will be
# updated accordingly.
self._incomingSockets[lsock.fileno()] = (
(TCPIncomingPersistent
if hasattr(self, '_openSockets') else
TCPIncoming)
(ActorAddress(None), lsock))
def _handlePossibleIncoming(self, incomingSocket, fileno, closed=False,
has_exclusive_flag=False):
if closed:
# Remote closed, so unconditionally drop this socket
incomingSocket.close()
return None
elif incomingSocket.socket and \
(incomingSocket.socket.fileno() == fileno or
not incomingSocket.delay()):
return self._handleReadableIncoming(incomingSocket,
has_exclusive_flag=has_exclusive_flag)
else:
if not incomingSocket.delay():
# No more delay time left
incomingSocket.close()
return None
return incomingSocket
def _finishIncoming(self, incomingSocket, fromRealAddr):
# Only called if incomingSocket can continue to be used; if
# there was an error then incomingSocket should be closed and
# released.
fromAddr = incomingSocket.fromAddress
if fromAddr and isinstance(incomingSocket, TCPIncomingPersistent) \
and hasattr(self, '_openSockets'):
opskey = opsKey(fromAddr)
_safeSocketShutdown(self._openSockets.get(opskey, None))
self._openSockets[opskey] = IdleSocket(incomingSocket.socket,
fromAddr)
for T in self._transmitIntents.values():
if T.targetAddr == fromAddr and T.stage == self._XMITStepRetry:
T.retry(immediately=True)
# This intent will be picked up on the next
# timeout check in the main loop and
# processed; by waiting for main loop
# processing, fairness with read handling is
# allowed.
break
else:
incomingSocket.close()
return None
def _handleReadableIncoming(self, inc, has_exclusive_flag=False):
try:
rdata = inc.socket.recv(min(1024000, inc.remainingSize()))
inc.failCount = 0
except socket.error as e:
inc.failCount = getattr(inc, 'failCount', 0) + 1
if err_recv_inprogress(e.errno) and \
inc.failCount < MAX_CONSECUTIVE_READ_FAILURES:
inc.backoffPause(True)
return inc
inc.close()
return None
if not rdata:
# Since this point is only arrived at when select() says
# the socket is readable, this is an indicator of a closed
# socket. Since previous calls didn't detect
# receivedAllData(), this is an aborted/incomplete
# reception. Discard it.
inc.close()
return None
try:
inc.addData(rdata)
except Exception:
# Bad data, so discard it and close the socket.
thesplog('corrupted incoming data; closing socket',
level=logging.WARNING)
inc.close()
return None
return self._addedDataToIncoming(inc, has_exclusive_flag=has_exclusive_flag)
def _addedDataToIncoming(self, inc, skipFinish=False, has_exclusive_flag=False):
if not inc.receivedAllData():
# Continue running and monitoring this socket
return inc
rdata, extra = '', ''
try:
rdata, extra = inc.data
if isControlMessage(rdata):
raise ValueError('Error: received control message'
' "%s"; expecting incoming data.' %
(str(rdata)))
rEnv = ReceiveEnvelope(*rdata)
except Exception:
import traceback
thesplog('OUCH! Error deserializing received data:'
' %s (rdata="%s", extra="%s")',
traceback.format_exc(), rdata, extra,
level=logging.ERROR)
try:
inc.socket.send(ackDataErrMsg)
except Exception:
pass # socket will be closed anyhow; AckErr was a courtesy
inc.close()
return None
try:
inc.socket.send(ackMsg)
except socket.error as err:
if err_send_connreset(err):
thesplog('Remote %s closed socket before ACK could be sent',
inc.socket, level=logging.WARNING)
else:
raise
inc.fromAddress = rdata[0]
self._processReceivedEnvelope(rEnv, has_exclusive_flag=has_exclusive_flag)
if extra and isinstance(inc, TCPIncomingPersistent):
newinc = TCPIncomingPersistent(inc.fromAddress, inc.socket)
try:
newinc.addData(rdata)
except Exception:
# Bad trailing data, so discard it by doing nothing.
thesplog('discarding bad incoming trailing data')
pass
else:
return self._addedDataToIncoming(newinc, has_exclusive_flag=has_exclusive_flag)
if not skipFinish:
self._finishIncoming(inc, rEnv.sender)
return None
def _processReceivedEnvelope(self, envelope, has_exclusive_flag=False):
if not isinstance(envelope.message, ForwardMessage):
self._incomingEnvelopes.append(envelope)
return
if envelope.message.fwdTo == self.myAddress:
self._incomingEnvelopes.append(
ReceiveEnvelope(envelope.message.fwdFrom,
envelope.message.fwdMessage))
return
# The ForwardMessage has not reached the final destination, so
# update and target it at the next one.
if len(envelope.message.fwdTargets) < 1 and \
envelope.message.fwdTo != self.myAddress:
thesplog('Incorrectly received ForwardMessage destined for'
' %s at %s via %s: %s',
envelope.message.fwdTo, self.myAddress,
list(map(str, envelope.message.fwdTargets)),
envelope.message.fwdMessage,
level=logging.ERROR)
# discard (TBD: send back as Poison? DeadLetter? Routing failure)
return
nextTgt = envelope.message.fwdTargets[0]
envelope.message.fwdTargets = envelope.message.fwdTargets[1:]
self.scheduleTransmit(getattr(self, '_addressMgr', None),
TransmitIntent(nextTgt, envelope.message),
has_exclusive_flag=has_exclusive_flag)
def abort_run(self, drain=False):
self._aborting_run = drain
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment