Created
November 27, 2019 05:56
-
-
Save selenologist/0f17ec1d9447820966d5da7ab0143443 to your computer and use it in GitHub Desktop.
Pulls data from the rfs.nsw.gov.au majorIncidents feed every 10 minutes, printing a summary of nearby incidents.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math, json, time, re | |
import urllib.request | |
MI_URL = "https://www.rfs.nsw.gov.au/feeds/majorIncidents.json"


def get_features_from_net():
    """Download the major-incidents feed and return its list of features.

    Prints when the data was downloaded and how long the download and the
    JSON decoding each took.

    Returns:
        The value stored under the "features" key of the decoded feed.
    """
    start = time.time()
    # Close the HTTP response deterministically: this function is called
    # repeatedly from a polling loop, and an unclosed response keeps its
    # socket open until the garbage collector happens to reclaim it.
    with urllib.request.urlopen(MI_URL) as response:
        data = response.read()
    loadtime = time.time()
    result = json.loads(data)["features"]
    jsontime = time.time()
    print("Downloaded new data at {}\nLoading took {:.5f}s\nDecoding took {:.5f}s\nTotal {:.6f}s\n".format(
        time.ctime(loadtime), loadtime - start, jsontime - loadtime, jsontime - start))
    return result
# only incidents at most this many kilometres away are listed
RADIUS_LIMIT = 100
# seconds to wait between two downloads of the feed (10 minutes)
UPDATE_INTERVAL = 10 * 60
# the RFS feed gives coordinates as [lon, lat], so the reference point
# is written in that same order
MY_COORDS = [0, 0]


def deg2rad(deg):
    """Convert an angle from degrees to radians."""
    return math.pi * deg / 180
def geodist(x, y):  # adapted from https://www.movable-type.co.uk/scripts/latlong.html
    """Return the great-circle distance in km between two points (haversine).

    Args:
        x: first point as a [lon, lat] pair, in degrees.
        y: second point as a [lon, lat] pair, in degrees.

    Returns:
        The distance in kilometres on a sphere of radius 6371 km.
    """
    R = 6371  # mean radius of earth, in km
    # destructure coords to lat/lon (the pairs are ordered [lon, lat])
    lon1, lat1 = x
    lon2, lat2 = y
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    sin_half_dphi = math.sin(math.radians(lat2 - lat1) / 2)
    sin_half_dlambda = math.sin(math.radians(lon2 - lon1) / 2)
    a = sin_half_dphi * sin_half_dphi \
        + math.cos(phi1) * math.cos(phi2) * sin_half_dlambda * sin_half_dlambda
    # Floating-point rounding can push `a` a hair outside [0, 1] for
    # (near-)antipodal points, which would make sqrt(1 - a) raise ValueError.
    a = min(1.0, max(0.0, a))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return R * c  # output is distance in km
# original: | |
# getpoint = | |
# lambda feature: | |
# feature["geometry"]["geometries"][0]["coordinates"] if "geometries" in feature["geometry"] else feature["geometry"]["coordinates"] | |
def getpoint(feature):
    """Return the point coordinates of a feed feature.

    A feature's "geometry" either carries "coordinates" directly or holds a
    list of sub-geometries under "geometries".  In the latter case the first
    sub-geometry whose "type" is "Point" is used, so that a collection which
    lists another shape first does not hand non-point coordinates to the
    caller.  If no sub-geometry is typed as a point, the first one is used.

    Args:
        feature: a feature dict with a "geometry" entry.

    Returns:
        The "coordinates" value of the chosen geometry.
    """
    geometry = feature["geometry"]
    if "geometries" not in geometry:
        # if there's only one geometry it should just be a point
        return geometry["coordinates"]
    geometries = geometry["geometries"]
    for candidate in geometries:
        if candidate.get("type") == "Point":
            return candidate["coordinates"]
    # no explicitly typed point: fall back to the first geometry
    return geometries[0]["coordinates"]
def gettitle(feature):
    """Return the title stored in a feature's properties."""
    properties = feature["properties"]
    return properties["title"]
# returns a list of (index, distance, coordinate) tuples, nearest first
def points_by_distance(features, ref_coord):
    """Pair every feature's point with its distance to ref_coord.

    Each entry holds the feature's position in `features`, the distance from
    `ref_coord` to the feature's point as computed by geodist, and the point
    itself.  The entries are ordered by increasing distance.
    """
    entries = []
    for position, feature in enumerate(features):
        coordinate = getpoint(feature)
        entries.append((position, geodist(ref_coord, coordinate), coordinate))
    # order the entries by their distance component
    entries.sort(key=lambda entry: entry[1])
    return entries
# yields (distance, properties) for the events no further than radius km away
def within(features, sorted_points, radius):
    """Generate (distance, properties) pairs for points inside the radius.

    `sorted_points` is expected to be ordered by distance, so iteration ends
    at the first entry whose distance exceeds `radius`.
    """
    for entry in sorted_points:
        index, distance = entry[0], entry[1]
        if distance > radius:
            return  # everything after this entry is further away still
        yield distance, features[index]["properties"]
STATUS_REGEX = re.compile(r"STATUS: ([^<]+)")
TYPE_REGEX = re.compile(r"TYPE: ([^<]+)")


def _from_desc(description, regex, default="<unknown>"):
    """Return the stripped first capture group of regex in description, or default."""
    match = regex.search(description)
    return match.group(1).strip() if match else default


def _print_report(features):
    """Print the closest incident and every incident within RADIUS_LIMIT km."""
    sorted_points = points_by_distance(features, MY_COORDS)
    # An empty feed has no closest event; indexing sorted_points[0] would
    # raise IndexError and terminate the polling loop.
    if sorted_points:
        index, distance = sorted_points[0][0], sorted_points[0][1]
        print("Closest event: {:6.2f} km away, {}\n".format(
            distance, gettitle(features[index])))
    else:
        print("No events in feed.\n")
    print("Within {} km:".format(RADIUS_LIMIT))
    any_within = False
    for (distance, prop) in within(features, sorted_points, RADIUS_LIMIT):
        any_within = True
        # attempt to pull status and type strings from description
        status = _from_desc(prop["description"], STATUS_REGEX)
        ftype = _from_desc(prop["description"], TYPE_REGEX)
        print("* {:6.2f} km {:>16}: {}\t- {} {}".format(
            distance, prop["category"], prop["title"], status, ftype))
        if "guid" in prop:
            print(" {}\n".format(prop["guid"]))
    if not any_within:
        print("None!")
    print("----------------\n")


while True:
    # for offline testing, load a saved copy of the feed instead:
    #features = json.loads(open("majorIncidents.json").read())["features"]
    try:
        features = get_features_from_net()
    except (OSError, ValueError, KeyError) as err:
        # A failed download (OSError covers urllib's URLError/HTTPError), an
        # undecodable body (ValueError) or a body without "features"
        # (KeyError) should not stop the monitor; retry on the next cycle.
        print("Update failed: {}\n".format(err))
    else:
        _print_report(features)
    time.sleep(UPDATE_INTERVAL)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment