Skip to content

Instantly share code, notes, and snippets.

@Ujang360
Created December 15, 2019 10:51
Show Gist options
  • Save Ujang360/9ef60c7208e483bcf9d0b2a60b4627b2 to your computer and use it in GitHub Desktop.
Save Ujang360/9ef60c7208e483bcf9d0b2a60b4627b2 to your computer and use it in GitHub Desktop.
Python Multiprocess Fucker
#!/usr/bin/env python3.7
import json
import logging
import multiprocessing as mp
import multiprocessing.connection as mp_conn
import signal
import sys
import time
from typing import Callable, Optional
# Module-level logger that writes through its own stream handler.
logger = logging.getLogger(__name__)
# Do not hand records to ancestor loggers: this logger has its own handler, so
# a configured root logger would otherwise print every record a second time.
logger.propagate = False
logger.setLevel(logging.DEBUG)

log_handler = logging.StreamHandler()
log_handler.setFormatter(
    logging.Formatter(
        fmt="%(asctime)s|%(levelname)-8s|%(name)-20s => %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S%z",
    )
)
# Render %(asctime)s from UTC rather than local time.
log_handler.formatter.converter = time.gmtime
logger.addHandler(log_handler)
class MPWatcher(object):
    """Run a callback periodically in a dedicated child process.

    The child calls ``loop_callback`` once immediately and then once more each
    time ``interval`` seconds pass without a message arriving on a one-way
    pipe. ``stop()`` sends that message and waits for the child to exit.
    """

    # Both are None whenever no child process is running.
    _process: Optional[mp.process.BaseProcess]
    _signal_tx: Optional[mp_conn.Connection]
    _name: str
    _interval: float
    _loop_callback: Callable[[], object]

    def __init__(
        self, name: str, interval: float, loop_callback: Callable[[], object]
    ):
        """Store the configuration; no process is created until ``start()``.

        Args:
            name: Used as the child process name and in log messages.
            interval: Seconds to wait between two callback invocations.
            loop_callback: Zero-argument callable executed in the child. It is
                shipped to the child process, so it presumably has to be
                picklable (e.g. a module-level function).
        """
        self._name = name
        self._interval = interval
        self._loop_callback = loop_callback
        # Initialised explicitly so stop() before start() is a harmless no-op
        # instead of an AttributeError.
        self._process = None
        self._signal_tx = None

    @staticmethod
    def watcher_loop(
        signal_rx: mp_conn.Connection,
        interval: float,
        loop_callback: Callable[[], object],
    ):
        """Child-process entry point: run the callback until told to stop.

        Args:
            signal_rx: Read end of the stop pipe; any readable data ends the
                loop.
            interval: Seconds to wait on the pipe between callback runs.
            loop_callback: Zero-argument callable to run on every iteration.
        """
        # Install no-op handlers so SIGINT/SIGTERM reaching the child are
        # swallowed; shutdown is driven exclusively through the pipe.
        signal.signal(signal.SIGINT, empty_signal_handler)
        signal.signal(signal.SIGTERM, empty_signal_handler)
        loop_callback()
        # poll() doubles as the sleep: it returns False on timeout (run the
        # callback again) and True once the stop message is readable.
        while not signal_rx.poll(timeout=interval):
            loop_callback()

    def start(self):
        """Spawn the child process and keep the pipe end used to stop it."""
        logger.info(f"Starting {self._name}...")
        # A dedicated context selects "forkserver" for this watcher only,
        # instead of overriding the process-wide start method on every call.
        context = mp.get_context("forkserver")
        signal_rx, signal_tx = context.Pipe(duplex=False)
        process = context.Process(
            target=MPWatcher.watcher_loop,
            name=self._name,
            args=(signal_rx, self._interval, self._loop_callback),
        )
        try:
            process.start()
        except BaseException:
            # Nothing will ever read from the pipe; do not leak the write end.
            signal_tx.close()
            raise
        finally:
            # The child owns its own copy of the read end once started; the
            # parent's copy is never used, so release the descriptor.
            signal_rx.close()
        self._signal_tx = signal_tx
        self._process = process
        logger.info(f"{self._name} started")

    def stop(self):
        """Ask the child to exit and block until it is gone.

        Does nothing (beyond a warning) when the watcher is not running.
        """
        if self._process is None or self._signal_tx is None:
            logger.warning(f"{self._name} is not running, nothing to stop")
            return
        logger.info(f"Stopping {self._name}...")
        try:
            self._signal_tx.send(None)
        except OSError:
            # The read end is gone, i.e. the child has already terminated.
            logger.warning(f"{self._name} had already exited")
        finally:
            self._signal_tx.close()
            self._signal_tx = None
        # Per the original author's note, stop() is invoked from a signal
        # handler and join() is deliberately avoided; poll liveness instead.
        while self._process.is_alive():
            time.sleep(1)
        self._process = None
        logger.info(f"{self._name} stopped")
def do_something_periodic() -> None:
    """Placeholder periodic job: write one fixed line to standard output."""
    print("FUCK")
# Single module-level watcher: runs do_something_periodic every 5 seconds once
# started.
some_watcher = MPWatcher(
    name="FUCKER",
    interval=5,
    loop_callback=do_something_periodic,
)
def start_server() -> None:
    """Start the watcher, then park the calling thread until a signal arrives."""
    logger.info("Starting services...")
    some_watcher.start()
    logger.info("Starting services DONE")
    # Nothing else to do in this process: sleep until a signal is delivered.
    signal.pause()
def stop_server() -> None:
    """Stop the watcher, logging before and after the shutdown."""
    logger.info("Stopping services...")
    some_watcher.stop()
    logger.info("Stopping services DONE")
def signal_handler(sig, frame):
    """Shut the services down and exit with status 0 when a signal is caught.

    Args:
        sig: Number of the signal that was delivered.
        frame: Interrupted stack frame (unused).
    """
    # Swap in the no-op handlers first so a second signal arriving during
    # shutdown does not re-enter this handler.
    register_signal(True)
    print()  # aesthetic hack, do not remove
    caught = signal.Signals(sig)
    logger.info(f"{caught.name} caught, stopping sequence initiated...")
    stop_server()
    logger.info("Exiting...")
    sys.exit(0)
def empty_signal_handler(sig, frame):
    """Signal handler that deliberately does nothing.

    Args:
        sig: Number of the signal that was delivered (ignored).
        frame: Interrupted stack frame (ignored).
    """
    return None
def register_signal(clear=False):
    """Install the SIGINT and SIGTERM handlers.

    Args:
        clear: When true, install the no-op handler for both signals;
            otherwise install the shutdown handler.
    """
    handler = empty_signal_handler if clear else signal_handler
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, handler)
if __name__ == "__main__":
    # Install the handlers first: start_server() blocks waiting for a signal,
    # so nothing placed after it would run before shutdown.
    register_signal()
    start_server()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment