Friday, December 30, 2022

Redis Vector Similarity Search

Summary

I'll show some examples of how to utilize vector similarity search (VSS) in Redis.  I'll generate embeddings from pictures, store the resulting vectors in Redis, and then perform searches against those stored vectors.


Architecture




Data Set

I used the Fashion Dataset on Kaggle for the photos to vectorize and store in Redis.

Application

The sample app is written in Python and organized as a single class.  That class performs a one-time conversion of the dataset photos to vectors and writes those vectors to a file.  The file is then used to load the vectors and their associated photo IDs into Redis.  Vector searches for other photos can then be performed against Redis.

Embedding

The code below reads a directory containing the dataset photos, vectorizes each photo, and then writes those vectors to file.  As mentioned, this is a one-time operation.
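    import json
    import os

    from img2vec_pytorch import Img2Vec  # assumed source of Img2Vec
    from PIL import Image

    # One-time step: skipped if the vector file already exists or there are no images.
    if not os.path.exists(VECTOR_FILE) and len(os.listdir(IMAGE_DIR)) > 0:
        img2vec = Img2Vec(cuda=False)
        images: list = os.listdir(IMAGE_DIR)
        images = images[0:NUM_IMAGES]
        with open(VECTOR_FILE, 'w') as outfile:
            for image in images:
                img: Image.Image = Image.open(f'{IMAGE_DIR}/{image}').convert('RGB').resize((224, 224))
                vector = img2vec.get_vec(img)  # array-like; converted with tolist() below
                id: str = os.path.splitext(image)[0]  # file name without its extension
                # Write one JSON object per line.
                json.dump({'image_id': id, 'image_vector': vector.tolist()}, outfile)
                outfile.write('\n')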
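
The vector file written above holds one JSON object per line.  Here is a minimal sketch of that round trip, using an in-memory buffer instead of a real file.  The IDs and three-element vectors are made up for illustration; the real embeddings have 512 elements.

```python
import io
import json

# Hypothetical stand-ins for real image IDs and embeddings.
records = [
    {'image_id': '1001', 'image_vector': [0.1, 0.2, 0.3]},
    {'image_id': '1002', 'image_vector': [0.4, 0.5, 0.6]},
]

# Write one JSON object per line, as the embedding step does.
buffer = io.StringIO()
for record in records:
    json.dump(record, buffer)
    buffer.write('\n')

# Read the lines back, parsing one object per line.
buffer.seek(0)
loaded = [json.loads(line) for line in buffer]
```

Because each line parses independently, the loader can stream the file line by line rather than reading it all at once.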

Redis Data Loading

Redis supports VSS for both the JSON and Hash data types.  I parameterized a function to allow the creation of Redis VSS indices for either data type.  One important difference between the two:  vectors can be stored as is (an array of floats) in JSON documents, but must be serialized to a binary BLOB (the raw bytes of the floats) for Hashes.
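
To make the BLOB form concrete, here is a small sketch that packs a list of floats into little-endian 32-bit bytes and unpacks it again.  It uses the standard-library struct module so it runs without dependencies.  The helper names to_blob and from_blob are hypothetical, and the loader in this post uses NumPy instead (np.array(..., dtype=np.float32).tobytes()), which produces the same bytes on a little-endian machine.

```python
import struct

def to_blob(vector):
    """Pack a list of floats as consecutive little-endian 32-bit floats."""
    return struct.pack(f'<{len(vector)}f', *vector)

def from_blob(blob):
    """Unpack bytes produced by to_blob back into a list of floats."""
    count = len(blob) // 4  # 4 bytes per FLOAT32 element
    return list(struct.unpack(f'<{count}f', blob))

# Values chosen because they are exactly representable as 32-bit floats.
vector = [0.5, -1.25, 2.0]
blob = to_blob(vector)
restored = from_blob(blob)
```

A 512-element FLOAT32 vector therefore occupies 2048 bytes in a Hash field.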


    def _get_images(self) -> None:
        # Read the vector file and cache each vector in the form its Redis type needs.
        with open(VECTOR_FILE, 'r') as infile:
            for line in infile:
                obj: dict = json.loads(line)
                id: str = str(obj['image_id'])
                match self.object_type:
                    case OBJECT_TYPE.HASH:
                        # Hashes store the vector as raw FLOAT32 bytes.
                        self.image_dict[id] = np.array(obj['image_vector'], dtype=np.float32).tobytes()
                    case OBJECT_TYPE.JSON:
                        # JSON documents store the vector as a list of floats.
                        self.image_dict[id] = obj['image_vector']

    def _load_db(self) -> None:
        # Start from an empty database, then load the cached vectors.
        self.connection.flushdb()
        self._get_images()

        match self.object_type:
            case OBJECT_TYPE.HASH:
                schema = [
                    VectorField('image_vector',
                        self.index_type.value,
                        {
                            "TYPE": 'FLOAT32',
                            "DIM": 512,
                            "DISTANCE_METRIC": self.metric_type.value
                        }
                    ),
                    TagField('image_id')
                ]
                idx_def = IndexDefinition(index_type=IndexType.HASH, prefix=['key:'])
                self.connection.ft('idx').create_index(schema, definition=idx_def)

                # Batch the writes in a pipeline.
                pipe = self.connection.pipeline()
                for id, vec in self.image_dict.items():
                    pipe.hset(f'key:{id}', mapping={'image_id': id, 'image_vector': vec})
                pipe.execute()
            case OBJECT_TYPE.JSON:
                # JSON fields are addressed by JSONPath and aliased with as_name.
                schema = [
                    VectorField('$.image_vector',
                        self.index_type.value,
                        {
                            "TYPE": 'FLOAT32',
                            "DIM": 512,
                            "DISTANCE_METRIC": self.metric_type.value
                        },
                        as_name='image_vector'
                    ),
                    TagField('$.image_id', as_name='image_id')
                ]
                idx_def: IndexDefinition = IndexDefinition(index_type=IndexType.JSON, prefix=['key:'])
                self.connection.ft('idx').create_index(schema, definition=idx_def)

                pipe = self.connection.pipeline()
                for id, vec in self.image_dict.items():
                    pipe.json().set(f'key:{id}', '$', {'image_id': id, 'image_vector': vec})
                pipe.execute()

Search

With vectors loaded and indices created in Redis, a vector search looks the same for either JSON or Hashes.  In both cases, the search vector must be serialized to a BLOB before it is passed as a query parameter.  Searches can be strictly for vector similarity (KNN search) or a combination of VSS and a traditional Redis Search query (hybrid search).  The function below is parameterized to support both.

    def search(self, query_vector: bytes, search_type: SEARCH_TYPE, hyb_str=None) -> list:
        # query_vector is passed to Redis unchanged, so it must already be a FLOAT32 BLOB.
        match search_type:
            case SEARCH_TYPE.VECTOR:
                # Pure KNN: consider every document.
                q_str = f'*=>[KNN {TOPK} @image_vector $vec_param AS vector_score]'
            case SEARCH_TYPE.HYBRID:
                # Hybrid: filter on the image_id tag first, then run KNN on the matches.
                q_str = f'(@image_id:{{{hyb_str}}})=>[KNN {TOPK} @image_vector $vec_param AS vector_score]'
        q = Query(q_str)\
            .sort_by('vector_score')\
            .paging(0, TOPK)\
            .return_fields('vector_score', 'image_id')\
            .dialect(2)
        params_dict = {"vec_param": query_vector}

        results = self.connection.ft('idx').search(q, query_params=params_dict)
        return results
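
To illustrate what the KNN clause computes, here is a brute-force sketch in plain Python.  It assumes the COSINE distance metric, where the score is 1 minus the cosine similarity, so a lower vector_score means a closer match.  The function names and the two-element vectors are hypothetical; Redis performs this ranking inside its index rather than in client code.

```python
import math

def cosine_distance(a, b):
    """Return 1 - cosine similarity of two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)

def knn(query, stored, k):
    """Score every stored vector against the query and keep the k closest."""
    scored = [(cosine_distance(query, vec), key) for key, vec in stored.items()]
    scored.sort()  # ascending distance: best matches first
    return [(key, score) for score, key in scored[:k]]

# Hypothetical stored vectors keyed by image ID.
stored = {'a': [1.0, 0.0], 'b': [0.0, 1.0], 'c': [1.0, 1.0]}
nearest = knn([1.0, 0.1], stored, k=2)
```

Here the query points almost exactly along 'a', so 'a' ranks first, followed by 'c'.  Sorting ascending on the score mirrors the sort_by('vector_score') call in the function above.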

Source

https://github.com/Redislabs-Solution-Architects/vss-ops

Copyright ©1993-2024 Joey E Whelan, All rights reserved.

Monday, December 26, 2022

Redis Search with FHIR Data

Summary

This post will demonstrate search functionality in Redis Enterprise (RE) with FHIR data.  I'll generate FHIR patient bundles with the Synthea application.  Then I'll build a three-node RE cluster and a sharded Redis DB via Docker scripting and the RE REST API.  Finally, I'll show multiple search scenarios on that healthcare data.

Overall Architecture







Data Generation

The shell script below pulls the Synthea jar file, if necessary, and then creates FHIR patient record bundles for every US state.  One to ten bundles are randomly created for each state.
    # Download the Synthea jar only if it is not already present.
    if [ ! -f synthea-with-dependencies.jar ]
    then
        wget -q https://github.com/synthetichealth/synthea/releases/download/master-branch-latest/synthea-with-dependencies.jar
    fi

    STATES=("Alabama" "Alaska" "Arizona" "Arkansas" "California" "Colorado" "Connecticut"
    "Delaware" "District of Columbia" "Florida" "Georgia" "Hawaii" "Idaho" "Illinois"
    "Indiana" "Iowa" "Kansas" "Kentucky" "Louisiana" "Maine" "Montana" "Nebraska"
    "Nevada" "New Hampshire" "New Jersey" "New Mexico" "New York" "North Carolina"
    "North Dakota" "Ohio" "Oklahoma" "Oregon" "Maryland" "Massachusetts" "Michigan"
    "Minnesota" "Mississippi" "Missouri" "Pennsylvania" "Rhode Island" "South Carolina"
    "South Dakota" "Tennessee" "Texas" "Utah" "Vermont" "Virginia" "Washington"
    "West Virginia" "Wisconsin" "Wyoming")

    MAX_POP=10

    # Generate between 1 and MAX_POP patient bundles for each state.
    for state in "${STATES[@]}"; do
        pop=$((RANDOM % MAX_POP + 1))
        java -jar synthea-with-dependencies.jar -c ./syntheaconfig.txt -p "$pop" "$state"
    done

RE Build

This shell script uses a docker-compose file to create a 3-node Redis Enterprise cluster.  It pulls down the latest GA copies of the Search and JSON modules, executes the compose script, assembles a cluster, loads the Search and JSON modules via the REST API, and finally creates a 2-shard, replicated database on the cluster via the REST API.
    SEARCH_LATEST=redisearch.Linux-ubuntu18.04-x86_64.2.6.3.zip
    JSON_LATEST=rejson.Linux-ubuntu18.04-x86_64.2.4.2.zip

    if [ ! -f $SEARCH_LATEST ]
    then
        wget -q https://redismodules.s3.amazonaws.com/redisearch/$SEARCH_LATEST
    fi

    if [ ! -f $JSON_LATEST ]
    then
        wget https://redismodules.s3.amazonaws.com/rejson/$JSON_LATEST
    fi

    echo "Launch Redis Enterprise docker containers"
    docker compose up -d
    echo "*** Wait for Redis Enterprise to come up ***"
    curl -s -o /dev/null --retry 5 --retry-all-errors --retry-delay 3 -f -k -u "redis@redis.com:redis" https://localhost:19443/v1/bootstrap
    echo "*** Build Cluster ***"
    docker exec -it re1 /opt/redislabs/bin/rladmin cluster create name cluster.local username redis@redis.com password redis
    docker exec -it re2 /opt/redislabs/bin/rladmin cluster join nodes 192.168.20.2 username redis@redis.com password redis
    docker exec -it re3 /opt/redislabs/bin/rladmin cluster join nodes 192.168.20.2 username redis@redis.com password redis
    echo "*** Load Modules ***"
    curl -s -o /dev/null -k -u "redis@redis.com:redis" https://localhost:19443/v1/modules -F module=@$SEARCH_LATEST
    curl -s -o /dev/null -k -u "redis@redis.com:redis" https://localhost:19443/v1/modules -F module=@$JSON_LATEST
    echo "*** Build FHIR DB ***"
    curl -s -o /dev/null -k -u "redis@redis.com:redis" https://localhost:19443/v1/bdbs -H "Content-Type:application/json" -d @fhirdb.json

RE Architecture

The diagram below depicts the resulting RE architecture that is created.  Two shards (labeled M1 and M2) and their replicas (R1 and R2) are distributed across the cluster.




Below are screenshots of the admin interfaces for the RE cluster and the database that were created.











Search Examples

Below are snippets of a few of the search/aggregation examples, implemented in both JavaScript and Python.

Medical Facility Geographic Search


Below are the Redis index and search commands to find the closest medical facility (that is in the database) to a geographic coordinate.  In this case, the coordinates are for Woodland Park, CO.

Index - JavaScript

    await this.client.ft.create('location_idx', {
        '$.status': {
            type: SchemaFieldTypes.TAG,
            AS: 'status'
        },
        '$.name': {
            type: SchemaFieldTypes.TEXT,
            AS: 'name'
        },
        '$.address.city': {
            type: SchemaFieldTypes.TAG,
            AS: 'city'
        },
        '$.address.state': {
            type: SchemaFieldTypes.TAG,
            AS: 'state'
        },
        '$.position.longitude': {
            type: SchemaFieldTypes.NUMERIC,
            AS: 'longitude'
        },
        '$.position.latitude': {
            type: SchemaFieldTypes.NUMERIC,
            AS: 'latitude'
        }
    }, { ON: 'JSON', PREFIX: 'Location:' });

Index - Python

    idx_def = IndexDefinition(index_type=IndexType.JSON, prefix=['Location:'])
    schema = [
        TagField('$.status', as_name='status'),
        TextField('$.name', as_name='name'),
        TagField('$.address.city', as_name='city'),
        TagField('$.address.state', as_name='state'),
        NumericField('$.position.longitude', as_name='longitude'),
        NumericField('$.position.latitude', as_name='latitude')
    ]
    connection.ft('location_idx').create_index(schema, definition=idx_def)

Search - JavaScript

    result = await this.client.ft.aggregate('location_idx', '@status:{active}', {
        LOAD: ['@name', '@city', '@state', '@longitude', '@latitude'],
        STEPS: [
            {
                type: AggregateSteps.APPLY,
                expression: 'geodistance(@longitude, @latitude, -105.0569, 38.9939)',
                AS: 'meters'
            },
            {
                type: AggregateSteps.APPLY,
                expression: 'ceil(@meters*0.000621371)',
                AS: 'miles'
            },
            {
                type: AggregateSteps.SORTBY,
                BY: {
                    BY: '@miles',
                    DIRECTION: 'ASC'
                }
            },
            {
                type: AggregateSteps.LIMIT,
                from: 0,
                size: 1
            }
        ]
    });

Search - Python

    request = AggregateRequest('@status:{active}')\
        .load('@name', '@city', '@state', '@longitude', '@latitude')\
        .apply(meters='geodistance(@longitude, @latitude, -105.0569, 38.9939)')\
        .apply(miles='ceil(@meters*0.000621371)')\
        .sort_by(Asc('@miles'))\
        .limit(0, 1)
    result = connection.ft('location_idx').aggregate(request)

Results

[[b'name', b'ARETI COMPREHENSIVE PRIMARY CARE', b'city', b'COLORADO SPRINGS', b'state', b'CO', 
b'longitude', b'-104.768591624', b'latitude', b'38.9006726282', b'meters', b'27009.43', b'miles', b'17']]
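
As a sanity check on the two APPLY steps, the sketch below recomputes the distance between the search coordinates and the facility returned above, then applies the same meters-to-miles conversion.  It uses the haversine formula with a mean Earth radius of 6,371,000 m.  Both are assumptions of this sketch rather than a description of how Redis implements geodistance, so the meter value differs slightly from the 27009.43 shown in the results.

```python
import math

EARTH_RADIUS_M = 6371000.0  # mean Earth radius; an assumption of this sketch

def haversine_m(lon1, lat1, lon2, lat2):
    """Great-circle distance in meters between two lon/lat points in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlambda = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2)
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))

# Search point (Woodland Park, CO) and the facility from the results above.
meters = haversine_m(-105.0569, 38.9939, -104.768591624, 38.9006726282)

# Same conversion as the second APPLY step: meters to miles, rounded up.
miles = math.ceil(meters * 0.000621371)
```

The distance comes out at roughly 27 km, and rounding up after the conversion gives the same 17 miles as the Redis result.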

Medication Prescriptions


Below are the index and search commands to compile a list of the Top 3 physicians prescribing opioids by script count.

Index - JavaScript

    await this.client.ft.create('medicationRequest_idx', {
        '$.status': {
            type: SchemaFieldTypes.TAG,
            AS: 'status'
        },
        '$.medicationCodeableConcept.text': {
            type: SchemaFieldTypes.TEXT,
            AS: 'drug'
        },
        '$.requester.display': {
            type: SchemaFieldTypes.TEXT,
            AS: 'prescriber',
            SORTABLE: true
        },
        '$.reasonReference[*].display': {
            type: SchemaFieldTypes.TEXT,
            AS: 'reason'
        }
    }, { ON: 'JSON', PREFIX: 'MedicationRequest:' });

Index - Python

    idx_def = IndexDefinition(index_type=IndexType.JSON, prefix=['MedicationRequest:'])
    schema = [
        TagField('$.status', as_name='status'),
        TextField('$.medicationCodeableConcept.text', as_name='drug'),
        TextField('$.requester.display', as_name='prescriber', sortable=True),
        TextField('$.reasonReference[*].display', as_name='reason')
    ]
    connection.ft('medicationRequest_idx').create_index(schema, definition=idx_def)

Search - JavaScript

    const opioids = 'Hydrocodone|Oxycodone|Oxymorphone|Morphine|Codeine|Fentanyl|Hydromorphone|Tapentadol|Methadone';
    result = await this.client.ft.aggregate('medicationRequest_idx', `@drug:${opioids}`, {
        STEPS: [
            {
                type: AggregateSteps.GROUPBY,
                properties: ['@prescriber'],
                REDUCE: [
                    {
                        type: AggregateGroupByReducers.COUNT,
                        property: '@prescriber',
                        AS: 'opioids_prescribed'
                    }
                ]
            },
            {
                type: AggregateSteps.SORTBY,
                BY: {
                    BY: '@opioids_prescribed',
                    DIRECTION: 'DESC'
                }
            },
            {
                type: AggregateSteps.LIMIT,
                from: 0,
                size: 3
            }
        ]
    });

Search - Python

    opioids = 'Hydrocodone|Oxycodone|Oxymorphone|Morphine|Codeine|Fentanyl|Hydromorphone|Tapentadol|Methadone'
    request = AggregateRequest(f'@drug:{opioids}')\
        .group_by('@prescriber', reducers.count().alias('opioids_prescribed'))\
        .sort_by(Desc('@opioids_prescribed'))\
        .limit(0, 3)
    result = connection.ft('medicationRequest_idx').aggregate(request)

Results

[[b'prescriber', b'Dr. Aja848 McKenzie376', b'opioids_prescribed', b'53'], 
[b'prescriber', b'Dr. Jaquelyn689 Bernier607', b'opioids_prescribed', b'52'], 
[b'prescriber', b'Dr. Aurora248 Kessler503', b'opioids_prescribed', b'49']]
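
The aggregation above is a filter, a group-by with a count, a descending sort, and a limit.  The same pipeline can be sketched client-side in plain Python.  The prescriber names, drug strings, and the is_opioid helper are hypothetical, and the word match only approximates the full-text query on the drug field.

```python
from collections import Counter

# Shortened, lowercase version of the opioid list, for illustration only.
OPIOIDS = {'hydrocodone', 'oxycodone', 'morphine'}

# Hypothetical (prescriber, drug) pairs standing in for MedicationRequest documents.
prescriptions = [
    ('Dr. A', 'Oxycodone 5 MG'),
    ('Dr. A', 'Morphine 10 MG'),
    ('Dr. A', 'Ibuprofen 200 MG'),
    ('Dr. B', 'Hydrocodone 10 MG'),
    ('Dr. C', 'Amoxicillin 250 MG'),
    ('Dr. A', 'Hydrocodone 10 MG'),
    ('Dr. B', 'Oxycodone 5 MG'),
    ('Dr. C', 'Morphine 10 MG'),
]

def is_opioid(drug):
    """True if any word of the drug description is in the opioid list."""
    return any(word.lower() in OPIOIDS for word in drug.split())

# Filter, then group by prescriber and count.
counts = Counter(who for who, drug in prescriptions if is_opioid(drug))

# Sort descending by count and keep the top 2.
top = counts.most_common(2)
```

Doing this inside Redis avoids pulling every matching document back to the client just to count them.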

Insurer Claim Values


Below are the index and search commands to find the Top 3 insurers by total claim dollar value.

Index - JavaScript


    await this.client.ft.create('claims_idx', {
        '$.status': {
            type: SchemaFieldTypes.TAG,
            AS: 'status'
        },
        '$.insurance[*].coverage.display': {
            type: SchemaFieldTypes.TEXT,
            AS: 'insurer',
            SORTABLE: true
        },
        '$.total.value': {
            type: SchemaFieldTypes.NUMERIC,
            AS: 'value'
        }
    }, { ON: 'JSON', PREFIX: 'Claim:' });

Index - Python


    idx_def = IndexDefinition(index_type=IndexType.JSON, prefix=['Claim:'])
    schema = [
        TagField('$.status', as_name='status'),
        TextField('$.insurance[*].coverage.display', as_name='insurer', sortable=True),
        NumericField('$.total.value', as_name='value')
    ]
    connection.ft('claims_idx').create_index(schema, definition=idx_def)

Search - JavaScript

    result = await this.client.ft.aggregate('claims_idx', '@status:{active}', {
        STEPS: [
            {
                type: AggregateSteps.GROUPBY,
                properties: ['@insurer'],
                REDUCE: [{
                    type: AggregateGroupByReducers.SUM,
                    property: '@value',
                    AS: 'total_value'
                }]
            },
            {
                type: AggregateSteps.FILTER,
                expression: '@total_value > 0'
            },
            {
                type: AggregateSteps.SORTBY,
                BY: {
                    BY: '@total_value',
                    DIRECTION: 'DESC'
                }
            },
            {
                // Top 3, matching the Python version
                type: AggregateSteps.LIMIT,
                from: 0,
                size: 3
            }
        ]
    });

Search - Python

    request = AggregateRequest('@status:{active}')\
        .group_by('@insurer', reducers.sum('@value').alias('total_value'))\
        .filter('@total_value > 0')\
        .sort_by(Desc('@total_value'))\
        .limit(0, 3)
    result = connection.ft('claims_idx').aggregate(request)

Results

[[b'insurer', b'Medicare', b'total_value', b'29841923.54'], [b'insurer', b'NO_INSURANCE', b'total_value', b'9749265.48'], 
[b'insurer', b'UnitedHealthcare', b'total_value', b'8859141.59']]
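
Each result row above is a flat list that alternates field names and values, all as bytes.  Here is a small sketch of turning such rows into dictionaries, using the rows shown above as input.  The helper name row_to_dict is hypothetical.  If the connection were created with decode_responses=True, the values would already be strings and the decode step would not be needed.

```python
def row_to_dict(row):
    """Convert [b'key1', b'val1', b'key2', b'val2', ...] into a dict of strings."""
    keys = row[0::2]    # even positions hold field names
    values = row[1::2]  # odd positions hold the matching values
    return {k.decode(): v.decode() for k, v in zip(keys, values)}

# The rows from the results above.
rows = [
    [b'insurer', b'Medicare', b'total_value', b'29841923.54'],
    [b'insurer', b'NO_INSURANCE', b'total_value', b'9749265.48'],
    [b'insurer', b'UnitedHealthcare', b'total_value', b'8859141.59'],
]

records = [row_to_dict(row) for row in rows]

# Numeric fields come back as text, so convert them explicitly.
totals = {r['insurer']: float(r['total_value']) for r in records}
```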

Source

