Skip to content

Instantly share code, notes, and snippets.

@ergo70
Last active April 1, 2019 09:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ergo70/06c3bdccc7a7e760ea5b7535f5134b80 to your computer and use it in GitHub Desktop.
Save ergo70/06c3bdccc7a7e760ea5b7535f5134b80 to your computer and use it in GitHub Desktop.
OpenWeatherMap FDW
import requests
from multicorn import ForeignDataWrapper
from multicorn.utils import log_to_postgres, ERROR
# Module metadata: author, copyright holder, license name and version string.
__author__ = "Ernst-Georg Schmid"
__copyright__ = "Copyright (c) 2019 Ernst-Georg Schmid"
__license__ = "PostgreSQL"
__version__ = "2.0"
class OWMForeignDataWrapper(ForeignDataWrapper):
    """A multicorn based foreign data wrapper for Open Weather Map.

    Recognised options (all optional, read in ``__init__``):

    * ``weather_api_url`` -- host and path of the weather endpoint, without
      scheme (``https://`` is prepended).
    * ``uvi_api_url`` -- host and path of the UV index endpoint, without
      scheme.
    * ``https_proxy`` -- proxy used for the HTTPS requests.
    * ``api_key`` -- sent as the ``appid`` request parameter; without it no
      request is made and no row is produced.
    * ``default_language`` -- language used when the query has no
      ``language = ...`` qualifier (default ``'en'``).
    * ``timeout`` -- seconds to wait for each HTTP request (default 30).
    """

    # Used when the 'timeout' option is absent or cannot be used.
    DEFAULT_TIMEOUT = 30.0

    def __init__(self, options, columns):
        """Store the connection settings taken from the FDW options."""
        super(OWMForeignDataWrapper, self).__init__(options, columns)
        self.weather_api_url = options.get('weather_api_url', 'api.openweathermap.org/data/2.5/weather')
        self.uvi_api_url = options.get('uvi_api_url', 'api.openweathermap.org/data/2.5/uvi')
        self.https_proxy = options.get('https_proxy', None)
        self.api_key = options.get('api_key', None)
        self.default_language = options.get('default_language', 'en')
        self.timeout = self._parse_timeout(options.get('timeout', None))

    def _parse_timeout(self, raw_value):
        """Turn the 'timeout' option into a positive float, else the default."""
        if raw_value is None:
            return self.DEFAULT_TIMEOUT
        try:
            seconds = float(raw_value)
        except (TypeError, ValueError):
            seconds = None
        if seconds is None or seconds <= 0:
            log_to_postgres('Invalid timeout option {!r}.'.format(raw_value), ERROR)
            # Keep the instance usable should the call above hand control back.
            return self.DEFAULT_TIMEOUT
        return seconds

    def get_rel_size(self, quals, columns):
        """Inform the planner about the cost of a query.

        Returns a ``(row_count, row_width)`` tuple: always one row, with a
        width of 100 per requested column.
        """
        return (1, len(columns) * 100)

    def execute(self, quals, columns, sortkeys=None):
        """Execute a query and yield at most one row.

        Only ``=`` qualifiers on ``latitude``, ``longitude`` and ``language``
        are used (field names are matched case-insensitively). A row is
        produced only when an API key is configured, both coordinates are
        given and both endpoints answer with HTTP 200. The row holds the
        keys ``latitude``, ``longitude``, ``language``, ``weather_data`` and
        ``uvi_data``; the two data values are the raw response bodies.
        Failed calls are reported through ``log_to_postgres`` at ERROR level.
        """
        latitude = longitude = None
        lat_ok = long_ok = False
        language = self.default_language

        for qual in quals or ():
            if qual.operator != '=':
                continue
            field = qual.field_name.lower()
            if field == 'latitude':
                latitude, lat_ok = qual.value, True
            elif field == 'longitude':
                longitude, long_ok = qual.value, True
            elif field == 'language':
                language = qual.value

        # The language code is always sent (and returned) in lower case; a
        # missing value falls back to the configured default.
        language = (language or self.default_language).lower()

        if not (self.api_key and lat_ok and long_ok):
            # Nothing to ask the API without a key and a coordinate pair.
            return

        query_params = {'appid': self.api_key, 'lat': latitude, 'lon': longitude, 'lang': language}
        proxies = {'https': self.https_proxy} if self.https_proxy else None

        # Both calls share one session; it is closed before the row is handed
        # out so no connection stays open while the consumer works on it.
        with requests.Session() as session:
            weather_data = self._fetch(session, self.weather_api_url, query_params, proxies)
            if weather_data is None:
                return
            uvi_data = self._fetch(session, self.uvi_api_url, query_params, proxies)
            if uvi_data is None:
                return

        yield {
            'latitude': latitude,
            'longitude': longitude,
            'language': language,
            'weather_data': weather_data,
            'uvi_data': uvi_data,
        }

    def _fetch(self, session, api_url, query_params, proxies):
        """GET one endpoint; return the body text, or None after logging a failure."""
        url = 'https://{}'.format(api_url)
        try:
            response = session.get(url, params=query_params, proxies=proxies, timeout=self.timeout)
        except requests.RequestException as exc:
            # Only the exception type is logged: its message can embed the
            # full request URL, which carries the API key.
            log_to_postgres('Call to {} failed with {}.'.format(url, type(exc).__name__), ERROR)
            return None
        if response.status_code != requests.codes.ok:
            log_to_postgres('Call to {} returned HTTP {}.'.format(url, response.status_code), ERROR)
            return None
        return response.text
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment