# Last active May 14, 2021
import dask
import dask.array as da
import distributed
# Create the distributed client used for everything below; the bare name
# is a notebook display of the client.
client = distributed.Client()
client

# Array workload: build a random 10000 x 10000 array, combine it with its
# transpose and its column means, persist the result and block until the
# computation has finished.  The variance term `+ x.var(axis=0)` is
# intentionally left disabled.
x = da.random.random((10000, 10000))
y = (x + x.T - x.mean(axis=0)).persist()
distributed.wait(y)
# DataFrame workload: aggregate the demo timeseries by the minute of its
# index, persist the result and block until it has been computed.
df = dask.datasets.timeseries(partition_freq="1H")
df = (
    df.groupby(df.index.minute)
    .agg({"name": "max", "x": "min", "y": "mean"})
    .persist()
)
distributed.wait(df)
# Task groups recorded by the scheduler, keyed by group name.
groups = client.cluster.scheduler.task_groups
groups

# Map every task-group name to the names of the groups it depends on,
# skipping a group's dependency on itself.
# NOTE(review): the attribute accesses in this comprehension were lost in
# the exported source; `ds.name` is reconstructed on the assumption that
# `ts.dependencies` holds task-group objects exposing `.name` -- confirm.
dependencies = {
    k: [ds.name for ds in ts.dependencies if ds.name != k]
    for k, ts in groups.items()
}
dependencies
def get_depth(deps, key, _memo=None):
    """Return the length of the longest dependency chain that ends at ``key``.

    Parameters
    ----------
    deps : dict
        Maps each node to the collection of nodes it depends on.
    key : hashable
        Node whose depth is requested; must be a key of ``deps``.
    _memo : dict, optional
        Internal cache of already computed depths, shared across the
        recursive calls.  Callers normally leave it unset.

    Returns
    -------
    int
        ``0`` for a node without dependencies, otherwise one more than the
        largest depth among its dependencies.
    """
    if _memo is None:
        _memo = {}
    if key not in _memo:
        parents = deps[key]
        if not parents:
            _memo[key] = 0
        else:
            # Without the cache every shared ancestor is re-walked once per
            # path leading to it, which is exponential on diamond-shaped
            # graphs.
            _memo[key] = 1 + max(get_depth(deps, p, _memo) for p in parents)
    return _memo[key]
# Depth of every task group: 0 for groups without dependencies, otherwise
# the length of the longest dependency chain leading to the group.
dependencies_depth = {name: get_depth(dependencies, name) for name in dependencies}
dependencies_depth

# Invert the dependency mapping: for each group, the groups that depend on it.
dependents = {name: [] for name in dependencies}
for child, parents in dependencies.items():
    for parent in parents:
        dependents[parent].append(child)
dependents
def toposort_layers(dependencies):
    """Return the keys of ``dependencies`` in topological order.

    Parameters
    ----------
    dependencies : dict
        Maps each layer name to the list of layer names it depends on.
        Every name used as a dependency must also be a key.

    Returns
    -------
    list
        Layer names ordered so that each one comes after all of its
        dependencies.  Names whose dependencies are never all emitted
        (for example because of a cycle) are left out.
    """
    pending = {}  # number of not-yet-emitted dependencies per layer
    children = {name: [] for name in dependencies}  # i.e. the dependents
    stack = []

    for name, parents in dependencies.items():
        pending[name] = len(parents)
        for parent in parents:
            children[parent].append(name)
        if not parents:
            stack.append(name)

    ordered = []
    while stack:
        current = stack.pop()
        ordered.append(current)
        for child in children[current]:
            pending[child] -= 1
            # A layer is released once its last dependency has been emitted.
            if pending[child] == 0:
                stack.append(child)
    return ordered
# Task-group names in topological order (dependencies before dependents).
stack_order = toposort_layers(dependencies)
# Scratch inspection of a single task group, left disabled:
# groups[stack_order[0]].__dict__
# groups[stack_order[0]].states.values()
# import numpy as np

# np.log(sum(groups[stack_order[0]].states.values()))
def get_ycoords(stack, depth_dict, dependents_dict):
    """Assign a y coordinate to every node, walking ``stack`` front to back.

    Parameters
    ----------
    stack : list
        Node names in the order they should be processed.
    depth_dict : dict
        Maps each node name to its depth; depth ``0`` marks a root.
    dependents_dict : dict
        Maps each node name to the names of the nodes that depend on it.

    Returns
    -------
    dict
        Maps node names to y coordinates.  Roots receive consecutive
        integers starting at 0.  A dependent that has no coordinate yet is
        placed at its parent's y plus its position among that parent's
        dependents, taken in ``stack`` order.
    """
    ycoords = {}
    next_root_y = 0
    for tg in stack:
        if depth_dict[tg] == 0:
            ycoords[tg] = next_root_y
            # maybe use whatever is the size of the bar of the previous root
            next_root_y += 1

        children = dependents_dict[tg]
        ordered_children = [name for name in stack if name in children]
        for offset, child in enumerate(ordered_children):
            if child not in ycoords:
                ycoords[child] = ycoords[tg] + offset

    return ycoords
# Vertical position for every task group, keyed by group name.
ycoords = get_ycoords(stack_order, dependencies_depth, dependents)
ycoords
from distributed.utils import color_of
import collections
import numpy as np
stack_order

# Column-oriented node records, one entry per task group in topological
# order.  The 'start'/'end' columns hold one entry per edge rather than per
# node; they are accumulated here and split off into the arrow data later.
data_nodes = collections.defaultdict(list)
for tg in stack_order:
    group = groups[tg]
    # Number of tasks in the group, summed over all task states.
    total_tasks = sum(group.states.values())
    log_total_tasks = np.log(total_tasks)

    data_nodes['index'].append(tg)
    # NOTE(review): the attribute after `groups[tg]` was lost in the exported
    # source for 'name' and 'color'; `.name` is a reconstruction (the colour
    # may originally have been derived from `.prefix.name`) -- confirm.
    data_nodes['name'].append(group.name)
    data_nodes['x'].append(dependencies_depth[tg])
    data_nodes['y'].append(ycoords[tg])
    data_nodes['color'].append(color_of(group.name))
    data_nodes['start'] += dependencies[tg]  # start nodes for edges
    data_nodes['end'] += [tg] * len(dependencies[tg])  # end nodes for edges
    # data_nodes['top_bar'].append(ycoords[tg] + sum(group.all_durations.values()))
    # Bar height grows with the log of the task count, but is at least 1.
    data_nodes['top_bar'].append(ycoords[tg] + max(log_total_tasks, 1))
    # data_nodes['all_durations'].append(sum(group.all_durations.values()))
    data_nodes['total_tasks'].append(total_tasks)
    data_nodes['log_total_tasks'].append(log_total_tasks)
# Edge table for the arrows.  The edge endpoints were accumulated next to the
# node columns, so move them out of `data_nodes`.  The keys only exist once at
# least one node has been recorded, hence the empty-list defaults.
data_arrows = {
    'start': data_nodes.pop('start', []),
    'end': data_nodes.pop('end', []),
}

# Arrow coordinates: (xs, ys) is the dependency, (xe, ye) the dependent.
data_arrows['xs'] = [dependencies_depth[s] for s in data_arrows['start']]
data_arrows['ys'] = [ycoords[s] for s in data_arrows['start']]
data_arrows['xe'] = [dependencies_depth[e] for e in data_arrows['end']]
data_arrows['ye'] = [ycoords[e] for e in data_arrows['end']]
data_arrows
# data['alpha'] += [0.5]*len(stack_order)
from bokeh.plotting import figure, show, output_notebook
from bokeh.models import Plot, Arrow, VBar, VeeHead, ColumnDataSource, GraphRenderer, StaticLayoutProvider, HBar, Ellipse, LabelSet
# Direct bokeh output to the notebook before any figure is shown.
output_notebook()
#data
# TODO: this needs better logic -- labels are simply cut down to their first
# six characters (shorter names are kept whole).
name_chop = [name[:6] for name in data_nodes['name']]
data_nodes['name_chop'] = name_chop
# Bokeh data sources backing the node glyphs and the dependency arrows.
source_nodes = ColumnDataSource(data_nodes)
source_arrows = ColumnDataSource(data_arrows)

# Hover fields shown for each node.
TOOLTIPS = [
    ("tg", "@name"),
    ("num_task", "@total_tasks"),
    # ("all_dur", "@all_durations")
]

# First layout: one square per task group, x = depth, y = assigned coordinate.
plot = figure(
    title="Graph layout demonstration",
    tools="",
    tooltips=TOOLTIPS,
    toolbar_location=None,
    x_range=[-1, 4],
    y_range=[-1, 4],
)

plot.square(
    'x',
    'y',
    size=30,
    color='color',
    alpha=0.5,
    source=source_nodes,
)

# One arrow per edge, pointing from the dependency to the dependent.
plot.add_layout(
    Arrow(
        end=VeeHead(size=10),
        line_color='red',
        line_alpha=0.5,
        line_width=2,
        x_start='xs',
        y_start='ys',
        x_end='xe',
        y_end='ye',
        source=source_arrows,
    )
)
# We need to think more carefully about how to offset root nodes. If we use bars that represent, say, the number of tasks (see below), they will overlap if we only offset them by a fixed amount (in this case +1).
# Second layout: each task group is drawn as a vertical bar that starts at the
# group's y coordinate and ends at 'top_bar', with the same dependency arrows
# and a shortened name label per group.
plot_2 = figure(
    title="Graph layout demonstration v2",
    tools="",
    tooltips=TOOLTIPS,
    toolbar_location=None,
    x_range=[-1, 4],
    y_range=[-1, 8],
)

plot_2.vbar(
    x="x",
    top="top_bar",
    bottom='y',
    width=0.3,
    fill_color="color",
    alpha=0.5,
    source=source_nodes,
)

plot_2.add_layout(
    Arrow(
        end=VeeHead(size=10),
        line_color='red',
        line_alpha=0.5,
        line_width=2,
        x_start='xs',
        y_start='ys',
        x_end='xe',
        y_end='ye',
        source=source_arrows,
    )
)

plot_2.add_layout(
    LabelSet(
        x='x',
        y='y',
        text='name_chop',
        source=source_nodes,
        background_fill_color=None,
        x_offset=-1,
    )
)
show(plot_2)
