Skip to content

Instantly share code, notes, and snippets.

@amir734jj
Last active June 12, 2018 17:44
Show Gist options
  • Save amir734jj/6d08e4c4b5bc036c27b4be38f63b433d to your computer and use it in GitHub Desktop.
Save amir734jj/6d08e4c4b5bc036c27b4be38f63b433d to your computer and use it in GitHub Desktop.
import asyncio
from time import sleep
from threading import Thread, Lock
from .task_model import Task
from typing import List
# Module-level lock. TaskManager.__init__ stores a reference to this object as
# self.lock, so every TaskManager instance shares this single lock.
lock = Lock()
class TaskManager:
    """Hold a list of script tasks and run them repeatedly on a background thread.

    Constructing a manager seeds the list with an "Init" heartbeat task and
    immediately starts a thread that executes every task's ``script`` in a
    loop.  ``self.lock`` guards the task list; it is held only while the list
    is read or modified, never while scripts run or while sleeping, so
    ``add_task`` / ``delete_task`` cannot be blocked by the runner.
    """

    def __init__(self):
        """Create the task list, attach the shared lock and start the runner."""
        self.tasks: List["Task"] = [Task("Init", "print('Heartbeat ...')")]
        # Reading a module-level name needs no ``global`` declaration.
        self.lock: Lock = lock
        # NOTE(review): the thread is non-daemon and the loop has no stop
        # condition, so it keeps the interpreter alive; unchanged from the
        # original behaviour.
        Thread(target=self.start_loop).start()

    def add_task(self, name: str, task_string: str) -> None:
        """Append a new task built from *name* and the script *task_string*."""
        task = Task(name, task_string)
        # Build the task first so the lock is held only for the list mutation.
        with self.lock:
            self.tasks.append(task)

    def running_loop(self, interval: float = 1) -> None:
        """Run all tasks forever, sleeping *interval* seconds between steps.

        This method never returns.  It sleeps once before each pass (to keep
        the loop from spinning) and once after each task.
        """
        while True:
            # To prevent infinite loop going wild
            sleep(interval)
            self._run_tasks_once(interval)

    def _run_tasks_once(self, pause: float = 1) -> None:
        """Execute every currently registered task once, pausing after each."""
        # Copy the list under the lock, then release it before running
        # anything so add_task/delete_task are never blocked by a script.
        with self.lock:
            snapshot = list(self.tasks)
        print(len(snapshot))
        for task in snapshot:
            try:
                # WARNING: this is NOT a sandbox.  Empty globals/locals only
                # isolate names; exec() still injects __builtins__, so the
                # script can import modules and do anything this process can.
                # Only run trusted scripts.
                exec(task.script, {}, {})
            except Exception as exc:
                # One failing script must not terminate the runner thread.
                print(f"Task {task.id} failed: {exc!r}")
            sleep(pause)

    def start_loop(self):
        """Thread entry point: install an event loop, then run tasks forever."""
        # Presumably the per-thread event loop exists so that task scripts can
        # call asyncio.get_event_loop() -- TODO confirm.  running_loop() is a
        # plain blocking function, so it is called directly rather than being
        # passed to run_until_complete().
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            self.running_loop()
        finally:
            loop.close()

    def get_tasks(self) -> List["Task"]:
        """Return the live task list (the same object the runner reads)."""
        return self.tasks

    def delete_task(self, id_value: str) -> None:
        """Remove the first task whose ``id`` equals *id_value*, if any."""
        with self.lock:
            for task in self.tasks:
                if task.id == id_value:
                    self.tasks.remove(task)
                    # Stop immediately: the list was just mutated mid-iteration.
                    break
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment