Created
January 18, 2019 15:31
-
-
Save dantimofte/751b2d70a337a3488055e4052b2efee3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import logging | |
import signal | |
import time | |
import asyncio | |
class BOT:
    """A named demo bot that prints an incrementing counter once per second."""

    def __init__(self, name):
        """Create a bot.

        Args:
            name: Label printed in front of every counter line and in the
                stop message.
        """
        self.name = name
        self.counter = 0
        # Cleared by just_read_the_instructions() so the counting loop can end
        # instead of spinning forever.
        self._running = True

    async def of_course_i_still_love_you(self):
        """Increment and print the counter once per second until stopped.

        The loop re-checks the stop flag after every one-second sleep, so it
        returns at most about one second after just_read_the_instructions()
        has been called. If the bot was stopped before this coroutine starts,
        it returns immediately without touching the counter.
        """
        while self._running:
            self.counter += 1
            print(f"{self.name} : {self.counter}")
            await asyncio.sleep(1)

    def just_read_the_instructions(self):
        """Ask the counting loop to finish and print the stop message."""
        self._running = False
        print(f'Stoping bot {self.name}')
class BotMaster:
    """Owns the two demo bots and stops them when Ctrl+C (SIGINT) arrives."""

    def __init__(self):
        """Install the SIGINT handler and create the 'BYOB' and 'ATWA' bots."""
        # NOTE: per the signal module docs, signal.signal() must be called
        # from the main thread, so instances have to be created there.
        signal.signal(signal.SIGINT, self.cb_sigint_handler)
        self.counter = 0
        self.byob_bot = BOT('BYOB')
        self.atwa_bot = BOT('ATWA')
        # Populated by start_tasks(); None until then.
        self.task_byob = None
        self.task_atwa = None

    async def start_tasks(self):
        """Schedule both bots' counting coroutines and keep their handles.

        The resulting futures are stored in ``task_byob`` and ``task_atwa``.
        """
        self.task_byob = asyncio.ensure_future(
            self.byob_bot.of_course_i_still_love_you()
        )
        self.task_atwa = asyncio.ensure_future(
            self.atwa_bot.of_course_i_still_love_you()
        )

    def of_course_i_still_love_you(self):
        """Print the master's own counter once per second, forever.

        This is a synchronous loop built on time.sleep(); it blocks the
        calling thread and never returns.
        """
        while True:
            self.counter += 1
            print(f"Botmaster running : {self.counter}")
            time.sleep(1)

    def cb_sigint_handler(self, signum, frame):
        """Handle SIGINT: notify both bots, report, and exit the process.

        Args:
            signum: Signal number passed in by the signal module (unused).
            frame: Interrupted stack frame passed in by the signal module
                (unused).

        Raises:
            SystemExit: Always, to terminate the interpreter.
        """
        sys.stdout.write('\b\b\r')  # delete ^C from screen
        self.byob_bot.just_read_the_instructions()
        self.atwa_bot.just_read_the_instructions()
        print('Ctrl+C detected, Bots stoped ')
        # sys.exit() rather than exit(): the latter is a helper injected by
        # the site module and is not guaranteed to exist (e.g. `python -S`).
        sys.exit()
# Script entry point: run both bots concurrently on a dedicated event loop
# until the process is interrupted.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
bot_master = BotMaster()
# asyncio.wait() no longer accepts bare coroutine objects (TypeError on
# Python 3.11+), so wrap each coroutine in a Task bound to this loop first.
tasks = [
    loop.create_task(bot_master.byob_bot.of_course_i_still_love_you()),
    loop.create_task(bot_master.atwa_bot.of_course_i_still_love_you()),
]
try:
    loop.run_until_complete(asyncio.wait(tasks))
finally:
    # Runs on normal completion and when SystemExit propagates out of the
    # SIGINT handler: cancel whatever is still pending, let the cancellations
    # be processed, then release the loop's resources.
    for task in tasks:
        task.cancel()
    loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
    loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment