Skip to content

Instantly share code, notes, and snippets.

@kquick
Created June 15, 2021 15:57
Show Gist options
  • Save kquick/71628639d57de2033f56bb1e606a9f4f to your computer and use it in GitHub Desktop.
Save kquick/71628639d57de2033f56bb1e606a9f4f to your computer and use it in GitHub Desktop.
import logging
from thespian.actors import *
from thespian.system.utilis import (thesplog, checkActorCapabilities,
foldl, join, fmap, AssocList,
actualActorClass)
from thespian.system.timing import ExpirationTimer, currentTime
from thespian.system.logdirector import LogAggregator
from thespian.system.admin.globalNames import GlobalNamesAdmin
from thespian.system.admin.adminCore import PendingSource
from thespian.system.transport import (TransmitIntent, ReceiveEnvelope,
Thespian__Run_Terminated)
from thespian.system.messages.admin import PendingActorResponse
from thespian.system.messages.convention import *
from thespian.system.sourceLoader import loadModuleFromHashSource
from thespian.system.transport.hysteresis import HysteresisDelaySender
from functools import partial
from datetime import (timedelta, datetime)
import os
import socket
from thespian.system.transport.IPBase import (TCPv4ActorAddress)
# Convention timing and tuning values.
# TODO - Perhaps these should be configurable?
# (Earlier, longer settings were: re-registration every 7m22s and a
# restart period of 3m22s.)

# How often a convention member re-registers with its leader.
CONVENTION_REREGISTRATION_PERIOD = timedelta(minutes=1)
# Ping interval used for a pre-registered remote whose registry entry
# has already expired.
CONVENTION_RESTART_PERIOD = timedelta(seconds=60)

# Number of missing convention registrations before death is declared.
CONVENTION_REGISTRATION_MISS_MAX = 1
# Multiplier applied to a remote's expected check-in time to obtain the
# timeout before that remote is re-invited.
CONVENTION_REINVITE_ADJUSTMENT = 1.1

# Capability key whose value (when it is a list) marks this system as
# participating in the multi-leader convention logic.
CURR_CONV_ADDR_IPV4 = 'Convention Address.IPv4'
def convention_reinvite_adjustment(t, factor=None):
    """Scale an expected remote check-in period into a re-invite timeout.

    :param t: the expected check-in period (normally a ``timedelta``).
    :param factor: multiplier to apply; defaults to the module-level
        ``CONVENTION_REINVITE_ADJUSTMENT``.
    :return: ``t`` scaled by ``factor``.
    """
    if factor is None:
        factor = CONVENTION_REINVITE_ADJUSTMENT
    try:
        return t * factor
    except TypeError:
        # Python 2 cannot multiply a timedelta by a float.  Fall back to
        # integer arithmetic (factor expressed in thousandths).  The
        # previous "1 / (factor % 1)" trick truncated 1/0.1 to 9, which
        # produced ~1.111x instead of 1.1x, and divided by zero for
        # whole-number factors.
        per_mille = int(round(factor * 1000))
        return t * per_mille // 1000
class PreRegistration(object):
    """Ping bookkeeping for a remote system that was pre-registered.

    ``pingValid`` is the timer governing when the remote should next be
    pinged; ``pingPending`` is set while such a ping is outstanding.
    """

    def __init__(self):
        self.pingPending = False
        # Zero-length timer: presumably already expired, so the first
        # convention check pings right away (ExpirationTimer semantics
        # are defined elsewhere -- confirm).
        self.pingValid = ExpirationTimer(timedelta(seconds=0))

    def refresh(self):
        """Restart the ping-validity window for another re-registration period."""
        period = CONVENTION_REREGISTRATION_PERIOD
        self.pingValid = ExpirationTimer(period)
class ConventionMemberData(object):
    """What the local system knows about one remote convention member.

    Holds the remote admin address, its advertised capabilities, the
    actors created on it for local parents, and the timers that decide
    when the remote is considered gone.
    """

    def __init__(self, address, capabilities, preRegOnly=False):
        self.remoteAddress = address
        self.remoteCapabilities = capabilities
        # (localParentAddress, remoteActorAddress) pairs for actors that
        # were created remotely on behalf of local parents.
        self.hasRemoteActors = []
        # True when the only information about this remote came from a
        # pre-registration.
        self.preRegOnly = preRegOnly
        # None, or a PreRegistration object when the ConventionRegister
        # carried the preRegister flag (i.e. preRegisterRemoteSystem was
        # used).  Its pingValid timer bounds how long an active check of
        # the remote stays valid; once expired, the local system re-sends
        # a ConventionInvite (with retries), and pingPending suppresses
        # further invites until that send succeeds or fails.  refresh()
        # restarts pingValid for CONVENTION_REREGISTRATION_PERIOD.
        # Pinging continues for a pre-registered remote regardless of
        # whether its convention membership is currently valid.
        self.preRegistered = None
        self._reset_valid_timer()

    @property
    def permanentEntry(self):
        """True when this entry should be retained rather than discarded."""
        return bool(self.preRegOnly or self.preRegistered)

    def createdActor(self, localParentAddress, newActorAddress):
        """Record (once) that newActorAddress was created for localParentAddress."""
        pair = (localParentAddress, newActorAddress)
        if pair in self.hasRemoteActors:
            return
        self.hasRemoteActors.append(pair)

    def refresh(self, remoteCapabilities, preReg=False):
        """Apply a re-registration: new capabilities and fresh timers.

        ``preReg`` is accepted for caller compatibility but is not used.
        """
        self.remoteCapabilities = remoteCapabilities
        self._reset_valid_timer()
        if self.preRegistered:
            self.preRegistered.refresh()

    def _reset_valid_timer(self):
        # registryValid spans CONVENTION_REGISTRATION_MISS_MAX
        # re-registration periods.  Each successful re-registration
        # replaces it with a fresh timer; if it expires, the remote is
        # declared dead and its registration is removed (or quiesced when
        # it is a pre-registration).
        allowance = (CONVENTION_REREGISTRATION_PERIOD *
                     CONVENTION_REGISTRATION_MISS_MAX)
        self.registryValid = ExpirationTimer(allowance)

    def __str__(self):
        if self.preRegOnly:
            kind = ' (prereg-only)'
        elif self.preRegistered:
            kind = ' (prereg)'
        else:
            kind = ''
        return 'ActorSystem @ %s%s, registry valid for %s with %s' % (
            str(self.remoteAddress),
            kind,
            str(self.registryValid),
            str(self.remoteCapabilities))
class HysteresisCancel(object):
    """I/O operation: drop any hysteresis-delayed sends queued for an address."""

    def __init__(self, cancel_addr):
        # Target address whose pending delayed sends should be cancelled.
        self.cancel_addr = cancel_addr
class HysteresisSend(TransmitIntent):
    """A TransmitIntent that ConventioneerAdmin._performIO routes through
    the hysteresis sender instead of sending immediately."""
class LostRemote(object):
    """I/O operation: a remote system has been lost.

    ConventioneerAdmin._performIO passes ``lost_addr`` to the transport's
    ``lostRemote`` hook when the transport provides one (per the original
    note, so it can reset: close sockets, drop buffers, etc.).
    """

    def __init__(self, lost_addr):
        # Address of the remote that is gone.
        self.lost_addr = lost_addr
class LocalConventionState(object):
    """Convention-membership state and protocol logic for one local Admin.

    Methods do not perform I/O directly: they return lists of pending
    operations (TransmitIntent, HysteresisSend, HysteresisCancel,
    LogAggregator, LostRemote) that the owning ConventioneerAdmin executes
    through its _performIO() method.

    This variant knows a list of candidate convention leaders (from
    ``getAllConventionAddressesFunc``).  When the current leader misses
    too many registrations, a member re-registers with the first other
    configured leader that is known to be available.
    """

    def __init__(self, myAddress, capabilities, sCBStats,
                 getAllConventionAddressesFunc):
        """
        :param myAddress: this Admin's own address.
        :param capabilities: this ActorSystem's capabilities dict.
        :param sCBStats: statistics object; only ``inc(name)`` is used here.
        :param getAllConventionAddressesFunc: callable taking the
            capabilities and returning the configured convention leader
            addresses.  It may return None (ConventioneerAdmin's default
            when the transport lacks ``getAllConventionAddresses``).
        """
        self._myAddress = myAddress
        self._capabilities = capabilities
        self._sCBStats = sCBStats
        self._conventionMembers = AssocList()  # key=Remote Admin Addr, value=ConventionMemberData
        self._conventionNotificationHandlers = []
        # Index (into the configured leader addresses) of the leader that
        # setup_convention() registers with.
        self._convntn_ipv4_marker = 0
        self._getConventionAddr = getAllConventionAddressesFunc
        # Normalize "no addresses" to an empty list so indexing,
        # iteration and membership tests below cannot fail.
        self._conventionAddresses = getAllConventionAddressesFunc(capabilities) or []
        # Index of the leader _initiate_re_election() treats as current;
        # kept in step with _convntn_ipv4_marker.
        self._currentLeaderIdx = 0
        self._conventionAddress = self._select_convention_address(
            self._conventionAddresses)
        self._conventionRegistration = ExpirationTimer(CONVENTION_REREGISTRATION_PERIOD)
        self._has_been_activated = False
        self._invited = False  # entered convention as a result of an explicit invite
        # Convention leader addresses currently believed to be reachable.
        self._current_avlbl_leaders = []
        # Creation time as a YYYYMMDDHHMMSSmmm integer (UTC, milliseconds).
        self._avlbl_leader_last_knwn_ts = int(
            datetime.utcnow().strftime('%Y%m%d%H%M%S%f')[:-3])

    def _select_convention_address(self, addresses):
        """Return the leader address at the current marker, or None.

        Returns None (instead of raising) when ``addresses`` is None or
        empty, or when the marker is out of range.
        """
        if addresses and 0 <= self._convntn_ipv4_marker < len(addresses):
            return addresses[self._convntn_ipv4_marker]
        return None

    @property
    def myAddress(self):
        return self._myAddress

    @property
    def capabilities(self):
        return self._capabilities

    def updateStatusResponse(self, resp):
        """Fill convention details into a status response object."""
        resp.setConventionLeaderAddress(self.conventionLeaderAddr)
        # TODO - set convention supporters here
        resp.setConventionRegisterTime(self._conventionRegistration)
        for each in self._conventionMembers.values():
            resp.addConventioneer(each.remoteAddress, each.registryValid)
        resp.setNotifyHandlers(self._conventionNotificationHandlers)

    def active_in_convention(self):
        # If this is the convention leader, it is automatically
        # active, otherwise this convention member should have a
        # convention leader and that leader should have an active
        # entry in the _conventionMembers table (indicating it has
        # updated this system with its information)
        return bool(self.conventionLeaderAddr and
                    self._conventionMembers.find(self.conventionLeaderAddr))

    @property
    def conventionLeaderAddr(self):
        return self._conventionAddress

    def isConventionLeader(self):
        # Might also be the leader if self.conventionLeaderAddr is None
        return self.conventionLeaderAddr == self.myAddress

    def _populate_initial_leaders(self, convention_leader):
        """Build a status record for a (alias, admin_port) leader entry.

        Currently unused by the visible code (its caller is commented out).
        """
        thesplog(' _populate_initial_leaders: %s', convention_leader, level=logging.WARNING)
        curr_leader = {}
        curr_leader['convention_alias'] = convention_leader[0]
        curr_leader['admin_port'] = convention_leader[1]
        # Status is DOWN unless confirmed otherwise
        curr_leader['status'] = 'DOWN'
        # Defaulted to current system up (UTC)
        curr_leader['last_known_ts'] = int(datetime.utcnow().strftime('%Y%m%d%H%M%S%f')[:-3])
        return curr_leader

    def capabilities_have_changed(self, new_capabilities):
        # NOTE(review): _conventionAddresses is not recomputed from the
        # new capabilities here; only the current leader address is.
        self._capabilities = new_capabilities
        return self.setup_convention()

    def setup_convention(self, activation=False):
        """(Re-)register with the convention leader; returns I/O operations."""
        thesplog(' setup_convention:entry, activation: %s', activation, level=logging.WARNING)
        self._has_been_activated |= activation
        rmsgs = []
        # If not specified in capabilities, don't override any invites
        # that may have been received.
        self._conventionAddress = self._select_convention_address(
            self._getConventionAddr(self.capabilities)) or \
            self._conventionAddress
        leader_is_gone = (self._conventionMembers.find(self.conventionLeaderAddr) is None) \
            if self.conventionLeaderAddr else True
        thesplog(' isConventionLeader:%s, conventionLeaderAddr: %s', self.isConventionLeader(), self.conventionLeaderAddr, level=logging.WARNING)
        if not self.isConventionLeader() and self.conventionLeaderAddr:
            thesplog('Admin registering with Convention @ %s (%s)',
                     self.conventionLeaderAddr,
                     'first time' if leader_is_gone else 're-registering',
                     level=logging.INFO, primary=True)
            rmsgs.append(
                HysteresisSend(self.conventionLeaderAddr,
                               ConventionRegister(self.myAddress,
                                                  self.capabilities,
                                                  leader_is_gone),
                               onSuccess=self._setupConventionCBGood,
                               onError=self._setupConventionCBError))
            rmsgs.append(LogAggregator(self.conventionLeaderAddr))
        # Check if this is part of convention leaders
        thesplog(' I am: %s', self.myAddress, level=logging.WARNING)
        if isinstance(self.capabilities.get(CURR_CONV_ADDR_IPV4), list):
            for curr_entry in self._conventionAddresses:
                thesplog(' checking: %s (%s)', curr_entry, curr_entry == self.myAddress, level=logging.WARNING)
                if curr_entry != self.myAddress:
                    thesplog(' Telling other leader %s I am available', curr_entry, level=logging.WARNING)
                    # NOTE(review): this is addressed to self.myAddress, not
                    # to curr_entry as the log line suggests; confirm intent.
                    rmsgs.append(TransmitIntent(self.myAddress, NewLeaderAvailable(self.myAddress, curr_entry)))
        self._conventionRegistration = ExpirationTimer(CONVENTION_REREGISTRATION_PERIOD)
        return rmsgs

    def _setupConventionCBGood(self, result, finishedIntent):
        """Registration delivered: clear the consecutive-miss counter."""
        self._sCBStats.inc('Admin Convention Registered')
        if hasattr(self, '_conventionLeaderMissCount'):
            delattr(self, '_conventionLeaderMissCount')

    def _setupConventionCBError(self, result, finishedIntent):
        """Registration failed: count the miss and log it."""
        self._sCBStats.inc('Admin Convention Registration Failed')
        if hasattr(self, '_conventionLeaderMissCount'):
            self._conventionLeaderMissCount += 1
        else:
            self._conventionLeaderMissCount = 1
        thesplog('Admin cannot register with convention @ %s (miss %d): %s',
                 finishedIntent.targetAddr,
                 self._conventionLeaderMissCount,
                 result, level=logging.WARNING, primary=True)

    def got_convention_invite(self, sender):
        """Accept an explicit invitation: ``sender`` becomes the leader."""
        thesplog('****** got_convention_invite from %s', sender, level=logging.WARNING)
        self._conventionAddress = sender
        self._invited = True
        return self.setup_convention()

    def got_convention_register(self, regmsg):
        """Handle a ConventionRegister from a remote; returns I/O operations."""
        self._sCBStats.inc('Admin Handle Convention Registration')
        if self._invited and not self.conventionLeaderAddr:
            # Lost connection to an invitation-only convention.
            # Cannot join again until another invitation is received.
            return []
        # Registrant may re-register if changing capabilities
        rmsgs = []
        registrant = regmsg.adminAddress
        prereg = getattr(regmsg, 'preRegister', False)  # getattr used; see definition
        existing = self._conventionMembers.find(registrant)
        thesplog('****** Got Convention %sregistration from %s (%s) (new? %s)',
                 'pre-' if prereg else '',
                 registrant,
                 'first time' if regmsg.firstTime else 're-registering',
                 not existing,
                 level=logging.WARNING)
        if registrant == self.myAddress:
            # Either remote failed getting an external address and is
            # using 127.0.0.1 or else this is a malicious attempt to
            # make us talk to ourselves.  Ignore it.
            thesplog('Convention registration from %s is an invalid address; ignoring.',
                     registrant,
                     level=logging.WARNING)
            return rmsgs
        thesplog(" whoami: %s", self.myAddress, level=logging.WARNING)
        if isinstance(self.capabilities.get(CURR_CONV_ADDR_IPV4), list):
            for curr_entry in self._conventionAddresses:
                thesplog(" curr check of %s (%s)", curr_entry, curr_entry == self.myAddress, level=logging.WARNING)
                if curr_entry == self.myAddress:
                    thesplog(' Post convention_register, new leader %s identified', curr_entry, level=logging.WARNING)
                    rmsgs.append(TransmitIntent(self.myAddress, NewLeaderAvailable(self.myAddress, curr_entry)))
                    break
            else:
                # Loop finished without finding this system among the leaders.
                thesplog(" no new leader found here", level=logging.ERROR)
        existingPreReg = existing.permanentEntry if existing else False
        notify = (not existing or existing.preRegOnly) and not prereg
        if regmsg.firstTime or not existing:
            if existing:
                # A first-time registration from a known remote means the
                # remote restarted: discard everything known about it.
                existing = None
                notify = not prereg
                rmsgs.extend(self._remote_system_cleanup(registrant))
            newmember = ConventionMemberData(registrant,
                                             regmsg.capabilities,
                                             prereg)
            if prereg or existingPreReg:
                newmember.preRegistered = PreRegistration()
            self._conventionMembers.add(registrant, newmember)
        else:
            existing.refresh(regmsg.capabilities, prereg or existingPreReg)
            if not prereg:
                existing.preRegOnly = False
        if not self.isConventionLeader():
            self._conventionRegistration = ExpirationTimer(CONVENTION_REREGISTRATION_PERIOD)
        # Convention Members normally periodically initiate a
        # membership message, to which the leader confirms by
        # responding; if this was a pre-registration, that identifies
        # this system as the "leader" for that remote.  Also, if the
        # remote sent this because it was a pre-registration leader,
        # it doesn't yet have all the member information so the member
        # should respond.
        if prereg:
            rmsgs.append(HysteresisCancel(registrant))
            rmsgs.append(TransmitIntent(registrant, ConventionInvite()))
        elif (self.isConventionLeader() or regmsg.firstTime or
              (existing and existing.permanentEntry)):
            # If we are the Convention Leader, this would be the point to
            # inform all other registrants of the new registrant.  At
            # present, there is no reciprocity here, so just update the
            # new registrant with the leader's info.
            rmsgs.append(
                TransmitIntent(registrant,
                               ConventionRegister(self.myAddress,
                                                  self.capabilities)))
        if notify:
            rmsgs.extend(self._notifications_of(
                ActorSystemConventionUpdate(registrant,
                                            regmsg.capabilities,
                                            True)))
        return rmsgs

    def _notifications_of(self, msg):
        return [TransmitIntent(H, msg) for H in self._conventionNotificationHandlers]

    def add_notification_handler(self, addr):
        """Register a notification handler and bring it up to date."""
        if addr not in self._conventionNotificationHandlers:
            self._conventionNotificationHandlers.append(addr)
            # Now update the registrant on the current state of all convention members
            return [TransmitIntent(addr,
                                   ActorSystemConventionUpdate(M.remoteAddress,
                                                               M.remoteCapabilities,
                                                               True))
                    for M in self._conventionMembers.values()
                    if not M.preRegOnly]
        return []

    def remove_notification_handler(self, addr):
        self._conventionNotificationHandlers = [
            H for H in self._conventionNotificationHandlers
            if H != addr]

    def got_convention_deregister(self, deregmsg):
        """Handle a ConventionDeRegister from a remote; returns I/O operations."""
        self._sCBStats.inc('Admin Handle Convention De-registration')
        remoteAdmin = deregmsg.adminAddress
        if remoteAdmin == self.myAddress:
            # Either remote failed getting an external address and is
            # using 127.0.0.1 or else this is a malicious attempt to
            # make us talk to ourselves.  Ignore it -- previously this
            # fell through and ran cleanup against this system itself.
            thesplog('Convention deregistration from %s is an invalid address; ignoring.',
                     remoteAdmin,
                     level=logging.WARNING)
            return []
        rmsgs = []
        if getattr(deregmsg, 'preRegistered', False):  # see definition for getattr use
            existing = self._conventionMembers.find(remoteAdmin)
            if existing:
                existing.preRegistered = None
                rmsgs.append(TransmitIntent(remoteAdmin, ConventionDeRegister(self.myAddress)))
        return rmsgs + self._remote_system_cleanup(remoteAdmin)

    def got_system_shutdown(self):
        return self.exit_convention()

    def exit_convention(self):
        """Leave the convention; returns the de-registration operations."""
        # NOTE(review): sets a public ``invited`` attribute that nothing
        # else reads; ``_invited`` is unchanged.  Confirm which was meant.
        self.invited = False

        def gen_ops(addr):
            return [HysteresisCancel(addr),
                    TransmitIntent(addr,
                                   ConventionDeRegister(self.myAddress)),
                    ]

        def terminate(addr):
            # Cleanup runs for its effect on local state; only the
            # de-registration operations are returned.
            self._remote_system_cleanup(addr)
            return gen_ops(addr)

        if self.conventionLeaderAddr and \
           self.conventionLeaderAddr != self.myAddress:
            thesplog('Admin de-registering with Convention @ %s',
                     str(self.conventionLeaderAddr),
                     level=logging.INFO, primary=True)
            # Cache convention leader address because it might get reset by terminate()
            claddr = self.conventionLeaderAddr
            terminate(self.conventionLeaderAddr)
            return gen_ops(claddr)
        return join(fmap(terminate,
                         [M.remoteAddress
                          for M in self._conventionMembers.values()
                          if M.remoteAddress != self.myAddress]))

    def check_convention(self):
        """Periodic convention maintenance; returns I/O operations."""
        ct = currentTime()
        rmsgs = []
        thesplog(' *** check convention ***', level=logging.DEBUG)
        if self._has_been_activated:
            rmsgs = foldl(lambda x, y: x + y,
                          [self._check_preregistered_ping(ct, member)
                           for member in self._conventionMembers.values()],
                          self._convention_leader_checks(ct)
                          if self.isConventionLeader() or
                          not self.conventionLeaderAddr else
                          self._convention_member_checks(ct))
        if self._conventionRegistration.view(ct).expired():
            self._conventionRegistration = ExpirationTimer(CONVENTION_REREGISTRATION_PERIOD)
        return rmsgs

    def _convention_leader_checks(self, ct):
        """Clean up every member whose registration has expired."""
        return foldl(lambda x, y: x + y,
                     [self._missed_checkin_remote_cleanup(R)
                      for R in [member
                                for member in self._conventionMembers.values()
                                if member.registryValid.view(ct).expired()]],
                     [])

    def _missed_checkin_remote_cleanup(self, remote_member):
        thesplog('%s missed %d checkins (%s); assuming it has died',
                 str(remote_member),
                 CONVENTION_REGISTRATION_MISS_MAX,
                 str(remote_member.registryValid),
                 level=logging.WARNING, primary=True)
        return self._remote_system_cleanup(remote_member.remoteAddress)

    def _convention_member_checks(self, ct):
        """Member-side checks: re-register, or re-elect if the leader is lost."""
        rmsgs = []
        thesplog(' *** convention member checks ***', level=logging.DEBUG)
        # Re-register with the Convention if it's time
        if self.conventionLeaderAddr and \
           self._conventionRegistration.view(ct).expired():
            if getattr(self, '_conventionLeaderMissCount', 0) >= \
               CONVENTION_REGISTRATION_MISS_MAX:
                thesplog('Admin convention registration lost @ %s (miss %d)',
                         self.conventionLeaderAddr,
                         self._conventionLeaderMissCount,
                         level=logging.WARNING, primary=True)
                rmsgs.extend(self._remote_system_cleanup(self.conventionLeaderAddr))
                new_leader_actr_addr = self._initiate_re_election()
                # No available leader: do not address a message to None.
                if new_leader_actr_addr is not None:
                    rmsgs.append(TransmitIntent(new_leader_actr_addr,
                                                ConventionRegister(self.myAddress,
                                                                   self.capabilities,
                                                                   True)))
                self._conventionLeaderMissCount = 0
            else:
                rmsgs.extend(self.setup_convention())
        return rmsgs

    def _initiate_re_election(self):
        """Pick the next available configured leader, or None.

        The current leader is dropped from the available list, then the
        first configured address still in that list is selected and
        recorded as the leader, so later setup_convention() calls register
        with it rather than re-selecting the lost leader.
        """
        if not self._conventionAddresses:
            thesplog('NO ACTIVE LEADERS AVAILABLE', level=logging.ERROR)
            return None
        curr_ldr_addr = self._conventionAddresses[self._currentLeaderIdx]
        thesplog(" re-elect away from %s to one of: %s", curr_ldr_addr, list(map(str, self._current_avlbl_leaders)), level=logging.WARNING)
        self._current_avlbl_leaders = [a for a in self._current_avlbl_leaders if a != curr_ldr_addr]
        for idx, each in enumerate(self._conventionAddresses):
            if each in self._current_avlbl_leaders:
                self._currentLeaderIdx = idx
                self._convntn_ipv4_marker = idx
                self._conventionAddress = each
                thesplog(' Selected %s (%d) as next new leader', each, idx, level=logging.WARNING)
                return each
        thesplog('NO ACTIVE LEADERS AVAILABLE', level=logging.ERROR)
        return None

    def _check_preregistered_ping(self, ct, member):
        """Re-invite a pre-registered member whose ping window expired."""
        if member.preRegistered and \
           member.preRegistered.pingValid.view(ct).expired() and \
           not member.preRegistered.pingPending:
            member.preRegistered.pingPending = True
            # If remote misses a checkin, re-extend the
            # invitation.  This also helps re-initiate a socket
            # connection if a TxOnly socket has been lost.
            member.preRegistered.pingValid = ExpirationTimer(
                convention_reinvite_adjustment(
                    CONVENTION_RESTART_PERIOD
                    if member.registryValid.view(ct).expired()
                    else CONVENTION_REREGISTRATION_PERIOD))
            return [HysteresisSend(member.remoteAddress,
                                   ConventionInvite(),
                                   onSuccess=self._preRegQueryNotPending,
                                   onError=self._preRegQueryNotPending)]
        return []

    def _preRegQueryNotPending(self, result, finishedIntent):
        """Send completion (either outcome): allow the next ping."""
        remoteAddr = finishedIntent.targetAddr
        member = self._conventionMembers.find(remoteAddr)
        if member and member.preRegistered:
            member.preRegistered.pingPending = False

    def _remote_system_cleanup(self, registrant):
        """Called when a RemoteActorSystem has exited and all associated
        Actors should be marked as exited and the ActorSystem
        removed from Convention membership.  This is also called on
        a First Time connection from the remote to discard any
        previous connection information.
        """
        thesplog('Convention cleanup or deregistration for %s (known? %s)',
                 registrant,
                 bool(self._conventionMembers.find(registrant)),
                 level=logging.INFO)
        rmsgs = [LostRemote(registrant)]
        cmr = self._conventionMembers.find(registrant)
        if not cmr or cmr.preRegOnly:
            return []
        # Send exited notification to conventionNotificationHandler (if any)
        for each in self._conventionNotificationHandlers:
            rmsgs.append(
                TransmitIntent(each,
                               ActorSystemConventionUpdate(cmr.remoteAddress,
                                                           cmr.remoteCapabilities,
                                                           False)))  # errors ignored
        # If the remote ActorSystem shutdown gracefully (i.e. sent
        # a Convention Deregistration) then it should not be
        # necessary to shutdown remote Actors (or notify of their
        # shutdown) because the remote ActorSystem should already
        # have caused this to occur.  However, it won't hurt, and
        # it's necessary if the remote ActorSystem did not exit
        # gracefully.
        for lpa, raa in cmr.hasRemoteActors:
            # ignore errors:
            rmsgs.append(TransmitIntent(lpa, ChildActorExited(raa)))
            # n.b. at present, this means that the parent might
            # get duplicate notifications of ChildActorExited; it
            # is expected that Actors can handle this.
        # Remove remote system from conventionMembers
        if not cmr.preRegistered:
            if registrant == self.conventionLeaderAddr and self._invited:
                self._conventionAddress = None
                # Don't clear invited: once invited, that
                # perpetually indicates this should be only a
                # member and never a leader.
            self._conventionMembers.rmv(registrant)
        else:
            # This conventionMember needs to stay because the
            # current system needs to continue issuing
            # registration pings.  By setting the registryValid
            # expiration to forever, this member won't re-time-out
            # and will therefore be otherwise ignored... until it
            # registers again at which point the membership will
            # be updated with new settings.
            cmr.registryValid = ExpirationTimer(None)
            cmr.preRegOnly = True
        return rmsgs + [HysteresisCancel(registrant)]

    def sentByRemoteAdmin(self, envelope):
        """True if the envelope's sender is a known convention member."""
        return any(envelope.sender == each.remoteAddress
                   for each in self._conventionMembers.values())

    def convention_inattention_delay(self, current_time):
        """Time until convention processing next needs attention."""
        return (self._conventionRegistration or
                ExpirationTimer(CONVENTION_REREGISTRATION_PERIOD
                                if self.active_in_convention() or
                                self.isConventionLeader() else
                                CONVENTION_RESTART_PERIOD)).view(current_time)

    def forward_pending_to_remote_system(self, childClass, envelope, sourceHash, acceptsCaps):
        """Route an actor-creation request to a compatible remote system.

        :raises NoCompatibleSystemForActor: when no candidate exists and
            there is no leader to defer to.
        """
        alreadyTried = getattr(envelope.message, 'alreadyTried', [])
        ct = currentTime()
        remoteCandidates = [
            K
            for K in self._conventionMembers.values()
            if not K.registryValid.view(ct).expired()
            and K.remoteAddress != envelope.sender  # source Admin
            and K.remoteAddress not in alreadyTried
            and acceptsCaps(K.remoteCapabilities)]
        if not remoteCandidates:
            if self.isConventionLeader() or not self.conventionLeaderAddr:
                raise NoCompatibleSystemForActor(
                    childClass,
                    'No known ActorSystems can handle a %s for %s',
                    childClass, envelope.message.forActor)
            # Let the Convention Leader try to find an appropriate ActorSystem
            bestC = self.conventionLeaderAddr
        else:
            # Distribute equally amongst candidates: the first candidate
            # with the fewest remotely-created actors wins.
            bestC = min(remoteCandidates,
                        key=lambda K: len(K.hasRemoteActors)).remoteAddress
            thesplog('Requesting creation of %s%s on remote admin %s',
                     envelope.message.actorClassName,
                     ' (%s)' % sourceHash if sourceHash else '',
                     bestC)
        if bestC not in alreadyTried:
            # Don't send request to this remote again, it has already
            # been tried.  This would also be indicated by that system
            # performing the add of self.myAddress as below, but if
            # there is disagreement between the local and remote
            # addresses, this addition will prevent continual
            # bounceback.
            alreadyTried.append(bestC)
        if self.myAddress not in alreadyTried:
            # Don't send request back to this actor system: it cannot
            # handle it
            alreadyTried.append(self.myAddress)
        envelope.message.alreadyTried = alreadyTried
        return [TransmitIntent(bestC, envelope.message)]

    def send_to_all_members(self, message, exception_list=None):
        """Hysteresis-send ``message`` to every member not in exception_list."""
        thesplog(' ### send_to_all_members: entry ###', level=logging.DEBUG)
        return [HysteresisSend(M.remoteAddress, message)
                for M in self._conventionMembers.values()
                if M.remoteAddress not in (exception_list or [])]
class ConventioneerAdmin(GlobalNamesAdmin):
"""Extends the AdminCore+GlobalNamesAdmin with ActorSystem Convention
functionality to support multi-host configurations.
"""
def __init__(self, *args, **kw):
    """Initialize the base admin, then attach convention state and the
    hysteresis-delayed sender."""
    super(ConventioneerAdmin, self).__init__(*args, **kw)
    # Transports without multi-leader support fall back to "no addresses".
    leader_lookup = getattr(self.transport, 'getAllConventionAddresses',
                            lambda c: None)
    self._cstate = LocalConventionState(self.myAddress,
                                        self.capabilities,
                                        self._sCBStats,
                                        leader_lookup)
    self._hysteresisSender = HysteresisDelaySender(self._send_intent)
def _updateStatusResponse(self, resp):
    """Add convention details, then let the base class add its own."""
    convention = self._cstate
    convention.updateStatusResponse(resp)
    super(ConventioneerAdmin, self)._updateStatusResponse(resp)
def _activate(self):
    """Activate the base admin and, unless shutting down, join the convention.

    Called internally once this ActorSystem is initialized and ready.
    """
    super(ConventioneerAdmin, self)._activate()
    if not self.isShuttingDown():
        self._performIO(self._cstate.setup_convention(True))
def h_ConventionInvite(self, envelope):
    """Accept a convention invitation from the envelope's sender.

    Returns None (ignored) while shutting down, True otherwise.
    """
    if not self.isShuttingDown():
        inviter = envelope.sender
        self._performIO(self._cstate.got_convention_invite(inviter))
        return True
    return None
def h_ConventionRegister(self, envelope):
    """Process a remote registration and note configured leaders as available.

    Returns None (ignored) while shutting down, True otherwise.
    """
    if self.isShuttingDown():
        return
    convention = self._cstate
    self._performIO(convention.got_convention_register(envelope.message))
    registrant = envelope.sender
    # A registration from one of the configured leaders shows it is alive.
    if registrant in convention._conventionAddresses and \
       registrant not in convention._current_avlbl_leaders:
        convention._current_avlbl_leaders.append(registrant)
    return True
def h_ConventionDeRegister(self, envelope):
    """Process a remote de-registration; a departing configured leader is
    removed from the available-leaders list."""
    convention = self._cstate
    self._performIO(convention.got_convention_deregister(envelope.message))
    departed = envelope.sender
    if departed in convention._conventionAddresses:
        convention._current_avlbl_leaders = [
            leader for leader in convention._current_avlbl_leaders
            if leader != departed]
    return True
def h_NewLeaderAvailable(self, envelope):
    """Record the envelope's sender as an available convention leader.

    Only the sender address is used.  An earlier design (per-alias
    status/timestamp tracking and re-broadcasting the message to all
    members) is not active here.
    """
    announcer = envelope.sender
    thesplog(' NewLeaderAvailable: sender: %s, message: %s',
             announcer, str(envelope.message),
             level=logging.DEBUG)
    leaders = self._cstate._current_avlbl_leaders
    if announcer not in leaders:
        leaders.append(announcer)
    self._current_leader_stats()
    thesplog(' NewLeaderAvailable: exit', level=logging.WARNING)
    return True
def h_SystemShutdown(self, envelope):
    """Leave the convention, then perform the base class shutdown handling.

    The convention exit operations are issued first; the base class's
    return value is passed back unchanged.
    """
    self._performIO(self._cstate.got_system_shutdown())
    return super(ConventioneerAdmin, self).h_SystemShutdown(envelope)
def _performIO(self, iolist):
    """Execute the I/O operations returned by LocalConventionState.

    HysteresisSend must be tested before the generic fallback because it
    is a TransmitIntent subclass; anything unrecognized is sent directly.
    """
    thesplog(' <<<_performIO', level=logging.DEBUG)
    sender = self._hysteresisSender
    for op in iolist:
        if isinstance(op, HysteresisCancel):
            sender.cancelSends(op.cancel_addr)
            continue
        if isinstance(op, HysteresisSend):
            sender.sendWithHysteresis(op)
            continue
        if isinstance(op, LogAggregator):
            log_actor = getattr(self, 'asLogger', None)
            if log_actor:
                thesplog('Setting log aggregator of %s to %s', log_actor, op.aggregatorAddress)
                self._send_intent(TransmitIntent(log_actor, op))
            continue
        if isinstance(op, LostRemote):
            if hasattr(self.transport, 'lostRemote'):
                self.transport.lostRemote(op.lost_addr)
            continue
        self._send_intent(op)
    thesplog(' >>>', level=logging.DEBUG)
def _current_leader_stats(self):
    """Debug-log the available leaders and the last-known timestamp."""
    convention = self._cstate
    thesplog(' Current leader stats', level=logging.DEBUG)
    for leader in convention._current_avlbl_leaders:
        thesplog(' %s', leader, level=logging.DEBUG)
    # Only relevant for convention leaders
    last_ts = str(convention._avlbl_leader_last_knwn_ts)
    thesplog(' Last known TS: %s', last_ts, level=logging.DEBUG)
def run(self):
    """Main loop for convention management.

    Wraps the lower-level transport, returning from it at the next point
    convention processing (re-registration) or a hysteresis-delayed send
    needs attention.  Any uncaught exception is logged and ends the loop.
    """
    transport_continue = True
    try:
        while True:
            if getattr(self, 'shutdown_completed', False):
                break
            if isinstance(transport_continue, Thespian__Run_Terminated):
                break
            now = currentTime()
            convention_wait = self._cstate.convention_inattention_delay(now)
            if self._hysteresisSender.delay.expired():
                hysteresis_wait = ExpirationTimer(None).view(now)
            else:
                hysteresis_wait = self._hysteresisSender.delay
            delay = min(convention_wait, hysteresis_wait)
            # n.b. delay does not account for soon-to-expire
            # pingValids, but since delay will not be longer than
            # a CONVENTION_REREGISTRATION_PERIOD, the worst case
            # is a doubling of a pingValid period (which should be fine).
            thesplog(' <<<Running at %s', self.myAddress, level=logging.DEBUG)
            transport_continue = self.transport.run(self.handleIncoming,
                                                    delay.remaining())
            # Check Convention status based on the elapsed time
            self._performIO(self._cstate.check_convention())
            self._hysteresisSender.checkSends()
            self._remove_expired_sources()
            thesplog('>>>', level=logging.DEBUG)
    except Exception:
        import traceback
        thesplog('ActorAdmin uncaught exception: %s', traceback.format_exc(),
                 level=logging.ERROR, exc_info=True)
    thesplog('Admin time to die', level=logging.DEBUG)
# ---- Source Hash Transfers --------------------------------------------------
def h_SourceHashTransferRequest(self, envelope):
    """Reply to a request for loaded source identified by its hash.

    An empty reply (hash only) is sent when the source is unknown or not
    valid; otherwise the original or zipped data is sent as requested.
    """
    request = envelope.message
    sourceHash = request.sourceHash
    src = self._sources.get(sourceHash, None)
    if src and src.source_valid:
        # Older requests did not have the prefer_original field;
        # maintain backward compatibility
        want_original = getattr(request, 'prefer_original', False)
        payload = src.orig_data if want_original else src.zipsrc
        reply = SourceHashTransferReply(sourceHash,
                                        payload,
                                        src.srcInfo,
                                        original_form=want_original)
    else:
        reply = SourceHashTransferReply(sourceHash)
    self._send_intent(TransmitIntent(envelope.sender, reply))
    return True
def h_SourceHashTransferReply(self, envelope):
    """Handle a peer's reply to an earlier source-hash transfer request.

    - A reply for a hash no longer tracked in ``self._sources`` is ignored.
    - A valid reply not in original form is loaded through
      ``_loadValidatedActorSource``.
    - A valid reply in original form is forwarded to the source authority
      as a ``ValidateSource``, when a source authority is configured.
    - Otherwise (the reply is invalid, or it is original-form data with no
      source authority to validate it), the actors pending on this source
      are cancelled and the source entry is dropped.

    Always returns True.
    """
    reply = envelope.message
    srcHash = reply.sourceHash
    if srcHash not in self._sources:
        return True
    if reply.isValid():
        # nb. original_form and sourceInfo were added later; getattr keeps
        # replies from older senders working.
        if not getattr(reply, 'original_form', False):
            self._loadValidatedActorSource(srcHash,
                                           reply.sourceData,
                                           getattr(reply, 'sourceInfo', None))
            return True
        if self._sourceAuthority:
            validation = ValidateSource(srcHash,
                                        reply.sourceData,
                                        getattr(reply, 'sourceInfo', None))
            self._send_intent(TransmitIntent(self._sourceAuthority, validation))
            return True
    waiting = self._sources[srcHash].pending_actors
    self._cancel_pending_actors(waiting)
    del self._sources[srcHash]
    return True
def h_ValidateSource(self, envelope):
    """Handle ValidateSource, relaying source unloads across the convention.

    A ValidateSource with empty ``sourceData`` that did not come from the
    convention leader is an unload request. It is forwarded to every
    convention member except its sender; local handling is then delegated
    to the base class.

    Returns False: the base handling might have sent with hysteresis, so
    control breaks out to the local run loop.
    """
    request = envelope.message
    origin = envelope.sender
    if not request.sourceData and origin != self._cstate.conventionLeaderAddr:
        # Excluding the sender prevents indefinite bouncing of this
        # message as long as the convention structure is a DAG.
        skip = [origin]
        self._performIO(self._cstate.send_to_all_members(request, skip))
    super(ConventioneerAdmin, self).h_ValidateSource(envelope)
    return False
def _acceptsRemoteLoadedSourcesFrom(self, pendingActorEnvelope):
    """Return whether actor source may be loaded from this envelope's sender.

    Controlled by the 'AllowRemoteActorSources' capability (default 'yes'):
    - 'yes' accepts any sender;
    - 'LeaderOnly' accepts only the convention leader;
    - any other value rejects.

    Values are compared case-insensitively.
    """
    setting = self.capabilities.get('AllowRemoteActorSources', 'yes')
    # Previously only 'yes' was case-insensitive while 'LeaderOnly' had to
    # match exactly, so e.g. 'leaderonly' silently meant "no".
    # str() makes a non-string value (None, a bool) fall through to
    # "not allowed" instead of raising AttributeError on .lower().
    normalized = str(setting).lower()
    if normalized == 'yes':
        return True
    if normalized == 'leaderonly':
        return pendingActorEnvelope.sender == self._cstate.conventionLeaderAddr
    return False
# ---- Remote Actor interactions ----------------------------------------------
def _not_compatible(self, createActorEnvelope):
    """Forward a createActor request this system cannot host to a remote one.

    Called when the current Actor System is not compatible with the Actor's
    actorSystemCapabilityCheck.
    - The child class is resolved, loading it from the hashed source when
      the request carries a sourceHash.
    - An external request gets its sender recorded as ``forActor``.
    - The convention state is asked to forward the request to a remote
      system whose capabilities the child accepts.

    Returns True.
    """
    request = createActorEnvelope.message
    srcHash = request.sourceHash
    requirements = request.targetActorReq
    if srcHash:
        loader = partial(loadModuleFromHashSource, srcHash, self._sources)
    else:
        loader = None
    resolvedClass = actualActorClass(request.actorClassName, loader)

    def caps_accepted(caps):
        return checkActorCapabilities(resolvedClass, caps, requirements)

    if request.forActor is None:
        # Request from external; use sender address
        request.forActor = createActorEnvelope.sender
    intents = self._cstate.forward_pending_to_remote_system(
        resolvedClass, createActorEnvelope, srcHash, caps_accepted)
    for pendingSend in intents:
        # Expected to be only one; a failed transmit is routed back here
        # so that the next possible remote can be tried.
        pendingSend.addCallback(onFailure=self._pending_send_failed)
    self._performIO(intents)
    return True
def _get_missing_source_for_hash(self, sourceHash, createActorEnvelope):
    """Request an unknown source from the remote Admin that forwarded the request.

    This path applies when the createActor request was forwarded by a
    remote Admin that this system accepts loaded sources from. Then:
    - a PendingSource holding the request is recorded;
    - a SourceHashTransferRequest is sent back to that Admin with
      hysteresis (asking for the original form when a source authority is
      configured);
    - False is returned so control breaks out to the local run loop.

    Otherwise the base-class handling applies.
    """
    remoteCanSupply = (self._cstate.sentByRemoteAdmin(createActorEnvelope) and
                       self._acceptsRemoteLoadedSourcesFrom(createActorEnvelope))
    if not remoteCanSupply:
        # No remote Admin to send the source, so fail as normal.
        return super(ConventioneerAdmin, self)._get_missing_source_for_hash(
            sourceHash, createActorEnvelope)
    placeholder = PendingSource(sourceHash, None)
    self._sources[sourceHash] = placeholder
    placeholder.pending_actors.append(createActorEnvelope)
    askRequest = SourceHashTransferRequest(sourceHash, bool(self._sourceAuthority))
    self._hysteresisSender.sendWithHysteresis(
        TransmitIntent(createActorEnvelope.sender, askRequest))
    return False
def _pending_send_failed(self, result, intent):
    """Transmit-failure callback for a forwarded createActor request.

    Re-delivers the failed intent's message to ``h_PendingActor`` as if this
    admin had sent it, so the next possible remote can be tried. *result*
    is not used.
    """
    redelivery = ReceiveEnvelope(msg=intent.message, sender=self.myAddress)
    self.h_PendingActor(redelivery)
def h_NotifyOnSystemRegistration(self, envelope):
    """Subscribe or unsubscribe the sender for system-registration notices.

    With ``enableNotification`` set, the sender is added as a notification
    handler and any resulting I/O is performed. Otherwise the sender is
    removed. Always returns True.
    """
    wanted = envelope.message.enableNotification
    subscriber = envelope.sender
    if not wanted:
        self._cstate.remove_notification_handler(subscriber)
        return True
    self._performIO(self._cstate.add_notification_handler(subscriber))
    return True
def h_PoisonMessage(self, envelope):
    """Stop system-registration notifications to the PoisonMessage's sender.

    NOTE(review): returns None, which is falsy. The other handlers here
    return False to break out to the run loop, so this presumably does the
    same; confirm that is intended.
    """
    poisonedAddr = envelope.sender
    self._cstate.remove_notification_handler(poisonedAddr)
    return None
def _handleChildExited(self, childAddr):
    """Drop an exited child's notification registration, then defer to the base class.

    Returns whatever the base-class ``_handleChildExited`` returns.
    """
    self._cstate.remove_notification_handler(childAddr)
    base = super(ConventioneerAdmin, self)
    return base._handleChildExited(childAddr)
def h_CapabilityUpdate(self, envelope):
    """Apply a capability change to this system.

    - The named capability is updated via ``_updSystemCapabilities``.
    - Unless this admin is shutting down, the convention state is told the
      capabilities changed and the resulting I/O is performed.
    - If the update returned a truthy value, local actors are refreshed
      via ``_capUpdateLocalActors``.

    Returns False: sends might have gone out with hysteresis, so control
    returns to the run loop.
    """
    change = envelope.message
    refreshLocals = self._updSystemCapabilities(change.capabilityName,
                                                change.capabilityValue)
    if not self.isShuttingDown():
        changeIO = self._cstate.capabilities_have_changed(self.capabilities)
        self._performIO(changeIO)
    if refreshLocals:
        self._capUpdateLocalActors()
    return False
#!/usr/bin/env python
from thespian.actors import *
import time
# Admin ports of the three test systems; every system also lists all of
# them (on the local host) as candidate convention leader addresses.
TEST_ADMIN_PORTS = (12300, 12400, 12500)


def _start_system(admin_port):
    """Start a transient multiprocTCPBase ActorSystem whose admin uses *admin_port*.

    Every system gets the same convention address list (one entry per port
    in TEST_ADMIN_PORTS) and has Admin Routing disabled.
    """
    capabilities = {
        'Convention Address.IPv4': [('', port) for port in TEST_ADMIN_PORTS],
        'Admin Port': admin_port,
        'Admin Routing': False,
    }
    return ActorSystem('multiprocTCPBase', capabilities, transientUnique=True)


def arnab_test1():
    """Interactively walk three systems through convention leader changes.

    The operator presses return to step through these actions:
    1. Start systems on ports 12300, 12400 and 12500.
    2. Shut down 12300, then 12400.
    3. Restart 12400, then 12300.
    4. Shut down 12400 again.
    5. Finally shut down the rest.

    If the walk-through is aborted (Ctrl-C, EOF on stdin, or a failed
    startup), every system still running is shut down before the exception
    propagates.
    """
    running = {}
    try:
        for port in TEST_ADMIN_PORTS:
            running[port] = _start_system(port)
        print("Started three systems as 12300, 12400, and 12500")
        input("Hit return to shutdown current leader")
        running.pop(12300).shutdown()
        print("Shutdown system at 12300")
        print("Please wait 2-3 minutes for previous action to stabilize")
        input("Hit return to shutdown system at 12400")
        running.pop(12400).shutdown()
        print("Shutdown system at 12400")
        input("Hit return to startup system at 12400")
        running[12400] = _start_system(12400)
        print("Started system at 12400")
        input("Hit return to startup system at 12300")
        running[12300] = _start_system(12300)
        print("Started system at 12300")
        input("Hit return to shutdown system at 12400")
        running.pop(12400).shutdown()
        print("Shutdown system at 12400")
        input("Hit return to shutdown remaining systems and exit")
        running.pop(12300).shutdown()
        running.pop(12500).shutdown()
        print("Exiting")
    finally:
        # Only reached with entries left on an abnormal exit. Shut them down
        # so they are not left running; multiprocTCPBase admins presumably
        # outlive this script otherwise (confirm).
        while running:
            _, leftover = running.popitem()
            leftover.shutdown()
if __name__ == "__main__":
    # Run the interactive convention-leader scenario only when executed as
    # a script, not when imported.
    arnab_test1()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment