Skip to content

Instantly share code, notes, and snippets.

@kylebarron
Last active September 4, 2022 10:52
Show Gist options
  • Save kylebarron/8c8af6876cbb0efbe59476d89e756fab to your computer and use it in GitHub Desktop.
Preprocessing for example of ADSB data in deck.gl
import json
from datetime import datetime
from typing import Sequence
import click
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
from simplification.cutil import simplify_coords_vw_idx
def json_data_to_table(
    data: list, timestamp_offset: int, simplification_eps: float
) -> pa.Table:
    """Convert parsed ADS-B trace records into a pyarrow Table.

    Each record is expected to have a "trace" array of points shaped
    ``[seconds_after_timestamp, lat, lon, ...]`` and a "timestamp"
    (epoch seconds). Coordinates are simplified with
    ``simplify_coords_vw_idx`` and stored as a
    ``list<fixed_size_list<float32, 2>>`` geometry column in lon-lat
    order; per-point timestamps are rebased against ``timestamp_offset``
    so they fit comfortably in float32.

    Parameters
    ----------
    data : list
        Parsed JSON records (dicts). Records without both "trace" and
        "timestamp" are dropped.
    timestamp_offset : int
        Epoch seconds subtracted from every absolute point timestamp.
    simplification_eps : float
        Tolerance passed to ``simplify_coords_vw_idx``.

    Raises
    ------
    TypeError
        If a record's trace points are malformed; the offending record
        is included in the message and the original error is chained.
    """
    # Remove items without a trace array or a base timestamp.
    data = [item for item in data if item.get("trace") and item.get("timestamp")]

    len_coords = 0
    # Per-record indexes of the trace points retained by simplification.
    simplified_indexes = []
    for item in data:
        try:
            trace = item["trace"]
            # trace points are [seconds, lat, lon, ...]; the simplifier
            # expects x-y (lon-lat) pairs.
            item_coords = [[coord[2], coord[1]] for coord in trace]
            simplified = simplify_coords_vw_idx(item_coords, simplification_eps)
            simplified_indexes.append(simplified)
            len_coords += len(simplified)
        except TypeError as e:
            # Chain the original exception so the real failure site is
            # preserved in the traceback.
            raise TypeError(f"Malformed trace in record: {item}") from e

    # One timestamp per retained coordinate.
    timestamp_arr = np.zeros(len_coords, dtype=np.float32)
    # Interleaved lon, lat pairs.
    coord_arr = np.zeros(len_coords * 2, dtype=np.float32)
    # The same list offsets serve both the geometry and timestamp columns.
    coord_offsets = np.zeros(len(data) + 1, dtype=np.uint32)

    item_index = 0
    coord_index = 0
    for item, simplified_index in zip(data, simplified_indexes):
        coord_offsets[item_index] = coord_index
        for simplified_coord_index in simplified_index:
            trace_point = item["trace"][simplified_coord_index]
            # trace uses lat-lon ordering; output geometry is lon-lat.
            coord_arr[coord_index * 2] = trace_point[2]
            coord_arr[coord_index * 2 + 1] = trace_point[1]
            # trace_point[0] is seconds relative to the record timestamp.
            timestamp_arr[coord_index] = (
                item["timestamp"] + trace_point[0] - timestamp_offset
            )
            coord_index += 1
        item_index += 1
    # Closing offset for the final list.
    coord_offsets[item_index] = coord_index

    pyarrow_coord_arr = pa.FixedSizeListArray.from_arrays(coord_arr, 2)
    pyarrow_geom_arr = pa.ListArray.from_arrays(
        pa.array(coord_offsets), pyarrow_coord_arr
    )
    pyarrow_timestamp_arr = pa.ListArray.from_arrays(
        pa.array(coord_offsets), pa.array(timestamp_arr)
    )
    # Aircraft registration ("r") may be absent; nulls are allowed.
    pyarrow_registration = pa.array([item.get("r") for item in data])
    return pa.table(
        {
            "geometry": pyarrow_geom_arr,
            "timestamp": pyarrow_timestamp_arr,
            "registration": pyarrow_registration,
        }
    )
def should_keep(datum, registration_prefixes: Sequence[str]) -> bool:
    """Decide whether a record passes the registration-prefix filter.

    When ``registration_prefixes`` is empty or falsy, every record is
    kept. Otherwise the record's "r" field (defaulting to "") must
    start with at least one of the given prefixes.
    """
    if not registration_prefixes:
        return True
    registration = datum.get("r", "")
    return any(
        registration.startswith(prefix) for prefix in registration_prefixes
    )
@click.command()
@click.option("--chunksize", type=int, default=5000, help="parquet chunk size")
@click.option("-i", "--input", type=str, default="data.jsonl")
@click.option("-o", "--output", type=str, default="data.parquet")
@click.option("--registration-prefixes", type=str, required=False)
@click.option(
    "--timestamp-offset",
    type=int,
    default=datetime(year=2022, month=5, day=1).timestamp(),
)
@click.option("--simplify-tolerance", type=float, default=0.00005)
def main(
    chunksize: int,
    input,
    output,
    registration_prefixes: str,
    timestamp_offset: int,
    simplify_tolerance: float,
):
    """Stream a JSONL file of ADS-B traces into a Parquet file.

    Records are optionally filtered by comma-separated registration
    prefixes, accumulated into chunks of ``chunksize`` kept records,
    converted to Arrow tables, and appended to ``output`` with brotli
    compression.
    """
    if registration_prefixes:
        # The CLI takes a comma-separated string; the filter wants a list.
        registration_prefixes = registration_prefixes.split(",")

    # The writer is created lazily from the first chunk's schema.
    writer = None
    data = []
    geom_index = 0
    try:
        with open(input) as f:
            for line in f:
                if geom_index >= chunksize:
                    # Flush the accumulated chunk before reading more.
                    table = json_data_to_table(
                        data, timestamp_offset, simplify_tolerance
                    )
                    if writer is None:
                        writer = pq.ParquetWriter(
                            output, table.schema, compression="brotli"
                        )
                    writer.write_table(table)
                    data = []
                    geom_index = 0

                datum = json.loads(line)
                if should_keep(datum, registration_prefixes):
                    data.append(datum)
                    geom_index += 1

        # Flush the final (possibly partial) chunk.
        table = json_data_to_table(data, timestamp_offset, simplify_tolerance)
        if writer is None:
            writer = pq.ParquetWriter(output, table.schema, compression="brotli")
        writer.write_table(table)
    finally:
        # Close the writer even if conversion or parsing fails mid-stream,
        # so the Parquet footer is written for already-flushed chunks and
        # the file handle is not leaked.
        if writer is not None:
            writer.close()


if __name__ == "__main__":
    main()
import json
from pathlib import Path
import click
import requests
# Download ADS-B Exchange sample trace files for 2022-05-01 and append
# each trace as one compact JSON line to data.jsonl. indexes.txt records
# which trace ids are already downloaded so the script can resume.
index_url = "https://samples.adsbexchange.com/traces/2022/05/01/index.json"
output_path = "data.jsonl"
output_index_path = "indexes.txt"
baseurl = "https://samples.adsbexchange.com/traces/2022/05/01"

# Ids fetched on a previous run. A set gives O(1) membership tests when
# filtering the full index list below (a list made that filter O(n*m)).
existing_indexes = set()
if Path(output_path).exists() and Path(output_index_path).exists():
    with open(output_index_path) as f:
        for line in f:
            existing_indexes.add(line.strip())

with requests.Session() as session, open(output_path, "a") as output_file, open(
    output_index_path, "a"
) as output_index_file:
    # Fetch the master list of trace ids; use the precomputed index_url
    # instead of rebuilding the identical string, and fail fast on a
    # bad HTTP status rather than trying to parse an error page.
    index_resp = session.get(index_url)
    index_resp.raise_for_status()
    indexes = index_resp.json()["traces"]
    indexes = [x for x in indexes if x not in existing_indexes]

    with click.progressbar(indexes) as bar:
        for index in bar:
            # Traces are sharded by the last two characters of the id.
            url = f"{baseurl}/{index[-2:]}/trace_full_{index}.json"
            r = session.get(url)
            r.raise_for_status()
            # Re-serialize compactly so each trace is one JSONL line.
            output_file.write(json.dumps(r.json(), separators=(",", ":")))
            output_file.write("\n")
            # Record the id only after its data line was written, so a
            # crash between the two writes cannot skip data on resume.
            output_index_file.write(index)
            output_index_file.write("\n")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment