Summary
In this post, I'll demonstrate how to implement caching with Redis in front of a MongoDB database. All code is written in Python, using the redis-py client library. The application is implemented as a REST API server built with the FastAPI framework.
Architecture
High Level
Detailed
Data Set
A snippet of the data set is shown below:
- [
- {
- "airport": {
- "code": "ATL",
- "name": "Atlanta, GA: Hartsfield-Jackson Atlanta International"
- },
- "time": {
- "label": "2003/06",
- "month": 6,
- "month_name": "June",
- "year": 2003
- },
- "statistics": {
- "num_delays": {
- "carrier": 1009,
- "late_aircraft": 1275,
- "national_aviation_system": 3217,
- "security": 17,
- "weather": 328
- },
- "carriers": {
- "names": [
- "American Airlines Inc.",
- "JetBlue Airways",
- "Continental Air Lines Inc.",
- "Delta Air Lines Inc.",
- "Atlantic Southeast Airlines",
- "AirTran Airways Corporation",
- "America West Airlines Inc.",
- "Northwest Airlines Inc.",
- "ExpressJet Airlines Inc.",
- "United Air Lines Inc.",
- "US Airways Inc."
- ],
- "total": 11
- },
- "flights": {
- "canceled": 216,
- "delayed": 5843,
- "diverted": 27,
- "on_time": 23974,
- "total": 30060
- },
- "minutes_delayed": {
- "carrier": 61606,
- "late_aircraft": 68335,
- "national_aviation_system": 118831,
- "security": 518,
- "total": 268764,
- "weather": 19474
- }
- }
- },
Data Loading
async def startup_event():
    """Reset MongoDB and the Redis cache to a known state.

    MongoDB: drop the ``airlines.delays`` collection, reload it from
    ``data.json`` and create a compound index on airport code, year and month.

    Redis: flush the current database, then create the ``idx`` search index
    over JSON documents whose keys start with ``delayStat:``.
    """
    # --- MongoDB startup ---
    col = mdb.airlines.delays
    # The other collection calls below are awaited, so the driver is async.
    # drop() must be awaited too; otherwise it is not sequenced before the
    # inserts (or, with a purely coroutine-based driver, never runs at all).
    await col.drop()
    with open('data.json', encoding='utf-8') as json_file:
        data = load(json_file)
    # insert_many rejects an empty document list, so skip it for an empty file.
    if data:
        await col.insert_many(data)
    await col.create_index([('airport.code', 1), ('time.year', 1), ('time.month', 1)])

    # --- Redis startup: empty the cache and rebuild the search index ---
    await rdb.flushdb()
    idx_def = IndexDefinition(index_type=IndexType.JSON, prefix=['delayStat:'])
    schema = (
        TextField('$.airport.code', as_name='airport', sortable=True),
        NumericField('$.time.year', as_name='year', sortable=True),
        NumericField('$.time.month', as_name='month', sortable=True),
    )
    await rdb.ft('idx').create_index(schema, definition=idx_def)
Data Fetch Logic
def _escape_query_term(value: str) -> str:
    """Backslash-escape every non-alphanumeric character of a RediSearch query term."""
    return ''.join(ch if ch.isalnum() or ch == '_' else f'\\{ch}' for ch in value)


async def get_cancellations(airport: str, year: int, month: int):
    """Return the number of cancelled flights for an airport/year/month.

    The Redis JSON cache is searched first through the ``idx`` index. On a
    miss, MongoDB is queried while holding a per-key distributed lock. Any
    document found is written to Redis under ``delayStat:<_id>`` with a
    one-hour TTL.

    Args:
        airport: Airport code, e.g. ``"ATL"``.
        year: Year to look up.
        month: Month number to look up.

    Returns:
        ``{"result": <number of canceled flights>}``.

    Raises:
        HTTPException: 404 when no matching document exists in the cache or
            the database; 400 wrapping any other error.
    """
    # Bind before the try block: the cache-hit path never acquires a lock,
    # yet the finally clause always inspects it.
    lock = None
    try:
        # Escape the caller-supplied code so query syntax characters
        # (e.g. '|', '-') cannot widen the match to other airports.
        query = Query(
            f"(@airport:{_escape_query_term(airport)}) "
            f"(@year:[{year} {year}]) (@month:[{month} {month}])"
        )
        result, duration = await time_func(rdb.ft('idx').search, query)
        if result.docs:  # cache hit
            jsonval = loads(result.docs[0].json)
            metrics.incr_hits(duration)
            return {"result": jsonval['statistics']['flights']['canceled']}

        # cache miss
        col = mdb.airlines.delays
        lock = await lock_mgr.lock(f"lock:{airport}:{year}:{month}")  # fine-grained, distributed lock
        doc, duration = await time_func(col.find_one,
            {"airport.code": airport,
             "time.year": {"$eq": year},
             "time.month": {"$eq": month}
            })
        metrics.incr_misses(duration)
        if not doc:  # not found in cache or db
            raise HTTPException(status_code=404, detail=f"Data not found for {airport} {year} {month}")

        doc_id = doc.pop('_id')  # this field can't be serialized and needs to be removed
        cache_key = f"delayStat:{doc_id}"
        await rdb.json().set(cache_key, '$', doc)  # add val to cache ...
        await rdb.expire(cache_key, 3600)          # ... and set TTL to 1 hour
        return {"result": doc['statistics']['flights']['canceled']}
    except HTTPException:
        raise
    except Exception as err:
        raise HTTPException(status_code=400, detail=str(err)) from err
    finally:
        if lock and lock.valid:
            await lock_mgr.unlock(lock)
Source
Copyright ©1993-2024 Joey E Whelan, All rights reserved.