Created
December 15, 2019 10:51
-
-
Save Ujang360/9ef60c7208e483bcf9d0b2a60b4627b2 to your computer and use it in GitHub Desktop.
Python Multiprocess Fucker
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3.7 | |
import signal | |
import sys | |
import json | |
import logging | |
import time | |
import multiprocessing as mp | |
import multiprocessing.connection as mp_conn | |
logger = logging.getLogger(__name__) | |
logger.propagate = ( | |
False # This is to remove auto propagation for module logger when root exists | |
) | |
logger.setLevel(logging.DEBUG) | |
log_handler = logging.StreamHandler() | |
log_handler.setFormatter( | |
logging.Formatter("%(asctime)s|%(levelname)-8s|%(name)-20s => %(message)s") | |
) | |
log_handler.formatter.converter = time.gmtime | |
log_handler.formatter.datefmt = "%Y-%m-%dT%H:%M:%S%z" | |
logger.addHandler(log_handler) | |
class MPWatcher(object): | |
_process: mp.Process | |
_signal_tx: mp_conn.Connection | |
_name: str | |
_interval: float | |
_loop_callback = None | |
def __init__(self, name: str, interval: float, loop_callback): | |
self._name = name | |
self._interval = interval | |
self._loop_callback = loop_callback | |
@staticmethod | |
def watcher_loop(signal_rx: mp_conn.Connection, interval: float, loop_callback): | |
signal.signal(signal.SIGINT, empty_signal_handler) | |
signal.signal(signal.SIGTERM, empty_signal_handler) | |
loop_callback() | |
while not signal_rx.poll(timeout=interval): | |
loop_callback() | |
def start(self): | |
logger.info(f"Starting {self._name}...") | |
mp.set_start_method("forkserver", force=True) | |
(signal_rx, signal_tx) = mp.Pipe(duplex=False) | |
self._signal_tx = signal_tx | |
self._process = mp.Process( | |
target=MPWatcher.watcher_loop, | |
name=self._name, | |
args=[signal_rx, self._interval, self._loop_callback], | |
) | |
self._process.start() | |
logger.info(f"{self._name} started") | |
def stop(self): | |
logger.info(f"Stopping {self._name}...") | |
self._signal_tx.send(None) | |
# Since the caller of this stop method is the OS, then we can't call JOIN | |
while self._process.is_alive(): | |
time.sleep(1) | |
logger.info(f"{self._name} stopped") | |
def do_something_periodic(): | |
print("FUCK") | |
some_watcher = MPWatcher("FUCKER", 5, do_something_periodic) | |
def start_server(): | |
logger.info("Starting services...") | |
some_watcher.start() | |
logger.info("Starting services DONE") | |
signal.pause() | |
def stop_server(): | |
logger.info("Stopping services...") | |
some_watcher.stop() | |
logger.info("Stopping services DONE") | |
def signal_handler(sig, frame): | |
register_signal(True) | |
print() # aesthetic hack, do not remove | |
logger.info(f"{signal.Signals(sig).name} caught, stopping sequence initiated...") | |
stop_server() | |
logger.info("Exiting...") | |
sys.exit(0) | |
def empty_signal_handler(sig, frame): | |
pass | |
def register_signal(clear=False): | |
if clear: | |
signal.signal(signal.SIGINT, empty_signal_handler) | |
signal.signal(signal.SIGTERM, empty_signal_handler) | |
else: | |
signal.signal(signal.SIGINT, signal_handler) | |
signal.signal(signal.SIGTERM, signal_handler) | |
if __name__ == "__main__": | |
register_signal() | |
start_server() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment