Skip to content

Instantly share code, notes, and snippets.

@kquick
Created April 3, 2019 14:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kquick/38a23b58e16f1505720a4a18fece012f to your computer and use it in GitHub Desktop.
Save kquick/38a23b58e16f1505720a4a18fece012f to your computer and use it in GitHub Desktop.
Thespian issue_43 corrected
from thespian.troupe import troupe
from thespian.actors import ActorTypeDispatcher, Actor
from thespian.actors import ActorSystem
import logging
import time
class ActorLogFilter(logging.Filter):
def filter(self, logrecord):
return 'actorAddress' in logrecord.__dict__
class NotActorLogFilter(logging.Filter):
def filter(self, logrecord):
return 'actorAddress' not in logrecord.__dict__
def log_config(log_file_path_1, log_file_path_2):
return {
'version': 1,
'formatters': {
'normal': {'format': '%(levelname)-8s %(message)s'},
'actor': {'format': '%(levelname)-8s %(actorAddress)s => %(message)s'}},
'filters': {'isActorLog': {'()': ActorLogFilter},
'notActorLog': {'()': NotActorLogFilter}},
'handlers': {'h1': {'class': 'logging.FileHandler',
'filename': log_file_path_1,
'formatter': 'normal',
'filters': ['notActorLog'],
'level': logging.INFO},
'h2': {'class': 'logging.FileHandler',
'filename': log_file_path_2,
'formatter': 'actor',
'filters': ['isActorLog'],
'level': logging.INFO}, },
'loggers': {'': {'handlers': ['h1', 'h2'], 'level': logging.DEBUG}}
}
class PrimaryActor(ActorTypeDispatcher):
def receiveMsg_str(self, msg, sender):
test_data = [
[1, 2, 3, 4, 5] * 2,
[6, 7, 8, 9, 10] * 2,
[11, 12, 13, 14, 15] * 2,
[16, 17, 18, 19, 20] * 2,
[21, 22, 23, 24, 25] * 2,
[1, 2, 3, 4, 5] * 2,
[6, 7, 8, 9, 10] * 2,
[11, 12, 13, 14, 15] * 2,
[16, 17, 18, 19, 20] * 2,
[21, 22, 23, 24, 25] * 2,
[1, 2, 3, 4, 5] * 2
]
if not hasattr(self, "helper"):
self.helper = self.createActor(
SecondaryActor
)
for data in test_data:
self.send(
self.helper,
data
)
@troupe(max_count=200, idle_count=1)
class SecondaryActor(ActorTypeDispatcher):
child_count = 0
children_finished = 0
def receiveMsg_list(self, msg, sender):
self.troupe_work_in_progress = True
if not hasattr(self, "helper"):
self.helper = self.createActor(
TertiaryActor
)
for data in msg:
self.child_count += 1
self.send(
self.helper,
{"from": self.myAddress, "data":data}
)
def receiveMsg_str(self, msg, sender):
self.children_finished += 1
if self.children_finished == self.child_count:
self.troupe_work_in_progress = False
@troupe(max_count=200, idle_count=1)
class TertiaryActor(ActorTypeDispatcher):
def receiveMsg_dict(self, msg, sender):
qa = self.createActor(
QuaternaryActor,
globalName="quaternay"
)
self.send(
qa,
msg["data"]
)
self.send(msg["from"], "done!")
@troupe(max_count=200, idle_count=1)
class QuaternaryActor(ActorTypeDispatcher):
def receiveMsg_int(self, msg, sender):
logging.info("Received message number {0}".format(msg))
thespian_system = ActorSystem(
"multiprocTCPBase",
{},
logDefs=log_config("bug_check_1.log", "bug_check_2.log")
)
try:
primary_actor = thespian_system.createActor(PrimaryActor)
quaternary_actor = thespian_system.createActor(
QuaternaryActor,
globalName="quaternay"
)
print('telling',primary_actor)
thespian_system.tell(primary_actor, "go")
time.sleep(9)
print('leaving')
finally:
thespian_system.shutdown()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment