Skip to content

Instantly share code, notes, and snippets.

@obviyus
Last active March 15, 2025 16:39
Show Gist options
  • Save obviyus/86c693683ffee95c06d3c8d1177d394f to your computer and use it in GitHub Desktop.
Save obviyus/86c693683ffee95c06d3c8d1177d394f to your computer and use it in GitHub Desktop.
A script that sends an Android device's back camera video feed into a v4l2 sink and processes the image using YOLOv11 to check for airplanes.
# /// script
# requires-python = "==3.12.8"
# dependencies = [
# "opencv-python",
# "numpy",
# "ultralytics",
# "aiohttp",
# "aiofiles",
# "python-dotenv"
# ]
# ///
import os
import cv2
import numpy as np
import subprocess
import sqlite3
from ultralytics import YOLO
import asyncio
import aiohttp
import aiofiles
from datetime import datetime, timedelta
from dotenv import load_dotenv
# Let python-dotenv populate the process environment (typically from a local
# .env file) before any os.getenv() call below reads it.
load_dotenv()
# Telegram credentials; os.getenv returns None for either one when it is unset.
telegram_bot_token = os.getenv("TELEGRAM_BOT_TOKEN")
telegram_chat_id = os.getenv("TELEGRAM_CHAT_ID")
class PlaneDetector:
    """Detect airplanes in video frames and send photos to a Telegram chat.

    The detector wraps an Ultralytics YOLO model (loaded from its NCNN export)
    and keeps a short-lived cache of the most recent detection result. It also
    owns the aiohttp session used for outgoing HTTP requests.
    """

    def __init__(self, model_path="yolo11n.pt", cache_timeout=1):
        """Load the model and initialise the detection cache.

        Args:
            model_path: Path to the YOLO weights file.
            cache_timeout: Seconds for which a detection result is reused
                instead of running inference again.
        """
        self.model = self._setup_model(model_path)
        self.last_detection_time = datetime.now()
        self.cache_timeout = timedelta(seconds=cache_timeout)
        self.last_detection_result = None
        self.session = None

    def _setup_model(self, model_path):
        """Return a YOLO model backed by the NCNN export of ``model_path``.

        The export is expected at ``<model dir>/<model name>_ncnn_model``; it
        is generated once when that path does not exist yet.
        """
        abs_model_path = os.path.abspath(model_path)
        model_dir = os.path.dirname(abs_model_path)
        model_name = os.path.splitext(os.path.basename(model_path))[0]
        ncnn_path = os.path.join(model_dir, f"{model_name}_ncnn_model")
        if not os.path.exists(ncnn_path):
            print(f"Exporting NCNN model to {ncnn_path}...")
            model = YOLO(abs_model_path)
            model.export(format="ncnn")
            print("Export complete.")
        else:
            print(f"Using existing NCNN model: {ncnn_path}")
        return YOLO(ncnn_path)

    async def init_session(self):
        """Create the aiohttp client session used for HTTP requests."""
        self.session = aiohttp.ClientSession()

    async def close_session(self):
        """Close the aiohttp session, if one is open."""
        if self.session:
            await self.session.close()
            # Drop the closed session so a later send_to_telegram() call
            # creates a fresh one instead of posting on a closed session.
            self.session = None

    async def send_to_telegram(self, image_path, message=None):
        """Upload the image at ``image_path`` to the configured Telegram chat.

        Args:
            image_path: Path of the image file to send.
            message: Optional caption for the photo.

        Returns:
            True when Telegram answered with HTTP 200, otherwise False.
            Errors are printed rather than raised.
        """
        if not telegram_bot_token or not telegram_chat_id:
            # Without credentials the request can only fail; skip the upload.
            print("Telegram sending error: bot token or chat id is not configured")
            return False
        if not self.session:
            await self.init_session()
        url = f"https://api.telegram.org/bot{telegram_bot_token}/sendPhoto"
        try:
            async with aiofiles.open(image_path, "rb") as image_file:
                file_data = await image_file.read()
            form = aiohttp.FormData()
            form.add_field("chat_id", telegram_chat_id)
            form.add_field("photo", file_data, filename="image.jpg")
            if message:
                form.add_field("caption", message)
            async with self.session.post(url, data=form) as response:
                return response.status == 200
        except Exception as e:
            print(f"Telegram sending error: {e}")
            return False

    @staticmethod
    def _find_airplane(results):
        """Return the xyxy box of the first "airplane" detection, or None."""
        for result in results:
            boxes = result.boxes
            if boxes is None or len(boxes) == 0:
                continue
            for box in boxes:
                if result.names[int(box.cls[0])] == "airplane":
                    return box.xyxy[0].cpu().numpy()
        return None

    def process_frame(self, frame):
        """Run airplane detection on ``frame``.

        A result produced less than ``cache_timeout`` ago is returned as-is
        without running inference.

        Args:
            frame: Image array; frames taller than 1080 rows are downscaled
                before inference.

        Returns:
            ``(True, box)`` with ``box`` as xyxy coordinates in the coordinate
            space of the frame that was passed in, or ``(False, None)``.
        """
        now = datetime.now()
        if (
            self.last_detection_result
            and now - self.last_detection_time < self.cache_timeout
        ):
            return self.last_detection_result

        # Optimize frame before inference
        scale = None
        if frame.shape[0] > 1080:
            scale = 1080 / frame.shape[0]
            frame = cv2.resize(
                frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA
            )

        box_coords = self._find_airplane(self.model(frame, verbose=False))
        if box_coords is not None:
            print("Airplane detected!")
            if scale is not None:
                # The model saw the downscaled frame; callers crop the frame
                # they passed in, so map the box back to that resolution.
                box_coords = box_coords / scale
            self.last_detection_result = (True, box_coords)
        else:
            print("No airplane detected.")
            self.last_detection_result = (False, None)
        self.last_detection_time = now
        return self.last_detection_result
async def fetch_flight_data(
    session, timestamp, bounds="28.57488,28.551926,77.132233,77.200499"
):
    """Look up a flight inside ``bounds`` at ``timestamp`` via the FR24 API.

    Args:
        session: aiohttp client session used for the request.
        timestamp: ``datetime`` whose POSIX timestamp is sent to the API.
        bounds: Bounding-box string passed through as the ``bounds`` query
            parameter. Defaults to the area this script was written for.

    Returns:
        The first entry of the response's ``data`` list, or None when the
        request fails, returns a non-200 status, or contains no flights.
    """
    url = "https://fr24api.flightradar24.com/api/historic/flight-positions/full"
    headers = {
        "accept": "application/json",
        "accept-version": "v1",
        "authorization": f"Bearer {os.getenv('FR24_AUTH_TOKEN')}",
    }
    params = {"timestamp": int(timestamp.timestamp()), "bounds": bounds}
    try:
        async with session.get(url, headers=headers, params=params) as response:
            if response.status != 200:
                # Surface auth/quota problems instead of silently reporting
                # "no flight information".
                print(f"Error fetching flight data: HTTP {response.status}")
                return None
            data = await response.json()
            records = data.get("data")
            if records:
                return records[0]
    except Exception as e:
        print(f"Error fetching flight data: {e}")
    return None
def format_flight_message(flight_data):
    """Build the Telegram caption describing a spotted aircraft.

    Args:
        flight_data: Flight record dict (keys such as ``flight``, ``callsign``,
            ``type``, ``reg``, ``orig_iata``, ``dest_iata``, ``alt``,
            ``gspeed``, ``vspeed``), or a falsy value when nothing is known.

    Returns:
        A multi-line caption. Fields that are missing or falsy are left out;
        the callsign and registration are appended in parentheses only when
        they are present.
    """
    if not flight_data:
        return "✈️ Aircraft detected! (No flight information available)"

    def with_detail(value, detail):
        # Avoid rendering "()" or "(None)" when the secondary field is absent.
        return f"{value} ({detail})" if detail else f"{value}"

    parts = ["✈️ Aircraft Spotted! ✈️\n\n"]
    if flight_data.get("flight"):
        flight = with_detail(flight_data["flight"], flight_data.get("callsign"))
        parts.append(f"🛫 Flight: {flight}\n")
    if flight_data.get("type"):
        aircraft = with_detail(flight_data["type"], flight_data.get("reg"))
        parts.append(f"🛩️ Aircraft: {aircraft}\n")
    if flight_data.get("orig_iata") and flight_data.get("dest_iata"):
        parts.append(
            f"🛣️ Route: {flight_data['orig_iata']} → {flight_data['dest_iata']}\n"
        )
    if flight_data.get("alt"):
        parts.append(f"📏 Altitude: {flight_data['alt']} ft\n")
    if flight_data.get("gspeed"):
        parts.append(f"💨 Speed: {flight_data['gspeed']} knots\n")
    if flight_data.get("vspeed"):
        v_dir = "⬆️" if flight_data["vspeed"] > 0 else "⬇️"
        parts.append(f"{v_dir} Vertical: {abs(flight_data['vspeed'])} ft/min\n")
    return "".join(parts)
def init_database():
    """Create the ``data`` directory and the ``flights`` table if missing.

    The database lives at ``data/flights.db`` relative to the current working
    directory. That is the same location the rest of this script reads and
    writes (flight rows, saved images, the rsync source), so the directory
    created here is the one those code paths rely on.
    """
    data_dir = "data"
    os.makedirs(data_dir, exist_ok=True)
    db_path = os.path.join(data_dir, "flights.db")
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS flights (
                fr24_id TEXT PRIMARY KEY,
                flight TEXT,
                callsign TEXT,
                type TEXT,
                registration TEXT,
                operator TEXT,
                orig_iata TEXT,
                orig_icao TEXT,
                dest_iata TEXT,
                dest_icao TEXT,
                altitude INTEGER,
                ground_speed INTEGER,
                vertical_speed INTEGER,
                track INTEGER,
                squawk TEXT,
                latitude REAL,
                longitude REAL,
                timestamp DATETIME,
                eta DATETIME
            )
            """
        )
        conn.commit()
    finally:
        # Release the handle even when table creation fails.
        conn.close()
async def save_flight_data(flight_data):
    """Insert or replace one flight record in ``./data/flights.db``.

    Args:
        flight_data: Flight record dict, or a falsy value to skip saving.

    Returns:
        The id the row was stored under: the record's ``fr24_id`` when it has
        one, otherwise a ``YYYYMMDD_HHMMSS`` timestamp. None when
        ``flight_data`` is falsy.
    """
    if not flight_data:
        return None
    # `or` also covers a record whose fr24_id key is present but null, which
    # would otherwise be stored under a NULL primary key.
    flight_id = flight_data.get("fr24_id") or datetime.now().strftime(
        "%Y%m%d_%H%M%S"
    )
    row = (
        flight_id,
        flight_data.get("flight"),
        flight_data.get("callsign"),
        flight_data.get("type"),
        flight_data.get("reg"),
        flight_data.get("operating_as"),
        flight_data.get("orig_iata"),
        flight_data.get("orig_icao"),
        flight_data.get("dest_iata"),
        flight_data.get("dest_icao"),
        flight_data.get("alt"),
        flight_data.get("gspeed"),
        flight_data.get("vspeed"),
        flight_data.get("track"),
        flight_data.get("squawk"),
        flight_data.get("lat"),
        flight_data.get("lon"),
        flight_data.get("timestamp"),
        flight_data.get("eta"),
    )
    conn = sqlite3.connect("./data/flights.db")
    try:
        conn.execute(
            """
            INSERT OR REPLACE INTO flights
            (fr24_id, flight, callsign, type, registration, operator,
            orig_iata, orig_icao, dest_iata, dest_icao, altitude,
            ground_speed, vertical_speed, track, squawk, latitude,
            longitude, timestamp, eta)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            row,
        )
        conn.commit()
    finally:
        # Close the connection even when the insert fails.
        conn.close()
    return flight_id
async def sync_and_cleanup():
    """Rsync ``./data/`` to the remote server, then delete local images.

    The destination comes from the ``REMOTE_SERVER`` and ``REMOTE_PATH``
    environment variables. Local ``.jpg``/``.jpeg``/``.png`` files are removed
    only after rsync exits successfully; the database file is kept. Failures
    are printed rather than raised.
    """
    remote_server = os.getenv("REMOTE_SERVER")
    remote_path = os.getenv("REMOTE_PATH")
    if not remote_server or not remote_path:
        # Without both values the target would be the literal "None:None/data".
        print("Skipping sync: REMOTE_SERVER and REMOTE_PATH are not both set")
        return
    try:
        # Rsync data directory to remote server
        rsync_command = [
            "rsync",
            "-avz",
            "--progress",
            "./data/",
            f"{remote_server}:{remote_path}/data",
        ]
        process = await asyncio.create_subprocess_exec(
            *rsync_command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        _, stderr = await process.communicate()
        if process.returncode != 0:
            print(f"Rsync error: {stderr.decode()}")
            return
        print("Successfully synced data to remote server")
        # Delete local images but keep the database
        for file in os.listdir("./data"):
            file_path = os.path.join("./data", file)
            if file.endswith((".jpg", ".jpeg", ".png")) and os.path.isfile(file_path):
                os.remove(file_path)
        print("Cleaned up local images")
    except Exception as e:
        print(f"Error during sync and cleanup: {e}")
async def _report_detection(detector, frame, box):
    """Crop, store, and announce one airplane detection."""
    print("Plane detected, processing image...")
    cropped = await process_detection(frame, box)
    flight_data = await fetch_flight_data(
        detector.session,
        datetime.now(),
    )
    # A single save is enough: the insert is an upsert keyed on the flight id.
    flight_id = await save_flight_data(flight_data)
    image_filename = f"{flight_id or 'unknown'}.jpg"
    image_path = os.path.join("data", image_filename)
    cv2.imwrite(image_path, cropped)
    message = format_flight_message(flight_data)
    print("Sending to Telegram...")
    await detector.send_to_telegram(image_path, message)
    print("Syncing data to remote server...")
    await sync_and_cleanup()


async def capture_video_stream(detector):
    """Read frames from ``/dev/video0`` forever and report detected airplanes.

    One frame is processed every 3 seconds. After 5 consecutive failed reads,
    or after any exception, the capture device is released and reopened
    following a 1 second pause. This coroutine does not return.

    Args:
        detector: Object providing ``process_frame(frame)``, ``session`` and
            ``send_to_telegram(image_path, message)``.
    """
    video_device = "/dev/video0"
    error_threshold = 5
    interval_seconds = 3
    last_processed_time = datetime.now()
    while True:
        cap = None  # stays None if VideoCapture() itself raises
        error_count = 0  # each (re)opened camera starts with a clean count
        try:
            cap = cv2.VideoCapture(video_device)
            if not cap.isOpened():
                raise RuntimeError(f"Failed to open {video_device}")
            print(
                "Starting video capture, processing frame every "
                f"{interval_seconds} seconds..."
            )
            while True:
                current_time = datetime.now()
                # total_seconds() covers the whole elapsed time; `.seconds`
                # is only the sub-day component of the timedelta.
                elapsed = (current_time - last_processed_time).total_seconds()
                if elapsed < interval_seconds:
                    await asyncio.sleep(0.1)
                    continue
                ret, frame = cap.read()
                if not ret or frame is None:
                    error_count += 1
                    print(f"\nCamera capture error {error_count}/{error_threshold}")
                    if error_count >= error_threshold:
                        print("Too many errors, resetting camera...")
                        break
                    # Yield to the event loop instead of spinning on bad reads.
                    await asyncio.sleep(0.1)
                    continue
                error_count = 0
                # Record the attempt up front so a failure while processing
                # still honours the interval after the camera is reopened.
                last_processed_time = current_time
                process_start = datetime.now()
                print(
                    f"\nProcessing frame at {process_start.strftime('%H:%M:%S')}..."
                )
                detected, box = detector.process_frame(frame)
                if detected:
                    await _report_detection(detector, frame, box)
                process_time = (datetime.now() - process_start).total_seconds()
                print(f"Processing took {process_time:.2f} seconds")
        except Exception as e:
            print(f"\nError in video capture: {e}")
        finally:
            if cap is not None:
                cap.release()
        await asyncio.sleep(1)
async def process_detection(frame, box):
    """Return a square-ish crop of ``frame`` centred on the detection ``box``.

    The crop window is centred on the box, with a side of twice the box's
    longer edge, and is clipped to the frame's bounds (so it may end up
    non-square near an edge).

    Args:
        frame: Image array indexed as ``[row, column, ...]``.
        box: Numpy array of ``x1, y1, x2, y2`` coordinates.

    Returns:
        The slice of ``frame`` covered by the clipped window.
    """
    left, top, right, bottom = box.astype(int)
    mid_x = (left + right) // 2
    mid_y = (top + bottom) // 2
    # Half of the doubled longer edge, i.e. the reach from centre to border.
    reach = int(max(right - left, bottom - top) * 2) // 2

    height, width = frame.shape[0], frame.shape[1]
    window = np.array([mid_x - reach, mid_y - reach, mid_x + reach, mid_y + reach])
    col_start, row_start, col_stop, row_stop = np.clip(
        window,
        [0, 0, 0, 0],
        [width, height, width, height],
    )
    return frame[row_start:row_stop, col_start:col_stop]
async def main():
    """Set up the database and detector, start scrcpy, and run the capture loop.

    scrcpy is launched to feed the device camera into ``/dev/video0``. On
    exit, including cancellation, the scrcpy process is stopped and the
    detector's HTTP session is closed.
    """
    init_database()
    detector = PlaneDetector()
    await detector.init_session()
    scrcpy_process = None
    try:
        try:
            # An argument list (no shell) makes the Popen handle refer to
            # scrcpy itself, so it can be terminated reliably on shutdown.
            scrcpy_process = subprocess.Popen(
                [
                    "scrcpy",
                    "--v4l2-sink=/dev/video0",
                    "--no-playback",
                    "--no-audio",
                    "--video-source=camera",
                    "--camera-id=0",
                    "--camera-size=1920x1080",
                    "--max-fps=1",
                ]
            )
        except OSError as e:
            # Keep going: the capture loop retries opening the device, which
            # may be fed by something other than this scrcpy instance.
            print(f"Failed to start scrcpy: {e}")
        await capture_video_stream(detector)
    finally:
        try:
            if scrcpy_process is not None and scrcpy_process.poll() is None:
                scrcpy_process.terminate()
                try:
                    scrcpy_process.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    scrcpy_process.kill()
        finally:
            await detector.close_session()
# Start the asyncio event loop with main() only when run as a script.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment