Viam Dofbot Config
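This gist collects four Python scripts written with the Viam Python SDK for a Yahboom Dofbot arm, followed by the robot's JSON component config from the Viam app: a vision-based "follow me" script, an 8BitDo gamepad control script, a motion-service obstacle-avoidance test, and a random-movement demo. The scripts assume the viam-sdk and Pillow packages are installed, and each one expects the robot address and location secret from the Connect tab of the Viam app.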
from viam.components.arm import ArmClient
from viam.components.gripper import GripperClient
from viam.proto.api.component.arm import JointPositions
from viam.robot.client import RobotClient
from viam.components.camera import CameraClient, Camera
from viam.services.vision import DetectorConfig, DetectorType, Detection
from viam.services.types import ServiceType
from viam.rpc.dial import Credentials, DialOptions
import time
import asyncio
import os
from math import degrees
from PIL import Image, ImageDraw
# Connect our robot to the Viam app in the cloud
async def setupRobot():
    ADDRESS_SECRET = "Dofbot-main.dt0gzquiy2.local.viam.cloud:8080"
    PAYLOAD_SECRET = "8r7b0za66679nxqe58wxkciys9slsl7p5q7rudg541u2d6b3"
    # ADDRESS_SECRET = "[ADD YOUR ROBOT ADDRESS HERE. YOU CAN FIND THIS ON THE CONNECT TAB]"
    # PAYLOAD_SECRET = "[PLEASE ADD YOUR SECRET HERE. YOU CAN FIND THIS ON THE CONNECT TAB]"
    creds = Credentials(type='robot-location-secret', payload=PAYLOAD_SECRET)
    opts = RobotClient.Options(refresh_interval=0, dial_options=DialOptions(credentials=creds))
    robot = await RobotClient.at_address(ADDRESS_SECRET, opts)
    # print(robot.resource_names)
    return robot
# Reset the robot arm to its starting position and open the gripper
async def resetRobotArmPosition(robot: RobotClient, arm: ArmClient, gripper: GripperClient):
    init_position = JointPositions(values=[0, -90, 45, 45, 0])  # bent at a 90 degree angle
    await arm.move_to_joint_positions(init_position)
    await gripper.open()
    return robot
# Removes any PNGs left over in the data directory from previous runs
async def cleanUpOldPhotos():
    mydir = "data"
    for f in os.listdir(mydir):
        if not f.endswith(".png"):
            continue
        os.remove(os.path.join(mydir, f))
# Moves the arm left or right when the triggers are pressed and held
async def handleLeftRightTrigger(arm: ArmClient, leftOrRight):
    currentPosition = await arm.get_joint_positions()
    speed = 11.0
    if leftOrRight == 'left':
        newServoOnePosition = currentPosition.values[0] + speed
    else:
        newServoOnePosition = currentPosition.values[0] - speed
    if -90 <= newServoOnePosition <= 90:
        newArmPositions = [newServoOnePosition,
                           currentPosition.values[1],
                           currentPosition.values[2],
                           currentPosition.values[3],
                           currentPosition.values[4]]
        newArmJointPositions = JointPositions(values=newArmPositions)
        await arm.move_to_joint_positions(newArmJointPositions)
# Returns the vision service with an added detector
async def getVisionService(robot: RobotClient):
    vis = robot.get_service(ServiceType.VISION)
    detectors = await vis.get_detector_names()
    paramDict = {"model_path": "/home/viamRobot1/models/effdet0.tflite", "num_threads": 2, "label_path": "/home/viamRobot1/models/labels.txt"}
    tflite = DetectorConfig(name="ml-detector", type=DetectorType("tflite"), parameters=paramDict)
    await vis.add_detector(tflite)
    return vis
# takePic returns a PIL image from the camera
async def takePic(robot):
    camera = Camera.from_robot(robot, "camera")
    frame = await camera.get_frame()
    return frame
# Saves the photo as a PNG to local storage
def savePic(frame):
    timestr = time.strftime("%Y%m%d-%H%M%S")
    frame.save("data/lastFrame" + timestr + ".png")
    width, height = frame.size
    frame.show()
    return frame
# Draws the detection box and label on the frame
async def drawFancyBox(d, frame):
    w, h = frame.size
    rect = [d.x_min, d.y_min, d.x_max, d.y_max]
    img1 = ImageDraw.Draw(frame)
    img1.rectangle(rect, outline="red")
    img1.text((d.x_min + 10, d.y_min + 10), d.class_name)
    return frame
# Takes a pic, and uses the ml-detector to see what's in it
# Only considers the top 5 (according to model) most likely detections.
async def takePhotoAndDetectImageContents(robot: RobotClient):
    vis = await getVisionService(robot)
    frame = await takePic(robot)
    detections = await vis.get_detections_from_camera('camera', "ml-detector")
    frame2 = frame
    for d in detections[0:5]:
        frame2 = await drawFancyBox(d, frame2)
    # savePic(frame2)
    return frame2
# Determines whether the arm should move left or right based on the largest detection box
async def leftOrRight(robot, detections, midpoint):
    # Find the largest "Person" box
    largest_personbox_area = 0
    largest = Detection()
    if not detections:
        print("No detections found")
        return -1
    for d in detections:
        a = (d.x_max - d.x_min) * (d.y_max - d.y_min)
        if d.class_name == 'Person' and a > largest_personbox_area:
            largest_personbox_area = a
            largest = d
    largestCenterX = (largest.x_min + largest.x_max) / 2
    leftMidPointX = midpoint - (midpoint / 6)
    rightMidPointX = midpoint + (midpoint / 6)
    if leftMidPointX < largestCenterX < rightMidPointX:
        return 1  # basically centered
    if largestCenterX < leftMidPointX:
        return 0  # on the left
    return 2  # on the right
# Continuously detects a person in the camera frame and pans the arm to keep them centered
async def findAndFollowMe(robot: RobotClient, arm: ArmClient):
    vis = await getVisionService(robot)
    while True:
        frame = await takePic(robot)
        cleanImage = frame.convert('RGB')
        detections = await vis.get_detections(cleanImage, "ml-detector")
        await takePhotoAndDetectImageContents(robot)
        answer = await leftOrRight(robot, detections, frame.size[0] / 2)
        if answer == 0:
            print("left")
            await handleLeftRightTrigger(arm, 'left')
        if answer == 1:
            print("center")
        if answer == 2:
            print("right")
            await handleLeftRightTrigger(arm, 'right')
        await asyncio.sleep(0.05)
async def main():
    robot = await setupRobot()
    await cleanUpOldPhotos()
    arm = ArmClient.from_robot(robot=robot, name='myarm')
    gripper = GripperClient.from_robot(robot=robot, name='mygripper')
    await resetRobotArmPosition(robot, arm, gripper)
    await findAndFollowMe(robot, arm)
    await robot.close()


if __name__ == '__main__':
    print("Starting up... ")
    asyncio.run(main())
    print("Done.")
from viam.components.arm import ArmClient
from viam.components.gripper import GripperClient
from viam.proto.api.component.arm import JointPositions
import viam.proto.api.component.inputcontroller as ic
from viam.robot.client import RobotClient
from viam.components.camera import CameraClient, Camera
from viam.services.vision import DetectorConfig, DetectorType, Detection
from viam.services.motion import MotionClient
from viam.services.types import ServiceType
from viam.proto.api.common import Pose, PoseInFrame, WorldState, GeometriesInFrame, Geometry, RectangularPrism, ResourceName
from viam.rpc.dial import Credentials, DialOptions
import time
import asyncio
import random
from math import degrees
from PIL import Image, ImageDraw
# Connect our robot to the Viam app in the cloud
async def setupRobot():
    ADDRESS_SECRET = "Dofbot-main.dt0gzquiy2.local.viam.cloud:8080"
    PAYLOAD_SECRET = "8r7b0za66679nxqe58wxkciys9slsl7p5q7rudg541u2d6b3"
    # ADDRESS_SECRET = "[ADD YOUR ROBOT ADDRESS HERE. YOU CAN FIND THIS ON THE CONNECT TAB]"
    # PAYLOAD_SECRET = "[PLEASE ADD YOUR SECRET HERE. YOU CAN FIND THIS ON THE CONNECT TAB]"
    creds = Credentials(type='robot-location-secret', payload=PAYLOAD_SECRET)
    opts = RobotClient.Options(refresh_interval=0, dial_options=DialOptions(credentials=creds))
    robot = await RobotClient.at_address(ADDRESS_SECRET, opts)
    # print(robot.resource_names)
    return robot
# Reset the robot arm to its starting position and open the gripper
async def resetRobotArmPosition(robot: RobotClient, arm: ArmClient, gripper: GripperClient):
    init_position = JointPositions(values=[0, -90, 45, 45, 0])  # bent at a 90 degree angle
    # init_position = JointPositions(values=[0, -90, 0, 0, 0])  # stick straight up in the air
    await arm.move_to_joint_positions(init_position)
    await gripper.open()
    return robot
# Opens the gripper and closes it again
async def openAndCloseGripper(gripper: GripperClient):
    await gripper.open()
    await asyncio.sleep(0.5)
    await gripper.grab()
# Moves the arm left or right when the triggers are pressed and held
async def handleLeftRightTrigger(arm: ArmClient, leftOrRight):
    currentPosition = await arm.get_joint_positions()
    speed = 20.0
    if leftOrRight == 'left':
        newServoOnePosition = currentPosition.values[0] + speed
    else:
        newServoOnePosition = currentPosition.values[0] - speed
    if -90 <= newServoOnePosition <= 90:
        newArmPositions = [newServoOnePosition,
                           currentPosition.values[1],
                           currentPosition.values[2],
                           currentPosition.values[3],
                           currentPosition.values[4]]
        newArmJointPositions = JointPositions(values=newArmPositions)
        await arm.move_to_joint_positions(newArmJointPositions)
# Generic function that handles all the buttons on the 8BitDo controller and maps them to the robot arm
async def handleController(robot: RobotClient, arm: ArmClient, gripper: GripperClient):
    controller = ic.InputControllerServiceStub(robot._channel)
    resp: ic.GetControlsResponse = await controller.GetControls(ic.GetControlsRequest(controller="8bit-do-controller"))
    # Show the input controller's buttons/axes
    print(f'Controls:\n{resp.controls}')
    # Subscribe to AllEvents on all controls from above.
    subEvents = [ic.StreamEventsRequest.Events(events=["AllEvents"], control=c) for c in resp.controls]
    request = ic.StreamEventsRequest(controller="8bit-do-controller", events=subEvents)
    async with controller.StreamEvents.open() as stream:
        await stream.send_message(request, end=True)
        async for reply in stream:
            # Pretty print the event (time is UTC)
            print(f'{reply.event.time.ToDatetime()} - {reply.event.control} - {reply.event.event} - {reply.event.value}')
            if reply.event.control == "ButtonStart":
                if reply.event.event == "ButtonPress":
                    await resetRobotArmPosition(robot, arm, gripper)
            if reply.event.control == "ButtonSelect":
                if reply.event.event == "ButtonPress":
                    await openAndCloseGripper(gripper)
            if reply.event.control == "ButtonLT":
                if reply.event.event == "ButtonHold":
                    await handleLeftRightTrigger(arm, 'left')
                # elif reply.event.event == "ButtonRelease":
            if reply.event.control == "ButtonRT":
                if reply.event.event == "ButtonHold":
                    await handleLeftRightTrigger(arm, 'right')
                # elif reply.event.event == "ButtonRelease":
async def main():
    robot = await setupRobot()
    arm = ArmClient.from_robot(robot=robot, name='myarm')
    gripper = GripperClient.from_robot(robot=robot, name='mygripper')
    await resetRobotArmPosition(robot, arm, gripper)
    await handleController(robot, arm, gripper)
    await robot.close()


if __name__ == '__main__':
    print("Starting up... ")
    asyncio.run(main())
    print("Done.")
from viam.components.arm import ArmClient
from viam.components.gripper import GripperClient
from viam.proto.api.component.arm import JointPositions
from viam.robot.client import RobotClient
from viam.services.types import ServiceType
from viam.proto.api.common import Pose, PoseInFrame, WorldState, GeometriesInFrame, Geometry, RectangularPrism, ResourceName
from viam.rpc.dial import Credentials, DialOptions
import time
import asyncio
import random
from math import degrees
from PIL import Image, ImageDraw
# Connect our robot to the Viam app in the cloud
async def setupRobot():
    ADDRESS_SECRET = "Dofbot-main.dt0gzquiy2.local.viam.cloud:8080"
    PAYLOAD_SECRET = "8r7b0za66679nxqe58wxkciys9slsl7p5q7rudg541u2d6b3"
    # ADDRESS_SECRET = "[ADD YOUR ROBOT ADDRESS HERE. YOU CAN FIND THIS ON THE CONNECT TAB]"
    # PAYLOAD_SECRET = "[PLEASE ADD YOUR SECRET HERE. YOU CAN FIND THIS ON THE CONNECT TAB]"
    creds = Credentials(type='robot-location-secret', payload=PAYLOAD_SECRET)
    opts = RobotClient.Options(refresh_interval=0, dial_options=DialOptions(credentials=creds))
    robot = await RobotClient.at_address(ADDRESS_SECRET, opts)
    # print(robot.resource_names)
    return robot
# Reset the robot arm to its starting position and open the gripper
async def resetRobotArmPosition(robot: RobotClient, arm: ArmClient, gripper: GripperClient):
    init_position = JointPositions(values=[0, -90, 45, 45, 0])  # bent at a 90 degree angle
    await arm.move_to_joint_positions(init_position)
    await gripper.open()
    return robot
# Creates a virtual rectangular obstacle in the Dofbot's world, then moves the arm while avoiding that obstacle
async def motionServiceTest(robot: RobotClient, arm: ArmClient):
    motionServ = robot.get_service(ServiceType.MOTION)
    # Create a small rectangular obstacle at a fixed pose in the arm's frame
    geom = Geometry(center=Pose(x=410, y=-110, z=527), box=RectangularPrism(width_mm=2, length_mm=5, depth_mm=5))
    geomFrame = GeometriesInFrame(reference_frame="myarm", geometries=[geom])
    worldstate = WorldState(obstacles=[geomFrame])
    # Change to apply to the Dofbot arm's current pose (coordinates in mm)
    x = 0
    y = 0
    z = 30
    # orientation (theta)
    o_x = 0
    o_y = 0
    o_z = 1
    pos = await arm.get_end_position()
    print(pos)
    pos.x += x
    pos.y += y
    pos.z += z
    pos.o_x += o_x
    pos.o_y += o_y
    pos.o_z += o_z
    for resname in robot.resource_names:
        if resname.name == "myarm":
            armRes = resname
    # We pass the WorldState above with the obstacle geometry. The arm should successfully route around it.
    await motionServ.move(component_name=armRes, destination=PoseInFrame(reference_frame="world", pose=pos), world_state=worldstate)
    pos = await arm.get_end_position()
    pos.x += x
    pos.y += y
    pos.z += z
    pos.o_x += o_x
    pos.o_y += o_y
    pos.o_z += o_z
    await asyncio.sleep(1)
    await motionServ.move(component_name=armRes, destination=PoseInFrame(reference_frame="world", pose=pos), world_state=worldstate)
async def main():
    robot = await setupRobot()
    arm = ArmClient.from_robot(robot=robot, name='myarm')
    gripper = GripperClient.from_robot(robot=robot, name='mygripper')
    await resetRobotArmPosition(robot, arm, gripper)
    await motionServiceTest(robot, arm)
    await robot.close()


if __name__ == '__main__':
    print("Starting up... ")
    asyncio.run(main())
    print("Done.")
from viam.components.arm import ArmClient
from viam.components.gripper import GripperClient
from viam.proto.api.component.arm import JointPositions
from viam.robot.client import RobotClient
from viam.rpc.dial import Credentials, DialOptions
import time
import asyncio
import random
from math import degrees
from PIL import Image, ImageDraw
# Connect our robot to the Viam app in the cloud
async def setupRobot():
    ADDRESS_SECRET = "Dofbot-main.dt0gzquiy2.local.viam.cloud:8080"
    PAYLOAD_SECRET = "8r7b0za66679nxqe58wxkciys9slsl7p5q7rudg541u2d6b3"
    # ADDRESS_SECRET = "[ADD YOUR ROBOT ADDRESS HERE. YOU CAN FIND THIS ON THE CONNECT TAB]"
    # PAYLOAD_SECRET = "[PLEASE ADD YOUR SECRET HERE. YOU CAN FIND THIS ON THE CONNECT TAB]"
    creds = Credentials(type='robot-location-secret', payload=PAYLOAD_SECRET)
    opts = RobotClient.Options(refresh_interval=0, dial_options=DialOptions(credentials=creds))
    robot = await RobotClient.at_address(ADDRESS_SECRET, opts)
    # print(robot.resource_names)
    return robot
# Reset the robot arm to its starting position and open the gripper
async def resetRobotArmPosition(robot: RobotClient, arm: ArmClient, gripper: GripperClient):
    init_position = JointPositions(values=[0, -90, 45, 45, 0])  # bent at a 90 degree angle
    # init_position = JointPositions(values=[0, -90, 0, 0, 0])  # stick straight up in the air
    await arm.move_to_joint_positions(init_position)
    await gripper.open()
    return robot
# Get a random position for each servo within a range of movement that won't completely destroy the arm
def getRandoms():
    return [random.randint(-90, 90),
            random.randint(-120, -45),
            random.randint(-45, 45),
            random.randint(-45, 45),
            random.randint(-45, 45)]
# Opens the gripper and closes it again
async def openAndCloseGripper(gripper: GripperClient):
    await gripper.open()
    await asyncio.sleep(0.5)
    await gripper.grab()
# Moves the arm into a new random position every second
async def randomMovement(robot: RobotClient, arm: ArmClient, gripper: GripperClient):
    while True:
        await openAndCloseGripper(gripper)
        randomPositions = getRandoms()
        newRandomArmJointPositions = JointPositions(values=randomPositions)
        await arm.move_to_joint_positions(newRandomArmJointPositions)
        await asyncio.sleep(1)
async def main():
    robot = await setupRobot()
    arm = ArmClient.from_robot(robot=robot, name='myarm')
    gripper = GripperClient.from_robot(robot=robot, name='mygripper')
    await resetRobotArmPosition(robot, arm, gripper)
    await randomMovement(robot, arm, gripper)
    await robot.close()


if __name__ == '__main__':
    print("Starting up... ")
    asyncio.run(main())
    print("Done.")
{
  "components": [
    {
      "attributes": {
        "i2cs": [
          {
            "bus": "1",
            "name": "bus1"
          }
        ]
      },
      "depends_on": [],
      "model": "pi",
      "name": "local",
      "service_config": [],
      "type": "board"
    },
    {
      "attributes": {
        "camera_parameters": {
          "distortion": {
            "rk1": 0,
            "rk2": 0,
            "rk3": 0,
            "tp1": 0,
            "tp2": 0
          },
          "fx": 0,
          "fy": 0,
          "height": 720,
          "ppx": 0,
          "ppy": 0,
          "width": 720
        },
        "debug": false,
        "dump": false,
        "format": "",
        "height": 0,
        "hide": false,
        "path": "/dev/video0",
        "path_pattern": "",
        "source": "",
        "stream": "",
        "width": 0
      },
      "depends_on": [],
      "model": "webcam",
      "name": "camera",
      "service_config": [
        {
          "attributes": {
            "capture_methods": []
          },
          "type": "data_manager"
        }
      ],
      "type": "camera"
    },
    {
      "attributes": {
        "board": "local",
        "i2c": "bus1"
      },
      "depends_on": [
        "local"
      ],
      "model": "yahboom-dofbot",
      "name": "myarm",
      "service_config": [],
      "type": "arm",
      "frame": {
        "parent": "world"
      }
    },
    {
      "attributes": {
        "arm": "myarm"
      },
      "depends_on": [
        "myarm"
      ],
      "model": "yahboom-dofbot",
      "name": "mygripper",
      "service_config": [],
      "type": "gripper"
    },
    {
      "attributes": {
        "auto_reconnect": true
      },
      "depends_on": [],
      "model": "gamepad",
      "name": "8bit-do-controller",
      "service_config": [],
      "type": "input_controller"
    }
  ],
  "services": [
    {
      "attributes": {
        "base": "yahboom-base",
        "input_controller": "8bit"
      },
      "name": "yahboom_gamepad_control",
      "type": "base_remote_control"
    },
    {
      "attributes": {},
      "name": "motion",
      "type": "motion"
    }
  ]
}
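The JSON config above is what the scripts expect: the component names ("myarm", "mygripper", "camera", "8bit-do-controller") match the names used in the Python clients, the arm and gripper use the yahboom-dofbot model over I2C bus 1 on the Pi board, and the camera streams from /dev/video0. The "yahboom_gamepad_control" base_remote_control service references a "yahboom-base" component that is not defined in this config, so it presumably carries over from another setup.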