Using SQL on FHIR to Generate Patient Timelines

This guide demonstrates how to use SQL on FHIR to extract event data from bulk FHIR (Fast Healthcare Interoperability Resources) exports and assemble it into a patient timeline. Combining SQL with FHIR’s standardized healthcare data makes it possible to analyze the data efficiently and derive insights that support clinical and operational decision-making.

## Example: Using Anonymized Data

This section provides an example of how to work with anonymized data to generate value points.

Step 1: Creating View Definitions for FHIR Resources

The first step involves creating view definitions for various FHIR resources. These view definitions are essential for transforming the hierarchical structure of FHIR resources into flattened relational tables, enabling easier querying and analysis using SQL. Below is an example of a view definition for the Patient resource:

{
  "name": "mnch_patient",
  "title": "MNCH Patient View",
  "version": "0.1.0",
  "url": "https://momcare.cot.pharmaccess.org/fhir/ViewDefinition/patient",
  "meta": {
    "profile": [
      "http://hl7.org/fhir/uv/sql-on-fhir/StructureDefinition/ShareableViewDefinition",
      "http://hl7.org/fhir/uv/sql-on-fhir/StructureDefinition/TabularViewDefinition"
    ]
  },
  "status": "draft",
  "resource": "Patient",
  "fhirVersion": [
    "4.0.1"
  ],
  "select": [
    {
      "column": [
        {
          "name": "birth_date",
          "path": "birthDate",
          "type": "date",
          "collection": false
        },
        {
          "name": "patient_id",
          "path": "id",
          "type": "id",
          "collection": false
        }
      ]
    },
    {
      "forEachOrNull": "identifier",
      "column": [
        {
          "name": "identifier_code",
          "path": "type.coding.code",
          "type": "code",
          "collection": false
        },
        {
          "name": "identifier_system_id",
          "path": "value",
          "type": "string",
          "collection": false
        }
      ]
    }
  ]
}
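To make the effect of this view definition concrete, the following is a minimal plain-Python sketch of the flattening it describes. It is not how Pathling evaluates views; the function name `flatten_patient` and the sample patient are hypothetical, and the FHIRPath handling is simplified (only the first coding of each identifier type is used). It shows how `forEachOrNull` yields one row per identifier, and still yields a single row with null identifier columns when a patient has no identifiers.

```python
from typing import Any


def flatten_patient(patient: dict[str, Any]) -> list[dict[str, Any]]:
    """Mimic the mnch_patient view: one output row per identifier."""
    base = {
        "birth_date": patient.get("birthDate"),
        "patient_id": patient.get("id"),
    }
    # forEachOrNull: iterate over the identifiers, or use a single empty
    # placeholder so a patient without identifiers still produces one row.
    identifiers = patient.get("identifier") or [None]
    rows = []
    for identifier in identifiers:
        identifier = identifier or {}
        codings = identifier.get("type", {}).get("coding", [])
        rows.append(
            {
                **base,
                # Simplification (assumption): use the first coding only.
                "identifier_code": codings[0].get("code") if codings else None,
                "identifier_system_id": identifier.get("value"),
            }
        )
    return rows


# Hypothetical sample resource, not taken from the real export.
example_patient = {
    "resourceType": "Patient",
    "id": "p1",
    "birthDate": "1989-09-30",
    "identifier": [
        {"type": {"coding": [{"code": "ANON"}]}, "value": "abc"},
        {"type": {"coding": [{"code": "MR"}]}, "value": "xyz"},
    ],
}

patient_rows = flatten_patient(example_patient)
no_identifier_rows = flatten_patient({"resourceType": "Patient", "id": "p2"})
```

Because the identifier columns come from a separate `select` entry, a patient with several identifiers appears on several rows, which is worth keeping in mind when joining this table to others.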

Step 2: Extracting Tabular Results with Pathling

In this step, we use Pathling, an open-source solution built on Apache Spark, to extract tabular results from bulk FHIR exports using the view definitions created in Step 1.

The output of Pathling is a Spark DataFrame, which can be easily loaded into a database for further analysis. For this demonstration, we will use DuckDB to store and query the extracted data.

Code
from pathling import DataSource, PathlingContext
from pyspark.sql import DataFrame, SparkSession, Window

# Create a Spark session with the Pathling, Delta Lake and Hadoop AWS packages
spark = (
    SparkSession.builder.config(
        "spark.jars.packages",
        "au.csiro.pathling:library-runtime:8.0.0-SNAPSHOT,"
        "io.delta:delta-spark_2.12:3.2.0,"
        "org.apache.hadoop:hadoop-aws:3.3.4",
    )
    # The snapshot build of Pathling is resolved from the Sonatype snapshots repository
    .config("spark.jars.repositories", "https://oss.sonatype.org/content/repositories/snapshots/")
    # Enable Delta Lake support
    .config(
        "spark.sql.extensions",
        "io.delta.sql.DeltaSparkSessionExtension",
    )
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .config("spark.driver.memory", "5g")
    .getOrCreate()
)
pathling_context = PathlingContext.create(spark=spark)

Step 2.1: Preview the flattened patient data

In this example, we assume the bulk exports are in NDJSON format and use the Patient resource type.

Code
import builtins
fhir_data = pathling_context.read.ndjson('bulk_fhir')
# use the view definition to flatten the data
with builtins.open('views/Patient.ViewDefinition.json') as f:
    patient_view = fhir_data.view(resource='Patient', json=f.read())
    patient_view.show(5)
    
+----------+--------------------+---------------+--------------------+
|birth_date|          patient_id|identifier_code|identifier_system_id|
+----------+--------------------+---------------+--------------------+
|1989-09-30|4F12AC21954DF9A3B...|           ANON|4F12AC21954DF9A3B...|
|1989-09-21|4F12AC21954DF9A3B...|           ANON|4F12AC21954DF9A3B...|
|1989-09-20|4F12AC21954DF9A3B...|           ANON|4F12AC21954DF9A3B...|
|1991-12-31|2276EE71DE1500D8F...|           ANON|2276EE71DE1500D8F...|
|1995-12-02|07BF8F9537F9D0EC8...|           ANON|07BF8F9537F9D0EC8...|
+----------+--------------------+---------------+--------------------+
only showing top 5 rows
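For readers unfamiliar with the NDJSON format assumed in this step, the sketch below shows its shape using only the Python standard library: each line of the file is one complete FHIR resource serialized as JSON. The two sample lines are hypothetical and are not taken from the export used in this guide.

```python
import io
import json
from collections import Counter

# Hypothetical two-line NDJSON export: one complete FHIR resource per line.
sample_ndjson = io.StringIO(
    '{"resourceType": "Patient", "id": "p1", "birthDate": "1989-09-30"}\n'
    '{"resourceType": "Patient", "id": "p2", "birthDate": "1991-12-31"}\n'
)

# Parse every non-empty line independently.
resources = [json.loads(line) for line in sample_ndjson if line.strip()]

# Count the resources per type as a quick sanity check on an export.
counts = Counter(resource["resourceType"] for resource in resources)
```

A check like this can be useful before running a view, since a view on a resource type that is absent from the export simply produces no rows.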
                                                                                

Step 3: Defining Resources and Creating Patient Timeline

In this step, we define all the required FHIR resources, flatten them using the view definitions, and transform the data into a patient timeline. This timeline provides a comprehensive view of patient events, enabling detailed analysis and insights.

Code
from helper_functions import helpers
from pyspark.sql import functions as F
from pyspark.sql.types import DateType, LongType, StringType
# define required resources
required_resources = [
            "Condition",
            "Encounter",
            "MedicationDispense",
            "Observation",
            "Patient",
            "Procedure",
        ]
# read bulk data
fhir_data = pathling_context.read.ndjson('bulk_fhir')

reference_columns = [
            "visit_provider_id",
            "patient_id",
            "encounter_id",
            "account_id",
        ]

views = {}
for resource in required_resources:
    with builtins.open(f'views/{resource}.ViewDefinition.json') as f:
        resource_view = fhir_data.view(resource=resource, json=f.read())
        # Strip the resource-type prefix from references, e.g. Patient/1 -> 1.
        # A view definition that extracts the bare ID directly would make this step unnecessary.
        cleaned_view = helpers.clean_resource_references(
            resource_view, reference_columns
        )
        views[resource] = cleaned_view
        
# create a base view to join other tables
encounters = views["Encounter"]
encounters = encounters.withColumn(
    "visit_type_code", encounters.visit_type_code.cast(LongType())
)
encounters = encounters.withColumn(
    "visit_start_date", encounters.visit_start_date.cast(DateType())
)
encounters = encounters.withColumn(
    "visit_end_date", encounters.visit_end_date.cast(DateType())
)
base = encounters.where(F.col("visit_start_date") > "1900-01-01")

base.show(5)
+--------------------+--------------------+--------------------+----------------+--------------+---------------+---------------+----------+
|        encounter_id|          patient_id|   visit_provider_id|visit_start_date|visit_end_date|      visitType|visit_type_code|account_id|
+--------------------+--------------------+--------------------+----------------+--------------+---------------+---------------+----------+
|37482718382B982A0...|CD43D56AF8297A01B...|B13EC87C08F977590...|      2021-02-12|    2021-02-12|Postpartum care|      133906008|      NULL|
|5BFCFF8433B92C5EE...|03251A523C043ECCD...|3944139CC9A4DBEF6...|      2021-02-12|    2021-02-12| Antenatal care|      424525001|      NULL|
|E84B5D9B9CD2311B6...|C4E75C7F1D3047EB1...|3944139CC9A4DBEF6...|      2021-02-12|    2021-02-12| Antenatal care|      424525001|      NULL|
|D7849ED48AF653D1B...|1224B1BBCDBC89AAB...|6DD8675CA9CA308D6...|      2021-02-15|    2021-02-15| Antenatal care|      424525001|      NULL|
|2F20473C1A6F76AFF...|D4388E67F5919BB73...|E6FF8AF215034B4B2...|      2021-02-15|    2021-02-15| Antenatal care|      424525001|      NULL|
+--------------------+--------------------+--------------------+----------------+--------------+---------------+---------------+----------+
only showing top 5 rows
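The implementation of `helpers.clean_resource_references` is not shown in this guide. As an illustration of the transformation described in the code comments above, here is a plain-Python sketch that works on dictionaries rather than Spark DataFrames. The function names `strip_reference_prefix` and `clean_row` and the sample row are hypothetical; a Spark version would presumably apply the same rule per column, for example with `pyspark.sql.functions.regexp_replace`.

```python
def strip_reference_prefix(value):
    """Return the ID part of a FHIR reference such as 'Patient/123'."""
    if value is None:
        return None
    # Keep only the text after the last '/'; bare IDs pass through unchanged.
    return value.rsplit("/", 1)[-1]


def clean_row(row, reference_columns):
    """Strip reference prefixes from the listed columns of a single row."""
    return {
        key: strip_reference_prefix(val) if key in reference_columns else val
        for key, val in row.items()
    }


reference_columns = ["visit_provider_id", "patient_id", "encounter_id", "account_id"]

# Hypothetical row as it might look before cleaning.
raw_row = {
    "patient_id": "Patient/CD43",
    "encounter_id": "3748",
    "account_id": None,
    "visitType": "Antenatal care",
}
cleaned_row = clean_row(raw_row, reference_columns)
```

Normalizing the reference columns this way lets `patient_id` and `encounter_id` be used directly as join keys across the flattened tables.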

Step 3.2: Use helper functions to extract the patient timeline

Code
procedure = helpers.get_procedure(base, views)
condition = helpers.get_diagnosis(base, views)
medication = helpers.get_medication(base, views)
observation = helpers.get_observation(base, views)

timeline = procedure.union(condition).union(medication).union(observation)
patient_timeline = timeline.withColumn(
            "value_string", F.col("value_string").cast(StringType())
        )
patient_timeline.show(5)
+--------------------+--------------------+--------------------+----------+---------+--------------------+---------+--------------------+-------------------+---------------+----------+---------------+------------+
|          patient_id|        encounter_id|   visit_provider_id|event_time|     code|              system|     type|    description_name|          visitType|visit_type_code|account_id|value_date_time|value_string|
+--------------------+--------------------+--------------------+----------+---------+--------------------+---------+--------------------+-------------------+---------------+----------+---------------+------------+
|9E9F306D1D3CA1A70...|000002D2633497E56...|23E5E82BB1E8D9AA8...|2020-08-03| 46973005|http://snomed.inf...|procedure|Blood pressure ta...|    Postpartum care|      133906008|      NULL|           NULL|        NULL|
|9E9F306D1D3CA1A70...|000002D2633497E56...|23E5E82BB1E8D9AA8...|2020-08-03| 11466000|http://snomed.inf...|procedure|Cesarean section ...|    Postpartum care|      133906008|      NULL|           NULL|        NULL|
|9E9F306D1D3CA1A70...|000002D2633497E56...|23E5E82BB1E8D9AA8...|2020-08-03|384634009|http://snomed.inf...|procedure|Postnatal materna...|    Postpartum care|      133906008|      NULL|           NULL|        NULL|
|9E9F306D1D3CA1A70...|000002D2633497E56...|23E5E82BB1E8D9AA8...|2020-08-03| 47821001|http://snomed.inf...|procedure|Postpartum hemorr...|    Postpartum care|      133906008|      NULL|           NULL|        NULL|
|9E9F306D1D3CA1A70...|000002D2633497E56...|23E5E82BB1E8D9AA8...|2020-08-03| 46973005|http://snomed.inf...|procedure|Blood pressure ta...|Active immunization|       33879002|      NULL|           NULL|        NULL|
+--------------------+--------------------+--------------------+----------+---------+--------------------+---------+--------------------+-------------------+---------------+----------+---------------+------------+
only showing top 5 rows
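Combining the event tables only works if every table shares the same schema. Spark's `DataFrame.union` matches columns by position rather than by name, so the helper functions need to return their columns in the same order (`unionByName` is the by-name alternative). The sketch below illustrates the idea in plain Python: each event is projected onto a common column list, missing values become `None`, and the result is sorted chronologically per patient. The column subset, the function name `to_timeline` and the sample events are hypothetical.

```python
# Assumed subset of the timeline columns, for illustration only.
TIMELINE_COLUMNS = [
    "patient_id",
    "encounter_id",
    "event_time",
    "code",
    "type",
    "value_string",
]


def to_timeline(*event_tables):
    """Stack event tables on a shared schema and sort them per patient."""
    rows = []
    for table in event_tables:
        for event in table:
            # Project onto the common columns; absent columns become None.
            rows.append({col: event.get(col) for col in TIMELINE_COLUMNS})
    # ISO 8601 date strings sort chronologically as plain strings.
    return sorted(rows, key=lambda r: (r["patient_id"], r["event_time"]))


# Hypothetical events: procedures carry no value, observations do.
procedures = [
    {"patient_id": "p1", "encounter_id": "e2", "event_time": "2020-08-03",
     "code": "46973005", "type": "procedure"},
]
observations = [
    {"patient_id": "p1", "encounter_id": "e1", "event_time": "2020-07-01",
     "code": "example-code", "type": "observation", "value_string": "120"},
]

timeline_rows = to_timeline(procedures, observations)
```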
                                                                                

Step 4: Extracting Visit Information for Value Points Calculation

Once the patient timeline is generated, the next step involves extracting detailed visit information. By aggregating and transforming this data, one can calculate value points, which serve as key indicators for clinical and operational decision-making.
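As a rough sketch of the aggregation described above, the example below counts distinct visits per patient and visit type with SQL. It uses Python's built-in `sqlite3` module purely as a stand-in for DuckDB so that it runs without extra dependencies; the query itself uses only standard SQL. The table contents are hypothetical, and the value points scoring rule is not specified in this guide, so the sketch stops at the per-patient visit summary that such a rule would consume.

```python
import sqlite3

# In-memory database as a stand-in for the DuckDB database (assumption).
con = sqlite3.connect(":memory:")
con.execute(
    "CREATE TABLE patient_timeline ("
    "patient_id TEXT, encounter_id TEXT, event_time TEXT, visitType TEXT)"
)

# Hypothetical timeline rows: several events can belong to the same visit.
con.executemany(
    "INSERT INTO patient_timeline VALUES (?, ?, ?, ?)",
    [
        ("p1", "e1", "2021-02-12", "Antenatal care"),
        ("p1", "e1", "2021-02-12", "Antenatal care"),  # second event, same visit
        ("p1", "e2", "2021-03-15", "Antenatal care"),
        ("p1", "e3", "2021-08-01", "Postpartum care"),
        ("p2", "e4", "2021-02-15", "Antenatal care"),
    ],
)

# Count distinct encounters so multiple events in one visit count once.
visit_summary = con.execute(
    """
    SELECT patient_id,
           visitType,
           COUNT(DISTINCT encounter_id) AS visit_count,
           MIN(event_time) AS first_visit
    FROM patient_timeline
    GROUP BY patient_id, visitType
    ORDER BY patient_id, visitType
    """
).fetchall()
con.close()
```

Counting distinct `encounter_id` values rather than rows matters here because the timeline holds one row per event, not one row per visit.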