Load datasets and streamer utilities
# IPython log file -- saved as load_datasets.py (imported in the snippet at the end)
from ion.services.dm.utility.granule_utils import time_series_domain
from interface.services.dm.ipubsub_management_service import PubsubManagementServiceClient
from interface.services.dm.idataset_management_service import DatasetManagementServiceClient
from interface.services.dm.iingestion_management_service import IngestionManagementServiceClient
from interface.services.sa.idata_product_management_service import DataProductManagementServiceClient
from interface.services.dm.idata_retriever_service import DataRetrieverServiceClient
from ion.services.dm.inventory.dataset_management_service import DatasetManagementService
from interface.services.sa.idata_acquisition_management_service import DataAcquisitionManagementServiceClient
from ion.services.dm.utility.granule import RecordDictionaryTool
from pyon.container.cc import Container
from pyon.ion.stream import StandaloneStreamPublisher, StandaloneStreamSubscriber
from ion.services.dm.utility.test.parameter_helper import ParameterHelper
from interface.objects import DataProduct
from coverage_model import QuantityType, ArrayType, CategoryType, ConstantType, SparseConstantType
import numpy as np
import time
from pyon.public import PRED
import gevent
dset_i = 0
stream_n = 0
def load_datasets():
    # Create a stream/dataset pair and wire the stream into ingestion.
    global dset_i
    pubsub_management = PubsubManagementServiceClient()
    dataset_management = DatasetManagementServiceClient()
    ingestion_management = IngestionManagementServiceClient()
    pdict_id = dataset_management.read_parameter_dictionary_by_name('ctd_parsed_param_dict', id_only=True)
    stream_def_id = pubsub_management.create_stream_definition('std', parameter_dictionary_id=pdict_id)
    stream_id, route = pubsub_management.create_stream('instrument_stream_%s' % dset_i, 'xp1', stream_definition_id=stream_def_id)
    ingest_config_id = ingestion_management.list_ingestion_configurations(id_only=True)[0]
    tdom, sdom = time_series_domain()
    dataset_id = dataset_management.create_dataset('instrument_dataset_%s' % dset_i, parameter_dictionary_id=pdict_id, spatial_domain=sdom.dump(), temporal_domain=tdom.dump())
    ingestion_management.persist_data_stream(stream_id=stream_id, ingestion_configuration_id=ingest_config_id, dataset_id=dataset_id)
    dataset_management.register_dataset(dataset_id)
    dset_i += 1
    return stream_id, route, stream_def_id, dataset_id
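# Usage sketch (not in the original log; assumes a running container with the
# 'ctd_parsed_param_dict' dictionary and one ingestion configuration preloaded):
#
#     stream_id, route, stream_def_id, dataset_id = load_datasets()
#     populate_dataset(dataset_id, hours=1)   # back-fill an hour of synthetic data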
def start_stats_listener():
    # Subscribe to IngestionStatus events and dump each one's attributes.
    from pyon.event.event import EventSubscriber
    def cb(*args, **kwargs):
        print args[0].__dict__
    es = EventSubscriber(event_type='IngestionStatus', callback=cb)
    return es, es.start()
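# Usage sketch (illustrative): print every IngestionStatus event until the
# subscriber is torn down; EventSubscriber.stop() as the teardown call is an
# assumption.
#
#     es, _ = start_stats_listener()
#     ...                # exercise ingestion here
#     es.stop()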
def make_pubsub(data_product_id, callback):
    global stream_n
    pubsub_management = PubsubManagementServiceClient()
    stream_def_id = get_stream_def(data_product_id)
    stream_id, route = pubsub_management.create_stream('test%s' % stream_n, stream_definition_id=stream_def_id, exchange_point='science data')
    publisher = StandaloneStreamPublisher(stream_id, route)
    sub_id = pubsub_management.create_subscription('listener%s' % stream_n, stream_ids=[stream_id])
    pubsub_management.activate_subscription(sub_id)
    subscriber = StandaloneStreamSubscriber('listener%s' % stream_n, callback=callback)
    subscriber.start()
    stream_n += 1  # the original declared the global but never bumped it; repeated calls would reuse names
    return publisher, subscriber
def get_stream_def(data_product_id):
    resource_registry = Container.instance.resource_registry
    stream_def_ids, _ = resource_registry.find_objects(data_product_id, 'hasStreamDefinition', id_only=True)
    return stream_def_ids[0]
def populate_dataset(dataset_id, hours):
    cov = DatasetManagementService._get_simplex_coverage(dataset_id)
    dt = int(hours * 3600)  # one sample per second
    cov.insert_timesteps(dt)
    now = time.time()
    cov.set_parameter_values('time', np.arange(now - dt, now) + 2208988800)  # Unix -> NTP seconds
    cov.set_parameter_values('temp', [280000] * dt)
    cov.set_parameter_values('conductivity', [100000] * dt)
    cov.set_parameter_values('pressure', [2789] * dt)
    cov.set_parameter_values('lat', [45] * dt)
    cov.set_parameter_values('lon', [-17] * dt)
def load_qc_tables():
    resource_registry = Container.instance.resource_registry
    data_acquisition = DataAcquisitionManagementServiceClient()
    grt_path = 'res/preload/r2_ioc/attachments/Data_QC_Lookup_Table_Global_Range_Test_2013-2-21.csv'
    grt_parser_ids, _ = resource_registry.find_resources(restype='Parser', name='Global Range Test', id_only=True)
    grt_parser_id = grt_parser_ids[0]
    data_acquisition.parse_qc_reference(grt_parser_id, open(grt_path).read())
    spk_path = 'res/preload/r2_ioc/attachments/Data_QC_Lookup_Table_spike_test_updated.csv'
    spk_parser_ids, _ = resource_registry.find_resources(restype='Parser', name='Spike Test', id_only=True)
    spk_parser_id = spk_parser_ids[0]
    data_acquisition.parse_qc_reference(spk_parser_id, open(spk_path).read())
def get_rdt(pdict_name='', config=None):
    pubsub_management = PubsubManagementServiceClient()
    dataset_management = DatasetManagementServiceClient()
    pdict_id = dataset_management.read_parameter_dictionary_by_name(pdict_name or 'ctd_parsed_param_dict', id_only=True)
    # The original read "'%s streamdef' % pdict_name or 'ctd'", which never falls
    # back to 'ctd' because % binds tighter than or; parenthesized here.
    stream_def_id = pubsub_management.create_stream_definition('%s streamdef' % (pdict_name or 'ctd'), parameter_dictionary_id=pdict_id, stream_configuration=config or {})
    rdt = RecordDictionaryTool(stream_definition_id=stream_def_id)
    return rdt
def rdt_for_data_product(data_product_id=''):
    resource_registry = Container.instance.resource_registry
    stream_def_ids, _ = resource_registry.find_objects(data_product_id, 'hasStreamDefinition', id_only=True)
    rdt = RecordDictionaryTool(stream_definition_id=stream_def_ids[0])
    return rdt
def publish_rdt_to_data_product(data_product_id, rdt, connection_id='', connection_index=''):
    resource_registry = Container.instance.resource_registry
    pubsub_management = PubsubManagementServiceClient()
    stream_ids, _ = resource_registry.find_objects(data_product_id, 'hasStream', id_only=True)
    stream_id = stream_ids[0]
    route = pubsub_management.read_stream_route(stream_id)
    publisher = StandaloneStreamPublisher(stream_id, route)
    publisher.publish(rdt.to_granule(connection_id=connection_id, connection_index=connection_index))
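# Usage sketch (illustrative): build an RDT that matches a product's stream
# definition, fill one record, and push it onto the product's stream. The
# data_product_id comes from one of the create/load helpers below.
#
#     rdt = rdt_for_data_product(data_product_id)
#     rdt['time'] = np.array([time.time() + 2208988800])  # Unix -> NTP seconds
#     rdt['temp'] = np.array([10.5])
#     publish_rdt_to_data_product(data_product_id, rdt)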
def populate_data_product(data_product_id, hours=1):
    resource_registry = Container.instance.resource_registry
    dataset_ids, _ = resource_registry.find_objects(data_product_id, 'hasDataset', id_only=True)
    dataset_id = dataset_ids[0]
    cov = DatasetManagementService._get_simplex_coverage(dataset_id)
    rdt = rdt_for_data_product(data_product_id)
    rdt['time'] = [0]
    rdt.fetch_lookup_values()
    for field in rdt._lookup_values():
        print 'looking at', field
        if isinstance(rdt.context(field).param_type, SparseConstantType):
            print '%s is sparse constant type' % field
            if rdt[field]:
                print 'Setting %s in cov' % field
                cov.set_parameter_values(field, value=rdt[field])
    dt = int(hours * 3600)
    cov.insert_timesteps(dt)
    now = time.time()
    tparam = cov.temporal_parameter_name
    for param in cov.list_parameters():
        if param == tparam:
            cov.set_parameter_values('time', np.arange(now - dt, now) + 2208988800)
        else:
            fill_parameter(cov, param, dt)
def fill_tmpsf(cov):
    cov.insert_timesteps(3600)
    now = time.time()
    ntp_now = now + 2208988800  # the original subtracted the offset; NTP time = Unix time + 2208988800
    cov.set_parameter_values('time', np.arange(ntp_now - 3600, ntp_now))
    temp_vals = np.array([[0, 1, 2, 3, 4]] * 3600)
    cov.set_parameter_values('temperature', temp_vals)
def fill_parameter(cov, parameter, dt):
    context = cov.get_parameter_context(parameter)
    t = np.arange(dt)
    if isinstance(context.param_type, QuantityType):
        if parameter == 'temp':
            cov.set_parameter_values(parameter, float_range(-2, 50, t))
        elif parameter == 'conductivity':
            cov.set_parameter_values(parameter, float_range(0, 200, t))
        elif parameter == 'pressure':
            cov.set_parameter_values(parameter, float_range(0, 5000, t))
        elif parameter == 'lat':
            cov.set_parameter_values(parameter, [45] * dt)
        elif parameter == 'lon':
            cov.set_parameter_values(parameter, [-71] * dt)
        else:
            cov.set_parameter_values(parameter, np.sin(np.pi * 2 * t / 60))
    elif isinstance(context.param_type, ArrayType):
        values = np.array([range(10)] * dt)
        cov.set_parameter_values(parameter, values)
    elif isinstance(context.param_type, CategoryType):
        # the original multiplied by the array t; list replication needs the int dt
        cov.set_parameter_values(parameter, [context.categories.keys()[0]] * dt)
    elif isinstance(context.param_type, ConstantType):
        cov.set_parameter_values(parameter, np.dtype(context.param_type.value_encoding).type(1))
def float_range(minvar, maxvar, t):
    # 2.0 keeps the amplitude a float; under Python 2 integer division,
    # integer bounds one unit apart would floor the amplitude to 0
    a = (maxvar - minvar) / 2.0
    return np.sin(np.pi * 2 * t / 60) * a + (minvar + a)
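# Worked example: float_range(-2, 50, np.arange(60)) has amplitude
# a = (50 - (-2)) / 2.0 = 26 about midpoint 24, so it yields 24 at t=0,
# 50 at t=15, 24 at t=30, and -2 at t=45 -- one full cycle per 60 samples.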
def make_data_product(pdict_name, dp_name, available_fields=[]):
    dataset_management = DatasetManagementServiceClient()
    pubsub_management = PubsubManagementServiceClient()
    data_product_management = DataProductManagementServiceClient()
    pdict_id = dataset_management.read_parameter_dictionary_by_name(pdict_name, id_only=True)
    stream_def_id = pubsub_management.create_stream_definition('%s stream_def' % dp_name, parameter_dictionary_id=pdict_id, available_fields=available_fields or None)
    tdom, sdom = time_series_domain()
    tdom = tdom.dump()
    sdom = sdom.dump()
    dp_obj = DataProduct(name=dp_name)
    dp_obj.temporal_domain = tdom
    dp_obj.spatial_domain = sdom
    data_product_id = data_product_management.create_data_product(dp_obj, stream_definition_id=stream_def_id)
    data_product_management.activate_data_product_persistence(data_product_id)
    return data_product_id
def create_extended_data_product():
    dataset_management = DatasetManagementServiceClient()
    ph = ParameterHelper(dataset_management, lambda *args, **kwargs: None)
    ph.create_extended_parsed()
    dp_id = make_data_product('extended_parsed', 'extended data product')
    return dp_id
def create_data_product(pdict_name, reference_designator=''):
    global dset_i
    dataset_management = DatasetManagementServiceClient()
    pubsub_management = PubsubManagementServiceClient()
    data_product_management = DataProductManagementServiceClient()
    resource_registry = Container.instance.resource_registry
    tdom, sdom = time_series_domain()
    tdom = tdom.dump()
    sdom = sdom.dump()
    dp_obj = DataProduct(
        name='%s data product' % pdict_name,
        description='ctd stream test',
        processing_level_code='Parsed_Canonical',
        temporal_domain=tdom,
        spatial_domain=sdom)
    pdict_id = dataset_management.read_parameter_dictionary_by_name(pdict_name, id_only=True)
    stream_def_id = pubsub_management.create_stream_definition(name='%s def' % pdict_name, parameter_dictionary_id=pdict_id, stream_configuration={'reference_designator': reference_designator})
    data_product_id = data_product_management.create_data_product(data_product=dp_obj, stream_definition_id=stream_def_id)
    data_product_management.activate_data_product_persistence(data_product_id)
    stream_ids, assocs = resource_registry.find_objects(subject=data_product_id, predicate='hasStream', id_only=True)
    stream_id = stream_ids[0]
    route = pubsub_management.read_stream_route(stream_id)
    dataset_ids, assocs = resource_registry.find_objects(subject=data_product_id, predicate='hasDataset', id_only=True)
    dataset_id = dataset_ids[0]
    dset_i += 1
    return data_product_id, stream_id, route, stream_def_id, dataset_id
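# Usage sketch (illustrative): create a product from a preloaded parameter
# dictionary, back-fill it, and keep the ids for publishing/retrieval.
#
#     dp_id, stream_id, route, stream_def_id, dataset_id = \
#         create_data_product('ctd_parsed_param_dict')
#     populate_data_product(dp_id, hours=1)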
def load_data_product_prime():
    global dset_i
    dataset_management = DatasetManagementServiceClient()
    pubsub_management = PubsubManagementServiceClient()
    data_product_management = DataProductManagementServiceClient()
    resource_registry = Container.instance.resource_registry
    tdom, sdom = time_series_domain()
    tdom = tdom.dump()
    sdom = sdom.dump()
    dp_obj = DataProduct(
        name='instrument_data_product_%i' % dset_i,
        description='ctd stream test',
        processing_level_code='Parsed_Canonical',
        temporal_domain=tdom,
        spatial_domain=sdom)
    pdict_id = dataset_management.read_parameter_dictionary_by_name('ctd_LC_TEST', id_only=True)
    stream_def_id = pubsub_management.create_stream_definition(name='parsed', parameter_dictionary_id=pdict_id)
    data_product_id = data_product_management.create_data_product(data_product=dp_obj, stream_definition_id=stream_def_id)
    data_product_management.activate_data_product_persistence(data_product_id)
    stream_ids, assocs = resource_registry.find_objects(subject=data_product_id, predicate='hasStream', id_only=True)
    stream_id = stream_ids[0]
    route = pubsub_management.read_stream_route(stream_id)
    dataset_ids, assocs = resource_registry.find_objects(subject=data_product_id, predicate='hasDataset', id_only=True)
    dataset_id = dataset_ids[0]
    dset_i += 1
    return data_product_id, stream_id, route, stream_def_id, dataset_id
def load_data_product():
    global dset_i
    dataset_management = DatasetManagementServiceClient()
    pubsub_management = PubsubManagementServiceClient()
    data_product_management = DataProductManagementServiceClient()
    resource_registry = Container.instance.resource_registry
    tdom, sdom = time_series_domain()
    tdom = tdom.dump()
    sdom = sdom.dump()
    dp_obj = DataProduct(
        name='instrument_data_product_%i' % dset_i,
        description='ctd stream test',
        processing_level_code='Parsed_Canonical',
        temporal_domain=tdom,
        spatial_domain=sdom)
    pdict_id = dataset_management.read_parameter_dictionary_by_name('ctd_parsed_param_dict', id_only=True)
    stream_def_id = pubsub_management.create_stream_definition(name='parsed', parameter_dictionary_id=pdict_id)
    data_product_id = data_product_management.create_data_product(data_product=dp_obj, stream_definition_id=stream_def_id)
    data_product_management.activate_data_product_persistence(data_product_id)
    stream_ids, assocs = resource_registry.find_objects(subject=data_product_id, predicate='hasStream', id_only=True)
    stream_id = stream_ids[0]
    route = pubsub_management.read_stream_route(stream_id)
    dataset_ids, assocs = resource_registry.find_objects(subject=data_product_id, predicate='hasDataset', id_only=True)
    dataset_id = dataset_ids[0]
    dset_i += 1
    return data_product_id, stream_id, route, stream_def_id, dataset_id
class Streamer(object):
    def __init__(self, data_product_id, interval=1):
        self.resource_registry = Container.instance.resource_registry
        self.pubsub_management = PubsubManagementServiceClient()
        self.data_product_id = data_product_id
        self.i = 0
        self.interval = interval
        self.finished = False
        self.g = gevent.spawn(self.run)

    def run(self):
        # Publish one synthetic CTD record per interval until close() is called.
        while not self.finished:
            gevent.sleep(self.interval)
            rdt = rdt_for_data_product(self.data_product_id)
            now = time.time()
            rdt['time'] = np.array([now + 2208988800])  # Unix -> NTP seconds
            rdt['temp'] = self.float_range(10, 14, np.array([now]))
            rdt['pressure'] = self.float_range(11, 12, np.array([now]))
            rdt['lat'] = [41.205]
            rdt['lon'] = [-71.74]
            rdt['conductivity'] = self.float_range(3.3, 3.5, np.array([now]))
            rdt['driver_timestamp'] = np.array([now + 2208988800])
            rdt['preferred_timestamp'] = ['driver_timestamp']
            publish_rdt_to_data_product(self.data_product_id, rdt)
            self.i += 1

    def close(self):
        self.finished = True
        self.g.join(5)
        self.g.kill()

    def start(self):
        self.finished = False
        self.g = gevent.spawn(self.run)

    @classmethod
    def float_range(cls, minvar, maxvar, t):
        '''
        Produces a signal with values between minvar and maxvar
        at a frequency of 1/60 Hz centered at the midpoint
        between minvar and maxvar.

        This method provides a deterministic function that
        varies over time and is sinusoidal when graphed.
        '''
        # 2.0: integer bounds like (11, 12) would otherwise floor the
        # amplitude to 0 under Python 2 integer division
        a = (maxvar - minvar) / 2.0
        return np.sin(np.pi * 2 * t / 60) * a + (minvar + a)
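# Usage sketch (illustrative): publish one synthetic CTD record per second
# into an existing data product, then stop the greenlet cleanly.
#
#     s = Streamer(data_product_id, interval=1)
#     gevent.sleep(10)    # let ~10 granules flow
#     s.close()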
# --- streamer.py (second gist file; imported below as `streamer`) ---
from pyon.ion.stream import StandaloneStreamPublisher
from ion.services.dm.utility.granule import RecordDictionaryTool
import gevent
import numpy as np
class Streamer(object):
    def __init__(self, stream_id, route, stream_def_id, interval=1):
        self.publisher = StandaloneStreamPublisher(stream_id, route)
        self.i = 0
        self.interval = interval
        self.stream_def_id = stream_def_id
        self.finished = False
        self.g = gevent.spawn(self.run)

    def run(self):
        while not self.finished:
            gevent.sleep(self.interval)
            rdt = RecordDictionaryTool(stream_definition_id=self.stream_def_id)
            rdt['time'] = np.array([self.i])
            rdt['temp'] = np.array([self.i])
            self.i += 1
            self.publisher.publish(rdt.to_granule())

    def close(self):
        self.finished = True
        self.g.join(5)
        self.g.kill()
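# Usage sketch (illustrative): this variant publishes straight to a known
# stream rather than resolving one from a data product.
#
#     s = Streamer(stream_id, route, stream_def_id, interval=1)
#     gevent.sleep(5)
#     s.close()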
def quick_write(stream_id, route, stream_def_id, values=1000):
    # Publish a single granule containing `values` monotonically increasing samples.
    publisher = StandaloneStreamPublisher(stream_id, route)
    rdt = RecordDictionaryTool(stream_definition_id=stream_def_id)
    rdt['time'] = np.arange(values)
    rdt['temp'] = np.arange(values)
    publisher.publish(rdt.to_granule())
# --- interactive session: exercise the two modules above ---
from load_datasets import load_data_product
from streamer import quick_write
dp_id, stream_id, route, stream_def_id, dataset_id = load_data_product()
quick_write(stream_id, route, stream_def_id)
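# Follow-on sketch (not in the original): once ingestion has persisted the
# granule, the dataset can be read back. retrieve() returning a granule that
# RecordDictionaryTool.load_from_granule() can parse is an assumption about
# this codebase's API.
#
#     from interface.services.dm.idata_retriever_service import DataRetrieverServiceClient
#     from ion.services.dm.utility.granule import RecordDictionaryTool
#     granule = DataRetrieverServiceClient().retrieve(dataset_id)
#     rdt = RecordDictionaryTool.load_from_granule(granule)
#     print rdt['time'], rdt['temp']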