@johngrimes
Last active December 14, 2023 23:08
Method for checking for inactive codes within a set of JSON FHIR resources

Dependencies

Step 1

bash get-codings.sh fhir/*.json >codings.ndjson
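
Each line of codings.ndjson is expected to be a JSON object with file, line and coding keys, matching the schema that the Step 2 script reads. As a rough illustration, the following Python sketch parses one such line. The sample record (file name, line number, code and display) is invented for this example, and parse_record is a hypothetical helper, not part of the gist.

```python
import json

# Illustrative record only: the file name, line number and code are made up.
sample_line = (
    '{"file":"fhir/example.json","line":42,'
    '"coding":{"system":"http://snomed.info/sct","code":"12345","display":"Example"}}'
)


def parse_record(line):
    """Parse one NDJSON line into a (file, line, system, code) tuple."""
    record = json.loads(line)
    coding = record["coding"]
    # 'system' and 'code' are optional in a FHIR Coding, so use .get().
    return record["file"], record["line"], coding.get("system"), coding.get("code")


print(parse_record(sample_line))
```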

Step 2

python find-inactives.py --input codings.ndjson --output inactives.csv

find-inactives.py

import argparse
from pathling import PathlingContext, property_of, PropertyType
from pyspark.sql.functions import array_contains
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, IntegerType
# Set up argument parser.
parser = argparse.ArgumentParser(description='Process coding data.')
parser.add_argument('--input', type=str, help='Input NDJSON file', required=True)
parser.add_argument('--output', type=str, help='Output CSV file', required=True)
# Parse arguments.
args = parser.parse_args()
# Create a Pathling context for processing FHIR data.
pc = PathlingContext.create()
# Define the schema for a 'Coding' object in FHIR.
coding_schema = StructType([
    StructField("id", StringType(), True),
    StructField("system", StringType(), True),
    StructField("version", StringType(), True),
    StructField("code", StringType(), True),
    StructField("display", StringType(), True),
    StructField("userSelected", BooleanType(), True)
])
# Define the schema for each input record, which nests a 'Coding' under the 'coding' field.
schema = StructType([
    StructField("file", StringType(), True),
    StructField("line", IntegerType(), True),
    StructField("coding", coding_schema, True)
])
# Read the JSONL data into a DataFrame with the specified schema.
codings = pc.spark.read.json(args.input, schema=schema)
# Add a new column 'inactive'. property_of() returns the values of the 'inactive' property of
# each coding as an array; array_contains() is true when that array contains True.
with_inactive = codings.withColumn("inactive", array_contains(
    property_of(codings.coding, "inactive", PropertyType.BOOLEAN), True))
# Select the file and line, and flatten the coding's system and code into top-level columns.
result = with_inactive.select(
    with_inactive.file,
    with_inactive.line,
    with_inactive.coding.getField("system").alias("system"),
    with_inactive.coding.getField("code").alias("code"))
# Keep only rows where 'inactive' is True and write them as CSV with a header row.
# Spark creates args.output as a directory; repartition(1) makes it hold a single part file.
result.filter(with_inactive.inactive).repartition(1).write.csv(args.output, header=True)
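
Because Spark writes CSV output as a directory of part files rather than a single file, the path given to --output (inactives.csv in Step 2) ends up being a directory. A minimal sketch for reading the rows back with the Python standard library might look like this; read_spark_csv is a hypothetical helper, not part of the gist or of Pathling.

```python
import csv
import glob
import os


def read_spark_csv(output_dir):
    """Collect rows from every part-*.csv file in a Spark CSV output directory."""
    rows = []
    # Spark names its data files part-<number>-<id>.csv; marker files such as
    # _SUCCESS and the hidden .crc checksum files do not match this pattern.
    for path in sorted(glob.glob(os.path.join(output_dir, "part-*.csv"))):
        with open(path, newline="") as handle:
            # The script writes header=True, so DictReader can use the first row as keys.
            rows.extend(csv.DictReader(handle))
    return rows
```

Calling read_spark_csv("inactives.csv") after Step 2 should then give one dictionary per inactive coding, with the keys file, line, system and code.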

get-codings.sh

#!/usr/bin/env bash
set -xe
# Takes a list of JSON files as arguments and outputs NDJSON with file, line and coding.
# Loop through each file path provided as an argument to the script.
for FILENAME in "$@"; do
  # Use jq to process the JSON file.
  # --compact-output: Emit each result on a single line, as NDJSON requires.
  # '..': Recursively visit every value in the JSON document.
  # '.coding?, .valueCoding?, .contains?': Select the 'coding', 'valueCoding' and 'contains' fields; '?' suppresses the error raised when the current value is not an object.
  # 'if type == "array" then .[] else . end': Expand arrays ('coding', 'contains') into individual elements and pass single objects ('valueCoding') through unchanged.
  # 'select(type == "object")': Drop the nulls produced where a field is absent, along with any other non-object values.
  # '{file: $FILENAME, line: input_line_number, coding: .}': For each coding object, create a new object containing the file name, approximate line number, and the coding object itself.
  # --arg FILENAME "$FILENAME": Passes the filename to jq as a variable.
  jq --compact-output '.. | (.coding?, .valueCoding?, .contains?) | if type == "array" then .[] else . end | select(type == "object") | {file: $FILENAME, line: input_line_number, coding: .}' --arg FILENAME "$FILENAME" "$FILENAME"
done
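
For readers less familiar with jq, the recursive extraction can be approximated in Python. This is only a sketch of the idea: iter_codings is a hypothetical name, it accepts both arrays and single objects under the three field names (an assumption about the intended behaviour for valueCoding), and it does not reproduce jq's input_line_number.

```python
import json

# Field names that may hold coding-like objects, as in the jq filter.
KEYS = ("coding", "valueCoding", "contains")


def iter_codings(node):
    """Yield coding-like objects found anywhere inside a parsed JSON value."""
    if isinstance(node, dict):
        for key in KEYS:
            value = node.get(key)
            # Treat a single object the same way as a one-element array.
            candidates = value if isinstance(value, list) else [value]
            for item in candidates:
                if isinstance(item, dict):
                    yield item
        # Descend into every child value, like jq's '..' operator.
        for child in node.values():
            yield from iter_codings(child)
    elif isinstance(node, list):
        for child in node:
            yield from iter_codings(child)


def records_for_file(filename):
    """Return one NDJSON line per coding found in the given JSON file."""
    with open(filename) as handle:
        resource = json.load(handle)
    return [json.dumps({"file": filename, "coding": coding})
            for coding in iter_codings(resource)]
```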