Skip to content

Instantly share code, notes, and snippets.

@kleptog
Created June 12, 2018 20:03
Show Gist options
  • Save kleptog/572b529b84a1f0c40c3c69edaa18d670 to your computer and use it in GitHub Desktop.
Save kleptog/572b529b84a1f0c40c3c69edaa18d670 to your computer and use it in GitHub Desktop.
A simple example of how to export data from the Enelogic API (WSSE)
# From: https://enelogic.docs.apiary.io/#introduction/option-1.-wsse-authentication
# Go to the developer center: enelogic.com/nl/developers.
# Add your app and choose a desired redirect url (only for OAuth2 purposes, if using WSSE just fill in a URL e.g. enelogic.com)
# Fill in values below
import hashlib
import json
import os
import requests
import random
import string
from datetime import datetime, timedelta
# Credentials used to build the WSSE header -- replace every placeholder
# before running. enelogic_wsse() joins appid with username and appsecret
# with apikey.
appid = '...'
appsecret = '...'
username = '...@...'
apikey = '...'
def wsse(username, password):
    """Build the value of an ``X-WSSE`` UsernameToken header.

    The digest is Base64(SHA-1(nonce + created + password)). The nonce is a
    random 16-character alphanumeric string that is sent Base64-encoded, and
    ``created`` is the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``.

    Args:
        username: Value placed verbatim in the ``Username`` field.
        password: Secret mixed into the digest; it is never sent in clear.

    Returns:
        The header value as a ``str``. Every call yields a different value
        because a new nonce and timestamp are generated.
    """
    # Function-scope imports: these names are not part of the script's
    # top-level import block, and keeping them here makes the function
    # self-contained.
    import base64
    import secrets
    from datetime import timezone

    alphabet = string.ascii_letters + string.digits
    # The nonce is part of an authentication token, so draw it from the
    # cryptographically strong generator rather than ``random``.
    nonce = ''.join(secrets.choice(alphabet) for _ in range(16))
    created = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    # hashlib works on bytes; every component is text, so encode once here.
    digest = hashlib.sha1((nonce + created + password).encode('utf-8')).digest()
    return 'UsernameToken Username="%s", PasswordDigest="%s", Nonce="%s", Created="%s"' % (
        username,
        base64.b64encode(digest).decode('ascii'),
        base64.b64encode(nonce.encode('ascii')).decode('ascii'),
        created,
    )
def enelogic_wsse(appid, username, appsecret, apikey):
    """Return the X-WSSE header value for an app/user credential pair.

    The WSSE username is ``<appid>.<username>`` and the WSSE password is
    ``<appsecret>.<apikey>``; both are handed to ``wsse()``.
    """
    return wsse(
        ".".join((appid, username)),
        ".".join((appsecret, apikey)),
    )
# One session is shared by all requests so the content type is set only once.
s = requests.Session()
s.headers['Content-Type'] = 'application/json'

# Get buildings
# Note: the X-WSSE header has to be regenerated for every request.
s.headers['X-WSSE'] = enelogic_wsse(appid, username, appsecret, apikey)
res = s.get("https://enelogic.com/api/buildings/")
res.raise_for_status()
# Index the returned building records by their "id" field.
buildings = dict((rec['id'], rec) for rec in res.json())
#[
# {
# "id": 1505,
# "label": "Standaard",
# "address": "Nijverheidsweg",
# "addressNo": 25,
# "addressAdd": "A",
# "zipCode": "4529PP",
# "location": "Eede",
# "region": "Zeeland",
# "country": "NL",
# "timezone": "Europe/Amsterdam",
# "main": true
# }
#]
# Get measuringpoints (a fresh X-WSSE header is generated for this request)
s.headers['X-WSSE'] = enelogic_wsse(appid, username, appsecret, apikey)
res = s.get("https://enelogic.com/api/measuringpoints/")
res.raise_for_status()
# Index the returned measuring-point records by their "id" field.
measurepoints = dict((rec['id'], rec) for rec in res.json())
# {
# "id": 1651,
# "buildingId": 1505,
# "mdId": 1,
# "label": "Netbeheerder",
# "dayMin": "2013-03-14 00:00:00",
# "dayMax": "2014-02-27 23:45:00",
# "monthMin": "2013-02-15 00:00:00",
# "monthMax": "2014-02-27 00:00:00",
# "yearMin": "2013-02-01 00:00:00",
# "yearMax": "2014-02-27 00:00:00",
# "timezone": "Europe/Amsterdam"
# },
# Download the datapoints of every measuring point one day at a time and store
# each day as enelogic/<id>.<YYYY-MM-DD>.json. Days whose file already exists
# are skipped, so an interrupted run can simply be started again.
outdir = "enelogic"
if not os.path.isdir(outdir):
    # open() does not create missing directories.
    os.makedirs(outdir)

for measurepoint, info in measurepoints.items():
    # Only the date part (first 10 characters) of dayMin/dayMax is used.
    curr = datetime.strptime(info['dayMin'][:10], "%Y-%m-%d")
    end = datetime.strptime(info['dayMax'][:10], "%Y-%m-%d")
    # NOTE: the loop stops before the date of dayMax, so that last day is
    # not fetched.
    while curr < end:
        following = curr + timedelta(days=1)
        day = curr.strftime("%Y-%m-%d")
        fname = outdir + "/" + str(measurepoint) + "." + day + '.json'
        if not os.path.exists(fname):
            print(fname)
            # A fresh WSSE header is required for every request.
            s.headers.update({'X-WSSE': enelogic_wsse(appid, username, appsecret, apikey)})
            res = s.get("https://enelogic.com/api/measuringpoints/%s/datapoints/%s/%s" % (
                measurepoint, day, following.strftime("%Y-%m-%d")))
            res.raise_for_status()
            # res.content is bytes, so the file is opened in binary mode.
            with open(fname, "wb") as f:
                f.write(res.content)
        curr = following
@edsub
Copy link

edsub commented Oct 16, 2023

Based on the Enelogic API docs and some insights I got from the above, I have a working Python 3 script that fetches my metering data (selected by the Enelogic measuringpoint_id) between two dates and saves the interval volumes (the difference between two consecutive readings from Enelogic) to a CSV file in a fixed format.
The script does some basic checks but nothing special.
I use OAuth 2.0, but it requires me to get a new access token every 2 or 3 days. This is annoying, so I'd like to learn how to refresh the access token using the refresh token that is supplied via the Google OAuth 2.0 Playground. Any help is appreciated.

#!python3
import requests
from urllib.parse import urlencode
import json
from datetime import datetime
from datetime import timedelta
from datetime import date
import operator
from decimal import Decimal
import ciso8601
import pytz
import csv

# --- Settings: every line marked UPDATE must be edited before running ---
base_url = "https://enelogic.com/api"
access_token = "from_google_oauth2.0_playground"  # UPDATE: OAuth2 access token
meteringpoint = "measuringpoint_id_in enelogic"  # UPDATE: measuring point id
# First day to export and the day at which to stop; .astimezone() attaches
# the system's local timezone to the parsed date.
meteringdaystart = ciso8601.parse_datetime("2023-09-10").astimezone()  # UPDATE
meteringdaystop = ciso8601.parse_datetime("2023-10-15").astimezone()  # UPDATE
# Alternative stop value: yesterday.
#meteringdaystop = (ciso8601.parse_datetime(date.today().strftime("%Y-%m-%d"))-timedelta(days=1)).astimezone()
meterid_I = "i_9999999999"  # hardcoded meter id used in the CSV file, import (levering) meter
meterid_E = "e_9999999999"  # hardcoded meter id used in the CSV file, export (teruglevering) meter


################
#Error display #
################
def show_error():
    """Print the type and value of the exception currently being handled.

    Meant to be called from inside an ``except`` block; when no exception is
    being handled both fields print as ``None``.
    """
    # ``sys`` is not part of this script's import block, so it is imported
    # here; without it the function itself raised NameError.
    import sys

    exc_type, exc_value = sys.exc_info()[:2]
    print("Error type : %s" % exc_type)
    print("Error value: %s" % exc_value)

def daterange(start_date, end_date):
    """Yield one datetime per whole day from start_date up to, but excluding, end_date.

    The number of days is taken from the wall-clock difference (tzinfo is
    dropped from both ends before subtracting). The yielded values are
    start_date plus 0, 1, 2, ... days and keep start_date's tzinfo. Nothing
    is yielded when end_date is not at least one full day after start_date.
    """
    wall_clock_span = end_date.replace(tzinfo=None) - start_date.replace(tzinfo=None)
    offset = 0
    while offset < wall_clock_span.days:
        yield start_date + timedelta(days=offset)
        offset += 1

def get_interval_data(meterpointid, day):
    """Fetch the datapoints of one measuring point for a single day.

    Requests ``<base_url>/measuringpoints/<id>/datapoints/<day>/<day + 1>``
    with the module-level ``access_token`` as query parameter.

    Args:
        meterpointid: Measuring point id; formatted into the URL, so both
            ``str`` and ``int`` are accepted.
        day: Object with ``strftime`` that supports adding a ``timedelta``
            (date or datetime); only its ``YYYY-MM-DD`` form is used.

    Returns:
        The decoded JSON body on HTTP 200. ``None`` when the request fails,
        the status is not 200, or the body is not valid JSON; a message is
        printed in each of those cases.
    """
    s_day = day.strftime("%Y-%m-%d")
    s_nextday = (day + timedelta(days=1)).strftime("%Y-%m-%d")
    url = "%s/measuringpoints/%s/datapoints/%s/%s" % (base_url, meterpointid, s_day, s_nextday)
    # NOTE(review): "br" is advertised here; confirm the installed requests
    # stack can actually decode brotli responses.
    headers = {"Content-type": "application/json",
       "Accept": "*/*",
       "Accept-Encoding" : "gzip,deflate,br",
       "Connection" : "keep-alive"}
    # The URL is logged without the access token so the credential does not
    # end up in console output or log files.
    print(url)
    try:
        # The token goes through ``params`` so it is URL-encoded properly;
        # the timeout keeps a stalled connection from hanging the script.
        response = requests.get(url=url, params={"access_token": access_token},
                                headers=headers, timeout=30)
    except requests.RequestException:
        show_error()
        print ("Error retrieveing metering data")
        return None
    if response.status_code != 200:
        print ("HTTP Error retrieving metering data. HTTP response: %s" % (response.status_code))
        return None
    print ("Metering data retrieved. HTTP response: %s" % (response.status_code))
    try:
        return response.json()
    except ValueError:
        # Raised (possibly as a subclass) when the body is not valid JSON.
        show_error()
        print ("Error retrieveing metering data")
        return None

#################################################################
print(meteringdaystart.isoformat())
print(meteringdaystop.isoformat())

# Values of the "rate" field used to separate import from export readings.
IMPORT_RATE = 180
EXPORT_RATE = 280
# For a full day Enelogic provides 97 values
EXPECTED_READS_PER_DAY = 97
CSV_HEADER = ['MeterID', 'Resolution', 'IntervalStart', 'IsEstimated', 'EnergyWh']


def _open_csv_for_append(csv_filename):
    """Open csv_filename for writing at its end; return (file, created)."""
    try:
        # Mode 'x' creates the file and fails if it already exists, which
        # tells us whether the header row still has to be written.
        return open(csv_filename, 'xt', newline='', encoding="utf-8"), True
    except FileExistsError:
        return open(csv_filename, 'at', newline='', encoding="utf-8"), False


def _append_interval_volumes(csv_filename, meter_id, label, readings, utctz):
    """Append one CSV row per pair of consecutive readings to csv_filename.

    Each row holds the difference between a reading and the one before it,
    multiplied by 1000, with the interval start set 15 minutes before the
    later reading and expressed in UTC. The first reading only serves as the
    baseline, so N readings produce N - 1 rows.
    """
    csv_file, created = _open_csv_for_append(csv_filename)
    with csv_file:
        writer = csv.writer(csv_file, dialect='excel', delimiter=';', quoting=csv.QUOTE_NONE)
        if created:
            writer.writerow(CSV_HEADER)
        for previous, current in zip(readings, readings[1:]):
            # astimezone() treats a naive value as system local time, so a
            # single call converts the reading's timestamp to UTC.
            intervalstart = (ciso8601.parse_datetime(current['datetime'])
                             - timedelta(minutes=15)).astimezone(utctz)
            print(current['datetime'], label, current['quantity'])
            writer.writerow([
                meter_id,
                '15min',
                intervalstart.isoformat().replace("+00:00", "Z"),
                'FALSE',
                str(1000*(Decimal(current['quantity'])-Decimal(previous['quantity'])))
                ])


utctz = pytz.timezone("UTC")
# Both file names depend only on the configured period, so every day of the
# run is appended to the same two files.
import_csv_filename = "I_"+meterid_I+"_"+meteringdaystart.isoformat()+"_"+meteringdaystop.isoformat()+".csv"
export_csv_filename = "E_"+meterid_E+"_"+meteringdaystart.isoformat()+"_"+meteringdaystop.isoformat()+".csv"

for meteringday in daterange(meteringdaystart, meteringdaystop):
    raw_interval_data = get_interval_data(meteringpoint, meteringday)
    if raw_interval_data is None:
        # get_interval_data() returns None on failure; iterating over it
        # would abort the whole run with a TypeError.
        print("Warning, no metering data for %s. Day skipped." % meteringday)
        continue

    clean_interval_data = []
    for record in raw_interval_data:
        if record not in clean_interval_data:
            clean_interval_data.append(record)
        else:
            print("Warning, duplicate record on %s. Filtered out." % meteringday)
    clean_interval_data.sort(key=operator.itemgetter('datetime','rate'))
    import_interval_data = [d for d in clean_interval_data if d['rate'] == IMPORT_RATE]
    export_interval_data = [d for d in clean_interval_data if d['rate'] == EXPORT_RATE]

    if (len(import_interval_data) != EXPECTED_READS_PER_DAY
            or len(export_interval_data) != EXPECTED_READS_PER_DAY):
        # Only a warning: the day is still exported with whatever was read.
        print("Warning, unexpected number of reads on %s (97 expected)" % meteringday)
        print("Import reads : %d"% len(import_interval_data))
        print("Export reads : %d"% len(export_interval_data))

    _append_interval_volumes(import_csv_filename, meterid_I, 'Import', import_interval_data, utctz)
    _append_interval_volumes(export_csv_filename, meterid_E, 'Export', export_interval_data, utctz)
 
##output format
##MeterID;Resolution;IntervalStart;IsEstimated;EnergyWh
##91998_I;15min;2021-12-31T23:00:00Z;FALSE;82.76184291

@edsub
Copy link

edsub commented Jan 25, 2024

Does anyone have the OAuth2 refresh-token flow working?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment