@johngrimes
Last active July 9, 2024 04:28
A proof-of-concept for the flexible schema approach described in the draft "Parquet on FHIR" specification
{
  "resourceType": "Condition",
  "id": "1",
  "clinicalStatus": {
    "coding": [
      {
        "system": "http://terminology.hl7.org/CodeSystem/condition-clinical",
        "code": "active"
      }
    ]
  },
  "subject": {
    "reference": "Patient/13866099-28d7-1249-3e44-ecb490d40fef"
  },
  "onsetDateTime": "1998-11-08T01:26:08+10:00"
}
{
  "resourceType": "Condition",
  "id": "2",
  "verificationStatus": {
    "coding": [
      {
        "system": "http://terminology.hl7.org/CodeSystem/condition-ver-status",
        "code": "confirmed"
      }
    ]
  },
  "subject": {
    "identifier": {
      "system": "http://example.org",
      "value": "123456"
    },
    "display": "Mr. Test Patient"
  },
  "recordedDate": "1998-11-08T01:26:08+10:00"
}
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

# Delta-enabled Spark session with automatic schema merging turned on,
# so that MERGE can evolve the target table's schema.
spark = (
    SparkSession.builder.config(
        "spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0"
    )
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
    .getOrCreate()
)

# Open the two Delta tables (one per sample resource) and show their schemas.
df1 = DeltaTable.forPath(spark, "data/delta/1.Condition.parquet")
df1.toDF().printSchema()
df2 = DeltaTable.forPath(spark, "data/delta/2.Condition.parquet")
df2.toDF().printSchema()

# Merge the second table into the first on resource id; with autoMerge enabled,
# the target schema is widened to include columns that only exist in the source.
df1.alias("old").merge(
    df2.alias("new").toDF(), "new.id = old.id"
).whenMatchedUpdateAll().execute()

# Re-open the target table to show the merged (union) schema.
df3 = DeltaTable.forPath(spark, "data/delta/1.Condition.parquet")
df3.toDF().printSchema()
root
|-- clinicalStatus: struct (nullable = true)
| |-- coding: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- code: string (nullable = true)
| | | |-- system: string (nullable = true)
|-- id: string (nullable = true)
|-- onsetDateTime: string (nullable = true)
|-- resourceType: string (nullable = true)
|-- subject: struct (nullable = true)
| |-- reference: string (nullable = true)
root
|-- id: string (nullable = true)
|-- recordedDate: string (nullable = true)
|-- resourceType: string (nullable = true)
|-- subject: struct (nullable = true)
| |-- display: string (nullable = true)
| |-- identifier: struct (nullable = true)
| | |-- system: string (nullable = true)
| | |-- value: string (nullable = true)
|-- verificationStatus: struct (nullable = true)
| |-- coding: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- code: string (nullable = true)
| | | |-- system: string (nullable = true)
root
|-- clinicalStatus: struct (nullable = true)
| |-- coding: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- code: string (nullable = true)
| | | |-- system: string (nullable = true)
|-- id: string (nullable = true)
|-- onsetDateTime: string (nullable = true)
|-- resourceType: string (nullable = true)
|-- subject: struct (nullable = true)
| |-- reference: string (nullable = true)
| |-- display: string (nullable = true)
| |-- identifier: struct (nullable = true)
| | |-- system: string (nullable = true)
| | |-- value: string (nullable = true)
|-- recordedDate: string (nullable = true)
|-- verificationStatus: struct (nullable = true)
| |-- coding: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- code: string (nullable = true)
| | | |-- system: string (nullable = true)
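
The merged table now carries the union of both schemas, so either representation of subject can be queried directly. A minimal sketch of reading it back (not part of the original gist; it assumes the merged table at data/delta/1.Condition.parquet):

# Fields contributed by only one of the source resources are simply NULL
# on the other rows.
merged = spark.read.format("delta").load("data/delta/1.Condition.parquet")
merged.select(
    "id", "subject.reference", "subject.identifier.value"
).show(truncate=False)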
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.getOrCreate()

df1 = spark.read.parquet("data/parquet/1.Condition.parquet")
df1.printSchema()


def safe_column(df, col_name):
    # Select the column if it exists in the DataFrame's schema; otherwise
    # substitute a NULL literal under the same name.
    if col_name in df.schema.names:
        return df[col_name]
    else:
        return lit(None).alias(col_name)


# "foo" is not a column of the sample resource, so it comes back as NULL.
result = df1.select(
    safe_column(df1, "id"), safe_column(df1, "foo"), safe_column(df1, "onsetDateTime")
)
result.show(truncate=False)
+---+----+-------------------------+
|id |foo |onsetDateTime |
+---+----+-------------------------+
|1 |NULL|1998-11-08T01:26:08+10:00|
+---+----+-------------------------+
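
The same guard generalises to nested elements, which matters given how much of a FHIR resource is optional. A hedged sketch (not in the original gist) of a helper that walks the schema along a dotted path before selecting it; safe_nested_column and its flattened aliasing are illustrative only:

from pyspark.sql.functions import col, lit

def safe_nested_column(df, path):
    # Walk the struct schema along the dotted path; if every step exists,
    # select the nested field, otherwise return NULL under a flattened alias.
    current = df.schema
    for part in path.split("."):
        if hasattr(current, "names") and part in current.names:
            current = current[part].dataType
        else:
            return lit(None).alias(path.replace(".", "_"))
    return col(path).alias(path.replace(".", "_"))

df1.select(
    safe_nested_column(df1, "id"),
    safe_nested_column(df1, "subject.reference"),
    safe_nested_column(df1, "subject.identifier.value"),
).show(truncate=False)

Assuming df1 holds the first sample resource, subject.reference resolves while subject.identifier.value comes back as NULL.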
from pyspark.sql import SparkSession

# Delta-enabled Spark session for the initial load.
spark = (
    SparkSession.builder.config(
        "spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0"
    )
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .getOrCreate()
)

# Read each sample Condition resource as multi-line JSON, letting Spark infer
# a schema from only the elements that are actually present, then write each
# resource to its own Delta table.
df1 = spark.read.json("data/json/1.Condition.json", multiLine=True)
df1.printSchema()
df1.write.format("delta").save("data/delta/1.Condition.parquet", mode="overwrite")

df2 = spark.read.json("data/json/2.Condition.json", multiLine=True)
df2.printSchema()
df2.write.format("delta").save("data/delta/2.Condition.parquet", mode="overwrite")
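
A related option, sketched here for completeness rather than taken from the gist, is to let the schema evolve on write instead of via MERGE, using Delta's standard mergeSchema write option when appending a batch whose schema differs from the table's:

# Hypothetical follow-up batch appended to the first table; mergeSchema lets
# Delta widen the table schema to include any new columns in the batch.
df3 = spark.read.json("data/json/2.Condition.json", multiLine=True)
(
    df3.write.format("delta")
    .option("mergeSchema", "true")
    .mode("append")
    .save("data/delta/1.Condition.parquet")
)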