@topiaruss
Last active June 21, 2016 15:57
Experimenting with minimal test frameworks for spout and bolt
[pytest]
python_paths = src/bolts src/spouts
testpaths = src/tests
pytest==2.8.7
pytest-mock==0.10.1
pytest-pythonpath==0.7
import json
from io import BytesIO

from mock import call
from streamparse.storm import Tuple, Bolt

from wordcount import WordCounter


def prep_tup(id, comp, stream, task, tuple):
    """
    Prepare a test tuple and a terminated json equivalent
    """
    tup_dict = dict(id=id, comp=comp, stream=stream, task=task, tuple=tuple)
    # Messages on the input stream are JSON followed by an "end" line.
    tup_json = "{}\nend\n".format(json.dumps(tup_dict)).encode('utf-8')
    tup = Tuple(tup_dict['id'], tup_dict['comp'],
                tup_dict['stream'], tup_dict['task'],
                tup_dict['tuple'])
    return tup, tup_json


def test_wordcount_basic(mocker):
    # Patch the methods that would otherwise talk to Storm.
    mocker.patch.object(Bolt, 'ack', autospec=True)
    mocker.patch.object(Bolt, 'read_task_ids', autospec=True)
    mocker.patch.object(Bolt, 'send_message', autospec=True)

    tup, tupj = prep_tup(14, 'word_spout', 'words', 'some_bolt', ['test_word'])
    bolt = WordCounter(input_stream=BytesIO(tupj),
                       output_stream=BytesIO())
    bolt.initialize({}, {})
    bolt._run()

    # First message: the emit, anchored to the incoming tuple.
    assert Bolt.send_message.call_args_list[0] == \
        call(bolt, {u'command': u'emit',
                    u'anchors': [tup.id],
                    u'tuple': ['test_word', 1]})
    # Second message: the log line written by the bolt.
    assert Bolt.send_message.call_args_list[1] == \
        call(bolt, {u'msg': 'test_word: 1',
                    u'command': u'log',
                    u'level': 2})
import json
from io import BytesIO

from mock import call
from streamparse.storm import Spout

from words import WordSpout


def prep_spout_tup(command):
    """
    Prepare a terminated json command message for a spout
    """
    tup_dict = dict(command=command)
    tup_json = "{}\nend\n".format(json.dumps(tup_dict)).encode('utf-8')
    return tup_json


def test_words_spout_basic(mocker):
    # Patch the methods that would otherwise talk to Storm.
    mocker.patch.object(Spout, 'ack', autospec=True)
    mocker.patch.object(Spout, 'emit', autospec=True)
    mocker.patch.object(Spout, 'emit_many', autospec=True)
    mocker.patch.object(Spout, 'send_message', autospec=True)

    tupj = prep_spout_tup('next')
    spout = WordSpout(input_stream=BytesIO(tupj), output_stream=BytesIO())
    spout.initialize({}, {})
    spout._run()

    # The first message sent should be the sync.
    assert Spout.send_message.call_args_list[0] == \
        call(spout, {u'command': u'sync'})
    # emit is patched, so the emitted values can be checked directly.
    assert Spout.emit.call_args_list[0] == \
        call(spout, [u'dog'])
@topiaruss

pip install -r requirements-test.txt
py.test

@amontalenti

This is a really nice idea. Thanks for sharing, @topiaruss

@topiaruss

@amontalenti Most welcome. I encourage exchange of ideas about testing patterns for SP. Can you suggest a better location for that?

@topiaruss

Added example for testing a spout

ghost commented Feb 21, 2016

Nice.
Won't Tuple(**tup_dict) streamline that generator a bit, though?

@topiaruss

@mathster, not that helpful. Tuple does not want kv pairs, and order is significant, so .values() breaks.
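
A minimal sketch of the point above, using a stand-in namedtuple rather than the real streamparse class. The field names (`id component stream task values`) are an assumption for illustration and may not match the library exactly. With those names, keyword expansion fails because the dict keys `comp` and `tuple` are not fields, while explicit positional construction, as in prep_tup, does not depend on the dict's iteration order (which older Python versions did not guarantee).

```python
from collections import namedtuple

# Stand-in for streamparse's Tuple; these field names are an assumption.
Tuple = namedtuple("Tuple", "id component stream task values")

tup_dict = dict(id=14, comp="word_spout", stream="words",
                task="some_bolt", tuple=["test_word"])

# Keyword expansion fails: 'comp' and 'tuple' are not field names.
try:
    Tuple(**tup_dict)
    kwargs_error = None
except TypeError as exc:
    kwargs_error = exc

# Explicit positional construction does not rely on dict ordering.
tup = Tuple(tup_dict["id"], tup_dict["comp"], tup_dict["stream"],
            tup_dict["task"], tup_dict["tuple"])
```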

@topiaruss

For anyone still watching this: I'm creating a pytest-based test module for each component (Bolt or Spout) and emphasising tests of emit calls rather than send_message calls, since the latter take a lot of time to maintain whenever logging changes.

I'm finding it really useful for testing stream names, and also exercising different branches of process() methods, when multiple streams are configured in a Bolt. I can produce 100% test coverage really easily, and pdb right into logic that I'm developing.
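
A rough sketch of what an emit-focused test could look like. `SplitterBolt`, its stream names, and `run_words` are hypothetical stand-ins, not streamparse code, and `process()` takes a bare word here instead of a tuple to keep the example self-contained. It uses `unittest.mock` from the standard library in place of the `mock` package used in the gist.

```python
from unittest import mock


class SplitterBolt:
    """Hypothetical stand-in for a bolt with two output streams."""

    def emit(self, values, stream=None):
        raise RuntimeError("emit should be patched in tests")

    def process(self, word):
        # Route short and long words to different streams.
        stream = "short" if len(word) < 5 else "long"
        self.emit([word, len(word)], stream=stream)


def run_words(words):
    """Feed words through process() and return the recorded emit calls."""
    with mock.patch.object(SplitterBolt, "emit", autospec=True) as emit:
        bolt = SplitterBolt()
        for word in words:
            bolt.process(word)
        return bolt, emit.call_args_list


# One input per branch of process(), asserting on values and stream name.
bolt, calls = run_words(["dog", "elephant"])
assert calls[0] == mock.call(bolt, ["dog", 3], stream="short")
assert calls[1] == mock.call(bolt, ["elephant", 8], stream="long")
```

Because only emit is patched, changes to log messages would not affect these assertions.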

@amontalenti

Adding @kbourgoin and @dan-blanchard here so they can see what you're up to.

@dan-blanchard

This is really nice. I've been wanting to put something together to help people with testing but just hadn't gotten around to it yet. We'll probably want to add this to pystorm instead of streamparse in the end.

@tdhopper

@topiaruss Have you done more on this? I'm interested!