Skip to content

Instantly share code, notes, and snippets.

@topiaruss
Last active June 21, 2016 15:57
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save topiaruss/264015387b69f867c4bf to your computer and use it in GitHub Desktop.
Save topiaruss/264015387b69f867c4bf to your computer and use it in GitHub Desktop.
Experimenting with minimal test frameworks for spout and bolt
[pytest]
python_paths = src/bolts src/spouts
testpaths = src/tests
pytest==2.8.7
pytest-mock==0.10.1
pytest-pythonpath==0.7
import json
from io import BytesIO
from mock import call
from streamparse.storm import Tuple, Bolt
from wordcount import WordCounter
def prep_tup(id, comp, stream, task, tuple):
"""
Prepare a test tuple and a terminated json equivalent
"""
tup_dict = dict(id=id, comp=comp, stream=stream, task=task, tuple=tuple)
tup_json = "{}\nend\n".format(json.dumps(tup_dict)).encode('utf-8')
tup = Tuple(tup_dict['id'], tup_dict['comp'],
tup_dict['stream'], tup_dict['task'],
tup_dict['tuple'],)
return tup, tup_json
def test_wordcount_basic(mocker):
mocker.patch.object(Bolt, 'ack', autospec=True)
mocker.patch.object(Bolt, 'read_task_ids', autospec=True)
mocker.patch.object(Bolt, 'send_message', autospec=True)
tup, tupj = prep_tup(14, 'word_spout', 'words', 'some_bolt', ['test_word'])
bolt = WordCounter(input_stream=BytesIO(tupj),
output_stream=BytesIO())
bolt.initialize({}, {})
bolt._run()
assert Bolt.send_message.call_args_list[0] == \
call(bolt, {u'command': u'emit',
u'anchors': [tup.id],
u'tuple': ['test_word', 1]})
assert Bolt.send_message.call_args_list[1] == \
call(bolt, {u'msg': 'test_word: 1',
u'command': u'log',
u'level': 2})
import json
from io import BytesIO
from mock import call
from streamparse.storm import Spout
from words import WordSpout
def prep_spout_tup(command):
"""
Prepare a test tuple and a terminated json equivalent
"""
tup_dict = dict(command=command)
tup_json = "{}\nend\n".format(json.dumps(tup_dict)).encode('utf-8')
return tup_json
def test_words_spout_basic(mocker):
mocker.patch.object(Spout, 'ack', autospec=True)
mocker.patch.object(Spout, 'emit', autospec=True)
mocker.patch.object(Spout, 'emit_many', autospec=True)
mocker.patch.object(Spout, 'send_message', autospec=True)
tupj = prep_spout_tup('next')
spout = WordSpout(input_stream=BytesIO(tupj), output_stream=BytesIO())
spout.initialize({}, {})
spout._run()
assert Spout.send_message.call_args_list[0] == \
call(spout, {u'command': u'sync'})
assert Spout.emit.call_args_list[0] == \
call(spout, [u'dog'])
@tdhopper
Copy link

@topiaruss Have you done more on this. I'm interested!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment