Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tashda/04179bc6f0dfc87d869ffcc37c3c7d58 to your computer and use it in GitHub Desktop.
Save tashda/04179bc6f0dfc87d869ffcc37c3c7d58 to your computer and use it in GitHub Desktop.
# Fetches data from https://www.energidataservice.dk/tso-electricity/elspotprices
# API documentation http://docs.ckan.org/en/latest/api/index.html#making-an-api-request
from datetime import datetime, timedelta
import time
import requests
from influxdb import InfluxDBClient
import paho.mqtt.client as mqtt
INFLUXDB_ADDRESS = '10.0.1.4'
INFLUXDB_USER = '***'
INFLUXDB_PASSWORD = '***'
INFLUXDB_DATABASE = 'klovervang41'
MQTT_ADDRESS = '10.0.1.4'
MQTT_USER = '***'
MQTT_PASSWORD = '***'
MQTT_CLIENT_ID = 'MQTTPowerPrice'
"""
DK_WEST = West Denmark (zip-code above 4999)
DK_EAST = East Denmark (zip-code below 5000)
"""
DK_WEST = "DK1"
DK_EAST = "DK2"
def createHeader():
apiHeader = {
"Content-Type": "application/json"
}
return [apiHeader]
def dateFormat(date):
return date.strftime("%Y-%m-%dT%H:%M:%SZ")
def spotprices(pricezone):
"""
Gives the spot prices in the specified from/to range.
Spot prices are delivered ahead of our current time, up to 24 hours ahead.
The price is excluding VAT and tarrifs.
"""
try:
headers = createHeader()
url = "https://api.energidataservice.dk/datastore_search?resource_id=elspotprices&limit=48&filters={\"PriceArea\":\"DK2\"}&sort=HourUTC desc"
return requests.get(url, headers=headers[0]).json()
except Exception as ex:
raise Exception(str(ex))
def send_sensor_data_to_influxdb(influxdb_client, value):
json_body = [
{
'measurement': "spotprice",
'tags': {
'market': DK_EAST
},
'fields': {
'value': value
}
}
]
print(json_body)
influxdb_client.write_points(json_body)
def publish_message(mqtt_client, topic, value):
result = mqtt_client.publish(topic, value)
# result: [0, 1]
status = result[0]
if status == 0:
print('Send `' + str(value) +'` to topic : ' + topic)
else:
print('Failed to send message to topic : ' + topic)
def filter_current_price(result):
hour_dk = result["HourDK"]
current_hour = time.strftime("%Y-%m-%dT%H:00:00")
return hour_dk == current_hour
mqtt_client = mqtt.Client(MQTT_CLIENT_ID)
mqtt_client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
mqtt_client.connect(MQTT_ADDRESS, 1883)
influxdb_client = InfluxDBClient(INFLUXDB_ADDRESS, 8086, INFLUXDB_USER, INFLUXDB_PASSWORD, INFLUXDB_DATABASE)
try:
response = spotprices(DK_EAST)
prices = response["result"]["records"]
current_hour_result = list(filter(filter_current_price, iter(prices)))
if len(current_hour_result) == 0:
raise Exception("No current hour result")
current_hour_price = current_hour_result[0]["SpotPriceEUR"]
# Converting to DKK using the target EUR currency rate from Nationalbanken
current_hour_price_kwh = (float(current_hour_price) * 7.46) / 1000.0
#print (current_hour_price_kwh)
send_sensor_data_to_influxdb(influxdb_client, current_hour_price_kwh)
publish_message(mqtt_client, 'basement/hall/sensor/room/powerSpotprice', current_hour_price_kwh)
except Exception as inst:
print("Exception: " + inst)
# To ensure that the message is actually published (async)
time.sleep(5)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment