Skip to content

Instantly share code, notes, and snippets.

@ian-whitestone
Last active July 29, 2020 16:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ian-whitestone/18443c4b8338d8ac0b5f720b24d9de77 to your computer and use it in GitHub Desktop.
Save ian-whitestone/18443c4b8338d8ac0b5f720b24d9de77 to your computer and use it in GitHub Desktop.
Test starscream stages locally with `dev console --local`
from __future__ import unicode_literals
import pytest
import starscream.reusable_stages as RS
from starscream.contract import Contract
from starscream.utils.dataframe import as_dicts, from_dicts
import pyspark.sql.types as T
# Schema for the sample rows: a long id plus two nullable string columns.
address_schema = {
    'id': {'type': T.LongType()},
    'foo': {'type': T.StringType(), 'nullable': True},
    'bar': {'type': T.StringType(), 'nullable': True},
}
contract = Contract(address_schema)

# Sample rows: matching values, a null on one side only, and nulls on both.
address_rows = [
    {'id': 1, 'foo': 'US', 'bar': 'US'},  # should get filtered out
    {'id': 2, 'foo': 'CA', 'bar': 'CA'},  # should get filtered out
    {'id': 3, 'foo': 'CA', 'bar': None},
    {'id': 4, 'foo': None, 'bar': 'CA'},
    {'id': 5, 'foo': None, 'bar': None},
]

# NOTE(review): `sc` is not defined in this snippet; presumably the console
# session provides it -- confirm.
input_df = from_dicts(sc, contract, address_rows)
input_df.show()
# Keep rows where foo and bar differ, plus rows where either column is null.
# NOTE(review): the explicit IsNull checks presumably exist because a
# comparison against null never evaluates to true in Spark SQL -- confirm.
foo_differs_from_bar = RS.Not(RS.EqualToColumn('foo', 'bar'))
keep_row = RS.Any(
    foo_differs_from_bar,
    RS.IsNull('foo'),
    RS.IsNull('bar'),
)
stage = RS.FilterRows(logical_expression=keep_row)

# The same '__output__' key names the frame passed in and the frame read back.
stage_outputs = stage.run({'__output__': input_df})
output_df = stage_outputs['__output__']
output_df.show()
from __future__ import unicode_literals
import pytest
from starscream.pipeline.stage import TransformStage
from pyspark.sql import functions as F, types as T
from starscream.contract import Contract
from starscream.utils.dataframe import as_dicts, from_dicts
import pyspark.sql.types as T
# Schema for the sample rows: two long columns.
number_schema = {
    'foo': {'type': T.LongType()},
    'bar': {'type': T.LongType()},
}
contract = Contract(number_schema)

# Five sample rows; in each one bar is foo + 1.
number_rows = [
    {'foo': 1, 'bar': 2},
    {'foo': 2, 'bar': 3},
    {'foo': 3, 'bar': 4},
    {'foo': 4, 'bar': 5},
    {'foo': 5, 'bar': 6},
]

# NOTE(review): `sc` is not defined in this snippet; presumably the console
# session provides it -- confirm.
input_df = from_dicts(sc, contract, number_rows)

# Show the input with an explicit column order.
input_columns = ['foo', 'bar']
input_df.select(input_columns).show()
class MyStage(TransformStage):
    """Example transform stage that adds a ``baz`` column equal to ``foo + bar``."""

    # Output schema declared for this stage: both input columns plus the
    # derived ``baz`` column, all typed as longs.
    OUTPUT = Contract({
        'foo': {'type': T.LongType()},
        'bar': {'type': T.LongType()},
        'baz': {'type': T.LongType()},
    })

    def apply(self, sc, my_input_df):
        """Return ``my_input_df`` with ``baz`` set to ``foo + bar``.

        Args:
            sc: Accepted as part of the signature; not used by this stage.
            my_input_df: DataFrame that has ``foo`` and ``bar`` columns.

        Returns:
            The DataFrame produced by ``withColumn('baz', foo + bar)``.
        """
        return my_input_df.withColumn('baz', F.col('foo') + F.col('bar'))
# Call apply() directly on a fresh stage instance with the sample frame.
transformer = MyStage()
output_df = transformer.apply(sc, input_df)

# Show the result with the derived column last.
result_columns = ['foo', 'bar', 'baz']
output_df.select(result_columns).show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment