Created
September 24, 2015 21:43
-
-
Save tripy/45227a49e5a21b2523a6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
"""Opens 'coordinates.json' and prints out customers who are closer | |
than 100 KM ordered by the user_id. | |
Skips and prints records to STDERR which are not valid.""" | |
import json | |
from operator import itemgetter | |
import sys | |
from math import sin, cos, radians, sqrt, pow, atan2 | |
# Reference point: Dublin, latitude/longitude in degrees
DUBLIN = dict(latitude=53.3381985, longitude=-6.2592576)

# Earth radius according to WGS 84 (KM)
EARTH_RADIUS = 6378.137

# Customers closer than this distance (KM) get invited
GOOD_DISTANCE = 100

# Keys that every customer record is required to contain
MUST_HAVE_KEYS = ["name", "user_id", "latitude", "longitude"]
def print_error(message):
    """Write ``message`` to STDERR, prefixed with ``*ERROR* ``.

    No trailing newline is appended; callers include one in ``message``
    when they want it.

    :param message: object to report, rendered with ``%s`` formatting
    """
    # Wrap the message in a 1-tuple so that a tuple-valued message is
    # rendered as one value instead of being unpacked as format arguments
    # (which would raise TypeError).
    sys.stderr.write("*ERROR* %s" % (message,))
def is_valid_schema(json_record):
    """VERY basic JSON schema validation.

    Returns True if every key listed in MUST_HAVE_KEYS is present in
    ``json_record``.
    Returns False as soon as one of those keys is missing.
    """
    return all(required in json_record for required in MUST_HAVE_KEYS)
def validate_latitude(latitude):
    """Check that ``latitude`` lies within the closed range [-90, 90].

    :param latitude: numeric latitude to check
    :raises AssertionError: if the value is outside the range (NaN included)
    """
    # Raise explicitly instead of using an ``assert`` statement: asserts are
    # removed when Python runs with -O, which would silently disable the
    # validation.
    if not ((latitude >= -90) and (latitude <= 90)):
        raise AssertionError("invalid latitude: %r" % (latitude,))
def validate_longitude(longitude):
    """Check that ``longitude`` lies within the closed range [-180, 180].

    :param longitude: numeric longitude to check
    :raises AssertionError: if the value is outside the range (NaN included)
    """
    # Raise explicitly instead of using an ``assert`` statement: asserts are
    # removed when Python runs with -O, which would silently disable the
    # validation.
    if not ((longitude >= -180) and (longitude <= 180)):
        raise AssertionError("invalid longitude: %r" % (longitude,))
def normalize_record(json_dict):
    """Validate a customer record and convert its coordinates to float.

    The record must pass is_valid_schema, and its 'latitude' and
    'longitude' values must be convertible to float and pass
    validate_latitude / validate_longitude.

    :param json_dict: dict holding one customer record
    :return: the same dict, with 'latitude' and 'longitude' stored as floats
    :raises ValueError: if the record is invalid in any of the ways above
    """
    if not is_valid_schema(json_dict):
        raise ValueError("record is missing a required key")
    try:
        # float() raises ValueError for unparsable strings, which is already
        # the documented error type, so it is left to propagate.
        latitude = float(json_dict["latitude"])
        longitude = float(json_dict["longitude"])
        validate_latitude(latitude)
        validate_longitude(longitude)
    except (TypeError, AssertionError):
        # TypeError: the value was e.g. null or a list, which float() rejects.
        # AssertionError: the coordinate is out of range.
        raise ValueError("record has invalid coordinates")
    # Only touch the record once it is known to be valid.
    json_dict["latitude"] = latitude
    json_dict["longitude"] = longitude
    return json_dict
def read_json_record(line):
    """Parse and validate one JSON customer record.

    The line must contain a JSON object that normalize_record accepts.
    Validation is applied to the top-level object only, so a record that
    carries additional nested objects is not rejected because of them.

    :param line: text holding a single JSON document
    :return: the normalized record dict on success, None on failure
             (the failure is reported through print_error)
    """
    try:
        j_data = json.loads(line)
        # Valid JSON that is not an object (a list, number, string, ...)
        # cannot be a customer record; treat it like any other bad record
        # instead of handing a non-dict back to the caller.
        if not isinstance(j_data, dict):
            raise ValueError("record is not a JSON object")
        return normalize_record(j_data)
    except (ValueError, TypeError):
        # ValueError: malformed JSON or an invalid record.
        # TypeError: input or field of an unusable type.
        print_error("Failed to parse record: %s" % line)
        return None
def calculate_distance(point1, point2):
    """Distance between two coordinates on a sphere of radius EARTH_RADIUS.

    Uses the Vincenty formula (atan2 form of the central angle).

    :param point1: Hash containing point1 latitude and longitude keys
                   (degrees)
    :param point2: Hash containing point2 latitude and longitude keys
                   (degrees)
    :return: distance in KM between point1 and point2
    """
    phi1 = radians(point1['latitude'])
    lam1 = radians(point1['longitude'])
    phi2 = radians(point2['latitude'])
    lam2 = radians(point2['longitude'])

    sin_phi1, cos_phi1 = sin(phi1), cos(phi1)
    sin_phi2, cos_phi2 = sin(phi2), cos(phi2)

    d_lam = lam2 - lam1
    sin_d_lam, cos_d_lam = sin(d_lam), cos(d_lam)

    # Numerator and denominator of the atan2 expression for the central angle
    term_lon = cos_phi2 * sin_d_lam
    term_lat = cos_phi1 * sin_phi2 - sin_phi1 * cos_phi2 * cos_d_lam
    numerator = sqrt(pow(term_lon, 2) + pow(term_lat, 2))
    denominator = sin_phi1 * sin_phi2 + cos_phi1 * cos_phi2 * cos_d_lam

    central_angle = atan2(numerator, denominator)
    return EARTH_RADIUS * central_angle
def filter_record(json_dict):
    """Tell whether a customer is within inviting range.

    Returns True if the record's 'dublin_distance' is strictly less than
    GOOD_DISTANCE, False otherwise.
    """
    distance = json_dict['dublin_distance']
    return distance < GOOD_DISTANCE
def print_results(records):
    """Print the user_ids and names to STDOUT, sorted by user_id.

    A header line is printed first, then one "<user_id> <name>" line per
    record.

    :param records: iterable of dicts with 'user_id' and 'name' keys
    """
    # print(...) with one pre-formatted string produces the same output under
    # Python 2 and Python 3, whereas the print statement is a SyntaxError on
    # Python 3.
    print("We should invite the following customers:")
    for customer in sorted(records, key=itemgetter('user_id')):
        print("%s %s" % (customer["user_id"], customer["name"]))
def process_records(fp):
    """Read customer records from ``fp`` and print the ones to invite.

    Every line of ``fp`` is parsed with read_json_record; lines for which
    it returns None are skipped. Each remaining record gets a
    'dublin_distance' entry (its distance from DUBLIN) and is kept when
    filter_record accepts it. The kept records are passed to
    print_results.

    :param fp: iterable yielding one JSON record per line
    """
    invited = []
    for raw_line in fp:
        record = read_json_record(raw_line)
        if record is None:
            continue
        record["dublin_distance"] = calculate_distance(DUBLIN, record)
        if filter_record(record):
            invited.append(record)
    print_results(invited)
if __name__ == "__main__":
    try:
        fp = open('coordinates.json', 'r')
    except IOError:
        print_error("Unable to open coordinates.json\n")
        sys.exit(1)
    # Close the file as soon as processing ends, even if it raises,
    # instead of leaving the handle open until interpreter exit.
    with fp:
        process_records(fp)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math | |
import pytest | |
import calculate_distance | |
# Largest accepted difference between an expected and a computed distance
DISTANCE_ERROR_THRESHOLD = 1

# Coordinates of the geographic poles
NORTH_POLE = {"latitude": 90, "longitude": 0}
SOUTH_POLE = {"latitude": -90, "longitude": 0}
def test_print_error(capsys):
    """print_error writes the prefixed message to STDERR."""
    calculate_distance.print_error("Hello World")
    captured = capsys.readouterr()
    # Index 1 of the captured pair is the STDERR text.
    assert "*ERROR* Hello World" in captured[1]
def test_is_valid_schema_invalid():
    """A record without the required keys is rejected."""
    assert not calculate_distance.is_valid_schema({"fake": "record"})
def test_is_valid_schema_valid():
    """A record holding all required keys is accepted."""
    complete_record = dict(
        name="false name",
        user_id="77",
        latitude="90",
        longitude="33",
    )
    assert calculate_distance.is_valid_schema(complete_record)
def test_validate_latitude_invalid():
    """A latitude below -90 raises AssertionError."""
    out_of_range = -100
    with pytest.raises(AssertionError):
        calculate_distance.validate_latitude(out_of_range)
def test_validate_longitude_invalid():
    """A longitude above 180 raises AssertionError."""
    out_of_range = 181
    with pytest.raises(AssertionError):
        calculate_distance.validate_longitude(out_of_range)
def test_zero_distance():
    """The distance from a point to itself is zero."""
    point = {"latitude": 47.5287784, "longitude": 19.0194798}
    assert calculate_distance.calculate_distance(point, point) == 0
def test_poles_distances():
    """Pole-to-pole distance matches half of the sphere's circumference."""
    circumference = 2 * math.pi * calculate_distance.EARTH_RADIUS
    # Going from one pole to the other covers half of the circumference
    half_way_around = circumference / 2
    measured = calculate_distance.calculate_distance(NORTH_POLE, SOUTH_POLE)
    assert abs(half_way_around - measured) < DISTANCE_ERROR_THRESHOLD
def test_filter_record_true():
    """A record with a distance of 1 passes the filter."""
    nearby = {"dublin_distance": 1}
    assert calculate_distance.filter_record(nearby) is True
def test_filter_record_false():
    """A record with a distance of 101 is filtered out."""
    far_away = {"dublin_distance": 101}
    assert calculate_distance.filter_record(far_away) is False
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Running the tests requires pytest (`pip install pytest && py.test`).