Create views using DuckDB

The Synthea generator produces FHIR data in JSON format. Because FHIR data is nested, it can be challenging to analyze. For that reason we create flattened views of the different FHIR resources that can be used for further analysis. In this handbook we create a patient_timeline that includes all procedures a woman had during her pregnancy journey.

What is DuckDB?

DuckDB is an open-source, in-process analytical database known for its fast query processing. It supports SQL as well as nested data structures such as those found in FHIR.

Installing DuckDB and importing data

First install DuckDB, for example with pip install duckdb.
Then import the library, together with pathlib for handling file paths:

import duckdb
from pathlib import Path

Before we can import the FHIR data, we need a connection to a database.

duckdb.connect() takes the path of a database file as its parameter. If the file already exists, DuckDB connects to it; if not, a new file is created and connected to. The file extension can be anything, but .db and .duckdb are common.

Side note: a DuckDB file that stores the raw data and all intermediate tables can grow large quickly. For that reason this notebook uses two DuckDB files: one that holds all raw data and intermediate results in multiple schemas, and one that holds only the final outcome and is therefore much smaller.

Creating and connecting to DuckDB databases

# We set up the data storage following the medallion architecture (bronze, silver and gold layers):
# https://www.databricks.com/glossary/medallion-architecture

# define where the data is stored
ROOT = Path('.')
if 'src' in str(ROOT.resolve()):  # running locally from the src folder
    ROOT = Path('..')  # go one folder up
BRONZE = ROOT / 'data' / 'bronze'
SILVER = ROOT / 'data' / 'silver'
SYNTHEA_DUMP = BRONZE / 'synthea' / 'fhir'

# duckdb.connect creates the database file if needed, but not its parent folders
BRONZE.mkdir(parents=True, exist_ok=True)
SILVER.mkdir(parents=True, exist_ok=True)

connection_bronze = duckdb.connect(f"{BRONZE}/raw.duckdb")
connection_silver = duckdb.connect(f"{SILVER}/pregnancy.duckdb")

We are now connected to both DuckDB databases and can run SQL commands on them.

Set up the DuckDB database and import the FHIR data

First, we create a schema structure in the database. In this example we use the medallion structure, in which the raw FHIR data is stored in the bronze layer.

schema_list = ['bronze','silver','gold']
for schema in schema_list: 
    connection_bronze.sql(f'create schema if not exists {schema}')

Now we import the FHIR data from the ndjson files, one file per FHIR resource type, into the DuckDB database. DuckDB can read ndjson files directly and infers their nested structure automatically.

schema = 'bronze'
# Loop through the resources. Note that the numbers behind Organization, Practitioner, Location, etc. differ for each set of generated Synthea data.
resource_list = ['DiagnosticReport','Claim','Provenance','ExplanationOfBenefit','DocumentReference','Encounter','Patient','Organization.1693914717904','Location.1693914717904','Immunization','Practitioner.1693914717904','PractitionerRole.1693914717904']

for resource in resource_list:
    resource_table_name = resource.split('.')[0] # get rid of number behind resource name
    connection_bronze.sql(f"create table if not exists {schema}.{resource_table_name} as select * from '{SYNTHEA_DUMP}/{resource}.ndjson'")
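Because the numeric suffixes differ for every Synthea run, the file list can also be derived from the folder instead of being hard-coded. A minimal sketch, assuming SYNTHEA_DUMP contains one <Resource>[.<number>].ndjson file per resource type; table_names and discover_resources are hypothetical helpers, not part of DuckDB or Synthea. Note that this picks up every ndjson file in the folder, which may include more resource types than the hand-picked list above.

```python
from pathlib import Path


def table_names(file_names):
    """Map each table name to its file stem, e.g.
    'Organization.1693914717904.ndjson' -> {'Organization': 'Organization.1693914717904'}."""
    resources = {}
    for file_name in sorted(file_names):
        stem = file_name[: -len('.ndjson')]  # drop the extension only
        table_name = stem.split('.')[0]      # drop the run-specific number, as in the loop above
        resources[table_name] = stem         # assumes one file per resource type
    return resources


def discover_resources(dump_dir):
    """Hypothetical helper: table names for all ndjson files in dump_dir."""
    return table_names(f.name for f in Path(dump_dir).glob('*.ndjson'))


# Possible use in the notebook, replacing the hard-coded resource_list:
# for table_name, resource in discover_resources(SYNTHEA_DUMP).items():
#     connection_bronze.sql(f"create table if not exists {schema}.{table_name} as "
#                           f"select * from '{SYNTHEA_DUMP}/{resource}.ndjson'")
```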

To check whether all resources are present, we can query information_schema.tables:

# check if all resources are present
connection_bronze.sql("select * from information_schema.tables").to_df()
    table_catalog  table_schema  table_name            table_type
0   raw            bronze        PractitionerRole      BASE TABLE
1   raw            bronze        Practitioner          BASE TABLE
2   raw            bronze        Immunization          BASE TABLE
3   raw            bronze        Location              BASE TABLE
4   raw            bronze        Organization          BASE TABLE
5   raw            bronze        Patient               BASE TABLE
6   raw            bronze        Encounter             BASE TABLE
7   raw            bronze        DocumentReference     BASE TABLE
8   raw            bronze        ExplanationOfBenefit  BASE TABLE
9   raw            bronze        Provenance            BASE TABLE
10  raw            bronze        Claim                 BASE TABLE
11  raw            bronze        DiagnosticReport      BASE TABLE
12  raw            silver        patient_timeline      BASE TABLE
13  raw            silver        price_list            BASE TABLE

(The remaining columns, self_referencing_column_name through commit_action, contain only NaN/None or the defaults is_insertable_into = YES and is_typed = NO.)
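Instead of reading the listing by eye, a short check can report which expected tables are missing. A minimal sketch: missing_tables is a hypothetical helper, and the commented notebook usage assumes the relation's fetchall() method, which returns the query result as a list of tuples.

```python
def missing_tables(expected_resources, found_rows):
    """Return the expected tables that are absent from the information_schema rows.

    expected_resources may contain run-specific suffixes such as 'Organization.1693914717904';
    found_rows are (table_name,) tuples. The comparison ignores case, like DuckDB identifiers.
    """
    found = {row[0].lower() for row in found_rows}
    return [name.split('.')[0] for name in expected_resources
            if name.split('.')[0].lower() not in found]


# Possible use in the notebook:
# rows = connection_bronze.sql(
#     "select table_name from information_schema.tables where table_schema = 'bronze'").fetchall()
# print(missing_tables(resource_list, rows))  # an empty list means every table was created
```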

Export data to Parquet files

Storing the data as Parquet files separates storage from compute. This gives us the flexibility to, for example, scale out the storage layer or switch to a different compute engine without affecting the other.

[Figure: separation of storage and compute]

In addition, Parquet files require less space than DuckDB files.

Each table can be exported separately to its own Parquet file with a COPY ... TO statement:

resource_list = ['DiagnosticReport','Claim','Provenance','ExplanationOfBenefit','DocumentReference','Encounter','Patient','Organization','Location','Immunization','Practitioner','PractitionerRole']
# COPY may fail if the target folder does not exist, so create it first
(BRONZE / 'parquet').mkdir(parents=True, exist_ok=True)
# export each bronze table to its own Parquet file
for resource in resource_list:
    connection_bronze.sql(f"COPY bronze.{resource} TO '{BRONZE}/parquet/{resource}.parquet' (FORMAT PARQUET)")

It is also possible to export the whole database in Parquet format with EXPORT DATABASE. Besides one Parquet file per table, this writes a schema.sql and a load.sql file, so the data can be imported into DuckDB again in one step with IMPORT DATABASE.

connection_bronze.sql(f"EXPORT DATABASE '{BRONZE}/parquet_export' (FORMAT PARQUET);")

The Parquet files (16.5 MB) are smaller than the DuckDB file (18.5 MB), and both are significantly smaller than the ndjson source files, which take up 146.5 MB in total.
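To reproduce size comparisons like these, a small standard-library helper can sum the file sizes in a folder. A minimal sketch; folder_size_mb is a hypothetical helper, and it assumes 1 MB = 10**6 bytes (some tools use 2**20 bytes, so reported figures may differ slightly). By the numbers above, the ndjson files are roughly 8 to 9 times larger than either binary format (146.5 / 18.5 is about 7.9, and 146.5 / 16.5 is about 8.9).

```python
from pathlib import Path


def folder_size_mb(folder, pattern='*'):
    """Total size in MB (10**6 bytes) of all files below folder whose name matches pattern."""
    total_bytes = sum(f.stat().st_size for f in Path(folder).rglob(pattern) if f.is_file())
    return total_bytes / 1_000_000


# Possible use in the notebook:
# folder_size_mb(SYNTHEA_DUMP, '*.ndjson')               # ndjson source files
# folder_size_mb(BRONZE, 'raw.duckdb')                    # DuckDB file
# folder_size_mb(BRONZE / 'parquet_export', '*.parquet')  # Parquet export
```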

Investigating nested structures

FHIR data has a nested structure, as the Claim table shows:

connection_bronze.sql('select * from bronze.Claim limit 1').to_df()
Row 0, shown transposed (column name, value):
resourceType     Claim
id               105a1262-aef1-422c-837e-b12b907dfeac
status           active
type             {'coding': [{'system': 'http://terminology.hl7...
use              claim
patient          {'reference': 'Patient/884c04fc-2b31-c9c0-51b8...
billablePeriod   {'start': '2014-05-25T13:57:31+02:00', 'end': ...
created          2014-05-25T14:12:31+02:00
provider         {'reference': 'Organization?identifier=https:/...
priority         {'coding': [{'system': 'http://terminology.hl7...
facility         {'reference': 'Location?identifier=https://git...
supportingInfo   [{'sequence': 1, 'category': {'coding': [{'sys...
insurance        [{'sequence': 1, 'focal': True, 'coverage': {'...
item             [{'sequence': 1, 'productOrService': {'coding'...
total            {'value': 272.8, 'currency': 'USD'}

Let’s observe the data in the item column

connection_bronze.sql('select * from bronze.Claim limit 1').to_df()['item'][0]
[{'sequence': 1,
  'productOrService': {'coding': [{'system': 'http://snomed.info/sct',
     'code': '410620009',
     'display': 'Well child visit (procedure)'}],
   'text': 'Well child visit (procedure)'},
  'encounter': [{'reference': 'Encounter/a6b75b4c-0cfa-f85e-a317-33420c73b34b'}],
  'informationSequence': None,
  'net': None},
 {'sequence': 2,
  'productOrService': {'coding': [{'system': 'http://hl7.org/fhir/sid/cvx',
     'code': '140',
     'display': 'Influenza, seasonal, injectable, preservative free'}],
   'text': 'Influenza, seasonal, injectable, preservative free'},
  'encounter': None,
  'informationSequence': [1],
  'net': {'value': 136.0, 'currency': 'USD'}}]

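The item column holds a list of claimed procedures and products, each with an optional price (net). Within each list element, productOrService contains another list, coding, that specifies the code system, the code and its display name.

To deal with this nested structure, the unnest() and struct_extract() functions are frequently used: unnest() turns every element of a list into a separate row, and struct_extract(s, 'field') returns one field of a struct (equivalent to the dot notation s.field that is also used in this notebook).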
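The semantics of these two functions can be mimicked in plain Python on the first claim's item list shown above (abbreviated to the fields used here). This is a sketch for intuition only: unnest and struct_extract below are hypothetical Python stand-ins, not the DuckDB functions, and the nested loops correspond to the chained cross joins of the price list query below.

```python
# The first claim's item column, abbreviated from the output above
item = [
    {'sequence': 1,
     'productOrService': {'coding': [{'system': 'http://snomed.info/sct', 'code': '410620009',
                                      'display': 'Well child visit (procedure)'}]},
     'net': None},
    {'sequence': 2,
     'productOrService': {'coding': [{'system': 'http://hl7.org/fhir/sid/cvx', 'code': '140',
                                      'display': 'Influenza, seasonal, injectable, preservative free'}]},
     'net': {'value': 136.0, 'currency': 'USD'}},
]


def unnest(values):
    """Stand-in for unnest(): one row per list element; a NULL or empty list gives no rows."""
    return list(values or [])


def struct_extract(struct, field):
    """Stand-in for struct_extract(): the named field of a struct; a NULL struct gives NULL."""
    return None if struct is None else struct[field]


# one row per coding of every item: (code, price in USD)
rows = [
    (struct_extract(codes, 'code'), struct_extract(struct_extract(i, 'net'), 'value'))
    for i in unnest(item)
    for codes in unnest(struct_extract(struct_extract(i, 'productOrService'), 'coding'))
]
```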

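Let’s create a list of the claims in which each productOrService of a claim is on its own row. To do so, the unnested items are combined with the original table using a cross join. Because the subquery refers to the item column of c, it is evaluated per claim, so each claim is only combined with its own items.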

query = '''
Select 
c.patient.reference, 
struct_extract(i,'productOrService') as productOrService
from bronze.Claim c
cross join (SELECT UNNEST(item)i)
'''
connection_bronze.sql(query).to_df()
reference productOrService
0 Patient/884c04fc-2b31-c9c0-51b8-3909e34ee92f {'coding': [{'system': 'http://snomed.info/sct...
1 Patient/884c04fc-2b31-c9c0-51b8-3909e34ee92f {'coding': [{'system': 'http://hl7.org/fhir/si...
2 Patient/71d269a3-d04c-727d-c5bc-aaaff28e16b4 {'coding': [{'system': 'http://snomed.info/sct...
3 Patient/71d269a3-d04c-727d-c5bc-aaaff28e16b4 {'coding': [{'system': 'http://hl7.org/fhir/si...
4 Patient/f9133ee8-f952-0e7d-f642-99d84fc9c6ad {'coding': [{'system': 'http://snomed.info/sct...
... ... ...
22662 Patient/f7a1f5c2-825b-8c93-82f0-16cc44e19fdd {'coding': [{'system': 'http://hl7.org/fhir/si...
22663 Patient/f7a1f5c2-825b-8c93-82f0-16cc44e19fdd {'coding': [{'system': 'http://snomed.info/sct...
22664 Patient/f7a1f5c2-825b-8c93-82f0-16cc44e19fdd {'coding': [{'system': 'http://hl7.org/fhir/si...
22665 Patient/f7a1f5c2-825b-8c93-82f0-16cc44e19fdd {'coding': [{'system': 'http://snomed.info/sct...
22666 Patient/f7a1f5c2-825b-8c93-82f0-16cc44e19fdd {'coding': [{'system': 'http://hl7.org/fhir/si...

22667 rows × 2 columns
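The row count above equals the total number of items over all claims: (SELECT UNNEST(item) i) refers to a column of c, so DuckDB evaluates it per claim (a lateral join), pairs every claim only with its own items, and drops claims whose item list is empty or NULL. A plain-Python sketch of this behavior on hypothetical claims; lateral_cross_join and lateral_left_join are illustrative helpers, not DuckDB functions.

```python
# Hypothetical claims: a patient reference and a list of claimed items
claims = [
    {'patient': 'Patient/a', 'item': ['Well child visit', 'Influenza vaccine']},
    {'patient': 'Patient/b', 'item': ['General examination']},
    {'patient': 'Patient/c', 'item': []},
]


def lateral_cross_join(rows, list_column):
    """Pair every row only with the elements of its own list; rows with an empty list vanish."""
    return [(row['patient'], element) for row in rows for element in row[list_column] or []]


def lateral_left_join(rows, list_column):
    """Left-join variant: rows with an empty list are kept once, with None as the element."""
    return [(row['patient'], element)
            for row in rows for element in (row[list_column] or [None])]


joined = lateral_cross_join(claims, 'item')
```

If claims without items should remain visible in a timeline, the left-join variant is the one to mirror; for a price list, dropping them is harmless.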

Create flattened tables to use for data wrangling

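First, let’s create a price list from the claims data. It contains one row per distinct combination of code, code system, display name and price in USD: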

query = '''
create table if not exists silver.price_list as(
        Select 
        distinct
        struct_extract(codes,'code') as code,
        case
            when struct_extract(codes,'system') like '%snomed%' then 'SNOMED'
            when struct_extract(codes,'system') like '%cvx%' then 'CVX'
            else NULL 
        end as system,
        struct_extract(codes,'display') as item_claimed,
        struct_extract(struct_extract(i,'net'),'value') as USD


        from 
        bronze.Claim c
        cross join (SELECT unnest(item) i)
        cross join (SELECT unnest(struct_extract(struct_extract(i,'productOrService'),'coding')) codes)
    );

Select * from silver.price_list limit 5
'''
connection_bronze.sql(query).to_df()
code system item_claimed USD
0 119 CVX rotavirus, monovalent 136.0
1 62 CVX HPV, quadrivalent 136.0
2 140 CVX Influenza, seasonal, injectable, preservative ... 136.0
3 43 CVX Hep B, adult 136.0
4 207 CVX SARS-COV-2 (COVID-19) vaccine, mRNA, spike pro... 136.0
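The CASE expression labels code systems by case-sensitive substring matching (LIKE), and DISTINCT only removes rows that are identical in all four columns, so a code claimed at different prices appears once per price. A minimal Python sketch of both ideas on hypothetical rows (the second price for the well child visit is made up for illustration); system_label and codes_with_multiple_prices are hypothetical helpers.

```python
from collections import defaultdict


def system_label(system_uri):
    """Mirror the CASE ... LIKE '%snomed%' / '%cvx%' expression; NULL stays NULL."""
    if system_uri is None:
        return None
    if 'snomed' in system_uri:
        return 'SNOMED'
    if 'cvx' in system_uri:
        return 'CVX'
    return None


def codes_with_multiple_prices(price_rows):
    """Return the (code, system) pairs that occur with more than one USD value."""
    prices = defaultdict(set)
    for code, system, _item_claimed, usd in price_rows:
        prices[(code, system)].add(usd)
    return [key for key, usd_values in prices.items() if len(usd_values) > 1]


# Hypothetical rows in the shape of silver.price_list: (code, system, item_claimed, USD)
rows = [
    ('140', system_label('http://hl7.org/fhir/sid/cvx'),
     'Influenza, seasonal, injectable, preservative free', 136.0),
    ('410620009', system_label('http://snomed.info/sct'), 'Well child visit (procedure)', None),
    ('410620009', system_label('http://snomed.info/sct'), 'Well child visit (procedure)', 250.0),
]
```

In DuckDB, a similar check could group silver.price_list by code and system and keep the groups with count(*) > 1.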

Next, we create a single table, patient_timeline, that combines information about each patient with all of the patient’s encounters and the vaccinations given during those encounters.

query ='''
create or replace table silver.patient_timeline as(
    with patient_info as(
        Select 
            cast(p.id as string) as patient_id, -- pandas and pyspark do not support the UUID type (FIXED_LEN_BYTE_ARRAY in Parquet), so the UUID is converted to a string
            struct_extract(i,'value') as social_security_number,
            struct_extract(n,'prefix')[1] as prefix,
            struct_extract(n,'given')[1] as first_name,
            struct_extract(n,'family') as last_name,
            p.birthDate
        from bronze.Patient p
        cross join (SELECT unnest(p.identifier) i)
        cross join (SELECT unnest(p.name) n)
        where struct_extract(n,'use') = 'official'
        and struct_extract(i,'system') = 'http://hl7.org/fhir/sid/us-ssn'
    ),
    encounter_info as(
        Select 
            e.id as encounter_id,
            struct_extract(c,'code') as code,
            'SNOMED' as system,
            struct_extract(c,'display') as procedure_name,
            str_split(e.subject.reference,'/')[2] as patient_id,
            str_split(e.serviceProvider.reference,'synthea|')[-1] as organization_id,
            e.serviceProvider.display as organization_name,
            struct_extract(struct_extract(p,'individual'),'display') as practitioner_name,
            str_split(struct_extract(struct_extract(p,'individual'),'reference'),'us-npi|')[-1] as practitioner_id,
            e.period.start as start_time,
            e.period.end as end_time    
        from bronze.encounter e
        cross join (SELECT unnest(e.type) t)
        cross join (SELECT unnest(t.coding) c)
        cross join (SELECT unnest(e.participant) p)
        where struct_extract(c,'system') = 'http://snomed.info/sct'
    ),
    immunization_info as(
        Select 
            i.id as immunization_id,
            i.vaccineCode.text as Vaccine_name,
            struct_extract(vc,'code') as code,
            'CVX' as system,
            str_split(i.patient.reference,'/')[2] as patient_id,
            str_split(i.encounter.reference,'/')[2] as encounter_id
        from 
        bronze.Immunization i
        cross join (SELECT unnest(vaccineCode.coding) vc)
        where struct_extract(vc,'system') = 'http://hl7.org/fhir/sid/cvx'
    )
    Select 
    p.*,
    e.code,
    e.system,
    e.organization_id,
    e.organization_name,
    e.practitioner_name,
    e.practitioner_id,
    e.procedure_name,
    e.start_time,
    e.end_time,
    i.vaccine_name,
    i.code as vaccine_code,
    i.system as vaccine_code_system
    from patient_info p
    join encounter_info e on p.patient_id = e.patient_id
    left join immunization_info i on i.encounter_id = e.encounter_id and i.patient_id = e.patient_id
);

select * from silver.patient_timeline limit 5
    
'''
connection_bronze.sql(query).to_df()
patient_id social_security_number prefix first_name last_name birthDate code system organization_id organization_name practitioner_name practitioner_id procedure_name start_time end_time Vaccine_name vaccine_code vaccine_code_system
0 4fc244f3-2c0e-4017-d64d-c2c4cd03655f 999-53-5813 Mrs. Alyce744 Bergstrom287 1965-01-07 162673000 SNOMED b03b624d-c939-3688-986d-9555b8009a3b HOLYOKE HEALTH CENTER INC Dr. Bennett146 Hartmann983 9999981894 General examination of patient (procedure) 2015-01-08T07:11:47+01:00 2015-01-08T07:26:47+01:00 zoster vaccine, live 121 CVX
1 4fc244f3-2c0e-4017-d64d-c2c4cd03655f 999-53-5813 Mrs. Alyce744 Bergstrom287 1965-01-07 162673000 SNOMED b03b624d-c939-3688-986d-9555b8009a3b HOLYOKE HEALTH CENTER INC Dr. Bennett146 Hartmann983 9999981894 General examination of patient (procedure) 2015-01-08T07:11:47+01:00 2015-01-08T07:26:47+01:00 Influenza, seasonal, injectable, preservative ... 140 CVX
2 fca8d2ca-7aef-2c27-3bba-3f94723012f5 999-24-8599 None Marlen929 Greenholt190 2016-04-01 410620009 SNOMED 3f12ebb4-e03c-3453-88d2-4fc9682383df NEW BEDFORD INTERNAL MEDICINE & GERIATRICS, LLC Dr. Homero668 Rolón954 9999962894 Well child visit (procedure) 2016-04-01T04:09:48+02:00 2016-04-01T04:24:48+02:00 Hep B, adolescent or pediatric 08 CVX
3 19936964-a432-d501-2cd5-fa52db6b9f41 999-33-7974 None Monique148 Haley279 2022-01-07 410620009 SNOMED c6b019eb-28ec-36f6-abf3-bcc4d1d58966 DUTTON FAMILY CARE ASSOCIATES LLP Dr. Maren639 Aufderhar910 9999950790 Well child visit (procedure) 2022-01-07T02:19:02+01:00 2022-01-07T02:34:02+01:00 Hep B, adolescent or pediatric 08 CVX
4 0d016955-26db-966c-3d52-26d441bfcb97 999-47-8500 Ms. Tracy345 Smith67 2000-07-17 410620009 SNOMED 39c15c0f-5c49-311e-99d2-1fb99d80e06e HARBOR HEALTH SERVICES INC Dr. Salvador46 Homenick806 9999977496 Well child visit (procedure) 2014-08-18T14:52:06+02:00 2014-08-18T15:07:06+02:00 Influenza, seasonal, injectable, preservative ... 140 CVX
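The patient_timeline query extracts IDs from FHIR references with expressions such as str_split(e.subject.reference,'/')[2] and str_split(...,'us-npi|')[-1]. DuckDB lists are 1-based and a negative index counts from the end, which differs from Python. A minimal sketch that mimics this indexing in Python; str_split_at is a hypothetical helper, and the practitioner reference in the test is an illustrative value of the same shape, since the output above is truncated.

```python
def str_split_at(value, separator, index):
    """Mimic DuckDB's str_split(value, separator)[index] in Python.

    DuckDB lists are 1-based and a negative index counts from the end, so [2] is the
    second part and [-1] the last one. Out-of-range indexes give None (NULL in DuckDB).
    """
    if index == 0:
        raise ValueError('this sketch only accepts 1-based or negative indexes')
    parts = value.split(separator)
    python_index = index - 1 if index > 0 else index
    if -len(parts) <= python_index < len(parts):
        return parts[python_index]
    return None
```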

Export the outcome in a separate DuckDB file

Because the raw database has grown in size, we copy the patient_timeline and price_list tables into a separate DuckDB database (pregnancy.duckdb). This file is 2.9 MB, compared with 20.5 MB for the raw database.

# first extract the tables into pandas DataFrames
price_list_df = connection_bronze.sql('select * from silver.price_list').to_df()
timeline_df = connection_bronze.sql('select * from silver.patient_timeline').to_df()


# load the DataFrames into the SILVER database; DuckDB finds them by their Python variable names,
# and names that differ from the table names avoid ambiguity when the cell is rerun
connection_silver.sql('create or replace table price_list as select * from price_list_df')

connection_silver.sql('create or replace table patient_timeline as select * from timeline_df')

connection_silver.sql('Select * from patient_timeline limit 5').to_df()
patient_id social_security_number prefix first_name last_name birthDate code system organization_id organization_name practitioner_name practitioner_id procedure_name start_time end_time Vaccine_name vaccine_code vaccine_code_system
0 4fc244f3-2c0e-4017-d64d-c2c4cd03655f 999-53-5813 Mrs. Alyce744 Bergstrom287 1965-01-07 162673000 SNOMED b03b624d-c939-3688-986d-9555b8009a3b HOLYOKE HEALTH CENTER INC Dr. Bennett146 Hartmann983 9999981894 General examination of patient (procedure) 2015-01-08T07:11:47+01:00 2015-01-08T07:26:47+01:00 zoster vaccine, live 121 CVX
1 4fc244f3-2c0e-4017-d64d-c2c4cd03655f 999-53-5813 Mrs. Alyce744 Bergstrom287 1965-01-07 162673000 SNOMED b03b624d-c939-3688-986d-9555b8009a3b HOLYOKE HEALTH CENTER INC Dr. Bennett146 Hartmann983 9999981894 General examination of patient (procedure) 2015-01-08T07:11:47+01:00 2015-01-08T07:26:47+01:00 Influenza, seasonal, injectable, preservative ... 140 CVX
2 fca8d2ca-7aef-2c27-3bba-3f94723012f5 999-24-8599 None Marlen929 Greenholt190 2016-04-01 410620009 SNOMED 3f12ebb4-e03c-3453-88d2-4fc9682383df NEW BEDFORD INTERNAL MEDICINE & GERIATRICS, LLC Dr. Homero668 Rolón954 9999962894 Well child visit (procedure) 2016-04-01T04:09:48+02:00 2016-04-01T04:24:48+02:00 Hep B, adolescent or pediatric 08 CVX
3 19936964-a432-d501-2cd5-fa52db6b9f41 999-33-7974 None Monique148 Haley279 2022-01-07 410620009 SNOMED c6b019eb-28ec-36f6-abf3-bcc4d1d58966 DUTTON FAMILY CARE ASSOCIATES LLP Dr. Maren639 Aufderhar910 9999950790 Well child visit (procedure) 2022-01-07T02:19:02+01:00 2022-01-07T02:34:02+01:00 Hep B, adolescent or pediatric 08 CVX
4 0d016955-26db-966c-3d52-26d441bfcb97 999-47-8500 Ms. Tracy345 Smith67 2000-07-17 410620009 SNOMED 39c15c0f-5c49-311e-99d2-1fb99d80e06e HARBOR HEALTH SERVICES INC Dr. Salvador46 Homenick806 9999977496 Well child visit (procedure) 2014-08-18T14:52:06+02:00 2014-08-18T15:07:06+02:00 Influenza, seasonal, injectable, preservative ... 140 CVX

Export results in parquet files

Again, to minimize storage space, we export the silver database to Parquet files. This reduces the size from 2.9 MB to 550 kB.

# Export database to parquet files
connection_silver.sql(f"EXPORT DATABASE '{SILVER}/parquet_export' (FORMAT PARQUET);")

Close the database connection

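When you finish your analysis, always close the DuckDB connections. A DuckDB database file can be opened with write access by only one process at a time, so an open connection blocks other processes (for example, another notebook) from opening the same file.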

connection_silver.close()
connection_bronze.close()

SQL on FHIR

In this document we created our own flattened tables: patient_timeline and price_list.

SQL on FHIR is a project that addresses working with FHIR resources in SQL environments that do not support nested FHIR structures. It sets out rules and guidelines for flattening FHIR resources into tabular views, and it also provides a Columnar Schema Guidance.
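To give an impression of the idea of declarative flattening, here is a heavily simplified sketch. It assumes a view is described as a resource type, a set of column names with dotted paths, and one optional forEach path that produces one row per list element. This only mimics the idea behind SQL on FHIR ViewDefinitions and is not their format: real ViewDefinitions use FHIRPath expressions and are evaluated by dedicated engines. run_view, get_path and the forEach_columns key are hypothetical.

```python
def get_path(node, path):
    """Follow a dotted path through nested dicts; return None when a step is missing."""
    for key in path.split('.'):
        if not isinstance(node, dict):
            return None
        node = node.get(key)
    return node


def run_view(view, resources):
    """Flatten resources according to a simplified, hypothetical view description."""
    rows = []
    for resource in resources:
        if resource.get('resourceType') != view['resource']:
            continue  # a view covers exactly one resource type
        elements = get_path(resource, view['forEach']) if 'forEach' in view else [resource]
        if isinstance(elements, dict):
            elements = [elements]  # a single element behaves like a one-element list
        for element in elements or []:  # no elements: no rows, like unnest()
            row = {name: get_path(resource, path) for name, path in view.get('columns', {}).items()}
            row.update({name: get_path(element, path)
                        for name, path in view.get('forEach_columns', {}).items()})
            rows.append(row)
    return rows


# Hypothetical view: one row per identifier of each Patient
ssn_view = {
    'resource': 'Patient',
    'columns': {'patient_id': 'id', 'birth_date': 'birthDate'},
    'forEach': 'identifier',
    'forEach_columns': {'system': 'system', 'value': 'value'},
}
```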