Saturday, October 15, 2022

Redis Lookaside Cache with MongoDB

Summary

In this post, I'll demonstrate how to implement a lookaside cache with Redis in front of a MongoDB database. All code is written in Python and uses the redis-py client library. The application is implemented as a REST API server using the FastAPI framework.

Architecture

High Level


Detailed


Data Set

A snippet of the data set is shown below:

  1. [
  2. {
  3. "airport": {
  4. "code": "ATL",
  5. "name": "Atlanta, GA: Hartsfield-Jackson Atlanta International"
  6. },
  7. "time": {
  8. "label": "2003/06",
  9. "month": 6,
  10. "month_name": "June",
  11. "year": 2003
  12. },
  13. "statistics": {
  14. "num_delays": {
  15. "carrier": 1009,
  16. "late_aircraft": 1275,
  17. "national_aviation_system": 3217,
  18. "security": 17,
  19. "weather": 328
  20. },
  21. "carriers": {
  22. "names": [
  23. "American Airlines Inc.",
  24. "JetBlue Airways",
  25. "Continental Air Lines Inc.",
  26. "Delta Air Lines Inc.",
  27. "Atlantic Southeast Airlines",
  28. "AirTran Airways Corporation",
  29. "America West Airlines Inc.",
  30. "Northwest Airlines Inc.",
  31. "ExpressJet Airlines Inc.",
  32. "United Air Lines Inc.",
  33. "US Airways Inc."
  34. ],
  35. "total": 11
  36. },
  37. "flights": {
  38. "canceled": 216,
  39. "delayed": 5843,
  40. "diverted": 27,
  41. "on_time": 23974,
  42. "total": 30060
  43. },
  44. "minutes_delayed": {
  45. "carrier": 61606,
  46. "late_aircraft": 68335,
  47. "national_aviation_system": 118831,
  48. "security": 518,
  49. "total": 268764,
  50. "weather": 19474
  51. }
  52. }
  53. },

Data Loading


  1. async def startup_event():
  2. #mongo startup
  3. """Reset the db to an empty state and then load data from file
  4. """
  5. col = mdb.airlines.delays
  6. col.drop()
  7. with open('data.json') as json_file:
  8. data = load(json_file)
  9. await col.insert_many(data)
  10. await col.create_index([('airport.code', 1),('time.year', 1),('time.month', 1)])
  11. #redis startup
  12. """Empty cache and rebuild index.
  13. """
  14. await rdb.flushdb()
  15. idx_def = IndexDefinition(index_type=IndexType.JSON, prefix=['delayStat:'])
  16. schema = (TextField('$.airport.code', as_name='airport', sortable=True),
  17. NumericField('$.time.year', as_name='year', sortable=True),
  18. NumericField('$.time.month', as_name='month', sortable=True))
  19. await rdb.ft('idx').create_index(schema, definition=idx_def)

Data Fetch Logic


  1. async def get_cancellations(airport: str, year: int, month: int):
  2. try:
  3. result, duration = await time_func(rdb.ft('idx').search,
  4. Query(f"(@airport:{airport}) (@year:[{year} {year}]) (@month:[{month} {month}])"))
  5.  
  6. if len(result.docs) > 0: # cache hit
  7. jsonval = loads(result.docs[0].json)
  8. metrics.incr_hits(duration)
  9. json_res = {"result": jsonval['statistics']['flights']['canceled']}
  10. else: # cache miss
  11. col = mdb.airlines.delays
  12. lock = await lock_mgr.lock(f"lock:{airport}:{year}:{month}") #fine-grained, distributed lock
  13. result, duration = await time_func(col.find_one,
  14. { "airport.code": airport,
  15. "time.year": {"$eq": year},
  16. "time.month": {"$eq": month}
  17. })
  18. metrics.incr_misses(duration)
  19. if result:
  20. id = result.pop('_id') # this field can't be serialized and needs to be removed
  21. await rdb.json().set(f"delayStat:{id}", '$', result) #add val to cache and set TTL to 1 hour
  22. await rdb.expire(f"delayStat:{id}", 3600)
  23. json_res = {"result": result['statistics']['flights']['canceled']}
  24. else: # not found in cache or db
  25. raise HTTPException(status_code=404, detail=f"Data not found for {airport} {year} {month}")
  26. except Exception as err:
  27. if type(err) == HTTPException:
  28. raise err
  29. else:
  30. raise HTTPException(status_code=400, detail=str(err))
  31. else:
  32. return json_res
  33. finally:
  34. if lock and lock.valid:
  35. await lock_mgr.unlock(lock)

Source


Copyright ©1993-2024 Joey E Whelan, All rights reserved.