Skip to content

Instantly share code, notes, and snippets.

'''
@author MManning
@file ion/processes/data/transforms/ctd/ctd_L1_conductivity.py
@description Transforms CTD parsed data into L1 product for conductivity
'''
from pyon.ion.transforma import TransformDataProcess
from pyon.core.exception import BadRequest
from interface.services.dm.ipubsub_management_service import PubsubManagementServiceProcessClient
@swarbhanu
swarbhanu / examply.py
Created July 19, 2012 19:36
An example script that launches the publisher that stores data in redis and streams it and the redis coordination transform that listens to those streams and removes the data from the same redis database.
class TransformEventListener(TransformEventProcess):
def on_start(self, event_type = None, event_origin = None, event_origin_type = None, event_subtype = None, algorithm = None):
self.listener = EventSubscriber( origin=event_origin,
origin_type = event_origin_type,
event_type=event_type,
sub_type=event_subtype,
callback=self.process_event )
self.listener.start()
@swarbhanu
swarbhanu / user_notification_sms_script.py
Created June 5, 2012 18:40
Script to get the user notification service send an sms to a phone as notification
#---------------------------------------------------------------------------------
# Create sms notification
#---------------------------------------------------------------------------------
from interface.objects import UserInfo
from pyon.event.event import EventPublisher
from ion.services.dm.presentation.user_notification_service import *
rr = pn.resource_registry
user = UserInfo(name = 'user')
@swarbhanu
swarbhanu / sending_actual_email_script.py
Created June 1, 2012 16:15
Script to send an actual email through the user notification service
from interface.objects import DeliveryMode, UserInfo, DetectionFilterConfig
from pyon.event.event import EventPublisher
#---------------------------------------------------------------------------------
# Create email notification
#---------------------------------------------------------------------------------
rr = pn.resource_registry
@swarbhanu
swarbhanu / user_notification_script.py
Created May 31, 2012 20:23
A sample script that shows how to set up email, sms and DetectionEvent notifications through User Notification Service
from interface.objects import DeliveryMode, UserInfo, DetectionFilterConfig
from pyon.util.int_test import IonIntegrationTestCase
from pyon.util.unit_test import PyonTestCase
from pyon.public import IonObject, RT, PRED, Container
from pyon.core.exception import NotFound, BadRequest
from nose.plugins.attrib import attr
from pyon.util.log import log
from pyon.event.event import EventPublisher
import gevent
from mock import Mock, mocksignature
@swarbhanu
swarbhanu / test_ingestion.py
Created February 15, 2012 17:08
Added skeleton test methods for stream policy in ingestion
#!/usr/bin/env python
'''
@file ion/services/dm/ingestion/test/test_ingestion.py
@author Swarbhanu Chatterjee
@test ion.services.dm.ingestion.ingestion_management_service test suite to cover all ingestion mgmt service code
'''
import gevent
from mock import Mock
@swarbhanu
swarbhanu / test_ingestion_worker_couch_integration.py
Created February 2, 2012 15:41
Integration test for ingestion workers. Tests that they write blog streams to couch, by reading back the data objects in couch after they have been written.
#!/usr/bin/env python
'''
@file ion/services/dm/ingestion/test/test_ingestion.py
@author Swarbhanu Chatterjee
@test ion.services.dm.ingestion.ingestion_management_service test suite to cover all ingestion mgmt service code
'''
import gevent
from mock import Mock, sentinel, patch
@swarbhanu
swarbhanu / ingestion_management_service.py
Created February 2, 2012 15:37
The ingestion_management_service.py. It has the class, IngestionWorker, near the bottom of the file.
#!/usr/bin/env python
__license__ = 'Apache 2.0'
'''
@author Maurice Manning
@author Swarbhanu Chatterjee
@file ion/services/dm/ingestion/ingestion_management_service.py
@description Implementation for IngestionManagementService
'''
from interface.services.dm.iingestion_management_service import BaseIngestionManagementService
from pyon.core.exception import NotFound
@swarbhanu
swarbhanu / ingestion_couch_script.py
Created February 2, 2012 15:33
The script file to launch the ingestion workers to launch blog streams and write them to couch.
from interface.services.dm.iingestion_management_service import IngestionManagementServiceClient
from interface.services.dm.ipubsub_management_service import PubsubManagementServiceClient
from interface.services.coi.iresource_registry_service import ResourceRegistryServiceClient
from pyon.public import IonObject, RT, log, PRED
from pyon.ion.endpoint import ProcessPublisher
from pyon.public import log, StreamProcess
#########################################################################################
@swarbhanu
swarbhanu / test_integration_worker_couch_integration.py
Created February 1, 2012 23:22
Integration test for ingestion workers
#!/usr/bin/env python
'''
@file ion/services/dm/ingestion/test/test_ingestion.py
@author Swarbhanu Chatterjee
@test ion.services.dm.ingestion.ingestion_management_service test suite to cover all ingestion mgmt service code
'''
import gevent
from mock import Mock, sentinel, patch