Last active
June 11, 2021 13:05
-
-
Save ncclementi/c92c97671f3b58c514e2f51e89575ef5 to your computer and use it in GitHub Desktop.
tg_viz.ipynb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"metadata": { | |
"trusted": true | |
}, | |
"id": "13bb651d", | |
"cell_type": "code", | |
"source": "from dask.distributed import Client, wait\nclient = Client()\nclient", | |
"execution_count": null, | |
"outputs": [] | |
}, | |
{ | |
"metadata": { | |
"trusted": true | |
}, | |
"id": "4dff4738", | |
"cell_type": "code", | |
"source": "import dask.array as da", | |
"execution_count": null, | |
"outputs": [] | |
}, | |
{ | |
"metadata": { | |
"trusted": true | |
}, | |
"id": "f1ceb0f7", | |
"cell_type": "code", | |
"source": "# w = da.random.random((10000, 10000))\n# t = w + w.T - w.mean(axis=0) + w.var(axis=0)\n# t = t.persist()\n# wait(t);", | |
"execution_count": null, | |
"outputs": [] | |
}, | |
{ | |
"metadata": { | |
"trusted": true | |
}, | |
"id": "c2e146d4", | |
"cell_type": "code", | |
"source": "#set(client.cluster.scheduler.task_groups.keys())", | |
"execution_count": null, | |
"outputs": [] | |
}, | |
{ | |
"metadata": { | |
"trusted": true | |
}, | |
"id": "84c8c335", | |
"cell_type": "code", | |
"source": "from distributed.dashboard.utils import (\n without_property_validation,\n)\nfrom distributed.utils import log_errors\nfrom distributed.diagnostics.tg_graph_utils import toposort_layers\nfrom distributed.diagnostics.progress_stream import color_of", | |
"execution_count": null, | |
"outputs": [] | |
}, | |
{ | |
"metadata": { | |
"trusted": true | |
}, | |
"id": "7a89538c", | |
"cell_type": "code", | |
class TGroupGraph:
    """Bokeh figure that draws the scheduler's task groups as a dependency graph.

    Every task group is drawn twice at the same position:

    * an outlined rectangle (``node_source``) carrying the group name and
      its total number of tasks, and
    * a smaller inner rectangle (``alpha_source``) whose fill alpha is the
      fraction of the group's tasks that are not in the ``waiting``,
      ``no-worker`` or ``processing`` state.

    Arrows (``arrow_source``) run from each dependency to its dependent.

    Parameters
    ----------
    scheduler
        Object exposing ``task_groups``, a mapping from group name to an
        object with ``dependencies``, ``states`` and ``prefix.name``.
    **kwargs
        Forwarded unchanged to ``bokeh.plotting.figure``.

    Attributes
    ----------
    root
        The Bokeh figure; pass it to ``show``.
    layout
        ``{group_name: {"x": ..., "y": ...}}`` for the groups placed by the
        most recent successful :meth:`update_layout`.
    """

    # Glyph geometry, in data units.
    NODE_WIDTH = 5
    NODE_HEIGHT = 0.5
    BAR_WIDTH = 4
    BAR_HEIGHT = 0.25
    # Horizontal distance between a group and its right-most dependency
    # (the original hard-coded "+ 1 + 5"; presumably node width plus a gap).
    X_STEP = NODE_WIDTH + 1
    # Vertical distance between two consecutive dependency-free groups.
    ROOT_Y_STEP = 3
    # How far a group is pushed when its slot is taken; the original note
    # says this must follow the rectangle size.
    COLLISION_Y_SHIFT = NODE_HEIGHT

    def __init__(self, scheduler, **kwargs):
        self.scheduler = scheduler
        self.current_groups = set()

        self.layout = {}

        self.node_source = ColumnDataSource(
            {
                "x": [],
                "y": [],
                "name": [],
                "name_short": [],
                "tot_tasks": [],
                "colors": [],
            }
        )
        self.alpha_source = ColumnDataSource(
            {"x": [], "y": [], "color": [], "alpha": []}
        )
        self.arrow_source = ColumnDataSource(
            {"xs": [], "ys": [], "xe": [], "ye": []}
        )

        self.root = figure(title="Task Groups Graph", **kwargs)
        self.subtitle = Title(text=" ", text_font_style="italic")
        self.root.add_layout(self.subtitle, "above")

        # Outer rectangle: one per task group, faintly filled.
        rect = self.root.rect(
            x="x",
            y="y",
            width=self.NODE_WIDTH,
            height=self.NODE_HEIGHT,
            color="colors",
            line_color="black",
            fill_alpha=0.1,
            source=self.node_source,
        )

        # Inner rectangle: same position, opacity driven by the "alpha" column.
        rect2 = self.root.rect(
            x="x",
            y="y",
            width=self.BAR_WIDTH,
            height=self.BAR_HEIGHT,
            color="color",
            fill_alpha="alpha",
            source=self.alpha_source,
        )

        self.arrows = Arrow(
            end=VeeHead(size=10),
            line_color="black",
            line_alpha=0.5,
            line_width=1,
            x_start="xs",
            y_start="ys",
            x_end="xe",
            y_end="ye",
            source=self.arrow_source,
        )
        self.root.add_layout(self.arrows)

        # TODO: revisit the label offset and how the short name is chosen.
        self.labels = LabelSet(
            x="x",
            y="y",
            x_offset=-10,
            y_offset=20,
            text="name_short",
            text_align="left",
            source=self.node_source,
            background_fill_color=None,
        )
        self.root.add_layout(self.labels)

        self.root.xgrid.grid_line_color = None
        self.root.ygrid.grid_line_color = None
        self.root.x_range.range_padding = 0.5
        self.root.y_range.range_padding = 0.5

        hover = HoverTool(
            point_policy="snap_to_data",
            tooltips=[("tg", "@name"), ("num_task", "@tot_tasks")],
            renderers=[rect],
        )
        hover_alpha = HoverTool(
            point_policy="follow_mouse",
            tooltips=[("frac", "@alpha")],
            renderers=[rect2],
        )
        self.root.add_tools(hover)
        self.root.add_tools(hover_alpha)

    @without_property_validation
    def update_layout(self):
        """Recompute the position of every task group and redraw nodes and arrows.

        The layout is rebuilt from scratch on each call, so groups that are no
        longer in ``scheduler.task_groups`` disappear from ``self.layout``.
        ``self.layout`` is only replaced once the whole layout has been
        computed; if an exception is raised part-way the previous layout is
        kept.
        """
        with log_errors():
            # Work on a snapshot so every step below sees the same groups.
            groups = dict(self.scheduler.task_groups)
            self.current_groups = set(groups)

            if not groups:
                # Nothing to draw: drop stale positions and glyph data.
                self.layout = {}
                self.add_nodes_arrows({"nstart": [], "nend": [], "tg_stack": []})
                self.subtitle.text = "Scheduler is empty."
                return

            # Dependencies per task group. Some groups list themselves as a
            # dependency; those are removed. Dependencies that are not (or no
            # longer) present in the snapshot are removed as well, otherwise
            # the ``dependents[dep]`` lookup below raises KeyError.
            dependencies = {
                name: [
                    dep.name
                    for dep in group.dependencies
                    if dep.name != name and dep.name in groups
                ]
                for name, group in groups.items()
            }

            # Reverse mapping: dependents per task group.
            dependents = {name: set() for name in dependencies}
            for name, deps in dependencies.items():
                for dep in deps:
                    dependents[dep].add(name)

            # NOTE(review): assumed to return the group names ordered so that
            # dependencies come before their dependents - confirm against
            # toposort_layers.
            stack_order = toposort_layers(dependencies)

            layout = {name: {} for name in stack_order}
            layout_utils = {"nstart": [], "nend": [], "tg_stack": []}
            y_next = 0
            # Maps a requested (x, y) slot to the position last handed out for it.
            collision = {}

            for tg in stack_order:
                node = layout[tg]

                if not dependencies[tg]:
                    # Dependency-free groups form the first column.
                    node["x"] = 0
                    node["y"] = y_next
                    y_next += self.ROOT_Y_STEP
                else:
                    # "y" was already assigned while visiting a dependency.
                    node["x"] = (
                        max(layout[dep]["x"] for dep in dependencies[tg])
                        + self.X_STEP
                    )

                # Fan the not-yet-placed dependents out vertically, using this
                # group's y *before* any collision adjustment below.
                sort_dependents = [
                    name for name in stack_order if name in dependents[tg]
                ]
                for offset, dep in enumerate(sort_dependents):
                    if "y" not in layout[dep]:
                        layout[dep]["y"] = node["y"] + offset

                slot = (node["x"], node["y"])
                if slot in collision:
                    # Slot already used: stack this group above the last
                    # group that was placed for the same slot.
                    taken_x, taken_y = collision[slot]
                    node["x"] = taken_x
                    node["y"] = taken_y + self.COLLISION_Y_SHIFT
                    collision[slot] = (node["x"], node["y"])
                else:
                    collision[slot] = slot

                layout_utils["nstart"] += dependencies[tg]
                layout_utils["nend"] += [tg] * len(dependencies[tg])
                layout_utils["tg_stack"].append(tg)

            self.layout = layout
            self.add_nodes_arrows(layout_utils)
            self.subtitle.text = " "

    @without_property_validation
    def add_nodes_arrows(self, layout_utils):
        """Push node and arrow columns computed from ``self.layout`` to Bokeh.

        Parameters
        ----------
        layout_utils : dict
            ``"tg_stack"``: group names to draw, in drawing order.
            ``"nstart"`` / ``"nend"``: parallel lists with the group name at
            the start and at the end of every arrow.
        """
        if not layout_utils["tg_stack"]:
            # Empty every column but keep the column names, so the glyphs
            # that reference them still find their fields.
            self.node_source.data.update(
                {column: [] for column in list(self.node_source.data)}
            )
            self.arrow_source.data.update(
                {column: [] for column in list(self.arrow_source.data)}
            )
            return

        groups = self.scheduler.task_groups

        node = {
            "x": [],
            "y": [],
            "name": [],
            "name_short": [],
            "colors": [],
            "tot_tasks": [],
        }
        for key in layout_utils["tg_stack"]:
            tg = groups.get(key)
            if tg is None:
                # Group is gone from the scheduler; skip it.
                continue

            node["x"].append(self.layout[key]["x"])
            node["y"].append(self.layout[key]["y"])
            node["name"].append(tg.prefix.name)
            # TODO: find a better way to choose the short name.
            node["name_short"].append(tg.prefix.name[:10])
            node["colors"].append(color_of(tg.prefix.name))
            node["tot_tasks"].append(sum(tg.states.values()))

        # Arrow coordinates: dependency position -> dependent position.
        arrow = {
            "xs": [self.layout[s]["x"] for s in layout_utils["nstart"]],
            "ys": [self.layout[s]["y"] for s in layout_utils["nstart"]],
            "xe": [self.layout[e]["x"] for e in layout_utils["nend"]],
            "ye": [self.layout[e]["y"] for e in layout_utils["nend"]],
        }

        self.node_source.data.update(node)
        self.arrow_source.data.update(arrow)

    @without_property_validation
    def update(self):
        """Refresh the figure from the scheduler's current task groups.

        The layout is recomputed only when the set of group names differs
        from the one already laid out; the completion data of the inner
        rectangles is refreshed on every call.
        """
        # Snapshot, so the comparison and the loop below see the same groups.
        groups = dict(self.scheduler.task_groups)

        if set(self.layout) != set(groups):
            self.update_layout()

        data_alpha = {"x": [], "y": [], "color": [], "alpha": []}

        for name, group in groups.items():
            position = self.layout.get(name)
            if not position or "x" not in position or "y" not in position:
                # Not placed (e.g. the layout pass failed); nothing to draw.
                continue

            total = sum(group.states.values())
            pending = (
                group.states["waiting"]
                + group.states["no-worker"]
                + group.states["processing"]
            )
            # A group that reports no tasks at all counts as 0% complete
            # instead of raising ZeroDivisionError.
            completed = 1.0 - pending / total if total else 0.0

            data_alpha["x"].append(position["x"])
            data_alpha["y"].append(position["y"])
            data_alpha["alpha"].append(completed)
            data_alpha["color"].append(color_of(group.prefix.name))

        self.alpha_source.data.update(data_alpha)
"execution_count": null, | |
"outputs": [] | |
}, | |
{ | |
"metadata": { | |
"trusted": true | |
}, | |
"id": "e9231a30", | |
"cell_type": "code", | |
"source": "from bokeh.plotting import show, output_notebook\nfrom bokeh.plotting import figure\nfrom bokeh.models import (\n Arrow,\n ColumnDataSource,\n HoverTool,\n LabelSet,\n Title,\n VeeHead,\n)\noutput_notebook()", | |
"execution_count": null, | |
"outputs": [] | |
}, | |
{ | |
"metadata": { | |
"trusted": true | |
}, | |
"id": "00d3e8e1", | |
"cell_type": "code", | |
"source": "n = da.random.random((10000, 10000))\nm = da.random.random((10000, 10000))\np = n.dot(m.T).persist()", | |
"execution_count": null, | |
"outputs": [] | |
}, | |
{ | |
"metadata": { | |
"trusted": true | |
}, | |
"id": "5fca9397", | |
"cell_type": "code", | |
"source": "tgg=TGroupGraph(client.cluster.scheduler, sizing_mode='stretch_both')", | |
"execution_count": null, | |
"outputs": [] | |
}, | |
{ | |
"metadata": { | |
"trusted": true | |
}, | |
"id": "09bcb813", | |
"cell_type": "code", | |
"source": "tgg.update()\nshow(tgg.root)", | |
"execution_count": null, | |
"outputs": [] | |
}, | |
{ | |
"metadata": { | |
"trusted": true | |
}, | |
"id": "a8ee3ce6", | |
"cell_type": "code", | |
"source": "import dask.array as da", | |
"execution_count": null, | |
"outputs": [] | |
}, | |
{ | |
"metadata": { | |
"trusted": true | |
}, | |
"id": "c64a6cac", | |
"cell_type": "code", | |
"source": "w = da.random.random((10000, 10000))\nt = w + w.T - w.mean(axis=0) + w.var(axis=0)\nt = t.persist()", | |
"execution_count": null, | |
"outputs": [] | |
}, | |
{ | |
"metadata": { | |
"trusted": true | |
}, | |
"id": "fbf9cb34", | |
"cell_type": "code", | |
"source": "tgg.update()\nshow(tgg.root)", | |
"execution_count": null, | |
"outputs": [] | |
}, | |
{ | |
"metadata": { | |
"trusted": true | |
}, | |
"id": "f999acca", | |
"cell_type": "code", | |
"source": "", | |
"execution_count": null, | |
"outputs": [] | |
} | |
], | |
"metadata": { | |
"_draft": { | |
"nbviewer_url": "https://gist.github.com/c92c97671f3b58c514e2f51e89575ef5" | |
}, | |
"gist": { | |
"id": "c92c97671f3b58c514e2f51e89575ef5", | |
"data": { | |
"description": "tg_viz.ipynb", | |
"public": true | |
} | |
}, | |
"kernelspec": { | |
"name": "python3", | |
"display_name": "Python 3", | |
"language": "python" | |
}, | |
"language_info": { | |
"name": "python", | |
"version": "3.8.10", | |
"mimetype": "text/x-python", | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"pygments_lexer": "ipython3", | |
"nbconvert_exporter": "python", | |
"file_extension": ".py" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 5 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment