@yangchenyun
Created March 18, 2019 22:37
# Three problems to solve:
# Q: How to pass state within a DAG?
# A: ???
# Q: How to pass state between DAGs?
# A: Use the event payload.
# Q: How to terminate a running DAG early?
# A: Use priority: a higher-priority DAG takes over from the running one.
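# A minimal sketch of the priority idea above. The Scheduler class and its
# submit() method are hypothetical, not part of any real DAG framework. It
# assumes the robot runs one DAG at a time and that only a strictly higher
# priority may preempt the running DAG.

```python
class Scheduler:
    """Hypothetical scheduler: one running DAG per robot."""

    def __init__(self):
        self.running = None   # (name, priority) of the active DAG, if any
        self.terminated = []  # names of DAGs that were preempted

    def submit(self, name, priority=0):
        """Start `name` if idle, or if `priority` beats the running DAG."""
        if self.running is not None:
            running_name, running_priority = self.running
            if priority <= running_priority:
                return False  # assumption: ties do not preempt
            self.terminated.append(running_name)
        self.running = (name, priority)
        return True
```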
# Looping through different poses
# A -> B -> C
# ^ --------v
# Loading Area - Z
def rotate(target_pose, pose_sequence=('A', 'B', 'C')):
    """Navigate to target_pose, wait, then trigger a move to the next pose.

    Args:
        target_pose: The pose to navigate to now.
        pose_sequence: The sequence of poses to rotate between.
    """
    # Validation
    assert target_pose in pose_sequence
    # RobotOperation
    nav(pose=target_pose)
    # ButtonOperation
    show_buttons([
        Button(
            name='Go to Loading',
            event='loading',
            payload={
                # Assumption: pass the current pose so loading(from_pose)
                # knows where to return.
                'from_pose': target_pose,
                # Lets the new DAG take over the robot from the running DAG.
                'priority': 10,
            },
        )
    ])
    # WaitOperation
    wait(5 * 60)  # wait 5 minutes
    ni = (pose_sequence.index(target_pose) + 1) % len(pose_sequence)
    next_pose = pose_sequence[ni]
    # EventOperation
    trigger_event('rotate', {'target_pose': next_pose})
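

# The wrap-around step used by rotate can be checked on its own. A small
# sketch; next_in_cycle is a hypothetical helper name, not something the
# original code defines.

```python
def next_in_cycle(current, pose_sequence=('A', 'B', 'C')):
    """Return the pose after `current`, wrapping from the last to the first."""
    index = (pose_sequence.index(current) + 1) % len(pose_sequence)
    return pose_sequence[index]
```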
# Pre-defined
LOADING_POS = 'Loading Position'


def loading(from_pose):
    # RobotOperation
    nav(pose=LOADING_POS)
    # ...
    # ButtonOperation
    show_buttons([
        Button(
            name='Back to Rotation',
            event='rotate',
            payload={
                'target_pose': from_pose,
            },
        )
    ])


add_event_listener('rotate', rotate)
add_event_listener('loading', loading)
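

# A minimal sketch of passing state between DAGs through the event payload.
# EventBus, on() and emit() are hypothetical stand-ins for the framework's
# add_event_listener / trigger_event. It assumes payload keys are passed to
# the handler as keyword arguments.

```python
from collections import defaultdict


class EventBus:
    """Hypothetical event registry: event name -> list of handlers."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def on(self, event, handler):
        """Register `handler` to run whenever `event` is emitted."""
        self._handlers[event].append(handler)

    def emit(self, event, payload=None):
        """Call every handler for `event`, unpacking payload as kwargs."""
        kwargs = dict(payload or {})
        for handler in list(self._handlers[event]):
            handler(**kwargs)
        return len(self._handlers[event])
```

# Usage: bus.on('rotate', handler) followed by
# bus.emit('rotate', {'target_pose': 'B'}) calls handler(target_pose='B').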