Skip to content

Instantly share code, notes, and snippets.

@swarbhanu
Created October 8, 2012 19:12
Show Gist options
  • Save swarbhanu/3854310 to your computer and use it in GitHub Desktop.
Save swarbhanu/3854310 to your computer and use it in GitHub Desktop.
'''
@author MManning
@file ion/processes/data/transforms/ctd/ctd_L1_conductivity.py
@description Transforms CTD parsed data into L1 product for conductivity
'''
from pyon.ion.transforma import TransformDataProcess
from pyon.core.exception import BadRequest
from interface.services.dm.ipubsub_management_service import PubsubManagementServiceProcessClient
from ion.services.dm.utility.granule.record_dictionary import RecordDictionaryTool
from ion.core.function.transform_function import SimpleGranuleTransformFunction
# For usage: please refer to the integration tests in
# ion/processes/data/transforms/ctd/test/test_ctd_transforms.py
class CTDL1ConductivityTransform(TransformDataProcess):
''' A basic transform that receives input through a subscription,
parses the input from a CTD, extracts the conductivity value and scales it according to
the defined algorithm. If the transform
has an output_stream it will publish the output on the output stream.
'''
def on_start(self):
super(CTDL1ConductivityTransform, self).on_start()
if not self.CFG.process.publish_streams.has_key('conductivity'):
raise BadRequest("For CTD transforms, please send the stream_id using "
"a special keyword (ex: conductivity)")
self.cond_stream = self.CFG.process.publish_streams.conductivity
# Read the parameter dict from the stream def of the stream
pubsub = PubsubManagementServiceProcessClient(process=self)
self.stream_definition = pubsub.read_stream_definition(stream_id=self.cond_stream)
def recv_packet(self, packet, stream_route, stream_id):
"""Processes incoming data!!!!
"""
if packet == {}:
return
granule = CTDL1ConductivityTransformAlgorithm.execute(packet, params=self.stream_definition._id)
self.conductivity.publish(msg=granule)
class CTDL1ConductivityTransformAlgorithm(SimpleGranuleTransformFunction):
@staticmethod
@SimpleGranuleTransformFunction.validate_inputs
def execute(input=None, context=None, config=None, params=None, state=None):
rdt = RecordDictionaryTool.load_from_granule(input)
conductivity = rdt['conductivity']
cond_value = (conductivity / 100000.0) - 0.5
values = {}
for field_name,value in rdt.iteritems():
values[field_name]=value
# Update the conductivity values
values['conductivity'] = cond_value
# build the granule for conductivity
result = CTDL1ConductivityTransformAlgorithm._build_granule(stream_definition_id = params,
values=values)
return result
@staticmethod
def _build_granule(stream_definition_id=None, values=None):
root_rdt = RecordDictionaryTool(stream_definition_id=stream_definition_id)
for field_name, value in values.iteritems():
root_rdt[field_name] = value
return root_rdt.to_granule()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment