Last active
July 16, 2021 23:36
-
-
Save abduhbm/566a2a539f1d8ed401cb72661903e749 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from collections import defaultdict | |
from concurrent.futures import ThreadPoolExecutor | |
from timeit import default_timer | |
import click | |
from notifiers import notify | |
from distributed.diagnostics.plugin import SchedulerPlugin | |
from distributed.utils import format_time | |
# Credentials are *popped* from the environment, so they are no longer present
# in ``os.environ`` once this module has been imported.  A default of ``None``
# is supplied so that a missing variable disables that one service instead of
# raising ``KeyError`` while the module is being imported.
_pushover_user = os.environ.pop("PUSHOVER_USER_TOKEN", None)
_pushover_app = os.environ.pop("PUSHOVER_APP_TOKEN", None)
_slack_webhook = os.environ.pop("SLACK_WEBHOOK_URL", None)

# Maps a notification service name to the keyword arguments sent to it.
# Only services whose credentials were all found are registered; the mapping
# is empty when nothing is configured.
PAYLOADS = {}
if _pushover_user and _pushover_app:
    PAYLOADS["pushover"] = {
        "user": _pushover_user,
        "token": _pushover_app,
        "title": "Tasks Completed.",
        "sound": "siren",
    }
if _slack_webhook:
    PAYLOADS["slack"] = {"webhook_url": _slack_webhook}
def send_notification(srv, payload, msg):
    """Send *msg* through the notification service named *srv*.

    Args:
        srv: Service name, passed as the first argument to ``notify``.
        payload: Mapping of keyword arguments for the service.  It is not
            modified; the message is merged into a fresh copy so a shared
            payload dict is left exactly as the caller supplied it.
        msg: Text sent as the ``message`` keyword argument.
    """
    notify(srv, **{**payload, "message": msg})
class MyPlugin(SchedulerPlugin):
    """Send a summary notification once all known tasks have reached memory.

    The plugin counts ``processing -> memory`` transitions.  When that count
    equals ``len(scheduler.tasks)`` it sends one message to every service
    listed in ``PAYLOADS`` and resets its statistics for the next batch.
    """

    def __init__(self, scheduler):
        """Bind the plugin to *scheduler* and start with empty statistics."""
        # Number of tasks seen moving from "processing" to "memory".
        self.counter = 0
        self.scheduler = scheduler
        # Action name -> accumulated (stop - start) over the current batch.
        self.action_time = defaultdict(int)
        # 0 means "timer not running"; otherwise the default_timer() reading
        # taken at the first transition observed for the current batch.
        self.duration = 0

    def transition(self, key, start, finish, *args, **kwargs):
        """Record one task state transition and notify when the batch is done.

        Args:
            key: Key of the task that changed state.
            start: State the task is leaving.
            finish: State the task is entering.
            *args: Ignored.
            **kwargs: Only ``startstops`` is read: an iterable of mappings
                with ``"action"``, ``"start"`` and ``"stop"`` entries.  A
                missing or ``None`` value is treated as empty.
        """
        # The first transition seen after a reset starts the wall-clock timer.
        if self.duration == 0:
            self.duration = default_timer()

        if start != "processing" or key not in self.scheduler.tasks:
            return
        if finish != "memory":
            return

        self.counter += 1
        for record in kwargs.get("startstops") or ():
            self.action_time[record["action"]] += record["stop"] - record["start"]

        if self.counter != len(self.scheduler.tasks):
            return

        # Every task currently known to the scheduler has been counted.
        self.duration = default_timer() - self.duration
        ts_str = "".join(
            f"\n{action} time: {format_time(self.action_time[action])}"
            for action in sorted(self.action_time)
        )
        message = (
            f"Completed in-memory: "
            f"{self.counter} tasks"
            f"\n@ scheduler {self.scheduler.address}"
            f"\n{ts_str}"
            f"\nduration: {format_time(self.duration)}"
        )

        # Leaving the ``with`` block waits for all submitted sends to finish.
        # The futures are not inspected, so an exception raised inside
        # send_notification is not propagated to this method.
        with ThreadPoolExecutor(max_workers=5) as executor:
            for service, payload in PAYLOADS.items():
                executor.submit(send_notification, service, payload, message)

        # Start the next batch from a clean slate.  The per-action totals are
        # cleared together with the counter and timer so a later report does
        # not include time accumulated by an earlier batch.
        self.counter = 0
        self.duration = 0
        self.action_time.clear()
@click.command()
def dask_setup(scheduler):
    # Creates a MyPlugin bound to ``scheduler`` and registers it on that same
    # scheduler.  NOTE(review): presumably called by Dask's preload mechanism
    # with the running scheduler -- confirm against how this module is loaded.
    # A comment is used rather than a docstring so the click command's help
    # text stays as it was.
    scheduler.add_plugin(MyPlugin(scheduler))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment