Created
June 15, 2021 15:57
-
-
Save kquick/71628639d57de2033f56bb1e606a9f4f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from thespian.actors import * | |
from thespian.system.utilis import (thesplog, checkActorCapabilities, | |
foldl, join, fmap, AssocList, | |
actualActorClass) | |
from thespian.system.timing import ExpirationTimer, currentTime | |
from thespian.system.logdirector import LogAggregator | |
from thespian.system.admin.globalNames import GlobalNamesAdmin | |
from thespian.system.admin.adminCore import PendingSource | |
from thespian.system.transport import (TransmitIntent, ReceiveEnvelope, | |
Thespian__Run_Terminated) | |
from thespian.system.messages.admin import PendingActorResponse | |
from thespian.system.messages.convention import * | |
from thespian.system.sourceLoader import loadModuleFromHashSource | |
from thespian.system.transport.hysteresis import HysteresisDelaySender | |
from functools import partial | |
from datetime import (timedelta, datetime) | |
import os | |
import socket | |
from thespian.system.transport.IPBase import (TCPv4ActorAddress) | |
# TODO: consider making these periods configurable.
# Previous values, kept for reference:
#   CONVENTION_REREGISTRATION_PERIOD = timedelta(minutes=7, seconds=22)
#   CONVENTION_RESTART_PERIOD = timedelta(minutes=3, seconds=22)
CONVENTION_REREGISTRATION_PERIOD = timedelta(minutes=1)
CONVENTION_RESTART_PERIOD = timedelta(minutes=1)

# TODO: consider making this configurable.
# Number of missed convention registrations before death is declared.
CONVENTION_REGISTRATION_MISS_MAX = 1

# Multiplied by the remote's expected check-in time to obtain the
# timeout period for a new invite.
CONVENTION_REINVITE_ADJUSTMENT = 1.1

# Capabilities key checked (as a list value) before leader announcements.
CURR_CONV_ADDR_IPV4 = 'Convention Address.IPv4'
def convention_reinvite_adjustment(t):
    """Return *t* scaled by CONVENTION_REINVITE_ADJUSTMENT.

    *t* is normally a timedelta.  When the value cannot be multiplied
    by a float (Python 2 timedelta raises TypeError), the factor is
    applied as an exact integer ratio instead.
    """
    try:
        return t * CONVENTION_REINVITE_ADJUSTMENT
    except TypeError:
        # The previous fallback computed int(1 / (factor % 1)), which
        # truncates 9.999... to 9 for a factor of 1.1 (yielding ~1.111x),
        # divides by zero for whole-number factors, and is wrong for
        # factors >= 2.  A small rational approximation (1.1 -> 11/10)
        # needs only integer multiply and divide.
        from fractions import Fraction
        ratio = Fraction(CONVENTION_REINVITE_ADJUSTMENT).limit_denominator(1000)
        return (t * ratio.numerator) / ratio.denominator
class PreRegistration(object):
    """Ping bookkeeping attached to a pre-registered convention member."""

    def __init__(self):
        # True while an invite/ping to the remote is still outstanding.
        self.pingPending = False
        # Created with a zero-length period (presumably already expired,
        # so the first check triggers a ping -- confirm against
        # ExpirationTimer).
        self.pingValid = ExpirationTimer(timedelta(seconds=0))

    def refresh(self):
        """Restart pingValid for one CONVENTION_REREGISTRATION_PERIOD."""
        self.pingValid = ExpirationTimer(CONVENTION_REREGISTRATION_PERIOD)
class ConventionMemberData(object):
    """Local record describing one remote ActorSystem in the convention."""

    def __init__(self, address, capabilities, preRegOnly=False):
        self.remoteAddress = address
        self.remoteCapabilities = capabilities

        # (localParent, remoteActor) address pairs for actors that were
        # created on the remote system.
        self.hasRemoteActors = []

        # True when everything known about the remote comes solely from
        # a pre-registration.
        self.preRegOnly = preRegOnly

        # None, or a PreRegistration object when the ConventionRegister
        # carried the preRegister flag.  Its pingValid timer bounds how
        # long an active check of the remote stays valid: once it has
        # expired and no ping is pending, _check_preregistered_ping
        # sends a ConventionInvite and sets pingPending; the completion
        # callback (success or failure) clears pingPending again.  A
        # refresh() of this record restarts pingValid for another
        # CONVENTION_REREGISTRATION_PERIOD.  Pinging of a preRegistered
        # remote continues regardless of whether its convention
        # membership is currently valid.
        self.preRegistered = None

        self._reset_valid_timer()

    @property
    def permanentEntry(self):
        """True if this entry is pre-registration based in any way."""
        return bool(self.preRegOnly or self.preRegistered)

    def createdActor(self, localParentAddress, newActorAddress):
        """Remember (once) that newActorAddress was created remotely
        on behalf of localParentAddress."""
        pair = (localParentAddress, newActorAddress)
        if pair in self.hasRemoteActors:
            return
        self.hasRemoteActors.append(pair)

    def refresh(self, remoteCapabilities, preReg=False):
        """Record new capabilities and restart the validity timers.

        The preReg argument is accepted but not used here.
        """
        self.remoteCapabilities = remoteCapabilities
        self._reset_valid_timer()
        if self.preRegistered:
            self.preRegistered.refresh()

    def _reset_valid_timer(self):
        # registryValid spans the re-registration period multiplied by
        # the allowed number of missed registrations.  Each successful
        # re-registration replaces it with a fresh timer; if it expires
        # the remote is declared dead and the registration is removed
        # (or quiesced if it is a pre-registration).
        validity = (CONVENTION_REREGISTRATION_PERIOD *
                    CONVENTION_REGISTRATION_MISS_MAX)
        self.registryValid = ExpirationTimer(validity)

    def __str__(self):
        if self.preRegOnly:
            tag = ' (prereg-only)'
        elif self.preRegistered:
            tag = ' (prereg)'
        else:
            tag = ''
        return 'ActorSystem @ %s%s, registry valid for %s with %s' % (
            str(self.remoteAddress),
            tag,
            str(self.registryValid),
            str(self.remoteCapabilities))
class HysteresisCancel(object):
    """I/O request: cancel pending hysteresis sends to one address."""

    def __init__(self, cancel_addr):
        # Address whose delayed sends should be cancelled.
        self.cancel_addr = cancel_addr
class HysteresisSend(TransmitIntent):
    """TransmitIntent that _performIO routes through the hysteresis sender."""
class LostRemote(object):
    """I/O request telling the transport to reset its state for a
    remote (close sockets, drop buffers, etc.)."""

    def __init__(self, lost_addr):
        # Address of the remote system that has been lost.
        self.lost_addr = lost_addr
class LocalConventionState(object): | |
def __init__(self, myAddress, capabilities, sCBStats, getAllConventionAddressesFunc):
    """Initialize the convention state for the local ActorSystem.

    myAddress -- address of the local admin.
    capabilities -- local capabilities mapping.
    sCBStats -- statistics counter object (its inc(name) is called).
    getAllConventionAddressesFunc -- callable taking the capabilities
        and returning the sequence of candidate convention leader
        addresses.  It may return None or an empty sequence.
    """
    self._myAddress = myAddress
    self._capabilities = capabilities
    self._sCBStats = sCBStats
    # key=Remote Admin Addr, value=ConventionMemberData
    self._conventionMembers = AssocList()
    self._conventionNotificationHandlers = []
    self._convntn_ipv4_marker = 0
    # TODO: rename; this now returns all candidate leader addresses.
    self._getConventionAddr = getAllConventionAddressesFunc
    # Query once and normalize a None result to an empty list: indexing
    # the raw result raised TypeError/IndexError when the transport
    # supplied no leader addresses.
    self._conventionAddresses = getAllConventionAddressesFunc(capabilities) or []
    self._currentLeaderIdx = 0
    if len(self._conventionAddresses) > self._convntn_ipv4_marker:
        self._conventionAddress = self._conventionAddresses[self._convntn_ipv4_marker]
    else:
        self._conventionAddress = None
    self._conventionRegistration = ExpirationTimer(CONVENTION_REREGISTRATION_PERIOD)
    self._has_been_activated = False
    self._invited = False  # entered convention as a result of an explicit invite
    # Leader addresses currently believed to be available.
    self._current_avlbl_leaders = []
    # UTC construction time as an integer of the form YYYYmmddHHMMSSmmm.
    self._avlbl_leader_last_knwn_ts = int(datetime.utcnow().strftime('%Y%m%d%H%M%S%f')[:-3])
@property
def myAddress(self):
    """Address of the local admin, as given at construction."""
    return self._myAddress
@property
def capabilities(self):
    """Current local capabilities."""
    return self._capabilities
def updateStatusResponse(self, resp):
    """Copy the local convention status into the status response *resp*."""
    resp.setConventionLeaderAddress(self.conventionLeaderAddr)
    # TODO - set convention supporters here
    resp.setConventionRegisterTime(self._conventionRegistration)
    for member in self._conventionMembers.values():
        resp.addConventioneer(member.remoteAddress, member.registryValid)
    resp.setNotifyHandlers(self._conventionNotificationHandlers)
def active_in_convention(self):
    """Return True when a leader is known and has an entry in the
    member table.

    A non-leader member should have a convention leader, and that
    leader should have an entry in _conventionMembers (indicating it
    has updated this system with its information).
    """
    leader = self.conventionLeaderAddr
    if not leader:
        return False
    return bool(self._conventionMembers.find(leader))
@property
def conventionLeaderAddr(self):
    """Address currently used as the convention leader (may be None)."""
    return self._conventionAddress
def isConventionLeader(self):
    """Return whether the leader address equals the local address.

    Note: this system might also be the leader when
    conventionLeaderAddr is None; that case is not reported here.
    """
    leader = self.conventionLeaderAddr
    return leader == self.myAddress
def _populate_initial_leaders(self, convention_leader):
    """Build the initial status record for one configured leader.

    convention_leader is indexed as [0] (alias) and [1] (admin port).
    """
    thesplog(' _populate_initial_leaders: %s', convention_leader, level=logging.WARNING)
    return {
        'convention_alias': convention_leader[0],
        'admin_port': convention_leader[1],
        # Status is DOWN unless confirmed otherwise
        'status': 'DOWN',
        # Defaults to the current UTC time (YYYYmmddHHMMSSmmm as an int)
        'last_known_ts': int(datetime.utcnow().strftime('%Y%m%d%H%M%S%f')[:-3]),
    }
def capabilities_have_changed(self, new_capabilities):
    """Store the new capabilities and re-run convention setup.

    Returns whatever setup_convention() returns.
    """
    self._capabilities = new_capabilities
    outgoing = self.setup_convention()
    return outgoing
def setup_convention(self, activation=False):
    """(Re-)register with the convention leader and announce this
    system to the other configured leaders.

    activation -- True when called from system activation; latched
        into _has_been_activated.

    Returns the list of I/O requests to perform.
    """
    thesplog(' setup_convention:entry, activation: %s', activation, level=logging.WARNING)
    self._has_been_activated |= activation
    rmsgs = []
    # If not specified in capabilities, don't override any invites
    # that may have been received.  The lookup may yield None or an
    # empty sequence; indexing it unconditionally used to raise.
    configured = self._getConventionAddr(self.capabilities) or []
    if len(configured) > self._convntn_ipv4_marker and \
       configured[self._convntn_ipv4_marker]:
        self._conventionAddress = configured[self._convntn_ipv4_marker]
    leader_is_gone = (self._conventionMembers.find(self.conventionLeaderAddr) is None) \
                     if self.conventionLeaderAddr else True
    thesplog(' isConventionLeader:%s, conventionLeaderAddr: %s', self.isConventionLeader(), self.conventionLeaderAddr, level=logging.WARNING)
    if not self.isConventionLeader() and self.conventionLeaderAddr:
        thesplog('Admin registering with Convention @ %s (%s)',
                 self.conventionLeaderAddr,
                 'first time' if leader_is_gone else 're-registering',
                 level=logging.INFO, primary=True)
        rmsgs.append(
            HysteresisSend(self.conventionLeaderAddr,
                           ConventionRegister(self.myAddress,
                                              self.capabilities,
                                              leader_is_gone),
                           onSuccess = self._setupConventionCBGood,
                           onError = self._setupConventionCBError))
        rmsgs.append(LogAggregator(self.conventionLeaderAddr))
    # Announce availability to every other configured leader address.
    thesplog(' I am: %s', self.myAddress, level=logging.WARNING)
    if isinstance(self.capabilities.get(CURR_CONV_ADDR_IPV4), list):
        for curr_entry in (self._conventionAddresses or []):
            thesplog(' checking: %s (%s)', curr_entry, curr_entry == self.myAddress, level=logging.WARNING)
            if curr_entry != self.myAddress:
                thesplog(' Telling other leader %s I am available', curr_entry, level=logging.WARNING)
                # Address the announcement to the other leader; it was
                # previously addressed to self.myAddress, so it never
                # reached the system named in the log line above.
                rmsgs.append(TransmitIntent(curr_entry, NewLeaderAvailable(self.myAddress, curr_entry)))
    self._conventionRegistration = ExpirationTimer(CONVENTION_REREGISTRATION_PERIOD)
    return rmsgs
def _setupConventionCBGood(self, result, finishedIntent):
    """Success callback for the registration send: count it and
    forget any accumulated leader miss count."""
    self._sCBStats.inc('Admin Convention Registered')
    try:
        del self._conventionLeaderMissCount
    except AttributeError:
        # No misses were recorded; nothing to clear.
        pass
def _setupConventionCBError(self, result, finishedIntent):
    """Failure callback for the registration send: count it and bump
    the leader miss count (starting from 1 on the first miss)."""
    self._sCBStats.inc('Admin Convention Registration Failed')
    misses = getattr(self, '_conventionLeaderMissCount', 0) + 1
    self._conventionLeaderMissCount = misses
    thesplog('Admin cannot register with convention @ %s (miss %d): %s',
             finishedIntent.targetAddr,
             misses,
             result, level=logging.WARNING, primary=True)
def got_convention_invite(self, sender):
    """Adopt *sender* as the convention leader, mark this system as
    invited, and return the I/O requests from setup_convention()."""
    thesplog('****** got_convention_invite from %s', sender, level=logging.WARNING)
    self._invited = True
    self._conventionAddress = sender
    return self.setup_convention()
def got_convention_register(self, regmsg):
    """Handle a ConventionRegister message from a remote system.

    regmsg supplies adminAddress, capabilities, firstTime and
    (optionally) preRegister.  Returns the list of I/O requests to
    perform.
    """
    self._sCBStats.inc('Admin Handle Convention Registration')
    if self._invited and not self.conventionLeaderAddr:
        # Lost connection to an invitation-only convention.
        # Cannot join again until another invitation is received.
        return []

    # Registrant may re-register if changing capabilities
    rmsgs = []
    registrant = regmsg.adminAddress
    prereg = getattr(regmsg, 'preRegister', False)  # getattr used; see definition
    existing = self._conventionMembers.find(registrant)
    thesplog('****** Got Convention %sregistration from %s (%s) (new? %s)',
             'pre-' if prereg else '',
             registrant,
             'first time' if regmsg.firstTime else 're-registering',
             not existing,
             level=logging.WARNING)

    if registrant == self.myAddress:
        # Either remote failed getting an external address and is
        # using 127.0.0.1 or else this is a malicious attempt to
        # make us talk to ourselves.  Ignore it.
        thesplog('Convention registration from %s is an invalid address; ignoring.',
                 registrant,
                 level=logging.WARNING)
        return rmsgs

    thesplog(" whoami: %s", self.myAddress, level=logging.WARNING)
    if isinstance(self.capabilities.get(CURR_CONV_ADDR_IPV4), list):
        # NOTE(review): the trailing else is reconstructed as a for/else
        # (runs only when no entry matched) -- confirm against the
        # original indentation.
        for curr_entry in self._conventionAddresses:
            is_me = curr_entry == self.myAddress
            thesplog(" curr check of %s (%s)", curr_entry, is_me, level=logging.WARNING)
            if is_me:
                thesplog(' Post convention_register, new leader %s identified', curr_entry, level=logging.WARNING)
                rmsgs.append(TransmitIntent(self.myAddress, NewLeaderAvailable(self.myAddress, curr_entry)))
                break
        else:
            thesplog(" no new leader found here", level=logging.ERROR)

    if existing:
        existingPreReg = existing.permanentEntry
        notify = existing.preRegOnly and not prereg
    else:
        existingPreReg = False
        notify = not prereg

    if regmsg.firstTime or not existing:
        if existing:
            # First-time registration from a known remote: discard the
            # previous connection information before re-adding it.
            existing = None
            notify = not prereg
            rmsgs.extend(self._remote_system_cleanup(registrant))
        newmember = ConventionMemberData(registrant,
                                         regmsg.capabilities,
                                         prereg)
        if prereg or existingPreReg:
            newmember.preRegistered = PreRegistration()
        self._conventionMembers.add(registrant, newmember)
    else:
        existing.refresh(regmsg.capabilities, prereg or existingPreReg)
        if not prereg:
            existing.preRegOnly = False

    if not self.isConventionLeader():
        self._conventionRegistration = ExpirationTimer(CONVENTION_REREGISTRATION_PERIOD)

    # Convention Members normally periodically initiate a
    # membership message, to which the leader confirms by
    # responding; if this was a pre-registration, that identifies
    # this system as the "leader" for that remote.  Also, if the
    # remote sent this because it was a pre-registration leader,
    # it doesn't yet have all the member information so the member
    # should respond.
    if prereg:
        rmsgs.append(HysteresisCancel(registrant))
        rmsgs.append(TransmitIntent(registrant, ConventionInvite()))
    elif (self.isConventionLeader() or regmsg.firstTime or
          (existing and existing.permanentEntry)):
        # (prereg is known to be false on this branch.)
        # If we are the Convention Leader, this would be the point to
        # inform all other registrants of the new registrant.  At
        # present, there is no reciprocity here, so just update the
        # new registrant with the leader's info.
        rmsgs.append(
            TransmitIntent(registrant,
                           ConventionRegister(self.myAddress,
                                              self.capabilities)))

    if notify:
        rmsgs.extend(self._notifications_of(
            ActorSystemConventionUpdate(registrant,
                                        regmsg.capabilities,
                                        True)))
    return rmsgs
def _notifications_of(self, msg):
    """Return one TransmitIntent carrying *msg* per registered
    notification handler."""
    intents = []
    for handler in self._conventionNotificationHandlers:
        intents.append(TransmitIntent(handler, msg))
    return intents
def add_notification_handler(self, addr):
    """Register *addr* as a convention notification handler.

    For a newly added handler, returns intents that update it on the
    current state of all convention members that are not
    pre-registration-only.  Returns [] if it was already registered.
    """
    if addr in self._conventionNotificationHandlers:
        return []
    self._conventionNotificationHandlers.append(addr)
    updates = []
    for member in self._conventionMembers.values():
        if member.preRegOnly:
            continue
        updates.append(
            TransmitIntent(addr,
                           ActorSystemConventionUpdate(member.remoteAddress,
                                                       member.remoteCapabilities,
                                                       True)))
    return updates
def remove_notification_handler(self, addr):
    """Drop every occurrence of *addr* from the notification handlers
    (the handler list is replaced by a new list)."""
    remaining = []
    for handler in self._conventionNotificationHandlers:
        if handler != addr:
            remaining.append(handler)
    self._conventionNotificationHandlers = remaining
def got_convention_deregister(self, deregmsg):
    """Handle a ConventionDeRegister message from a remote system.

    deregmsg supplies adminAddress and (optionally) preRegistered.
    Returns the list of I/O requests to perform.
    """
    self._sCBStats.inc('Admin Handle Convention De-registration')
    remoteAdmin = deregmsg.adminAddress
    if remoteAdmin == self.myAddress:
        # Either remote failed getting an external address and is
        # using 127.0.0.1 or else this is a malicious attempt to
        # make us talk to ourselves.  Ignore it.
        thesplog('Convention deregistration from %s is an invalid address; ignoring.',
                 remoteAdmin,
                 level=logging.WARNING)
        # Actually ignore it, as the log message states and as
        # got_convention_register does; previously processing continued.
        return []
    rmsgs = []
    if getattr(deregmsg, 'preRegistered', False):  # see definition for getattr use
        existing = self._conventionMembers.find(remoteAdmin)
        if existing:
            existing.preRegistered = None
            # NOTE(review): reconstructed as nested under `if existing`
            # -- confirm against the original indentation.
            rmsgs.append(TransmitIntent(remoteAdmin, ConventionDeRegister(self.myAddress)))
    return rmsgs + self._remote_system_cleanup(remoteAdmin)
def got_system_shutdown(self):
    """On system shutdown, leave the convention; returns the I/O
    requests produced by exit_convention()."""
    farewell = self.exit_convention()
    return farewell
def exit_convention(self):
    """Leave the convention and return the I/O requests to perform.

    A non-leader member cleans up and de-registers from its leader;
    otherwise every known remote member is cleaned up and sent a
    de-registration.
    """
    # NOTE(review): this assigns `invited`, not `_invited` (the flag the
    # rest of the class reads).  Looks like a typo, but
    # _remote_system_cleanup consults `_invited`, so renaming would
    # change the cleanup below -- confirm intent before changing.
    self.invited = False

    def gen_ops(addr):
        # Cancel pending delayed sends, then de-register from addr.
        return [HysteresisCancel(addr),
                TransmitIntent(addr,
                               ConventionDeRegister(self.myAddress)),
                ]

    def terminate(addr):
        # The cleanup's returned requests are discarded here; only its
        # effect on local state is kept.
        self._remote_system_cleanup(addr)
        return gen_ops(addr)

    # Cache convention leader address because it might get reset by terminate()
    claddr = self.conventionLeaderAddr
    if claddr and claddr != self.myAddress:
        thesplog('Admin de-registering with Convention @ %s',
                 str(claddr),
                 level=logging.INFO, primary=True)
        terminate(claddr)
        return gen_ops(claddr)

    others = [M.remoteAddress
              for M in self._conventionMembers.values()
              if M.remoteAddress != self.myAddress]
    return join(fmap(terminate, others))
def check_convention(self):
    """Periodic convention maintenance; returns the I/O requests to
    perform.

    Once activated, runs the pre-registration ping check for every
    member, followed by the leader-side or member-side checks.  The
    local registration timer is restarted whenever it has expired.
    """
    ct = currentTime()
    rmsgs = []
    thesplog(' *** check convention ***', level=logging.DEBUG)
    if self._has_been_activated:
        # Pings are evaluated before the leader/member checks, which
        # may remove members.
        pings = [self._check_preregistered_ping(ct, member)
                 for member in self._conventionMembers.values()]
        if self.isConventionLeader() or not self.conventionLeaderAddr:
            checks = self._convention_leader_checks(ct)
        else:
            checks = self._convention_member_checks(ct)
        rmsgs = foldl(lambda x, y: x + y, pings, checks)
    # NOTE(review): reconstructed at function level (outside the
    # activation test) -- confirm against the original indentation.
    if self._conventionRegistration.view(ct).expired():
        self._conventionRegistration = ExpirationTimer(CONVENTION_REREGISTRATION_PERIOD)
    return rmsgs
def _convention_leader_checks(self, ct):
    """Clean up every member whose registryValid timer has expired at
    *ct*; returns the concatenated I/O requests."""
    # Collect the expired members first: the cleanup modifies the
    # member table.
    expired = [member
               for member in self._conventionMembers.values()
               if member.registryValid.view(ct).expired()]
    cleanups = [self._missed_checkin_remote_cleanup(R) for R in expired]
    return foldl(lambda x, y: x + y, cleanups, [])
def _missed_checkin_remote_cleanup(self, remote_member):
    """Log that *remote_member* missed its check-ins and clean it up;
    returns the I/O requests from _remote_system_cleanup."""
    description = str(remote_member)
    validity = str(remote_member.registryValid)
    thesplog('%s missed %d checkins (%s); assuming it has died',
             description,
             CONVENTION_REGISTRATION_MISS_MAX,
             validity,
             level=logging.WARNING, primary=True)
    return self._remote_system_cleanup(remote_member.remoteAddress)
#TODO - This is where we need to put re-elect leader logic | |
def _convention_member_checks(self, ct):
    """Member-side periodic check; returns the I/O requests to perform.

    When the local registration timer has expired: if the leader has
    missed CONVENTION_REGISTRATION_MISS_MAX registrations, clean it up
    and register with a re-elected leader (if any); otherwise
    re-register with the current leader.
    """
    rmsgs = []
    thesplog(' *** convention member checks ***', level=logging.DEBUG)
    # Re-register with the Convention if it's time
    if self.conventionLeaderAddr and \
       self._conventionRegistration.view(ct).expired():
        if getattr(self, '_conventionLeaderMissCount', 0) >= \
           CONVENTION_REGISTRATION_MISS_MAX:
            thesplog('Admin convention registration lost @ %s (miss %d)',
                     self.conventionLeaderAddr,
                     self._conventionLeaderMissCount,
                     level=logging.WARNING, primary=True)
            rmsgs.extend(self._remote_system_cleanup(self.conventionLeaderAddr))
            # TODO - This is where we need to put re-elect leader logic
            new_leader_actr_addr = self._initiate_re_election()
            # _initiate_re_election returns None when no leader is
            # available; do not build a send to a None address.
            if new_leader_actr_addr is not None:
                rmsgs.append(TransmitIntent(new_leader_actr_addr,
                                            ConventionRegister(self.myAddress,
                                                               self.capabilities,
                                                               True)))
            self._conventionLeaderMissCount = 0
        else:
            rmsgs.extend(self.setup_convention())
    return rmsgs
def _initiate_re_election(self):
    """Pick the next available leader from the configured addresses.

    The address at _currentLeaderIdx is removed from the available
    leaders; the first configured address still listed as available
    is selected, its index is recorded in _currentLeaderIdx, and it
    is returned.  Returns None when no leader is available.
    """
    candidates = self._conventionAddresses or []
    # No configured addresses (or a stale index) means there is no
    # "current" configured leader to exclude; indexing unconditionally
    # used to raise here.
    if self._currentLeaderIdx < len(candidates):
        curr_ldr_addr = candidates[self._currentLeaderIdx]
    else:
        curr_ldr_addr = None
    thesplog(" re-elect away from %s to one of: %s", curr_ldr_addr, list(map(str, self._current_avlbl_leaders)), level=logging.WARNING)
    self._current_avlbl_leaders = [a for a in self._current_avlbl_leaders if a != curr_ldr_addr]
    for (idx, each) in enumerate(candidates):
        if each in self._current_avlbl_leaders:
            self._currentLeaderIdx = idx
            thesplog(' Selected %s (%d) as next new leader', each, idx, level=logging.WARNING)
            return each
    thesplog('NO ACTIVE LEADERS AVAILABLE', level=logging.ERROR)
    return None
def _check_preregistered_ping(self, ct, member):
    """Return a ConventionInvite send for a pre-registered *member*
    whose ping validity has expired at *ct* and which has no ping
    pending; otherwise return []."""
    prereg = member.preRegistered
    if not prereg or \
       not prereg.pingValid.view(ct).expired() or \
       prereg.pingPending:
        return []

    prereg.pingPending = True
    # If remote misses a checkin, re-extend the
    # invitation.  This also helps re-initiate a socket
    # connection if a TxOnly socket has been lost.
    if member.registryValid.view(ct).expired():
        base_period = CONVENTION_RESTART_PERIOD
    else:
        base_period = CONVENTION_REREGISTRATION_PERIOD
    prereg.pingValid = ExpirationTimer(convention_reinvite_adjustment(base_period))
    return [HysteresisSend(member.remoteAddress,
                           ConventionInvite(),
                           onSuccess = self._preRegQueryNotPending,
                           onError = self._preRegQueryNotPending)]
def _preRegQueryNotPending(self, result, finishedIntent):
    """Completion callback (success or failure) for a pre-registration
    ping: clear pingPending on the targeted member, if still known."""
    member = self._conventionMembers.find(finishedIntent.targetAddr)
    if not member:
        return
    if member.preRegistered:
        member.preRegistered.pingPending = False
def _remote_system_cleanup(self, registrant):
    """Called when a RemoteActorSystem has exited and all associated
    Actors should be marked as exited and the ActorSystem
    removed from Convention membership.  This is also called on
    a First Time connection from the remote to discard any
    previous connection information.

    Returns the I/O requests to perform; [] when the registrant is
    unknown or known only through a pre-registration.
    """
    cmr = self._conventionMembers.find(registrant)
    thesplog('Convention cleanup or deregistration for %s (known? %s)',
             registrant,
             bool(cmr),
             level=logging.INFO)
    if not cmr or cmr.preRegOnly:
        return []

    rmsgs = [LostRemote(registrant)]

    # Send exited notification to conventionNotificationHandler (if any)
    rmsgs.extend(
        TransmitIntent(each,
                       ActorSystemConventionUpdate(cmr.remoteAddress,
                                                   cmr.remoteCapabilities,
                                                   False))  # errors ignored
        for each in self._conventionNotificationHandlers)

    # If the remote ActorSystem shutdown gracefully (i.e. sent
    # a Convention Deregistration) then it should not be
    # necessary to shutdown remote Actors (or notify of their
    # shutdown) because the remote ActorSystem should already
    # have caused this to occur.  However, it won't hurt, and
    # it's necessary if the remote ActorSystem did not exit
    # gracefully.
    # n.b. at present, this means that the parent might
    # get duplicate notifications of ChildActorExited; it
    # is expected that Actors can handle this.
    rmsgs.extend(TransmitIntent(lpa, ChildActorExited(raa))  # ignore errors
                 for lpa, raa in cmr.hasRemoteActors)

    if cmr.preRegistered:
        # This conventionMember needs to stay because the
        # current system needs to continue issuing
        # registration pings.  By setting the registryValid
        # expiration to forever, this member won't re-time-out
        # and will therefore be otherwise ignored... until it
        # registers again at which point the membership will
        # be updated with new settings.
        cmr.registryValid = ExpirationTimer(None)
        cmr.preRegOnly = True
    else:
        # Remove remote system from conventionMembers
        if registrant == self.conventionLeaderAddr and self._invited:
            self._conventionAddress = None
            # Don't clear invited: once invited, that
            # perpetually indicates this should be only a
            # member and never a leader.
        self._conventionMembers.rmv(registrant)

    return rmsgs + [HysteresisCancel(registrant)]
def sentByRemoteAdmin(self, envelope):
    """Return True if the envelope's sender is the address of a known
    convention member, else False."""
    return any(envelope.sender == member.remoteAddress
               for member in self._conventionMembers.values())
def convention_inattention_delay(self, current_time):
    """Return the registration timer viewed at *current_time*.

    If no registration timer is set (falsy), a fresh timer is used
    instead: the re-registration period when active in the convention
    or acting as leader, else the restart period.
    """
    timer = self._conventionRegistration
    if not timer:
        if self.active_in_convention() or self.isConventionLeader():
            period = CONVENTION_REREGISTRATION_PERIOD
        else:
            period = CONVENTION_RESTART_PERIOD
        timer = ExpirationTimer(period)
    return timer.view(current_time)
def forward_pending_to_remote_system(self, childClass, envelope, sourceHash, acceptsCaps):
    """Choose a remote system for a pending actor creation request and
    return the TransmitIntent (in a list) forwarding the request there.

    Candidates are unexpired members, other than the envelope sender
    and systems already tried, whose capabilities satisfy acceptsCaps.
    With no candidates the request goes to the convention leader, or
    NoCompatibleSystemForActor is raised when this system is the
    leader or has none.
    """
    request = envelope.message
    alreadyTried = getattr(request, 'alreadyTried', [])
    ct = currentTime()

    remoteCandidates = [
        K
        for K in self._conventionMembers.values()
        if not K.registryValid.view(ct).expired()
        and K.remoteAddress != envelope.sender  # source Admin
        and K.remoteAddress not in alreadyTried
        and acceptsCaps(K.remoteCapabilities)]

    if remoteCandidates:
        # distribute equally amongst candidates: the fold keeps the
        # earlier entry unless a later one has strictly fewer actors
        loads = [(K.remoteAddress, len(K.hasRemoteActors))
                 for K in remoteCandidates]
        bestC = foldl(lambda best, possible:
                      best if best[1] <= possible[1] else possible,
                      loads)[0]
    elif self.isConventionLeader() or not self.conventionLeaderAddr:
        raise NoCompatibleSystemForActor(
            childClass,
            'No known ActorSystems can handle a %s for %s',
            childClass, request.forActor)
    else:
        # Let the Convention Leader try to find an appropriate ActorSystem
        bestC = self.conventionLeaderAddr

    thesplog('Requesting creation of %s%s on remote admin %s',
             request.actorClassName,
             ' (%s)'%sourceHash if sourceHash else '',
             bestC)

    # Record the chosen remote so it is not asked again (this guards
    # against continual bounceback if local and remote disagree about
    # addresses), and this system itself, since it cannot handle the
    # request.
    for tried in (bestC, self.myAddress):
        if tried not in alreadyTried:
            alreadyTried.append(tried)

    request.alreadyTried = alreadyTried
    return [TransmitIntent(bestC, request)]
def send_to_all_members(self, message, exception_list=None):
    """Return a HysteresisSend of *message* for every convention
    member whose address is not in exception_list."""
    thesplog(' ### send_to_all_members: entry ###', level=logging.DEBUG)
    excluded = exception_list or []
    sends = []
    for M in self._conventionMembers.values():
        if M.remoteAddress in excluded:
            continue
        sends.append(HysteresisSend(M.remoteAddress, message))
    return sends
class ConventioneerAdmin(GlobalNamesAdmin): | |
"""Extends the AdminCore+GlobalNamesAdmin with ActorSystem Convention | |
functionality to support multi-host configurations. | |
""" | |
def __init__(self, *args, **kw):
    """Initialize the base admin, then the convention state and the
    hysteresis sender."""
    super(ConventioneerAdmin, self).__init__(*args, **kw)
    # Transports without getAllConventionAddresses fall back to a
    # function that returns None.
    address_lookup = getattr(self.transport, 'getAllConventionAddresses', lambda c: None)
    self._cstate = LocalConventionState(
        self.myAddress,
        self.capabilities,
        self._sCBStats,
        address_lookup)
    self._hysteresisSender = HysteresisDelaySender(self._send_intent)
def _updateStatusResponse(self, resp):
    """Add convention status to *resp*, then let the base class add
    its own."""
    convention_state = self._cstate
    convention_state.updateStatusResponse(resp)
    super(ConventioneerAdmin, self)._updateStatusResponse(resp)
def _activate(self):
    """Called internally when this ActorSystem has been initialized
    and should be activated for operations; performs the initial
    convention setup unless the system is shutting down."""
    super(ConventioneerAdmin, self)._activate()
    if not self.isShuttingDown():
        self._performIO(self._cstate.setup_convention(True))
def h_ConventionInvite(self, envelope):
    """Handle a ConventionInvite: join the sender's convention.

    Returns None (without acting) while shutting down, else True.
    """
    if self.isShuttingDown():
        return
    outgoing = self._cstate.got_convention_invite(envelope.sender)
    self._performIO(outgoing)
    return True
def h_ConventionRegister(self, envelope):
    """Handle a ConventionRegister message.

    After processing the registration, a sender that is one of the
    configured convention addresses is recorded (once) as an available
    leader.  Returns None (without acting) while shutting down, else
    True.
    """
    if self.isShuttingDown():
        return
    state = self._cstate
    self._performIO(state.got_convention_register(envelope.message))
    sender = envelope.sender
    if sender in state._conventionAddresses and \
       sender not in state._current_avlbl_leaders:
        state._current_avlbl_leaders.append(sender)
    return True
def h_ConventionDeRegister(self, envelope):
    """Handle a ConventionDeRegister message.

    After processing the de-registration, a sender that is one of the
    configured convention addresses is dropped from the available
    leaders.  Always returns True.
    """
    state = self._cstate
    self._performIO(state.got_convention_deregister(envelope.message))
    departed = envelope.sender
    if departed in state._conventionAddresses:
        state._current_avlbl_leaders = [
            a for a in state._current_avlbl_leaders if a != departed]
    return True
def h_NewLeaderAvailable(self, envelope):
    """Handle a NewLeaderAvailable message: record the sender (once)
    as an available leader and log the current leader list.

    Always returns True.  (An earlier alias/timestamp-based status
    update existed here only as commented-out code.)
    """
    sender = envelope.sender
    thesplog(' NewLeaderAvailable: sender: %s, message: %s',
             sender, str(envelope.message),
             level=logging.DEBUG)
    available = self._cstate._current_avlbl_leaders
    if sender not in available:
        available.append(sender)
    self._current_leader_stats()
    thesplog(' NewLeaderAvailable: exit', level=logging.WARNING)
    return True
def h_SystemShutdown(self, envelope):
    """Handle SystemShutdown: leave the convention, then defer to the
    base class handler and return its result."""
    self._performIO(self._cstate.got_system_shutdown())
    # The unreachable `return True` that followed this return was removed.
    return super(ConventioneerAdmin, self).h_SystemShutdown(envelope)
def _performIO(self, iolist):
    """Carry out each I/O request in *iolist*, dispatching on its type."""
    thesplog(' <<<_performIO', level=logging.DEBUG)
    for msg in iolist:
        if isinstance(msg, HysteresisCancel):
            self._hysteresisSender.cancelSends(msg.cancel_addr)
            continue
        # HysteresisSend must be tested before the generic fall-through
        # below, since it is itself a TransmitIntent.
        if isinstance(msg, HysteresisSend):
            self._hysteresisSender.sendWithHysteresis(msg)
            continue
        if isinstance(msg, LogAggregator):
            # Skipped when this admin has no asLogger.
            if getattr(self, 'asLogger', None):
                thesplog('Setting log aggregator of %s to %s', self.asLogger, msg.aggregatorAddress)
                self._send_intent(TransmitIntent(self.asLogger, msg))
            continue
        if isinstance(msg, LostRemote):
            # Skipped when the transport has no lostRemote method.
            if hasattr(self.transport, 'lostRemote'):
                self.transport.lostRemote(msg.lost_addr)
            continue
        self._send_intent(msg)
    thesplog(' >>>', level=logging.DEBUG)
def _current_leader_stats(self):
    """Write the currently available leaders to the debug log.

    Logs one line per entry of the convention state's available-leader
    list, followed by the last known leader timestamp.
    """
    state = self._cstate
    thesplog(' Current leader stats', level=logging.DEBUG)
    for leader in state._current_avlbl_leaders:
        thesplog(' %s', leader, level=logging.DEBUG)
    # Only relevant for convention leaders.
    # NOTE(review): assumed to be logged once, after the loop -- the
    # original indentation was not recoverable; confirm.
    last_known = str(state._avlbl_leader_last_knwn_ts)
    thesplog(' Last known TS: %s', last_known, level=logging.DEBUG)
def run(self):
    """Main loop for convention management.

    Wraps the lower-level transport with a stop at the next needed
    convention registration period to re-register.  The loop continues
    until ``self.shutdown_completed`` is set to a truthy value or the
    transport's run returns a Thespian__Run_Terminated instance.  Any
    exception escaping the loop is logged with its traceback and ends
    the loop; nothing is re-raised.
    """
    transport_continue = True
    try:
        while not getattr(self, 'shutdown_completed', False) and \
              not isinstance(transport_continue, Thespian__Run_Terminated):
            ct = currentTime()
            # Wake for whichever comes first: convention attention or a
            # pending hysteresis send.  An expired hysteresis delay is
            # replaced by ExpirationTimer(None).view(ct).
            delay = min(self._cstate.convention_inattention_delay(ct),
                        ExpirationTimer(None).view(ct)
                        if self._hysteresisSender.delay.expired() else
                        self._hysteresisSender.delay)
            # n.b. delay does not account for soon-to-expire
            # pingValids, but since delay will not be longer than
            # a CONVENTION_REREGISTRATION_PERIOD, the worst case
            # is a doubling of a pingValid period (which should be fine).
            thesplog(' <<<Running at %s', self.myAddress, level=logging.DEBUG)
            transport_continue = self.transport.run(self.handleIncoming,
                                                    delay.remaining())
            # Check Convention status based on the elapsed time
            self._performIO(self._cstate.check_convention())
            self._hysteresisSender.checkSends()
            self._remove_expired_sources()
            # NOTE(review): assumed to close each iteration's '<<<' trace;
            # the original indentation was not recoverable -- confirm.
            thesplog('>>>', level=logging.DEBUG)
    except Exception:
        # Top-level boundary of the admin: log and fall through to exit.
        import traceback
        thesplog('ActorAdmin uncaught exception: %s', traceback.format_exc(),
                 level=logging.ERROR, exc_info=True)
    thesplog('Admin time to die', level=logging.DEBUG)
# ---- Source Hash Transfers -------------------------------------------------- | |
def h_SourceHashTransferRequest(self, envelope):
    """Answer a request for the source registered under a hash.

    If the hash is unknown locally, or the stored source is not marked
    valid, a SourceHashTransferReply carrying only the hash is sent.
    Otherwise the reply carries either the original data or the zipped
    source, depending on the request's ``prefer_original`` flag, along
    with the source info.

    Always returns True.
    """
    request = envelope.message
    sourceHash = request.sourceHash
    src = self._sources.get(sourceHash, None)
    if src and src.source_valid:
        # Older requests did not have the prefer_original field;
        # maintain backward compatibility
        orig = getattr(request, 'prefer_original', False)
        payload = src.orig_data if orig else src.zipsrc
        reply = SourceHashTransferReply(sourceHash,
                                        payload,
                                        src.srcInfo,
                                        original_form=orig)
    else:
        reply = SourceHashTransferReply(sourceHash)
    self._send_intent(TransmitIntent(envelope.sender, reply))
    return True
def h_SourceHashTransferReply(self, envelope):
    """Process the reply to an earlier source hash transfer request.

    Replies for hashes not present in ``self._sources`` are ignored.
    A valid reply is either passed to the source authority for
    validation (original-form data, when a source authority is set) or
    loaded directly (non-original form).  In every other case -- an
    invalid reply, or original-form data with no source authority --
    the actors pending on this hash are cancelled and the entry is
    removed.

    Always returns True.
    """
    reply = envelope.message
    sourceHash = reply.sourceHash
    if sourceHash not in self._sources:
        return True
    if reply.isValid():
        # nb.. original_form added; use getattr for backward compatibility
        in_original_form = getattr(reply, 'original_form', False)
        if not in_original_form:
            self._loadValidatedActorSource(
                sourceHash,
                reply.sourceData,
                # sourceInfo added; backward compat.
                getattr(reply, 'sourceInfo', None))
            return True
        if self._sourceAuthority:
            validation = ValidateSource(sourceHash,
                                        reply.sourceData,
                                        getattr(reply, 'sourceInfo', None))
            self._send_intent(
                TransmitIntent(self._sourceAuthority, validation))
            return True
    waiting = self._sources[sourceHash].pending_actors
    self._cancel_pending_actors(waiting)
    del self._sources[sourceHash]
    return True
def h_ValidateSource(self, envelope):
    """Handle ValidateSource, relaying source unloads to convention members.

    A message with no ``sourceData`` that did not come from the
    convention leader is sent on to all convention members except the
    sender.  The base class handler is then invoked in every case.

    Returns False.
    """
    notice = envelope.message
    origin = envelope.sender
    if not notice.sourceData and \
       origin != self._cstate.conventionLeaderAddr:
        # Propagate source unload requests to all convention members.
        # Do not propagate if this is where the notification came
        # from; prevents indefinite bouncing of this message as long
        # as the convention structure is a DAG.
        relayed = self._cstate.send_to_all_members(notice, [origin])
        self._performIO(relayed)
    super(ConventioneerAdmin, self).h_ValidateSource(envelope)
    # might have sent with hysteresis, so break out to local _run
    return False
def _acceptsRemoteLoadedSourcesFrom(self, pendingActorEnvelope):
    """Decide whether sources loaded remotely may be accepted from the sender.

    Governed by the 'AllowRemoteActorSources' capability (default
    'yes'): any casing of 'yes' accepts every sender; the exact value
    'LeaderOnly' accepts only the convention leader; any other value
    rejects.
    """
    policy = self.capabilities.get('AllowRemoteActorSources', 'yes')
    if policy.lower() == 'yes':
        return True
    if policy != 'LeaderOnly':
        return False
    return pendingActorEnvelope.sender == self._cstate.conventionLeaderAddr
# ---- Remote Actor interactions ---------------------------------------------- | |
def _not_compatible(self, createActorEnvelope):
    """Forward a createActor request this system cannot satisfy.

    Called when the current Actor System is not compatible with the
    Actor's actorSystemCapabilityCheck.  The request is handed to the
    convention state to be forwarded to another system that it is
    compatible with.  Each resulting transmit gets a failure callback
    that routes the request back here so the next candidate can be
    tried.

    Always returns True.
    """
    request = createActorEnvelope.message
    sourceHash = request.sourceHash
    childRequirements = request.targetActorReq
    if sourceHash:
        loader = partial(loadModuleFromHashSource, sourceHash, self._sources)
    else:
        loader = None
    childClass = actualActorClass(request.actorClassName, loader)

    def acceptsCaps(caps):
        return checkActorCapabilities(childClass, caps, childRequirements)

    if request.forActor is None:
        # Request from external; use sender address
        request.forActor = createActorEnvelope.sender
    iolist = self._cstate.forward_pending_to_remote_system(
        childClass, createActorEnvelope, sourceHash, acceptsCaps)
    # Expected to be only one; if the transmit fails, route it back
    # here so that the next possible remote can be tried.
    for intent in iolist:
        intent.addCallback(onFailure=self._pending_send_failed)
    self._performIO(iolist)
    return True
def _get_missing_source_for_hash(self, sourceHash, createActorEnvelope):
    """Try to obtain source for an unknown hash from the requesting admin.

    If this request was forwarded by a remote Admin and remote sources
    are accepted from that sender, a PendingSource entry holding the
    request is created and a SourceHashTransferRequest is sent back to
    the sender with hysteresis; False is returned in that case.
    Otherwise the base class implementation decides the outcome.
    """
    from_remote_admin = (
        self._cstate.sentByRemoteAdmin(createActorEnvelope) and
        self._acceptsRemoteLoadedSourcesFrom(createActorEnvelope))
    if not from_remote_admin:
        # No remote Admin to send the source, so fail as normal.
        return super(ConventioneerAdmin, self)._get_missing_source_for_hash(
            sourceHash,
            createActorEnvelope)
    self._sources[sourceHash] = PendingSource(sourceHash, None)
    self._sources[sourceHash].pending_actors.append(createActorEnvelope)
    transfer_request = SourceHashTransferRequest(sourceHash,
                                                 bool(self._sourceAuthority))
    self._hysteresisSender.sendWithHysteresis(
        TransmitIntent(createActorEnvelope.sender, transfer_request))
    # sent with hysteresis, so break out to local _run
    return False
def _pending_send_failed(self, result, intent):
    """Transmit-failure callback: resubmit the intent's message locally.

    The failed intent's message is wrapped in a ReceiveEnvelope whose
    sender is this admin's own address and passed to h_PendingActor.
    *result* is not used.
    """
    resubmitted = ReceiveEnvelope(msg=intent.message, sender=self.myAddress)
    self.h_PendingActor(resubmitted)
def h_NotifyOnSystemRegistration(self, envelope):
    """Add or remove the sender as a system-registration notification handler.

    When the message's ``enableNotification`` is truthy the sender is
    added and any I/O produced by adding it is performed; otherwise the
    sender is removed.

    Always returns True.
    """
    subscriber = envelope.sender
    if not envelope.message.enableNotification:
        self._cstate.remove_notification_handler(subscriber)
        return True
    pending_io = self._cstate.add_notification_handler(subscriber)
    self._performIO(pending_io)
    return True
def h_PoisonMessage(self, envelope):
    """Drop the sender of a PoisonMessage from the notification handlers.

    Returns None (there is no explicit return value).
    """
    poisoned_sender = envelope.sender
    self._cstate.remove_notification_handler(poisoned_sender)
def _handleChildExited(self, childAddr):
    """Remove an exited child from the notification handlers.

    After the removal the base class implementation is invoked and its
    result returned.
    """
    self._cstate.remove_notification_handler(childAddr)
    base_handler = super(ConventioneerAdmin, self)._handleChildExited
    return base_handler(childAddr)
def h_CapabilityUpdate(self, envelope):
    """Apply a capability update and tell the convention about it.

    The named capability is updated first.  Unless this admin is
    shutting down, the I/O produced by the convention state for the
    changed capabilities is performed.  Local actors are updated when
    the capability update reported that this is needed.

    Returns False.
    """
    update = envelope.message
    locals_need_update = self._updSystemCapabilities(update.capabilityName,
                                                     update.capabilityValue)
    shutting_down = self.isShuttingDown()
    if not shutting_down:
        changes = self._cstate.capabilities_have_changed(self.capabilities)
        self._performIO(changes)
    if locals_need_update:
        self._capUpdateLocalActors()
    # might have sent with Hysteresis, so return to _run loop here
    return False
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from thespian.actors import * | |
import time | |
# Admin ports of the three actor systems; every system is given all
# three as its 'Convention Address.IPv4' list.
CONVENTION_PORTS = (12300, 12400, 12500)


def _start_system(port):
    """Start a multiprocTCPBase ActorSystem with its 'Admin Port' set to *port*."""
    return ActorSystem('multiprocTCPBase',
                       {'Convention Address.IPv4': [('', p)
                                                    for p in CONVENTION_PORTS],
                        'Admin Port': port,
                        'Admin Routing': False},
                       transientUnique=True)


def arnab_test1():
    """Interactively start and stop three actor systems.

    Starts systems on ports 12300, 12400 and 12500, then waits for the
    operator to hit return before each shutdown or restart step.  Any
    system still running when the function ends -- including when it
    ends early through an exception or Ctrl-C -- is shut down, so no
    admin is left behind by an aborted run.
    """
    running = {}  # port -> ActorSystem currently started by this script

    def start(port):
        running[port] = _start_system(port)

    def stop(port):
        # Pop first so a system is never shut down twice.
        running.pop(port).shutdown()

    try:
        for port in CONVENTION_PORTS:
            start(port)
        print("Started three systems as 12300, 12400, and 12500")

        input("Hit return to shutdown current leader")
        stop(12300)
        print("Shutdown system at 12300")
        print("Please wait 2-3 minutes for previous action to stabilize")

        input("Hit return to shutdown system at 12400")
        stop(12400)
        print("Shutdown system at 12400")

        input("Hit return to startup system at 12400")
        start(12400)
        print("Started system at 12400")

        input("Hit return to startup system at 12300")
        start(12300)
        print("Started system at 12300")

        input("Hit return to shutdown system at 12400")
        stop(12400)
        print("Shutdown system at 12400")

        input("Hit return to shutdown remaining systems and exit")
    finally:
        # On the normal path this stops 12300 and then 12500.
        for port in sorted(running):
            stop(port)
    print("Exiting")


if __name__ == "__main__":
    arnab_test1()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment