Skip to content

Instantly share code, notes, and snippets.

@tripy
Created September 24, 2015 21:43
Show Gist options
  • Save tripy/45227a49e5a21b2523a6 to your computer and use it in GitHub Desktop.
Save tripy/45227a49e5a21b2523a6 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
"""Find the customers located close enough to Dublin.

Opens 'coordinates.json' (one JSON record per line) and prints out the
customers who are closer than 100 KM to Dublin, ordered by user_id.
Records which are not valid are skipped and reported on STDERR."""
import json
from operator import itemgetter
import sys
from math import sin, cos, radians, sqrt, pow, atan2
# Reference point: coordinates of Dublin, in degrees
DUBLIN = {
"latitude": 53.3381985,
"longitude": -6.2592576,
}
# Earth radius according to WGS 84 (KM)
EARTH_RADIUS = 6378.137
# We invite people closer than this (KM)
GOOD_DISTANCE = 100
# JSON keys every customer record must contain
MUST_HAVE_KEYS = ["name", "user_id", "latitude", "longitude"]
def print_error(message):
    """Write *message* to STDERR, prefixed with ``*ERROR* ``.

    No newline is appended; callers supply their own line terminator.

    :param message: text (or any other object) to report
    """
    # Wrap in a 1-tuple so that a tuple-valued message is formatted as a
    # whole instead of being unpacked as the %-format arguments.
    sys.stderr.write("*ERROR* %s" % (message,))
def is_valid_schema(json_record):
    """Perform a VERY basic JSON schema validation.

    The record matches the schema when it contains every key listed in
    MUST_HAVE_KEYS; the values themselves are not inspected.

    :param json_record: container to check for the mandatory keys
    :return: True if the record matches the schema, False otherwise
    """
    return all(required in json_record for required in MUST_HAVE_KEYS)
def validate_latitude(latitude):
    """Check that *latitude* lies in the closed range [-90, 90].

    :param latitude: latitude as a number
    :raises AssertionError: if the value is outside the range (NaN included)
    """
    # Raise explicitly instead of using an ``assert`` statement: asserts are
    # stripped under ``python -O``, which would silently disable the check.
    if not (-90 <= latitude <= 90):
        raise AssertionError("latitude out of range: %r" % (latitude,))
def validate_longitude(longitude):
    """Check that *longitude* lies in the closed range [-180, 180].

    :param longitude: longitude as a number
    :raises AssertionError: if the value is outside the range (NaN included)
    """
    # Raise explicitly instead of using an ``assert`` statement: asserts are
    # stripped under ``python -O``, which would silently disable the check.
    if not (-180 <= longitude <= 180):
        raise AssertionError("longitude out of range: %r" % (longitude,))
def normalize_record(json_dict):
    """Validate a record and convert its coordinates to float.

    The record must pass is_valid_schema(), its 'latitude' and 'longitude'
    values must be convertible with float(), and the converted values must
    pass validate_latitude() / validate_longitude().

    :param json_dict: record to normalize; updated in place on success
    :return: the same dict with 'latitude' and 'longitude' stored as floats
    :raises ValueError: in case of an invalid record
    """
    if not is_valid_schema(json_dict):
        raise ValueError("record is missing a mandatory key")

    try:
        latitude = float(json_dict["latitude"])
        longitude = float(json_dict["longitude"])
    except TypeError:
        # float(None), float([]) etc. raise TypeError rather than ValueError;
        # translate it so callers only have to deal with ValueError.
        raise ValueError("latitude/longitude is not a number")

    try:
        validate_latitude(latitude)
        validate_longitude(longitude)
    except AssertionError:
        raise ValueError("latitude/longitude is out of range")

    json_dict["latitude"] = latitude
    json_dict["longitude"] = longitude
    return json_dict
def read_json_record(line):
    """Read and validate one JSON record.

    The text is decoded with normalize_record() as the object hook, so every
    JSON object found in it is validated and normalized. Failures are
    reported through print_error().

    :param line: text holding a single JSON document
    :return: the normalized dict in case of success, None in case of failure
    """
    try:
        # TypeError is caught as well: a non-numeric coordinate such as
        # null makes float() raise it from inside the object hook.
        record = json.loads(line, object_hook=normalize_record)
    except (ValueError, TypeError):
        print_error("Failed to parse record: %s" % line)
        return None

    # Valid JSON that is not an object (a bare number, a list, ...) never
    # goes through the object hook; reject it here so that callers always
    # receive either a dict or None.
    if not isinstance(record, dict):
        print_error("Failed to parse record: %s" % line)
        return None
    return record
def calculate_distance(point1, point2):
    """Calculate the distance between two coordinates on a sphere of
    radius EARTH_RADIUS, using the Vincenty formula (atan2 form of the
    central angle).

    :param point1: Hash containing point1 latitude and longitude keys
        (degrees)
    :param point2: Hash containing point2 latitude and longitude keys
        (degrees)
    :return: distance in KM between point1 and point2
    """
    phi_a, lam_a = radians(point1['latitude']), radians(point1['longitude'])
    phi_b, lam_b = radians(point2['latitude']), radians(point2['longitude'])

    cos_a, sin_a = cos(phi_a), sin(phi_a)
    cos_b, sin_b = cos(phi_b), sin(phi_b)

    lam_diff = lam_b - lam_a
    cos_diff, sin_diff = cos(lam_diff), sin(lam_diff)

    # The central angle is atan2(numerator, denominator).
    east_term = cos_b * sin_diff
    north_term = cos_a * sin_b - sin_a * cos_b * cos_diff
    numerator = sqrt(pow(east_term, 2) + pow(north_term, 2))
    denominator = sin_a * sin_b + cos_a * cos_b * cos_diff
    central_angle = atan2(numerator, denominator)

    return EARTH_RADIUS * central_angle
def filter_record(json_dict):
    """Tell whether a customer is close enough to be invited.

    :param json_dict: record carrying a 'dublin_distance' value
    :return: True if that distance is strictly below GOOD_DISTANCE,
        False otherwise
    """
    distance = json_dict['dublin_distance']
    return distance < GOOD_DISTANCE
def print_results(records):
    """Print the user_ids and names of *records*, sorted by user_id.

    A header line is printed first, then one "<user_id> <name>" line per
    record.

    :param records: iterable of dicts having 'user_id' and 'name' keys
    """
    # print(...) with a single parenthesised argument produces the same
    # output under Python 2 (print statement) and Python 3 (print function),
    # whereas the bare print statement is a SyntaxError on Python 3.
    print("We should invite the following customers:")
    for elem in sorted(records, key=itemgetter('user_id')):
        print("%s %s" % (elem["user_id"], elem["name"]))
def process_records(fp):
    """Select the customers to invite from *fp* and print them.

    Every line of *fp* is parsed with read_json_record(); lines for which it
    returns None are skipped. Each remaining record gets a 'dublin_distance'
    entry (its distance to DUBLIN) and is kept when filter_record() accepts
    it. The kept records are finally handed to print_results().

    :param fp: iterable yielding one JSON record per item
    """
    invitees = []
    for raw_line in fp:
        record = read_json_record(raw_line)
        if record is None:
            continue
        record["dublin_distance"] = calculate_distance(DUBLIN, record)
        if filter_record(record):
            invitees.append(record)
    print_results(invitees)
if __name__ == "__main__":
    # Script entry point: process 'coordinates.json' from the current
    # working directory, or exit with status 1 if it cannot be opened.
    try:
        fp = open('coordinates.json', 'r')
    except IOError:
        print_error("Unable to open coordinates.json\n")
        sys.exit(1)
    # Use the file as a context manager so that it is closed even when
    # processing raises.
    with fp:
        process_records(fp)
import math
import pytest
import calculate_distance
# Largest accepted gap between an expected and a computed distance
DISTANCE_ERROR_THRESHOLD = 1
# Point at latitude 90 used by the pole-to-pole distance test
NORTH_POLE = {
"latitude": 90,
"longitude": 0
}
# Point at latitude -90 used by the pole-to-pole distance test
SOUTH_POLE = {
"latitude": -90,
"longitude": 0
}
def test_print_error(capsys):
    """print_error() must emit the prefixed message on STDERR."""
    calculate_distance.print_error("Hello World")
    captured = capsys.readouterr()
    # readouterr() yields (out, err); index 1 is the STDERR capture.
    assert "*ERROR* Hello World" in captured[1]
def test_is_valid_schema_invalid():
    """A record lacking the mandatory keys must be rejected."""
    accepted = calculate_distance.is_valid_schema({"fake": "record"})
    assert not accepted
def test_is_valid_schema_valid():
    """A record holding all the mandatory keys must be accepted."""
    complete_record = dict(
        latitude="90",
        longitude="33",
        name="false name",
        user_id="77",
    )
    accepted = calculate_distance.is_valid_schema(complete_record)
    assert accepted
def test_validate_latitude_invalid():
    """A latitude below -90 must raise AssertionError."""
    check = calculate_distance.validate_latitude
    with pytest.raises(AssertionError):
        check(-100)
def test_validate_longitude_invalid():
    """A longitude above 180 must raise AssertionError."""
    check = calculate_distance.validate_longitude
    with pytest.raises(AssertionError):
        check(181)
def test_zero_distance():
    """The distance from a point to itself must be exactly zero."""
    point = {"latitude": 47.5287784, "longitude": 19.0194798}
    # The very same dict is deliberately used for both arguments.
    assert calculate_distance.calculate_distance(point, point) == 0
def test_poles_distances():
    """Pole-to-pole distance must match the expected value within the
    tolerance."""
    # Half of the circumference of a sphere of radius EARTH_RADIUS:
    # (2 * pi * R) / 2
    half_circumference = math.pi * calculate_distance.EARTH_RADIUS
    measured = calculate_distance.calculate_distance(NORTH_POLE, SOUTH_POLE)
    gap = abs(half_circumference - measured)
    assert gap < DISTANCE_ERROR_THRESHOLD
def test_filter_record_true():
    """A record with dublin_distance 1 must be kept."""
    verdict = calculate_distance.filter_record({"dublin_distance": 1})
    assert verdict is True
def test_filter_record_false():
    """A record with dublin_distance 101 must be dropped."""
    verdict = calculate_distance.filter_record({"dublin_distance": 101})
    assert verdict is False
@tripy
Copy link
Author

tripy commented Sep 24, 2015

Running the tests requires pytest (`pip install pytest && py.test`).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment