#!/usr/bin/python -i | |
from concurrent import futures | |
import logging | |
import time | |
from os import getpid | |
import datetime | |
from circuits import Component, Event, Debugger, handler, Worker, BaseComponent | |
from circuits.web import Server, Logger, Controller | |
import concurrent.futures | |
import pykka | |
import json | |
# Logging | |
logging.basicConfig( | |
format='%(asctime)s %(name)s %(levelname)s %(process)d %(thread)d %(funcName)s() %(threadName)s %(message)s', | |
datefmt='%m/%d/%Y %I:%M:%S %p' | |
) | |
logger = logging.getLogger("test") | |
logger.setLevel(logging.DEBUG) | |
logging.getLogger().setLevel(logging.DEBUG) | |
# Events | |
## | |
class work(Event): | |
""" | |
There is work to do | |
""" | |
complete = True | |
success = True | |
class dm_hello(Event): | |
""" | |
Hello Deployment Master | |
""" | |
complete = True | |
success = True | |
# Deployement task | |
## | |
def deploy(): | |
""" | |
Ugly and heavy 20-30 minutes long task | |
:return: | |
""" | |
logger.info("Started deployment") | |
for n in range(1, 20): | |
logger.info(n) | |
logger.info(datetime.datetime.now()) | |
time.sleep(1) | |
logger.info("End of deployement") | |
return n | |
# Actors | |
## | |
class Pet(pykka.ThreadingActor): | |
""" | |
I do job | |
""" | |
_executor = None | |
_in_future = None | |
_component = None | |
class CircuitClient(BaseComponent): | |
""" | |
Pet's way to send/receive event | |
""" | |
_parent = None | |
_actor = None | |
def init(self, parent, actor): | |
logger.info("Pet circuits client is starting") | |
#Add A CircuitClient to the main app composition | |
self.register(parent) | |
self._parent = parent | |
self._actor = actor | |
def started(self, manager): | |
logger.info("Pet circuits client started") | |
def get_ref(self): | |
logger.info("Pet circuits client get_ref()") | |
return self | |
def __init__(self, component): | |
logger.info("Pet is starting") | |
super(Pet, self).__init__() | |
self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) | |
self._in_future = self.actor_ref.proxy() | |
self._component = Pet.CircuitClient(parent=component, actor=self._in_future).register(component) | |
def on_start(self): | |
logger.info("Pet is started") | |
def deploy(self): | |
""" | |
Deploy stuff | |
:return: | |
""" | |
#start deployement | |
future = self._executor.submit(deploy) | |
#Tell the world how happy we are | |
self._component.fire(work()) | |
self._component.fire(dm_hello()) | |
self._component.get_ref() | |
#I must go to slepp for eternity | |
self.stop() | |
return future.result() | |
def on_stop(self): | |
self._component.stop() | |
self._component.unregister() | |
class Master(pykka.ThreadingActor): | |
""" | |
I'am the one above all | |
""" | |
_component = None | |
_client = None | |
_in_future = None | |
class CircuitClient(BaseComponent): | |
_parent = None | |
_actor = None | |
def init(self, parent, actor): | |
logger.info("Master circuits client is starting") | |
#Add A CircuitClient to the main app composition | |
self.register(parent) | |
self._parent = parent | |
self._actor = actor | |
def started(self, manager): | |
logger.info("Master circuits client started") | |
def _get_ref(self): | |
logger.info("Master circuits client get_ref()") | |
return self | |
@handler("dm_hello") | |
def dm_hello(self): | |
logger.info("Master circuits client dm_hello()") | |
self._actor.hello() | |
def __init__(self, component): | |
super(Master, self).__init__() | |
self._component = component | |
self._in_future = self.actor_ref.proxy() | |
self._client = Master.CircuitClient(parent=component, actor=self._in_future).register(component) | |
def create_deployment(self): | |
self._client._get_ref() | |
pet = Pet | |
pet.use_daemon_thread = True | |
pet = pet.start(self._component) | |
logger.info(pet.actor_urn) | |
return pet.actor_urn | |
def hello(self): | |
logger.info("I'm a lion in a strange land.") | |
# Main app | |
## | |
class App(Component): | |
""" | |
Das main server | |
""" | |
_master = None | |
def init(self): | |
logger.info("Main app is starting") | |
Debugger().register(self) | |
#Create master actors | |
self._master = Master.start(component=self).proxy() | |
#Start webapp | |
Server(('0.0.0.0', 11111)).register(self) | |
Logger().register(self) | |
JSONSerializer().register(self) | |
MyController(master_actor=self._master).register(self) | |
def started(self, manager): | |
logger.info("Webservice started") | |
def signal(self,signo, stack): | |
logger.info("Signal received") | |
if signo == 2: | |
logger.info("Ending actors") | |
pykka.ActorRegistry.stop_all() | |
def work(self): | |
logger.info("Yes my lord ...") | |
def exception(self, value, traceback, handler=None, fevent=None): | |
pykka.ActorRegistry.stop_all() | |
self.stop() | |
# Json webservice helper | |
## | |
class JSONSerializer(Component): | |
channel = "web" | |
# 1 higher than the default response handler | |
@handler("response", priority=1.0) | |
def serialize_response_body(self, response): | |
response.headers["Content-Type"] = "application/json" | |
response.body = json.dumps(response.body) | |
class MyController(Controller): | |
channel = "/deployable" | |
_worker = Worker() | |
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=2) | |
_master = None | |
def __init__(self, master_actor, *args, **kwargs): | |
super(MyController, self).__init__(*args, **kwargs) | |
self._master = master_actor | |
def POST(self, *args, **kwargs): | |
data = str(self.request.body.read().decode()) | |
logger.info(data) | |
logger.info("Start Task") | |
pet = self._master.create_deployment().get() | |
logger.info(pet) | |
actor = pykka.ActorRegistry.get_by_urn(pet) | |
proxy = pykka.ActorProxy(actor) | |
proxy.deploy() | |
return { | |
"yoda": "To the hand you talk, because listening i'm not.", | |
"pid": getpid() | |
} | |
if __name__ == "__main__": | |
try: | |
app = App() | |
app.run() | |
except KeyboardInterrupt: | |
app.stop(0) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment