Skip to content

Instantly share code, notes, and snippets.

@rjurney
Created August 25, 2022 03:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rjurney/6d4999a4b114f90be44c934e35cf1e2e to your computer and use it in GitHub Desktop.
Save rjurney/6d4999a4b114f90be44c934e35cf1e2e to your computer and use it in GitHub Desktop.
Test code for an attempt at Pydantic-based ETL code for PySpark
def test_graphlet_etl(spark_session_context) -> None:
    """Exercise the pydantic node/edge classes through PySpark pandas functions.

    Reads the CSV fixtures under ``tests/data``, declares the ``Movie``,
    ``Person``, ``Directed`` and ``ActedIn`` models, then converts the horror
    CSV into the ``Movie`` schema with ``DataFrame.mapInPandas`` and shows the
    result.

    Parameters
    ----------
    spark_session_context
        Fixture value unpacked as a ``(spark, sc)`` pair; only the first
        element (the session used for ``spark.read``) is needed here.
    """
    spark, _sc = spark_session_context

    # NOTE(review): this UDF is defined but not referenced again in this test.
    @F.pandas_udf("long")
    def text_runtime_to_minutes_pandas_udf(x: pd.Series) -> pd.Series:
        """Apply ``text_runtime_to_minutes`` to every element of a column.

        Parameters
        ----------
        x : pd.Series
            Column to run ``text_runtime_to_minutes`` on.

        Returns
        -------
        pd.Series
            A column of integers.
        """
        return x.apply(text_runtime_to_minutes)

    # Movie awards
    awards = spark.read.option("header", "true").csv("tests/data/awards.csv")
    awards.show()

    # A genre of movies
    comedies = spark.read.option("header", "true").csv("tests/data/comedy.csv")
    comedies.show()

    # Another genre of movies
    horror = spark.read.option("header", "true").csv("tests/data/horror.csv")
    horror.show()

    class Movie(NodeBase):
        """A film node in hollywood."""

        entity_type: str = "movie"
        genre: str
        title: str
        year: str
        length: int = 0
        gross: int = 0
        rating: str

        @validator("length", pre=True)
        def convert_hours_minutes_to_int_minutes(cls, x):
            """Convert a non-empty string runtime with ``text_runtime_to_minutes``.

            Registered with ``pre=True``, so it sees the raw input value. Any
            value that is not a non-empty ``str`` is returned unchanged.
            """
            if x and isinstance(x, str):
                x = text_runtime_to_minutes(x)
            return x

    class Person(NodeBase):
        """A class about people in the film industry."""

        entity_type: str = "person"
        name: str
        role: str

    class Directed(EdgeBase):
        """A director directed a movie."""

        entity_type: str = "directed"

    class ActedIn(EdgeBase):
        """An actor acted in a movie."""

        entity_type: str = "acted_in"

    def horror_to_movie(
        dfs: typing.Iterable[pd.DataFrame],
    ) -> typing.Iterable[pd.DataFrame]:
        """Transform batches of the horror CSV schema into the ``Movie`` schema.

        Parameters
        ----------
        dfs : typing.Iterable[pd.DataFrame]
            Batches of rows carrying the ``Title``, ``Year``, ``Length`` and
            ``Rating`` columns.

        Yields
        ------
        pd.DataFrame
            One DataFrame per input batch, with one column per key of
            ``Movie(...).dict()``.
        """

        def map_to_movie(x):
            """Validate one row as a ``Movie`` and return it as a plain dict."""
            return Movie(
                genre="horror",
                title=x["Title"],
                year=x["Year"],
                length=x["Length"],
                rating=x["Rating"],
            ).dict()

        for df in dfs:
            # mapInPandas requires every yielded item to be a pd.DataFrame.
            # df.apply(map_to_movie, axis=1) with a dict-returning function
            # gives a Series of dicts, so build the frame from the validated
            # records instead. An empty batch yields an empty DataFrame.
            yield pd.DataFrame([map_to_movie(row) for _, row in df.iterrows()])

    movie_schema = T.StructType.fromJson(Movie.spark_schema())
    horror_movies = horror.mapInPandas(horror_to_movie, schema=movie_schema)
    horror_movies.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment