Skip to content

Instantly share code, notes, and snippets.

@tomkralidis
Last active June 5, 2019 13:27
Show Gist options
  • Save tomkralidis/cf42f051f1a109034a3800daaf51beb5 to your computer and use it in GitHub Desktop.
Save tomkralidis/cf42f051f1a109034a3800daaf51beb5 to your computer and use it in GitHub Desktop.
MetPX virtualenv example

Example of MetPX plugin integration in a Python 3 virtualenv.

# Create an isolated Python 3 virtual environment and activate it.
python3 -m venv bulletins_es_test
cd bulletins_es_test
. bin/activate
# The plugin imports the `elasticsearch` client package at runtime.
pip install elasticsearch
# NOTE(review): nothing here installs sr_subscribe into the virtualenv --
# presumably it is already available on PATH; confirm.
sr_subscribe --version  # Version 2.18.11b6
# Run the subscriber described by bulletins.conf in the foreground.
sr_subscribe bulletins.conf foreground
class Transformer(object):
    """Message handler that indexes bulletin notices into Elasticsearch.

    ``perform`` is the entry point: it reads the file path from the
    incoming message, turns it into a GeoJSON-like feature dict
    (``bulletin2dict``) and indexes that dict (``index_to_es``).
    """

    def __init__(self):
        # Base URL prepended to the bulletin path to build the 'url' property.
        self.dd_url = 'https://dd.weather.gc.ca/bulletins/alphanumeric'
        # Not referenced by any method of this class.
        self.bulletin_types = ['SA']
        # Name of the Elasticsearch index documents are written to.
        self.index_name = 'bulletins'
        # Elasticsearch client, created on first use by _get_client().
        self._es = None

    def _get_client(self):
        """Return the Elasticsearch client, creating it on first use.

        The client lives on the instance instead of a module-level global,
        so the methods do not depend on a name defined outside the class.
        """
        client = getattr(self, '_es', None)
        if client is None:
            # Imported lazily so the class can be loaded (and
            # bulletin2dict used) without the package installed.
            from elasticsearch import Elasticsearch
            # NOTE(review): no connection settings are given, so the client
            # library's defaults apply -- confirm that is the intended
            # cluster. Newer client releases may require explicit hosts.
            client = self._es = Elasticsearch()
        return client

    def bulletin2dict(self, filepath):
        """Build a feature dict from a bulletin file path.

        The part of ``filepath`` after ``/alphanumeric/`` is expected to
        look like ``YYYYMMDD/<type>/<issuer>/<HH>/<filename>``, where the
        third ``_``-separated field of ``<filename>`` ends with the minutes.

        :param filepath: path containing an ``/alphanumeric/`` component
        :returns: `dict` with ``type``, ``geometry``, ``properties``, ``ID``
        :raises RuntimeError: if ``/alphanumeric/`` is not in ``filepath``
        :raises IndexError: if the remaining path has too few components
        """
        try:
            bulletin_path = filepath.split('/alphanumeric/')[1]
        except IndexError as err:
            raise RuntimeError(
                'No /alphanumeric/ component in path: {}'.format(filepath)
            ) from err

        identifier = bulletin_path.replace('/', '.')
        tokens = bulletin_path.split('/')

        yyyymmdd = tokens[0]
        bulletin_type = tokens[1]
        issuer_code = tokens[2]
        hh = tokens[3]
        filename = tokens[-1]

        # Minutes are the last two characters of the third filename field.
        min_ = filename.split('_')[2][-2:]
        timestamp = '{}-{}-{} {}:{}'.format(
            yyyymmdd[0:4], yyyymmdd[4:6], yyyymmdd[6:8], hh, min_)

        return {
            'type': 'Feature',
            'geometry': {
                'type': 'Point',
                'coordinates': [-75, 45]  # TODO: use real coordinates
            },
            'properties': {
                'identifier': identifier,
                'datetime': timestamp,
                'type': bulletin_type,
                'issuer_code': issuer_code,
                # Issuer name/country are not derived yet; kept as None.
                'issuer_name': None,
                'issuer_country': None,
                'issuing_office': issuer_code[2:],
                'url': '{}/{}'.format(self.dd_url, bulletin_path)
            },
            'ID': identifier
        }

    def index_to_es(self, dict_):
        """Index ``dict_`` into ``self.index_name`` using ``dict_['ID']`` as id.

        :param dict_: feature dict as produced by ``bulletin2dict``
        :raises RuntimeError: on any failure (original error is chained)
        """
        try:
            self._get_client().index(index=self.index_name,
                                     doc_type='FeatureCollection',
                                     id=dict_['ID'],
                                     body=dict_)
        except Exception as err:
            raise RuntimeError(err) from err

    def perform(self, parent):
        """Handle one message: parse its file path and index the result.

        :param parent: object exposing ``msg.local_file`` and ``logger``
        :returns: `True` on success; `False` if anything failed (the error
                  is logged as a warning rather than raised)
        """
        try:
            filepath = parent.msg.local_file
            parent.logger.info('Indexing {} to Elasticsearch'.format(filepath))
            dict_ = self.bulletin2dict(filepath)
            parent.logger.debug('Resulting dict: {}'.format(dict_))
            self.index_to_es(dict_)
            return True
        except Exception as err:
            # Best effort: a bad message must not take the subscriber down.
            parent.logger.warning(err)
            return False
# Create the handler instance once, when this file is loaded.
transformer = Transformer()
# Register the bound method as the on_message callback.
# NOTE(review): `self` is not defined anywhere in this file -- presumably
# the sr_subscribe plugin loader executes it with `self` bound to the
# subscriber object; confirm before running it as a standalone script.
self.on_message = transformer.perform
# AMQPS broker to subscribe to, using anonymous credentials.
broker amqps://anonymous:anonymous@dd.weather.gc.ca/
# Queue name assembled from the substitution variables below.
queue_name q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
# NOTE(review): the path mentions "swob" while the subscription below is
# for bulletins -- confirm this is the intended directory.
directory /data/geomet/swob-dd-feed-test
# presumably the number of subscriber processes to run -- confirm
instances 2
# Topic filter; the trailing '#' is presumably a match-everything wildcard.
subtopic bulletins.alphanumeric.#
# presumably: act on notifications only, without downloading -- confirm
notify_only True
# Plugin script invoked for each message.
on_message bulletin_es_loader.py
mirror True
# Regular expression that matches every path.
accept .*
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment