Skip to content

Instantly share code, notes, and snippets.

@kylemcdonald
Last active January 21, 2024 08:05
Show Gist options
  • Save kylemcdonald/6e860400656433f8a274d915626837d0 to your computer and use it in GitHub Desktop.
Save kylemcdonald/6e860400656433f8a274d915626837d0 to your computer and use it in GitHub Desktop.
Parse the ADSBX heatmap files.
import datetime
import re
from dataclasses import dataclass
from typing import List, Optional, Union

import numpy as np
def point_to_str(point):
    """Format the address part of a record's first 32-bit word as text.

    The low 24 bits of ``point`` are rendered as six lowercase hexadecimal
    digits.  When bit 24 (``0x1000000``) is set, the result is prefixed
    with ``"~"`` (presumably marking a non-ICAO address -- confirm against
    the format's producer).

    Args:
        point: Integer (Python or NumPy) holding the first word of a record.

    Returns:
        A 6-character hex string, or 7 characters with a leading ``"~"``.
    """
    # Named ``address`` rather than ``hex`` so the builtin is not shadowed.
    address = f"{point & 0xFFFFFF:06x}"
    if point & 0x1000000:
        return "~" + address
    return address
@dataclass
class Slice:
    """All records decoded from one time slice of a heatmap file."""

    # Time decoded from the slice header.
    timestamp: datetime.datetime
    # Callsign records found in the slice, in file order.
    callsigns: List["Callsign"]
    # Telemetry (position) records found in the slice, in file order.
    telemetry: List["Telemetry"]
@dataclass
class Callsign:
    """Identification record: which flight and squawk an address is using."""

    # Address string as produced by point_to_str (6 hex digits, optional "~").
    hex: str
    # Flight / callsign text, or None when the record carries no callsign.
    flight: Optional[str]
    # Squawk code as a zero-padded string of at least four digits.
    squawk: str
@dataclass
class Telemetry:
    """Position record for one aircraft at the time of its slice."""

    # Address string as produced by point_to_str (6 hex digits, optional "~").
    hex: str
    # Source type name (e.g. "adsb_icao", "mlat"), or "unknown".
    type: str
    # Latitude in degrees.
    lat: float
    # Longitude in degrees.
    lon: float
    # Barometric altitude in feet, or the string "ground".
    alt: Union[int, str]
    # Ground speed in knots, or None when the file has no value.
    gs: Optional[float]
# bbox format: [min lat, min lon, max lat, max lon]
def parse_heatmap(fn, bbox=None, return_callsigns=True):
    """Parse an ADSBX heatmap file into a list of time slices.

    The file is decoded as a stream of little-endian 32-bit words.  It
    begins with around 2KB of unknown data, which is skipped by searching
    for the first slice marker.  From there on the file is a sequence of
    slices, each built from 16-byte (four-word) records.

    Slice header record:
      - bytes 0-3:   the magic number 0x0E7F7C9D
      - bytes 4-11:  the timestamp in milliseconds (high word, then low word)
      - bytes 12-15: the update interval (not used by this parser)

    The header is followed by callsign and telemetry records, interleaved
    in no particular order, until the next magic number or the end of file.

    Callsign record (second word is greater than 2**30):
      - bytes 0-3:   hex code
      - bytes 4-7:   flag bit plus the squawk in the low 16 bits
      - bytes 8-15:  eight characters of callsign text (first byte 0 = none)

    Telemetry record:
      - bytes 0-3:   hex code (low 25 bits) and source type (top 5 bits)
      - bytes 4-7:   latitude in millionths of a degree
      - bytes 8-11:  longitude in millionths of a degree
      - bytes 12-13: altitude in 25 ft units (-123 means "ground")
      - bytes 14-15: ground speed in tenths of a knot (-1 means unknown)

    Args:
        fn: Path of the heatmap file.
        bbox: Optional ``[min lat, min lon, max lat, max lon]`` in degrees.
            Telemetry outside the box is dropped; callsigns are unaffected.
        return_callsigns: When False, callsign records are skipped and every
            slice has an empty ``callsigns`` list.

    Returns:
        A list of ``Slice`` objects in file order.  The list is empty when
        the file contains no slice marker.  Incomplete data at the end of
        the file (stray bytes, a partial record or header) is ignored.
        Slice timestamps are naive datetimes in the machine's local time
        zone, as produced by ``datetime.datetime.fromtimestamp``.
    """
    with open(fn, "rb") as f:
        raw = f.read()

    # Only whole 32-bit words can be decoded; trailing stray bytes are
    # ignored instead of making the array view fail.
    n_words = len(raw) // 4
    if n_words == 0:
        return []
    # Explicit little-endian dtypes keep the result independent of the host.
    points_u = np.frombuffer(raw, dtype="<u4", count=n_words)
    points = np.frombuffer(raw, dtype="<i4", count=n_words)

    if bbox is not None:
        # Compare against the raw integer coordinates (millionths of a degree).
        min_lat, min_lon, max_lat, max_lon = (bbox[k] * 1e6 for k in range(4))

    slice_begin_marker = 0x0E7F7C9D
    type_list = (
        "adsb_icao",
        "adsb_icao_nt",
        "adsr_icao",
        "tisb_icao",
        "adsc",
        "mlat",
        "other",
        "mode_s",
        "adsb_other",
        "adsr_other",
        "tisb_trackfile",
        "tisb_other",
        "mode_ac",
    )

    # Skip the unknown preamble: start at the first word equal to the marker.
    marker_hits = np.flatnonzero(points == slice_begin_marker)
    if marker_hits.size == 0:
        return []
    i = int(marker_hits[0])

    data = []
    # Every read below touches four consecutive words, so require that a
    # complete record is available before decoding it.
    while i + 4 <= n_words:
        callsigns = []
        telemetry = []
        # 64-bit millisecond timestamp: 4294967.296 == 2**32 / 1000.
        now = points_u[i + 2] / 1000 + points_u[i + 1] * 4294967.296
        timestamp = datetime.datetime.fromtimestamp(now)
        i += 4

        while i + 4 <= n_words and points[i] != slice_begin_marker:
            rec = i
            i += 4
            p0 = points[rec]
            p1 = points[rec + 1]
            p2 = points[rec + 2]

            # callsign data
            if p1 > 1073741824:
                if not return_callsigns:
                    continue
                text_start = 4 * (rec + 2)
                flight = None
                if raw[text_start] != 0:
                    # latin-1 maps each byte to the same code point as chr().
                    flight = raw[text_start:text_start + 8].decode("latin-1").strip()
                squawk = str(p1 & 0xFFFF).zfill(4)
                callsigns.append(
                    Callsign(hex=point_to_str(p0), flight=flight, squawk=squawk)
                )
                continue

            lat = p1
            lon = p2
            if bbox is not None:
                if lat < min_lat or lat > max_lat or lon < min_lon or lon > max_lon:
                    continue
            # lat/lon in degrees
            lat = lat / 1e6
            lon = lon / 1e6

            # source type: top 5 bits of the 1st 32-bit int
            type_code = (p0 >> 27) & 0x1F
            type_name = type_list[type_code] if type_code < len(type_list) else "unknown"

            # final chunk of data
            p3 = points[rec + 3]
            # barometric altitude: low two bytes of the 4th 32-bit int,
            # sign-extended from 16 bits
            alt = p3 & 65535
            if alt & 32768:
                alt |= -65536
            alt = "ground" if alt == -123 else alt * 25
            # ground speed: high two bytes of the 4th 32-bit int
            # (the shift on a signed value keeps the sign)
            gs = p3 >> 16
            gs = None if gs == -1 else gs / 10

            telemetry.append(
                Telemetry(
                    hex=point_to_str(p0),
                    type=type_name,
                    lat=lat,
                    lon=lon,
                    alt=alt,
                    gs=gs,
                )
            )

        data.append(Slice(timestamp=timestamp, callsigns=callsigns, telemetry=telemetry))
    return data
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment