Created
January 3, 2019 07:58
-
-
Save korkridake/972e315e5ce094096e17c6ad1ef599fd to your computer and use it in GitHub Desktop.
How to melt Spark DataFrame?
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.sql.functions import array, col, explode, lit, struct | |
from pyspark.sql import DataFrame | |
from typing import Iterable | |
def melt(
        df: DataFrame,
        id_vars: Iterable[str], value_vars: Iterable[str],
        var_name: str = "variable", value_name: str = "value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format.

    Source: https://stackoverflow.com/questions/41670103/how-to-melt-spark-dataframe

    Each column named in ``value_vars`` is packed into a
    ``struct<var_name, value_name>``; the structs are collected into an array
    which is then exploded, so every input row produces one output row per
    value column.

    Args:
        df: The wide DataFrame to reshape.
        id_vars: Names of the columns to keep as identifiers on every
            output row.
        value_vars: Names of the columns to unpivot. They are combined into
            a single array, so presumably they must share a compatible
            type -- TODO confirm against the Spark ``array`` documentation.
        var_name: Name of the output column holding the source column name.
        value_name: Name of the output column holding the source value.

    Returns:
        A DataFrame with the ``id_vars`` columns followed by ``var_name``
        and ``value_name``.

    Note:
        Column names are handed to ``col`` verbatim. Names containing a
        full stop reportedly fail to resolve -- verify before using such
        names.
    """
    # The signature promises any iterable, but the list concatenation below
    # only works on real lists; materialise both inputs once up front.
    id_columns = list(id_vars)
    value_columns = list(value_vars)

    # ---------------------------------------------------------------------
    # Create array<struct<variable: str, value: ...>>
    # ---------------------------------------------------------------------
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name))
        for c in value_columns))

    # ---------------------------------------------------------------------
    # Add to the DataFrame and explode
    # ---------------------------------------------------------------------
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

    # Pull the two struct fields back out as top-level columns.
    cols = id_columns + [
        col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)
# -------------------------------------------------------------------------------
# Let's Implement Wide to Long in Pyspark!
# -------------------------------------------------------------------------------
# Unpivot the two value columns, keeping ID_variable on every row, and print
# the resulting long-format DataFrame.
melt(
    df_web_browsing_full_test,
    id_vars=['ID_variable'],
    value_vars=['VALUE_variable_1', 'VALUE_variable_2'],
).show()
from pyspark.sql.functions import array, col, explode, lit, struct
def melt(df, id_vars, value_vars, var_name="variable", value_name="value"):
    """Convert :class:`DataFrame` from wide to long format.

    Args:
        df: The wide DataFrame to reshape.
        id_vars: Names of the identifier columns kept on every output row.
        value_vars: Names of the columns to unpivot.
        var_name: Name of the output column holding the source column name.
        value_name: Name of the output column holding the source value.

    Returns:
        A DataFrame with the ``id_vars`` columns followed by ``var_name``
        and ``value_name``.
    """
    # Accept any iterable (tuple, generator, ...) for the column lists; the
    # concatenation further down needs an actual list.
    id_columns = list(id_vars)
    value_columns = list(value_vars)

    # The functions are imported by name at module level, so they are called
    # directly here; no `F` alias is bound anywhere in this file.
    _vars_and_vals = array(*[struct(lit(c).alias(var_name),
                                    col(c).alias(value_name))
                             for c in value_columns])

    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

    # Pull the two struct fields back out as top-level columns.
    cols = id_columns + [col("_vars_and_vals")[x].alias(x)
                         for x in [var_name, value_name]]
    return _tmp.select(*cols)
This function will fail if a column name contains either a full stop or two adjacent spaces. Any idea how to handle those cases?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I am unable to use this solution here. Could you perhaps help me with the logic?