Skip to content

Instantly share code, notes, and snippets.

@Lh4cKg
Created May 25, 2020 10:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Lh4cKg/4d80d697f4b376a1161fb3965d084909 to your computer and use it in GitHub Desktop.
Save Lh4cKg/4d80d697f4b376a1161fb3965d084909 to your computer and use it in GitHub Desktop.
Python Multiprocessing Workers Architecture
from multiprocessing.managers import BaseManager
class QueueManager(BaseManager):
pass
import time
import json
import logging
from .workers import Worker
logger = logging.getLogger(__name__)
class BaseProcess(Worker):
def __init__(self, manager):
super().__init__(manager)
logger.info('---- START WORKER ----')
def df_processing(self, *args, **kwargs):
raise NotImplementedError
def execute(self):
while True:
logger.info(f'QUEUE SIZE: {self.queue.qsize()}')
value = self.queue.get()
if value:
s = time.time()
try:
self.df_processing(value)
logger.info(f'QUEUE EXECUTION TIME: {time.time()-s}')
except Exception as e:
logger.exception(e)
else:
time.sleep(0.1)
class YourProcessClass(BaseProcess):
def df_processing(self, value, **kwargs):
# write logic here
import logging
from . import settings
from .managers import QueueManager
logger = logging.getLogger(__name__)
def get_queue(worker_host, worker_port, auth_key):
"""
:param worker_host:
:param worker_port:
:param auth_key:
:return:
"""
QueueManager.register('get_queue')
global_manager = QueueManager(
address=(worker_host, worker_port),
authkey=auth_key
)
global_manager.connect()
return global_manager.get_queue()
def serve_queue():
"""
:return:
"""
manager = QueueManager(
address=(
settings.WORKER_ADDRESS_HOST, settings.WORKER_ADDRESS_PORT
),
authkey=settings.WORKER_AUTH_KEY
)
server = manager.get_server()
logger.info('---- RUNNING SERVER ----')
server.serve_forever()
logger.info('---- END SERVER ----')
def get_server_manager():
QueueManager.register('get_queue')
manager = QueueManager(
address=(
settings.WORKER_ADDRESS_HOST, settings.WORKER_ADDRESS_PORT
),
authkey=settings.WORKER_AUTH_KEY
)
manager.connect()
return manager
import time
import logging
import multiprocessing as mp
from queue import Queue
from .managers import QueueManager
from .queues import serve_queue, get_server_manager
from .workers import init_workers, spawning
logger = logging.getLogger(__name__)
class Register:
def __init__(self, register=None):
if register:
if callable(register):
register()
else:
logger.info('Register is not callable')
self.default_register()
else:
self.default_register()
@staticmethod
def default_register():
"""
:return:
"""
queue = Queue()
QueueManager.register('get_queue', lambda: queue)
mp.Process(target=serve_queue, daemon=True).start()
time.sleep(1)
kw = {'manager': get_server_manager()}
init_workers(**kw)
while True:
spawning()
PROCESS_CLASS = 'module_name.process.YourProcessClass'
NUMBER_OF_WORKERS = 10
WORKER_ADDRESS_HOST = '127.0.0.1'
WORKER_ADDRESS_PORT = 50000
import os
import time
import logging
import multiprocessing as mp
from ctypes import c_int64
from . import settings
from .utils import import_string
logger = logging.getLogger(__name__)
class Worker:
def __init__(self, manager):
self.manager = manager
self.queue = self.manager.get_queue()
def execute(self):
raise NotImplementedError
def start(self):
self.execute()
def start_process(**kw):
matching = import_string(settings.PROCESS_CLASS)(manager=kw['manager'])
mp.Process(target=matching.start, daemon=True).start()
def spawning(**kw):
pid, status = os.waitpid(-1, os.WNOHANG)
if pid:
start_process(**kw)
else:
time.sleep(0.1)
def init_workers(**kw):
logger.info(f'Number Of Workers: {settings.NUMBER_OF_WORKERS}')
for _ in range(settings.NUMBER_OF_WORKERS):
start_process(**kw)
def create_proxy_objects(m):
# global current_time
current_time = time.time()
return dict(current_time=m.Value(c_int64, current_time))
# if __name__ == '__main__':
# queue = Queue()
# QueueManager.register('get_queue', lambda: queue)
# mp.Process(target=serve_queue, daemon=True).start()
# time.sleep(1)
# kw = {'manager': get_server_manager()}
# init_workers(**kw)
# while True:
# spawning()
# workers_register()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment