Skip to content

Instantly share code, notes, and snippets.

@lukecampbell
Created November 15, 2012 21:30
Show Gist options
  • Save lukecampbell/4081414 to your computer and use it in GitHub Desktop.
Save lukecampbell/4081414 to your computer and use it in GitHub Desktop.
dataset loader
# IPython log file
from pyon.ion.stream import StandaloneStreamPublisher
from ion.services.dm.utility.granule_utils import time_series_domain
from ion.services.dm.utility.granule import RecordDictionaryTool
import numpy as np
import time
#params = ['time','temp']
#
#pcr_ids = [pn.dataset_management.read_parameter_context_by_name(i,id_only=True) for i in params]
#pdict_id = pn.dataset_management.create_parameter_dictionary('test',parameter_context_ids=pcr_ids, temporal_context='time')
# Load a small 20-sample test granule into a freshly created dataset.
# NOTE(review): `pn` is never defined in this log; presumably it is provided
# by the interactive shell session the log was captured from -- confirm.

# Resolve the id of the existing 'ctd_parsed_param_dict' parameter dictionary.
pdict_id = pn.dataset_management.read_parameter_dictionary_by_name(
    'ctd_parsed_param_dict',
    id_only=True
)

# Stream definition 'std' tied to that parameter dictionary.
stream_def_id = pn.pubsub_management.create_stream_definition(
    'std',
    parameter_dictionary_id=pdict_id
)

# Stream 'instrument_stream'; the second positional argument is 'xp1'
# (presumably the exchange point name -- confirm against the service API).
stream_id, route = pn.pubsub_management.create_stream(
    'instrument_stream',
    'xp1',
    stream_definition_id=stream_def_id
)

# Use whichever ingestion configuration is listed first.
ingest_config_id = pn.ingestion_management.list_ingestion_configurations(
    id_only=True
)[0]

# The helper's result is unpacked as (tdom, sdom); both are dumped and handed
# to create_dataset as the temporal and spatial domains respectively.
tdom, sdom = time_series_domain()
dataset_id = pn.dataset_management.create_dataset(
    'instrument_dataset',
    parameter_dictionary_id=pdict_id,
    spatial_domain=sdom.dump(),
    temporal_domain=tdom.dump()
)

# Build the publisher first, then ask ingestion to persist the stream into
# the dataset, so persistence is requested before anything is published.
publisher = StandaloneStreamPublisher(stream_id, route)
pn.ingestion_management.persist_data_stream(
    stream_id=stream_id,
    ingestion_configuration_id=ingest_config_id,
    dataset_id=dataset_id
)

# Fill a record dictionary with 20 values per parameter.
rdt = RecordDictionaryTool(stream_definition_id=stream_def_id)
rdt['time'] = np.arange(20)
rdt['temp'] = np.arange(20)
# Standard-normal samples scaled by 0.5 and offset by 10.
rdt['pressure'] = np.random.randn(20) * 0.5 + 10
rdt['conductivity'] = [0] * 20

# Publish the records as a single granule, then register the dataset.
publisher.publish(rdt.to_granule())
pn.dataset_management.register_dataset(dataset_id)
#def sinewave(t):
# import numpy as np
# return 3 * np.sin(2 * np.pi * t/60.) + 20.21
#vfunc = np.vectorize(sinewave)
#for i in xrange(7):
# stream_id, route = pn.pubsub_management.create_stream('instrument_stream_%s' % i, 'xp1', stream_definition_id=stream_def_id)
# ingest_config_id = pn.ingestion_management.list_ingestion_configurations(id_only=True)[0]
# tdom, sdom = time_series_domain()
# dataset_id = pn.dataset_management.create_dataset('instrument_dataset_%s' % i, parameter_dictionary_id=pdict_id, spatial_domain=sdom.dump(), temporal_domain=tdom.dump())
# publisher = StandaloneStreamPublisher(stream_id, route)
# pn.ingestion_management.persist_data_stream(stream_id=stream_id, ingestion_configuration_id=ingest_config_id, dataset_id=dataset_id)
# rdt = RecordDictionaryTool(stream_definition_id=stream_def_id)
# now = time.time() + 2208988800
# then = now + 3600
# rdt['time'] = np.arange(now,then)
# rdt['temp'] = vfunc(rdt['time'])
# rdt['driver_timestamp'] = np.arange(now,then)
# rdt['preferred_timestamp'] = ['driver_timestamp'] * 3600
# rdt['pressure'] = np.random.randn(3600) * 0.5 + 10
# rdt['conductivity'] = [0] * 3600
#
# publisher.publish(rdt.to_granule())
# pn.dataset_management.register_dataset(dataset_id)
#
#
#
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment