Instantly share code, notes, and snippets.

# k-nut/criticaltracks-cleanup.py

Last active June 27, 2022 16:58
Show Gist options
• Save k-nut/58692cdd339766806c22469922b36c5f to your computer and use it in GitHub Desktop.
CriticalTracks
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters
 # criticaltracks-cleanup: keep only the recorded points that have close neighbours.
"""Filter recorded tracks down to points that have close neighbours.

Reads every row of the ``tracks`` table in FILENAME, converts the locations of
each stored snapshot to GeoJSON-style point features, drops points that have
too few other points nearby, and prints the result as JSON on stdout.
Progress (one timestamp per processed row) goes to stderr.
"""
import json
import sqlite3
import sys
from contextlib import closing
from math import atan2, cos, radians, sin, sqrt

FILENAME = "./criticaltracks.sqlite"

# Approximate radius of the earth in metres.
EARTH_RADIUS_M = 6373.0 * 1_000


def get_distance(point, other_point):
    """Return the great-circle distance in metres between two point features.

    Both arguments are dicts shaped like the result of ``create_point``:
    ``feature["geometry"]["coordinates"]`` holds ``[lon, lat]`` in degrees.
    The distance is computed with the haversine formula and EARTH_RADIUS_M.
    """
    lon1, lat1 = (radians(x) for x in point["geometry"]["coordinates"])
    lon2, lat2 = (radians(x) for x in other_point["geometry"]["coordinates"])

    dlon = lon2 - lon1
    dlat = lat2 - lat1

    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    # Floating-point rounding can push `a` marginally outside [0, 1], which
    # would make sqrt(1 - a) raise ValueError; clamp it first.
    a = min(1.0, max(0.0, a))
    c = 2 * atan2(sqrt(a), sqrt(1 - a))

    return EARTH_RADIUS_M * c


# Backward-compatible alias: the function was first published under this
# misspelled name, so existing callers keep working.
get_distane = get_distance


def may_show(point, points, *, neighbors=3, distance=100):
    """Return True if enough entries of `points` lie close to `point`.

    Args:
        point: the feature to test.
        points: iterable of candidate features. If `point` itself is part of
            `points` (as it is when called from ``main``) it is counted too,
            because its distance to itself is 0.
        neighbors: number of candidates that must be closer than `distance`.
        distance: radius in metres (exclusive).

    Returns:
        True as soon as `neighbors` close candidates have been seen,
        False if the iterable is exhausted first.
    """
    if neighbors <= 0:
        return True

    found = 0
    for candidate in points:
        if get_distance(candidate, point) < distance:
            found += 1
            # Stop early: the remaining candidates cannot change the answer.
            if found >= neighbors:
                return True
    return False


def create_point(key, values):
    """Build a GeoJSON-style Point feature from one location record.

    Args:
        key: identifier of the record. Accepted so the function can be called
            with ``dict.items()`` pairs; it is not used.
        values: mapping with numeric "latitude" and "longitude" entries, each
            divided by 1_000_000 here (presumably stored as micro-degrees;
            confirm against the data source).

    Returns:
        A dict with "type" and "geometry" keys; coordinates are [lon, lat].
    """
    lat = values["latitude"] / 1_000_000
    lon = values["longitude"] / 1_000_000
    return {
        "type": "Feature",
        "geometry": {"type": "Point", "coordinates": [lon, lat]},
    }


def main():
    """Print the filtered tracks of FILENAME as one JSON list on stdout.

    Each output element is ``{"timestamp": ..., "data": [features]}``; rows
    whose points are all filtered out are omitted.
    """
    rows = []
    # closing() guarantees the database handle is released even on errors.
    with closing(sqlite3.connect(FILENAME)) as con:
        for timestamp, data in con.execute('select * from tracks'):
            print(timestamp, file=sys.stderr)
            locations = json.loads(data)['locations']
            points = [create_point(key, value) for key, value in locations.items()]
            filtered_points = [point for point in points if may_show(point, points)]
            if filtered_points:
                rows.append({"timestamp": timestamp, "data": filtered_points})
    print(json.dumps(rows))


if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters

17:55
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters
 # criticaltracks recorder: poll URL and store every raw response in SQLite.
"""Poll URL at a fixed interval and append each raw response to a SQLite table.

Every successful response body is stored as text in the ``tracks`` table of
FILENAME; the ``timestamp`` column is filled in by its SQL default
(``current_timestamp``). The loop runs until the process is interrupted.
"""
import sqlite3
import sys
import urllib.request
from contextlib import closing
from time import sleep

URL = 'https://api.criticalmaps.net/postv2'
FILENAME = "criticaltracks.sqlite"

# Seconds to wait between two polls.
POLL_INTERVAL_S = 5
# Seconds after which a stalled HTTP request is abandoned.
REQUEST_TIMEOUT_S = 30


def get_data(timeout=REQUEST_TIMEOUT_S):
    """Fetch URL and return the response body decoded as text.

    Args:
        timeout: seconds before the request is abandoned. Without a timeout a
            stalled connection would block the polling loop indefinitely.

    Raises:
        OSError: on network failures, including ``urllib.error.URLError`` and
            timeouts.
    """
    with urllib.request.urlopen(URL, timeout=timeout) as response:
        return response.read().decode()


def add_row(cur, con, data: str):
    """Insert `data` as a new row of ``tracks`` and commit immediately.

    Args:
        cur: cursor used to run the insert.
        con: connection the cursor belongs to; committed after the insert.
        data: text to store in the ``data`` column.
    """
    # Parameterized query: `data` comes from the network and is never
    # interpolated into the SQL text.
    cur.execute('insert into tracks (data) values(?);', (data,))
    con.commit()


def main():
    """Create the ``tracks`` table if needed, then poll and record forever.

    A failed fetch is reported on stderr and skipped so that a transient
    network problem does not end the recording. One "." is printed to stdout
    per stored response.
    """
    # closing() releases the database handle when the loop is interrupted.
    with closing(sqlite3.connect(FILENAME)) as con:
        cur = con.cursor()
        cur.execute('CREATE TABLE IF NOT EXISTS tracks (timestamp datetime default current_timestamp, data text);')
        con.commit()
        while True:
            try:
                data = get_data()
            except OSError as err:
                print(f"\nfetch failed: {err}", file=sys.stderr)
            else:
                add_row(cur, con, data)
                # flush so the progress dot shows up immediately instead of
                # sitting in the stdout buffer.
                print(".", end="", flush=True)
            sleep(POLL_INTERVAL_S)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.