Skip to content

Instantly share code, notes, and snippets.

@szczeles
Created March 31, 2020 15:26
Show Gist options
  • Save szczeles/61ece5afd79e9ac3e155e463dcf30ca8 to your computer and use it in GitHub Desktop.
Save szczeles/61ece5afd79e9ac3e155e463dcf30ca8 to your computer and use it in GitHub Desktop.
then.py
class TestIntegration(unittest.TestCase):
def test_enrichment(self):
# given
self.add_event(
ts=datetime(2020, 3, 31, 13, 15),
user_id='USER1',
ad_id='AD1')
self.add_ad(
id='AD1',
name='Sample ad'
)
# when
exit_code = self.run_app("2020-03-31")
# then -> ETL succeeded
self.assertEqual(exit_code, 0)
# then -> verify one joined file
joined = self.spark.read.parquet(self.OUTPUT_JOINED)
self.assertEqual(joined.where(joined.dt == "2020-03-31").count(), 1)
record = joined.where(joined.dt == "2020-03-31").first()
self.assertEquals(record.ts, datetime(2020, 3, 31, 13, 15))
self.assertEquals(record.user_id, 'USER1')
self.assertEquals(record.name, 'Sample ad')
# then -> verify no invalid rows
invalid = self.spark.read.parquet(self.OUTPUT_INVALID)
self.assertEqual(invalid.where(invalid.dt == "2020-03-31").count(), 0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment