-
-
Save Parlane/10424b2eb729842a194e7dafec7755b7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
A python package for decoding ABS-D messages. | |
Copyright (C) 2015 Junzi Sun (TU Delft) | |
This program is free software: you can redistribute it and/or modify | |
it under the terms of the GNU General Public License as published by | |
the Free Software Foundation, either version 3 of the License, or | |
(at your option) any later version. | |
This program is distributed in the hope that it will be useful, | |
but WITHOUT ANY WARRANTY; without even the implied warranty of | |
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
GNU General Public License for more details. | |
You should have received a copy of the GNU General Public License | |
along with this program. If not, see <http://www.gnu.org/licenses/>. | |
""" | |
import math | |
import pyModeS.util as util | |
from pyModeS.util import crc | |
def df(msg): | |
"""Get the downlink format (DF) number | |
Args: | |
msg (string): 28 bytes hexadecimal message string | |
Returns: | |
int: DF number | |
""" | |
return util.df(msg) | |
def icao(msg): | |
"""Get the ICAO 24 bits address, bytes 3 to 8. | |
Args: | |
msg (string): 28 bytes hexadecimal message string | |
Returns: | |
String: ICAO address in 6 bytes hexadecimal string | |
""" | |
return msg[2:8] | |
def data(msg): | |
"""Return the data frame in the message, bytes 9 to 22""" | |
return msg[8:22] | |
def typecode(msg): | |
"""Type code of ADS-B message | |
Args: | |
msg (string): 28 bytes hexadecimal message string | |
Returns: | |
int: type code number | |
""" | |
msgbin = util.hex2bin(msg) | |
return util.bin2int(msgbin[32:37]) | |
# --------------------------------------------- | |
# Aircraft Identification | |
# --------------------------------------------- | |
def category(msg): | |
"""Aircraft category number | |
Args: | |
msg (string): 28 bytes hexadecimal message string | |
Returns: | |
int: category number | |
""" | |
if typecode(msg) < 1 or typecode(msg) > 4: | |
raise RuntimeError("%s: Not a identification message" % msg) | |
msgbin = util.hex2bin(msg) | |
return util.bin2int(msgbin[5:8]) | |
def callsign(msg): | |
"""Aircraft callsign | |
Args: | |
msg (string): 28 bytes hexadecimal message string | |
Returns: | |
string: callsign | |
""" | |
if typecode(msg) < 1 or typecode(msg) > 4: | |
raise RuntimeError("%s: Not a identification message" % msg) | |
chars = '#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######' | |
msgbin = util.hex2bin(msg) | |
csbin = msgbin[40:96] | |
cs = '' | |
cs += chars[util.bin2int(csbin[0:6])] | |
cs += chars[util.bin2int(csbin[6:12])] | |
cs += chars[util.bin2int(csbin[12:18])] | |
cs += chars[util.bin2int(csbin[18:24])] | |
cs += chars[util.bin2int(csbin[24:30])] | |
cs += chars[util.bin2int(csbin[30:36])] | |
cs += chars[util.bin2int(csbin[36:42])] | |
cs += chars[util.bin2int(csbin[42:48])] | |
# clean string, remove spaces and marks, if any. | |
cs = cs.replace('_', '') | |
cs = cs.replace('#', '') | |
return cs | |
# --------------------------------------------- | |
# Positions | |
# --------------------------------------------- | |
def oe_flag(msg): | |
"""Check the odd/even flag. Bit 54, 0 for even, 1 for odd. | |
Args: | |
msg (string): 28 bytes hexadecimal message string | |
Returns: | |
int: 0 or 1, for even or odd frame | |
""" | |
if typecode(msg) < 5 or typecode(msg) > 18: | |
raise RuntimeError("%s: Not a position message" % msg) | |
msgbin = util.hex2bin(msg) | |
return int(msgbin[53]) | |
def cprlat(msg): | |
"""CPR encoded latitude | |
Args: | |
msg (string): 28 bytes hexadecimal message string | |
Returns: | |
int: encoded latitude | |
""" | |
if typecode(msg) < 5 or typecode(msg) > 18: | |
raise RuntimeError("%s: Not a position message" % msg) | |
msgbin = util.hex2bin(msg) | |
return util.bin2int(msgbin[54:71]) | |
def cprlon(msg): | |
"""CPR encoded longitude | |
Args: | |
msg (string): 28 bytes hexadecimal message string | |
Returns: | |
int: encoded longitude | |
""" | |
if typecode(msg) < 5 or typecode(msg) > 18: | |
raise RuntimeError("%s: Not a position message" % msg) | |
msgbin = util.hex2bin(msg) | |
return util.bin2int(msgbin[71:88]) | |
nb = 17 # bits used to encode a pos coord | |
nz = 15 # number of geographic latitude zones between equator and a pole | |
# latitude zone size in the N-S direction | |
def Dlat(i, surface=False): | |
if surface: | |
return 90.0 / (4 * nz - i) | |
else: | |
return 360.0 / (4 * nz - i) | |
def Dlon(Rlati, i, surface=False): | |
a = nl(Rlati) | |
b = 90.0 if surface else 360.0 | |
if (a-i) > 1: | |
return b / (a-i) | |
else: | |
return b | |
def Rlat(i, YZi, j, surface=False): | |
Dlati = Dlat(i, surface) | |
return (Dlati * ((j % (60 - i)) + YZi/(2**17))) | |
def nl(lat): | |
# special cases | |
if lat == 0: | |
return 59 | |
elif abs(lat) == 87: | |
return 2 | |
elif abs(lat) > 87: | |
return 1 | |
a = 1 - math.cos(math.pi / (2.0*nz)) | |
b = math.pow(math.cos(math.pi/180.0 * abs(lat)), 2) | |
ab = 1 - a/b | |
if ab > 1: | |
ab = 1 | |
elif ab < -1: | |
ab = -1 | |
c = math.acos(ab) | |
d = (2*math.pi) / c | |
return math.floor(d) | |
def position(msg_even, msg_odd, even_time, odd_time): | |
if typecode(msg_even) < 9 or typecode(msg_even) > 18: | |
raise RuntimeError("%s: Not a position message" % msg_even) | |
if typecode(msg_odd) < 9 or typecode(msg_odd) > 18: | |
raise RuntimeError("%s: Not a position message" % msg_odd) | |
Dlat0 = Dlat(0) | |
Dlat1 = Dlat(1) | |
XZ0 = cprlon(msg_even) | |
YZ0 = cprlat(msg_even) | |
XZ1 = cprlon(msg_odd) | |
YZ1 = cprlat(msg_odd) | |
j = math.floor((59 * YZ0 - 60*YZ1) / (2**17) + 0.5) | |
Rlat0 = Rlat(0, YZ0, j) | |
Rlat1 = Rlat(1, YZ1, j) | |
if Rlat0 > 270: | |
Rlat0 -= 360 | |
if Rlat1 > 270: | |
Rlat1 -= 360 | |
if nl(Rlat0) != nl(Rlat1): | |
return None | |
Dlon0 = Dlon(Rlat0, 0) | |
Dlon1 = Dlon(Rlat1, 1) | |
if odd_time > even_time: | |
# odd is latest i = 1 | |
nl1 = nl(Rlat1) | |
a = (XZ0*(nl1-1))-(XZ1*nl1) | |
m = math.floor(a/math.pow(2.0,17) + 0.5) | |
n1 = (nl1 - 1) if (nl1 - 1) > 1 else 1 | |
Rlon1 = Dlon1 * ((m % n1) + XZ1/(2**17)) | |
if Rlon1 > 180: | |
Rlon1 = Rlon1 - 360 | |
return (Rlat1, Rlon1) | |
else: | |
# even is latest i = 0 | |
nl0 = nl(Rlat0) | |
a = XZ0*(nl0-1)-XZ1*nl0 | |
m = math.floor(a/(2**17) + 0.5) | |
n0 = (nl0 - 0) if (nl0 - 0) > 1 else 1 | |
Rlon0 = Dlon0 * ((m % n0) + XZ0/(2**17)) | |
if Rlon0 > 180: | |
Rlon0 = Rlon0 - 360 | |
return (Rlat0, Rlon0) | |
def position_old(msg0, msg1, t0, t1): | |
"""Decode position from the combination of even and odd position message | |
131072 is 2^17, since CPR lat and lon are 17 bits each. | |
Args: | |
msg0 (string): even message (28 bytes hexadecimal string) | |
msg1 (string): odd message (28 bytes hexadecimal string) | |
t0 (int): timestamps for the even message | |
t1 (int): timestamps for the odd message | |
Returns: | |
(float, float): (latitude, longitude) of the aircraft | |
""" | |
if typecode(msg0) < 5 or typecode(msg0) > 18: | |
raise RuntimeError("%s: Not a position message" % msg0) | |
if typecode(msg1) < 5 or typecode(msg1) > 18: | |
raise RuntimeError("%s: Not a position message" % msg1) | |
msgbin0 = util.hex2bin(msg0) | |
msgbin1 = util.hex2bin(msg1) | |
cprlat_even = util.bin2int(msgbin0[54:71]) / 131072.0 | |
cprlon_even = util.bin2int(msgbin0[71:88]) / 131072.0 | |
cprlat_odd = util.bin2int(msgbin1[54:71]) / 131072.0 | |
cprlon_odd = util.bin2int(msgbin1[71:88]) / 131072.0 | |
air_d_lat_even = 360.0 / 60 | |
air_d_lat_odd = 360.0 / 59 | |
# compute latitude index 'j' | |
j = int(math.floor(59 * cprlat_even - 60 * cprlat_odd + 0.5)) | |
lat_even = float(air_d_lat_even * (j % 60 + cprlat_even)) | |
lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd)) | |
if lat_even >= 270: | |
lat_even = lat_even - 360 | |
if lat_odd >= 270: | |
lat_odd = lat_odd - 360 | |
# check if both are in the same latidude zone, exit if not | |
if _cprNL(lat_even) != _cprNL(lat_odd): | |
return None | |
# compute ni, longitude index m, and longitude | |
if (t0 > t1): | |
ni = _cprN(lat_even, 0) | |
m = math.floor(cprlon_even * (_cprNL(lat_even)-1) - | |
cprlon_odd * _cprNL(lat_even) + 0.5) | |
lon = (360.0 / ni) * (m % ni + cprlon_even) | |
lat = lat_even | |
else: | |
ni = _cprN(lat_odd, 1) | |
m = math.floor(cprlon_even * (_cprNL(lat_odd)-1) - | |
cprlon_odd * _cprNL(lat_odd) + 0.5) | |
lon = (360.0 / ni) * (m % ni + cprlon_odd) | |
lat = lat_odd | |
if lon > 180: | |
lon = lon - 360 | |
return lat, lon | |
def _cprN(lat, is_odd): | |
nl = _cprNL(lat) - is_odd | |
return nl if nl > 1 else 1 | |
def _cprNL(lat): | |
try: | |
nz = 60 | |
a = 1 - math.cos(math.pi * 2 / nz) | |
b = math.cos(math.pi / 180.0 * abs(lat)) ** 2 | |
nl = 2 * math.pi / (math.acos(1 - a/b)) | |
return int(nl) | |
except: | |
# happens when latitude is +/-90 degree | |
return 1 | |
def position_known(msg, latlon): | |
lats, lons = latlon | |
i = oe_flag(msg) | |
YZi = cprlat(msg) | |
XZi = cprlon(msg) | |
Dlati = Dlat(i) | |
# compute latitude index 'j' | |
j = math.floor(lats/Dlati) + math.floor(0.5 + ((lats % Dlati)/Dlati) - (YZi/(2**nb))) | |
Rlati = Dlati * (j + (YZi/(2**nb))) | |
nlri = nl(Rlati) - i | |
if nlri > 0: | |
Dloni = 360 / nlri | |
else: | |
Dloni = 360 | |
m = math.floor(lons/Dloni) + math.floor(0.5 + ((lons % Dloni)/Dloni) - (XZi/2**nb)) | |
Rloni = Dloni * (m + XZi/(2**nb)) | |
return Rlati, Rloni | |
def position_surface_known(msg, latlon): | |
lats, lons = latlon | |
i = oe_flag(msg) | |
YZi = cprlat(msg) | |
XZi = cprlon(msg) | |
Dlati = Dlat(i, True) | |
# compute latitude index 'j' | |
j = math.floor(lats/Dlati) + math.floor(0.5 + ((lats % Dlati)/Dlati) - (YZi/(2**nb))) | |
Rlati = Dlati * (j + (YZi/(2**nb))) | |
nlri = nl(Rlati) - i | |
if nlri > 0: | |
Dloni = 90 / nlri | |
else: | |
Dloni = 90 | |
m = math.floor(lons/Dloni) + math.floor(0.5 + ((lons % Dloni)/Dloni) - (XZi/2**nb)) | |
Rloni = Dloni * (m + XZi/(2**nb)) | |
return Rlati, Rloni | |
def position_surface(msg_even, msg_odd, even_time, odd_time): | |
if typecode(msg_even) < 5 or typecode(msg_even) > 8: | |
raise RuntimeError("%s: Not a surface position message" % msg_even) | |
if typecode(msg_odd) < 5 or typecode(msg_odd) > 8: | |
raise RuntimeError("%s: Not a surface position message" % msg_odd) | |
XZ0 = cprlon(msg_even) | |
YZ0 = cprlat(msg_even) | |
XZ1 = cprlon(msg_odd) | |
YZ1 = cprlat(msg_odd) | |
Dlat0 = Dlat(0, True) | |
Dlat1 = Dlat(1, True) | |
j = math.floor((59 * YZ0 - 60*YZ1) / (2**17) + 0.5) | |
# Work out latitude | |
Rlat0 = Rlat(0, YZ0, j, True) | |
Rlat1 = Rlat(1, YZ1, j, True) | |
# Assume southen lats :( | |
# "To determine the correct latitude of the target, it is necessary to make | |
# use of the location of the receiver." | |
Rlat0 -= 90.0 | |
Rlat1 -= 90.0 | |
if nl(Rlat0) != nl(Rlat1): | |
return None | |
Dlon0 = Dlon(Rlat0, 0, True) | |
Dlon1 = Dlon(Rlat1, 1, True) | |
if odd_time > even_time: | |
# odd is latest i = 1 | |
nl1 = nl(Rlat1) | |
a = XZ0*(nl1-1) - XZ1*nl1 | |
m = math.floor(a/(2**17) + 0.5) | |
n1 = (nl1 - 1) if (nl1 - 1) > 1 else 1 | |
Rlon1 = Dlon1 * ((m % n1) + XZ1/(2**17)) | |
lat = Rlat1 | |
lon = Rlon1 | |
else: | |
# even is latest i = 0 | |
nl0 = nl(Rlat0) | |
a = XZ0*(nl0-1) - XZ1*nl0 | |
m = math.floor(a/(2**17) + 0.5) | |
n0 = (nl0 - 0) if (nl0 - 0) > 1 else 1 | |
Rlon0 = Dlon0 * ((m % n0) + XZ0/(2**17)) | |
lat = Rlat0 | |
lon = Rlon0 | |
# Guess Long | |
closest_lon = 170 | |
distance = 1000 | |
lon_offset = 0 | |
to_guess = [0, 90, -90, -180] | |
for i in to_guess: | |
d = abs((lon+i) - closest_lon) | |
if d < distance: | |
lon_offset = i | |
distance = d | |
return (lat,lon+lon_offset) | |
def altitude(msg): | |
"""Decode aircraft altitude | |
Args: | |
msg (string): 28 bytes hexadecimal message string | |
Returns: | |
int: altitude in feet | |
""" | |
if typecode(msg) < 9 or typecode(msg) > 18: | |
raise RuntimeError("%s: Not a position message" % msg) | |
msgbin = util.hex2bin(msg) | |
q = msgbin[47] | |
if q: | |
n = util.bin2int(msgbin[40:47]+msgbin[48:52]) | |
alt = n * 25 - 1000 | |
return alt | |
else: | |
return None | |
def nic(msg): | |
"""Calculate NIC, navigation integrity category | |
Args: | |
msg (string): 28 bytes hexadecimal message string | |
Returns: | |
int: NIC number (from 0 to 11), -1 if not applicable | |
""" | |
if typecode(msg) < 9 or typecode(msg) > 18: | |
raise RuntimeError("%s: Not a airborne position message" % msg) | |
msgbin = util.hex2bin(msg) | |
tc = typecode(msg) | |
nic_sup_b = util.bin2int(msgbin[39]) | |
if tc in [0, 18, 22]: | |
nic = 0 | |
elif tc == 17: | |
nic = 1 | |
elif tc == 16: | |
if nic_sup_b: | |
nic = 3 | |
else: | |
nic = 2 | |
elif tc == 15: | |
nic = 4 | |
elif tc == 14: | |
nic = 5 | |
elif tc == 13: | |
nic = 6 | |
elif tc == 12: | |
nic = 7 | |
elif tc == 11: | |
if nic_sup_b: | |
nic = 9 | |
else: | |
nic = 8 | |
elif tc in [10, 21]: | |
nic = 10 | |
elif tc in [9, 20]: | |
nic = 11 | |
else: | |
nic = -1 | |
return nic | |
def surf_nic(msg): | |
"""Calculate NIC, navigation integrity category for surface | |
Args: | |
msg (string): 28 bytes hexadecimal message string | |
Returns: | |
int: NIC number (from 0 to 11), -1 if not applicable | |
""" | |
if typecode(msg) < 5 or typecode(msg) > 8: | |
raise RuntimeError("%s: Not a surface position message" % msg) | |
msgbin = util.hex2bin(msg) | |
tc = typecode(msg) | |
nic_sup_a = util.bin2int(msgbin[75]) | |
nic_sup_c = util.bin2int(msgbin[51]) | |
nic = -1 | |
if tc in [0, 8] and nic_sup_a == 0 and nic_sup_c == 0: | |
nic = 0 | |
elif tc == 8 and nic_sup_a != nic_sup_c: | |
nic = 6 | |
elif tc == 8: | |
nic = 7 | |
elif tc == 7 and nic_sup_a == 0: | |
nic = 8 | |
elif tc == 7 and nic_sup_a == 1: | |
nic = 9 | |
elif tc == 6: | |
nic = 10 | |
elif tc == 5: | |
nic = 11 | |
return nic | |
def subtype(msg): | |
if typecode(msg) not in [19, 23, 24, 28, 29, 30, 31]: | |
raise RuntimeError("%s: Message does not contain a typecode field" % msg) | |
msgbin = util.hex2bin(msg) | |
return util.bin2int(msgbin[37:40]) | |
def modea(msg): | |
if typecode(msg) not in [28] and subtype(msg) != 1: | |
raise RuntimeError("%s: Not a Extended Squitter for emergency data" % msg) | |
msgbin = util.hex2bin(msg) | |
modea_bits = util.bin2int(msgbin[43:56]) | |
C1 = (modea_bits >> 12) & 0x1 | |
A1 = (modea_bits >> 11) & 0x1 | |
C2 = (modea_bits >> 10) & 0x1 | |
A2 = (modea_bits >> 9) & 0x1 | |
C4 = (modea_bits >> 8) & 0x1 | |
A4 = (modea_bits >> 7) & 0x1 | |
B1 = (modea_bits >> 5) & 0x1 | |
D1 = (modea_bits >> 4) & 0x1 | |
B2 = (modea_bits >> 3) & 0x1 | |
D2 = (modea_bits >> 2) & 0x1 | |
B4 = (modea_bits >> 1) & 0x1 | |
D4 = (modea_bits >> 0) & 0x1 | |
ssr = (A4 << 11) | (A2 << 10) | (A1 << 9) | \ | |
(B4 << 8) | (B2 << 7) | (B1 << 6) | \ | |
(C4 << 5) | (C2 << 4) | (C1 << 3) | \ | |
(D4 << 2) | (D2 << 1) | (D1 << 0) | |
return ssr | |
# --------------------------------------------- | |
# Velocity | |
# --------------------------------------------- | |
def velocity(msg): | |
"""Calculate the speed, heading, and vertical rate | |
Args: | |
msg (string): 28 bytes hexadecimal message string | |
Returns: | |
(int, float, int, string): speed (kt), heading (degree), | |
rate of climb/descend (ft/min), and speed type | |
('GS' for ground speed, 'AS' for airspeed) | |
""" | |
if typecode(msg) != 19: | |
raise RuntimeError("%s: Not a airborne velocity message" % msg) | |
msgbin = util.hex2bin(msg) | |
st = subtype(msg) | |
if st in (1, 2): | |
v_ew_sign = util.bin2int(msgbin[45]) | |
v_ew = util.bin2int(msgbin[46:56]) - 1 # east-west velocity | |
v_ns_sign = util.bin2int(msgbin[56]) | |
v_ns = util.bin2int(msgbin[57:67]) - 1 # north-south velocity | |
v_we = -1*v_ew if v_ew_sign else v_ew | |
v_sn = -1*v_ns if v_ns_sign else v_ns | |
spd = math.sqrt(v_sn*v_sn + v_we*v_we)-1 # unit in kts | |
hdg = math.atan2(v_we, v_sn) | |
hdg = math.degrees(hdg) # convert to degrees | |
hdg = hdg if hdg >= 0 else hdg + 360 # no negative val | |
tag = 'GS' | |
else: | |
hdg = util.bin2int(msgbin[46:56]) / 1024.0 * 360.0 | |
spd = util.bin2int(msgbin[57:67])-1 | |
tag = 'AS' | |
if st == 4: | |
print ("super sonic!") | |
vr_sign = util.bin2int(msgbin[68]) | |
vr = util.bin2int(msgbin[68:77]) # vertical rate | |
rocd = -1*vr if vr_sign else vr # rate of climb/descend | |
return int(spd), hdg, int(rocd), tag | |
def surface_heading(msg): | |
if typecode(msg) < 5 or typecode(msg) > 8: | |
raise RuntimeError("%s: Not a surface position message" % msg) | |
msgbin = util.hex2bin(msg) | |
heading_valid = util.bin2int(msgbin[44]) | |
if heading_valid: | |
return util.bin2int(msgbin[45:52]) * (360.0/128.0) | |
return None | |
def speed_heading(msg): | |
"""Get speed and heading only from the velocity message | |
Args: | |
msg (string): 28 bytes hexadecimal message string | |
Returns: | |
(int, float): speed (kt), heading (degree) | |
""" | |
spd, hdg, rocd, tag = velocity(msg) | |
return spd, hdg |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment