Skip to content

Instantly share code, notes, and snippets.

@dpineiden
Created December 31, 2018 15:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dpineiden/300e0ac00e305b69a19443f0061b75ff to your computer and use it in GitHub Desktop.
Save dpineiden/300e0ac00e305b69a19443f0061b75ff to your computer and use it in GitHub Desktop.
Object declared on a multiprocess manager
import socket
import asyncio
import uvloop
import concurrent.futures
import functools
import sys
from multiprocessing import Manager
from networktools.colorprint import gprint, bprint, rprint
from multiprocessing.managers import SyncManager
from socketmanager.services import Service, ServiceManager
from tasktools.taskloop import (coromask,
renew,
simple_fargs,
simple_fargs_out)
from networktools.library import (my_random_string,
choice_input,
check_type)
class SharedManager(SyncManager):
pass
"""
This Script shows how to enable Service manager to
share services among processes
"""
SharedManager.register('ServiceManager', ServiceManager)
SharedManager.register('Service', Service)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
async def hola():
servicios = {
'pineiden':{'port': 6688,
'host': '10.54.218.13',
'name': 'pineiden'},
'collector':{'port': 6688,
'host': '10.54.217.15',
'name': 'collector'},
'datawork':{'port': 6688,
'host': '10.54.218.66',
'name': 'datawork'},
}
choices = {i: service for i, service in enumerate(servicios.keys())}
a,b,c = choice_input(choices)
print(a,b,c)
def hola_task(servicios):
loop = asyncio.get_event_loop()
try:
args = [servicios]
# Create instances
task = loop.create_task(
coromask(
hola,
args,
simple_fargs_out)
)
task.add_done_callback(
functools.partial(
renew,
task,
hola,
simple_fargs_out)
)
if not loop.is_running():
loop.run_forever()
except Exception as exec:
raise exec
class MultiService:
def __init__(self,
service_manager: 'Shared Service Manager',
idt_list: 'Shared list',
flag_dict: 'Shared dict',
*args,
**kwargs):
# shared service manager
self.sm = service_manager
self.idt = idt_list
self.flags = flag_dict
# time to sleep
self.ts = 5
print(self.sm.__dict__.keys())
sm0 = self.sm
[print(method_name) for method_name in dir(sm0) if callable(getattr(sm0, method_name))]
async def select_service(self, service_map: dict,
count,
*args):
print("Select Service")
print(self.sm)
bprint("-=-"*30)
for i in range(10):
print("Select service | count -> %s" % count)
choices = {i: service for i, service in enumerate(service_map.keys())}
command, key, msg_type = choice_input(choices)
service = service_map.get(command)
idt_dict = {i: idt for i, idt in enumerate(self.idt)}
if not idt_dict:
idt_dict.update({'default':None})
print("Select idt, assign to process")
print(idt_dict)
print("-"*30)
command_idt, key, msg_type = choice_input(idt_dict)
status_value_str = input("Give me status value to IDT %s\n" %
command_idt)
status_value = check_type(status_value_str)
# add command_service to self.sm
name = service.get('name')
exist_service = self.sm.find_services('name', name)
if not exist_service:
print("Service not exist")
self.sm.add_service(**service)
print(self.sm.get_services())
# set flag on command_idt as status_value
self.flags.update({command_idt: status_value})
await asyncio.sleep(self.ts)
count += 1
return [service_map]
def select_service_task(self, service_map):
loop = asyncio.get_event_loop()
rprint("=================================")
gprint("Gestionando mensajes en Select Service")
rprint("=================================")
try:
args = [service_map, 0]
# Create instances
task = loop.create_task(
coromask(
self.select_service,
args,
simple_fargs_out)
)
task.add_done_callback(
functools.partial(
renew,
task,
self.select_service,
simple_fargs_out)
)
if not loop.is_running():
loop.run_forever()
except Exception as exec:
raise exec
async def show_service(self, xprint, sign, idt, *args):
if self.flags.get(idt):
await asyncio.sleep(self.ts)
xprint(sign*20)
[print(s) for s in self.sm.get_services()]
xprint(sign*20)
return [xprint, sign, idt]
def show_service_task(self,
xprint: print,
sign: str,
idt: str):
loop = asyncio.get_event_loop()
rprint("=================================")
gprint("Gestionando mensajes en engine")
rprint("xprint -> %s" % xprint)
bprint("sign -> %s" % sign)
rprint("=================================")
try:
args = [xprint, sign, idt]
# Create instances
task = loop.create_task(
coromask(
self.show_service,
args,
simple_fargs_out)
)
task.add_done_callback(
functools.partial(
renew,
task,
self.show_service,
simple_fargs_out)
)
if not loop.is_running():
loop.run_forever()
except Exception as exec:
raise exec
def main():
workers = 5
host = socket.gethostbyname(socket.gethostname())
host_port = 9876
address = (host, host_port)
executor = concurrent.futures.ProcessPoolExecutor(workers)
servicios = {
'pineiden':{'port': 6688,
'host': '10.54.218.13',
'name': 'pineiden'},
'collector':{'port': 6688,
'host': '10.54.217.15',
'name': 'collector'},
'datawork':{'port': 6688,
'host': '10.54.218.66',
'name': 'datawork'},
}
idts = {
'A0': ('=-', bprint),
'A1': ('(o)', rprint),
'A2': ('|=|-', gprint)
}
with SharedManager() as manager:
loop = asyncio.get_event_loop()
service_manager = manager.ServiceManager()
idt_list = manager.list(idts.keys())
flags = manager.dict()
flags.update({'default':None})
multis = MultiService(service_manager, idt_list, flags)
task_list = []
for idt, group in idts.items():
sign = group[0]
xprint = group[1]
args = [xprint, sign, idt]
show_task = loop.run_in_executor(
executor,
functools.partial(multis.show_service_task,
*args))
task_list.append(show_task)
select_task = asyncio.ensure_future(multis.select_service(servicios, 0))
task_list.append(select_task)
loop.run_until_complete(asyncio.gather(*task_list))
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment