Skip to content

Instantly share code, notes, and snippets.

@abduhbm
Last active July 16, 2021 23:36
Show Gist options
  • Save abduhbm/566a2a539f1d8ed401cb72661903e749 to your computer and use it in GitHub Desktop.
Save abduhbm/566a2a539f1d8ed401cb72661903e749 to your computer and use it in GitHub Desktop.
import os
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from timeit import default_timer
import click
from notifiers import notify
from distributed.diagnostics.plugin import SchedulerPlugin
from distributed.utils import format_time
def _collect_payloads():
    """Build the per-service notification settings from environment variables.

    Reads (and removes from ``os.environ``) ``PUSHOVER_USER_TOKEN``,
    ``PUSHOVER_APP_TOKEN`` and ``SLACK_WEBHOOK_URL``.  A service is only
    registered when every variable it needs is present, so the module can be
    imported with just one of the two services configured instead of failing
    with ``KeyError``.

    Returns:
        dict: service name -> keyword arguments later passed to ``notify``.
    """
    # Pop everything up front so all three variables are removed from the
    # environment even when a service ends up being skipped.
    pushover_user = os.environ.pop("PUSHOVER_USER_TOKEN", None)
    pushover_app = os.environ.pop("PUSHOVER_APP_TOKEN", None)
    slack_webhook = os.environ.pop("SLACK_WEBHOOK_URL", None)

    payloads = {}
    if pushover_user is not None and pushover_app is not None:
        payloads["pushover"] = {
            "user": pushover_user,
            "token": pushover_app,
            "title": "Tasks Completed.",
            "sound": "siren",
        }
    if slack_webhook is not None:
        payloads["slack"] = {"webhook_url": slack_webhook}
    return payloads


# Service name -> notify() keyword arguments; only configured services appear.
PAYLOADS = _collect_payloads()
def send_notification(srv, payload, msg):
    """Send ``msg`` through the ``srv`` notification provider.

    Args:
        srv: Provider name handed to ``notify`` as its first argument
            (the keys of ``PAYLOADS`` are used for this).
        payload: Mapping of provider keyword arguments.  It is not modified.
        msg: Text sent as the ``message`` keyword argument; it overrides any
            ``"message"`` entry already present in ``payload``.
    """
    # Merge into a fresh dict rather than writing into ``payload``: callers
    # pass the shared module-level PAYLOADS entries, which should stay clean.
    notify(srv, **{**payload, "message": msg})
class MyPlugin(SchedulerPlugin):
    """Scheduler plugin that notifies once every known task has reached memory.

    The plugin counts ``processing -> memory`` transitions.  When the count
    equals ``len(scheduler.tasks)`` it sends a summary (task count, scheduler
    address, accumulated time per action, elapsed time) to every service in
    ``PAYLOADS`` and then resets its state for the next batch.
    """

    def __init__(self, scheduler):
        """Remember ``scheduler`` and start with empty batch statistics."""
        self.counter = 0  # tasks seen going processing -> memory this batch
        self.scheduler = scheduler
        # action name -> summed (stop - start) taken from "startstops" records
        self.action_time = defaultdict(int)
        # default_timer() reading taken at the first transition of a batch;
        # 0 means the timer has not been started yet.
        self.duration = 0

    def transition(self, key, start, finish, *args, **kwargs):
        """Record one task state transition and notify when the batch is done.

        Args:
            key: Key of the task that changed state.
            start: State the task is leaving.
            finish: State the task is entering.
            *args: Ignored.
            **kwargs: Only ``startstops`` is read: an iterable of records with
                ``"action"``, ``"start"`` and ``"stop"`` entries.
        """
        # Any transition starts the batch timer if it is not already running.
        if self.duration == 0:
            self.duration = default_timer()

        # Only completed computations of tasks still known to the scheduler
        # count towards the batch.
        if start != "processing" or finish != "memory":
            return
        if key not in self.scheduler.tasks:
            return

        self.counter += 1
        # ``or ()`` tolerates both a missing and a None "startstops" value.
        for record in kwargs.get("startstops") or ():
            self.action_time[record["action"]] += record["stop"] - record["start"]

        # Checking only after a counted task means an empty scheduler
        # (0 == 0) can never trigger a notification.
        if self.counter != len(self.scheduler.tasks):
            return

        self.duration = default_timer() - self.duration
        message = self._render_summary()
        # Leaving the ``with`` block waits for all submitted sends to finish.
        with ThreadPoolExecutor(max_workers=5) as executor:
            for service, payload in PAYLOADS.items():
                executor.submit(send_notification, service, payload, message)
        self._reset_batch()

    def _render_summary(self):
        """Return the notification text for the batch that just completed."""
        timings = "".join(
            f"\n{action} time: {format_time(self.action_time[action])}"
            for action in sorted(self.action_time)
        )
        return (
            f"Completed in-memory: "
            f"{self.counter} tasks"
            f"\n@ scheduler {self.scheduler.address}"
            f"\n{timings}"
            f"\nduration: {format_time(self.duration)}"
        )

    def _reset_batch(self):
        """Clear all per-batch statistics so the next batch starts from zero."""
        self.counter = 0
        self.duration = 0
        # Without this, per-action timings would leak into later summaries.
        self.action_time.clear()
@click.command()
def dask_setup(scheduler):
    """Create a ``MyPlugin`` bound to ``scheduler`` and register it on it.

    NOTE(review): presumably invoked by Dask's preload mechanism, which
    supplies the scheduler instance -- confirm against the deployment.
    """
    scheduler.add_plugin(MyPlugin(scheduler))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment