Skip to content

Instantly share code, notes, and snippets.

@ncclementi
Last active June 11, 2021 13:05
Show Gist options
  • Save ncclementi/c92c97671f3b58c514e2f51e89575ef5 to your computer and use it in GitHub Desktop.
Save ncclementi/c92c97671f3b58c514e2f51e89575ef5 to your computer and use it in GitHub Desktop.
tg_viz.ipynb
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"metadata": {},
"id": "tgviz-intro",
"cell_type": "markdown",
"source": [
"# Task group graph visualization\n",
"\n",
"Prototype Bokeh plot of the scheduler's task groups drawn as a graph:\n",
"\n",
"- one outlined box per task group, placed in columns by dependency depth;\n",
"- arrows from each group to the groups that depend on it;\n",
"- an inner box whose opacity is the fraction of the group's tasks that are no longer in the `waiting`, `no-worker` or `processing` states.\n",
"\n",
"The notebook starts a local `dask.distributed` cluster, persists two array workloads and redraws the graph after each one."
]
},
{
"metadata": {
"trusted": true
},
"id": "13bb651d",
"cell_type": "code",
"source": [
"import dask.array as da\n",
"from dask.distributed import Client, wait\n",
"\n",
"# Local cluster; the graph below reads task groups directly from client.cluster.scheduler.\n",
"client = Client()\n",
"client"
],
"execution_count": null,
"outputs": []
},
{
"metadata": {
"trusted": true
},
"id": "84c8c335",
"cell_type": "code",
"source": [
"# Imports for the graph code, placed before the class cell that uses them.\n",
"from bokeh.models import (\n",
"    Arrow,\n",
"    ColumnDataSource,\n",
"    HoverTool,\n",
"    LabelSet,\n",
"    Title,\n",
"    VeeHead,\n",
")\n",
"from bokeh.plotting import figure, output_notebook, show\n",
"from distributed.dashboard.utils import without_property_validation\n",
"from distributed.diagnostics.progress_stream import color_of\n",
"from distributed.diagnostics.tg_graph_utils import toposort_layers\n",
"from distributed.utils import log_errors\n",
"\n",
"# Render Bokeh output inline in the notebook.\n",
"output_notebook()"
],
"execution_count": null,
"outputs": []
},
{
"metadata": {
"trusted": true
},
"id": "7a89538c",
"cell_type": "code",
"source": [
"class TGroupGraph:\n",
"    \"\"\"Bokeh figure drawing the scheduler's task groups as a dependency graph.\n",
"\n",
"    Every task group gets an outlined box (coloured with ``color_of`` of its\n",
"    prefix name) plus a smaller inner box whose fill alpha is the fraction of\n",
"    the group's tasks no longer in the waiting, no-worker or processing states.\n",
"    Arrows run from each group to the groups that depend on it.\n",
"\n",
"    Parameters\n",
"    ----------\n",
"    scheduler\n",
"        Object with a ``task_groups`` mapping (name -> task group), e.g.\n",
"        ``client.cluster.scheduler``.\n",
"    **kwargs\n",
"        Forwarded to ``bokeh.plotting.figure``.\n",
"    \"\"\"\n",
"\n",
"    # Layout constants, in data units.\n",
"    X_STEP = 6  # horizontal gap between a group and its right-most dependency\n",
"    ROOT_Y_STEP = 3  # vertical gap between consecutive groups without dependencies\n",
"    COLLISION_Y_SHIFT = 0.5  # nudge when two groups land on one spot; tied to OUTER_BOX height\n",
"    OUTER_BOX = (5, 0.5)  # (width, height) of the per-group box\n",
"    INNER_BOX = (4, 0.25)  # (width, height) of the completion box\n",
"\n",
"    def __init__(self, scheduler, **kwargs):\n",
"        self.scheduler = scheduler\n",
"        self.current_groups = set()\n",
"        # Task group name -> {\"x\": ..., \"y\": ...}; rebuilt from scratch by update_layout().\n",
"        self.layout = {}\n",
"\n",
"        self.node_source = ColumnDataSource(\n",
"            {\"x\": [], \"y\": [], \"name\": [], \"name_short\": [], \"tot_tasks\": [], \"colors\": []}\n",
"        )\n",
"        self.alpha_source = ColumnDataSource({\"x\": [], \"y\": [], \"color\": [], \"alpha\": []})\n",
"        self.arrow_source = ColumnDataSource({\"xs\": [], \"ys\": [], \"xe\": [], \"ye\": []})\n",
"\n",
"        self.root = figure(title=\"Task Groups Graph\", **kwargs)\n",
"        self.subtitle = Title(text=\" \", text_font_style=\"italic\")\n",
"        self.root.add_layout(self.subtitle, \"above\")\n",
"\n",
"        # Outer box: one per task group, outlined, with a faint fill.\n",
"        outer_width, outer_height = self.OUTER_BOX\n",
"        outer_rect = self.root.rect(\n",
"            x=\"x\",\n",
"            y=\"y\",\n",
"            width=outer_width,\n",
"            height=outer_height,\n",
"            color=\"colors\",\n",
"            line_color=\"black\",\n",
"            fill_alpha=0.1,\n",
"            source=self.node_source,\n",
"        )\n",
"\n",
"        # Inner box: its fill alpha is the completed fraction computed in update().\n",
"        inner_width, inner_height = self.INNER_BOX\n",
"        inner_rect = self.root.rect(\n",
"            x=\"x\",\n",
"            y=\"y\",\n",
"            width=inner_width,\n",
"            height=inner_height,\n",
"            color=\"color\",\n",
"            fill_alpha=\"alpha\",\n",
"            source=self.alpha_source,\n",
"        )\n",
"\n",
"        self.arrows = Arrow(\n",
"            end=VeeHead(size=10),\n",
"            line_color=\"black\",\n",
"            line_alpha=0.5,\n",
"            line_width=1,\n",
"            x_start=\"xs\",\n",
"            y_start=\"ys\",\n",
"            x_end=\"xe\",\n",
"            y_end=\"ye\",\n",
"            source=self.arrow_source,\n",
"        )\n",
"        self.root.add_layout(self.arrows)\n",
"\n",
"        # TODO: labels are the first 10 characters of the prefix name; pick a better short name.\n",
"        self.labels = LabelSet(\n",
"            x=\"x\",\n",
"            y=\"y\",\n",
"            x_offset=-10,\n",
"            y_offset=20,\n",
"            text=\"name_short\",\n",
"            text_align=\"left\",\n",
"            source=self.node_source,\n",
"            background_fill_color=None,\n",
"        )\n",
"        self.root.add_layout(self.labels)\n",
"\n",
"        self.root.xgrid.grid_line_color = None\n",
"        self.root.ygrid.grid_line_color = None\n",
"        self.root.x_range.range_padding = 0.5\n",
"        self.root.y_range.range_padding = 0.5\n",
"\n",
"        hover = HoverTool(\n",
"            point_policy=\"snap_to_data\",\n",
"            tooltips=[(\"tg\", \"@name\"), (\"num_task\", \"@tot_tasks\")],\n",
"            renderers=[outer_rect],\n",
"        )\n",
"        hover_alpha = HoverTool(\n",
"            point_policy=\"follow_mouse\",\n",
"            tooltips=[(\"frac\", \"@alpha\")],\n",
"            renderers=[inner_rect],\n",
"        )\n",
"        self.root.add_tools(hover)\n",
"        self.root.add_tools(hover_alpha)\n",
"\n",
"    @without_property_validation\n",
"    def update_layout(self):\n",
"        \"\"\"Recompute every group's position and redraw boxes, labels and arrows.\"\"\"\n",
"        with log_errors():\n",
"            groups = self.scheduler.task_groups\n",
"            self.current_groups = set(groups)\n",
"            # Rebuild from scratch: entries for groups the scheduler has dropped would\n",
"            # otherwise keep set(self.layout) != set(groups) and force a re-layout on\n",
"            # every update().\n",
"            self.layout = {}\n",
"            layout_utils = {\"nstart\": [], \"nend\": [], \"tg_stack\": []}\n",
"\n",
"            if groups:\n",
"                # Dependency names per group. Self-dependencies are dropped, as are\n",
"                # dependencies on groups no longer in task_groups (they would raise\n",
"                # KeyError when building `dependents`).\n",
"                dependencies = {\n",
"                    name: [\n",
"                        dep.name\n",
"                        for dep in tg.dependencies\n",
"                        if dep.name != name and dep.name in groups\n",
"                    ]\n",
"                    for name, tg in groups.items()\n",
"                }\n",
"                dependents = {name: [] for name in dependencies}\n",
"                for name, deps in dependencies.items():\n",
"                    for dep in deps:\n",
"                        dependents[dep].append(name)\n",
"\n",
"                stack_order = toposort_layers(dependencies)\n",
"                self.layout = {name: {} for name in stack_order}\n",
"                self._place_groups(stack_order, dependencies, dependents, layout_utils)\n",
"\n",
"            # Always redraw, so an empty scheduler clears the previous nodes and arrows.\n",
"            self.add_nodes_arrows(layout_utils)\n",
"\n",
"            self.subtitle.text = \" \" if groups else \"Scheduler is empty.\"\n",
"\n",
"    def _place_groups(self, stack_order, dependencies, dependents, layout_utils):\n",
"        \"\"\"Fill in self.layout x/y for every group in stack_order and record arrow endpoints.\"\"\"\n",
"        y_next = 0\n",
"        # requested (x, y) -> position most recently handed out for that request\n",
"        collision = {}\n",
"\n",
"        for name in stack_order:\n",
"            pos = self.layout[name]\n",
"            deps = dependencies[name]\n",
"            if deps:\n",
"                # One column to the right of the right-most dependency; y was already\n",
"                # set when the first of those dependencies was placed.\n",
"                pos[\"x\"] = max(self.layout[dep][\"x\"] for dep in deps) + self.X_STEP\n",
"            else:\n",
"                # Groups without dependencies start a new row in the first column.\n",
"                pos[\"x\"] = 0\n",
"                pos[\"y\"] = y_next\n",
"                y_next += self.ROOT_Y_STEP\n",
"\n",
"            # Dependents get this group's y plus their index in stack order; the first\n",
"            # dependency to be placed decides a dependent's y.\n",
"            children = set(dependents[name])\n",
"            ordered_children = [other for other in stack_order if other in children]\n",
"            for offset, child in enumerate(ordered_children):\n",
"                self.layout[child].setdefault(\"y\", pos[\"y\"] + offset)\n",
"\n",
"            requested = (pos[\"x\"], pos[\"y\"])\n",
"            if requested in collision:\n",
"                x, y = collision[requested]\n",
"                pos[\"x\"], pos[\"y\"] = x, y + self.COLLISION_Y_SHIFT\n",
"                collision[requested] = (pos[\"x\"], pos[\"y\"])\n",
"            else:\n",
"                collision[requested] = requested\n",
"\n",
"            layout_utils[\"nstart\"] += deps\n",
"            layout_utils[\"nend\"] += [name] * len(deps)\n",
"            layout_utils[\"tg_stack\"].append(name)\n",
"\n",
"    @without_property_validation\n",
"    def add_nodes_arrows(self, layout_utils):\n",
"        \"\"\"Push box/label and arrow data for the groups in layout_utils[\"tg_stack\"].\n",
"\n",
"        An empty tg_stack produces empty columns, which clears the plot while keeping\n",
"        the columns the glyphs refer to.\n",
"        \"\"\"\n",
"        groups = self.scheduler.task_groups\n",
"        node = {\"x\": [], \"y\": [], \"name\": [], \"name_short\": [], \"colors\": [], \"tot_tasks\": []}\n",
"\n",
"        for key in layout_utils[\"tg_stack\"]:\n",
"            tg = groups.get(key)\n",
"            if tg is None:  # group left the scheduler after the layout was computed\n",
"                continue\n",
"            prefix_name = tg.prefix.name\n",
"            node[\"x\"].append(self.layout[key][\"x\"])\n",
"            node[\"y\"].append(self.layout[key][\"y\"])\n",
"            node[\"name\"].append(prefix_name)\n",
"            node[\"name_short\"].append(prefix_name[:10])\n",
"            node[\"colors\"].append(color_of(prefix_name))\n",
"            node[\"tot_tasks\"].append(sum(tg.states.values()))\n",
"\n",
"        # Arrows go from each dependency (start) to the group depending on it (end).\n",
"        arrow = {\n",
"            \"xs\": [self.layout[s][\"x\"] for s in layout_utils[\"nstart\"]],\n",
"            \"ys\": [self.layout[s][\"y\"] for s in layout_utils[\"nstart\"]],\n",
"            \"xe\": [self.layout[e][\"x\"] for e in layout_utils[\"nend\"]],\n",
"            \"ye\": [self.layout[e][\"y\"] for e in layout_utils[\"nend\"]],\n",
"        }\n",
"        self.node_source.data.update(node)\n",
"        self.arrow_source.data.update(arrow)\n",
"\n",
"    @without_property_validation\n",
"    def update(self):\n",
"        \"\"\"Refresh the completion boxes, re-running the layout first if the set of groups changed.\"\"\"\n",
"        groups = self.scheduler.task_groups\n",
"        if set(self.layout) != set(groups):\n",
"            self.update_layout()\n",
"\n",
"        data_alpha = {\"x\": [], \"y\": [], \"color\": [], \"alpha\": []}\n",
"        for name, tg in groups.items():\n",
"            states = tg.states\n",
"            total = sum(states.values())\n",
"            pending = states[\"waiting\"] + states[\"no-worker\"] + states[\"processing\"]\n",
"            # Share of tasks no longer pending; 0.0 instead of ZeroDivisionError for an empty group.\n",
"            completed = (1.0 - pending / total) if total else 0.0\n",
"\n",
"            data_alpha[\"x\"].append(self.layout[name][\"x\"])\n",
"            data_alpha[\"y\"].append(self.layout[name][\"y\"])\n",
"            data_alpha[\"alpha\"].append(completed)\n",
"            data_alpha[\"color\"].append(color_of(tg.prefix.name))\n",
"\n",
"        self.alpha_source.data.update(data_alpha)"
],
"execution_count": null,
"outputs": []
},
{
"metadata": {
"trusted": true
},
"id": "00d3e8e1",
"cell_type": "code",
"source": [
"# First workload: a large matrix product, persisted so its task groups are registered on the scheduler.\n",
"n = da.random.random((10000, 10000))\n",
"m = da.random.random((10000, 10000))\n",
"p = n.dot(m.T).persist()"
],
"execution_count": null,
"outputs": []
},
{
"metadata": {
"trusted": true
},
"id": "5fca9397",
"cell_type": "code",
"source": [
"# Build the graph against the cluster's scheduler; extra keyword arguments go to bokeh's figure().\n",
"tgg=TGroupGraph(client.cluster.scheduler, sizing_mode='stretch_both')"
],
"execution_count": null,
"outputs": []
},
{
"metadata": {
"trusted": true
},
"id": "09bcb813",
"cell_type": "code",
"source": [
"# Lay out the task groups created so far and render the graph.\n",
"tgg.update()\n",
"show(tgg.root)"
],
"execution_count": null,
"outputs": []
},
{
"metadata": {
"trusted": true
},
"id": "c64a6cac",
"cell_type": "code",
"source": [
"# Second workload: elementwise ops plus column reductions, persisted on the same scheduler\n",
"# so the next redraw includes its task groups.\n",
"w = da.random.random((10000, 10000))\n",
"t = w + w.T - w.mean(axis=0) + w.var(axis=0)\n",
"t = t.persist()"
],
"execution_count": null,
"outputs": []
},
{
"metadata": {
"trusted": true
},
"id": "fbf9cb34",
"cell_type": "code",
"source": [
"# Redraw: the layout is recomputed because the set of task groups changed.\n",
"tgg.update()\n",
"show(tgg.root)"
],
"execution_count": null,
"outputs": []
}
],
"metadata": {
"_draft": {
"nbviewer_url": "https://gist.github.com/c92c97671f3b58c514e2f51e89575ef5"
},
"gist": {
"id": "c92c97671f3b58c514e2f51e89575ef5",
"data": {
"description": "tg_viz.ipynb",
"public": true
}
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3",
"language": "python"
},
"language_info": {
"name": "python",
"version": "3.8.10",
"mimetype": "text/x-python",
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"pygments_lexer": "ipython3",
"nbconvert_exporter": "python",
"file_extension": ".py"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment