Skip to content

Instantly share code, notes, and snippets.

@GuyAglionby
Created May 29, 2023 17:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save GuyAglionby/7964dc952be1a4bc9bd58ad4a888c08e to your computer and use it in GitHub Desktop.
Save GuyAglionby/7964dc952be1a4bc9bd58ad4a888c08e to your computer and use it in GitHub Desktop.
Pull data out from the ADS-B Exchange feed client
import json
import sqlite3
from time import sleep
# DDL script handed to init_db() each time the database is opened; both
# statements use IF NOT EXISTS, so re-running it on an existing file is safe.
#   planes   - one row per recorded sighting: the aircraft's `hex` value from
#              the feed, the sighting time, and optional flight / category.
#   messages - periodic samples of the feed's `messages` value with their time.
SCHEMA = """CREATE TABLE IF NOT EXISTS planes (
hex TEXT NOT NULL,
unix_time INT NOT NULL,
flight TEXT,
category TEXT);
CREATE TABLE IF NOT EXISTS messages (
unix_time INT NOT NULL,
message_count INT NOT NULL);
"""
def query_db(db, query, args=(), one=False):
    """Execute a read query and return what it produced.

    Args:
        db: Open database connection.
        query: SQL text, with ``?`` placeholders for ``args``.
        args: Values bound to the placeholders.
        one: When true, return only the first row (or ``None`` if the
            query matched nothing) instead of the full list of rows.

    Returns:
        A list of rows, or a single row / ``None`` when ``one`` is true.
    """
    cursor = db.execute(query, args)
    results = cursor.fetchall()
    cursor.close()
    if not one:
        return results
    return results[0] if results else None
def insert_db(db, query, args=()):
    """Execute a single write statement and commit it.

    Despite the name, callers in this file use it for ``UPDATE`` statements
    as well as ``INSERT``.

    Args:
        db: Open sqlite3 connection.
        query: SQL text, with ``?`` placeholders for ``args``.
        args: Values bound to the placeholders.

    Raises:
        sqlite3.Error: Propagated from the failed statement, after the
            transaction has been rolled back.
    """
    # The connection context manager commits on success and rolls back on
    # failure, so a statement that errors out (e.g. a constraint violation)
    # no longer leaves a dangling open transaction on the connection.
    with db:
        db.execute(query, args)
def init_db(db, schema):
    """Run the multi-statement ``schema`` script against ``db`` and commit.

    Args:
        db: Open sqlite3 connection.
        schema: SQL script containing one or more statements.
    """
    # Connection.executescript creates the temporary cursor itself.
    db.executescript(schema)
    db.commit()
def get_db(db_name, schema, db_dir="/root"):
    """Open the SQLite file ``<db_dir>/<db_name>.db`` and apply ``schema``.

    Args:
        db_name: File name without the ``.db`` extension.
        schema: SQL script passed to ``init_db`` on the new connection.
        db_dir: Directory holding the database file. Defaults to ``/root``,
            the location this script originally hard-coded.

    Returns:
        A sqlite3 connection whose rows are ``sqlite3.Row`` objects, so
        columns can be read by name.
    """
    db_path = f"{db_dir}/{db_name}.db"
    db = sqlite3.connect(db_path)
    db.row_factory = sqlite3.Row
    try:
        init_db(db, schema)
    except Exception:
        # Do not leak the open connection when the schema script fails.
        db.close()
        raise
    return db
def get_last_seen(db):
    """Return the most recent recorded sighting time for every aircraft.

    Args:
        db: Open database connection containing the ``planes`` table.

    Returns:
        A dict mapping each ``hex`` value in ``planes`` to the largest
        ``unix_time`` stored for it; empty when the table has no rows.
    """
    # Let SQLite do the per-aircraft max instead of pulling every row into
    # Python. query_db always returns a list here, so no None check is needed.
    rows = query_db(
        db,
        "SELECT hex, MAX(unix_time) FROM planes GROUP BY hex",
    )
    # Positional access works for both sqlite3.Row and plain tuples.
    return {row[0]: row[1] for row in rows}
def update_last_seen(db, hex, flight, category):
    """Back-fill flight/category on the newest ``planes`` row for ``hex``.

    Only columns that are currently NULL in that row are written, and only
    when the corresponding argument is not ``None``; existing values are
    never overwritten.

    Args:
        db: Open database connection whose rows support access by column
            name.
        hex: Aircraft identifier to look up in ``planes.hex``.
        flight: Flight string to store if the row has none, or ``None``.
        category: Category string to store if the row has none, or ``None``.
    """
    row = query_db(
        db,
        "SELECT * FROM planes WHERE hex = ? ORDER BY unix_time DESC LIMIT 1",
        (hex,),
        one=True,
    )
    if row is None:
        # LIMIT 1 means the query yields zero or one row, so the only
        # failure mode is "nothing recorded yet" - never "too many rows".
        print(f"Error: no existing row for hex {hex}")
        return
    seen_at = row['unix_time']
    if row['flight'] is None and flight is not None:
        insert_db(
            db,
            "UPDATE planes SET flight = ? WHERE hex = ? AND unix_time = ?",
            (flight, hex, seen_at),
        )
    if row['category'] is None and category is not None:
        insert_db(
            db,
            "UPDATE planes SET category = ? WHERE hex = ? AND unix_time = ?",
            (category, hex, seen_at),
        )
def main():
    """Poll the feed's aircraft.json forever and log sightings to SQLite.

    Every poll:
      * the feed's ``messages`` value is stored in ``messages`` if more than
        ``required_gap_seconds`` have passed since the last stored sample;
      * each listed aircraft either gets a new ``planes`` row (first
        sighting, or not seen within ``required_gap_seconds``) or has the
        missing flight/category of its latest row back-filled.

    ``last_seen`` is refreshed on every sighting, so an aircraft that stays
    in view continuously is recorded as a single sighting.
    """
    db = get_db("planes", SCHEMA)
    last_seen = get_last_seen(db)
    json_path = "/run/adsbexchange-feed/aircraft.json"
    required_gap_seconds = 30 * 60
    poll_interval_seconds = 5
    previous_messages_time = -1
    try:
        while True:
            try:
                with open(json_path, encoding="utf-8") as f:
                    data = json.load(f)
            except (OSError, ValueError) as err:
                # The file is produced by another process; it may be absent
                # or incomplete at the moment we read it. Retry on the next
                # poll instead of killing the logger.
                print(f"Error: could not read {json_path}: {err}")
                sleep(poll_interval_seconds)
                continue

            messages = data['messages']
            now = int(data['now'])
            if now - previous_messages_time > required_gap_seconds:
                previous_messages_time = now
                insert_db(db, "INSERT INTO messages VALUES (?, ?)", (now, messages))

            for aircraft in data['aircraft']:
                hex_id = aircraft['hex']
                flight = aircraft.get('flight')
                if flight is not None:
                    flight = flight.strip()
                category = aircraft.get('category')

                recently_seen = (
                    hex_id in last_seen
                    and now - last_seen[hex_id] < required_gap_seconds
                )
                if recently_seen:
                    # Same sighting as before: only fill in missing details.
                    update_last_seen(db, hex_id, flight, category)
                else:
                    insert_db(
                        db,
                        "INSERT INTO planes VALUES (?, ?, ?, ?)",
                        (hex_id, now, flight, category),
                    )
                last_seen[hex_id] = now

            sleep(poll_interval_seconds)
    finally:
        # Reached on Ctrl-C or any unexpected error; release the database.
        db.close()
# Start the polling loop only when this file is executed as a script, not
# when it is imported as a module.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment