Summary
This post demonstrates search functionality in Redis Enterprise (RE) with FHIR data. I'll generate FHIR patient bundles with the Synthea application. Then I'll build a three-node RE cluster and a sharded Redis database via Docker scripting and the RE REST API. Finally, I'll show multiple search scenarios on that healthcare data.
Overall Architecture
Data Generation
The shell script below pulls the Synthea jar file, if necessary, and then creates FHIR patient record bundles for every US state. One to ten bundles are randomly created for each state.
# Download the Synthea jar only if it is not already present
if [ ! -f synthea-with-dependencies.jar ]
then
    wget -q https://github.com/synthetichealth/synthea/releases/download/master-branch-latest/synthea-with-dependencies.jar
fi

# All 50 states plus the District of Columbia
STATES=("Alabama" "Alaska" "Arizona" "Arkansas" "California" "Colorado" "Connecticut"
    "Delaware" "District of Columbia" "Florida" "Georgia" "Hawaii" "Idaho" "Illinois"
    "Indiana" "Iowa" "Kansas" "Kentucky" "Louisiana" "Maine" "Montana" "Nebraska"
    "Nevada" "New Hampshire" "New Jersey" "New Mexico" "New York" "North Carolina"
    "North Dakota" "Ohio" "Oklahoma" "Oregon" "Maryland" "Massachusetts" "Michigan"
    "Minnesota" "Mississippi" "Missouri" "Pennsylvania" "Rhode Island" "South Carolina"
    "South Dakota" "Tennessee" "Texas" "Utah" "Vermont" "Virginia" "Washington"
    "West Virginia" "Wisconsin" "Wyoming")

# Upper bound on the randomly chosen population per state
MAX_POP=10

for state in "${STATES[@]}"; do
    # Random population between 1 and MAX_POP
    pop=$(($RANDOM%$MAX_POP + 1))
    java -jar synthea-with-dependencies.jar -c ./syntheaconfig.txt -p $pop "$state"
done
RE Build
This shell script uses a docker-compose file to create a three-node Redis Enterprise cluster. It downloads GA builds of the Search and JSON modules (pinned to the versions that were latest at the time of writing), brings up the containers with Docker Compose, assembles the cluster, loads both modules via the REST API, and finally creates a two-shard, replicated database on the cluster via the REST API.
# Module packages to download and load
SEARCH_LATEST=redisearch.Linux-ubuntu18.04-x86_64.2.6.3.zip
JSON_LATEST=rejson.Linux-ubuntu18.04-x86_64.2.4.2.zip

# Fetch each module package only if it is not already present
if [ ! -f $SEARCH_LATEST ]
then
    wget -q https://redismodules.s3.amazonaws.com/redisearch/$SEARCH_LATEST
fi

if [ ! -f $JSON_LATEST ]
then
    wget https://redismodules.s3.amazonaws.com/rejson/$JSON_LATEST
fi

echo "Launch Redis Enterprise docker containers"
docker compose up -d

echo "*** Wait for Redis Enterprise to come up ***"
curl -s -o /dev/null --retry 5 --retry-all-errors --retry-delay 3 -f -k -u "redis@redis.com:redis" https://localhost:19443/v1/bootstrap

echo "*** Build Cluster ***"
# Create the cluster on the first node, then join the other two to it
docker exec -it re1 /opt/redislabs/bin/rladmin cluster create name cluster.local username redis@redis.com password redis
docker exec -it re2 /opt/redislabs/bin/rladmin cluster join nodes 192.168.20.2 username redis@redis.com password redis
docker exec -it re3 /opt/redislabs/bin/rladmin cluster join nodes 192.168.20.2 username redis@redis.com password redis

echo "*** Load Modules ***"
curl -s -o /dev/null -k -u "redis@redis.com:redis" https://localhost:19443/v1/modules -F module=@$SEARCH_LATEST
curl -s -o /dev/null -k -u "redis@redis.com:redis" https://localhost:19443/v1/modules -F module=@$JSON_LATEST

echo "*** Build FHIR DB ***"
# Database definition is read from fhirdb.json
curl -s -o /dev/null -k -u "redis@redis.com:redis" https://localhost:19443/v1/bdbs -H "Content-Type:application/json" -d @fhirdb.json
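The contents of fhirdb.json are not shown in this post. As a rough sketch only, the snippet below builds a payload of the kind the final curl call could post for a two-shard, replicated database with both modules attached. The field names follow the database (BDB) object of the RE REST API as I understand it; the database name, memory size, port, and empty module arguments are all assumptions and should be checked against the REST API reference for the RE version in use.

```python
import json

# Hypothetical contents for fhirdb.json; every value here is an assumption.
fhir_db = {
    'name': 'fhir',              # assumed database name
    'type': 'redis',
    'memory_size': 1073741824,   # assumed 1 GiB memory limit
    'port': 12000,               # assumed endpoint port
    'replication': True,         # one replica per master shard
    'sharding': True,
    'shards_count': 2,           # two master shards
    'shard_key_regex': [         # hashing rules for keys with and without hash tags
        {'regex': '.*\\{(?<tag>.*)\\}.*'},
        {'regex': '(?<tag>.*)'},
    ],
    'module_list': [             # modules loaded in the previous step
        {'module_name': 'search', 'module_args': ''},
        {'module_name': 'ReJSON', 'module_args': ''},
    ],
}

payload = json.dumps(fhir_db, indent=2)
print(payload)
```

Writing `payload` to a file named fhirdb.json would give the script above something to post with `-d @fhirdb.json`.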
RE Architecture
The diagram below depicts the resulting RE architecture. Two master shards (labeled M1 and M2) and their replicas (R1 and R2) are distributed across the three-node cluster.
The screenshots below show the admin interfaces of the RE cluster and the database that were created.
Search Examples
Below are snippets from some of the search/aggregation examples, implemented in both JavaScript and Python.
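The indexes in the sections that follow use key prefixes such as `Location:`, `MedicationRequest:`, and `Claim:`, which suggests that each resource in a Synthea bundle is stored as its own JSON document. Here is a minimal sketch of that split, assuming a `<resourceType>:<id>` key scheme. The function name `bundle_to_docs` and the sample bundle are hypothetical, and the loader in the actual project may differ.

```python
import json

def bundle_to_docs(bundle):
    """Yield (key, resource) pairs for every resource in a FHIR bundle.

    Assumption: keys follow '<resourceType>:<id>' so that they match
    index prefixes such as 'Location:' or 'Claim:'.
    """
    for entry in bundle.get('entry', []):
        resource = entry.get('resource', {})
        rtype, rid = resource.get('resourceType'), resource.get('id')
        if rtype and rid:
            yield f'{rtype}:{rid}', resource

# Tiny made-up bundle for illustration only (not real Synthea output).
sample_bundle = {
    'resourceType': 'Bundle',
    'entry': [
        {'resource': {'resourceType': 'Location', 'id': 'loc-1', 'status': 'active'}},
        {'resource': {'resourceType': 'Claim', 'id': 'clm-1', 'status': 'active'}},
    ],
}

docs = dict(bundle_to_docs(sample_bundle))
print(json.dumps(sorted(docs)))
```

With redis-py, each pair could then be written with something like `connection.json().set(key, '$', resource)`.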
Medical Facility Geographic Search
Below are the Redis index and search commands to find the medical facility in the database that is closest to a geographic coordinate. In this case, the coordinates are for Woodland Park, CO.
Index - JavaScript
await this.client.ft.create('location_idx', {
    '$.status': {
        type: SchemaFieldTypes.TAG,
        AS: 'status'
    },
    '$.name': {
        type: SchemaFieldTypes.TEXT,
        AS: 'name'
    },
    '$.address.city': {
        type: SchemaFieldTypes.TAG,
        AS: 'city'
    },
    '$.address.state': {
        type: SchemaFieldTypes.TAG,
        AS: 'state'
    },
    '$.position.longitude': {
        type: SchemaFieldTypes.NUMERIC,
        AS: 'longitude'
    },
    '$.position.latitude': {
        type: SchemaFieldTypes.NUMERIC,
        AS: 'latitude'
    }
}, { ON: 'JSON', PREFIX: 'Location:'});
Index - Python
idx_def = IndexDefinition(index_type=IndexType.JSON, prefix=['Location:'])
schema = [
    TagField('$.status', as_name='status'),
    TextField('$.name', as_name='name'),
    TagField('$.address.city', as_name='city'),
    TagField('$.address.state', as_name='state'),
    NumericField('$.position.longitude', as_name='longitude'),
    NumericField('$.position.latitude', as_name='latitude')
]
connection.ft('location_idx').create_index(schema, definition=idx_def)
Search - JavaScript
result = await this.client.ft.aggregate('location_idx','@status:{active}', {
    LOAD: ['@name', '@city', '@state', '@longitude', '@latitude'],
    STEPS: [
        {
            type: AggregateSteps.APPLY,
            expression: 'geodistance(@longitude, @latitude, -105.0569, 38.9939)',
            AS: 'meters'
        },
        {
            type: AggregateSteps.APPLY,
            expression: 'ceil(@meters*0.000621371)',
            AS: 'miles'
        },
        {
            type: AggregateSteps.SORTBY,
            BY: {
                BY: '@miles',
                DIRECTION: 'ASC'
            }
        },
        {
            type: AggregateSteps.LIMIT,
            from: 0,
            size: 1
        }
    ]
});
Search - Python
request = AggregateRequest('@status:{active}')\
    .load('@name', '@city', '@state', '@longitude', '@latitude')\
    .apply(meters='geodistance(@longitude, @latitude, -105.0569, 38.9939)')\
    .apply(miles='ceil(@meters*0.000621371)')\
    .sort_by(Asc('@miles'))\
    .limit(0,1)
result = connection.ft('location_idx').aggregate(request)
Results
[[b'name', b'ARETI COMPREHENSIVE PRIMARY CARE', b'city', b'COLORADO SPRINGS', b'state', b'CO',
b'longitude', b'-104.768591624', b'latitude', b'38.9006726282', b'meters', b'27009.43', b'miles', b'17']]
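As a sanity check on that row, the two APPLY steps can be approximated outside Redis with a plain haversine calculation followed by the same meters-to-miles conversion. This is only a sketch: the earth-radius constant is assumed to match the one Redis uses for its geo math, and the helper name `haversine_m` is hypothetical.

```python
import math

# Earth radius in meters; assumed to match the constant Redis uses for geo math.
EARTH_RADIUS_M = 6372797.560856

def haversine_m(lon1, lat1, lon2, lat2):
    """Great-circle distance in meters between two (lon, lat) points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlam = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2)
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))

# Facility coordinates from the result row, query point from the search.
meters = haversine_m(-104.768591624, 38.9006726282, -105.0569, 38.9939)

# Same conversion as the second APPLY step: meters to miles, rounded up.
miles = math.ceil(meters * 0.000621371)
print(round(meters), miles)
```

The computed distance should come out close to the roughly 27 km that Redis reported, which rounds up to 17 miles.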
Medication Prescriptions
Below are the index and search commands to compile a list of the Top 3 physicians prescribing opioids by script count.
Index - JavaScript
await this.client.ft.create('medicationRequest_idx', {
    '$.status': {
        type: SchemaFieldTypes.TAG,
        AS: 'status'
    },
    '$.medicationCodeableConcept.text': {
        type: SchemaFieldTypes.TEXT,
        AS: 'drug'
    },
    '$.requester.display': {
        type: SchemaFieldTypes.TEXT,
        AS: 'prescriber',
        SORTABLE: true
    },
    '$.reasonReference[*].display': {
        type: SchemaFieldTypes.TEXT,
        AS: 'reason'
    }
}, {ON: 'JSON', PREFIX: 'MedicationRequest:'});
Index - Python
idx_def = IndexDefinition(index_type=IndexType.JSON, prefix=['MedicationRequest:'])
schema = [
    TagField('$.status', as_name='status'),
    TextField('$.medicationCodeableConcept.text', as_name='drug'),
    TextField('$.requester.display', as_name='prescriber', sortable=True),
    TextField('$.reasonReference[*].display', as_name='reason')
]
connection.ft('medicationRequest_idx').create_index(schema, definition=idx_def)
Search - JavaScript
const opioids = 'Hydrocodone|Oxycodone|Oxymorphone|Morphine|Codeine|Fentanyl|Hydromorphone|Tapentadol|Methadone';
result = await this.client.ft.aggregate('medicationRequest_idx', `@drug:${opioids}`, {
    STEPS: [
        {
            type: AggregateSteps.GROUPBY,
            properties: ['@prescriber'],
            REDUCE: [
                {
                    type: AggregateGroupByReducers.COUNT,
                    property: '@prescriber',
                    AS: 'opioids_prescribed'
                }
            ]
        },
        {
            type: AggregateSteps.SORTBY,
            BY: {
                BY: '@opioids_prescribed',
                DIRECTION: 'DESC'
            }
        },
        {
            type: AggregateSteps.LIMIT,
            from: 0,
            size: 3
        }
    ]
});
Search - Python
opioids = 'Hydrocodone|Oxycodone|Oxymorphone|Morphine|Codeine|Fentanyl|Hydromorphone|Tapentadol|Methadone'
request = AggregateRequest(f'@drug:{opioids}')\
    .group_by('@prescriber', reducers.count().alias('opioids_prescribed'))\
    .sort_by(Desc('@opioids_prescribed'))\
    .limit(0,3)
result = connection.ft('medicationRequest_idx').aggregate(request)
Results
[[b'prescriber', b'Dr. Aja848 McKenzie376', b'opioids_prescribed', b'53'],
[b'prescriber', b'Dr. Jaquelyn689 Bernier607', b'opioids_prescribed', b'52'],
[b'prescriber', b'Dr. Aurora248 Kessler503', b'opioids_prescribed', b'49']]
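To make the pipeline's logic concrete, here is a rough pure-Python equivalent of the same three stages: filter on drug name, group and count by prescriber, then sort descending and keep the top N. It runs on a handful of made-up records rather than the Synthea data, the function name `top_prescribers` is hypothetical, and its substring match is only an approximation of Redis full-text term matching.

```python
from collections import Counter

# Same drug names as the search query, lower-cased for matching.
OPIOIDS = ('hydrocodone', 'oxycodone', 'oxymorphone', 'morphine', 'codeine',
           'fentanyl', 'hydromorphone', 'tapentadol', 'methadone')

def top_prescribers(requests, n=3):
    """Count opioid prescriptions per prescriber and return the top n."""
    counts = Counter(
        r['prescriber'] for r in requests
        if any(name in r['drug'].lower() for name in OPIOIDS)
    )
    # most_common sorts by count, descending, like SORTBY ... DESC + LIMIT
    return counts.most_common(n)

# Made-up sample records for illustration only.
sample = [
    {'prescriber': 'Dr. A', 'drug': 'Oxycodone 5 MG Oral Tablet'},
    {'prescriber': 'Dr. A', 'drug': 'Fentanyl 100 MCG Injection'},
    {'prescriber': 'Dr. B', 'drug': 'Codeine Phosphate 30 MG'},
    {'prescriber': 'Dr. B', 'drug': 'Amoxicillin 250 MG'},
    {'prescriber': 'Dr. C', 'drug': 'Ibuprofen 200 MG'},
]

top = top_prescribers(sample, n=2)
print(top)
```

Doing this inside Redis with GROUPBY and a COUNT reducer avoids pulling every MedicationRequest document back to the client.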
Insurer Claim Values
Below are the index and search commands to find the Top 3 insurers by total claim dollar value.
Index - JavaScript
await this.client.ft.create('claims_idx', {
    '$.status': {
        type: SchemaFieldTypes.TAG,
        AS: 'status'
    },
    '$.insurance[*].coverage.display': {
        type: SchemaFieldTypes.TEXT,
        AS: 'insurer',
        SORTABLE: true
    },
    '$.total.value': {
        type: SchemaFieldTypes.NUMERIC,
        AS: 'value'
    }
}, {ON: 'JSON', PREFIX: 'Claim:'});
Index - Python
idx_def = IndexDefinition(index_type=IndexType.JSON, prefix=['Claim:'])
schema = [
    TagField('$.status', as_name='status'),
    TextField('$.insurance[*].coverage.display', as_name='insurer', sortable=True),
    NumericField('$.total.value', as_name='value')
]
connection.ft('claims_idx').create_index(schema, definition=idx_def)
Search - JavaScript
result = await this.client.ft.aggregate('claims_idx', '@status:{active}', {
    STEPS: [
        {
            type: AggregateSteps.GROUPBY,
            properties: ['@insurer'],
            REDUCE: [{
                type: AggregateGroupByReducers.SUM,
                property: '@value',
                AS: 'total_value'
            }]
        },
        {
            type: AggregateSteps.FILTER,
            expression: '@total_value > 0'
        },
        {
            type: AggregateSteps.SORTBY,
            BY: {
                BY: '@total_value',
                DIRECTION: 'DESC'
            }
        },
        {
            // Top 3, matching the Python version
            type: AggregateSteps.LIMIT,
            from: 0,
            size: 3
        }
    ]
});
Search - Python
request = AggregateRequest('@status:{active}')\
    .group_by('@insurer', reducers.sum('@value').alias('total_value'))\
    .filter('@total_value > 0')\
    .sort_by(Desc('@total_value'))\
    .limit(0,3)
result = connection.ft('claims_idx').aggregate(request)
Results
[[b'insurer', b'Medicare', b'total_value', b'29841923.54'],
[b'insurer', b'NO_INSURANCE', b'total_value', b'9749265.48'],
[b'insurer', b'UnitedHealthcare', b'total_value', b'8859141.59']]
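Aggregation results come back as flat lists of alternating field names and byte-string values, which are awkward to work with directly. A small helper along these lines can turn each row into a dictionary. The name `rows_to_dicts` is hypothetical, and the rows are the ones shown above. With redis-py, the input would typically be the `rows` attribute of the aggregate result.

```python
def rows_to_dicts(rows):
    """Convert flat [field, value, field, value, ...] rows into dicts."""
    decoded = []
    for row in rows:
        # Decode bytes to str; leave already-decoded values untouched
        items = [x.decode() if isinstance(x, bytes) else x for x in row]
        # Even positions are field names, odd positions are their values
        decoded.append(dict(zip(items[0::2], items[1::2])))
    return decoded

rows = [
    [b'insurer', b'Medicare', b'total_value', b'29841923.54'],
    [b'insurer', b'NO_INSURANCE', b'total_value', b'9749265.48'],
    [b'insurer', b'UnitedHealthcare', b'total_value', b'8859141.59'],
]

records = rows_to_dicts(rows)
for rec in records:
    print(f"{rec['insurer']}: ${float(rec['total_value']):,.2f}")
```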
Source
Copyright ©1993-2024 Joey E Whelan, All rights reserved.