Skip to content

Instantly share code, notes, and snippets.

@kleptog
Created June 12, 2018 20:03
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kleptog/572b529b84a1f0c40c3c69edaa18d670 to your computer and use it in GitHub Desktop.
Save kleptog/572b529b84a1f0c40c3c69edaa18d670 to your computer and use it in GitHub Desktop.
A simple example for how to export data from the Enelogic API (WSSE)
# From: https://enelogic.docs.apiary.io/#introduction/option-1.-wsse-authentication
# Go to the developer center: enelogic.com/nl/developers.
# Add your app and choose a desired redirect url (only for OAuth2 purposes, if using WSSE just fill in a URL e.g. enelogic.com)
# Fill in values below
import hashlib
import json
import os
import requests
import random
import string
from datetime import datetime, timedelta
appid = "..."
appsecret = "..."
username = "...@..."
apikey = "..."
def wsse(username, password):
nonce = ''.join(random.choice(string.letters + string.digits) for _ in range(16))
created = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
hash = hashlib.sha1(nonce + created + password).digest().encode('base64').strip()
return 'UsernameToken Username="%s", PasswordDigest="%s", Nonce="%s", Created="%s"' % (
username, hash, nonce.encode('base64').strip(), created)
def enelogic_wsse(appid, username, appsecret, apikey):
username = appid + "." + username
password = appsecret + "." + apikey
return wsse(username, password)
s = requests.Session()
s.headers.update({'Content-Type': 'application/json'})
# Get buildings
# Note: you have to regenerate the WSSE every request...
s.headers.update({'X-WSSE': enelogic_wsse(appid, username, appsecret, apikey)})
res = s.get("https://enelogic.com/api/buildings/")
res.raise_for_status()
buildings = {r['id']: r for r in res.json()}
#[
# {
# "id": 1505,
# "label": "Standaard",
# "address": "Nijverheidsweg",
# "addressNo": 25,
# "addressAdd": "A",
# "zipCode": "4529PP",
# "location": "Eede",
# "region": "Zeeland",
# "country": "NL",
# "timezone": "Europe/Amsterdam",
# "main": true
# }
#]
# Get measuringpoints
s.headers.update({'X-WSSE': enelogic_wsse(appid, username, appsecret, apikey)})
res = s.get("https://enelogic.com/api/measuringpoints/")
res.raise_for_status()
measurepoints = {r['id']: r for r in res.json()}
# {
# "id": 1651,
# "buildingId": 1505,
# "mdId": 1,
# "label": "Netbeheerder",
# "dayMin": "2013-03-14 00:00:00",
# "dayMax": "2014-02-27 23:45:00",
# "monthMin": "2013-02-15 00:00:00",
# "monthMax": "2014-02-27 00:00:00",
# "yearMin": "2013-02-01 00:00:00",
# "yearMax": "2014-02-27 00:00:00",
# "timezone": "Europe/Amsterdam"
# },
for measurepoint in measurepoints:
curr = datetime.strptime(measurepoints[measurepoint]['dayMin'][:10], "%Y-%m-%d")
end = datetime.strptime(measurepoints[measurepoint]['dayMax'][:10], "%Y-%m-%d")
while curr < end:
fname = "enelogic/" + str(measurepoint) + "." + curr.strftime("%Y-%m-%d") + '.json'
next = curr + timedelta(days=1)
if os.path.exists(fname):
curr = next
continue
print fname
s.headers.update({'X-WSSE': enelogic_wsse(appid, username, appsecret, apikey)})
res = s.get("https://enelogic.com/api/measuringpoints/%s/datapoints/%s/%s" % (measurepoint, curr.strftime("%Y-%m-%d"), next.strftime("%Y-%m-%d")))
res.raise_for_status()
with open(fname, "w") as f:
f.write(res.content)
curr = next
@ArieKraak
Copy link

I like to have your program as an example to connect to my Enelogic information.

Presumably your example is based upon Python <3. To get the code working for Python 3 I have made some minor changes.

nonce = ''.join(random.choice(string.letters + string.digits) for _ in range(16)) to nonce = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(16))

and

print fname to print (fname)

But I don't get the hash rule done:
hash = hashlib.sha1(nonce + created + password).digest().encode('base64').strip()

gives an error
"Unicode-objects must be encoded before hashing"

I would appreciate if you could advice me how to solve this foor Python 3

@gjmeer
Copy link

gjmeer commented Jan 28, 2020

Dear Kleptog,

Thank you for your script. I try to import the Python script into Power BI and thus create a table with all data.

I almost succeeded. I only work with the oAuth2 api, so I deleted the WSSE rules.

However, at the end of the script, Python returns a syntax error. I have the impression that it ends up in a certain loop where it does not come out or that the rule: next = curr + timedelta (days = 1) is not clear.

The end of my script is now:

for measurepoint in measurepoints:
    curr = datetime.strptime(measurepoints[measurepoint]['dayMin'][:10], "%Y-%m-%d")
    end = datetime.strptime(measurepoints[measurepoint]['dayMax'][:10], "%Y-%m-%d")
    
    while curr < end:
        import os
        dirname = os.path.dirname(__file__)
        fname = os.path.join(dirname, "enelogic/" + str(measurepoint) + "." + curr.strftime("%Y-%m-%d") + '.json'
        next = curr + timedelta(days=1)
        
	if os.path.exists(fname):
            curr = next
            continue
        
	print (fname)
        res = s.get("https://enelogic.com/api/measuringpoints/%s/datapoints/%s/%s" % (measurepoint, curr.strftime("%Y-%m-%d"), next.strftime("%Y-%m-%d""?access_token="***")))
        res.raise_for_status()
        
	with open(fname, "w") as f:
            f.write(res.content)
        
        curr = next

At the start of the script I imported timedelta from datetime.

Do you have an idea how I can solve this?

@gjmeer
Copy link

gjmeer commented Jan 28, 2020

gives an error
"Unicode-objects must be encoded before hashing"

I solved this via:

while curr < end: import os dirname = os.path.dirname(__file__) fname = os.path.join(dirname, "enelogic/" + str(measurepoint) + "." + curr.strftime("%Y-%m-%d") + '.json'

This was needed beceause I use the script in Power BI en this doesnt allow a reletive path. Don't know if this solution workes for you

@ArieKraak
Copy link

ArieKraak commented Feb 19, 2020

Finally I managed to upgrade the WSSE example of Kleptog from a Python 2 to a Python 3 version (see code below for the essential parts of the authorization code).
There is only one remark:
The authorization works, but not always. Sometimes it gives an error:
403 Client Error: Forbidden for url: https://enelogic.com/api/
So far I did not solve this problem, but must something have to be related to:

  • A problem with cookies?
  • A caching problem?
  • Or just the state of the Enelogic server (I tested this code with a limited account (maximum 500 calls)?
  • or a combination of above, because the problem sometimes disappears after the initial error message
def autorisatie():
    username=u"AppId.EnelogicUsername"
    password="AppSecret.UserAPIKey"
    password=password.encode('utf8')
    nonce = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(16))
    nonce=nonce.encode('utf-8')
    created = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
    created=created.encode('utf-8')
    hash = base64.b64encode(hashlib.sha1(nonce+created+password).digest()).strip()
    wsse= 'UsernameToken Username="%s", PasswordDigest="%s", Nonce="%s", Created="%s"' % (username, str(hash, 'utf8'), str(base64.b64encode(nonce).strip(),'utf8'), str(created,'utf8'))
    return(wsse)

wsse=autorisatie ()
s.headers.update({'X-WSSE': wsse}) 

@jeroenf1973
Copy link

Dear Kleptog,

Thank you for your script. I try to import the Python script into Power BI and thus create a table with all data.

I almost succeeded. I only work with the oAuth2 api, so I deleted the WSSE rules.

However, at the end of the script, Python returns a syntax error. I have the impression that it ends up in a certain loop where it does not come out or that the rule: next = curr + timedelta (days = 1) is not clear.

The end of my script is now:

for measurepoint in measurepoints:
    curr = datetime.strptime(measurepoints[measurepoint]['dayMin'][:10], "%Y-%m-%d")
    end = datetime.strptime(measurepoints[measurepoint]['dayMax'][:10], "%Y-%m-%d")
    
    while curr < end:
        import os
        dirname = os.path.dirname(__file__)
        fname = os.path.join(dirname, "enelogic/" + str(measurepoint) + "." + curr.strftime("%Y-%m-%d") + '.json'
        next = curr + timedelta(days=1)
        
	if os.path.exists(fname):
            curr = next
            continue
        
	print (fname)
        res = s.get("https://enelogic.com/api/measuringpoints/%s/datapoints/%s/%s" % (measurepoint, curr.strftime("%Y-%m-%d"), next.strftime("%Y-%m-%d""?access_token="***")))
        res.raise_for_status()
        
	with open(fname, "w") as f:
            f.write(res.content)
        
        curr = next

At the start of the script I imported timedelta from datetime.

Do you have an idea how I can solve this?

Hi gjmeer,

You have an oauth2 script working? Would you like to share the code (i can't get it working)?

regards
J

@gjmeer
Copy link

gjmeer commented Apr 1, 2021

Hi gjmeer,
You have an oauth2 script working? Would you like to share the code (i can't get it working)?

regards
J

Dear J,

It is long time ago i tried this Pythonscript in Power BI. The OAuth2 did work, but I didn't recieve the data I want.
I followed the instruction on enelogic.docs.apiary.io to create an appid, appscret and apikey.

Do you have a working script beside the OAuth?

The Pythonscript is as follows:

# From: https://enelogic.docs.apiary.io/#introduction/option-2.-oauth2-authentication-(recommended)

# Go to the developer center: enelogic.com/nl/developers.
# Add your app and choose a desired redirect url (only for OAuth2 purposes, if using WSSE just fill in a URL e.g. enelogic.com)
# Fill in values below

import hashlib
import json
import os
import requests
import random
import string
import base64
from datetime import datetime, date, time, timedelta

appid = "Generated by Google Developers"
appsecret = "Generated by Google Developers"
username = "username from Enelogic"
apikey = "Generated by Google Developers"

def wsse(username, password):
    nonce = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(16))
    nonce = nonce.encode('utf-8')    

    created = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
    created = created.encode('utf-8')

    hash = base64.b64encode(hashlib.sha1(nonce+created+password).digest()).strip()
    
    return 'UsernameToken Username="%s", PasswordDigest="%s", Nonce="%s", Created="%s"' % (username, str(hash, 'utf8'), str(base64.b64encode(nonce).strip(),'utf8'), str(created,'utf8'))
    return(wsse)

def enelogic_wsse(appid, username, appsecret, apikey):
    username = appid + "." + username
    password = appsecret + "." + apikey
    password = password.encode('utf8')    

    return wsse(username, password)

s = requests.Session()
s.headers.update({'Content-Type': 'application/json'})    

# Get buildings

# Note: you have to regenerate the WSSE every request...
s.headers.update({'X-WSSE': enelogic_wsse(appid, username, appsecret, apikey)})    

res = s.get("https://enelogic.com/api/buildings/")
res.raise_for_status()

buildings = {r['id']: r for r in res.json()}
#[
#  {
#    "id": 1505,
#    "label": "Standaard",
#    "address": "Nijverheidsweg",
#    "addressNo": 25,
#    "addressAdd": "A",
#    "zipCode": "4529PP",
#    "location": "Eede",
#    "region": "Zeeland",
#    "country": "NL",
#    "timezone": "Europe/Amsterdam",
#    "main": true
#  }
#]

# Get measuringpoints
s.headers.update({'X-WSSE': enelogic_wsse(appid, username, appsecret, apikey)})    

res = s.get("https://enelogic.com/api/measuringpoints/")
res.raise_for_status()

measurepoints = {r['id']: r for r in res.json()}
#  {
#    "id": 1651,
#    "buildingId": 1505,
#    "mdId": 1,
#    "label": "Netbeheerder",
#    "dayMin": "2013-03-14 00:00:00",
#    "dayMax": "2014-02-27 23:45:00",
#    "monthMin": "2013-02-15 00:00:00",
#    "monthMax": "2014-02-27 00:00:00",
#    "yearMin": "2013-02-01 00:00:00",
#    "yearMax": "2014-02-27 00:00:00",
#    "timezone": "Europe/Amsterdam"
#  },
  
for measurepoint in measurepoints:
    curr = datetime.strptime(measurepoints[measurepoint]['dayMin'][:10], "%Y-%m-%d")
    end = datetime.strptime(measurepoints[measurepoint]['dayMax'][:10], "%Y-%m-%d")
    
    while curr < end:
        fname = os.path.join(dirname, "enelogic/" + str(measurepoint) + "." + curr.strftime("%Y-%m-%d") + '.json'        
        next = curr + timedelta(days=1)

        if os.path.exists(fname):
            curr = next
            continue
        
        print (fname)
        s.headers.update({'X-WSSE': enelogic_wsse(appid, username, appsecret, apikey)})    
        res = s.get("https://enelogic.com/api/measuringpoints/%s/datapoints/%s/%s" % (measurepoint, curr.strftime("%Y-%m-%d"), next.strftime("%Y-%m-%d")))
        res.raise_for_status()
        
        with open(fname, "w") as f:
            f.write(res.content)
        
        curr = next

@jeroenf1973
Copy link

Hi gjmeer,
You have an oauth2 script working? Would you like to share the code (i can't get it working)?
regards
J

Dear J,

It is long time ago i tried this Pythonscript in Power BI. The OAuth2 did work, but I didn't recieve the data I want.
I followed the instruction on enelogic.docs.apiary.io to create an appid, appscret and apikey.

Do you have a working script beside the OAuth?

The Pythonscript is as follows:

# From: https://enelogic.docs.apiary.io/#introduction/option-2.-oauth2-authentication-(recommended)

# Go to the developer center: enelogic.com/nl/developers.
# Add your app and choose a desired redirect url (only for OAuth2 purposes, if using WSSE just fill in a URL e.g. enelogic.com)
# Fill in values below

import hashlib
import json
import os
import requests
import random
import string
import base64
from datetime import datetime, date, time, timedelta

appid = "Generated by Google Developers"
appsecret = "Generated by Google Developers"
username = "username from Enelogic"
apikey = "Generated by Google Developers"

def wsse(username, password):
    nonce = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(16))
    nonce = nonce.encode('utf-8')    

    created = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
    created = created.encode('utf-8')

    hash = base64.b64encode(hashlib.sha1(nonce+created+password).digest()).strip()
    
    return 'UsernameToken Username="%s", PasswordDigest="%s", Nonce="%s", Created="%s"' % (username, str(hash, 'utf8'), str(base64.b64encode(nonce).strip(),'utf8'), str(created,'utf8'))
    return(wsse)

def enelogic_wsse(appid, username, appsecret, apikey):
    username = appid + "." + username
    password = appsecret + "." + apikey
    password = password.encode('utf8')    

    return wsse(username, password)

s = requests.Session()
s.headers.update({'Content-Type': 'application/json'})    

# Get buildings

# Note: you have to regenerate the WSSE every request...
s.headers.update({'X-WSSE': enelogic_wsse(appid, username, appsecret, apikey)})    

res = s.get("https://enelogic.com/api/buildings/")
res.raise_for_status()

buildings = {r['id']: r for r in res.json()}
#[
#  {
#    "id": 1505,
#    "label": "Standaard",
#    "address": "Nijverheidsweg",
#    "addressNo": 25,
#    "addressAdd": "A",
#    "zipCode": "4529PP",
#    "location": "Eede",
#    "region": "Zeeland",
#    "country": "NL",
#    "timezone": "Europe/Amsterdam",
#    "main": true
#  }
#]

# Get measuringpoints
s.headers.update({'X-WSSE': enelogic_wsse(appid, username, appsecret, apikey)})    

res = s.get("https://enelogic.com/api/measuringpoints/")
res.raise_for_status()

measurepoints = {r['id']: r for r in res.json()}
#  {
#    "id": 1651,
#    "buildingId": 1505,
#    "mdId": 1,
#    "label": "Netbeheerder",
#    "dayMin": "2013-03-14 00:00:00",
#    "dayMax": "2014-02-27 23:45:00",
#    "monthMin": "2013-02-15 00:00:00",
#    "monthMax": "2014-02-27 00:00:00",
#    "yearMin": "2013-02-01 00:00:00",
#    "yearMax": "2014-02-27 00:00:00",
#    "timezone": "Europe/Amsterdam"
#  },
  
for measurepoint in measurepoints:
    curr = datetime.strptime(measurepoints[measurepoint]['dayMin'][:10], "%Y-%m-%d")
    end = datetime.strptime(measurepoints[measurepoint]['dayMax'][:10], "%Y-%m-%d")
    
    while curr < end:
        fname = os.path.join(dirname, "enelogic/" + str(measurepoint) + "." + curr.strftime("%Y-%m-%d") + '.json'        
        next = curr + timedelta(days=1)

        if os.path.exists(fname):
            curr = next
            continue
        
        print (fname)
        s.headers.update({'X-WSSE': enelogic_wsse(appid, username, appsecret, apikey)})    
        res = s.get("https://enelogic.com/api/measuringpoints/%s/datapoints/%s/%s" % (measurepoint, curr.strftime("%Y-%m-%d"), next.strftime("%Y-%m-%d")))
        res.raise_for_status()
        
        with open(fname, "w") as f:
            f.write(res.content)
        
        curr = next

dear gjmeer

Thanks for the reply. I already had the wsse script working, but it seems a little unstable (authentication errors now and then). That is why i was searching for an oauth2 solution. Mine works fine with the inintal access token generated via the 'documented' google playground method, but i can't get token refreshed from a python script.

Thanks for your reply, i will struggle on :-)

@HarryVerjans
Copy link

Hi gjmeer,
You have an oauth2 script working? Would you like to share the code (i can't get it working)?
regards
J

Dear J,
It is long time ago i tried this Pythonscript in Power BI. The OAuth2 did work, but I didn't recieve the data I want.
I followed the instruction on enelogic.docs.apiary.io to create an appid, appscret and apikey.
Do you have a working script beside the OAuth?
The Pythonscript is as follows:

# From: https://enelogic.docs.apiary.io/#introduction/option-2.-oauth2-authentication-(recommended)

# Go to the developer center: enelogic.com/nl/developers.
# Add your app and choose a desired redirect url (only for OAuth2 purposes, if using WSSE just fill in a URL e.g. enelogic.com)
# Fill in values below

import hashlib
import json
import os
import requests
import random
import string
import base64
from datetime import datetime, date, time, timedelta

appid = "Generated by Google Developers"
appsecret = "Generated by Google Developers"
username = "username from Enelogic"
apikey = "Generated by Google Developers"

def wsse(username, password):
    nonce = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(16))
    nonce = nonce.encode('utf-8')    

    created = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
    created = created.encode('utf-8')

    hash = base64.b64encode(hashlib.sha1(nonce+created+password).digest()).strip()
    
    return 'UsernameToken Username="%s", PasswordDigest="%s", Nonce="%s", Created="%s"' % (username, str(hash, 'utf8'), str(base64.b64encode(nonce).strip(),'utf8'), str(created,'utf8'))
    return(wsse)

def enelogic_wsse(appid, username, appsecret, apikey):
    username = appid + "." + username
    password = appsecret + "." + apikey
    password = password.encode('utf8')    

    return wsse(username, password)

s = requests.Session()
s.headers.update({'Content-Type': 'application/json'})    

# Get buildings

# Note: you have to regenerate the WSSE every request...
s.headers.update({'X-WSSE': enelogic_wsse(appid, username, appsecret, apikey)})    

res = s.get("https://enelogic.com/api/buildings/")
res.raise_for_status()

buildings = {r['id']: r for r in res.json()}
#[
#  {
#    "id": 1505,
#    "label": "Standaard",
#    "address": "Nijverheidsweg",
#    "addressNo": 25,
#    "addressAdd": "A",
#    "zipCode": "4529PP",
#    "location": "Eede",
#    "region": "Zeeland",
#    "country": "NL",
#    "timezone": "Europe/Amsterdam",
#    "main": true
#  }
#]

# Get measuringpoints
s.headers.update({'X-WSSE': enelogic_wsse(appid, username, appsecret, apikey)})    

res = s.get("https://enelogic.com/api/measuringpoints/")
res.raise_for_status()

measurepoints = {r['id']: r for r in res.json()}
#  {
#    "id": 1651,
#    "buildingId": 1505,
#    "mdId": 1,
#    "label": "Netbeheerder",
#    "dayMin": "2013-03-14 00:00:00",
#    "dayMax": "2014-02-27 23:45:00",
#    "monthMin": "2013-02-15 00:00:00",
#    "monthMax": "2014-02-27 00:00:00",
#    "yearMin": "2013-02-01 00:00:00",
#    "yearMax": "2014-02-27 00:00:00",
#    "timezone": "Europe/Amsterdam"
#  },
  
for measurepoint in measurepoints:
    curr = datetime.strptime(measurepoints[measurepoint]['dayMin'][:10], "%Y-%m-%d")
    end = datetime.strptime(measurepoints[measurepoint]['dayMax'][:10], "%Y-%m-%d")
    
    while curr < end:
        fname = os.path.join(dirname, "enelogic/" + str(measurepoint) + "." + curr.strftime("%Y-%m-%d") + '.json'        
        next = curr + timedelta(days=1)

        if os.path.exists(fname):
            curr = next
            continue
        
        print (fname)
        s.headers.update({'X-WSSE': enelogic_wsse(appid, username, appsecret, apikey)})    
        res = s.get("https://enelogic.com/api/measuringpoints/%s/datapoints/%s/%s" % (measurepoint, curr.strftime("%Y-%m-%d"), next.strftime("%Y-%m-%d")))
        res.raise_for_status()
        
        with open(fname, "w") as f:
            f.write(res.content)
        
        curr = next

dear gjmeer

Thanks for the reply. I already had the wsse script working, but it seems a little unstable (authentication errors now and then). That is why i was searching for an oauth2 solution. Mine works fine with the inintal access token generated via the 'documented' google playground method, but i can't get token refreshed from a python script.

Thanks for your reply, i will struggle on :-)

Dear Jeroen,
As ypu have a working version of a WSSE script could you help me. The OAUTH2 is too complex for me. I would like to just execute the python script on my windows laptop. But I get some errors. I generated the keys with https://enelogic.com/nl/developers/
appid = "10105_327wdyv10aiow4occc8c0ds08wskodc4scsckoo8dkks4cckkk8"
appsecret = "1rjn5swy4ux3406sw0kk048kc4wwcolwcgskwkws0so0wo084g"
username = "jd@bits4all.nl"
apikey = "4mqjcj35oq04occcskkssc4oc00cog0"
And used the apikey from the enelogic account.
Is this the correct way to obtain the parameters.
The script is not working not on windows and not on raspberry linux.
Could you share your script and information, I know it is a little unstable.
Thanks for your help.

@jeroenf1973
Copy link

jeroenf1973 commented Aug 10, 2022

for me this works:
I have one file pulling in all sorts of energy related data (energie_data.py) which uses my enelogic.py

energie_data.py:

import requests
import enelogic

...
...
# enelogic (ruwe) data opvragen buiten de functie, anders failt de wsse authenticatie steeds...
wsse_header = enelogic.wsse_autorisatie_header()

with requests.Session() as s:
    s.headers.update({'Content-Type': 'application/json'})
    # WSSE header moet bij ieder request ververst worden:
    s.headers.update({'X-WSSE': wsse_header})
    enelogic_url = "https://enelogic.com/api/measuringpoints/3*****0/datapoint/days/{}/{}".format(datum['enelogic_start_date'], datum['enelogic_end_date'])
    res = s.get(enelogic_url)
    # Terug komt json, dus deze kunnen we gaan parsen
    enelogic_json = res.json()
    print(enelogic_json)
......

enelogic.py:

import hashlib
import random
import string
import base64
from datetime import datetime

def wsse_autorisatie_header():
    username = u"955**********w4s.jer*********.com".  #format = app_id.username
    password = "14j********k8c.isf******s8k" # format= app_secret.api_key
    password = password.encode('utf8')
    nonce = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(16))
    nonce = nonce.encode('utf-8')
    created = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
    created = created.encode('utf-8')
    hash = base64.b64encode(hashlib.sha1(nonce+created+password).digest()).strip()
    wsse = 'UsernameToken Username="%s", PasswordDigest="%s", Nonce="%s", Created="%s"' % (username, str(hash, 'utf8'), str(base64.b64encode(nonce).strip(),'utf8'), str(created,'utf8'))
    return(wsse)

def process_enelogic_data(json_data):
    #sorteren van de list op basis van de key uit de dicts die in de list zitten
    #eerst op datum dan op rate (id is niet betrouwbaar)
    json_data = sorted(json_data, key = lambda i:(i['date'],i['rate']))
    print("json_data:",json_data)
    #extraheren van data uit de dictionary:
    ......

@edsub
Copy link

edsub commented Oct 16, 2023

Based on the Enelogic API docs and some insights I got from the above, I have a running Python3 script that gets my metering data (based on the Enelogic measuringpoint_id) between 2 dates and saves the interval volumes (calculated between two consecutive readings from Enelogic) in a certain format in CSV.
The script does some basic checks but nothing special.
I use OAuth2.0, but it requires me to get a new accesstoken every 2 or 3 days. This is annoying so I'd like to learn how to refresh the accesstoken using the refreshtoken that is supplied via the Google OAuth 2.0 playground..... Any help appreciated.

#!python3
import requests
from urllib.parse import urlencode
import json
from datetime import datetime
from datetime import timedelta
from datetime import date
import operator
from decimal import Decimal
import ciso8601
import pytz
import csv

access_token = "from_google_oauth2.0_playground" ##UPDATE BEFORE USE OF SCRIPT
base_url = "https://enelogic.com/api"
meteringpoint = "measuringpoint_id_in enelogic" ##UPDATE BEFORE USE OF SCRIPT
meteringdaystart = (ciso8601.parse_datetime("2023-09-10")).astimezone()   ##UPDATE BEFORE USE OF SCRIPT
meteringdaystop =  (ciso8601.parse_datetime("2023-10-15")).astimezone()  ##UPDATE BEFORE USE OF SCRIPT
#meteringdaystop = (ciso8601.parse_datetime(date.today().strftime("%Y-%m-%d"))-timedelta(days=1)).astimezone()
meterid_I="i_9999999999"  ## hardcoded meterid used in csv file, import (levering) meter
meterid_E="e_9999999999" ##hardcoded meterid used in csv file, export (teruglevering) meter


################
#Error display #
################
def show_error():
    ft = sys.exc_info()[0]
    fv = sys.exc_info()[1]
    print("Error type : %s" % ft )
    print("Error value: %s" % fv )
    return

def daterange(start_date, end_date):
    for n in range(int((end_date.replace(tzinfo=None) - start_date.replace(tzinfo=None)).days)):
        yield start_date + timedelta(n)

def get_interval_data(meterpointid,day):
    s_day=day.strftime("%Y-%m-%d")
    s_nextday = (day+timedelta(days=1)).strftime("%Y-%m-%d")
    url=base_url+"/measuringpoints/"+meterpointid+"/datapoints/"+s_day+"/"+s_nextday+"?access_token="+access_token
    headers = {"Content-type": "application/json",
       "Accept": "*/*",
       "Accept-Encoding" : "gzip,deflate,br",
       "Connection" : "keep-alive"}
    meteringdata=None
    print(url)
    try:
        response = requests.get(url=url, headers=headers) # , data=json.dumps(body))
        if response.status_code != 200:
            print ("HTTP Error retrieving metering data. HTTP response: %s" % (response.status_code))
        else:
            print ("Metering data retrieved. HTTP response: %s" % (response.status_code))
            meteringdata=response.json()
    except:
        show_error()
        print ("Error retrieveing metering data")
    return meteringdata

#################################################################
print(meteringdaystart.isoformat())
print(meteringdaystop.isoformat())
#meteringday = datetime.strptime("2023-08-21", "%Y-%m-%d")
#meteringday = (ciso8601.parse_datetime("2023-02-21")).astimezone()
#print(meteringday.isoformat())

for meteringday in daterange(meteringdaystart, meteringdaystop):
  clean_interval_data = []
  for i in get_interval_data(meteringpoint,meteringday):
      if i not in clean_interval_data:
          clean_interval_data.append(i)
      else:
          print("Warning, duplicate record on %s. Filtered out." % meteringday)
  clean_interval_data.sort(key=operator.itemgetter('datetime','rate'))
  import_interval_data = [d for d in clean_interval_data if d['rate'] == 180]
  export_interval_data = [d for d in clean_interval_data if d['rate'] == 280]
  #For a full day Enelogic provides 97 values
  if  len(import_interval_data) != 97 or  len(export_interval_data) !=97:
     print("Warning, unexpected number of reads on %s (97 expected)" % meteringday)
     print("Import reads : %d"% len(import_interval_data))
     print("Export reads : %d"% len(export_interval_data))
#     exit(1)
  utctz=pytz.timezone("UTC")
  n=0
#Import
  csv_filename="I_"+meterid_I+"_"+meteringdaystart.isoformat()+"_"+meteringdaystop.isoformat()+".csv"
  try:
#If csv_file exists: open it
          csv_file=open(csv_filename, 'rt')
          csv_file.close()
          csv_file=open(csv_filename, 'at', newline='', encoding="utf-8")
          writer = csv.writer(csv_file, dialect='excel', delimiter=';', quoting=csv.QUOTE_NONE)
  except IOError:
#Otherwise: create it
          csv_file=open(csv_filename, 'wt', newline='', encoding="utf-8")
          writer = csv.writer(csv_file, dialect='excel', delimiter=';', quoting=csv.QUOTE_NONE)
          writer.writerow([
           'MeterID',
           'Resolution',
           'IntervalStart',
           'IsEstimated',
           'EnergyWh'
           ])
  for i in  import_interval_data:
      #print(i['datetime'], 'Import', i['quantity'])
       if n==0:
          old_quantity = i['quantity']
       else: #ciso8601.parse_datetime(i['datetime'])
          intervalstart=(ciso8601.parse_datetime(i['datetime'])-timedelta(minutes=15)).astimezone()
          intervalstart=intervalstart.astimezone(utctz)
          print(i['datetime'], 'Import', i['quantity'])
          writer.writerow([
            meterid_I,
            '15min',
            intervalstart.isoformat().replace("+00:00", "Z"),
            'FALSE',
            str(1000*(Decimal(i['quantity'])-Decimal(old_quantity)))
            ])
          old_quantity = i['quantity']
       n+=1
  csv_file.close()
  n=0
#Export
  csv_filename="E_"+meterid_E+"_"+meteringdaystart.isoformat()+"_"+meteringdaystop.isoformat()+".csv"
  try:
#If csv_file exists: open it
          csv_file=open(csv_filename, 'rt')
          csv_file.close()
          csv_file=open(csv_filename, 'at', newline='', encoding="utf-8")
          writer = csv.writer(csv_file, dialect='excel', delimiter=';', quoting=csv.QUOTE_NONE)
  except IOError:
#Otherwise: create it
          csv_file=open(csv_filename, 'wt', newline='', encoding="utf-8")
          writer = csv.writer(csv_file, dialect='excel', delimiter=';', quoting=csv.QUOTE_NONE)
          writer.writerow([
           'MeterID',
           'Resolution',
           'IntervalStart',
           'IsEstimated',
           'EnergyWh'
           ])
  for i in  export_interval_data:
       #print(i['datetime'], 'Export', i['quantity'])
       if n==0:
          old_quantity = i['quantity']
       else:
          intervalstart=(ciso8601.parse_datetime(i['datetime'])-timedelta(minutes=15)).astimezone()
          intervalstart=intervalstart.astimezone(utctz)
          print(i['datetime'], 'Export', i['quantity'])
          writer.writerow([
            meterid_E,
            '15min',
            intervalstart.isoformat().replace("+00:00", "Z"),
            'FALSE',
            str(1000*(Decimal(i['quantity'])-Decimal(old_quantity)))
            ])
          old_quantity = i['quantity']
       n+=1
  csv_file.close()
 
##output format
##MeterID;Resolution;IntervalStart;IsEstimated;EnergyWh
##91998_I;15min;2021-12-31T23:00:00Z;FALSE;82.76184291

@edsub
Copy link

edsub commented Jan 25, 2024

Anyone that have the refresh token using OAuth2 running??

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment