Skip to content

Instantly share code, notes, and snippets.

@Parlane

Parlane/adsb.py Secret

Created August 3, 2016 23:21
Show Gist options
  • Save Parlane/10424b2eb729842a194e7dafec7755b7 to your computer and use it in GitHub Desktop.
Save Parlane/10424b2eb729842a194e7dafec7755b7 to your computer and use it in GitHub Desktop.
"""
A python package for decoding ABS-D messages.
Copyright (C) 2015 Junzi Sun (TU Delft)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import math
import pyModeS.util as util
from pyModeS.util import crc
def df(msg):
"""Get the downlink format (DF) number
Args:
msg (string): 28 bytes hexadecimal message string
Returns:
int: DF number
"""
return util.df(msg)
def icao(msg):
"""Get the ICAO 24 bits address, bytes 3 to 8.
Args:
msg (string): 28 bytes hexadecimal message string
Returns:
String: ICAO address in 6 bytes hexadecimal string
"""
return msg[2:8]
def data(msg):
"""Return the data frame in the message, bytes 9 to 22"""
return msg[8:22]
def typecode(msg):
"""Type code of ADS-B message
Args:
msg (string): 28 bytes hexadecimal message string
Returns:
int: type code number
"""
msgbin = util.hex2bin(msg)
return util.bin2int(msgbin[32:37])
# ---------------------------------------------
# Aircraft Identification
# ---------------------------------------------
def category(msg):
"""Aircraft category number
Args:
msg (string): 28 bytes hexadecimal message string
Returns:
int: category number
"""
if typecode(msg) < 1 or typecode(msg) > 4:
raise RuntimeError("%s: Not a identification message" % msg)
msgbin = util.hex2bin(msg)
return util.bin2int(msgbin[5:8])
def callsign(msg):
"""Aircraft callsign
Args:
msg (string): 28 bytes hexadecimal message string
Returns:
string: callsign
"""
if typecode(msg) < 1 or typecode(msg) > 4:
raise RuntimeError("%s: Not a identification message" % msg)
chars = '#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######'
msgbin = util.hex2bin(msg)
csbin = msgbin[40:96]
cs = ''
cs += chars[util.bin2int(csbin[0:6])]
cs += chars[util.bin2int(csbin[6:12])]
cs += chars[util.bin2int(csbin[12:18])]
cs += chars[util.bin2int(csbin[18:24])]
cs += chars[util.bin2int(csbin[24:30])]
cs += chars[util.bin2int(csbin[30:36])]
cs += chars[util.bin2int(csbin[36:42])]
cs += chars[util.bin2int(csbin[42:48])]
# clean string, remove spaces and marks, if any.
cs = cs.replace('_', '')
cs = cs.replace('#', '')
return cs
# ---------------------------------------------
# Positions
# ---------------------------------------------
def oe_flag(msg):
"""Check the odd/even flag. Bit 54, 0 for even, 1 for odd.
Args:
msg (string): 28 bytes hexadecimal message string
Returns:
int: 0 or 1, for even or odd frame
"""
if typecode(msg) < 5 or typecode(msg) > 18:
raise RuntimeError("%s: Not a position message" % msg)
msgbin = util.hex2bin(msg)
return int(msgbin[53])
def cprlat(msg):
"""CPR encoded latitude
Args:
msg (string): 28 bytes hexadecimal message string
Returns:
int: encoded latitude
"""
if typecode(msg) < 5 or typecode(msg) > 18:
raise RuntimeError("%s: Not a position message" % msg)
msgbin = util.hex2bin(msg)
return util.bin2int(msgbin[54:71])
def cprlon(msg):
"""CPR encoded longitude
Args:
msg (string): 28 bytes hexadecimal message string
Returns:
int: encoded longitude
"""
if typecode(msg) < 5 or typecode(msg) > 18:
raise RuntimeError("%s: Not a position message" % msg)
msgbin = util.hex2bin(msg)
return util.bin2int(msgbin[71:88])
nb = 17 # bits used to encode a pos coord
nz = 15 # number of geographic latitude zones between equator and a pole
# latitude zone size in the N-S direction
def Dlat(i, surface=False):
if surface:
return 90.0 / (4 * nz - i)
else:
return 360.0 / (4 * nz - i)
def Dlon(Rlati, i, surface=False):
a = nl(Rlati)
b = 90.0 if surface else 360.0
if (a-i) > 1:
return b / (a-i)
else:
return b
def Rlat(i, YZi, j, surface=False):
Dlati = Dlat(i, surface)
return (Dlati * ((j % (60 - i)) + YZi/(2**17)))
def nl(lat):
# special cases
if lat == 0:
return 59
elif abs(lat) == 87:
return 2
elif abs(lat) > 87:
return 1
a = 1 - math.cos(math.pi / (2.0*nz))
b = math.pow(math.cos(math.pi/180.0 * abs(lat)), 2)
ab = 1 - a/b
if ab > 1:
ab = 1
elif ab < -1:
ab = -1
c = math.acos(ab)
d = (2*math.pi) / c
return math.floor(d)
def position(msg_even, msg_odd, even_time, odd_time):
if typecode(msg_even) < 9 or typecode(msg_even) > 18:
raise RuntimeError("%s: Not a position message" % msg_even)
if typecode(msg_odd) < 9 or typecode(msg_odd) > 18:
raise RuntimeError("%s: Not a position message" % msg_odd)
Dlat0 = Dlat(0)
Dlat1 = Dlat(1)
XZ0 = cprlon(msg_even)
YZ0 = cprlat(msg_even)
XZ1 = cprlon(msg_odd)
YZ1 = cprlat(msg_odd)
j = math.floor((59 * YZ0 - 60*YZ1) / (2**17) + 0.5)
Rlat0 = Rlat(0, YZ0, j)
Rlat1 = Rlat(1, YZ1, j)
if Rlat0 > 270:
Rlat0 -= 360
if Rlat1 > 270:
Rlat1 -= 360
if nl(Rlat0) != nl(Rlat1):
return None
Dlon0 = Dlon(Rlat0, 0)
Dlon1 = Dlon(Rlat1, 1)
if odd_time > even_time:
# odd is latest i = 1
nl1 = nl(Rlat1)
a = (XZ0*(nl1-1))-(XZ1*nl1)
m = math.floor(a/math.pow(2.0,17) + 0.5)
n1 = (nl1 - 1) if (nl1 - 1) > 1 else 1
Rlon1 = Dlon1 * ((m % n1) + XZ1/(2**17))
if Rlon1 > 180:
Rlon1 = Rlon1 - 360
return (Rlat1, Rlon1)
else:
# even is latest i = 0
nl0 = nl(Rlat0)
a = XZ0*(nl0-1)-XZ1*nl0
m = math.floor(a/(2**17) + 0.5)
n0 = (nl0 - 0) if (nl0 - 0) > 1 else 1
Rlon0 = Dlon0 * ((m % n0) + XZ0/(2**17))
if Rlon0 > 180:
Rlon0 = Rlon0 - 360
return (Rlat0, Rlon0)
def position_old(msg0, msg1, t0, t1):
"""Decode position from the combination of even and odd position message
131072 is 2^17, since CPR lat and lon are 17 bits each.
Args:
msg0 (string): even message (28 bytes hexadecimal string)
msg1 (string): odd message (28 bytes hexadecimal string)
t0 (int): timestamps for the even message
t1 (int): timestamps for the odd message
Returns:
(float, float): (latitude, longitude) of the aircraft
"""
if typecode(msg0) < 5 or typecode(msg0) > 18:
raise RuntimeError("%s: Not a position message" % msg0)
if typecode(msg1) < 5 or typecode(msg1) > 18:
raise RuntimeError("%s: Not a position message" % msg1)
msgbin0 = util.hex2bin(msg0)
msgbin1 = util.hex2bin(msg1)
cprlat_even = util.bin2int(msgbin0[54:71]) / 131072.0
cprlon_even = util.bin2int(msgbin0[71:88]) / 131072.0
cprlat_odd = util.bin2int(msgbin1[54:71]) / 131072.0
cprlon_odd = util.bin2int(msgbin1[71:88]) / 131072.0
air_d_lat_even = 360.0 / 60
air_d_lat_odd = 360.0 / 59
# compute latitude index 'j'
j = int(math.floor(59 * cprlat_even - 60 * cprlat_odd + 0.5))
lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
if lat_even >= 270:
lat_even = lat_even - 360
if lat_odd >= 270:
lat_odd = lat_odd - 360
# check if both are in the same latidude zone, exit if not
if _cprNL(lat_even) != _cprNL(lat_odd):
return None
# compute ni, longitude index m, and longitude
if (t0 > t1):
ni = _cprN(lat_even, 0)
m = math.floor(cprlon_even * (_cprNL(lat_even)-1) -
cprlon_odd * _cprNL(lat_even) + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_even)
lat = lat_even
else:
ni = _cprN(lat_odd, 1)
m = math.floor(cprlon_even * (_cprNL(lat_odd)-1) -
cprlon_odd * _cprNL(lat_odd) + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_odd)
lat = lat_odd
if lon > 180:
lon = lon - 360
return lat, lon
def _cprN(lat, is_odd):
nl = _cprNL(lat) - is_odd
return nl if nl > 1 else 1
def _cprNL(lat):
try:
nz = 60
a = 1 - math.cos(math.pi * 2 / nz)
b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
nl = 2 * math.pi / (math.acos(1 - a/b))
return int(nl)
except:
# happens when latitude is +/-90 degree
return 1
def position_known(msg, latlon):
lats, lons = latlon
i = oe_flag(msg)
YZi = cprlat(msg)
XZi = cprlon(msg)
Dlati = Dlat(i)
# compute latitude index 'j'
j = math.floor(lats/Dlati) + math.floor(0.5 + ((lats % Dlati)/Dlati) - (YZi/(2**nb)))
Rlati = Dlati * (j + (YZi/(2**nb)))
nlri = nl(Rlati) - i
if nlri > 0:
Dloni = 360 / nlri
else:
Dloni = 360
m = math.floor(lons/Dloni) + math.floor(0.5 + ((lons % Dloni)/Dloni) - (XZi/2**nb))
Rloni = Dloni * (m + XZi/(2**nb))
return Rlati, Rloni
def position_surface_known(msg, latlon):
lats, lons = latlon
i = oe_flag(msg)
YZi = cprlat(msg)
XZi = cprlon(msg)
Dlati = Dlat(i, True)
# compute latitude index 'j'
j = math.floor(lats/Dlati) + math.floor(0.5 + ((lats % Dlati)/Dlati) - (YZi/(2**nb)))
Rlati = Dlati * (j + (YZi/(2**nb)))
nlri = nl(Rlati) - i
if nlri > 0:
Dloni = 90 / nlri
else:
Dloni = 90
m = math.floor(lons/Dloni) + math.floor(0.5 + ((lons % Dloni)/Dloni) - (XZi/2**nb))
Rloni = Dloni * (m + XZi/(2**nb))
return Rlati, Rloni
def position_surface(msg_even, msg_odd, even_time, odd_time):
if typecode(msg_even) < 5 or typecode(msg_even) > 8:
raise RuntimeError("%s: Not a surface position message" % msg_even)
if typecode(msg_odd) < 5 or typecode(msg_odd) > 8:
raise RuntimeError("%s: Not a surface position message" % msg_odd)
XZ0 = cprlon(msg_even)
YZ0 = cprlat(msg_even)
XZ1 = cprlon(msg_odd)
YZ1 = cprlat(msg_odd)
Dlat0 = Dlat(0, True)
Dlat1 = Dlat(1, True)
j = math.floor((59 * YZ0 - 60*YZ1) / (2**17) + 0.5)
# Work out latitude
Rlat0 = Rlat(0, YZ0, j, True)
Rlat1 = Rlat(1, YZ1, j, True)
# Assume southen lats :(
# "To determine the correct latitude of the target, it is necessary to make
# use of the location of the receiver."
Rlat0 -= 90.0
Rlat1 -= 90.0
if nl(Rlat0) != nl(Rlat1):
return None
Dlon0 = Dlon(Rlat0, 0, True)
Dlon1 = Dlon(Rlat1, 1, True)
if odd_time > even_time:
# odd is latest i = 1
nl1 = nl(Rlat1)
a = XZ0*(nl1-1) - XZ1*nl1
m = math.floor(a/(2**17) + 0.5)
n1 = (nl1 - 1) if (nl1 - 1) > 1 else 1
Rlon1 = Dlon1 * ((m % n1) + XZ1/(2**17))
lat = Rlat1
lon = Rlon1
else:
# even is latest i = 0
nl0 = nl(Rlat0)
a = XZ0*(nl0-1) - XZ1*nl0
m = math.floor(a/(2**17) + 0.5)
n0 = (nl0 - 0) if (nl0 - 0) > 1 else 1
Rlon0 = Dlon0 * ((m % n0) + XZ0/(2**17))
lat = Rlat0
lon = Rlon0
# Guess Long
closest_lon = 170
distance = 1000
lon_offset = 0
to_guess = [0, 90, -90, -180]
for i in to_guess:
d = abs((lon+i) - closest_lon)
if d < distance:
lon_offset = i
distance = d
return (lat,lon+lon_offset)
def altitude(msg):
"""Decode aircraft altitude
Args:
msg (string): 28 bytes hexadecimal message string
Returns:
int: altitude in feet
"""
if typecode(msg) < 9 or typecode(msg) > 18:
raise RuntimeError("%s: Not a position message" % msg)
msgbin = util.hex2bin(msg)
q = msgbin[47]
if q:
n = util.bin2int(msgbin[40:47]+msgbin[48:52])
alt = n * 25 - 1000
return alt
else:
return None
def nic(msg):
"""Calculate NIC, navigation integrity category
Args:
msg (string): 28 bytes hexadecimal message string
Returns:
int: NIC number (from 0 to 11), -1 if not applicable
"""
if typecode(msg) < 9 or typecode(msg) > 18:
raise RuntimeError("%s: Not a airborne position message" % msg)
msgbin = util.hex2bin(msg)
tc = typecode(msg)
nic_sup_b = util.bin2int(msgbin[39])
if tc in [0, 18, 22]:
nic = 0
elif tc == 17:
nic = 1
elif tc == 16:
if nic_sup_b:
nic = 3
else:
nic = 2
elif tc == 15:
nic = 4
elif tc == 14:
nic = 5
elif tc == 13:
nic = 6
elif tc == 12:
nic = 7
elif tc == 11:
if nic_sup_b:
nic = 9
else:
nic = 8
elif tc in [10, 21]:
nic = 10
elif tc in [9, 20]:
nic = 11
else:
nic = -1
return nic
def surf_nic(msg):
"""Calculate NIC, navigation integrity category for surface
Args:
msg (string): 28 bytes hexadecimal message string
Returns:
int: NIC number (from 0 to 11), -1 if not applicable
"""
if typecode(msg) < 5 or typecode(msg) > 8:
raise RuntimeError("%s: Not a surface position message" % msg)
msgbin = util.hex2bin(msg)
tc = typecode(msg)
nic_sup_a = util.bin2int(msgbin[75])
nic_sup_c = util.bin2int(msgbin[51])
nic = -1
if tc in [0, 8] and nic_sup_a == 0 and nic_sup_c == 0:
nic = 0
elif tc == 8 and nic_sup_a != nic_sup_c:
nic = 6
elif tc == 8:
nic = 7
elif tc == 7 and nic_sup_a == 0:
nic = 8
elif tc == 7 and nic_sup_a == 1:
nic = 9
elif tc == 6:
nic = 10
elif tc == 5:
nic = 11
return nic
def subtype(msg):
if typecode(msg) not in [19, 23, 24, 28, 29, 30, 31]:
raise RuntimeError("%s: Message does not contain a typecode field" % msg)
msgbin = util.hex2bin(msg)
return util.bin2int(msgbin[37:40])
def modea(msg):
if typecode(msg) not in [28] and subtype(msg) != 1:
raise RuntimeError("%s: Not a Extended Squitter for emergency data" % msg)
msgbin = util.hex2bin(msg)
modea_bits = util.bin2int(msgbin[43:56])
C1 = (modea_bits >> 12) & 0x1
A1 = (modea_bits >> 11) & 0x1
C2 = (modea_bits >> 10) & 0x1
A2 = (modea_bits >> 9) & 0x1
C4 = (modea_bits >> 8) & 0x1
A4 = (modea_bits >> 7) & 0x1
B1 = (modea_bits >> 5) & 0x1
D1 = (modea_bits >> 4) & 0x1
B2 = (modea_bits >> 3) & 0x1
D2 = (modea_bits >> 2) & 0x1
B4 = (modea_bits >> 1) & 0x1
D4 = (modea_bits >> 0) & 0x1
ssr = (A4 << 11) | (A2 << 10) | (A1 << 9) | \
(B4 << 8) | (B2 << 7) | (B1 << 6) | \
(C4 << 5) | (C2 << 4) | (C1 << 3) | \
(D4 << 2) | (D2 << 1) | (D1 << 0)
return ssr
# ---------------------------------------------
# Velocity
# ---------------------------------------------
def velocity(msg):
"""Calculate the speed, heading, and vertical rate
Args:
msg (string): 28 bytes hexadecimal message string
Returns:
(int, float, int, string): speed (kt), heading (degree),
rate of climb/descend (ft/min), and speed type
('GS' for ground speed, 'AS' for airspeed)
"""
if typecode(msg) != 19:
raise RuntimeError("%s: Not a airborne velocity message" % msg)
msgbin = util.hex2bin(msg)
st = subtype(msg)
if st in (1, 2):
v_ew_sign = util.bin2int(msgbin[45])
v_ew = util.bin2int(msgbin[46:56]) - 1 # east-west velocity
v_ns_sign = util.bin2int(msgbin[56])
v_ns = util.bin2int(msgbin[57:67]) - 1 # north-south velocity
v_we = -1*v_ew if v_ew_sign else v_ew
v_sn = -1*v_ns if v_ns_sign else v_ns
spd = math.sqrt(v_sn*v_sn + v_we*v_we)-1 # unit in kts
hdg = math.atan2(v_we, v_sn)
hdg = math.degrees(hdg) # convert to degrees
hdg = hdg if hdg >= 0 else hdg + 360 # no negative val
tag = 'GS'
else:
hdg = util.bin2int(msgbin[46:56]) / 1024.0 * 360.0
spd = util.bin2int(msgbin[57:67])-1
tag = 'AS'
if st == 4:
print ("super sonic!")
vr_sign = util.bin2int(msgbin[68])
vr = util.bin2int(msgbin[68:77]) # vertical rate
rocd = -1*vr if vr_sign else vr # rate of climb/descend
return int(spd), hdg, int(rocd), tag
def surface_heading(msg):
if typecode(msg) < 5 or typecode(msg) > 8:
raise RuntimeError("%s: Not a surface position message" % msg)
msgbin = util.hex2bin(msg)
heading_valid = util.bin2int(msgbin[44])
if heading_valid:
return util.bin2int(msgbin[45:52]) * (360.0/128.0)
return None
def speed_heading(msg):
"""Get speed and heading only from the velocity message
Args:
msg (string): 28 bytes hexadecimal message string
Returns:
(int, float): speed (kt), heading (degree)
"""
spd, hdg, rocd, tag = velocity(msg)
return spd, hdg
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment