Created
July 1, 2015 13:35
-
-
Save JayH5/ab3451de12ee9fdcfba4 to your computer and use it in GitHub Desktop.
MessageSequenceHelper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime, timedelta | |
from twisted.internet.defer import inlineCallbacks, returnValue | |
from zope.interface import implements | |
from vumi.tests.helpers import IHelper | |
from vumi.message import format_vumi_date | |
class MessageSequenceHelper(object): | |
implements(IHelper) | |
def __init__(self, backend, msg_helper): | |
self._backend = backend | |
self._msg_helper = msg_helper | |
def setup(self, *args, **kwargs): | |
pass | |
def cleanup(self): | |
pass | |
@inlineCallbacks | |
def create_inbound_message_sequence(self): | |
""" | |
Create a sequence of 5 inbound messages in a new batch and add them to | |
the backend. Returns the batch id and the list of message keys. | |
""" | |
batch_id = yield self._backend.batch_start() | |
all_keys = [] | |
start = (datetime.utcnow().replace(microsecond=0) - | |
timedelta(seconds=10)) | |
for i in xrange(5): | |
timestamp = start + timedelta(seconds=i) | |
addr = "addr%s" % (i,) | |
msg = self._msg_helper.make_inbound( | |
"Message %s" % (i,), timestamp=timestamp, from_addr=addr) | |
yield self._backend.add_inbound_message(msg, batch_ids=[batch_id]) | |
all_keys.append((msg["message_id"], # Key | |
format_vumi_date(timestamp), # Timestamp | |
addr)) # Address | |
returnValue((batch_id, all_keys)) | |
@inlineCallbacks | |
def create_outbound_message_sequence(self): | |
""" | |
Create a sequence of 5 outbound messages in a new batch and add them to | |
the backend. Returns the batch id and the list of message keys. | |
""" | |
batch_id = yield self._backend.batch_start() | |
all_keys = [] | |
start = (datetime.utcnow().replace(microsecond=0) - | |
timedelta(seconds=10)) | |
for i in xrange(5): | |
timestamp = start + timedelta(seconds=i) | |
addr = "addr%s" % (i,) | |
msg = self._msg_helper.make_inbound( | |
"Message %s" % (i,), timestamp=timestamp, to_addr=addr) | |
yield self._backend.add_outbound_message(msg, batch_ids=[batch_id]) | |
all_keys.append((msg["message_id"], # Key | |
format_vumi_date(timestamp), # Timestamp | |
addr)) # Address | |
returnValue((batch_id, all_keys)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment