Skip to content

Instantly share code, notes, and snippets.

@kierdavis
Last active October 12, 2019 15:36
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kierdavis/ddd6da1c76119841264f20cdaee1958d to your computer and use it in GitHub Desktop.
Save kierdavis/ddd6da1c76119841264f20cdaee1958d to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3.7
import sys
import tty
import termios
import subprocess
import re
from pathlib import Path, PurePath
def getch():
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setraw(sys.stdin.fileno())
ch = sys.stdin.read(1)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
return ch
def input_single_char(prompt):
sys.stdout.write(prompt)
sys.stdout.flush()
response = getch()
sys.stdout.write("\n")
return response
class InvalidAssetCodeException(Exception):
def __init__(self, val):
self.val = val
def __str__(self):
return f"Invalid asset code {self.val!r}"
class AllOf:
def __init__(self, *child_rules):
self.child_rules = child_rules
class AnyOf:
def __init__(self, desc, *child_rules):
self.desc = desc
self.child_rules = child_rules
def times(quantity, rule):
return AllOf(*(rule for _ in range(quantity)))
INVENTORIED_EXPECTED_CONTENTS = AllOf(
times(2, "motor-board-mcv4b"),
"ruggeduino",
"screw-shield-pwr-side",
"screw-shield-usb-side",
"power-board-pbv4b",
"servo-board-sbv4b",
"usb-wifi-tl-wn823n",
times(2, "usb-hub-startech"),
"odroid-u3",
AnyOf("battery charger",
"battery-charger-imax-b6",
"battery-charger-e4",
),
AnyOf("battery charger supply",
"battery-charger-imax-b6-supply",
"battery-charger-e4-supply",
),
AnyOf("webcam",
"webcam-logitech-c270",
"webcam-logitech-c500",
),
AnyOf("lipo bag",
"lipo-bag-overlander-22x30",
"lipo-bag-hpi-18x22",
),
times(2,
AnyOf("battery",
"battery-lipo-11.1v2.2",
"battery-lipo-11.1v2.2-turnigy",
),
),
)
NON_INVENTORIED_EXPECTED_CONTENTS = [
"a USB stick",
"3 macro USB cables",
"5 micro USB cables",
"a bag of camcons",
"a screwdriver",
"an odroid power cable",
"a bundle of black power wire",
"a bundle of red power wire",
"a bundle of green power wire",
"a bundle of blue signal wire",
"a bundle of yellow signal wire",
"a kit disclaimer form",
]
class Asset:
@staticmethod
def _find(raw_asset_code) -> Path:
try:
result = subprocess.run(["sr", "inv-findpart", raw_asset_code], check=True, stdout=subprocess.PIPE)
except subprocess.CalledProcessError as e:
raise InvalidAssetCodeException(raw_asset_code) from e
return Path(result.stdout.strip().decode("utf-8"))
@classmethod
def find(cls, raw_asset_code) -> "Asset":
return cls(cls._find(raw_asset_code))
def __init__(self, path):
self.path = path
@property
def name(self):
return self.path.name
@property
def _name_parts(self):
match = re.match(r"^(.+?)-sr([a-zA-Z0-9]+)$", self.name)
if match is None:
raise RuntimeError("sr inv-findpart returned a weird result")
return match.groups()
@property
def type(self):
return self._name_parts[0]
@property
def code(self):
return self._name_parts[1]
@property
def contents(self):
for sub_path in self.path.iterdir():
if sub_path.name != "info":
yield Asset(sub_path)
@property
def is_empty(self):
try:
next(self.contents)
return False
except StopIteration:
return True
def move_to(self, dest):
subprocess.run(["git", "mv", str(self.path), str(dest)], check=True)
self.path = self._find(self.code)
def move_into(self, other_asset):
self.move_to(other_asset.path)
def __str__(self):
return self.name
def find_asset_with_type(desired_type, assets):
for asset in assets:
if asset.type == desired_type:
return asset
return None
def verify_recursive(rule, assets):
if isinstance(rule, str):
asset_type = rule
asset = find_asset_with_type(asset_type, assets)
if asset is None:
return [rule], assets
assets = list(assets)
assets.remove(asset)
return [], assets
elif isinstance(rule, AllOf):
failed_rules = []
for sub_rule in rule.child_rules:
sub_failed_rules, assets = verify_recursive(sub_rule, assets)
failed_rules += sub_failed_rules
return failed_rules, assets
elif isinstance(rule, AnyOf):
for sub_rule in rule.child_rules:
failed_rules, new_assets = verify_recursive(sub_rule, assets)
if not failed_rules:
return [], new_assets
return [rule], assets
else:
raise ValueError(rule)
def rule_desc(rule):
if isinstance(rule, str):
return rule
elif isinstance(rule, AllOf):
# verify_recursive won't ever return this
raise RuntimeError("unreachable")
elif isinstance(rule, AnyOf):
return rule.desc
else:
raise ValueError(rule)
def verify_inventoried_contents(assets):
failed_rules, remaining_assets = verify_recursive(INVENTORIED_EXPECTED_CONTENTS, assets)
ok = True
if failed_rules:
ok = False
print(f"Missing: {', '.join(rule_desc(rule) for rule in failed_rules)}")
if remaining_assets:
ok = False
for asset in remaining_assets:
print(f"{asset} is in this box, but we didn't expect it to be")
return ok
def get_yes_no(prompt):
while True:
response = input_single_char(prompt)
if response == "y":
return True
elif response == "n":
return False
def verify_non_inventoried_contents():
prompts = [f"Is/are there {desc} in the box? (y/n) " for desc in NON_INVENTORIED_EXPECTED_CONTENTS]
prompts += [
"Are the batteries inside the battery bag? (y/n) ",
"Are the power board, both motor boards, servo board and ruggeduino inside a jiffy bag? (y/n) ",
]
for prompt in prompts:
while not get_yes_no(prompt):
pass # try again mate
return True
def verify_contents(assets):
return verify_inventoried_contents(assets) and verify_non_inventoried_contents()
def input_box() -> Asset:
while True:
try:
box = Asset.find(input("Scan a RUB:\n> "))
except InvalidAssetCodeException as e:
print(e)
continue
if box.type != "box-18l-rub":
print(f"That's a {box.type}, not a box-18l-rub.")
continue
if not box.is_empty:
print(f"Please empty the RUB in the inventory first.")
continue
return box
def find_battery_bag(assets):
for asset in assets:
if asset.name.startswith("lipo-bag-"):
return asset
raise RuntimeError("no battery bag; verification should have caught this already")
def commit_box(box, contents):
box.move_to(".")
battery_bag = find_battery_bag(contents)
for item in contents:
if item.name.startswith("battery-lipo-"):
item.move_into(battery_bag)
else:
item.move_into(box)
def add_to_contents(contents, item_to_add):
for item_in_contents in list(contents):
if item_in_contents.code == item_to_add.code:
print(f"{item_to_add} is already in the box!")
return
contents.append(item_to_add)
def remove_from_contents(contents, item_to_remove):
for item_in_contents in list(contents):
if item_in_contents.code == item_to_remove.code:
contents.remove(item_in_contents)
return
print(f"{item_to_remove} wasn't in the box!")
def ensure_batteries_are_physically_inside_bags():
while not get_yes_no("Are the batteries inside the battery bag? (y/n) "):
pass
def pack_box():
box = input_box()
contents = []
while True:
verify_result = verify_inventoried_contents(contents)
response = input(f"Scan an asset to put it into {box}; or precede asset code with '-' to remove it from {box}; or scan {box} to verify and finish:\n> ")
remove = False
if response.startswith("-"):
remove = True
response = response.lstrip("-")
try:
item = Asset.find(response)
except InvalidAssetCodeException as e:
print(e)
continue
if remove:
remove_from_contents(contents, item)
else:
if item.code == box.code:
verify_result = verify_result and verify_non_inventoried_contents()
if verify_result:
ensure_batteries_are_physically_inside_bags()
print(f"Verification succeeded, updating the inventory...")
commit_box(box, contents)
print(f"Finished packing {box}.")
print()
print()
return
else:
print("Verification failed, fix the problems and try again.")
elif item.type == "box-18l-rub":
print("You can't start a new until you've finished this one.")
else:
add_to_contents(contents, item)
def pack_boxes_forever():
while True:
pack_box()
def main():
pack_boxes_forever()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment