Skip to content

Instantly share code, notes, and snippets.

@JayH5
Created July 1, 2015 13:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JayH5/ab3451de12ee9fdcfba4 to your computer and use it in GitHub Desktop.
Save JayH5/ab3451de12ee9fdcfba4 to your computer and use it in GitHub Desktop.
MessageSequenceHelper
from datetime import datetime, timedelta
from twisted.internet.defer import inlineCallbacks, returnValue
from zope.interface import implements
from vumi.tests.helpers import IHelper
from vumi.message import format_vumi_date
class MessageSequenceHelper(object):
    """
    Test helper that stores short, predictable sequences of messages in a
    message store backend.

    :param backend:
        Object used to store messages. It must provide ``batch_start()``,
        ``add_inbound_message()`` and ``add_outbound_message()``.
    :param msg_helper:
        Object used to build messages. It must provide ``make_inbound()``
        and ``make_outbound()``.
    """
    implements(IHelper)

    def __init__(self, backend, msg_helper):
        self._backend = backend
        self._msg_helper = msg_helper

    def setup(self, *args, **kwargs):
        """Do nothing; this helper needs no setup."""
        pass

    def cleanup(self):
        """Do nothing; this helper holds nothing that needs cleaning up."""
        pass

    @inlineCallbacks
    def _create_message_sequence(self, make_msg, add_msg, addr_field):
        """
        Create a sequence of 5 messages in a new batch and store them.

        :param make_msg:
            Callable that builds a message, called as
            ``make_msg(content, timestamp=..., <addr_field>=...)``.
        :param add_msg:
            Backend callable that stores a message, called as
            ``add_msg(msg, batch_ids=[batch_id])``.
        :param str addr_field:
            Name of the address keyword passed to ``make_msg``
            (``"from_addr"`` or ``"to_addr"``).

        :returns:
            A Deferred that fires with ``(batch_id, all_keys)``, where
            ``all_keys`` is a list of
            ``(message_id, formatted_timestamp, address)`` tuples in
            creation order.
        """
        batch_id = yield self._backend.batch_start()
        all_keys = []
        # Timestamps begin 10 seconds in the past, truncated to whole
        # seconds, and advance by one second per message.
        start = (datetime.utcnow().replace(microsecond=0) -
                 timedelta(seconds=10))
        for i in range(5):
            timestamp = start + timedelta(seconds=i)
            addr = "addr%s" % (i,)
            msg = make_msg(
                "Message %s" % (i,), timestamp=timestamp,
                **{addr_field: addr})
            yield add_msg(msg, batch_ids=[batch_id])
            all_keys.append((msg["message_id"],  # Key
                             format_vumi_date(timestamp),  # Timestamp
                             addr))  # Address
        returnValue((batch_id, all_keys))

    @inlineCallbacks
    def create_inbound_message_sequence(self):
        """
        Create a sequence of 5 inbound messages in a new batch and add them to
        the backend. Returns the batch id and the list of message keys.

        Each key is a ``(message_id, formatted_timestamp, from_addr)`` tuple.
        """
        result = yield self._create_message_sequence(
            self._msg_helper.make_inbound,
            self._backend.add_inbound_message,
            "from_addr")
        returnValue(result)

    @inlineCallbacks
    def create_outbound_message_sequence(self):
        """
        Create a sequence of 5 outbound messages in a new batch and add them to
        the backend. Returns the batch id and the list of message keys.

        Each key is a ``(message_id, formatted_timestamp, to_addr)`` tuple.
        """
        # Outbound messages must come from ``make_outbound``; building them
        # with ``make_inbound`` would store inbound-shaped messages through
        # the outbound API.
        result = yield self._create_message_sequence(
            self._msg_helper.make_outbound,
            self._backend.add_outbound_message,
            "to_addr")
        returnValue(result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment