Skip to content

Instantly share code, notes, and snippets.

@pascalwhoop
Created October 20, 2019 15:00
Show Gist options
  • Save pascalwhoop/90bbc002715bb52c548baaeb5404fe12 to your computer and use it in GitHub Desktop.
Save pascalwhoop/90bbc002715bb52c548baaeb5404fe12 to your computer and use it in GitHub Desktop.
mport unittest
import apache_beam as beam
import cleanup_pipeline as cp
ec = cp.ElementCleanup()
sample_in = [
{
"battery_status": "12",
"bluetooth_status": "on",
"cell_id": "%CELLID",
"cell_strength": "0",
"gps_status": "on",
"last_app": "%LAPP",
"location_gps": "40.07467195951462,15.631200237930196",
"location_net": "50.8578066,4.4326876",
"location_accuracy": "0.0",
"altitude": "52.71343706168826",
"speed": "0.3097678",
"location_seconds": "%LOCTMS",
"timestamp": "1564727400",
}
]
sample_out = [
{
"battery_status": 12,
"bluetooth_status": "on",
"cell_id": None,
"cell_strength": 0,
"gps_status": "on",
"last_app": None,
"location_gps": "40.07467195951462,15.631200237930196",
"location_net": "50.8578066,4.4326876",
"location_accuracy": 0.0,
"altitude": 52.71343706168826,
"speed": 0.3097678,
"location_seconds": None,
"timestamp": 1564727400,
}
]
class TestCleanup(unittest.TestCase):
# more unit tests hidden...
def test_cleanup_step(self):
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.testing.test_pipeline import TestPipeline
# taken from
# https://beam.apache.org/get-started/wordcount-example/#testing-your-pipeline-with-asserts
with TestPipeline() as p:
assert_that(p | beam.Create(sample_in) | beam.ParDo(ec), equal_to(sample_out))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment