Skip to content

Instantly share code, notes, and snippets.

@k-zehnder
Last active February 7, 2022 03:49
Show Gist options
  • Save k-zehnder/e06c636da103d8f7d1dbc4aaa4e3edad to your computer and use it in GitHub Desktop.
Save k-zehnder/e06c636da103d8f7d1dbc4aaa4e3edad to your computer and use it in GitHub Desktop.
Practice class for fetching FlightRadar24 flight data within a geographic boundary.
# For more, see the repository below:
# https://github.com/k-zehnder/fastapi-docker-async-postgres-flight-data
import os
import json
from operator import ge
import httpx
import asyncio
import datetime
from typing import List, Optional, Dict, Any
from pydantic import BaseModel, validator
from app.dataclass.flightradar.api import API
from app.models import *
from sqlmodel import create_engine, SQLModel, Session, Field
# Browser-like request headers. NOTE(review): nothing in the visible code
# passes these to a client -- confirm whether they are used elsewhere.
HEADERS = {
    "Connection": "keep-alive",
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome"
    ),
}

# URL template for the per-flight detail endpoint; fill in with
# ``API_STRING.format(flight_id=...)``.
API_STRING = "https://data-live.flightradar24.com/clickhandler/?flight={flight_id}"
class Data:
    """
    Class for getting FlightRadar24 API data.

    Workflow: ``get_data`` asks the project ``API`` wrapper for the flights
    inside a hard-coded area and returns their ids; ``async_main`` then
    requests the detail endpoint for every id concurrently and collects a
    ``DetailedFlightCreate`` per flight in ``self.detailed``.

    Note that ``self.detailed`` is never cleared, so repeated calls to
    ``run`` on the same instance keep appending to the same list.
    """

    def __init__(self) -> None:
        # URL template with a ``{flight_id}`` placeholder.
        self.API_STRING = API_STRING
        self.api = API()
        # Detailed flight records accumulated by ``make_request_async``.
        self.detailed = []

    def get_data(self) -> list:
        """Return the ids of the flights found in the hard-coded area.

        Returns:
            A list of flight ids. An empty list is returned when the lookup
            or parsing fails, so callers can always iterate over the result.
        """
        # Alternative area kept from the original source:
        # area = Area(Point(37.8, 37.6), Point(-121.90, -121.78))
        area = Area(Point(59.06, 55.00), Point(30.97, 36.46))
        try:
            return self.parse_data(json.loads(self.api.get_area(area)))
        except Exception:
            # Best-effort lookup: any failure (request, JSON decoding,
            # validation) is reported as "no flights" rather than raised.
            print("*" * 50)
            print("NO FLIGHTS OVERHEAD!")
            return []

    def parse_data(self, data) -> list:
        """Validate each entry of ``data`` and return the flight ids.

        Args:
            data: Mapping whose values are keyword-argument dicts accepted
                by ``BriefFlightCreate``.

        Returns:
            The ``id`` attribute of every created ``BriefFlightCreate``.
        """
        briefs = [BriefFlightCreate(**data[item]) for item in data]
        return [flight.id for flight in briefs]

    def _debug(self, r) -> str:
        """Return the ``identification`` part of a response as indented JSON.

        Also prints a blank line as a visual separator.
        """
        data = r.json()["identification"]
        print()
        return json.dumps(data, indent=4, sort_keys=False)

    async def make_request_async(self, flight_id, client) -> None:
        """Fetch the details of one flight and append them to ``self.detailed``.

        Args:
            flight_id: Id substituted into ``self.API_STRING``.
            client: Object with an awaitable ``get(url)`` method
                (``async_main`` passes an ``httpx.AsyncClient``).

        Raises:
            KeyError: If the response JSON lacks ``identification``, or its
                ``id`` / ``callsign`` entries.
        """
        r = await client.get(self.API_STRING.format(flight_id=flight_id))
        print(self._debug(r))
        data = r.json()
        detailed = DetailedFlightCreate(
            identification=data["identification"]["id"],
            callsign=data["identification"]["callsign"],
        )
        self.detailed.append(detailed)

    async def async_main(self) -> dict:
        """Fetch details for every flight in the area concurrently.

        Returns:
            A dict with ``"time"`` (naive UTC ``datetime`` taken after the
            requests finish) and ``"detailed"`` (``self.detailed``).
        """
        # ``or []`` also guards against a ``None`` result, which would
        # otherwise raise ``TypeError`` when iterated below.
        overhead_ids = self.get_data() or []
        if overhead_ids:
            # Only open a client when there is something to fetch.
            async with httpx.AsyncClient() as client:
                await asyncio.gather(
                    *(
                        self.make_request_async(flight_id, client)
                        for flight_id in overhead_ids
                    )
                )
        return {
            "time": datetime.datetime.utcnow(),
            "detailed": self.detailed,
        }

    def run(self) -> dict:
        """Run ``async_main`` in a new event loop and return its result."""
        return asyncio.run(self.async_main())

    def __str__(self) -> str:
        """Return the number of collected detailed flights as a string."""
        return f"{len(self.detailed)}"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment