-
-
Save czlr/a7859730299e73ac01c4d85a8a82827d to your computer and use it in GitHub Desktop.
For OrcaSlicer/issues/5661
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import collections | |
import contextlib | |
import dataclasses | |
import functools | |
import inspect | |
import itertools | |
import math | |
import operator | |
import os | |
import pathlib | |
import pprint | |
import re | |
import sys | |
import textwrap | |
import traceback | |
import typing | |
from frozendict import frozendict, deepfreeze | |
import aenum | |
import numpy as np | |
import shapely | |
import shapely.ops | |
import shapely.validation | |
def main(args):
    '''
    Run the whole post-processing pipeline for the file at ``args.input``.

    The file is read once, its config block and wall geometry are parsed,
    and the lines are then passed through each processor in turn. The
    result overwrites the input when ``args.real_run`` is set; otherwise
    it is written next to it with '__gcpp' appended to the file stem.
    '''
    lines = read_lines(args.input)
    # The parsed config is not consumed yet, but parse_config() raises
    # when the file has no complete config block, so keep the call.
    config = parse_config(lines)
    geom_data = parse_geometry(lines)

    pipeline = (
        _p_realign_spiral_zhop(geom_data, 5, add_pre_pause_ms=50),
    )
    for step in pipeline:
        print(step)
        lines = step(lines)
        assert lines, "Bad processor impl didn't return any lines!"

    if args.real_run:
        out_fp = args.input
    else:
        # Dev run: leave the original untouched and write a sibling file
        out_fp = args.input.with_stem(args.input.stem + '__gcpp')

    print(f'Writing {out_fp=} ...')
    with open(out_fp, 'w') as f:
        f.write(''.join(f'{line}\n' for line in lines))
    print('\nSUCCESS -- all done')
def _p_realign_spiral_zhop(geom_data, deg, add_pre_pause_ms=0):
    '''
    Build a line processor that re-aims spiral z-hop arcs using the
    nearest wall.

    For every G3 line seen while in a z-hop, the arc's (i, j) offset is
    replaced by the vector towards the nearest wall of the current layer,
    scaled to the original (i, j) length and rotated by (180 + deg)
    degrees. The arc direction (G3 = CCW, G2 = CW) is picked from the sign
    of the angle between the last xy motion and the wall vector.

    - geom_data
        Per-layer geometry, indexed by layer number
    - deg
        Extra rotation in degrees on top of the half turn. The original
        note describes it as relative to the current xy motion vector,
        with positive meaning curving inwards slightly.
    - add_pre_pause_ms
        If > 0, a 'G4 P<ms>' pause line is emitted before each arc
    '''
    def _rotation(theta):
        # Standard 2D rotation matrix; positive theta turns CCW
        cos_t, sin_t = np.cos(theta), np.sin(theta)
        return np.array([
            [cos_t, -sin_t],
            [sin_t, cos_t]
        ])

    rad = np.radians(180 + deg)
    rot_ccw = _rotation(rad)
    rot_cw = _rotation(-rad)

    def _f(g, ms):
        # Only spiral arcs performed during a z-hop are of interest
        if not ms.is_in_zhop:
            return None
        assert g.op != 'G2', 'Not supported yet!'
        if g.op != 'G3':
            return None

        wall_vec, _ = geom_data[ms.layer_num].find_nearest_wall(ms.x, ms.y)
        wall_dist = np.linalg.norm(wall_vec)
        assert wall_dist > 0

        # New (i, j) points along the wall vector but keeps the length
        # of the original offset
        ij = g.get_vec('ij')
        assert all(v is not None for v in ij)
        scale = np.linalg.norm(ij) / wall_dist
        new_ij = (wall_vec[0] * scale, wall_vec[1] * scale)

        # Match rotation direction to current motion
        # G2 is CW, G3 is CCW
        if rads_between(ms.xy_diff, wall_vec) >= 0:
            op, rot = 'G3', rot_ccw
        else:
            op, rot = 'G2', rot_cw
        new_ij = np.dot(rot, new_ij)
        # Rotation must not change the length of the offset
        assert abs(np.linalg.norm(new_ij) - np.linalg.norm(ij)) < 0.001

        tag = f'VecAlign W(x{round(wall_vec[0], 4)}, y{round(wall_vec[1], 4)} d{round(wall_dist, 4)})'
        realigned = g.derive(op=op, tag=tag, i=new_ij[0], j=new_ij[1])
        if add_pre_pause_ms > 0:
            pause = g.derive(op='G4', tag='AddPause', update=False, p=add_pre_pause_ms)
            return [pause, realigned]
        return [realigned]

    return functools.partial(apply_lines_processor, line_func=_f)
def rads_between(a, b):
    '''
    Signed angle in radians of a measured against the reference b
    (typically something like (1, 0)); roughly angle(a) - angle(b).

    The result is normalized into (-pi, pi].
    '''
    full_turn = 2 * math.pi
    angle_a = math.atan2(a[1], a[0])
    angle_b = math.atan2(b[1], b[0])
    # Each atan2 lies in (-pi, pi], so the raw difference spans
    # (-2pi, 2pi); the modulo folds it into [0, 2pi)
    delta = (angle_a - angle_b) % full_turn
    # Shift the upper half down so the final range is (-pi, pi]
    return delta - full_turn if delta > math.pi else delta
def as_collection(item, seq_cls, default=None):
    '''
    Return item as an instance of seq_cls (list, set, tuple, etc.),
    wrapping or converting it when needed.

    - An item that already is a seq_cls is returned unchanged
    - None becomes the normalized default if one is given, otherwise
      an empty seq_cls
    - list / set / tuple items are converted element-wise
    - Anything else (str and dict included) is treated as a single
      element and wrapped
    '''
    if isinstance(item, seq_cls):
        return item
    if item is None:
        if default is None:
            return seq_cls()
        return as_collection(default, seq_cls)
    # Only these explicit types are unpacked; other collection-ish
    # values (str, dict, etc.) are better kept together as one element
    if isinstance(item, (list, set, tuple)):
        return seq_cls(item)
    return seq_cls((item,))
@contextlib.contextmanager
def sample_printer(max_times=25, indent=4):
    '''
    Context manager yielding a 'print'-like function meant for data
    sampling: the first max_times calls are printed (indented), one
    notice is printed when the limit is first exceeded, and everything
    after that is dropped.

    On exit, a summary line with the total number of calls is printed.
    '''
    seen = 0

    def emit(*args, **kwds):
        nonlocal seen
        seen += 1
        if seen <= max_times:
            print(' ' * indent, end='')
            print(*args, **kwds)
            return
        # Announce the cut-off exactly once
        if seen == (max_times + 1):
            print(f'{" " * indent}... (extra lines hidden)')

    try:
        yield emit
    finally:
        print(f'{" " * indent}... ({seen} total lines)\n')
def apply_lines_processor(lines: list[str], line_func) -> list[str]:
    '''
    Driver for basic line-by-line processing.

    Each processor run is fully isolated: it takes raw line strings and
    returns raw line strings.

    line_func(g, ms) is called for every line that is part of the main
    print (ms.is_main_feature) and may return:
    - None: the line is unrelated and is carried over as-is
    - a GcodeLine or a list of GcodeLine items: emitted instead of the line
    - an empty list: the line is deleted

    Raises AssertionError if the handler returns something that is not a
    GcodeLine, or if it never returned a non-None result for any line.
    '''
    output = []
    num_changed = 0
    with sample_printer() as sampler:
        for g, ms in iter_line_items(lines):
            res = line_func(g, ms) if ms.is_main_feature else None
            if res is not None:
                num_changed += 1
            # A None result falls back to the original line
            for r in as_collection(res, list, g):
                # Validate before touching any attribute, so a bad handler
                # result produces this message instead of an AttributeError
                assert isinstance(r, GcodeLine), \
                    f'Processor returned {type(r).__name__}, expected GcodeLine'
                if r != g:
                    # Show a sample of the lines that actually changed
                    sampler(r.line)
                output.append(r.line)
    assert num_changed > 0, 'Sanity check: each processor must do something...'
    return output
def read_lines(fp):
    '''
    Read the file at fp and return its lines as a list of strings.

    Trailing whitespace (newline included) is removed from every line;
    leading indentation is preserved.
    '''
    with open(fp, 'r') as f:
        return [raw.rstrip() for raw in f]
def iter_line_items(lines):
    '''
    Standard boilerplate for walking raw lines in enriched form.

    Yields a (GcodeLine, MotionState) pair per input line, where the
    state already includes the effect of that line.
    '''
    state = MotionState()
    for raw in lines:
        parsed = parse_line_cached(raw)
        state = state.update(parsed)
        yield parsed, state
def parse_config(lines): | |
config = {} | |
in_section = False | |
for line in lines: | |
g = parse_line_cached(line) | |
match g.comment: | |
case 'CONFIG_BLOCK_START': | |
in_section = True | |
case 'CONFIG_BLOCK_END': | |
# Sort by keys for consistency | |
return frozendict(sorted(config.items())) | |
case _ if in_section: | |
kv = g.comment.split('=', 1) | |
assert len(kv) == 2 | |
k, v = kv[0].strip(), kv[1].strip() | |
if v.endswith('%'): | |
v = v[:-1] | |
# For now, only take non-numeric configs | |
try: | |
config[k] = float(v) | |
except ValueError: | |
pass | |
raise RuntimeError | |
def parse_geometry(lines) -> list['LayerGeometry']:
    '''
    Build one LayerGeometry per layer, in layer order, and print a short
    summary of the walls found on each.

    Lines are grouped by consecutive layer_num, so the resulting list is
    indexed by layer number (asserted while building). Every layer from 1
    onwards must contain at least one wall.
    '''
    print('\nParsing print geometry...')
    layers = []
    by_layer = itertools.groupby(iter_line_items(lines), key=lambda pair: pair[1].layer_num)
    for layer_num, group in by_layer:
        print(f' Layer {layer_num}:')
        # Layer numbers must arrive as 0, 1, 2, ... without gaps
        assert len(layers) == layer_num
        geom = LayerGeometry(list(group))
        for prop in geom.wall_props:
            print(f' {prop}')
        if layer_num >= 1:
            assert len(geom.wall_props) >= 1, 'At least 1 wall for each layer...'
        layers.append(geom)
    print()
    return layers
def safe_op(op, a, b, a_default=0, b_default=0):
    '''
    Apply a binary operation with a fallback for None operands.
    https://docs.python.org/3/library/operator.html

    A None operand is first replaced by its default. If both operands
    are then non-None, op(a, b) is returned. If exactly one is still
    None, the other one is returned; if both are, the result is None.
    '''
    if a is None:
        a = a_default
    if b is None:
        b = b_default

    if a is None:
        # Covers both "only b is usable" and "nothing is usable"
        # (in which case b is None as well)
        return b
    if b is None:
        return a
    return op(a, b)
def safe_vec(op, a_items, b_items, *args, **kwds) -> tuple:
    '''
    Element-wise version of safe_op over two equally long sequences.

    Extra positional/keyword arguments are forwarded to every safe_op
    call. Returns the results as a tuple.
    '''
    assert len(a_items) == len(b_items)
    pairs = zip(a_items, b_items)
    return tuple(safe_op(op, left, right, *args, **kwds) for left, right in pairs)
@dataclasses.dataclass(frozen=True, slots=True) | |
class MotionState: | |
xyz: tuple[float, float, float] = (None, None, None) | |
xy_diff: tuple[float, float] = (None, None) | |
xy_dist: float = None | |
moved_now: bool = None | |
# Extrusion for just the current segment; since the last retraction (E<0). | |
# <0 means it's still retracted and pending compensation. | |
ext_seg: float = 0 | |
fan: float = None # Primary fan speed (others ignored) | |
accel: float = None # Acceleration in mm/s^2 | |
feature: str = None # 'outer_wall', 'brim', etc. normalized lowercase with _'s | |
line_width: float = None # Current line width in mm | |
layer_num: int = 0 # First layer is 1 | |
layer_z: float = None # Current layer z height in mm | |
layer_height: float = None | |
# Custom stuff, via user-added Machine G-code, postprocessors, etc. | |
user_block: str = '' | |
# Mechanism to delay changes until the next update. | |
# For handling tags that are technically inclusive. E.g., | |
# count a marker like '; END_SECTION' as part of whatever | |
# preceeding group it represents. | |
change_after: frozendict = dataclasses.field(default_factory=frozendict) | |
# Derived stuff | |
is_fully_defined: bool = dataclasses.field(init=False, default=False) | |
# Handles both: | |
# With key: '; LINE_WIDTH: 0.831828', '; FEATURE: Bottom surface' | |
# Without key: '; WIPE_START' | |
BLOCK_MARKER_RE: typing.ClassVar = re.compile(r'(?P<key>[A-Z_]+)(?::\s*(?P<value>[a-zA-Z0-9_. ]+))?') | |
def __post_init__(self): | |
assert isinstance(self.change_after, frozendict) | |
data = dataclasses.astuple(self) | |
# Quick and dirty for now... | |
# This is a relatively good "bad" solution since it will never | |
# mis-identify bad states as valid. | |
is_fully_defined = 'None' not in repr(data) | |
object.__setattr__(self, 'is_fully_defined', is_fully_defined) | |
def update(self, g: 'GcodeLine') -> 'MotionState': | |
changes = dict(self.change_after) | |
change_after = {} | |
changes['change_after'] = change_after | |
changes['moved_now'] = False | |
match g.op: | |
case 'G0' | 'G1' | 'G2' | 'G3': | |
cur_xyz = g.get_vec('xyz', self.xyz) | |
changes['xyz'] = cur_xyz | |
xy_diff = safe_vec(operator.sub, cur_xyz[:2], self.xyz[:2]) | |
xy_dist = np.linalg.norm(xy_diff) | |
if xy_dist > 0.001: # Ignore micro changes | |
changes['xy_diff'] = xy_diff | |
changes['xy_dist'] = xy_dist | |
changes['moved_now'] = True | |
diff_e = g.get('e', 0) | |
if diff_e != 0: | |
if diff_e < 0 and self.ext_seg > 0: | |
# Reset from retraction | |
changes['ext_seg'] = diff_e | |
else: | |
changes['ext_seg'] = diff_e + self.ext_seg | |
case 'M106': | |
if g.get('p', 1) == 1: | |
changes['fan'] = g.get('s', self.fan) | |
case 'M107': | |
if g.get('p', 1) == 1: | |
changes['fan'] = 0 | |
case 'M204': | |
changes['accel'] = g.get('s', self.accel) | |
if g.op is None: | |
m = self.BLOCK_MARKER_RE.fullmatch(g.comment) | |
match None if m is None else m.groups(): | |
case ('FEATURE', value): | |
assert (not self.user_block) or (changes.get('user_block') == ''), 'Feature transition should never happen inside user block' | |
changes['feature'] = value.lower().replace(' ', '_') | |
case ('LINE_WIDTH', value): | |
changes['line_width'] = float(value) | |
# These ones happen in sequence one after another | |
case ('CHANGE_LAYER', value): | |
# Note: feature DOES NOT reset on layer change. E.g., if you get multiple | |
# pure-wall layers one after another | |
changes['layer_num'] = self.layer_num + 1 | |
changes['layer_z'] = None | |
changes['layer_height'] = None | |
case ('Z_HEIGHT', value): | |
changes['layer_z'] = float(value) | |
case ('LAYER_HEIGHT', value): | |
changes['layer_height'] = float(value) | |
# NOT OrcaSlicer standard... added manually in gcode blocks | |
# Format: | |
# ; BEGIN_CUSTOM_GCODE section_name [... optional other stuff] | |
# ; END_CUSTOM_GCODE section_name | |
case ('BEGIN_CUSTOM_GCODE', value): | |
changes['user_block'] = value.strip() | |
case ('END_CUSTOM_GCODE', value): | |
assert self.user_block.startswith(value), 'Incorrect start vs. end block pair' | |
change_after['user_block'] = '' | |
changes['change_after'] = frozendict(changes['change_after']) | |
return dataclasses.replace(self, **changes) | |
@property | |
def xy_unitvec(self): | |
'''Return a 2-tuple representing the current xy unit vector.''' | |
dx, dy = self.xy_diff | |
dist = self.xy_dist | |
return (dx / dist, dy / dist) | |
@property | |
def xy(self): | |
return self.xyz[:2] | |
@property | |
def x(self): | |
return self.xyz[0] | |
@property | |
def y(self): | |
return self.xyz[1] | |
@property | |
def z(self): | |
return self.xyz[2] | |
@property | |
def xy_prev(self): | |
dx, dy = self.xy_diff | |
return (self.x - dx, self.y - dy) | |
@property | |
def is_in_zhop(self): | |
return self.ext_seg <= 0 | |
@property | |
def is_main_feature(self): | |
''' | |
True if currently in the "meat" of printing the object. False | |
if it's other system support stuff like pre-print gcode, etc. | |
''' | |
return ((self.layer_num > 0) | |
and self.is_fully_defined | |
and (not self.user_block)) | |
@property | |
def oval_rect_offset(self): | |
''' | |
Effective line width difference (in mm) between an oval vs. | |
rectangular cross section based approach | |
Orca uses oval line widths. the equivalent rectangular line | |
width is less than that. | |
This varies WRT layer height | |
''' | |
return self.line_width * (1 - (math.pi / 4)) | |
@functools.cache
def parse_line_cached(line):
    '''
    Parse a raw line into a GcodeLine, memoized on the line text so
    repeated passes over the same file do not parse it again.
    '''
    parsed = GcodeLine(line)
    return parsed
@dataclasses.dataclass(frozen=True, slots=True)
class GcodeLine:
    '''
    One parsed, immutable line of G-code.

    The constructor takes the raw text only; all other fields are
    derived from it.
    '''
    # Raw input line, mostly as-is (trailing whitespace removed)
    line: str
    # Only None iff the line is empty / pure comment
    op: str | None
    # Only populated with recognized-valid values
    kwds: frozendict[str, str]
    # Comment component (text after the first ';', stripped)
    comment: str
    # True iff line doesn't contain any weird stuff like
    # 'M1002 set_gcode_claim_speed_level : 5'
    is_simple: bool
    # Number of leading whitespace characters
    indent: int
    # Mirrored from kwds, but parsed into numbers
    _vals: frozendict[str, float | int] = dataclasses.field(repr=False)
    OP_RE: typing.ClassVar = re.compile(r'[GMT][0-9]+(?:\.[0-9]+)?') # Supports optional '.N'
    SIMPLE_KWD_RE: typing.ClassVar = re.compile(r'([A-Z])(.+)')
    MOVE_OPS: typing.ClassVar = { 'G0', 'G1', 'G2', 'G3' }
    ARC_OPS: typing.ClassVar = { 'G2', 'G3' }

    def __init__(self, line: str):
        '''
        Parse line into operator, keyword values and comment.

        Raises NotImplementedError if the first token is not a G/M/T
        code. Keyword tokens that are not a letter followed by a number
        are skipped and mark the line as not simple.
        '''
        assert isinstance(line, str)
        line = line.rstrip()
        indent = len(line) - len(line.lstrip())
        # Everything after the first ';' is comment
        code, _, remark = line.partition(';')
        comment = remark.strip()

        op = None
        kwds = {}
        vals = {}
        is_simple = True
        tokens = code.split() # Self-handles pre/post whitespace
        if tokens:
            op, *params = tokens
            if not self.OP_RE.fullmatch(op):
                raise NotImplementedError(f'Unsupported gcode on {line=}')
            for param in params:
                m = self.SIMPLE_KWD_RE.fullmatch(param)
                if not m:
                    is_simple = False
                    continue
                key, val = m.groups()
                assert len(key) == 1
                try:
                    # EAFP
                    num = float(val) if ('.' in val) else int(val)
                    usable = math.isfinite(num)
                except ValueError:
                    usable = False
                if usable:
                    kwds[key] = val
                    vals[key] = num
                else:
                    is_simple = False

        # Frozen dataclass: fields have to be set through object
        parsed = {
            'line': line,
            'op': op,
            'kwds': frozendict(kwds),
            'comment': comment,
            'is_simple': is_simple,
            'indent': indent,
            '_vals': frozendict(vals),
        }
        for name, value in parsed.items():
            object.__setattr__(self, name, value)

    @property
    def is_move(self):
        '''True for G0/G1/G2/G3.'''
        return self.op in self.MOVE_OPS

    @property
    def is_printing_move(self):
        '''True for a move that carries an E word, whatever its value.'''
        # Sometimes scarf joint can emit E0 because of rounding
        # even though technically that's a travel move...
        # Hence we check for presence rather than numeric value here
        return self.is_move and ('E' in self.kwds)

    @property
    def is_travel_move(self):
        '''True for a move without an E word.'''
        return self.is_move and ('E' not in self.kwds)

    def get(self, key, default=None) -> float:
        '''Convenience single-value getter (key is case-insensitive)'''
        return self._vals.get(key.upper(), default)

    def get_vec(self, keys: str, defaults=None) -> tuple[float, ...]:
        '''
        Convenience batch-getter
        E.g., .get_vec('xyz') -> (0.0, 1.0, 2.0)

        defaults is a sequence of base values for filling unavailable
        spots. This can also be used to apply/mask on top of previous
        values. Without it, unavailable spots are None.
        '''
        if not defaults:
            defaults = (None,) * len(keys)
        assert len(defaults) == len(keys)
        return tuple(self.get(k, d) for k, d in zip(keys, defaults))

    def derive(self, op=None, update=True, tag=None, **kwds) -> 'GcodeLine':
        '''
        Derive a new GcodeLine based on the current instance.

        - op
            Replacement operator, if provided
        - update
            If true, use existing kwds as the basis and merge in new values.
            Otherwise, start fresh with only the provided kwds.
        - tag
            Optional tag identifying the purpose
        - **kwds
            Individual values for new kwds/vals

        The new line keeps the current indent and ends with a comment
        recording the tag plus the operator/comment it was derived from.
        '''
        head = self.op if op is None else op.upper()
        merged = dict(self.kwds) if update else {}
        for name, value in kwds.items():
            assert len(name) == 1
            if isinstance(value, float):
                # Fixed precision for computed values
                text = f'{value:.4f}'
            else:
                text = str(value).strip()
            merged[name.upper()] = text

        origin = f'{self.op}{";" + self.comment if self.comment else ""}'
        trailer = f'; _p{"" if tag is None else ":" + tag} < [{origin}]'
        parts = [head, *(f'{k}{v}' for k, v in merged.items()), trailer]
        return GcodeLine((' ' * self.indent) + ' '.join(parts))
@dataclasses.dataclass(frozen=True, slots=True)
class WallProp:
    '''
    Properties of a single wall loop on a layer.

    Built from the wall's nesting tier, the indices of the walls it is
    nested in, and its polygon. Even tiers are treated as outward
    (contour) walls and use the polygon's exterior as shell; odd tiers
    are inward (hole) walls and use the union of its interiors.
    '''
    tier: int
    is_outward: bool # aka contour
    nesting: tuple[int]
    shell: shapely.LinearRing = dataclasses.field(repr=False)
    area: float
    bounds: tuple
    num_holes: int

    def __init__(self, tier, nesting, poly):
        assert isinstance(poly, shapely.Polygon)
        is_outward = (tier % 2) == 0
        # Note: the union gets ignored (comes out empty) if there are
        # no interiors for some reason
        shell = poly.exterior if is_outward else shapely.unary_union(poly.interiors)
        if not shell.is_empty:
            assert shell.is_valid
            assert isinstance(shell, (shapely.LinearRing, shapely.LineString))

        # Frozen dataclass: fields have to be set through object
        derived = {
            'tier': tier,
            'is_outward': is_outward,
            'nesting': nesting,
            'shell': shell,
            # Just for sanity/quick reference, not for calculations
            'area': round(poly.area, 2),
            'bounds': tuple(round(b, 2) for b in shell.bounds),
            'num_holes': len(poly.interiors),
        }
        for name, value in derived.items():
            object.__setattr__(self, name, value)

    @property
    def is_inward(self):
        # aka hole
        return not self.is_outward
@dataclasses.dataclass(frozen=True)
class LayerGeometry:
    '''
    Encapsulates polygon information for a single layer.
    E.g., its wall loops, etc.

    Shapely notes:
        within, contains
            https://shapely.readthedocs.io/en/stable/reference/shapely.within.html
            'A contains B' if no points of B are outside of A, AND at least one
            point of B is inside A.
            within(A, B) == contains(B, A)
        intersects, disjoint
            ** good general checker
            https://shapely.readthedocs.io/en/stable/reference/shapely.intersects.html
            True if A and B share any portion of space
            intersects(A, B) == ~disjoint(A, B)
            intersects is a composite of: overlaps OR touches OR within
        overlaps
            https://shapely.readthedocs.io/en/stable/reference/shapely.overlaps.html
            overlaps means having some points in common + same geometry type + other stuff
            fully contains is not overlapping
        touches
            very strict... NOT preferred
            True if the only points shared between A and B are on the boundary of A and B.
            https://shapely.readthedocs.io/en/stable/reference/shapely.touches.html
        covers, covered_by
            ** prefer this over within/contains
            https://shapely.readthedocs.io/en/stable/reference/shapely.covers.html
            'A covers B' if no points of B are outside of A
            covers(A, B) == covered_by(B, A)
        contains_properly
            https://shapely.readthedocs.io/en/stable/reference/shapely.contains_properly.html
            stricter version of covers that disallows common boundary points.
    '''
    # https://shapely.readthedocs.io/en/stable/strtree.html
    wall_tree: shapely.strtree.STRtree = dataclasses.field(repr=False)
    wall_props: tuple

    def __init__(self, items):
        '''
        Build the wall loops of one layer from its (GcodeLine,
        MotionState) items and work out how the loops nest.
        '''
        wall_polys = self.get_clean_polys(items)

        # Resolve tiering: a wall's tier is the number of other filled
        # exteriors that cover it
        filled = [ shapely.Polygon(p.exterior) for p in wall_polys ]
        exterior_tree = shapely.strtree.STRtree(filled)
        wall_props = [] # Synced indices with wall_tree
        for poly, solid in zip(wall_polys, filled):
            # Note: dilation amount must be less than initial pad, otherwise
            # searching might fail from disappearing point.
            fuzzy_target = solid.buffer(-0.10)
            hits = exterior_tree.query(fuzzy_target, predicate='covered_by')
            nesting = tuple(hits.tolist()) # from np array
            # Every wall is at least covered by itself
            assert len(nesting) >= 1
            wall_props.append(WallProp(len(nesting) - 1, nesting, poly))

        # (Optional): validate
        for prop in wall_props:
            expected_nested_tiers = set(range(prop.tier + 1)) # E.g., 3 -> {0, 1, 2, 3}
            actual_nested_tiers = { wall_props[i].tier for i in prop.nesting }
            assert expected_nested_tiers == actual_nested_tiers

        wall_tree = shapely.strtree.STRtree([ prop.shell for prop in wall_props ])
        object.__setattr__(self, 'wall_tree', wall_tree)
        object.__setattr__(self, 'wall_props', deepfreeze(wall_props))

    def find_nearest_wall(self, x, y):
        '''
        Return (vec, prop) for the wall nearest to (x, y): vec is the
        (dx, dy) offset from the point to the closest spot on that
        wall's shell, prop is its WallProp.
        '''
        origin = shapely.Point(x, y)
        prop = self.wall_props[self.wall_tree.nearest(origin)]
        on_wall, _ = shapely.ops.nearest_points(prop.shell, origin)
        vec = (float(on_wall.x - origin.x), float(on_wall.y - origin.y))
        # A zero vector would mean the point sits exactly on the wall
        assert any(vec)
        return (vec, prop)

    @classmethod
    def get_clean_polys(cls, items) -> list:
        '''
        Get cleaned donuts, after merging random artifacts (thick single segments
        at tips of spikes, etc.) into larger shapes
        '''
        loose_polys = list(cls.iter_all_polys(items))
        if len(loose_polys) <= 1:
            # Nothing to merge
            return loose_polys
        merged = shapely.ops.unary_union(loose_polys)
        if isinstance(merged, shapely.Polygon):
            return [ merged ]
        if isinstance(merged, (shapely.GeometryCollection, shapely.MultiPolygon)):
            return [ part for part in merged.geoms if isinstance(part, shapely.Polygon) ]
        raise NotImplementedError

    @classmethod
    def iter_all_polys(cls, items):
        '''
        Yields raw wall shapes, converted from paths/segments
        Typically a donut, but might be lots of random sparse segments
        if slicing is acting dumb
        '''
        for path in cls.iter_wall_paths(items):
            # At this point, we have the full outer wall path, possibly
            # with slight overlap from scarf joint, etc.
            line_string = shapely.LineString(path)
            assert line_string.is_valid
            # Polygon is a donut-like shape, with its
            # actual surface roughly matching the printed path
            # of the wall, with slight width/thickness.
            # Wide enough to merge nearby stuff. Keep below min_LW / 2
            polygon = line_string.buffer(0.15)
            assert polygon.is_valid and isinstance(polygon, shapely.Polygon)
            assert not polygon.is_empty
            # polygon.interiors can be 0 if narrow ring, 1 if regular donut, 2+ if dogbone
            yield polygon

    @classmethod
    def iter_wall_paths(cls, items):
        '''
        Yields line segments representing the printing path of a single
        wall. Each line segment is a sequence of (x, y) points.
        '''
        wall_features = { 'outer_wall', 'overhang_wall' }
        buffer = []
        for g, ms in items:
            # A wall keeps building while we stay inside a wall feature,
            # are not in a z-hop, and do not actually travel.
            # Sometimes the slicer generates a non-printing G1
            # with the same (x,y) as a preceding printing G1.
            # In context, there's actually no travel being done...
            is_building = (
                ms.feature in wall_features
                and not ms.is_in_zhop
                and not (g.is_travel_move and ms.moved_now)
            )
            if not is_building:
                # The wall (if any) ends here; hand it out
                if buffer:
                    yield tuple(buffer)
                    buffer.clear()
                continue
            # Skip over comments etc., but continue parsing
            if (not ms.is_main_feature) or (not g.is_printing_move):
                continue
            assert g.op not in {'G2', 'G3'}, 'Arcs not supported!'
            # Include initial starting point
            if not buffer:
                buffer.append(ms.xy_prev)
            buffer.append(ms.xy)
        # Flush remainder
        if buffer:
            yield tuple(buffer)
def _dump_fmt(obj):
    '''Render obj via repr(), wrapped and truncated for the frame dump.'''
    try:
        s = repr(obj)
    except Exception as e:
        s = f'<repr failed: {e}>'
    wrapped = textwrap.TextWrapper(
        width = 80,
        subsequent_indent = ' ' * 16,
        max_lines = 10,
        placeholder = f' ... [{len(s):,} chars]')
    return wrapped.fill(s)


def _dump_frames(tb, max_depth=10):
    '''Print signature, arguments, locals and source line per frame of tb.'''
    # Local import: public way to fetch the unmodified source line
    import linecache

    base_dir = pathlib.Path(__file__).parent.absolute() # Current file dir
    # https://docs.python.org/3/library/traceback.html#traceback.FrameSummary
    # https://docs.python.org/3/reference/datamodel.html#frame-objects
    summaries = traceback.extract_tb(tb)
    frames = (frame for frame, _ in traceback.walk_tb(tb))
    for i, (summary, frame) in enumerate(zip(summaries, frames), 0):
        # Skip outermost frame (the decorator's own wrapper)
        if i <= 0:
            continue
        # Depth protection
        if i >= max_depth:
            print(f'\n[{i}] Stopping at depth limit')
            break
        # https://docs.python.org/3/reference/datamodel.html#code-objects
        co = frame.f_code
        src_path = pathlib.Path(co.co_filename).absolute()
        is_nearby = src_path.is_relative_to(base_dir) # Quick and dirty qualifier for now
        sig = None
        try:
            sig = inspect.signature(frame.f_globals[co.co_name])
        except (KeyError, TypeError, ValueError):
            # KeyError: not available for local functions, lambdas, methods.
            # TypeError/ValueError: a global of that name exists but is not
            # a callable with an introspectable signature.
            pass
        print(f'\n[{i}] Function `{co.co_qualname}{sig if sig is not None else "(...)"}`')
        print(f' Defined in "{src_path.name if is_nearby else src_path}", line {co.co_firstlineno}')
        # Shorten output in external frames
        if (not is_nearby) and src_path.match('*/Python*/Lib/*'):
            print(f' <external_code>')
            continue
        params = sig.parameters if sig is not None else {}
        if params:
            print(f' Args:')
            for k in params.keys():
                print(f' {k} := {_dump_fmt(frame.f_locals.get(k))}')
        print(f' Locals:')
        for k, v in frame.f_locals.items():
            if k not in params:
                print(f' {k} := {_dump_fmt(v)}')
        print(f'\n Trace, line {summary.lineno}:')
        src_line = summary.line
        if not src_line:
            # E.g. code without a source file on disk
            print(' <source unavailable>')
            continue
        print(f' {src_line}')
        colno, end_colno = summary.colno, summary.end_colno
        if colno is not None and end_colno is not None:
            # summary.line is stripped; subtract the original indent so
            # the markers line up with the stripped text
            raw = linecache.getline(summary.filename, summary.lineno).rstrip()
            dent = len(raw) - len(raw.lstrip())
            print(f' {" " * (colno - dent)}{"^" * (end_colno - colno)}')


def auto_traceback(f):
    '''
    Decorator that automatically dumps local variables when an exception
    is thrown from the wrapped function. Reduces the need for manually
    attaching contextual information to assertion failure messages, etc.

    Note: This DOES NOT swallow the exception itself. The original
    exception is always re-raised, even when producing the dump fails.

    Usage:
        @auto_traceback
        def my_function(...):
            ...
    '''
    @functools.wraps(f)
    def wrapper(*args, **kwds):
        try:
            return f(*args, **kwds)
        except BaseException as exc:
            # BaseException on purpose: SystemExit etc. are reported too
            print('---------------- Frame dump ----------------', end='')
            if isinstance(exc, SystemExit) and exc.code == 0:
                print('\nNot needed for clean SystemExit: 0')
                raise
            # https://docs.python.org/3/reference/datamodel.html#traceback-objects
            try:
                _dump_frames(exc.__traceback__)
            except Exception as dump_err:
                # Diagnostics must never replace the real error
                print(f'\n<frame dump failed: {dump_err!r}>')
            traceback.print_exc(limit=0) # Just pretty-formatted exception type
            print('--------------------------------------------\n')
            raise
    return wrapper
if __name__ == '__main__': | |
parser = argparse.ArgumentParser('Custom GCode postprocessor') | |
parser.add_argument('input', type=pathlib.Path, metavar='IN_GCODE', help='Input file') | |
default_real_run = any(k.startswith('SLIC3R') for k in os.environ) | |
parser.add_argument('--real-run', action='store_true', default=default_real_run, | |
help='DEV-ONLY: Process the file as if it were invoked by the slicer -- ' | |
'with small changes like overwriting in-place, etc. ' | |
'By default, this is automatically inferred via SLIC3R env prescense. ' | |
f'(Optional; Default: currently {default_real_run})') | |
args = parser.parse_args() | |
print(f'{args=}') | |
has_error = True | |
try: | |
# Dynamically applied decorator | |
auto_traceback(main)(args) | |
has_error = False | |
finally: | |
if args.real_run and has_error: | |
input('Press Enter to continue...') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment