Skip to content

Instantly share code, notes, and snippets.

@teopost
Last active December 30, 2019 19:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save teopost/299bb00cea2988bfae3994cc1af2900a to your computer and use it in GitHub Desktop.
Get data from ckan (opendata emilia romagna)
#!/usr/bin/python
# -*- coding: utf-8 -*-
__author__ = 'teopost'
import urllib
import json
import datetime
# 1. SO2 (Biossido di Zolfo)
# 5. PM10 [max 50 al giorno]
# 7. O3 (Ozono) [warning 120, alert 180]
# 8. NO2 (Biossido di azoto) [max 200 l'ora]
# 9. NOX (Ossidi di azoto)
# 10. CO (Monossido di carbonio)
# 20. C6H6 (Benzene)
# 21. C6H5-CH3 (Toluene)
# 38. NO (Monossido di Azoto)
# 111. PM2.5 (Particolato)
def getValue(data, param):
    """Fetch the daily maximum reading for one pollutant from the ARPAE CKAN API.

    data  -- ISO date string (YYYY-MM-DD) of the day to query
    param -- variable_id of the pollutant as a string (e.g. "5" for PM10)

    Returns the highest value recorded by station 6000031 on that day,
    or the placeholder string '...' when no record was published.
    """
    # SQL is pre-escaped for the URL: %22 = double quote, %27 = single quote.
    url = ("https://dati.arpae.it/api/action/datastore_search_sql?sql="
           "SELECT * from %22a1c46cfe-46e5-44b4-9231-7d9260a38e68%22"
           " WHERE station_id = %276000031%27 and reftime=%27" + data +
           "T00:00:00%27 and variable_id=" + param + " order by value desc")
    fileobj = urllib.urlopen(url)  # Python 2 API (urllib.request.urlopen on Py3)
    results = fileobj.read().decode("utf-8")
    # Parse into a fresh name instead of rebinding `data` (the original
    # shadowed the date parameter with the JSON payload).
    payload = json.loads(results)
    records = payload["result"]["records"]
    if len(records) == 0:
        return '...'  # no measurement available for that day
    # Results are ordered by value desc, so the first record is the maximum.
    return records[0]["value"]
if __name__ == '__main__':
oggi = datetime.date.today()
for day in range(4, 0, -1):
data_analisi = str(oggi - datetime.timedelta(days=day))
print ""
print "Dati del " + data_analisi
print " PM10 : " + str(getValue(data_analisi, "5"))
print " PM2.5: " + str(getValue(data_analisi, "111"))
print " O3 : " + str(getValue(data_analisi, "7"))
print " NO2 : " + str(getValue(data_analisi, "8"))
print " CO : " + str(getValue(data_analisi, "10"))
#!/usr/bin/python
# -*- coding: utf-8 -*-
# per popolare il db con i dati degli ultimi 30 giorni
# for i in {1..30}; do ./ckan2influx.py --days $i; done
__author__ = 'teopost'
import urllib
import argparse
import json
import datetime
from influxdb import InfluxDBClient
# 1. SO2 (Biossido di Zolfo)
# 5. PM10 [max 50 al giorno]
# 7. O3 (Ozono) [warning 120, alert 180]
# 8. NO2 (Biossido di azoto) [max 200 l'ora]
# 9. NOX (Ossidi di azoto)
# 10. CO (Monossido di carbonio)
# 20. C6H6 (Benzene)
# 21. C6H5-CH3 (Toluene)
# 38. NO (Monossido di Azoto)
# 111. PM2.5 (Particolato)
def getValue(data, param):
retval=''
url = "https://dati.arpae.it/api/action/datastore_search_sql?sql=SELECT * from %22a1c46cfe-46e5-44b4-9231-7d9260a38e68%22 WHERE station_id = %276000031%27 and reftime=%27" + str(data) + "T00:00:00%27 and variable_id=" + param + " order by value desc"
print url
fileobj = urllib.urlopen(url)
results = fileobj.read().decode("utf-8")
dt = json.loads(results)
if len(dt["result"]["records"]) == 0:
retval=float(0)
else:
retval=float(dt["result"]["records"][0]["value"])
return retval
def main(day, testmode):
oggi = datetime.date.today()
client = InfluxDBClient(host='localhost', port=8086, database='OPENDATA')
#client.create_retention_policy('awesome_policy', '3d', 3, default=True)
data_analisi = (oggi - datetime.timedelta(days=day))
if testmode:
print ""
print "Dati del " + str(data_analisi)
print " PM10 : " + str(getValue(data_analisi, "5"))
print " PM2.5 : " + str(getValue(data_analisi, "111"))
print " SO2 : " + str(getValue(data_analisi, "1"))
print " O3 : " + str(getValue(data_analisi, "7"))
print " NO2 : " + str(getValue(data_analisi, "8"))
print " NOX : " + str(getValue(data_analisi, "9"))
print " C6H6 : " + str(getValue(data_analisi, "20"))
print " C6H6-CH3 : " + str(getValue(data_analisi, "21"))
print " NO : " + str(getValue(data_analisi, "38"))
json_body = [
{
"measurement": "rilievi",
"tags": {
"station": "276000031"
},
"time": "2009-11-10T23:00:00Z",
"fields": {
"pm10" : 0,
"pm25" : 0,
"so2" : 0,
"o3" : 0,
"no2" : 0,
"nox" : 0,
"c6h6" : 0,
"c6h6-ch3" : 0,
"no" : 0,
}
}
]
json_body[0]['time'] = data_analisi.strftime('%Y-%m-%dT%H:%M:%SZ')
json_body[0]['fields']['pm10'] = getValue(data_analisi, "5")
json_body[0]['fields']['pm25'] = getValue(data_analisi, "111")
json_body[0]['fields']['so2'] = getValue(data_analisi, "1")
json_body[0]['fields']['o3'] = getValue(data_analisi, "7")
json_body[0]['fields']['no2'] = getValue(data_analisi, "8")
json_body[0]['fields']['nox'] = getValue(data_analisi, "9")
json_body[0]['fields']['c6h6'] = getValue(data_analisi, "20")
json_body[0]['fields']['c6h6-ch3'] = getValue(data_analisi, "21")
json_body[0]['fields']['no'] = getValue(data_analisi, "38")
#client.write_points(json_bodyi, retention_policy=retention_policy)
if not testmode:
print "Write on db data of: " + str(data_analisi)
client.write_points(json_body)
def parse_args():
    """Parse the command-line arguments.

    --days N  : how many days back to fetch (default 0 = today)
    --dryrun  : read and print only, do not write to the database
    """
    parser = argparse.ArgumentParser(
        description='Legga i dati dagli opendata di arpae romagna e li scrive su influxdb')
    parser.add_argument('--days', type=int, required=False, default=0,
                        help='giorni a ritroso da recuperare')
    parser.add_argument('--dryrun', action='store_true',
                        help='esegue una simulazione delle letture senza scriverli sul database')
    return parser.parse_args()
if __name__ == '__main__':
    # CLI entry point: fetch the requested day; --dryrun skips the DB write.
    args = parse_args()
    main(day=args.days, testmode=args.dryrun)
#!/usr/bin/python
# -*- coding: utf-8 -*-
# per popolare il db con i dati degli ultimi 30 giorni
# for i in {1..30}; do ./ckan2influx.py --days $i; done
__author__ = 'teopost'
import urllib
import argparse
import json
import datetime
from influxdb import InfluxDBClient
# 1. SO2 (Biossido di Zolfo)
# 5. PM10 [max 50 al giorno]
# 7. O3 (Ozono) [warning 120, alert 180]
# 8. NO2 (Biossido di azoto) [max 200 l'ora]
# 9. NOX (Ossidi di azoto)
# 10. CO (Monossido di carbonio)
# 20. C6H6 (Benzene)
# 21. C6H5-CH3 (Toluene)
# 38. NO (Monossido di Azoto)
# 111. PM2.5 (Particolato)
def getValue(data, param):
    """Fetch the daily maximum reading for one pollutant from the ARPAE CKAN API.

    data  -- a datetime.date (or anything whose str() is YYYY-MM-DD)
    param -- variable_id of the pollutant as a string (e.g. "5" for PM10)

    Returns the highest value recorded by station 6000031 on that day as a
    float, or 0.0 when no record exists.
    """
    baseurl = 'https://dati.arpae.it/api/action/datastore_search_sql'
    # BUG FIX: the original string literal contained the text
    # \" + str(data) + \" verbatim, so the date and parameter were never
    # interpolated into the SQL. Build the query by real concatenation.
    sql = ("SELECT * from \"a1c46cfe-46e5-44b4-9231-7d9260a38e68\""
           " WHERE station_id = '6000031' and reftime='" + str(data) +
           "T00:00:00' and variable_id=" + param + " order by value desc")
    values = {'sql': sql}
    body = urllib.parse.urlencode(values).encode('ascii')
    # BUG FIX: the original referenced an undefined name `url` here (NameError);
    # the intended target is baseurl.
    req = urllib.request.Request(baseurl, body)
    with urllib.request.urlopen(req) as response:
        the_page = response.read()
    dt = json.loads(the_page)
    records = dt["result"]["records"]
    if len(records) == 0:
        return float(0)  # missing day -> store an explicit zero
    # Results are ordered by value desc, so the first record is the maximum.
    return float(records[0]["value"])
def main(day, testmode):
    """Fetch one day of ARPAE measurements and store them in InfluxDB.

    day      -- how many days back from today to read (0 = today)
    testmode -- when True, print the values instead of writing to the database
    """
    oggi = datetime.date.today()
    # Local InfluxDB instance; the OPENDATA database must already exist.
    client = InfluxDBClient(host='localhost', port=8086, database='OPENDATA')
    #client.create_retention_policy('awesome_policy', '3d', 3, default=True)
    data_analisi = (oggi - datetime.timedelta(days=day))
    if testmode:
        # FIX: this script uses urllib.request/urllib.parse and therefore
        # requires Python 3, so the Python 2 print statements are converted
        # to print() calls.
        print("")
        print("Dati del " + str(data_analisi))
        print(" PM10 : " + str(getValue(data_analisi, "5")))
        print(" PM2.5 : " + str(getValue(data_analisi, "111")))
        print(" SO2 : " + str(getValue(data_analisi, "1")))
        print(" O3 : " + str(getValue(data_analisi, "7")))
        print(" NO2 : " + str(getValue(data_analisi, "8")))
        print(" NOX : " + str(getValue(data_analisi, "9")))
        print(" C6H6 : " + str(getValue(data_analisi, "20")))
        print(" C6H6-CH3 : " + str(getValue(data_analisi, "21")))
        print(" NO : " + str(getValue(data_analisi, "38")))
    # Template InfluxDB point; `time` and every field are overwritten below.
    json_body = [
        {
            "measurement": "rilievi",
            "tags": {
                "station": "276000031"
            },
            "time": "2009-11-10T23:00:00Z",
            "fields": {
                "pm10": 0,
                "pm25": 0,
                "so2": 0,
                "o3": 0,
                "no2": 0,
                "nox": 0,
                "c6h6": 0,
                "c6h6-ch3": 0,
                "no": 0,
            }
        }
    ]
    # data_analisi is a date, so the time component renders as 00:00:00.
    json_body[0]['time'] = data_analisi.strftime('%Y-%m-%dT%H:%M:%SZ')
    json_body[0]['fields']['pm10'] = getValue(data_analisi, "5")
    json_body[0]['fields']['pm25'] = getValue(data_analisi, "111")
    json_body[0]['fields']['so2'] = getValue(data_analisi, "1")
    json_body[0]['fields']['o3'] = getValue(data_analisi, "7")
    json_body[0]['fields']['no2'] = getValue(data_analisi, "8")
    json_body[0]['fields']['nox'] = getValue(data_analisi, "9")
    json_body[0]['fields']['c6h6'] = getValue(data_analisi, "20")
    json_body[0]['fields']['c6h6-ch3'] = getValue(data_analisi, "21")
    json_body[0]['fields']['no'] = getValue(data_analisi, "38")
    #client.write_points(json_bodyi, retention_policy=retention_policy)
    if not testmode:
        print("Write on db data of: " + str(data_analisi))
        client.write_points(json_body)
def parse_args():
    """Parse the command-line arguments.

    --days N  : how many days back to fetch (default 0 = today)
    --dryrun  : read and print only, do not write to the database
    """
    parser = argparse.ArgumentParser(
        description='Legga i dati dagli opendata di arpae romagna e li scrive su influxdb')
    parser.add_argument('--days', type=int, required=False, default=0,
                        help='giorni a ritroso da recuperare')
    parser.add_argument('--dryrun', action='store_true',
                        help='esegue una simulazione delle letture senza scriverli sul database')
    return parser.parse_args()
if __name__ == '__main__':
    # CLI entry point: fetch the requested day; --dryrun skips the DB write.
    args = parse_args()
    main(day=args.days, testmode=args.dryrun)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment