Skip to content

Instantly share code, notes, and snippets.

@cluther
Created March 29, 2018 20:51
Show Gist options
  • Save cluther/816bccf545ea1697d1c4aaa87d412d24 to your computer and use it in GitHub Desktop.
Save cluther/816bccf545ea1697d1c4aaa87d412d24 to your computer and use it in GitHub Desktop.
dsplugins.py
"""Monitors current conditions using the Weather Underground API."""
# Logging
import logging
LOG = logging.getLogger('zen.WeatherUnderground')
# stdlib Imports
import json
import random
import time
# Twisted Imports
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.web.client import getPage
# PythonCollector Imports
from ZenPacks.zenoss.PythonCollector.datasources.PythonDataSource import (
PythonDataSourcePlugin,
)
class Alerts(PythonDataSourcePlugin):
"""Weather Underground alerts data source plugin."""
@classmethod
def config_key(cls, datasource, context):
return (
context.device().id,
datasource.getCycleTime(context),
context.id,
'wunderground-alerts',
)
@classmethod
def params(cls, datasource, context):
return {
'api_key': context.zWundergroundAPIKey,
'api_link': context.api_link,
'location_name': context.title,
}
@inlineCallbacks
def collect(self, config):
data = self.new_data()
for datasource in config.datasources:
try:
response = yield getPage(
'http://api.wunderground.com/api/{api_key}/alerts{api_link}.json'
.format(
api_key=datasource.params['api_key'],
api_link=datasource.params['api_link']))
response = json.loads(response)
except Exception:
LOG.exception(
"%s: failed to get alerts data for %s",
config.id,
datasource.location_name)
continue
for alert in response['alerts']:
severity = None
if int(alert['expires_epoch']) <= time.time():
severity = 0
elif alert['significance'] in ('W', 'A'):
severity = 3
else:
severity = 2
data['events'].append({
'device': config.id,
'component': datasource.component,
'severity': severity,
'eventKey': 'wu-alert-{}'.format(alert['type']),
'eventClassKey': 'wu-alert',
'summary': alert['description'],
'message': alert['message'],
'wu-description': alert['description'],
'wu-date': alert['date'],
'wu-expires': alert['expires'],
'wu-phenomena': alert['phenomena'],
'wu-significance': alert['significance'],
'wu-type': alert['type'],
})
dpname = "{}_count".format(datasource.datasource)
data['values'][datasource.component][dpname] = len(response['alerts'])
random_name = "{}_random".format(datasource.datasource)
data['values'][datasource.component][random_name] = random.randint(0, 10)
returnValue(data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment