Created August 25, 2022 03:24
-
-
Save rjurney/6d4999a4b114f90be44c934e35cf1e2e to your computer and use it in GitHub Desktop.
Test code for an attempt at Pydantic ETL code for PySpark
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def test_graphlet_etl(spark_session_context) -> None:
    """Test the pydantic node/edge classes inside a PySpark ETL pipeline.

    Loads the movie CSV fixtures, defines pydantic node and edge models, then
    maps the raw horror rows through the ``Movie`` model with
    ``DataFrame.mapInPandas`` and checks that every row comes out validated.

    Parameters
    ----------
    spark_session_context
        Fixture unpacked into ``(spark, sc)``; ``spark`` is used as the
        SparkSession for reading CSVs.
    """
    spark, _ = spark_session_context

    # NOTE(review): defined but not applied anywhere in this test.
    @F.pandas_udf("long")
    def text_runtime_to_minutes_pandas_udf(x: pd.Series) -> pd.Series:
        """Run text_runtime_to_minutes over a column as a PySpark pandas_udf.

        Parameters
        ----------
        x : pd.Series
            Column to run text_runtime_to_minutes on.

        Returns
        -------
        pd.Series
            A column of integers.
        """
        return x.apply(text_runtime_to_minutes)

    # Movie awards
    awards = spark.read.option("header", "true").csv("tests/data/awards.csv")
    awards.show()

    # A genre of movies
    comedies = spark.read.option("header", "true").csv("tests/data/comedy.csv")
    comedies.show()

    # Another genre of movies
    horror = spark.read.option("header", "true").csv("tests/data/horror.csv")
    horror.show()

    class Movie(NodeBase):
        """A film node in Hollywood."""

        entity_type: str = "movie"
        genre: str
        title: str
        year: str
        length: int = 0
        gross: int = 0
        rating: str

        @validator("length", pre=True)
        def convert_hours_minutes_to_int_minutes(cls, x):
            """Convert a non-empty text runtime to minutes before int validation."""
            if x and isinstance(x, str):
                x = text_runtime_to_minutes(x)
            return x

    class Person(NodeBase):
        """A class about people in the film industry."""

        entity_type: str = "person"
        name: str
        role: str

    class Directed(EdgeBase):
        """A director directed a movie."""

        entity_type: str = "directed"

    class ActedIn(EdgeBase):
        """An actor acted in a movie."""

        entity_type: str = "acted_in"

    movie_schema = T.StructType.fromJson(Movie.spark_schema())
    # Column names of the output schema, used to shape every yielded batch.
    movie_columns = movie_schema.fieldNames()

    def horror_to_movie(
        dfs: typing.Iterator[pd.DataFrame],
    ) -> typing.Iterator[pd.DataFrame]:
        """Transform batches of raw horror rows into the Movie schema.

        Each row is validated through ``Movie`` and serialized back to a dict.

        Parameters
        ----------
        dfs : typing.Iterator[pd.DataFrame]
            Batches of the raw horror CSV; ``Title``, ``Year``, ``Length`` and
            ``Rating`` columns are read from each row.

        Yields
        ------
        pd.DataFrame
            One DataFrame per input batch, with ``movie_columns`` as columns.
        """

        def map_to_movie(row) -> dict:
            """Validate one raw row as a Movie and return it as a dict."""
            return Movie(
                genre="horror",
                title=row["Title"],
                year=row["Year"],
                length=row["Length"],
                rating=row["Rating"],
            ).dict()

        for df in dfs:
            # DataFrame.apply(axis=1) with a dict-returning function produces a
            # Series of dicts, which mapInPandas cannot consume. Build a real
            # DataFrame from the records instead; passing explicit columns also
            # keeps empty batches aligned with the output schema.
            yield pd.DataFrame(
                [map_to_movie(row) for row in df.to_dict("records")],
                columns=movie_columns,
            )

    horror_movies = horror.mapInPandas(horror_to_movie, schema=movie_schema)
    horror_movies.show()

    # show() only evaluates the first rows; count() forces every row through
    # Movie validation, and each raw row must map to exactly one movie row.
    assert horror_movies.count() == horror.count()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.