Skip to content

Instantly share code, notes, and snippets.

@RomaKoks
Created July 17, 2017 12:09
Show Gist options
  • Save RomaKoks/56d628cf14c5b800cd8526839e505e7d to your computer and use it in GitHub Desktop.
Save RomaKoks/56d628cf14c5b800cd8526839e505e7d to your computer and use it in GitHub Desktop.
from threading import Thread
import json
from subprocess import Popen, PIPE
import sys
import time
import subprocess
import psutil
import pika
import traceback
try:
N_WORKERS = int(sys.argv[1])
except:
N_WORKERS = 1
def kill(proc_pid):
    """Kill the process with PID ``proc_pid`` and all of its descendants.

    Descendants are killed first and the process itself last. Each kill is
    attempted independently, so a descendant that has already exited does
    not prevent the remaining processes from being killed.

    Args:
        proc_pid: PID of the root process to kill.

    Returns:
        None. psutil failures are reported on stdout rather than raised.
    """
    failure_message = "fail to kill process, maybe it was already dead"
    try:
        process = psutil.Process(proc_pid)
        targets = process.children(recursive=True)
    except psutil.Error:
        # The root process is gone (or inaccessible): nothing left to do.
        print(failure_message)
        return

    # Children first, the root process last.
    targets.append(process)

    failed = False
    for target in targets:
        try:
            target.kill()
        except psutil.Error:
            # Keep going: one dead/inaccessible process must not leave the
            # rest of the tree running.
            failed = True
    if failed:
        print(failure_message)
class Worker(Thread):
    """Daemon thread that runs one ``fib.py`` subprocess and echoes its output.

    The subprocess is started as ``python fib.py <task_queue> <result_queue>``.
    Every line it writes is appended to ``self.stdout`` and printed.
    """

    def __init__(self, task_queue_name, result_queue_name):
        """Store the queue names; the subprocess is only started by ``run``.

        Args:
            task_queue_name: Passed to ``fib.py`` as its first argument.
            result_queue_name: Passed to ``fib.py`` as its second argument.
        """
        super(Worker, self).__init__()
        self.daemon = True
        self.cancelled = False
        print("worker creation")
        self.result_queue = result_queue_name
        self.task_queue = task_queue_name
        self.proc = None         # Popen handle, set once run() has started it
        self.is_running = False  # True while run() is reading child output
        self.stdout = []         # every output line read from the child
        print("worker created")

    def _command(self):
        """Return the argv list used to launch the ``fib.py`` subprocess."""
        # Built as a list (not by splitting a formatted string) so that queue
        # names containing spaces stay single arguments.
        return ["python", "fib.py", str(self.task_queue), str(self.result_queue)]

    def run(self):
        """Start the subprocess and relay its output until it ends.

        Reading stops at end of output, or after the current line once
        ``cancel()`` has been called or ``is_running`` has been cleared.
        A non-zero exit status is reported by printing a
        ``subprocess.CalledProcessError`` (it is not raised).
        """
        print("worker running")
        self.is_running = True
        argv = self._command()
        cmd = " ".join(argv)
        print(cmd)
        # stderr is merged into stdout: a separate stderr pipe that nobody
        # reads can fill up and block the child forever.
        proc = Popen(argv, stdout=PIPE, stderr=subprocess.STDOUT,
                     universal_newlines=True)
        self.proc = proc
        print("PID --------------------- ", proc.pid)
        try:
            for stdout_line in iter(proc.stdout.readline, ""):
                self.stdout.append(stdout_line)
                print(stdout_line.strip())
                sys.stdout.flush()
                if self.cancelled or not self.is_running:
                    break
        finally:
            # Always release the pipe, even if relaying the output failed.
            proc.stdout.close()
        return_code = proc.wait()
        self.is_running = False
        if return_code:
            print(subprocess.CalledProcessError(return_code, cmd))

    def stop(self):
        """Kill the subprocess tree (via ``kill``) if one was started."""
        print("Stop worker")
        if self.proc:
            kill(self.proc.pid)
            self.proc = None

    def __del__(self):
        """Stop a still-registered subprocess when the worker is collected."""
        print("Bye worker")
        # getattr: __del__ can run on an instance whose __init__ did not
        # get as far as assigning ``proc``.
        if getattr(self, "proc", None):
            self.stop()

    def cancel(self):
        """Ask ``run`` to stop relaying output after the current line."""
        self.cancelled = True
class Daemon(object):
    """Message-driven supervisor that starts and stops groups of workers.

    On construction it connects to the broker through pika, declares the
    fanout exchange ``daemons``, binds an exclusive auto-delete queue to it,
    announces itself with a ``new_daemon`` message sent to routing key
    ``manager``, and then blocks consuming messages from its own queue.

    Handled messages are JSON objects with an ``action`` key:
      * ``start`` (with ``task_queue`` and ``result_queue``): start workers.
      * ``stop`` (with ``result_queue``): stop the workers of that queue.
    """

    def __init__(self):
        """Connect, announce this daemon and consume messages (blocking)."""
        # NOTE(review): broker host, port, virtual host and credentials are
        # hard-coded here; consider moving them to configuration.
        credentials = pika.PlainCredentials('roman', 'qwerty')
        while True:
            try:
                print("try to connect...")
                self.connection = pika.BlockingConnection(
                    pika.ConnectionParameters('termosim.tgtoil.com', 4041,
                                              'scheduler', credentials))
                self.channel = self.connection.channel()
                break
            except Exception:
                # Retry forever on failures, but (unlike a bare ``except``)
                # let KeyboardInterrupt/SystemExit end the loop.
                time.sleep(0.2)

        self.channel.exchange_declare(exchange="daemons",
                                      type="fanout")
        self.daemons_queue = self.channel.queue_declare(exclusive=True,
                                                        auto_delete=True)
        queue_name = self.daemons_queue.method.queue
        self.channel.queue_bind(exchange='daemons',
                                queue=queue_name)
        self.msgs = []
        print(self.daemons_queue)

        # Tell the manager which queue this daemon listens on.
        start = dict(
            action='new_daemon',
            result_queue=queue_name
        )
        self.channel.basic_publish(exchange='',
                                   routing_key='manager',
                                   body=json.dumps(start, sort_keys=True))
        self.channel.basic_consume(self.__callback,
                                   queue=queue_name)
        # result_queue name -> list of Worker threads started for it
        self.workers = dict()
        self.channel.start_consuming()

    def create_workers(self, result_queue, task_queue):
        """Start ``N_WORKERS`` workers and register them under ``result_queue``.

        Workers already registered for the same ``result_queue`` are kept, so
        a repeated "start" cannot orphan running subprocesses.

        Args:
            result_queue: Key of the worker group; passed on to each Worker.
            task_queue: Passed on to each Worker.
        """
        group = self.workers.setdefault(result_queue, [])
        for _ in range(N_WORKERS):
            w = Worker(task_queue, result_queue)
            w.start()
            group.append(w)

    def __callback(self, ch, method, properties, body):
        """Handle one message delivered by the broker.

        The message is acknowledged only after it has been handled without
        error; on failure the exception type and traceback are printed and
        the message is left unacknowledged.

        Args:
            ch: Channel the message arrived on; used to acknowledge it.
            method: Delivery metadata; ``delivery_tag`` is read from it.
            properties: Message properties (unused).
            body: UTF-8 encoded JSON payload.
        """
        print("new message")
        try:
            data = json.loads(body.decode("utf-8"))
            print(data)
            action = data["action"]
            if action == "start":
                task_queue = data["task_queue"]
                result_queue = data["result_queue"]
                # Called directly: starting the worker threads is quick, and
                # the previous wrapper thread never ran this method anyway
                # (it received the call's return value as its target).
                self.create_workers(result_queue, task_queue)
                print(task_queue, result_queue)
            elif action == "stop":
                result_queue = data["result_queue"]
                # An unknown queue is treated as "nothing to stop" so the
                # message is still acknowledged.
                for worker in self.workers.pop(result_queue, []):
                    worker.cancel()
                    worker.stop()
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception:
            e = sys.exc_info()[0]
            print(e)
            traceback.print_exc()
if __name__ == "__main__":
    # Script entry point: flush anything already buffered on stdout, then
    # build the Daemon. Its constructor ends by consuming messages, so this
    # call blocks for as long as consuming runs.
    sys.stdout.flush()
    daemon = Daemon()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment