Friday, December 30, 2022

Redis Vector Similarity Search

Summary

I'll show some examples of how to use vector similarity search (VSS) in Redis.  I'll generate embeddings from photos, store the resulting vectors in Redis, and then perform searches against those stored vectors.


Architecture




Data Set

I used the Fashion Dataset on Kaggle for the photos to vectorize and store in Redis.

Application

The sample app is written in Python and organized as a single class.  That class does a one-time load of the dataset photos as vectors and stores the vectors in a file.  That file is subsequently used to store the vectors and their associated photo IDs in Redis.  Vector searches of other photos can then be performed against Redis.

Embedding

The code below reads a directory containing the dataset photos, vectorizes each photo, and then writes those vectors to file.  As mentioned, this is a one-time operation.
if not os.path.exists(VECTOR_FILE) and len(os.listdir(IMAGE_DIR)) > 0:
    img2vec = Img2Vec(cuda=False)
    images: list = os.listdir(IMAGE_DIR)
    images = images[0:NUM_IMAGES]
    with open(VECTOR_FILE, 'w') as outfile:
        for image in images:
            img: Image = Image.open(f'{IMAGE_DIR}/{image}').convert('RGB').resize((224, 224))
            vector: list = img2vec.get_vec(img)
            id: str = os.path.splitext(image)[0]
            json.dump({'image_id': id, 'image_vector': vector.tolist()}, outfile)
            outfile.write('\n')

Redis Data Loading

Redis supports VSS for both the JSON and Hash Set data types.  I parameterized a function to allow the creation of Redis VSS indices for either data type.  One important difference when working with JSON versus Hash Sets in Redis VSS:  vectors can be stored as-is (an array of floats) in JSON documents, but must be reduced to a binary BLOB for Hash Sets.
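
That reduction to a BLOB is just the vector's elements serialized as packed little-endian 32-bit floats.  A minimal sketch of the round trip using only the standard library (NumPy's tobytes() on a float32 array produces the same byte layout):

```python
import struct

def to_blob(vector):
    """Pack a list of floats into a FLOAT32 BLOB (little-endian), as Redis expects."""
    return struct.pack(f'<{len(vector)}f', *vector)

def from_blob(blob):
    """Unpack a FLOAT32 BLOB back into a list of Python floats."""
    return list(struct.unpack(f'<{len(blob) // 4}f', blob))

vec = [0.25, -1.5, 3.0]          # values exactly representable in float32
blob = to_blob(vec)
print(len(blob))                 # 3 floats * 4 bytes = 12
print(from_blob(blob))           # [0.25, -1.5, 3.0]
```

The same packing works for the query vector at search time, which is why the search path below is identical for JSON and Hash indices.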


def _get_images(self) -> None:
    with open(VECTOR_FILE, 'r') as infile:
        for line in infile:
            obj: object = json.loads(line)
            id: str = str(obj['image_id'])
            match self.object_type:
                case OBJECT_TYPE.HASH:
                    self.image_dict[id] = np.array(obj['image_vector'], dtype=np.float32).tobytes()
                case OBJECT_TYPE.JSON:
                    self.image_dict[id] = obj['image_vector']

def _load_db(self) -> None:
    self.connection.flushdb()
    self._get_images()

    match self.object_type:
        case OBJECT_TYPE.HASH:
            schema = [VectorField('image_vector',
                          self.index_type.value,
                          {
                              "TYPE": 'FLOAT32',
                              "DIM": 512,
                              "DISTANCE_METRIC": self.metric_type.value
                          }
                      ),
                      TagField('image_id')
            ]
            idx_def = IndexDefinition(index_type=IndexType.HASH, prefix=['key:'])
            self.connection.ft('idx').create_index(schema, definition=idx_def)

            pipe: Connection = self.connection.pipeline()
            for id, vec in self.image_dict.items():
                pipe.hset(f'key:{id}', mapping={'image_id': id, 'image_vector': vec})
            pipe.execute()
        case OBJECT_TYPE.JSON:
            schema = [VectorField('$.image_vector',
                          self.index_type.value,
                          {
                              "TYPE": 'FLOAT32',
                              "DIM": 512,
                              "DISTANCE_METRIC": self.metric_type.value
                          },
                          as_name='image_vector'
                      ),
                      TagField('$.image_id', as_name='image_id')
            ]
            idx_def: IndexDefinition = IndexDefinition(index_type=IndexType.JSON, prefix=['key:'])
            self.connection.ft('idx').create_index(schema, definition=idx_def)

            pipe: Connection = self.connection.pipeline()
            for id, vec in self.image_dict.items():
                pipe.json().set(f'key:{id}', '$', {'image_id': id, 'image_vector': vec})
            pipe.execute()

Search

With vectors loaded and indices created in Redis, a vector search looks the same for either JSON or Hash Sets.  The search vector must be reduced to a BLOB.  Searches can be strictly for vector similarity (KNN search) or a combination of VSS and a traditional Redis Search query (hybrid search).  The function below is parameterized to support both.

def search(self, query_vector: list, search_type: SEARCH_TYPE, hyb_str=None) -> list:
    match search_type:
        case SEARCH_TYPE.VECTOR:
            q_str = f'*=>[KNN {TOPK} @image_vector $vec_param AS vector_score]'
        case SEARCH_TYPE.HYBRID:
            q_str = f'(@image_id:{{{hyb_str}}})=>[KNN {TOPK} @image_vector $vec_param AS vector_score]'
    q = Query(q_str)\
        .sort_by('vector_score')\
        .paging(0, TOPK)\
        .return_fields('vector_score', 'image_id')\
        .dialect(2)
    params_dict = {"vec_param": query_vector}

    results = self.connection.ft('idx').search(q, query_params=params_dict)
    return results
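
The query vector passed as $vec_param must be a FLOAT32 BLOB even when the underlying documents are JSON.  A sketch of preparing the parameter and the two query-string shapes outside the class (TOPK, the vector values, and the tag value are illustrative, not from the repo):

```python
import struct

TOPK = 5

def knn_query_string(topk, hybrid_filter=None):
    """Build the KNN clause, optionally prefixed with a tag filter for hybrid search."""
    prefix = f'(@image_id:{{{hybrid_filter}}})' if hybrid_filter else '*'
    return f'{prefix}=>[KNN {topk} @image_vector $vec_param AS vector_score]'

query_vector = [0.1] * 512                                        # stand-in for a real embedding
vec_param = struct.pack(f'<{len(query_vector)}f', *query_vector)  # FLOAT32 BLOB, 512 * 4 bytes

print(knn_query_string(TOPK))
# *=>[KNN 5 @image_vector $vec_param AS vector_score]
print(knn_query_string(TOPK, hybrid_filter='1163'))
# (@image_id:{1163})=>[KNN 5 @image_vector $vec_param AS vector_score]
```

Note the hybrid form simply swaps the `*` (match everything) prefix for a standard Redis Search filter, so any filter expression can gate the KNN step.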

Source

https://github.com/Redislabs-Solution-Architects/vss-ops

Copyright ©1993-2024 Joey E Whelan, All rights reserved.

Monday, December 26, 2022

Redis Search with FHIR Data

Summary

This post will demonstrate search functionality in Redis Enterprise (RE) with FHIR data.  I'll generate FHIR patient bundles with the Synthea application.  Then I'll build a three-node RE cluster and a sharded Redis DB via Docker scripting and the RE REST API.  Finally, I'll show multiple search scenarios on that healthcare data.

Overall Architecture







Data Generation

The shell script below pulls the Synthea jar file, if necessary, and then creates FHIR patient record bundles for every US state.  One to ten bundles are randomly created for each state.
if [ ! -f synthea-with-dependencies.jar ]
then
    wget -q https://github.com/synthetichealth/synthea/releases/download/master-branch-latest/synthea-with-dependencies.jar
fi

STATES=("Alabama" "Alaska" "Arizona" "Arkansas" "California" "Colorado" "Connecticut"
        "Delaware" "District of Columbia" "Florida" "Georgia" "Hawaii" "Idaho" "Illinois"
        "Indiana" "Iowa" "Kansas" "Kentucky" "Louisiana" "Maine" "Montana" "Nebraska"
        "Nevada" "New Hampshire" "New Jersey" "New Mexico" "New York" "North Carolina"
        "North Dakota" "Ohio" "Oklahoma" "Oregon" "Maryland" "Massachusetts" "Michigan"
        "Minnesota" "Mississippi" "Missouri" "Pennsylvania" "Rhode Island" "South Carolina"
        "South Dakota" "Tennessee" "Texas" "Utah" "Vermont" "Virginia" "Washington"
        "West Virginia" "Wisconsin" "Wyoming")

MAX_POP=10

for state in "${STATES[@]}"; do
    pop=$(($RANDOM % $MAX_POP + 1))
    java -jar synthea-with-dependencies.jar -c ./syntheaconfig.txt -p $pop "$state"
done

RE Build

This shell script uses a docker-compose file to create a 3-node Redis Enterprise cluster.  It pulls down the latest GA copies of the Search and JSON modules, executes the compose script, assembles a cluster, loads the Search and JSON modules via the REST API, and finally creates a 2-shard, replicated database on the cluster via the REST API.
SEARCH_LATEST=redisearch.Linux-ubuntu18.04-x86_64.2.6.3.zip
JSON_LATEST=rejson.Linux-ubuntu18.04-x86_64.2.4.2.zip

if [ ! -f $SEARCH_LATEST ]
then
    wget -q https://redismodules.s3.amazonaws.com/redisearch/$SEARCH_LATEST
fi

if [ ! -f $JSON_LATEST ]
then
    wget -q https://redismodules.s3.amazonaws.com/rejson/$JSON_LATEST
fi

echo "Launch Redis Enterprise docker containers"
docker compose up -d
echo "*** Wait for Redis Enterprise to come up ***"
curl -s -o /dev/null --retry 5 --retry-all-errors --retry-delay 3 -f -k -u "redis@redis.com:redis" https://localhost:19443/v1/bootstrap
echo "*** Build Cluster ***"
docker exec -it re1 /opt/redislabs/bin/rladmin cluster create name cluster.local username redis@redis.com password redis
docker exec -it re2 /opt/redislabs/bin/rladmin cluster join nodes 192.168.20.2 username redis@redis.com password redis
docker exec -it re3 /opt/redislabs/bin/rladmin cluster join nodes 192.168.20.2 username redis@redis.com password redis
echo "*** Load Modules ***"
curl -s -o /dev/null -k -u "redis@redis.com:redis" https://localhost:19443/v1/modules -F module=@$SEARCH_LATEST
curl -s -o /dev/null -k -u "redis@redis.com:redis" https://localhost:19443/v1/modules -F module=@$JSON_LATEST
echo "*** Build FHIR DB ***"
curl -s -o /dev/null -k -u "redis@redis.com:redis" https://localhost:19443/v1/bdbs -H "Content-Type:application/json" -d @fhirdb.json

RE Architecture

The diagram below depicts the resulting RE architecture that is created.  Two shards (labeled M1 and M2) and their replicas (R1 and R2) are distributed across the cluster.
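
The fhirdb.json payload referenced in the build script isn't shown above; a minimal sketch of what a 2-shard, replicated database request to the /v1/bdbs endpoint might look like (these field values are assumptions for illustration, not the exact file):

```json
{
    "name": "fhirdb",
    "type": "redis",
    "memory_size": 1073741824,
    "port": 12000,
    "shards_count": 2,
    "replication": true,
    "sharding": true,
    "shard_key_regex": [
        { "regex": ".*\\{(?<tag>.*)\\}.*" },
        { "regex": "(?<tag>.*)" }
    ]
}
```

A real deployment would also need the Search and JSON modules enabled on the database (e.g., via a module_list entry), which is why the script uploads those modules to the cluster first.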




Below are screenshots of the admin interfaces for the RE cluster and database that were created.

Search Examples

Below are snippets of some of the search/aggregation examples, implemented in both JavaScript and Python.

Medical Facility Geographic Search


Below are the Redis index and search commands to find the closest medical facility (that is in the database) to a geographic coordinate.  In this case, the coordinates are for Woodland Park, CO.

Index - JavaScript

await this.client.ft.create('location_idx', {
    '$.status': {
        type: SchemaFieldTypes.TAG,
        AS: 'status'
    },
    '$.name': {
        type: SchemaFieldTypes.TEXT,
        AS: 'name'
    },
    '$.address.city': {
        type: SchemaFieldTypes.TAG,
        AS: 'city'
    },
    '$.address.state': {
        type: SchemaFieldTypes.TAG,
        AS: 'state'
    },
    '$.position.longitude': {
        type: SchemaFieldTypes.NUMERIC,
        AS: 'longitude'
    },
    '$.position.latitude': {
        type: SchemaFieldTypes.NUMERIC,
        AS: 'latitude'
    }
}, { ON: 'JSON', PREFIX: 'Location:' });

Index - Python

idx_def = IndexDefinition(index_type=IndexType.JSON, prefix=['Location:'])
schema = [TagField('$.status', as_name='status'),
          TextField('$.name', as_name='name'),
          TagField('$.address.city', as_name='city'),
          TagField('$.address.state', as_name='state'),
          NumericField('$.position.longitude', as_name='longitude'),
          NumericField('$.position.latitude', as_name='latitude')
]
connection.ft('location_idx').create_index(schema, definition=idx_def)

Search - JavaScript

result = await this.client.ft.aggregate('location_idx', '@status:{active}', {
    LOAD: ['@name', '@city', '@state', '@longitude', '@latitude'],
    STEPS: [
        {
            type: AggregateSteps.APPLY,
            expression: 'geodistance(@longitude, @latitude, -105.0569, 38.9939)',
            AS: 'meters'
        },
        {
            type: AggregateSteps.APPLY,
            expression: 'ceil(@meters*0.000621371)',
            AS: 'miles'
        },
        {
            type: AggregateSteps.SORTBY,
            BY: {
                BY: '@miles',
                DIRECTION: 'ASC'
            }
        },
        {
            type: AggregateSteps.LIMIT,
            from: 0,
            size: 1
        }
    ]
});

Search - Python

request = AggregateRequest('@status:{active}')\
    .load('@name', '@city', '@state', '@longitude', '@latitude')\
    .apply(meters='geodistance(@longitude, @latitude, -105.0569, 38.9939)')\
    .apply(miles='ceil(@meters*0.000621371)')\
    .sort_by(Asc('@miles'))\
    .limit(0, 1)
result = connection.ft('location_idx').aggregate(request)

Results

[[b'name', b'ARETI COMPREHENSIVE PRIMARY CARE', b'city', b'COLORADO SPRINGS', b'state', b'CO', 
b'longitude', b'-104.768591624', b'latitude', b'38.9006726282', b'meters', b'27009.43', b'miles', b'17']]
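
As a quick sanity check on the two APPLY steps, the returned 27009.43 meters does round up to the reported 17 miles using the same 0.000621371 meters-to-miles factor and ceiling:

```python
import math

METERS_TO_MILES = 0.000621371   # same factor used in the APPLY expression

meters = 27009.43               # geodistance() result from the aggregation
miles = math.ceil(meters * METERS_TO_MILES)
print(miles)  # 17
```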

Medication Prescriptions


Below are the index and search commands to compile a list of the Top 3 physicians prescribing opioids by script count.

Index - JavaScript

await this.client.ft.create('medicationRequest_idx', {
    '$.status': {
        type: SchemaFieldTypes.TAG,
        AS: 'status'
    },
    '$.medicationCodeableConcept.text': {
        type: SchemaFieldTypes.TEXT,
        AS: 'drug'
    },
    '$.requester.display': {
        type: SchemaFieldTypes.TEXT,
        AS: 'prescriber',
        SORTABLE: true
    },
    '$.reasonReference[*].display': {
        type: SchemaFieldTypes.TEXT,
        AS: 'reason'
    }
}, { ON: 'JSON', PREFIX: 'MedicationRequest:' });

Index - Python

idx_def = IndexDefinition(index_type=IndexType.JSON, prefix=['MedicationRequest:'])
schema = [TagField('$.status', as_name='status'),
          TextField('$.medicationCodeableConcept.text', as_name='drug'),
          TextField('$.requester.display', as_name='prescriber', sortable=True),
          TextField('$.reasonReference[*].display', as_name='reason')
]
connection.ft('medicationRequest_idx').create_index(schema, definition=idx_def)

Search - JavaScript

const opioids = 'Hydrocodone|Oxycodone|Oxymorphone|Morphine|Codeine|Fentanyl|Hydromorphone|Tapentadol|Methadone';
result = await this.client.ft.aggregate('medicationRequest_idx', `@drug:${opioids}`, {
    STEPS: [
        {
            type: AggregateSteps.GROUPBY,
            properties: ['@prescriber'],
            REDUCE: [
                {
                    type: AggregateGroupByReducers.COUNT,
                    property: '@prescriber',
                    AS: 'opioids_prescribed'
                }
            ]
        },
        {
            type: AggregateSteps.SORTBY,
            BY: {
                BY: '@opioids_prescribed',
                DIRECTION: 'DESC'
            }
        },
        {
            type: AggregateSteps.LIMIT,
            from: 0,
            size: 3
        }
    ]
});

Search - Python

opioids = 'Hydrocodone|Oxycodone|Oxymorphone|Morphine|Codeine|Fentanyl|Hydromorphone|Tapentadol|Methadone'
request = AggregateRequest(f'@drug:{opioids}')\
    .group_by('@prescriber', reducers.count().alias('opioids_prescribed'))\
    .sort_by(Desc('@opioids_prescribed'))\
    .limit(0, 3)
result = connection.ft('medicationRequest_idx').aggregate(request)

Results

[[b'prescriber', b'Dr. Aja848 McKenzie376', b'opioids_prescribed', b'53'], 
[b'prescriber', b'Dr. Jaquelyn689 Bernier607', b'opioids_prescribed', b'52'], 
[b'prescriber', b'Dr. Aurora248 Kessler503', b'opioids_prescribed', b'49']]

Insurer Claim Values


Below are the index and search commands to find the Top 3 insurers by total claim dollar value.

Index - JavaScript


await this.client.ft.create('claims_idx', {
    '$.status': {
        type: SchemaFieldTypes.TAG,
        AS: 'status'
    },
    '$.insurance[*].coverage.display': {
        type: SchemaFieldTypes.TEXT,
        AS: 'insurer',
        SORTABLE: true
    },
    '$.total.value': {
        type: SchemaFieldTypes.NUMERIC,
        AS: 'value'
    }
}, { ON: 'JSON', PREFIX: 'Claim:' });

Index - Python


idx_def = IndexDefinition(index_type=IndexType.JSON, prefix=['Claim:'])
schema = [TagField('$.status', as_name='status'),
          TextField('$.insurance[*].coverage.display', as_name='insurer', sortable=True),
          NumericField('$.total.value', as_name='value')
]
connection.ft('claims_idx').create_index(schema, definition=idx_def)

Search - JavaScript

result = await this.client.ft.aggregate('claims_idx', '@status:{active}', {
    STEPS: [
        {
            type: AggregateSteps.GROUPBY,
            properties: ['@insurer'],
            REDUCE: [{
                type: AggregateGroupByReducers.SUM,
                property: '@value',
                AS: 'total_value'
            }]
        },
        {
            type: AggregateSteps.FILTER,
            expression: '@total_value > 0'
        },
        {
            type: AggregateSteps.SORTBY,
            BY: {
                BY: '@total_value',
                DIRECTION: 'DESC'
            }
        },
        {
            type: AggregateSteps.LIMIT,
            from: 0,
            size: 3
        }
    ]
});

Search - Python

request = AggregateRequest('@status:{active}')\
    .group_by('@insurer', reducers.sum('@value').alias('total_value'))\
    .filter('@total_value > 0')\
    .sort_by(Desc('@total_value'))\
    .limit(0, 3)
result = connection.ft('claims_idx').aggregate(request)

Results

[[b'insurer', b'Medicare', b'total_value', b'29841923.54'], [b'insurer', b'NO_INSURANCE', b'total_value', b'9749265.48'], 
[b'insurer', b'UnitedHealthcare', b'total_value', b'8859141.59']]

Source



Saturday, October 15, 2022

Web Crawler with Redis Indexing

Summary

I'll be demonstrating a simple web crawler implementation in Node.js in this post.  The crawled results are stored in Redis and indexed with RediSearch.  Apache Tika is used for document parsing.

Architecture

High Level


Detailed


Crawler POST End Point


/**
 * Crawl endpoint. Starts a Worker thread (spider.js) that crawls the given FQDN, extracts text via Tika,
 * and then stores the resulting text in Redis as JSON documents.
 * This returns immediately and provides a taskID for the Worker thread.
 */
app.post('/crawl', (req, res) => {
    console.log(`app - POST /crawl ${req.body.fqdn}`);
    const taskID = uuidv4();
    try {
        new Worker('./app/spider.js', { workerData: { 'fqdn': req.body.fqdn, 'taskID': taskID } });
        res.status(201).json({ 'taskID': taskID });
    }
    catch (err) {
        console.error(`app - POST /crawl ${req.body.fqdn} - ${err.message}`);
        res.status(400).json({ 'error': err.message });
    }
});

Text Extraction with Tika

async extract(doc, data, hash) {
    const stream = Readable.from(data);  // get a stream from the arrayBuffer obj
    const response = await axios({       // send that stream to Tika for automatic mime-type detection and text extraction
        method: 'PUT',
        url: `${tikaUrl}/tika`,
        data: stream,
        responseType: 'text',
        headers: {
            'Content-Type': 'application/octet-stream',
            'Accept': 'text/plain'
        }
    });
    const json = { "doc": doc, "text": response.data, "hash": hash };
    await this.client.json.set(`${PREFIX}:${doc}`, '.', json);
}

Index Creation

async function buildIndex() {
    console.log(`app - buildIndex`);
    let rc = await clientFactory();
    try {
        await rc.ft.create('docIdx', {
            '$.doc': {
                type: redis.SchemaFieldTypes.TEXT,
                AS: 'doc'
            },
            '$.text': {
                type: redis.SchemaFieldTypes.TEXT,
                AS: 'text'
            }
        }, {
            ON: 'JSON',
            PREFIX: 'DOC'
        });
    }
    catch (err) {
        console.error(`app - buildIndex - ${err.message}`);
    }
}

Source



Redis Lookaside Cache with MongoDB

Summary

In this post, I'll demonstrate how to implement caching with Redis in front of a Mongo database.  All code is Python using the redis-py client lib.  The app is implemented as a REST API server using the FastAPI framework.
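
The lookaside (cache-aside) pattern itself is simple: check the cache first, fall back to the database on a miss, and populate the cache on the way out.  A minimal sketch of the flow with plain dicts standing in for Redis and Mongo:

```python
cache = {}                                       # stands in for Redis
db = {("ATL", 2003, 6): {"canceled": 216}}       # stands in for MongoDB

def get_stats(airport, year, month):
    key = (airport, year, month)
    if key in cache:            # cache hit: serve from the cache
        return cache[key]
    value = db.get(key)         # cache miss: query the database
    if value is not None:
        cache[key] = value      # populate the cache (a TTL would be set here)
    return value

print(get_stats("ATL", 2003, 6))   # miss: loads from db, then caches
print(("ATL", 2003, 6) in cache)   # True
```

The real implementation below adds the pieces a toy version omits: an index for the cache lookup, a distributed lock around the miss path, and a TTL on cached entries.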

Architecture

High Level


Detailed


Data Set

A snippet of the data set is below:

[
    {
        "airport": {
            "code": "ATL",
            "name": "Atlanta, GA: Hartsfield-Jackson Atlanta International"
        },
        "time": {
            "label": "2003/06",
            "month": 6,
            "month_name": "June",
            "year": 2003
        },
        "statistics": {
            "num_delays": {
                "carrier": 1009,
                "late_aircraft": 1275,
                "national_aviation_system": 3217,
                "security": 17,
                "weather": 328
            },
            "carriers": {
                "names": [
                    "American Airlines Inc.",
                    "JetBlue Airways",
                    "Continental Air Lines Inc.",
                    "Delta Air Lines Inc.",
                    "Atlantic Southeast Airlines",
                    "AirTran Airways Corporation",
                    "America West Airlines Inc.",
                    "Northwest Airlines Inc.",
                    "ExpressJet Airlines Inc.",
                    "United Air Lines Inc.",
                    "US Airways Inc."
                ],
                "total": 11
            },
            "flights": {
                "canceled": 216,
                "delayed": 5843,
                "diverted": 27,
                "on_time": 23974,
                "total": 30060
            },
            "minutes_delayed": {
                "carrier": 61606,
                "late_aircraft": 68335,
                "national_aviation_system": 118831,
                "security": 518,
                "total": 268764,
                "weather": 19474
            }
        }
    },
Data Loading


async def startup_event():
    # mongo startup: reset the db to an empty state and then load data from file
    col = mdb.airlines.delays
    col.drop()
    with open('data.json') as json_file:
        data = load(json_file)
    await col.insert_many(data)
    await col.create_index([('airport.code', 1), ('time.year', 1), ('time.month', 1)])

    # redis startup: empty cache and rebuild index
    await rdb.flushdb()
    idx_def = IndexDefinition(index_type=IndexType.JSON, prefix=['delayStat:'])
    schema = (TextField('$.airport.code', as_name='airport', sortable=True),
              NumericField('$.time.year', as_name='year', sortable=True),
              NumericField('$.time.month', as_name='month', sortable=True))
    await rdb.ft('idx').create_index(schema, definition=idx_def)

Data Fetch Logic


async def get_cancellations(airport: str, year: int, month: int):
    lock = None  # initialized so the finally block is safe on the cache-hit path
    try:
        result, duration = await time_func(rdb.ft('idx').search,
            Query(f"(@airport:{airport}) (@year:[{year} {year}]) (@month:[{month} {month}])"))

        if len(result.docs) > 0:  # cache hit
            jsonval = loads(result.docs[0].json)
            metrics.incr_hits(duration)
            json_res = {"result": jsonval['statistics']['flights']['canceled']}
        else:  # cache miss
            col = mdb.airlines.delays
            lock = await lock_mgr.lock(f"lock:{airport}:{year}:{month}")  # fine-grained, distributed lock
            result, duration = await time_func(col.find_one,
                {"airport.code": airport,
                 "time.year": {"$eq": year},
                 "time.month": {"$eq": month}
                })
            metrics.incr_misses(duration)
            if result:
                id = result.pop('_id')  # this field can't be serialized and needs to be removed
                await rdb.json().set(f"delayStat:{id}", '$', result)  # add val to cache
                await rdb.expire(f"delayStat:{id}", 3600)             # set TTL to 1 hour
                json_res = {"result": result['statistics']['flights']['canceled']}
            else:  # not found in cache or db
                raise HTTPException(status_code=404, detail=f"Data not found for {airport} {year} {month}")
    except Exception as err:
        if type(err) == HTTPException:
            raise err
        else:
            raise HTTPException(status_code=400, detail=str(err))
    else:
        return json_res
    finally:
        if lock and lock.valid:
            await lock_mgr.unlock(lock)

Source



RediSearch - ioredis client lib

Summary

I'll be demonstrating various Redis Search operations with the ioredis (Node.js) client lib in this post.

Data Set Creation

A random set of objects is created and then saved in Redis as hash sets.
const pipeline = client.pipeline();
for (let i = 0; i < NUM; i++) {
    const colors = (COLORS.sort(() => .5 - Math.random())).slice(0, Math.floor(Math.random() * COLORS.length));
    const fields = {
        'textField': `text${Math.floor(Math.random() * NUM)}`,
        'numericField': Math.floor(Math.random() * NUM),
        'tagField': colors
    };
    pipeline.hmset(`item:${i}`, fields);  // queued on the pipeline; executed below
}
await pipeline.exec();

Index Creation

await client.call('FT.CREATE', 'idx', 'ON', 'HASH', 'PREFIX', '1', 'item:', 'SCHEMA',
    'textField', 'TEXT', 'SORTABLE',
    'numericField', 'NUMERIC', 'SORTABLE',
    'tagField', 'TAG'
);

Search Operations

// Search for an exact match in a Text field
let result = await client.call('FT.SEARCH', 'idx', '@textField:text1');

// Search for a range in a Numeric field
result = await client.call('FT.SEARCH', 'idx', '@numericField:[1,3]');

// Search for a match in a Tag field
result = await client.call('FT.SEARCH', 'idx', '@tagField:{blue}');

Aggregation Operations

// Aggregate on a text field across all hashes in the index
let result = await client.call('FT.AGGREGATE', 'idx', '*', 'GROUPBY', '1', '@textField',
    'REDUCE', 'COUNT', '0', 'AS', 'CNT');

// Search on a numeric range and then apply the SQRT function to a numeric field in the matches
const upper = Math.floor(Math.random() * NUM) + 1;
result = await client.call('FT.AGGREGATE', 'idx', `@numericField:[0,${upper}]`,
    'APPLY', 'SQRT(@numericField)', 'AS', 'SQRT');

// Search for the logical OR of two tag values and use a CURSOR.  Limit return
// to 2 values to illustrate looping on a cursor.
result = await client.call('FT.AGGREGATE', 'idx', '@tagField:{ yellow | red }',
    'LOAD', '3', '@textField', '@numericField', '@tagField',
    'WITHCURSOR', 'COUNT', '2'
);
console.log('FT.AGGREGATE idx @tagField:{ yellow | red } LOAD 3 @textField @numericField @tagField WITHCURSOR COUNT 2');
let items = result[0];
let cursor = result[1];
while (true) {
    for (let item of items) {
        if (Array.isArray(item)) {
            console.log(JSON.stringify(item));
        }
    }
    if (cursor) {
        result = await client.call('FT.CURSOR', 'READ', 'idx', cursor, 'COUNT', '2');
        items = result[0];
        cursor = result[1];
    }
    else {
        break;
    }
}

Index Alteration

await client.call('FT.ALTER', 'idx', 'SCHEMA', 'ADD', 'newField', 'TEXT', 'SORTABLE');
let result = await client.call('FT.INFO', 'idx');

Source



Redis + Mongo Document (JSON/Search) Commands Comparison

Summary

This post will demonstrate equivalent Create, Read, Update, and Delete (CRUD) commands in MongoDB and Redis.  I provide examples from Mongo's shell (mongosh), the Redis CLI, the node-redis client lib, and the redis-py client lib.

Sample Data Set

Below is a snippet of the JSON document that is used in all the examples.

[
    {
        "item": "journal",
        "qty": 25,
        "tags": ["blank", "red"],
        "dim_cm": [ 14, 21 ],
        "size": {
            "h": 14,
            "w": 21,
            "uom": "cm"
        },
        "status": "E"
    },
    {
        "item": "notebook",
        "qty": 50,
        "tags": ["red", "blank"],
        "dim_cm": [ 14, 21 ],
        "size": {
            "h": 14,
            "w": 21,
            "uom": "cm"
        },
        "status": "E"
    },

Create Operation

Mongo Shell

db.inventory.insertMany([
    { _id: 1, item: "journal", qty: 25, tags: ["blank", "red"], dim_cm: [ 14, 21 ], size: { h: 14, w: 21, uom: "cm" }, status: "E" },
    { _id: 2, item: "notebook", qty: 50, tags: ["red", "blank"], dim_cm: [ 14, 21 ], size: { h: 14, w: 21, uom: "cm" }, status: "E" },
    { _id: 3, item: "paper", qty: 100, tags: ["red", "blank", "plain"], dim_cm: [ 14, 21 ], size: { h: 19, w: 22.85, uom: "cm" }, status: "B" },
    { _id: 4, item: "planner", qty: 75, tags: ["blank", "red"], dim_cm: [ 22.85, 30 ], status: "C" },
    { _id: 5, item: "postcard", qty: 45, tags: ["blue"], dim_cm: [ 10, 15.25 ], status: "D" }
])

db.inventory.createIndexes([
    { qty: 1 },
    { item: 1 },
    { tags: 1 },
    { size: 1 },
    { status: 1 }
])

Redis CLI

JSON.SET inventory:1 . '{"item":"journal","qty":25,"tags":["blank","red"],"dim_cm":[14,21],"size":{"h":14,"w":21,"uom":"cm"},"status":"E"}'
JSON.SET inventory:2 . '{"item":"notebook","qty":50,"tags":["red","blank"],"dim_cm":[14,21],"size":{"h":14,"w":21,"uom":"cm"},"status":"E"}'
JSON.SET inventory:3 . '{"item":"paper","qty":100,"tags":["red","blank","plain"],"dim_cm":[14,21],"size":{"h":19,"w":22.85,"uom":"cm"},"status":"B"}'
JSON.SET inventory:4 . '{"item":"planner","qty":75,"tags":["blank","red"],"dim_cm":[22.85,30],"status":"C"}'
JSON.SET inventory:5 . '{"item":"postcard","qty":45,"tags":["blue"],"dim_cm":[10,15.25],"status":"D"}'

FT.CREATE inventoryIdx ON JSON PREFIX 1 inventory: SCHEMA $.item AS item TEXT $.qty AS qty NUMERIC $.tags.* AS tags TAG $.dim_cm[0] AS dim_cm_0 NUMERIC $.dim_cm[1] AS dim_cm_1 NUMERIC $.status AS status TEXT $.size.h AS sizeh NUMERIC $.size.w AS sizew NUMERIC $.size.uom AS sizeuom TEXT

node-redis (Javascript)

for (let item of dataset) {
    await client.json.set(`inventory:${itemNum++}`, '.', item);
}
await client.ft.create('inventoryIdx', {
    '$.item': {
        type: SchemaFieldTypes.TEXT,
        AS: 'item'
    },
    '$.qty': {
        type: SchemaFieldTypes.NUMERIC,
        AS: 'qty'
    },
    '$.tags.*': {
        type: SchemaFieldTypes.TAG,
        AS: 'tags'
    },
    '$.dim_cm[0]': {
        type: SchemaFieldTypes.NUMERIC,
        AS: 'dim_cm_0'
    },
    '$.dim_cm[1]': {
        type: SchemaFieldTypes.NUMERIC,
        AS: 'dim_cm_1',
    },
    '$.status': {
        type: SchemaFieldTypes.TEXT,
        AS: 'status'
    },
    '$.size.h': {
        type: SchemaFieldTypes.NUMERIC,
        AS: 'sizeh'
    },
    '$.size.w': {
        type: SchemaFieldTypes.NUMERIC,
        AS: 'sizew'
    },
    '$.size.uom': {
        type: SchemaFieldTypes.TEXT,
        AS: 'sizeuom'
    }
}, {
    ON: 'JSON',
    PREFIX: 'inventory:'
});

redis-py (Python)

itemNum = 1

for dataItem in dataset:
    client.json().set(f'inventory:{itemNum}', '.', dataItem)
    itemNum += 1

index_def = IndexDefinition(
    index_type=IndexType.JSON,
    prefix=['inventory:']
)
schema = (TextField('$.item', as_name='item'),
          NumericField('$.qty', as_name='qty'),
          TagField('$.tags.*', as_name='tags'),
          NumericField('$.dim_cm[0]', as_name='dim_cm_0'),
          NumericField('$.dim_cm[1]', as_name='dim_cm_1'),
          TextField('$.status', as_name='status'),
          NumericField('$.size.h', as_name='sizeh'),
          NumericField('$.size.w', as_name='sizew'),
          TextField('$.size.uom', as_name='sizeuom')
)
client.ft('inventoryIdx').create_index(schema, definition=index_def)

Read Operation - Compound Query

Mongo Shell

db.inventory.find( {
    status: "E",
    $or: [ { qty: { $lt: 30 } }, { item: /^pa/ } ]
} )

Redis CLI

FT.SEARCH inventoryIdx '(@status:E) ((@qty:[-inf (30])|(@item:pa*))'

node-redis (Javascript)

await client.ft.search('inventoryIdx', '(@status:E) ((@qty:[-inf (30])|(@item:pa*))');

redis-py (Python)

client.ft('inventoryIdx').search(Query('(@status:E) ((@qty:[-inf (30])|(@item:pa*))')).docs

Update Operation

Mongo Shell

db.inventory.update(
    { _id: 2 },
    {
        $inc: { qty: 5 },
        $set: {
            item: "spiral notebook",
        }
    }
)

Redis CLI

MULTI
JSON.NUMINCRBY inventory:2 '.qty' 5
JSON.SET inventory:2 .item '"spiral notebook"'
EXEC

node-redis (Javascript)

await client
    .multi()
    .json.numIncrBy('inventory:2', '.qty', 5)
    .json.set('inventory:2', '.item', 'spiral notebook')
    .exec();

redis-py (Python)

pipe = client.pipeline()
pipe.json().numincrby('inventory:2', '.qty', 5)
pipe.json().set('inventory:2', '.item', 'spiral notebook')
pipe.execute()

Delete Operation

Mongo Shell

db.inventory.deleteOne( { "_id" : 2 } )

Redis CLI

JSON.DEL inventory:2

node-redis (Javascript)

await client.json.del('inventory:2');

redis-py (Python)

client.json().delete('inventory:2')

Source (with more examples)

https://github.com/Redislabs-Solution-Architects/docdevdemo


Monday, May 30, 2022

RedisJSON - CRUD Ops

Summary

I cover a shopping cart application use case in this post.  I implement a full set of CRUD operations for users, products, and carts using the RedisJSON module.

Architecture

The server side is implemented as an Express.js app with the node-redis module.  The test client is also Node-based, with node-fetch used as the HTTP client and a random data generator I concocted with the uniqueNamesGenerator module.


Code Snippets

Random Data Generation

static generateUser() {
    return {
        "userID": uuidv4(),
        "lastName": RandomData.#getLastName(),
        "firstName": RandomData.#getFirstName(),
        "street": RandomData.#getStreet(),
        "city": RandomData.#getCity(),
        "state": RandomData.#getState(),
        "zip": RandomData.#getZip()
    };
};

static #getFirstName() {
    return uniqueNamesGenerator({
        dictionaries: [names],
        length: 1
    });
};

static #getLastName() {
    return uniqueNamesGenerator({
        dictionaries: [adjectives],
        length: 1,
        style: 'capital'
    });
};


Create User - Client-side

  1. async function createUser(dbType, user) {
  2. const response = await fetch(`${SERVER.url}/${dbType}/user`, {
  3. method: 'POST',
  4. body: JSON.stringify(user),
  5. headers: {
  6. 'Content-Type': 'application/json',
  7. 'Authorization': AUTH
  8. }
  9. });
  10. return await response.json();
  11. };
  12.  
  13. const user = RandomData.generateUser();
  14. res = await createUser('redis', user);


Update Cart - Server-side

  1. app.patch('/:dbType/cart/:cartID', async (req, res) => {
  2. switch (req.params.dbType) {
  3. case 'redis':
  4. try {
  5. var client = await redisConnect();
  6. const updatedItem = req.body;
  7. const items = await client.json.get(`cart:${req.params.cartID}`, {path:'.items'});
  8. const newItems = [];
  9. let found = false
  10. for (let item of items) {
  11. if (updatedItem.sku == item.sku) {
  12. found = true;
  13. if (updatedItem.quantity == 0) {
  14. continue;
  15. }
  16. else {
  17. newItems.push(updatedItem)
  18. }
  19. break;
  20. }
  21. else {
  22. newItems.push(item);
  23. }
  24. }
  25. if (!found) {
  26. newItems.push(updatedItem)
  27. }
  28. const val = await client.json.set(`cart:${req.params.cartID}`, `.items`, newItems);
  29.  
  30. if (val == 'OK') {
  31. console.log(`200: Cart ${req.params.cartID} updated`);
  32. res.status(200).json({'cartID': req.params.cartID});
  33. }
  34. else {
  35. throw new Error(`Cart ${req.params.cartID} not fully updated`);
  36. }
  37. }
  38. catch (err) {
  39. console.error(`400: ${err.message}`);
  40. res.status(400).json({error: err.message});
  41. }
  42. finally {
  43. await client.quit();
  44. };
  45. break;
  46. default:
  47. const msg = 'Unknown DB Type';
  48. console.error(`400: ${msg}`);
  49. res.status(400).json({error: msg});
  50. break;
  51. };
  52. });
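The handler's item-merge rules (replace an existing SKU, drop the item entirely when the new quantity is 0, append an unseen SKU) are independent of Redis.  Below is a minimal Python sketch of just that logic; the function name and dict fields are illustrative, not part of the app:

```python
def merge_cart_items(items, updated_item):
    """Return a new item list with updated_item applied.

    Mirrors the handler above: an existing SKU is replaced, a
    quantity of 0 removes the item, and an unseen SKU is appended.
    """
    new_items = []
    found = False
    for item in items:
        if item['sku'] == updated_item['sku']:
            found = True
            if updated_item['quantity'] > 0:
                new_items.append(updated_item)
            # quantity == 0: skip the item, i.e., remove it from the cart
        else:
            new_items.append(item)
    if not found:
        new_items.append(updated_item)
    return new_items

cart = [{'sku': 'A1', 'quantity': 2}, {'sku': 'B2', 'quantity': 1}]
print(merge_cart_items(cart, {'sku': 'A1', 'quantity': 5}))
print(merge_cart_items(cart, {'sku': 'B2', 'quantity': 0}))
print(merge_cart_items(cart, {'sku': 'C3', 'quantity': 1}))
```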


Source



Saturday, May 21, 2022

Azure Container Apps

Summary

This is a continuation of a previous post on proxying a SOAP API to REST.  In this post, I'll deploy the containerized proxy to Azure Container Apps and front end it with Azure API Management (APIM).

Architecture



Code

Proxy App

I modified the Python FastAPI app slightly to serve up an OpenAPI file.  That file is used by APIM during provisioning.

  1. from fastapi import FastAPI, HTTPException
  2. from fastapi.responses import FileResponse
  3. from zeep import Client
  4. import logging
  5.  
  6. logging.getLogger('zeep').setLevel(logging.ERROR)
  7. client = Client('https://www.w3schools.com/xml/tempconvert.asmx?wsdl')
  8. app = FastAPI()
  9.  
  10. @app.get("/openapi.yml")
  11. async def openapi():
  12.     return FileResponse("openapi.yml")
  13.  
  14. @app.get("/CelsiusToFahrenheit")
  15. async def celsiusToFahrenheit(temp: int):
  16.     try:
  17.         soapResponse = client.service.CelsiusToFahrenheit(temp)
  18.         fahrenheit = int(round(float(soapResponse), 0))
  19.     except:
  20.         raise HTTPException(status_code=400, detail="SOAP request error")
  21.     else:
  22.         return {"temp": fahrenheit}
  23.  
  24.  
  25. @app.get("/FahrenheitToCelsius")
  26. async def fahrenheitToCelsius(temp: int):
  27.     try:
  28.         soapResponse = client.service.FahrenheitToCelsius(temp)
  29.         celsius = int(round(float(soapResponse), 0))
  30.     except:
  31.         raise HTTPException(status_code=400, detail="SOAP request error")
  32.     else:
  33.         return {"temp": celsius}

OpenAPI Spec


  1. swagger: '2.0'
  2. info:
  3.   title: apiproxy
  4.   description: REST to SOAP proxy
  5.   version: 1.0.0
  6. schemes:
  7.   - http
  8. produces:
  9.   - application/json
  10. paths:
  11.   /CelsiusToFahrenheit:
  12.     get:
  13.       summary: Convert celsius temp to fahrenheit
  14.       parameters:
  15.         - name: temp
  16.           in: query
  17.           required: true
  18.           type: integer
  19.       responses:
  20.         '200':
  21.           description: converted temp
  22.           schema:
  23.             type: object
  24.             properties:
  25.               temp:
  26.                 type: integer
  27.         '400':
  28.           description: General error
  29.   /FahrenheitToCelsius:
  30.     get:
  31.       summary: Convert fahrenheit temp to celsius
  32.       parameters:
  33.         - name: temp
  34.           in: query
  35.           required: true
  36.           type: integer
  37.       responses:
  38.         '200':
  39.           description: converted temp
  40.           schema:
  41.             type: object
  42.             properties:
  43.               temp:
  44.                 type: integer
  45.         '400':
  46.           description: General error

Deployment


Create + Configure Azure Container Registry





Visual Studio Code - Build Image in Azure







Create + Configure Azure Container App






Execution

Deploy and Test Container App in APIM




Sunday, May 15, 2022

RedisTimeSeries

Summary

I'll be covering an IoT data feed use case in this post.  That data feed originates from a Tempest weather station and is stored on a Redis instance as TimeSeries data.  The application is implemented as a container on Google Cloud Run.  The Redis TimeSeries data is visualized using Grafana.

Architecture



Application


ExpressJS REST API Server

Below is a very simple server-side app to start and stop the data flow.
  1. app.post('/start', async (req, res) => {
  2. try {
  3. if (!tc) {
  4. tc = new TempestClient();
  5. await tc.start();
  6. res.status(201).json({'message': 'success'});
  7. }
  8. else {
  9. throw new Error('tempest client already instantiated');
  10. }
  11. }
  12. catch (err) {
  13. res.status(400).json({error: err.message})
  14. };
  15. });
  16.  
  17. app.post('/stop', async (req, res) => {
  18. try {
  19. if (tc) {
  20. await tc.stop();
  21. tc = null;
  22. res.status(201).json({'message': 'success'});
  23. }
  24. else {
  25. throw new Error('tempest client does not exist');
  26. }
  27. }
  28. catch (err) {
  29. res.status(400).json({error: err.message})
  30. };
  31. });

Tempest Client

WeatherFlow publishes both REST and WebSocket APIs.  In this case, I used the WebSocket interface to provide a 3-second feed of wind data from the weather station.

  1. async start() {
  2. if (!this.ts && !this.ws) {
  3. this.ts = new TimeSeriesClient(redis.user, redis.password, redis.url);
  4. await this.ts.connect();
  5. this.ws = new WebSocket(`${tempest.url}?token=${tempest.password}`);
  6.  
  7. this.ws.on('open', () => {
  8. console.log('Websocket opened');
  9. this.ws.send(JSON.stringify(this.wsRequest));
  10. });
  11. this.ws.on('message', async (data) => {
  12. const obj = JSON.parse(data);
  13. if ("ob" in obj) {
  14. const time = Date.now()
  15. const speed = Number(obj.ob[1] * MS_TO_MPH).toFixed(1);
  16. const direction = obj.ob[2];
  17. console.log(`time: ${time} speed: ${speed} direction: ${direction}`);
  18. await this.ts.update(tempest.deviceId, time, speed, direction);
  19. }
  20. });
  21.  
  22. this.ws.on('close', async () => {
  23. console.log('Websocket closed')
  24. await this.ts.quit();
  25. this.ts = null;
  26. this.ws = null;
  27. });
  28.  
  29. this.ws.on('error', async (err) => {
  30. await this.ts.quit();
  31. this.ws.close();
  32. this.ts = null;
  33. this.ws = null;
  34. console.error('ws err: ' + err);
  35. });
  36. }
  37. }
  38.  
  39. async stop() {
  40. this.ws.close();
  41. }
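The 'message' handler above only needs a payload carrying an 'ob' array of [epoch, speed (m/s), direction].  Here's a standalone Python sketch of that parsing step; the sample payload, conversion constant, and function name are illustrative assumptions, and the sketch takes its timestamp from the payload rather than the wall clock the client uses:

```python
import json

MS_TO_MPH = 2.23694  # assumed conversion factor, mirroring the client's constant

def parse_rapid_wind(message: str):
    """Extract (time_ms, speed_mph, direction_deg) from a rapid_wind-style
    message, or None if the payload has no 'ob' field."""
    obj = json.loads(message)
    if 'ob' not in obj:
        return None
    epoch_s, speed_ms, direction = obj['ob']
    return (epoch_s * 1000, round(speed_ms * MS_TO_MPH, 1), direction)

# Illustrative payload modeled on the fields the client reads above
sample = json.dumps({'type': 'rapid_wind', 'ob': [1660000000, 4.5, 270]})
print(parse_rapid_wind(sample))
```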

Redis TimeSeries Client

I used the Node-Redis client to implement a function that performs TimeSeries adds (TS.ADD) for wind speed and direction.
  1. async update(deviceId, time, speed, direction) {
  2. await this.client.ts.add(`wind_direction:${deviceId}`, time, direction);
  3. await this.client.ts.add(`wind_speed:${deviceId}`, time, speed);
  4. }

Deployment

Dockerfile


  1. FROM node:18-slim
  2. WORKDIR /usr/src/app
  3. COPY package*.json ./
  4. RUN npm install
  5. COPY . .
  6. EXPOSE 8080
  7. CMD ["npm", "start"]

Redis Cloud + Insight




Google Cloud Code Integration with VS Code

The app container is deployed to Cloud Run using the Cloud Code tools.








Grafana Data Connection to Redis



Execution

CURL POST To Start Data Flow


curl -X POST https://redis-demo-y6pby4qk2a-uc.a.run.app/start -u yourUser:yourPassword

Redis Insight Real-time Feed


Cloud Run Console



Grafana Dashboard



Source



Stable Marriage Problem - Python

Summary

This short post covers a Python implementation of the Gale-Shapley algorithm for the Stable Marriage Problem (SMP).  Stable matching is a well-studied problem in multiple fields.  It has applications in nearly any two-sided market scenario.

Code Snippets

Preference Generator

Below is a Python function to generate a random set of preferences for two classes.  Those preferences are subsequently used by Gale-Shapley to determine a stable matching.
  1. def generate_prefs(class1, class2):
  2.     if len(class1) != len(class2):
  3.         raise Exception("Invalid input: unequal list sizes")
  4.  
  5.     prefs = {}
  6.     for item in class1:
  7.         random.shuffle(class2)
  8.         prefs[item] = class2.copy()
  9.     for item in class2:
  10.         random.shuffle(class1)
  11.         prefs[item] = class1.copy()
  12.  
  13.     return dict(sorted(prefs.items()))
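A quick, seeded run shows the shape of the generator's output (the function is restated here so the snippet runs standalone): every member ends up ranking the entire opposite class.

```python
import random

def generate_prefs(class1, class2):
    # Raise if the two classes can't be matched one-to-one
    if len(class1) != len(class2):
        raise Exception("Invalid input: unequal list sizes")
    prefs = {}
    for item in class1:
        random.shuffle(class2)
        prefs[item] = class2.copy()
    for item in class2:
        random.shuffle(class1)
        prefs[item] = class1.copy()
    return dict(sorted(prefs.items()))

random.seed(42)  # seeded only to make the demo repeatable
prefs = generate_prefs(['a1', 'a2', 'a3'], ['b1', 'b2', 'b3'])
# Every member ranks each member of the opposite class exactly once
print(prefs)
```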

Gale-Shapley

  1. def gale_shapley(prefs, proposers):
  2.     matches = []
  3.     while len(proposers) > 0:  # terminating condition - all proposers are matched
  4.         proposer = proposers.pop(0)  # each round - a proposer is popped from the free list
  5.         proposee = prefs[proposer].pop(0)  # each round - the proposer's top remaining preference is popped
  6.         matchLen = len(matches)
  7.         found = False
  8.         for index in range(matchLen):
  9.             match = matches[index]
  10.             if proposee in match:  # proposee is already matched
  11.                 found = True
  12.                 temp = match.copy()
  13.                 temp.remove(proposee)
  14.                 matchee = temp.pop()
  15.                 if prefs[proposee].index(proposer) < prefs[proposee].index(matchee):  # proposer is a higher preference
  16.                     matches.remove(match)  # remove old match
  17.                     matches.append([proposer, proposee])  # create new match with proposer
  18.                     proposers.append(matchee)  # add the displaced proposer back to the free list
  19.                 else:
  20.                     proposers.append(proposer)  # proposer wasn't a higher preference, so goes back on the free list
  21.                 break
  22.             else:
  23.                 continue
  24.         if not found:  # proposee was not previously matched, so is automatically matched to proposer
  25.             matches.append([proposer, proposee])
  26.         else:
  27.             continue
  28.     return matches

Output

Below is a sample run with two three-member classes: (a1, a2, a3) and (b1, b2, b3).
Preferences
{'a1': ['b2', 'b3', 'b1'],
 'a2': ['b3', 'b2', 'b1'],
 'a3': ['b1', 'b2', 'b3'],
 'b1': ['a2', 'a1', 'a3'],
 'b2': ['a1', 'a3', 'a2'],
 'b3': ['a3', 'a1', 'a2']}

Matches
[['a3', 'b1'], ['a1', 'b2'], ['a2', 'b3']]
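A matching is stable when no pair outside the matching prefer each other over their assigned partners.  The short check below verifies that property for the sample run above, with the preferences copied verbatim; `is_stable` is an illustrative helper, not part of the original implementation:

```python
prefs = {'a1': ['b2', 'b3', 'b1'],
         'a2': ['b3', 'b2', 'b1'],
         'a3': ['b1', 'b2', 'b3'],
         'b1': ['a2', 'a1', 'a3'],
         'b2': ['a1', 'a3', 'a2'],
         'b3': ['a3', 'a1', 'a2']}
matches = [['a3', 'b1'], ['a1', 'b2'], ['a2', 'b3']]

def is_stable(prefs, matches):
    """True if no pair (a, b) outside the matching prefer each other
    over their current partners (i.e., no 'blocking pair')."""
    partner = {}
    for x, y in matches:
        partner[x], partner[y] = y, x
    for a in [m[0] for m in matches]:
        for b in prefs[a]:
            a_prefers_b = prefs[a].index(b) < prefs[a].index(partner[a])
            b_prefers_a = prefs[b].index(a) < prefs[b].index(partner[b])
            if a_prefers_b and b_prefers_a:
                return False  # (a, b) would rather pair with each other
    return True

print(is_stable(prefs, matches))  # → True
```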

Source



Sunday, February 20, 2022

API Proxy - Docker

Summary

This post covers a machine-to-machine use case where the provider has only a SOAP interface to their services but the client can only support a REST interface.  One possible solution to that dilemma is to deploy middleware in the form of a proxy between the two entities.

Concept

The diagram below depicts the overall architectural concept.  The SOAP API is mediated by a REST API.



App Architecture

The REST proxy is implemented with the FastAPI framework.  SOAP interactions are managed via the Zeep package.  The HTTP server is provided by Uvicorn.  The entire package is deployed as a container via Docker.  The SOAP service used here is a simple toy example.


Environment Set Up

I'm using Visual Studio Code to develop this in Python.  A Python virtual environment can be set up with the command below:

python3 -m venv env

Code

The Python code below implements a REST proxy for two different SOAP endpoints.

  1. client = Client('https://www.w3schools.com/xml/tempconvert.asmx?wsdl')
  2. app = FastAPI()
  3.  
  4. @app.get("/CelsiusToFahrenheit")
  5. async def celsiusToFahrenheit(temp: int):
  6.     try:
  7.         soapResponse = client.service.CelsiusToFahrenheit(temp)
  8.         fahrenheit = int(round(float(soapResponse), 0))
  9.     except:
  10.         raise HTTPException(status_code=400, detail="SOAP request error")
  11.     else:
  12.         return {"temp": fahrenheit}
  13.  
  14.  
  15. @app.get("/FahrenheitToCelsius")
  16. async def fahrenheitToCelsius(temp: int):
  17.     try:
  18.         soapResponse = client.service.FahrenheitToCelsius(temp)
  19.         celsius = int(round(float(soapResponse), 0))
  20.     except:
  21.         raise HTTPException(status_code=400, detail="SOAP request error")
  22.     else:
  23.         return {"temp": celsius}
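The proxy's only post-processing of the SOAP response is float parsing and rounding.  That step can be sanity-checked without the SOAP service; the conversion formulas below are stand-ins for what the tempconvert service computes, not calls to it:

```python
def celsius_to_fahrenheit(temp: int) -> int:
    # Stand-in for the SOAP service's computation
    soap_response = str(temp * 9 / 5 + 32)
    # Identical post-processing to the proxy above
    return int(round(float(soap_response), 0))

def fahrenheit_to_celsius(temp: int) -> int:
    soap_response = str((temp - 32) * 5 / 9)
    return int(round(float(soap_response), 0))

print(celsius_to_fahrenheit(100))  # → 212
print(fahrenheit_to_celsius(32))   # → 0
```

The second call reproduces the result of the curl example shown under Execution (32°F → 0°C).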

Container Set Up

Python library requirements for the app are stored to a text file with the command below:

pip freeze > requirements.txt
The Dockerfile is below:

  1. FROM python:3.9-slim
  2. COPY proxy.py proxy.py
  3. COPY requirements.txt requirements.txt
  4. RUN pip install --no-cache-dir --upgrade -r requirements.txt
  5. CMD ["uvicorn", "proxy:app", "--host", "0.0.0.0", "--port", "80"]
Screenshot of the resulting container in Visual Studio Code's Docker tool:

Execution


$ curl 'http://localhost/FahrenheitToCelsius?temp=32'
{"temp":0}

Source

