Skip to content

Instantly share code, notes, and snippets.

@imsickofmaps
Created March 12, 2014 08:37
Show Gist options
  • Save imsickofmaps/9503054 to your computer and use it in GitHub Desktop.
Save imsickofmaps/9503054 to your computer and use it in GitHub Desktop.
def test_filter_pipeline_chaining(self):
fp = FilterPipeline([
DirectionalFilter('inbound').chain(
MSISDNFilter('from_addr', '+27817030792')),
TimestampFilter('2013-09-10 00:00:00', '2013-09-10 23:59:59')
])
stdin = StringIO(self.SAMPLE)
stdout = StringIO()
fp.process(stdin=stdin, stdout=stdout)
self.assertEqual(stdout.getvalue(), self.HEADER + self.INBOUND)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment