Skip to content

Instantly share code, notes, and snippets.

@topiaruss
Last active June 21, 2016 15:57
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save topiaruss/264015387b69f867c4bf to your computer and use it in GitHub Desktop.
Save topiaruss/264015387b69f867c4bf to your computer and use it in GitHub Desktop.
Experimenting with minimal test frameworks for spout and bolt
[pytest]
python_paths = src/bolts src/spouts
testpaths = src/tests
pytest==2.8.7
pytest-mock==0.10.1
pytest-pythonpath==0.7
import json
from io import BytesIO
from mock import call
from streamparse.storm import Tuple, Bolt
from wordcount import WordCounter
def prep_tup(id, comp, stream, task, tuple):
    """
    Build a streamparse ``Tuple`` together with its serialised wire form.

    The wire form is the JSON encoding of the five fields (under the keys
    ``id``, ``comp``, ``stream``, ``task`` and ``tuple``) followed by a line
    containing ``end``, encoded as UTF-8 bytes.

    Returns a ``(Tuple, bytes)`` pair.
    """
    payload = {
        'id': id,
        'comp': comp,
        'stream': stream,
        'task': task,
        'tuple': tuple,
    }
    # JSON document first, then the "end" terminator line.
    wire_bytes = (json.dumps(payload) + "\nend\n").encode('utf-8')
    return Tuple(id, comp, stream, task, tuple), wire_bytes
def test_wordcount_basic(mocker):
    """
    Run ``WordCounter`` over one input tuple and check what it sends.

    The first two ``send_message`` calls are expected to be an ``emit`` of
    ``['test_word', 1]`` anchored to the input tuple, then a ``log`` message.
    """
    # Replace the Bolt methods with autospec'd mocks so the calls made
    # during the run can be inspected afterwards.
    for method_name in ('ack', 'read_task_ids', 'send_message'):
        mocker.patch.object(Bolt, method_name, autospec=True)

    word_tup, wire_bytes = prep_tup(
        14, 'word_spout', 'words', 'some_bolt', ['test_word'])
    bolt = WordCounter(input_stream=BytesIO(wire_bytes),
                       output_stream=BytesIO())
    bolt.initialize({}, {})
    bolt._run()

    expected_emit = call(bolt, {u'command': u'emit',
                                u'anchors': [word_tup.id],
                                u'tuple': ['test_word', 1]})
    expected_log = call(bolt, {u'msg': 'test_word: 1',
                               u'command': u'log',
                               u'level': 2})

    sent = Bolt.send_message.call_args_list
    assert sent[0] == expected_emit
    assert sent[1] == expected_log
import json
from io import BytesIO
from mock import call
from streamparse.storm import Spout
from words import WordSpout
def prep_spout_tup(command, **fields):
    """
    Encode a spout command as a terminated JSON message.

    The message is a JSON object holding the ``command`` key (plus any
    extra ``fields``), followed by a line containing ``end``, encoded as
    UTF-8. The result can be wrapped in ``BytesIO`` and used as a
    component's input stream.

    :param command: value for the ``command`` key, e.g. ``'next'``.
    :param fields: optional extra top-level keys for the message, for
        example ``id='42'`` alongside an ``'ack'`` or ``'fail'`` command.
    :return: the encoded message as bytes (no tuple object is built).
    """
    message = dict(command=command, **fields)
    return "{}\nend\n".format(json.dumps(message)).encode('utf-8')
def test_words_spout_basic(mocker):
    """
    Run ``WordSpout`` over a single ``next`` command and check its output.

    The first ``send_message`` call is expected to be a ``sync`` command
    and the first ``emit`` call is expected to carry ``[u'dog']``.
    """
    # Replace the Spout methods with autospec'd mocks so the calls made
    # during the run can be inspected afterwards.
    for method_name in ('ack', 'emit', 'emit_many', 'send_message'):
        mocker.patch.object(Spout, method_name, autospec=True)

    next_message = prep_spout_tup('next')
    spout = WordSpout(input_stream=BytesIO(next_message),
                      output_stream=BytesIO())
    spout.initialize({}, {})
    spout._run()

    first_message = Spout.send_message.call_args_list[0]
    assert first_message == call(spout, {u'command': u'sync'})

    first_emit = Spout.emit.call_args_list[0]
    assert first_emit == call(spout, [u'dog'])
@dan-blanchard
Copy link

This is really nice. I've been wanting to put something together to help people with testing but just hadn't gotten around to it yet. We'll probably want to add this to pystorm instead of streamparse in the end.

@tdhopper
Copy link

@topiaruss Have you done more on this? I'm interested!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment