Skip to content

Instantly share code, notes, and snippets.

@vo
Created March 5, 2014 07:02
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save vo/9362490 to your computer and use it in GitHub Desktop.
Save vo/9362490 to your computer and use it in GitHub Desktop.
Quick hack to position hold with MAVLink using external webcam
#!/usr/bin/env python
import sys, os, socket, pickle
from time import time
from optparse import OptionParser
UDP_IP = "127.0.0.1"
UDP_PORT = 31200
# tell python where to find mavlink so we can import it
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), '../mavlink'))
from pymavlink import mavutil
class PosHold(object):
def __init__(self, mavmaster):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind((UDP_IP, UDP_PORT))
self.sock.setblocking(0)
self.mavmaster = mavmaster
def mainloop(self):
# wait until the enter key is pressed
raw_input("Enter any key to continue")
# enter main loop
timer = time()
thro = 1400
while True:
# default to released control
control = [0] * 8
try:
# grab position data if it's there
data, addr = self.sock.recvfrom(1024)
diff_x, diff_y = pickle.loads(data)
# scaling diff_y
# negative number --> throttle up
# positive number --> throttle down
height = 480.0
max_spd = 10
normalized_y = (diff_y / height * 2.0)
spd = max_spd * -normalized_y
thro = 1400 + int(spd) # hover at 1400
if (time() - timer) >= 0.02:
print "set throttle to", thro
control[2] = thro
send_rc(self.mavmaster, control)
timer = time()
except:
if (time() - timer) >= 1:
# time was allowed to reach 1
# so no data was received
print "released rc"
release_rc(self.mavmaster)
timer = time()
# release control back to the radio
def release_rc(master):
# a value of 0 releases the control to what the radio says
values = [ 0 ] * 8
for i in xrange(1):
master.mav.rc_channels_override_send(master.target_system,
master.target_component, *values)
# attempt to send a control.
# you can pass 0 to refer to what the radio says
# you can pass 0xFFFF to refer to keep the current value
def send_rc(master, data):
for i in xrange(1):
master.mav.rc_channels_override_send(
master.target_system, master.target_component, *data)
print ("sending rc: %s"%data)
def main():
# read command line options
parser = OptionParser("readdata.py [options]")
parser.add_option("--baudrate", dest="baudrate", type='int',
help="master port baud rate", default=115200)
parser.add_option("--device", dest="device", default=None, help="serial device")
parser.add_option("--rate", dest="rate", default=4, type='int', help="requested stream rate")
parser.add_option("--source-system", dest='SOURCE_SYSTEM', type='int',
default=255, help='MAVLink source system for this GCS')
parser.add_option("--showmessages", dest="showmessages", action='store_true',
help="show incoming messages", default=False)
(opts, args) = parser.parse_args()
if opts.device is None:
print("You must specify a serial device")
sys.exit(1)
# create a mavlink serial instance
master = mavutil.mavlink_connection(opts.device, baud=opts.baudrate)
# wait for the heartbeat msg to find the system ID
master.wait_heartbeat()
# request data to be sent at the given rate
master.mav.request_data_stream_send(master.target_system, master.target_component,
mavutil.mavlink.MAV_DATA_STREAM_ALL, opts.rate, 1)
# run the main loop
app = PosHold(master)
app.mainloop()
if __name__ == '__main__':
main()
#!/usr/bin/env python
import cv
import socket, pickle
UDP_IP = "127.0.0.1"
UDP_PORT = 31200
class Target:
def __init__(self):
self.capture = cv.CaptureFromCAM(1)
cv.NamedWindow("Target", 1)
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
def run(self):
# Capture first frame to get size
frame = cv.QueryFrame(self.capture)
frame_size = cv.GetSize(frame)
color_image = cv.CreateImage(cv.GetSize(frame), 8, 3)
grey_image = cv.CreateImage(cv.GetSize(frame), cv.IPL_DEPTH_8U, 1)
moving_average = cv.CreateImage(cv.GetSize(frame), cv.IPL_DEPTH_32F, 3)
first = True
while True:
closest_to_left = cv.GetSize(frame)[0]
closest_to_right = cv.GetSize(frame)[1]
color_image = cv.QueryFrame(self.capture)
# Smooth to get rid of false positives
cv.Smooth(color_image, color_image, cv.CV_GAUSSIAN, 3, 0)
if first:
difference = cv.CloneImage(color_image)
temp = cv.CloneImage(color_image)
cv.ConvertScale(color_image, moving_average, 1.0, 0.0)
first = False
# else:
# cv.RunningAvg(color_image, moving_average, 0.020, None)
# Convert the scale of the moving average.
cv.ConvertScale(moving_average, temp, 1.0, 0.0)
# Minus the current frame from the moving average.
cv.AbsDiff(color_image, temp, difference)
# Convert the image to grayscale.
cv.CvtColor(difference, grey_image, cv.CV_RGB2GRAY)
# Convert the image to black and white.
cv.Threshold(grey_image, grey_image, 70, 255, cv.CV_THRESH_BINARY)
# Dilate and erode to get people blobs
cv.Dilate(grey_image, grey_image, None, 18)
cv.Erode(grey_image, grey_image, None, 10)
storage = cv.CreateMemStorage(0)
contour = cv.FindContours(grey_image, storage, cv.CV_RETR_CCOMP, cv.CV_CHAIN_APPROX_SIMPLE)
points = []
while contour:
bound_rect = cv.BoundingRect(list(contour))
contour = contour.h_next()
pt1 = (bound_rect[0], bound_rect[1])
pt2 = (bound_rect[0] + bound_rect[2], bound_rect[1] + bound_rect[3])
points.append(pt1)
points.append(pt2)
cv.Rectangle(color_image, pt1, pt2, cv.CV_RGB(255,0,0), 1)
if len(points):
center_point = reduce(lambda a, b: ((a[0] + b[0]) / 2, (a[1] + b[1]) / 2), points)
img_center = (frame_size[0]/2, frame_size[1]/2)
diff = [center_point[0] - img_center[0], img_center[1] - center_point[1]]
datastr = pickle.dumps(diff)
self.sock.sendto(datastr, (UDP_IP, UDP_PORT))
cv.Circle(color_image, center_point, 40, cv.CV_RGB(255, 255, 255), 1)
cv.Circle(color_image, center_point, 30, cv.CV_RGB(255, 100, 0), 1)
cv.Circle(color_image, center_point, 20, cv.CV_RGB(255, 255, 255), 1)
cv.Circle(color_image, center_point, 10, cv.CV_RGB(255, 100, 0), 1)
cv.ShowImage("Target", color_image)
# Listen for ESC key
c = cv.WaitKey(7) % 0x100
if c == 27:
break
if __name__=="__main__":
t = Target()
t.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment