Skip to content

Instantly share code, notes, and snippets.

@mraspaud
Created March 18, 2015 09:18
Show Gist options
  • Save mraspaud/36244cf58bcc1eb06ea8 to your computer and use it in GitHub Desktop.
Save mraspaud/36244cf58bcc1eb06ea8 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2014, 2015 Martin Raspaud
# Author(s):
# Martin Raspaud <martin.raspaud@smhi.se>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Test a new approach to the producer.
"""
from trollduction.xml_read import ProductList
import Queue
from mpop.utils import debug_on
debug_on()
def process_pl(elt, info):
    """Walk the children of *elt*, accumulating attributes into info dicts.

    For every child, a copy of *info* is extended with the child's
    attributes; the ``id`` and ``name`` attributes are stored under a key
    prefixed with the child's tag (e.g. ``areaid``) so that they do not
    collide between nesting levels. *info* itself is never modified.

    Returns either:

    * a ``(text, info)`` tuple as soon as a child carrying non-blank text is
      met (the remaining siblings are then not visited), or
    * a list with the results of processing each child recursively.
    """
    flat_list = []
    for item in elt:
        new_info = info.copy()
        for key, val in item.attrib.items():
            if key in ["id", "name"]:
                key = item.tag + key
            new_info[key] = val
        # ElementTree leaves ``text`` as None for elements without any text
        # (e.g. ``<area id="x"/>``), so guard before stripping.
        text = (item.text or "").strip()
        if text:
            return text, new_info
        flat_list.append(process_pl(item, new_info))
    return flat_list
# Product list file loaded by test() further down.
# NOTE(review): hard-coded, developer-specific absolute path -- adjust it
# before running this script on another machine.
fname = "/home/a001673/usr/src/pytroll-config/etc/polar_product_list.xml"
from mpop.satellites import GenericFactory as GF
import logging
from trollsched.satpass import Pass
LOGGER = logging.getLogger(__name__)
def create_scene_from_message(msg):
    """Build an MPOP scene from the message *msg*.

    Only messages of type ``file``, ``collection`` or ``dataset`` are
    handled: their ``data`` payload is passed to `create_scene_from_mda`.
    For any other message type, None is returned.
    """
    if msg.type not in ("file", "collection", "dataset"):
        return None
    return create_scene_from_mda(msg.data)
def create_scene_from_mda(mda):
    """Read the metadata *mda* and return a corresponding MPOP scene.

    *mda* is a dict-like object. The keys read here are:

    * ``platform_name`` and ``sensor`` (required),
    * ``start_time``, ``nominal_time``, ``end_time``: the first one holding
      a true value is used as the time slot of the scene,
    * ``orbit_number``, ``orbit_type`` and ``variant`` (optional).

    Note that *mda* is modified in place: ``orbit_number`` is set to None
    when it is missing.

    When an orbit number is given, or ``orbit_type`` is ``"polar"``, an
    overpass is attached to the scene; this reads ``mda['start_time']`` and
    ``mda['end_time']`` directly, so both keys must be present in that case
    (KeyError otherwise).

    Raises IndexError if ``sensor`` is an empty container.
    """
    time_slot = (mda.get('start_time') or
                 mda.get('nominal_time') or
                 mda.get('end_time'))

    # orbit is not given for GEO satellites, use None
    if 'orbit_number' not in mda:
        mda['orbit_number'] = None

    platform = mda["platform_name"]

    LOGGER.info("platform %s time %s",
                str(platform), str(time_slot))

    sensors = mda['sensor']
    if isinstance(sensors, (list, tuple)):
        sensor = sensors[0]
    elif isinstance(sensors, set):
        # Sets cannot be indexed; sort first so that the pick is
        # reproducible from one run to the next.
        sensor = sorted(sensors)[0]
    else:
        sensor = sensors

    # Create satellite scene
    global_data = GF.create_scene(satname=str(platform),
                                  satnumber='',
                                  instrument=str(sensor),
                                  time_slot=time_slot,
                                  orbit=mda['orbit_number'],
                                  variant=mda.get('variant', ''))
    LOGGER.debug("Creating scene for satellite %s", str(platform))

    if mda['orbit_number'] is not None or mda.get('orbit_type') == "polar":
        global_data.overpass = Pass(platform,
                                    mda['start_time'],
                                    mda['end_time'],
                                    instrument=sensor)

    # Update missing information to global_data.info{}
    # TODO: this should be fixed in mpop.
    global_data.info.update(mda)
    global_data.info['time'] = time_slot

    return global_data
class AbstractProcessor(object):
    """Base class for the nodes of the processing tree.

    Subclasses are expected to set ``self.data`` to an iterable of child
    processors; this class only provides the helpers that fan out to them.
    """

    def _process_children(self, stuff):
        """Call ``process(stuff)`` on every child, in order."""
        for node in self.data:
            node.process(stuff)

    def required_channels(self, scene):
        """Return the union of the channels required by all the children."""
        channels = set()
        for node in self.data:
            channels = channels | node.required_channels(scene)
        return channels
class PLProcessor(AbstractProcessor):
    """Root processor, built from a whole product list *pl*."""

    def __init__(self, pl):
        """Create one `GroupProcessor` per group of *pl*.

        The attributes of the product list (``pl.attrib``) are handed down
        to every group as their base parameters.
        """
        self.info = pl.attrib
        groups = []
        for grp in pl.groups:
            groups.append(GroupProcessor(grp, self.info))
        self.data = groups

    def process(self, msg):
        """Turn *msg* into a scene and pass it to every group."""
        scene = create_scene_from_message(msg)
        self._process_children(scene)
class GroupProcessor(AbstractProcessor):
    """Processor for one group of the product list."""

    def __init__(self, group, params):
        """Merge *params* with the group's info and build the children.

        The ``id`` entry (required) and the ``name`` entry (optional) of the
        merged info are renamed to ``groupid`` and ``groupname``. One child
        processor is created per element of ``group.data``, its class being
        looked up in `processors` by tag (default: `Processor`).
        """
        self.info = info = params.copy()
        info.update(group.info)
        info["groupid"] = info.pop("id")
        if "name" in info:
            info["groupname"] = info.pop("name")

        children = []
        for ch in group.data:
            factory = processors.get(ch.tag, Processor)
            children.append(factory(ch, info))
        self.data = children

    def get_def_names(self):
        """Return the ``areaid`` of every child, in order."""
        names = []
        for area in self.data:
            names.append(area.info["areaid"])
        return names

    def process(self, global_data):
        """Load the needed channels into *global_data* and run the children.

        If the group's info has a true ``unload`` entry, all the loaded
        channels are unloaded afterwards.
        """
        # TODO: the coverage check is not implemented here yet.
        channels = self.required_channels(global_data)
        area_names = self.get_def_names()
        global_data.load(channels, area_def_names=area_names)

        self._process_children(global_data)

        if not self.info.get("unload", False):
            return
        names = []
        for chn in global_data.loaded_channels():
            names.append(chn.name)
        global_data.unload(*names)
        LOGGER.debug("unloading all channels after group %s",
                     self.info["groupid"])
class Processor(AbstractProcessor):
    """Generic processor built from an element *elt* of the product list."""

    def __init__(self, elt, params):
        """Merge *params* with the attributes of *elt* and build the children.

        The ``id`` and ``name`` entries of the merged info, when present, are
        moved to keys prefixed with the tag of *elt*. Each sub-element of
        *elt* becomes a child processor whose class is looked up in
        `processors` by tag (default: `Processor`). When there is no
        sub-element, ``self.data`` holds the text of *elt* instead.
        """
        self.info = params.copy()
        self.info.update(elt.attrib)
        for key in ("id", "name"):
            if key not in self.info:
                continue
            self.info[elt.tag + key] = self.info[key]
            del self.info[key]

        children = []
        for ch in elt:
            factory = processors.get(ch.tag, Processor)
            children.append(factory(ch, self.info))
        self.data = children if children else elt.text
class FileProcessor(Processor):
    """Processor saving an image to file."""

    def process(self, img):
        """Save *img*, adding an overlay first if the info asks for one.

        The argument given to ``img.save`` is the result of trollsift's
        ``compose`` called with ``self.data`` and ``self.info``.
        """
        # TODO: the overlay should not be added here, it could be done once
        # for all...
        try:
            overlay = self.info["overlay"]
        except KeyError:
            pass
        else:
            img.add_overlay_from_config(overlay)

        # TODO: link, trollsift compose
        from trollsift import compose
        filename = compose(self.data, self.info)
        img.save(filename)
class AreaProcessor(Processor):
    """Processor projecting the data onto one area."""

    def process(self, global_data):
        """Project *global_data* and pass the result to the children.

        ``global_data.project`` is called with the ``areaid`` of this
        processor and the channels required by its children.
        """
        area_id = self.info["areaid"]
        channels = self.required_channels(global_data)
        local_data = global_data.project(area_id, channels)
        self._process_children(local_data)
class ProdProcessor(Processor):
    """Processor generating one product (composite) from a scene."""

    # Class-level queue; nothing in this class uses it.
    queue = Queue.Queue()

    def process(self, scene):
        """Generate the product image and pass it to the children.

        The generating function is the attribute of ``scene.image`` named
        after the ``productid`` of this processor.
        """
        img = getattr(scene.image, self.info["productid"])()
        self._process_children(img)

    def required_channels(self, scene):
        """Return the prerequisites of the composite.

        If ``scene.image`` has no attribute named after ``productid``, this
        is logged and an empty set is returned.
        """
        try:
            composite = getattr(scene.image, self.info["productid"])
        except AttributeError:
            LOGGER.debug("Composite %s not available",
                         self.info['productid'])
            return set()
        return composite.prerequisites
# Dispatch table: element tag -> processor class. Tags missing from this
# table are handled by the generic Processor class at the lookup sites.
processors = dict(
    area=AreaProcessor,
    product=ProdProcessor,
    file=FileProcessor,
)
from mock import patch, MagicMock
import mpop
@patch('mpop.scene.SatelliteInstrumentScene.load')
@patch('mpop.scene.SatelliteInstrumentScene.project')
@patch('trollsift.compose')
def test(new_compose, new_project, new_load):
    """Run the whole processing chain on a sample message, with mocks.

    The ``load`` and ``project`` methods of the scene and trollsift's
    ``compose`` are patched. The product list is read from `fname` and a
    single hard-coded HRPT message is processed.
    """
    new_load.return_value = mpop.scene.SatelliteInstrumentScene()
    new_project.return_value = mpop.scene.SatelliteInstrumentScene()
    new_project.return_value.image = MagicMock()
    new_load.return_value.image = MagicMock()
    # Configure the mock itself: rebinding the local name ``new_compose``
    # would leave the patched ``trollsift.compose`` untouched.
    new_compose.side_effect = lambda x, y: x

    pl = ProductList(fname)
    proc = PLProcessor(pl)

    from posttroll.message import Message
    message = Message(
        rawstr="""pytroll://AAPP-HRPT/1b/norrköping/utv/polar/direct_readout/ file safusr.u@lxserv248.smhi.se 2015-03-15T09:51:30.793830 v1.01 application/json {"uid": "hrpt_noaa15_20150315_0942_87549.l1b", "format": "AAPP-HRPT", "type": "binary", "start_time": "2015-03-15T09:42:28", "orbit_number": 87548, "uri": "ssh://safe.smhi.se//san1/pps/import/PPS_data/source/noaa15_20150315_0942_87549/hrpt_noaa15_20150315_0942_87549.l1b", "platform_name": "NOAA-15", "end_time": "2015-03-15T09:51:16", "sensor": "avhrr/3", "data_processing_level": "1b"}""")

    proc.process(message)
# Run the mocked end-to-end check when this file is executed as a script.
if __name__ == '__main__':
    test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment