Create views using DuckDB
The Synthea generator produces FHIR data in JSON format. The FHIR data has a nested structure, which can be challenging for analysis. For that reason, we create flattened views based on the different FHIR resources that can be used for further analysis. In this handbook we create a patient_timeline that includes all procedures a woman had during her pregnancy journey.
What is DuckDB?
DuckDB is an open-source, in-process analytical database known for its fast query processing. It supports SQL and nested data structures such as those found in FHIR.
Installing DuckDB and importing data
First install DuckDB, for example with pip:

pip install duckdb

Then import the library:

import duckdb
from pathlib import Path

Now we can import the FHIR data into DuckDB as follows:
To use DuckDB, we first need to create a connection to a database. To do so, we pass a parameter that refers to a database (DuckDB) file. If the file already exists, DuckDB connects to it; if not, a new file is created and connected to. The file extension can be anything, but .db or .duckdb are commonly used.
Side note: when working with schemas, a DuckDB file can grow large rapidly. For that reason, two DuckDB files are created in this notebook: one that stores all raw data and manipulations across multiple schemas, and one that stores only the outcome, which is much smaller in size.
Creating and connecting to DuckDB databases
# we set up a data storage schema following the Medallion structure
# https://www.databricks.com/glossary/medallion-architecture

# define where data is stored
ROOT = Path('.')
if 'src' in str(ROOT.resolve()):  # running locally
    ROOT = '..' / ROOT  # go one folder up
BRONZE = ROOT / 'data' / 'bronze'
SILVER = ROOT / 'data' / 'silver'
SYNTHEA_DUMP = ROOT / 'data' / 'bronze' / 'synthea' / 'fhir'

connection_bronze = duckdb.connect(f"{BRONZE}/raw.duckdb")
connection_silver = duckdb.connect(f"{SILVER}/pregnancy.duckdb")
We are now connected to our DuckDB databases and can run any SQL command on them.
Set up the DuckDB database and import FHIR data
First, we create a schema structure in the database. In this example we use a medallion structure, where the raw FHIR data is stored in the bronze layer.
schema_list = ['bronze', 'silver', 'gold']
for schema in schema_list:
    connection_bronze.sql(f'create schema if not exists {schema}')
Now we import the FHIR data from the NDJSON FHIR resource files into the DuckDB database. DuckDB can read the NDJSON files directly.
schema = 'bronze'

# loop through the different resources; note that the numbers behind Organization,
# Practitioner, Location, etc. will differ for each set of generated Synthea data
resource_list = ['DiagnosticReport', 'Claim', 'Provenance', 'ExplanationOfBenefit',
                 'DocumentReference', 'Encounter', 'Patient', 'Organization.1693914717904',
                 'Location.1693914717904', 'Immunization', 'Practitioner.1693914717904',
                 'PractitionerRole.1693914717904']

for resource in resource_list:
    resource_table_name = resource.split('.')[0]  # drop the number behind the resource name
    connection_bronze.sql(f"create table if not exists {schema}.{resource_table_name} as select * from '{SYNTHEA_DUMP}/{resource}.ndjson'")
To check whether all resources are present, we can query the information_schema:
# check if all resources are present
connection_bronze.sql("select * from information_schema.tables").to_df()
table_catalog | table_schema | table_name | table_type | self_referencing_column_name | reference_generation | user_defined_type_catalog | user_defined_type_schema | user_defined_type_name | is_insertable_into | is_typed | commit_action | |
---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | raw | bronze | PractitionerRole | BASE TABLE | NaN | NaN | NaN | NaN | NaN | YES | NO | None |
1 | raw | bronze | Practitioner | BASE TABLE | NaN | NaN | NaN | NaN | NaN | YES | NO | None |
2 | raw | bronze | Immunization | BASE TABLE | NaN | NaN | NaN | NaN | NaN | YES | NO | None |
3 | raw | bronze | Location | BASE TABLE | NaN | NaN | NaN | NaN | NaN | YES | NO | None |
4 | raw | bronze | Organization | BASE TABLE | NaN | NaN | NaN | NaN | NaN | YES | NO | None |
5 | raw | bronze | Patient | BASE TABLE | NaN | NaN | NaN | NaN | NaN | YES | NO | None |
6 | raw | bronze | Encounter | BASE TABLE | NaN | NaN | NaN | NaN | NaN | YES | NO | None |
7 | raw | bronze | DocumentReference | BASE TABLE | NaN | NaN | NaN | NaN | NaN | YES | NO | None |
8 | raw | bronze | ExplanationOfBenefit | BASE TABLE | NaN | NaN | NaN | NaN | NaN | YES | NO | None |
9 | raw | bronze | Provenance | BASE TABLE | NaN | NaN | NaN | NaN | NaN | YES | NO | None |
10 | raw | bronze | Claim | BASE TABLE | NaN | NaN | NaN | NaN | NaN | YES | NO | None |
11 | raw | bronze | DiagnosticReport | BASE TABLE | NaN | NaN | NaN | NaN | NaN | YES | NO | None |
12 | raw | silver | patient_timeline | BASE TABLE | NaN | NaN | NaN | NaN | NaN | YES | NO | None |
13 | raw | silver | price_list | BASE TABLE | NaN | NaN | NaN | NaN | NaN | YES | NO | None |
Export data to Parquet files
Using Parquet files enables us to separate storage and compute. This gives us the flexibility to, for example, scale out the storage engine or change the compute backend without affecting the other. In addition, Parquet files require less space than DuckDB files.
One can convert each table separately to a parquet file, which is shown below.
# export each bronze table to its own Parquet file
resource_list = ['DiagnosticReport', 'Claim', 'Provenance', 'ExplanationOfBenefit',
                 'DocumentReference', 'Encounter', 'Patient', 'Organization', 'Location',
                 'Immunization', 'Practitioner', 'PractitionerRole']

for resource in resource_list:
    connection_bronze.sql(f"COPY bronze.{resource} TO '{BRONZE}/parquet/{resource}.parquet' (FORMAT PARQUET)")
It is also possible to export the entire database in Parquet format. A load.sql and a schema.sql file are then generated as well, making it easy to re-import the data into DuckDB in one step.
connection_bronze.sql(f"EXPORT DATABASE '{BRONZE}/parquet_export' (FORMAT PARQUET);")
The resulting Parquet files (16.5 MB) are smaller than the DuckDB file (18.5 MB). Both the DuckDB and Parquet formats are significantly smaller than the NDJSON files, which take up 146.5 MB in total.
Investigating nested structures
FHIR data has a nested structure. This can be observed by looking at the Claim data as an example:
connection_bronze.sql('select * from bronze.Claim limit 1').to_df()
resourceType | id | status | type | use | patient | billablePeriod | created | provider | priority | facility | supportingInfo | insurance | item | total | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | Claim | 105a1262-aef1-422c-837e-b12b907dfeac | active | {'coding': [{'system': 'http://terminology.hl7... | claim | {'reference': 'Patient/884c04fc-2b31-c9c0-51b8... | {'start': '2014-05-25T13:57:31+02:00', 'end': ... | 2014-05-25T14:12:31+02:00 | {'reference': 'Organization?identifier=https:/... | {'coding': [{'system': 'http://terminology.hl7... | {'reference': 'Location?identifier=https://git... | [{'sequence': 1, 'category': {'coding': [{'sys... | [{'sequence': 1, 'focal': True, 'coverage': {'... | [{'sequence': 1, 'productOrService': {'coding'... | {'value': 272.8, 'currency': 'USD'} |
Let’s observe the data in the item column
connection_bronze.sql('select * from bronze.Claim limit 1').to_df()['item'][0]
[{'sequence': 1,
'productOrService': {'coding': [{'system': 'http://snomed.info/sct',
'code': '410620009',
'display': 'Well child visit (procedure)'}],
'text': 'Well child visit (procedure)'},
'encounter': [{'reference': 'Encounter/a6b75b4c-0cfa-f85e-a317-33420c73b34b'}],
'informationSequence': None,
'net': None},
{'sequence': 2,
'productOrService': {'coding': [{'system': 'http://hl7.org/fhir/sid/cvx',
'code': '140',
'display': 'Influenza, seasonal, injectable, preservative free'}],
'text': 'Influenza, seasonal, injectable, preservative free'},
'encounter': None,
'informationSequence': [1],
'net': {'value': 136.0, 'currency': 'USD'}}]
It can be observed that an item is a list of procedures/products with a price. Within each list item, under productOrService, we find another list in which the coding is specified.
To deal with this nested structure, the unnest() and struct_extract() functions are frequently used.
Let’s create a table of claims in which each productOrService of a claim appears on its own row. To do so, a cross join can be used to merge the unnested results with the original table.
query = '''
Select
    c.patient.reference,
    struct_extract(i, 'productOrService') as productOrService
from bronze.Claim c
cross join (SELECT UNNEST(item) i)
'''
connection_bronze.sql(query).to_df()
reference | productOrService | |
---|---|---|
0 | Patient/884c04fc-2b31-c9c0-51b8-3909e34ee92f | {'coding': [{'system': 'http://snomed.info/sct... |
1 | Patient/884c04fc-2b31-c9c0-51b8-3909e34ee92f | {'coding': [{'system': 'http://hl7.org/fhir/si... |
2 | Patient/71d269a3-d04c-727d-c5bc-aaaff28e16b4 | {'coding': [{'system': 'http://snomed.info/sct... |
3 | Patient/71d269a3-d04c-727d-c5bc-aaaff28e16b4 | {'coding': [{'system': 'http://hl7.org/fhir/si... |
4 | Patient/f9133ee8-f952-0e7d-f642-99d84fc9c6ad | {'coding': [{'system': 'http://snomed.info/sct... |
... | ... | ... |
22662 | Patient/f7a1f5c2-825b-8c93-82f0-16cc44e19fdd | {'coding': [{'system': 'http://hl7.org/fhir/si... |
22663 | Patient/f7a1f5c2-825b-8c93-82f0-16cc44e19fdd | {'coding': [{'system': 'http://snomed.info/sct... |
22664 | Patient/f7a1f5c2-825b-8c93-82f0-16cc44e19fdd | {'coding': [{'system': 'http://hl7.org/fhir/si... |
22665 | Patient/f7a1f5c2-825b-8c93-82f0-16cc44e19fdd | {'coding': [{'system': 'http://snomed.info/sct... |
22666 | Patient/f7a1f5c2-825b-8c93-82f0-16cc44e19fdd | {'coding': [{'system': 'http://hl7.org/fhir/si... |
22667 rows × 2 columns
Create flattened tables to use for data wrangling
First, let’s create a price list from the claims data:
query = '''
create table if not exists silver.price_list as(
Select
distinct
struct_extract(codes,'code') as code,
case
when struct_extract(codes,'system') like '%snomed%' then 'SNOMED'
when struct_extract(codes,'system') like '%cvx%' then 'CVX'
else NULL
end as system,
struct_extract(codes,'display') as item_claimed,
struct_extract(struct_extract(i,'net'),'value') as USD
from
bronze.Claim c
cross join (SELECT unnest(item) i)
cross join (SELECT unnest(struct_extract(struct_extract(i,'productOrService'),'coding')) codes)
);
Select * from silver.price_list limit 5
'''
connection_bronze.sql(query).to_df()
code | system | item_claimed | USD | |
---|---|---|---|---|
0 | 119 | CVX | rotavirus, monovalent | 136.0 |
1 | 62 | CVX | HPV, quadrivalent | 136.0 |
2 | 140 | CVX | Influenza, seasonal, injectable, preservative ... | 136.0 |
3 | 43 | CVX | Hep B, adult | 136.0 |
4 | 207 | CVX | SARS-COV-2 (COVID-19) vaccine, mRNA, spike pro... | 136.0 |
We also want to create a single table that contains all encounters of the patient, information about the patient, and all vaccinations the patient received.
query = '''
create or replace table silver.patient_timeline as(
with patient_info as(
Select
cast(p.id as string) as patient_id, -- neither pandas nor pyspark allows the UUID type (FIXED_LEN_BYTE_ARRAY in parquet), therefore the UUID is converted to a string
struct_extract(i,'value') as social_security_number,
struct_extract(n,'prefix')[1] as prefix,
struct_extract(n,'given')[1] as first_name,
struct_extract(n,'family') as last_name,
p.birthDate
from bronze.Patient p
cross join (SELECT unnest(p.identifier) i)
cross join (SELECT unnest(p.name) n)
where struct_extract(n,'use') = 'official'
and struct_extract(i,'system') = 'http://hl7.org/fhir/sid/us-ssn'
),
encounter_info as(
Select
e.id as encounter_id,
struct_extract(c,'code') as code,
'SNOMED' as system,
struct_extract(c,'display') as procedure_name,
str_split(e.subject.reference,'/')[2] as patient_id,
str_split(e.serviceProvider.reference,'synthea|')[-1] as organization_id,
e.serviceProvider.display as organization_name,
struct_extract(struct_extract(p,'individual'),'display') as practitioner_name,
str_split(struct_extract(struct_extract(p,'individual'),'reference'),'us-npi|')[-1] as practitioner_id,
e.period.start as start_time,
e.period.end as end_time
from bronze.encounter e
cross join (SELECT unnest(e.type) t)
cross join (SELECT unnest(t.coding) c)
cross join (SELECT unnest(e.participant) p)
where struct_extract(c,'system') = 'http://snomed.info/sct'
),
immunization_info as(
Select
i.id as immunization_id,
i.vaccineCode.text as Vaccine_name,
struct_extract(vc,'code') as code,
'CVX' as system,
str_split(i.patient.reference,'/')[2] as patient_id,
str_split(i.encounter.reference,'/')[2] as encounter_id
from
bronze.Immunization i
cross join (SELECT unnest(vaccineCode.coding) vc)
where struct_extract(vc,'system') = 'http://hl7.org/fhir/sid/cvx'
),
price_list as (
Select
distinct
struct_extract(codes,'code') as code,
case
when struct_extract(codes,'system') like '%snomed%' then 'SNOMED'
when struct_extract(codes,'system') like '%cvx%' then 'CVX'
else NULL
end as system,
struct_extract(codes,'display') as item_claimed,
struct_extract(struct_extract(i,'net'),'value') as USD
from
bronze.Claim c
cross join (SELECT unnest(item) i)
cross join (SELECT unnest(struct_extract(struct_extract(i,'productOrService'),'coding')) codes)
)
Select
p.*,
e.code,
e.system,
e.organization_id,
e.organization_name,
e.practitioner_name,
e.practitioner_id,
e.procedure_name,
e.start_time,
e.end_time,
i.vaccine_name,
i.code as vaccine_code,
i.system as vaccine_code_system
from patient_info p
join encounter_info e on p.patient_id = e.patient_id
left join immunization_info i on i.encounter_id = e.encounter_id and e.patient_id = p.patient_id
);
select * from silver.patient_timeline limit 5
'''
connection_bronze.sql(query).to_df()
patient_id | social_security_number | prefix | first_name | last_name | birthDate | code | system | organization_id | organization_name | practitioner_name | practitioner_id | procedure_name | start_time | end_time | Vaccine_name | vaccine_code | vaccine_code_system | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 4fc244f3-2c0e-4017-d64d-c2c4cd03655f | 999-53-5813 | Mrs. | Alyce744 | Bergstrom287 | 1965-01-07 | 162673000 | SNOMED | b03b624d-c939-3688-986d-9555b8009a3b | HOLYOKE HEALTH CENTER INC | Dr. Bennett146 Hartmann983 | 9999981894 | General examination of patient (procedure) | 2015-01-08T07:11:47+01:00 | 2015-01-08T07:26:47+01:00 | zoster vaccine, live | 121 | CVX |
1 | 4fc244f3-2c0e-4017-d64d-c2c4cd03655f | 999-53-5813 | Mrs. | Alyce744 | Bergstrom287 | 1965-01-07 | 162673000 | SNOMED | b03b624d-c939-3688-986d-9555b8009a3b | HOLYOKE HEALTH CENTER INC | Dr. Bennett146 Hartmann983 | 9999981894 | General examination of patient (procedure) | 2015-01-08T07:11:47+01:00 | 2015-01-08T07:26:47+01:00 | Influenza, seasonal, injectable, preservative ... | 140 | CVX |
2 | fca8d2ca-7aef-2c27-3bba-3f94723012f5 | 999-24-8599 | None | Marlen929 | Greenholt190 | 2016-04-01 | 410620009 | SNOMED | 3f12ebb4-e03c-3453-88d2-4fc9682383df | NEW BEDFORD INTERNAL MEDICINE & GERIATRICS, LLC | Dr. Homero668 Rolón954 | 9999962894 | Well child visit (procedure) | 2016-04-01T04:09:48+02:00 | 2016-04-01T04:24:48+02:00 | Hep B, adolescent or pediatric | 08 | CVX |
3 | 19936964-a432-d501-2cd5-fa52db6b9f41 | 999-33-7974 | None | Monique148 | Haley279 | 2022-01-07 | 410620009 | SNOMED | c6b019eb-28ec-36f6-abf3-bcc4d1d58966 | DUTTON FAMILY CARE ASSOCIATES LLP | Dr. Maren639 Aufderhar910 | 9999950790 | Well child visit (procedure) | 2022-01-07T02:19:02+01:00 | 2022-01-07T02:34:02+01:00 | Hep B, adolescent or pediatric | 08 | CVX |
4 | 0d016955-26db-966c-3d52-26d441bfcb97 | 999-47-8500 | Ms. | Tracy345 | Smith67 | 2000-07-17 | 410620009 | SNOMED | 39c15c0f-5c49-311e-99d2-1fb99d80e06e | HARBOR HEALTH SERVICES INC | Dr. Salvador46 Homenick806 | 9999977496 | Well child visit (procedure) | 2014-08-18T14:52:06+02:00 | 2014-08-18T15:07:06+02:00 | Influenza, seasonal, injectable, preservative ... | 140 | CVX |
Export the outcome in a separate duckdb file
As the database has grown in size, we extract the patient_timeline and price_list tables into a separate DuckDB instance. This decreases the size from 20.5 MB to 2.9 MB.
# first extract the tables into dataframes
price_list = connection_bronze.sql('Select * from silver.price_list').to_df()
timeline = connection_bronze.sql('Select * from silver.patient_timeline').to_df()

# load the dataframes into the silver database
connection_silver.sql('create or replace table price_list as select * from price_list')
connection_silver.sql('create or replace table patient_timeline as select * from timeline')

connection_silver.sql('Select * from patient_timeline limit 5').to_df()
patient_id | social_security_number | prefix | first_name | last_name | birthDate | code | system | organization_id | organization_name | practitioner_name | practitioner_id | procedure_name | start_time | end_time | Vaccine_name | vaccine_code | vaccine_code_system | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 4fc244f3-2c0e-4017-d64d-c2c4cd03655f | 999-53-5813 | Mrs. | Alyce744 | Bergstrom287 | 1965-01-07 | 162673000 | SNOMED | b03b624d-c939-3688-986d-9555b8009a3b | HOLYOKE HEALTH CENTER INC | Dr. Bennett146 Hartmann983 | 9999981894 | General examination of patient (procedure) | 2015-01-08T07:11:47+01:00 | 2015-01-08T07:26:47+01:00 | zoster vaccine, live | 121 | CVX |
1 | 4fc244f3-2c0e-4017-d64d-c2c4cd03655f | 999-53-5813 | Mrs. | Alyce744 | Bergstrom287 | 1965-01-07 | 162673000 | SNOMED | b03b624d-c939-3688-986d-9555b8009a3b | HOLYOKE HEALTH CENTER INC | Dr. Bennett146 Hartmann983 | 9999981894 | General examination of patient (procedure) | 2015-01-08T07:11:47+01:00 | 2015-01-08T07:26:47+01:00 | Influenza, seasonal, injectable, preservative ... | 140 | CVX |
2 | fca8d2ca-7aef-2c27-3bba-3f94723012f5 | 999-24-8599 | None | Marlen929 | Greenholt190 | 2016-04-01 | 410620009 | SNOMED | 3f12ebb4-e03c-3453-88d2-4fc9682383df | NEW BEDFORD INTERNAL MEDICINE & GERIATRICS, LLC | Dr. Homero668 Rolón954 | 9999962894 | Well child visit (procedure) | 2016-04-01T04:09:48+02:00 | 2016-04-01T04:24:48+02:00 | Hep B, adolescent or pediatric | 08 | CVX |
3 | 19936964-a432-d501-2cd5-fa52db6b9f41 | 999-33-7974 | None | Monique148 | Haley279 | 2022-01-07 | 410620009 | SNOMED | c6b019eb-28ec-36f6-abf3-bcc4d1d58966 | DUTTON FAMILY CARE ASSOCIATES LLP | Dr. Maren639 Aufderhar910 | 9999950790 | Well child visit (procedure) | 2022-01-07T02:19:02+01:00 | 2022-01-07T02:34:02+01:00 | Hep B, adolescent or pediatric | 08 | CVX |
4 | 0d016955-26db-966c-3d52-26d441bfcb97 | 999-47-8500 | Ms. | Tracy345 | Smith67 | 2000-07-17 | 410620009 | SNOMED | 39c15c0f-5c49-311e-99d2-1fb99d80e06e | HARBOR HEALTH SERVICES INC | Dr. Salvador46 Homenick806 | 9999977496 | Well child visit (procedure) | 2014-08-18T14:52:06+02:00 | 2014-08-18T15:07:06+02:00 | Influenza, seasonal, injectable, preservative ... | 140 | CVX |
Export results in parquet files
Again, to minimize storage space, we convert the DuckDB file to Parquet files. The size decreases from 2.9 MB to 550 kB.
# Export database to parquet files
connection_silver.sql(f"EXPORT DATABASE '{SILVER}/parquet_export' (FORMAT PARQUET);")
Close the database connection
When you finish your analysis, always make sure to close the DuckDB connections. A DuckDB database file can only be opened in read-write mode by one process at a time, so an open connection can block other processes from using the file.
connection_bronze.close()
connection_silver.close()
SQL on FHIR
In this document we created our own standardized tables: the patient_timeline and the price_list table.
SQL on FHIR is a project that deals with handling FHIR resources in SQL ecosystems that do not accept the nested FHIR structures. The project defines rules and guidelines for flattening FHIR resources into tabular views. In addition, it provides Columnar Schema Guidance.