Example of MetPX plugin integration in a Python 3 virtualenv.
# Create an isolated Python 3 virtual environment, enter it and activate it.
python3 -m venv bulletins_es_test
cd bulletins_es_test
. bin/activate
# Elasticsearch client library imported by the plugin.
pip install elasticsearch
# NOTE(review): none of the steps shown here installs sr_subscribe itself;
# presumably it is already available on PATH -- confirm.
sr_subscribe --version # Version 2.18.11b6
# Run the subscriber in the foreground with the bulletins.conf configuration.
sr_subscribe bulletins.conf foreground
class Transformer(object):
    """Index MetPX alphanumeric bulletin notifications into Elasticsearch.

    The local file path carried by each notification is parsed into a
    GeoJSON-style ``Feature`` dict, which is then indexed into the
    ``self.index_name`` index.
    """

    def __init__(self):
        """Set the static configuration used by the other methods."""
        # Public base URL of the bulletins; used to build each document's
        # ``url`` property.
        self.dd_url = 'https://dd.weather.gc.ca/bulletins/alphanumeric'
        # Not consulted by any method of this class.
        self.bulletin_types = ['SA']
        self.index_name = 'bulletins'
        # NOTE(review): the original code referenced an undefined global
        # ``ES`` and never configured a host; a local default endpoint is
        # assumed here -- confirm against the actual deployment.
        self.es_url = 'http://localhost:9200'
        # Elasticsearch client, created on first use by _get_es().
        self.es = None

    def _get_es(self):
        """Return the Elasticsearch client, creating it on first use."""
        if self.es is None:
            # Imported locally (as the original plugin did) rather than at
            # module level.
            from elasticsearch import Elasticsearch
            self.es = Elasticsearch([self.es_url])
        return self.es

    def bulletin2dict(self, filepath):
        """Build a GeoJSON-style Feature dict from a bulletin file path.

        The part of *filepath* after ``/alphanumeric/`` is read as
        ``YYYYMMDD/<type>/<issuer code>/<HH>/.../<filename>``; the minutes
        are the last two characters of the third ``_``-separated field of
        ``<filename>``.

        :param filepath: path of the bulletin file
        :returns: dict with ``type``, ``geometry``, ``properties`` and
                  ``ID`` keys
        :raises RuntimeError: if *filepath* has no ``/alphanumeric/``
                              component or the remainder is too short to
                              be parsed
        """
        try:
            bulletin_path = filepath.split('/alphanumeric/')[1]
        except IndexError as err:
            raise RuntimeError(
                'No /alphanumeric/ component in path: {}'.format(filepath)
            ) from err

        identifier = bulletin_path.replace('/', '.')
        tokens = bulletin_path.split('/')

        try:
            yyyymmdd = tokens[0]
            bulletin_type = tokens[1]
            issuer_code = tokens[2]
            hh = tokens[3]
            minutes = tokens[-1].split('_')[2][-2:]
        except IndexError as err:
            # Same exception type as the check above, so callers deal with
            # a single failure mode for malformed paths.
            raise RuntimeError(
                'Unexpected bulletin path layout: {}'.format(bulletin_path)
            ) from err

        timestamp = '{}-{}-{} {}:{}'.format(
            yyyymmdd[0:4], yyyymmdd[4:6], yyyymmdd[6:8], hh, minutes)

        return {
            'type': 'Feature',
            'geometry': {
                'type': 'Point',
                'coordinates': [-75, 45],  # TODO: use real coordinates
            },
            'properties': {
                'identifier': identifier,
                'datetime': timestamp,
                'type': bulletin_type,
                'issuer_code': issuer_code,
                # Issuer name/country are not derived from the path.
                'issuer_name': None,
                'issuer_country': None,
                'issuing_office': issuer_code[2:],
                'url': '{}/{}'.format(self.dd_url, bulletin_path),
            },
            'ID': identifier,
        }

    def index_to_es(self, dict_):
        """Index *dict_* into Elasticsearch with ``dict_['ID']`` as its id.

        :param dict_: document as returned by :meth:`bulletin2dict`
        :raises RuntimeError: wrapping any error raised while obtaining
                              the client or indexing the document
        """
        try:
            self._get_es().index(index=self.index_name,
                                 doc_type='FeatureCollection',
                                 id=dict_['ID'],
                                 body=dict_)
        except Exception as err:
            raise RuntimeError(err) from err

    def perform(self, parent):
        """Index the bulletin announced by the current message.

        :param parent: object providing ``parent.msg.local_file`` (path of
                       the bulletin) and ``parent.logger``
        :returns: ``True`` when the document was indexed, ``False`` when
                  parsing or indexing failed (the error is logged as a
                  warning)
        """
        try:
            filepath = parent.msg.local_file
            parent.logger.info('Indexing {} to Elasticsearch'.format(filepath))
            dict_ = self.bulletin2dict(filepath)
            # Log the parsed document itself, not the path it came from.
            parent.logger.debug('Resulting dict: {}'.format(dict_))
            self.index_to_es(dict_)
            return True
        except Exception as err:
            parent.logger.warning(err)
            return False
# Instantiate the plugin and register its perform() method as the
# on_message callback.
# NOTE(review): `self` is not defined anywhere in this file; presumably it is
# bound by the sr_subscribe plugin loader when it executes this script --
# confirm against the Sarracenia plugin documentation.
transformer = Transformer()
self.on_message = transformer.perform
# sr_subscribe configuration for the bulletin indexing example.

# Broker to subscribe to, using the anonymous account.
broker amqps://anonymous:anonymous@dd.weather.gc.ca/
queue_name q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
directory /data/geomet/swob-dd-feed-test
instances 2
# Restrict the subscription to alphanumeric bulletin notifications.
subtopic bulletins.alphanumeric.#
# NOTE(review): presumably notifications are processed without downloading
# the bulletins themselves -- confirm against the sr_subscribe documentation.
notify_only True
# Plugin invoked for each message; presumably the file that defines the
# Transformer class and registers its perform() method -- confirm.
on_message bulletin_es_loader.py
mirror True
accept .*