Skip to content

Instantly share code, notes, and snippets.

@swarbhanu
Created May 31, 2012 20:23
Show Gist options
  • Save swarbhanu/2845961 to your computer and use it in GitHub Desktop.
Save swarbhanu/2845961 to your computer and use it in GitHub Desktop.
A sample script that shows how to set up email, SMS, and DetectionEvent notifications through the User Notification Service.
from interface.objects import DeliveryMode, UserInfo, DetectionFilterConfig
from pyon.util.int_test import IonIntegrationTestCase
from pyon.util.unit_test import PyonTestCase
from pyon.public import IonObject, RT, PRED, Container
from pyon.core.exception import NotFound, BadRequest
from nose.plugins.attrib import attr
from pyon.util.log import log
from pyon.event.event import EventPublisher
import gevent
from mock import Mock, mocksignature
import gevent
from gevent.timeout import Timeout
#---------------------------------------------------------------------------------
# Create email notification
#---------------------------------------------------------------------------------
# Demonstrates an email notification end to end: register a user, create an
# email notification for ResourceLifecycleEvent, publish a matching event,
# then read back and parse the email that was generated.
#
# NOTE(review): `pn` is not defined in this script; presumably it is a handle
# to the container's service clients provided by the environment this snippet
# is run in -- confirm before running it standalone.
rr = pn.resource_registry
user = UserInfo(name='user')
user_id, _ = rr.create(user)

uns = pn.user_notification
notification_id = uns.create_email(
    event_type='ResourceLifecycleEvent',
    event_subtype=None,
    origin='Some_Resource_Agent_ID1',
    origin_type=None,
    user_id=user_id,
    email='email@email.com',
    mode=DeliveryMode.DIGEST,
    message_header='message_header',
    parser='parser',
    period=1)

#-------------------------------------------------------
# publish an event matching the notification to generate the email
#-------------------------------------------------------
rle_publisher = EventPublisher("ResourceLifecycleEvent")
rle_publisher.publish_event(origin='Some_Resource_Agent_ID1',
                            description="RLE test event for email")

#-------------------------------------------------------
# grab the email message generated
#-------------------------------------------------------
sent_queue = uns.event_processors[notification_id].smtp_client.sentmail
msg_tuple = sent_queue.get(timeout=4)
# Single-argument print(...) behaves the same as a Python 2 print statement
# and is also valid Python 3.
print(msg_tuple)
# Exactly one email is expected, so the queue must now be drained.
assert sent_queue.empty()
message = msg_tuple[2]
list_lines = message.split("\n")

#-------------------------------------------------------
# parse the message body
#-------------------------------------------------------
message_dict = {}
for line in list_lines:
    key_item = line.split(": ")
    if key_item[0] == 'Subject':
        # The subject text may itself contain one ": ", so the second and
        # third pieces are concatenated. Slicing (rather than indexing [2])
        # keeps that result while not raising IndexError when the subject
        # holds no further ": ".
        message_dict['Subject'] = "".join(key_item[1:3])
    elif len(key_item) > 1:
        message_dict[key_item[0]] = key_item[1]
    # Lines without a ": " separator (e.g. the blank lines between parts of
    # the message) carry no key/value pair and are skipped.

#-------------------------------------------------------
# print the parsed message body of the email generated
#-------------------------------------------------------
# Two spaces after the colon reproduce the output of the original
# `print "message_dict: ", message_dict` statement.
print("message_dict:  %s" % (message_dict,))
#---------------------------------------------------------------------------------
#################################################################################
#---------------------------------------------------------------------------------
#---------------------------------------------------------------------------------
# Create sms notification
#---------------------------------------------------------------------------------
# Demonstrates an sms notification: create the notification for
# ResourceLifecycleEvent, publish a matching event, then read back and parse
# the message that was generated. Relies on `uns` and `user_id` set up earlier
# in this script.
notification_id = uns.create_sms(
    event_type='ResourceLifecycleEvent',
    event_subtype=None,
    origin='Some_Resource_Agent_ID2',
    origin_type=None,
    user_id=user_id,
    phone='401-XXX-XXXX',
    provider='T-Mobile',
    message_header='message_header',
    parser='parser',
)

#-------------------------------------------------------
# publish an event matching the notification to generate the sms
#-------------------------------------------------------
rle_publisher = EventPublisher("ResourceLifecycleEvent")
rle_publisher.publish_event(origin='Some_Resource_Agent_ID2',
                            description="RLE test event for sms")

#-------------------------------------------------------
# grab the message generated (read from the processor's smtp client queue)
#-------------------------------------------------------
sent_queue = uns.event_processors[notification_id].smtp_client.sentmail
msg_tuple = sent_queue.get(timeout=4)
# Exactly one message is expected, so the queue must now be drained.
assert sent_queue.empty()
message = msg_tuple[2]
list_lines = message.split("\n")

#-------------------------------------------------------
# parse the message body
#-------------------------------------------------------
message_dict = {}
for line in list_lines:
    key_item = line.split(": ")
    if key_item[0] == 'Subject':
        # The subject text may itself contain one ": ", so the second and
        # third pieces are concatenated. Slicing (rather than indexing [2])
        # keeps that result while not raising IndexError when the subject
        # holds no further ": ".
        message_dict['Subject'] = "".join(key_item[1:3])
    elif len(key_item) > 1:
        message_dict[key_item[0]] = key_item[1]
    # Lines without a ": " separator (e.g. the blank lines between parts of
    # the message) carry no key/value pair and are skipped.

#-------------------------------------------------------
# print the parsed message body of the message generated
#-------------------------------------------------------
# Single-argument print(...) behaves the same on Python 2 and Python 3; the
# two spaces after the colon reproduce the output of the original
# `print "message_dict: ", message_dict` statement.
print("message_dict:  %s" % (message_dict,))
#---------------------------------------------------------------------------------
#################################################################################
#---------------------------------------------------------------------------------
#---------------------------------------------------------------------------------
# Create DetectionEvent Notification
#---------------------------------------------------------------------------------
# Create detection notification
# Demonstrates a detection filter: events of type ExampleDetectableEvent from
# 'Some_Resource_Agent_ID3' whose 'voltage' field is > 5 should produce a
# DetectionEvent, which a second (email) notification turns into an email.
# Relies on `uns` and `user_id` set up earlier in this script.
dfilt = DetectionFilterConfig()
dfilt.processing['condition'] = 5
dfilt.processing['comparator'] = '>'
dfilt.processing['filter_field'] = 'voltage'
dfilt.delivery['message'] = 'I got my detection event!'

notification_id = uns.create_detection_filter(
    event_type='ExampleDetectableEvent',
    event_subtype=None,
    origin='Some_Resource_Agent_ID3',
    origin_type=None,
    user_id=user_id,
    filter_config=dfilt,
)

#---------------------------------------------------------------------------------
# Create event subscription for resulting detection event
#---------------------------------------------------------------------------------
# Create an email notification so that when the DetectionEventProcessor
# detects an event and fires its own output event, this will be caught by an
# EmailEventProcessor and an email will be sent to the user
notification_id_2 = uns.create_email(
    event_type='DetectionEvent',
    event_subtype=None,
    origin='DetectionEventProcessor',
    origin_type=None,
    user_id=user_id,
    email='email@email.com',
    mode=DeliveryMode.UNFILTERED,
    message_header='Detection event',
    parser='parser',
    period=1)

# Queue of emails sent on behalf of the DetectionEvent email notification.
sent_queue = uns.event_processors[notification_id_2].smtp_client.sentmail

# One publisher is enough for both example events below.
rle_publisher = EventPublisher("ExampleDetectableEvent")

# Send event that is not detected.
# Since the voltage field in this event is less than 5, it will not be
# detected. The origin is the one the detection filter was created for
# ('Some_Resource_Agent_ID3'); the script previously published from
# 'Some_Resource_Agent_ID1', an origin the filter was not registered for, so
# the voltage comparison described here was not what kept the queue empty.
rle_publisher.publish_event(origin='Some_Resource_Agent_ID3',
                            description="RLE test event",
                            voltage=3)

# This event does not generate a DetectionEvent; to check this, verify that
# no email has been queued.
# NOTE(review): this check runs immediately after publishing; if event
# delivery is asynchronous it may pass before the event is processed -- confirm.
assert sent_queue.empty()

# Send event that is detected.
# Since the voltage field in this event is greater than 5, it WILL be detected.
rle_publisher.publish_event(origin='Some_Resource_Agent_ID3',
                            description="RLE test event",
                            voltage=10)

#-------------------------------------------------------
# make assertions
#-------------------------------------------------------
msg_tuple = sent_queue.get(timeout=4)
# The first event never triggered an email because the voltage was less than 5,
# so after taking the one email for the second event the queue is empty.
assert sent_queue.empty()

# Single-argument print(...) behaves the same on Python 2 and Python 3; the
# two spaces before %s reproduce the spacing of the original print statements.
print("The email address to which email is going: msg_tuple[1] =  %s"
      % (msg_tuple[1],))
# Label corrected: the value printed here is msg_tuple[0], not msg_tuple[1].
print("The email address from where the email is going: msg_tuple[0] =  %s"
      % (msg_tuple[0],))

# parse the message body
message = msg_tuple[2]
list_lines = message.split("\n")
message_dict = {}
for line in list_lines:
    key_item = line.split(": ")
    if key_item[0] == 'Subject':
        # The subject text may itself contain one ": ", so the second and
        # third pieces are concatenated. Slicing (rather than indexing [2])
        # keeps that result while not raising IndexError when the subject
        # holds no further ": ".
        message_dict['Subject'] = "".join(key_item[1:3])
    elif len(key_item) > 1:
        message_dict[key_item[0]] = key_item[1]
    # Lines without a ": " separator (e.g. the blank lines between parts of
    # the message) carry no key/value pair and are skipped.

print("The parsed message body of email generated by DetectionEventProcessor:  %s"
      % (message_dict,))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment