Skip to content

Instantly share code, notes, and snippets.

@libbkmz
Last active June 30, 2019 22:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save libbkmz/836356425fd543ed322b866cb3db2ebc to your computer and use it in GitHub Desktop.
Save libbkmz/836356425fd543ed322b866cb3db2ebc to your computer and use it in GitHub Desktop.
from flask import Flask, request, Response
import json
from nest import convert
app = Flask(__name__)
# usage example:
# cat input.json | http 'http://127.0.0.1:5000?level=currency&level=country&level=city'
@app.route('/', methods=["POST"])
def hello_world():
    """Nest the posted JSON records by the ``level`` query arguments.

    Every repeated ``level`` query argument names one key to group by,
    outermost first. The request body is handed to ``convert`` together
    with that list of levels.

    Returns:
        A JSON response with the nested structure produced by ``convert``,
        or a 400 JSON response carrying an ``error`` field when the body
        could not be read as JSON or ``convert`` rejects the levels.
    """
    levels = request.args.getlist('level')
    js = request.get_json()
    if js is None:
        # Without a JSON body there is nothing to nest; passing None on
        # would blow up inside convert() and surface as a 500.
        return Response(
            json.dumps({"error": "Request body must be JSON"}),
            status=400,
            content_type="application/json",
        )
    try:
        nested = convert(levels, js)
    except KeyError as exc:
        # convert() raises KeyError for unknown or surplus levels, which is
        # a mistake in the request rather than a server fault.
        message = str(exc.args[0]) if exc.args else str(exc)
        return Response(
            json.dumps({"error": message}),
            status=400,
            content_type="application/json",
        )
    return Response(json.dumps(nested), content_type="application/json")
[
{
"country": "US",
"city": "Boston",
"currency": "USD",
"amount": 100
},
{
"country": "FR",
"city": "Paris",
"currency": "EUR",
"amount": 20
},
{
"country": "FR",
"city": "Lyon",
"currency": "EUR",
"amount": 11.4
},
{
"country": "ES",
"city": "Madrid",
"currency": "EUR",
"amount": 8.9
},
{
"country": "UK",
"city": "London",
"currency": "GBP",
"amount": 12.2
},
{
"country": "UK",
"city": "London",
"currency": "FBP",
"amount": 10.9
}
]
import json
from copy import deepcopy as copy
import click
def f(levels, objs):
    """Fold every record of *objs* into one structure nested by *levels*.

    Args:
        levels: Keys to nest by, outermost first.
        objs: Iterable of records to group.

    Returns:
        The accumulated result after the last record has been merged in
        (an empty dict when *objs* is empty).
    """
    tree = {}
    for record in objs:
        # Each record gets its own copy of the level list, so the helper is
        # free to consume it without affecting the next iteration.
        tree = obj_disengage(copy(levels), record, tree)
    return tree
def obj_disengage(levels, obj, out):
    """Merge one record into the nested accumulator *out*.

    The record is filed under the values it holds for each key in
    *levels* (outermost first); whatever fields remain once all level
    keys are removed are appended to the list at the innermost position.

    Neither *levels* nor *obj* is modified: the function works on slices
    and filtered copies, so the caller's input data stays intact.

    Args:
        levels: Sequence of keys still to nest by (list or tuple).
        obj: The record (a dict) being filed.
        out: Accumulator for the current depth: a dict keyed by level
            values, or the list of leaf records once *levels* is empty.
            A dict accumulator is updated in place and returned.

    Returns:
        The updated accumulator for the current depth.

    Raises:
        KeyError: If the record runs out of fields while levels remain,
            or if a level key is absent from the record.
    """
    if not levels:
        # Innermost position: extend the existing leaf list or start one.
        if out and isinstance(out, list):
            return out + [obj]
        return [obj]
    if not obj:
        raise KeyError("Too many levels...")
    lvl = levels[0]
    if lvl not in obj:
        raise KeyError(f"Level '{lvl}' does not exists in input data")
    val = obj[lvl]
    # Build the remainder instead of popping, so the caller's record is
    # left untouched.
    remainder = {key: value for key, value in obj.items() if key != lvl}
    out[val] = obj_disengage(levels[1:], remainder, out.get(val, {}))
    return out
def convert(levels, obj):
    """Nest the records in *obj* by the keys listed in *levels*.

    Args:
        levels: Iterable of keys to nest by, outermost first.
        obj: Iterable of records to group.

    Returns:
        Whatever ``f`` builds from the level list and the records.
    """
    # Materialise the levels as a list before handing them on.
    level_list = list(levels)
    return f(level_list, obj)
@click.command()
@click.argument("levels", nargs=-1)
def main(levels):
    """Read JSON records from stdin and print them nested by LEVELS.

    LEVELS are the keys to nest by, outermost first. The result is written
    to stdout as JSON, matching what the HTTP endpoint returns.
    """
    stdin = click.get_text_stream("stdin")
    js = json.loads(stdin.read())
    # Serialise with json.dumps: printing the dict directly would emit the
    # Python repr (single-quoted strings), which is not valid JSON.
    print(json.dumps(convert(levels, js)))


if __name__ == "__main__":
    main()
{
"EUR": {
"ES": {
"Madrid": [{"amount": 8.9}]
},
"FR": {
"Lyon": [{"amount": 11.4}],
"Paris": [{"amount": 20}]
}
},
"FBP": {
"UK": {
"London": [{"amount": 10.9}]
}
},
"GBP": {
"UK": {
"London": [{"amount": 12.2}]
}
},
"USD": {
"US": {
"Boston": [{"amount": 100}]
}
}
}
-- http://sqlfiddle.com/#!17/928a9/26
-- Index exchange_rates on its timestamp column; presumably intended to
-- support lookups that filter exchange_rates by ts (verify with EXPLAIN).
CREATE INDEX ts_idx ON exchange_rates(ts);
-- Total amount per user expressed in GBP.
-- Each transaction is paired with the exchange-rate row (its currency -> GBP)
-- that has the greatest timestamp not later than the transaction's own;
-- transactions without such a row are counted at a rate of 1.
WITH ex_rates AS (
    SELECT
        t.ts,
        er.ts,
        user_id,
        currency,
        amount,
        from_currency,
        to_currency,
        rate
    FROM transactions AS t
    LEFT JOIN exchange_rates AS er
        ON er.from_currency = t.currency
        AND er.to_currency = 'GBP'
        AND er.ts = (
            SELECT MAX(er2.ts)
            FROM exchange_rates AS er2
            WHERE er2.from_currency = t.currency
              AND er2.to_currency = 'GBP'
              AND er2.ts <= t.ts
        )
)
SELECT
    user_id,
    SUM(amount * COALESCE(rate, 1)) AS total_spent_gbp
FROM ex_rates
GROUP BY user_id
ORDER BY user_id
certifi==2019.6.16
chardet==3.0.4
Click==7.0
Flask==1.0.3
idna==2.8
itsdangerous==1.1.0
Jinja2==2.10.1
MarkupSafe==1.1.1
Pygments==2.4.2
python-dateutil==2.8.0
pytz==2019.1
requests==2.22.0
six==1.12.0
urllib3==1.25.3
Werkzeug==0.15.4
from unittest import TestCase
from nest import convert
import json
# Load the raw text of both fixture files once, at import time; the tests
# below parse them with json.loads.
with open("input.json", "r") as hl:
    input_json = hl.read()

with open("output.json", "r") as hl:
    output_json = hl.read()
class TestMain(TestCase):
    """Unit tests for ``convert``."""

    def test_main(self):
        """The fixture input nested three levels deep equals the fixture output."""
        actual = convert(
            ("currency", "country", "city"),
            json.loads(input_json),
        )
        expected = json.loads(output_json)
        self.assertEqual(actual, expected)

    def test_wrong_key(self):
        """A level missing from the records raises a descriptive KeyError."""
        records = [{"b": 1}]
        with self.assertRaisesRegex(KeyError, "Level 'a' does not exists in input data"):
            convert(["a"], records)

    def test_1(self):
        """One level: records sharing the key value are collected in one list."""
        records = [
            {"a": 1, "b": 2},
            {"a": 1, "b": 3},
            {"a": 1, "b": 4, "c": 4},
            {"a": 1, "b": 5},
        ]
        actual = convert(["a"], records)
        expected = {1: [{'b': 2}, {'b': 3}, {'b': 4, 'c': 4}, {'b': 5}]}
        self.assertEqual(actual, expected)

    def test_2(self):
        """Two levels: leaves keep only the fields not used as levels."""
        records = [
            {"a": 1, "b": 2},
            {"a": 1, "b": 3},
            {"a": 1, "b": 4, "c": 4},
            {"a": 1, "b": 4, "c": 5},
            {"a": 1, "b": 4, "c": 5, "d": 6},
            {"a": 1, "b": 4, "c": 5, "d": 7},
            {"a": 1, "b": 5},
        ]
        actual = convert(["a", "b"], records)
        expected = {
            1: {
                2: [{}],
                3: [{}],
                4: [{"c": 4}, {"c": 5}, {"c": 5, "d": 6}, {"c": 5, "d": 7}],
                5: [{}],
            }
        }
        self.assertEqual(actual, expected)

    def test_3(self):
        """Asking for more levels than a record has fields raises KeyError."""
        records = [
            {"a": 1, "b": 2, "c": 3, "d": 4},
            {"a": 1, "b": 2, "c": 3, "d": 4, "e": 5},
        ]
        with self.assertRaisesRegex(KeyError, "Too many levels..."):
            convert(["a", "b", "c", "d", "e"], records)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment