Skip to content

Instantly share code, notes, and snippets.

@belyak
Created July 4, 2021 04:27
Show Gist options
  • Save belyak/758039f33d3906187c0104ddfa65eb16 to your computer and use it in GitHub Desktop.
Save belyak/758039f33d3906187c0104ddfa65eb16 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import itertools
import math
import os
import unittest
from typing import List, Optional
import yaml
def get_files_iter():
return iter((d for d in os.listdir(".") if (d.startswith("docker-compose-") and d.endswith(".yaml"))))
class ConnectionPoint:
def __init__(self, x, y):
self.x = x
self.y = y
def __iter__(self):
return iter([self.x, self.y])
def __repr__(self):
return f"{self.__class__.__name__}({self.x}, {self.y})"
def __add__(self, other):
return ConnectionPoint(self.x + other.x, self.y + other.y)
@classmethod
def distance(cls, p1:'ConnectionPoint', p2: 'ConnectionPoint'):
return math.sqrt((p1.x - p2.x)**2 + (p1.y - p2.y)**2)
class Component:
_extra = 8
_default_nested_spaces = 0
def __init__(self, _name, _ip):
self.name = _name
self._ip = _ip
self.lines = [self.name, self._ip, ""]
self._nested_spaces = self._default_nested_spaces
self.top_margin = 1
self.bottom_margin = 1
self.left_margin = 1
self.right_margin = 1
self.comp_height = len(self.lines) + 2 + self.top_margin + self.bottom_margin
max_content_len = max(len(line) for line in self.lines)
self.comp_weight = self.left_margin + self.right_margin + self._nested_spaces + max_content_len + (self._extra * 2) + 2
def get_connection_points(self) -> List[ConnectionPoint]:
middle_y = self.comp_height // 2
middle_x = self.comp_weight // 2
points = [
ConnectionPoint(self.left_margin, middle_y),
ConnectionPoint(self.comp_weight - self.right_margin - 1, middle_y),
ConnectionPoint(middle_x, self.top_margin),
ConnectionPoint(middle_x, self.comp_height - self.bottom_margin - 1,),
]
return points
@property
def nested_spaces(self):
return self._nested_spaces
@nested_spaces.setter
def nested_spaces(self, new_value):
self._nested_spaces = new_value
def add_line(self, content: str):
self.lines.append(content)
@property
def max_content_len(self):
return max(len(s) for s in self.lines)
@property
def top_down_line(self):
return f"{self.left_margin_cnt}{self.sp_line}+{'-' * (self.max_content_len + self._extra * 2)}+{self.right_margin_cnt}"
@property
def left_margin_cnt(self):
return ' ' * self.left_margin
@property
def right_margin_cnt(self):
return ' ' * self.right_margin
@property
def sp_line(self):
return str(' ' * self._nested_spaces)
def render_all_lines(self) -> str:
needed_len = self.max_content_len + (self._extra * 2)
res = []
for line in self.lines:
item_tpl = '+{:^' + str(needed_len) + '}+'
item = item_tpl.format(line)
item = f"{self.left_margin_cnt}{self.sp_line}{item}"
res.append(item)
return os.linesep.join(res)
def rendered_lines(self) -> List[str]:
_ = self.top_down_line
needed_len = self.max_content_len + (self._extra * 2)
res = []
for line in self.lines:
item_tpl = '+{:^' + str(needed_len) + '}+'
item = item_tpl.format(line)
item = f"{self.sp_line}{self.left_margin_cnt}{item}{self.right_margin_cnt}"
res.append(item)
top_margin_block = ['',] * self.top_margin
bottom_margin_block = ['',] * self.bottom_margin
return top_margin_block + [self.top_down_line] + res + [self.top_down_line] + bottom_margin_block
@property
def width(self):
if len(self.lines):
return max(len(line) for line in self.lines) + 2
else:
return 0
@property
def height(self):
return 2 + len(self.lines) if len(self.lines) != 0 else 0
def __str__(self):
all_lines = self.render_all_lines()
_ = self.top_down_line
parts = [
self.top_down_line,
all_lines,
self.top_down_line
]
return os.linesep.join(parts)
def print(self):
return print(self)
class AbstractServiceDrawer:
_default_nested_spaces = 5
def __init__(self, components: Optional[List['Component']], **kwargs):
self.components = components or []
self.components_map = {c.name: c for c in self.components}
def draw(self, *args, **kwargs):
raise NotImplementedError
class ListServiceDrawer(AbstractServiceDrawer):
def draw(self, *args, **kwargs) -> str:
component_reprs = []
for c in self.components:
c.nested_spaces = self._default_nested_spaces
parts = [
""
"",
c.__str__(),
"",
]
component_repr = os.linesep.join(parts)
component_reprs.append(component_repr)
return os.linesep.join(component_reprs)
class ASCIICanvas:
def __init__(self, width, height, title=None, **kwargs):
self.width = width
self.height = height
self.lines = self._gen_lines()
self.title = title
def draw_at_pos(self, x, y, xs, ys, lines):
"""
Takes another rectangular object (as a list of equal length lines)
and prints it to own rectangular ASCII area (canvas)
"""
print(f"draw at pos x: {x} y: {y}, x scale: {xs}, y scale: {ys}, lines len: {len(lines)}")
for l_ix, line in enumerate(lines):
try:
cnv_y = l_ix + y * ys
cnv_x = x * xs
target_line = self.lines[cnv_y]
self.lines[cnv_y] = target_line[:cnv_x] + line + target_line[cnv_x + len(line):]
except Exception as e:
sss = e
def draw_symbol_at_pos(self, x, y, c):
self.lines[y] = self.lines[y][:x] + c + self.lines[y][x + 1:]
def __str__(self):
def lines_to_component(_lines):
top_bottom = ''.join(['+' + ('-' * self.width) + '+'])
_p_lines = [f"|{line}|" for line in _lines]
return [top_bottom] + _p_lines + [top_bottom]
dbg_info = [
f"canvas width: {self.width}, lines count: {len(self.lines)}",
f"canvas height: {self.height} lines lens: [{', '.join([f'{len(ln)}' for ln in self.lines])}]",
f"canvas debug info end.",
"",
]
if self.title:
lines = ["--- *** ---", self.title, "--- ***** ---"] + lines_to_component(self.lines)
else:
lines = lines_to_component(self.lines)
dbg_end = ["--- end ---", " * --- *"]
return "\n".join(dbg_info + lines + dbg_end)
def _gen_lines(self) -> List[str]:
res = []
for _ in range(self.height):
res.append(' ' * self.width)
return res
class LayoutServiceDrawer(AbstractServiceDrawer):
def __init__(self, components: Optional[List['Component']],
layout,
title=None):
super().__init__(components)
self.title = title
self.layout = layout
self.connections = layout.get('connections', [])
def draw(self, *args, **kwargs):
req_w, req_h, min_x, min_y, = self._calc_required_size()
print(f"min x: {min_x} min y: {min_y}")
xs, ys = self._calc_required_xy_scales()
canvas = ASCIICanvas(title=self.title,
width=req_w * xs,
height=req_h * ys)
for component in self.components:
_xc, _yc = self.get_component_corr_x_y(component, min_x, min_y)
print(f"component name: {component.name} _xc, _yc: ({_xc},{_yc})")
canvas.draw_at_pos(_xc, _yc, xs, ys, component.rendered_lines())
for comp_a, _data, in self.connections.items():
if comp_a == component.name:
for comp_b, _add_d, in _data.items():
if comp_b not in [c.name for c in self.components]:
raise ValueError(f"Incorrect target component {comp_b}!")
def _process_points(pp: List[ConnectionPoint], comp: Component):
res = []
for cp in pp:
res.append(
self.component_point_to_canvas(cp, comp)
)
return res
comp_a_cp = _process_points(self.components_map[comp_a].get_connection_points(),
comp=self.components_map[comp_a])
comp_b_cp = _process_points(self.components_map[comp_b].get_connection_points(),
comp=self.components_map[comp_b])
result = []
for p1, p2, in itertools.product(comp_a_cp, comp_b_cp):
result.append((ConnectionPoint.distance(p1, p2), p1, p2,))
best_entry = min(result, key=lambda t: t[0])
_, p1, p2 = best_entry
canvas.draw_symbol_at_pos(p1.x, p1.y, 'Z')
canvas.draw_symbol_at_pos(p2.x, p2.y, 'z')
return str(canvas)
def component_point_to_canvas(self, point: ConnectionPoint, component: Component):
*_, min_x, min_y, = self._calc_required_size()
comp_x_pos, comp_y_pos = self.get_component_corr_x_y(component, min_x, min_y)
fx, fy = self.component_x_y_to_canvas(point.x, point.y, comp_x_pos, comp_y_pos)
return ConnectionPoint(fx, fy)
def component_x_y_to_canvas(self, comp_x, comp_y, comp_x_pos, comp_y_pos):
xs, ys = self._calc_required_xy_scales()
fin_x = comp_x + comp_x_pos * xs
fin_y = comp_y + comp_y_pos * ys
return fin_x, fin_y,
def get_component_pos_x_y(self, component: Component):
return self.layout['services'][component.name]['position']
def get_component_corr_x_y(self, component: Component, min_x, min_y):
x, y = self.get_component_pos_x_y(component)
x_corr, y_corr = x - min_x, y - min_y
return x_corr, y_corr
def _calc_required_xy_scales(self):
comp_ws, comp_hs = [], []
for comp in self.components:
comp_ws.append(comp.comp_weight)
comp_hs.append(comp.comp_height)
return max(comp_ws) + 1, max(comp_hs) + 1
def _calc_required_size(self):
xs_vals = []
ys_vals = []
services = self.layout.get('services', {})
for comp_name, comp_position, in services.items():
try:
_px, _py = comp_position['position']
ys_vals.append(_py)
xs_vals.append(_px)
except KeyError:
raise ValueError(
f"for the layout {self.__class__.__name__} "
f"each service has to have 'position key "
f"while {comp_name} hasn't."
)
(min_y, max_y,
min_x, max_x) = (min(ys_vals), max(ys_vals),
min(xs_vals), max(xs_vals))
_w = max_x - min_x + 1
_h = max_y - min_y + 1
print(locals())
return _w, _h, min_x, min_y,
def process_service_component(global_map, global_list, data, name):
try:
cp = data['services'][name]
except KeyError:
raise ValueError("Bad component without 'Services' key!")
_ip = cp['networks']['app_net']['ipv4_address']
component = Component(_name=name,
_ip=_ip)
global_map[name] = component
global_list.append(component)
def process_docker_file(f, drawer_cls=ListServiceDrawer, **drawer_kwargs):
with open(f, 'r') as fd:
data = yaml.load(fd, Loader=yaml.FullLoader)
print()
print(f"docker-compose file: {f}")
global_map = {}
components = []
for service in data.get('services', []):
process_service_component(global_map, components, data, service)
connections = []
for srv_from, data, in data.get('connections', []):
component = global_map[srv_from]
for srv_to, s_data, in data.items():
dest_component = global_map[srv_to]
connections.append([component, dest_component])
if connections != []:
drawer_kwargs['connections'] = connections
d = drawer_cls(components=components, **drawer_kwargs)
print(d.draw())
DC_LAYOUT_MAP = {
'docker-compose-kes.yaml': {
'services': {
'kes_api': {
'position': (-1, 0,)
},
'kes': {
'position': (0, 0,)
},
'ether_stub': {
'position': (0, 1,),
},
},
'connections': {
'kes_api': {
'kes': {'type': 'ZMQ_REQ'}
},
'ether_stub': {
'kes': {'type': 'GRETAP'}
}
}
}
}
def generate_ascii_schemes(compose_files=None, limit=None):
compose_files = compose_files or list(get_files_iter())
if limit:
compose_files = compose_files[limit:]
for file in compose_files:
kwargs = {}
try:
kwargs['layout'] = DC_LAYOUT_MAP[file]
kwargs['title'] = f"Composition for file '{file}':"
drawer_cls = LayoutServiceDrawer
except KeyError:
drawer_cls = ListServiceDrawer
process_docker_file(file, drawer_cls=drawer_cls, **kwargs)
if __name__ == '__main__':
generate_ascii_schemes()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment