Skip to content

Instantly share code, notes, and snippets.

@tsu-nera
Created July 25, 2017 15:44
Show Gist options
  • Save tsu-nera/5325a7ce38efa08048fa7b2ee4bc4aae to your computer and use it in GitHub Desktop.
Save tsu-nera/5325a7ce38efa08048fa7b2ee4bc4aae to your computer and use it in GitHub Desktop.
Mindstorms ev3 control with Leap Motion
import Leap, sys
import paho.mqtt.client as mqtt
from Leap import CircleGesture
# Module-level MQTT client; SampleListener publishes motor commands through it.
client = mqtt.Client()
client.connect("localhost", 1883, 60)
# Run paho's network loop in a background thread. Without a running loop the
# client never services incoming packets or sends keepalive pings, so the
# broker connection is only kept alive as a side effect of publishing.
client.loop_start()
class SampleListener(Leap.Listener):
    """Leap listener that turns palm position and circle gestures into
    "<a>,<d>" duty-cycle messages published on the "topic/motor/dt" topic.
    """

    @staticmethod
    def _publish_duty(first, second):
        # Payload is two comma-separated integers, e.g. "50,-50".
        client.publish("topic/motor/dt", "%d,%d" % (first, second))

    def on_connect(self, controller):
        """Enable circle-gesture reporting on the controller."""
        controller.enable_gesture(Leap.Gesture.TYPE_CIRCLE)

    def on_frame(self, controller):
        """Publish motor commands for the most recent frame.

        Only frames whose id is a multiple of 10 are processed; all other
        frames are skipped.
        """
        frame = controller.frame()
        if frame.id % 10 != 0:
            return

        # Only the first entry of the hand list is considered.
        hand = frame.hands[0]
        palm = hand.palm_position
        x, y, z = palm[0], palm[1], palm[2]
        print(palm)

        # The sign of the palm's z coordinate selects the drive command; an
        # all-zero position publishes "0,0".
        # NOTE(review): an all-zero position presumably means "no hand
        # detected" -- confirm against the Leap SDK.
        if z < 0:
            self._publish_duty(50, 50)
        elif z > 0:
            self._publish_duty(-50, -50)
        elif x == 0 and y == 0 and z == 0:
            self._publish_duty(0, 0)

        for gesture in frame.gestures():
            if gesture.type != Leap.Gesture.TYPE_CIRCLE:
                continue
            circle = Leap.CircleGesture(gesture)
            # The angle between the pointing direction and the circle normal
            # decides which opposite-sign command is sent.
            # NOTE(review): the Leap sample treats <= PI/2 as clockwise --
            # confirm.
            angle = circle.pointable.direction.angle_to(circle.normal)
            if angle <= Leap.PI / 2:
                self._publish_duty(-50, 50)
            else:
                self._publish_duty(50, -50)
def main():
    """Attach a SampleListener to a Leap controller until Enter is pressed.

    The listener is always removed again on exit, including when the wait is
    interrupted with Ctrl-C.
    """
    frame_listener = SampleListener()
    leap_controller = Leap.Controller()

    # From here on the listener receives the controller's events.
    leap_controller.add_listener(frame_listener)

    print("Press Enter to quit...")
    try:
        # Block the main thread; the work happens in the listener callbacks.
        sys.stdin.readline()
    except KeyboardInterrupt:
        # Ctrl-C is treated the same as pressing Enter.
        pass
    finally:
        leap_controller.remove_listener(frame_listener)
# Run the Leap Motion publisher only when this file is executed as a script.
if __name__ == "__main__":
    main()
#!/usr/bin/env python3
import paho.mqtt.client as mqtt
from ev3dev.auto import *
# Handles for the motors on output ports A and D; on_message writes their
# duty-cycle setpoints.
ma, md = Motor(OUTPUT_A), Motor(OUTPUT_D)
def on_connect(client, userdata, flags, rc):
    """Print the connection result code and subscribe to the motor topic.

    Args:
        client: MQTT client the callback is invoked for; used to subscribe.
        userdata: Unused.
        flags: Unused.
        rc: Result code reported for the connection attempt.
    """
    status_line = "Connected with result code " + str(rc)
    print(status_line)
    client.subscribe("topic/motor/dt")
def on_message(client, userdata, msg):
    """Apply a "<duty A>,<duty D>" payload to the two motors.

    The payload is decoded as UTF-8 and split on commas; the first field is
    the duty-cycle setpoint for motor A and the second for motor D. Extra
    fields are ignored. Both values are parsed before either motor is
    touched, so a bad message never changes only one motor.

    Args:
        client: Unused.
        userdata: Unused.
        msg: Received MQTT message; only ``msg.payload`` is read.
    """
    try:
        fields = msg.payload.decode("utf-8").split(",")
        duty_a, duty_d = int(fields[0]), int(fields[1])
    except (ValueError, IndexError) as err:
        # ValueError also covers UnicodeDecodeError. A malformed command is
        # dropped instead of raising inside the MQTT callback, which could
        # otherwise stop message handling while the motors keep running.
        print("Ignoring malformed motor command %r: %s" % (msg.payload, err))
        return

    # ev3dev documents duty_cycle_sp as a percentage in -100..100; clamp so an
    # out-of-range command saturates instead of being rejected by the driver.
    ma.duty_cycle_sp = max(-100, min(100, duty_a))
    md.duty_cycle_sp = max(-100, min(100, duty_d))
# Subscriber start-up: connect to the broker, arm the motors, then process
# motor commands until the loop exits.
client = mqtt.Client()
# Register the callbacks before connecting so they are in place for every
# event the connection produces.
client.on_connect = on_connect
client.on_message = on_message
client.connect("192.168.3.4", 1883, 60)

# Zero the setpoints *before* entering run-direct mode: run-direct drives the
# motor at whatever duty_cycle_sp currently holds, so a stale value would make
# the robot move as soon as the command is issued.
ma.duty_cycle_sp = 0
md.duty_cycle_sp = 0
ma.run_direct()
md.run_direct()

try:
    client.loop_forever()
finally:
    # Never leave the motors running once commands can no longer be received
    # (Ctrl-C, network error, ...).
    for motor in (ma, md):
        motor.stop()
    client.disconnect()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment