Skip to content

Instantly share code, notes, and snippets.

@Williangalvani
Created April 27, 2020 13:42
Show Gist options
  • Save Williangalvani/838a7f14b2dec1af1d25879f81e82293 to your computer and use it in GitHub Desktop.
Save Williangalvani/838a7f14b2dec1af1d25879f81e82293 to your computer and use it in GitHub Desktop.
Waterlinked DVL Integration script (WIP)
#!/usr/bin/env python3
"""
This script is used to integrate the Waterlinked A50 DVL with ardupilot.
It uses a tcp connection to port
There must also be a mavlink connection avaiable at 0.0.0.0:14551 so the script can get
ATTITUDE messages
"""
import asyncio
from pymavlink import mavutil
import math
import time
import socket
import sys
import json
import argparse
if sys.version_info[0] < 3 or sys.version_info[1] < 6:
raise Exception("You must use Python >= 3.6 for async support!")
def cmd_set_home(lat, lon, alt):
'''called when user selects "Set Home (with height)" on map'''
print("Setting home to: ", lat, lon, alt)
return master.mav.set_gps_global_origin_send(
0,
lat, # lat
lon, # lon
alt) # param7
# Create the connection
master = mavutil.mavlink_connection('udpin:0.0.0.0:14551', dialect="ardupilotmega")
# Wait a heartbeat before sending commands
master.wait_heartbeat()
master.mav.statustext_send(text=bytearray("#test", encoding="utf-8"), severity=mavutil.mavlink.MAV_SEVERITY_NOTICE)
# Set GPS origin ( This is the Blue Robotics parking lot)
#cmd_set_home(338414762,-1183351514,1)
cmd_set_home(-274905547, -485299403,1)
last_attitude = [0.0,0.0,0.0]
current_attitude = [0.0,0.0,0.0]
last_position = [0.0,0.0,0.0]
current_position = [0.0,0.0,0.0]
last_time = None
last_response = {"vx": 0, "vy": 0}
def send_vision(x, y, z, rotation=[0,0,0], confidence=100, dt=125000):
"Sends message VISION_POSITION_DELTA to flight controller"
print("sending... dt: {0}us, \tconfidence: {1}".format(dt, confidence))
master.mav.vision_position_delta_send(
0, # time
dt, # delta time (us)
rotation, # angle delta
[x, y, z], # position delta
confidence) # confidence (0-100%)
def send_rangefinder(distance):
print(int(distance*100))
master.mav.distance_sensor_send(
0, # time_boot_ms
0, # min_distance
5000, # max_distance
int(distance*100), # distance
2, # type MAVLINK
0, # device id
25, # PITCH 270
0)
async def receive_attitude():
"Receive attitude messages from flight controller"
global current_attitude
while True:
await asyncio.sleep(0.001)
msg = master.recv_match()
if not msg:
continue
if msg.get_type() == 'ATTITUDE':
msg = msg.to_dict()
roll = msg['roll']
pitch = msg['pitch']
yaw = msg['yaw']
current_attitude = [roll, pitch, yaw]
async def write_dvl(ip, port):
"""every time a new dvl reading is available, calculate new angle deltas
and sends to the flight controller
"""
reader, writer = await asyncio.open_connection(ip, port)
global last_attitude
global last_time
global current_attitude
while True:
data = await reader.read(1000)
if data is None:
await asyncio.sleep(0.01)
continue
try:
data = json.loads(data)
except:
await asyncio.sleep(0.01)
continue
if data["velocity_valid"] == False:
await asyncio.sleep(0.01)
continue
vx, vy, vz, fom = data["vx"], data["vy"], data["vz"], data["fom"]
dt = data["time"] / 1000
dx = dt*vx
dy = dt*vy
dz = dt*vz
# handle bad readings here
#if fom > 0.005:
# print("bad fom:", fom)
# continue
send_rangefinder(data["altitude"])
# Subtracts current altitude from last attitude to get the delta
angles = list(map(float.__sub__, current_attitude, last_attitude))
angles[2] = angles[2] % (math.pi*2)
send_vision(dx, dy, dz, list(angles), dt=int(dt*1e6), confidence=100)
last_attitude = current_attitude
await asyncio.sleep(0.01)
async def main():
"Runs attitude and dvl tasks"
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('-i', '--ip', help='DVL IP address', type=str, default='192.168.2.13')
parser.add_argument('-p', '--port', help='TCP port at the DVL', type=int, default=16171)
args = parser.parse_args()
task1 = asyncio.create_task(receive_attitude())
task2 = asyncio.create_task(write_dvl(args.ip, args.port))
await asyncio.gather(task1, task2)
if __name__ == "__main__":
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment