Preprocessing for a deck.gl example of ADS-B data. The first script below converts newline-delimited JSON trace records (data.jsonl) into a Parquet file of simplified trajectories with per-vertex timestamps; the second downloads the raw traces from samples.adsbexchange.com.
import json
from datetime import datetime
from typing import Sequence

import click
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
from simplification.cutil import simplify_coords_vw_idx

def json_data_to_table(
    data: list, timestamp_offset: int, simplification_eps: float
) -> pa.Table:
    # Drop records that lack a trace array or a top-level timestamp
    data = [item for item in data if item.get("trace") and item.get("timestamp")]

    len_coords = 0
    # For each record, the indexes of the trace points retained by simplification
    simplified_indexes = []
    for item in data:
        try:
            # Simplify each trace with Visvalingam-Whyatt. Trace points are
            # [seconds_offset, lat, lon, ...], so build (lon, lat) pairs first.
            trace = item["trace"]
            item_coords = [[coord[2], coord[1]] for coord in trace]
            simplified = simplify_coords_vw_idx(item_coords, simplification_eps)
            simplified_indexes.append(simplified)
            len_coords += len(simplified)
        except TypeError as e:
            raise TypeError(f"data: {item}") from e
    # One timestamp per coordinate, so both arrays share the same list offsets
    timestamp_arr = np.zeros(len_coords, dtype=np.float32)
    coord_arr = np.zeros(len_coords * 2, dtype=np.float32)
    # Arrow list offsets are int32
    coord_offsets = np.zeros(len(data) + 1, dtype=np.int32)

    item_index = 0
    coord_index = 0
    for item, simplified_index in zip(data, simplified_indexes):
        coord_offsets[item_index] = coord_index
        for simplified_coord_index in simplified_index:
            trace_point = item["trace"][simplified_coord_index]
            # Trace points use lat-lon ordering; we want lon-lat
            coord_arr[coord_index * 2] = trace_point[2]
            coord_arr[coord_index * 2 + 1] = trace_point[1]
            # Rebase timestamps against timestamp_offset so the values stay
            # small enough for float32
            timestamp_arr[coord_index] = (
                item["timestamp"] + trace_point[0] - timestamp_offset
            )
            coord_index += 1
        item_index += 1
    coord_offsets[item_index] = coord_index
    pyarrow_coord_arr = pa.FixedSizeListArray.from_arrays(coord_arr, 2)
    pyarrow_geom_arr = pa.ListArray.from_arrays(
        pa.array(coord_offsets), pyarrow_coord_arr
    )
    pyarrow_timestamp_arr = pa.ListArray.from_arrays(
        pa.array(coord_offsets), pa.array(timestamp_arr)
    )
    pyarrow_registration = pa.array([item.get("r") for item in data])

    return pa.table(
        {
            "geometry": pyarrow_geom_arr,
            "timestamp": pyarrow_timestamp_arr,
            "registration": pyarrow_registration,
        }
    )

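# For a single record with three retained points, the arrays above line up as
#   coord_offsets = [0, 3]
#   coord_arr     = [lon0, lat0, lon1, lat1, lon2, lat2]
#   timestamp_arr = [t0, t1, t2]
# producing a "geometry" column of type list<fixed_size_list<float32, 2>> whose
# one cell is [[lon0, lat0], [lon1, lat1], [lon2, lat2]], and a "timestamp"
# column of type list<float32> sharing the same offsets.
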
def should_keep(datum, registration_prefixes: Sequence[str]) -> bool:
    """Keep a record unless prefixes are given and none match its registration."""
    if registration_prefixes:
        return any(
            datum.get("r", "").startswith(prefix) for prefix in registration_prefixes
        )
    return True

@click.command()
@click.option("--chunksize", type=int, default=5000, help="Number of records per Parquet write")
@click.option("-i", "--input", type=str, default="data.jsonl")
@click.option("-o", "--output", type=str, default="data.parquet")
@click.option(
    "--registration-prefixes",
    type=str,
    required=False,
    help="Comma-separated registration prefixes to keep",
)
@click.option(
    "--timestamp-offset",
    type=int,
    default=datetime(year=2022, month=5, day=1).timestamp(),
)
@click.option("--simplify-tolerance", type=float, default=0.00005)
def main(
    chunksize: int,
    input: str,
    output: str,
    registration_prefixes: str,
    timestamp_offset: int,
    simplify_tolerance: float,
):
    if registration_prefixes:
        registration_prefixes = registration_prefixes.split(",")
    writer = None
    data = []
    geom_index = 0
    with open(input) as f:
        for line in f:
            # Flush a full chunk before reading the next record
            if geom_index >= chunksize:
                table = json_data_to_table(data, timestamp_offset, simplify_tolerance)
                if not writer:
                    writer = pq.ParquetWriter(
                        output, table.schema, compression="brotli"
                    )
                writer.write_table(table)
                data = []
                geom_index = 0

            datum = json.loads(line)
            if should_keep(datum, registration_prefixes):
                data.append(datum)
                geom_index += 1

    # Write the final partial chunk
    table = json_data_to_table(data, timestamp_offset, simplify_tolerance)
    if not writer:
        writer = pq.ParquetWriter(output, table.schema, compression="brotli")
    writer.write_table(table)
    writer.close()


if __name__ == "__main__":
    main()
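As a sanity check, a record with just the fields the converter reads can be pushed through json_data_to_table directly. This is a minimal sketch: the record is hypothetical (real adsbexchange records carry many more fields), and prepare.py is a stand-in name for the script above.

from datetime import datetime

from prepare import json_data_to_table  # hypothetical module name for the script above

# Trace points are [seconds_offset, lat, lon, ...]
record = {
    "r": "N12345",
    "timestamp": datetime(year=2022, month=5, day=1, hour=12).timestamp(),
    "trace": [[0.0, 39.0, -104.7], [60.0, 39.2, -104.8], [120.0, 39.3, -105.0]],
}
offset = datetime(year=2022, month=5, day=1).timestamp()
table = json_data_to_table([record], offset, simplification_eps=0.0)
print(table.schema)
print(table["geometry"][0])  # [[-104.7, 39.0], [-104.8, 39.2], [-105.0, 39.3]]
print(table["timestamp"][0])  # seconds since the offset: [43200, 43260, 43320]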
The download script below fetches one day of traces from samples.adsbexchange.com and appends them to data.jsonl:
import json
from pathlib import Path

import click
import requests

baseurl = "https://samples.adsbexchange.com/traces/2022/05/01"
index_url = f"{baseurl}/index.json"
output_path = "data.jsonl"
output_index_path = "indexes.txt"

# Indexes fetched on previous runs, so the script can resume where it left off
existing_indexes = set()
if Path(output_path).exists() and Path(output_index_path).exists():
    with open(output_index_path) as f:
        for line in f:
            existing_indexes.add(line.strip())

with requests.Session() as session, open(output_path, "a") as output_file, open(
    output_index_path, "a"
) as output_index_file:
    index_resp = session.get(index_url)
    index_resp.raise_for_status()
    indexes = index_resp.json()["traces"]
    indexes = [x for x in indexes if x not in existing_indexes]
    with click.progressbar(indexes) as bar:
        for index in bar:
            # Trace files are sharded by the last two characters of the hex id
            url = f"{baseurl}/{index[-2:]}/trace_full_{index}.json"
            r = session.get(url)
            r.raise_for_status()
            output_file.write(json.dumps(r.json(), separators=(",", ":")))
            output_file.write("\n")
            output_index_file.write(index)
            output_index_file.write("\n")
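Run the download script first; because it appends to data.jsonl and records each fetched index in indexes.txt, it can be interrupted and rerun without refetching anything. A quick spot-check of the output, assuming data.jsonl exists:

import json

with open("data.jsonl") as f:
    first = json.loads(next(f))
print(first.get("r"), first.get("timestamp"), len(first.get("trace", [])))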