Summary
In this post, I'll demonstrate how to implement caching with Redis in front of a MongoDB database. All code is Python and uses the redis-py client library. The example is implemented as a REST API server built with the FastAPI framework.
Architecture
High Level
Detailed
Data Set
A snippet of the data set is shown below.
[ { "airport": { "code": "ATL", "name": "Atlanta, GA: Hartsfield-Jackson Atlanta International" }, "time": { "label": "2003/06", "month": 6, "month_name": "June", "year": 2003 }, "statistics": { "num_delays": { "carrier": 1009, "late_aircraft": 1275, "national_aviation_system": 3217, "security": 17, "weather": 328 }, "carriers": { "names": [ "American Airlines Inc.", "JetBlue Airways", "Continental Air Lines Inc.", "Delta Air Lines Inc.", "Atlantic Southeast Airlines", "AirTran Airways Corporation", "America West Airlines Inc.", "Northwest Airlines Inc.", "ExpressJet Airlines Inc.", "United Air Lines Inc.", "US Airways Inc." ], "total": 11 }, "flights": { "canceled": 216, "delayed": 5843, "diverted": 27, "on_time": 23974, "total": 30060 }, "minutes_delayed": { "carrier": 61606, "late_aircraft": 68335, "national_aviation_system": 118831, "security": 518, "total": 268764, "weather": 19474 } } },
Data Loading
async def startup_event():
    """Reset MongoDB and Redis to a known state and load the sample data.

    MongoDB: drop the ``airlines.delays`` collection, reload it from
    ``data.json`` and create a compound index on airport code, year and month.

    Redis: empty the cache with ``flushdb`` and rebuild the ``idx`` search
    index over JSON documents stored under the ``delayStat:`` key prefix.
    """
    # --- Mongo startup ---
    col = mdb.airlines.delays
    # Await the drop like every other collection call below; an un-awaited
    # call is not guaranteed to have completed (or even run) before the
    # insert starts, which could wipe or duplicate the freshly loaded data.
    await col.drop()
    with open('data.json') as json_file:
        data = load(json_file)
    await col.insert_many(data)
    await col.create_index([('airport.code', 1), ('time.year', 1), ('time.month', 1)])

    # --- Redis startup ---
    await rdb.flushdb()
    idx_def = IndexDefinition(index_type=IndexType.JSON, prefix=['delayStat:'])
    # Index the same three fields used by the Mongo compound index so the
    # cache can be queried by airport / year / month.
    schema = (
        TextField('$.airport.code', as_name='airport', sortable=True),
        NumericField('$.time.year', as_name='year', sortable=True),
        NumericField('$.time.month', as_name='month', sortable=True),
    )
    await rdb.ft('idx').create_index(schema, definition=idx_def)
Data Fetch Logic
async def get_cancellations(airport: str, year: int, month: int):
    """Return the number of canceled flights for an airport in a given month.

    The Redis search index is queried first. On a cache miss the record is
    read from MongoDB while holding a per-key distributed lock, stored in
    Redis as a JSON document with a one-hour TTL, and then returned.

    Args:
        airport: Airport code to look up (matched against ``airport.code``).
        year: Year of the statistics record.
        month: Month of the statistics record.

    Returns:
        A dict of the form ``{"result": <canceled flights>}``.

    Raises:
        HTTPException: 404 when the record is in neither Redis nor MongoDB;
            400 for any other failure (the original error is chained).
    """
    # Bind before the try block: the finally clause reads `lock` on every
    # path, including cache hits and early failures where no lock was taken.
    lock = None
    try:
        # NOTE(review): `airport` is interpolated into the search query
        # unescaped -- confirm callers validate it, or escape query syntax.
        query = Query(f"(@airport:{airport}) (@year:[{year} {year}]) (@month:[{month} {month}])")
        result, duration = await time_func(rdb.ft('idx').search, query)

        if result.docs:
            # Cache hit: the matching JSON document is already in Redis.
            jsonval = loads(result.docs[0].json)
            metrics.incr_hits(duration)
            json_res = {"result": jsonval['statistics']['flights']['canceled']}
        else:
            # Cache miss: fall back to MongoDB under a fine-grained,
            # distributed lock keyed on the requested airport/year/month.
            col = mdb.airlines.delays
            lock = await lock_mgr.lock(f"lock:{airport}:{year}:{month}")
            result, duration = await time_func(col.find_one, {
                "airport.code": airport,
                "time.year": {"$eq": year},
                "time.month": {"$eq": month},
            })
            metrics.incr_misses(duration)

            if not result:
                # Not found in cache or db.
                raise HTTPException(status_code=404, detail=f"Data not found for {airport} {year} {month}")

            # The Mongo `_id` can't be serialized, so strip it from the
            # document and reuse it as the suffix of the cache key.
            doc_id = result.pop('_id')
            cache_key = f"delayStat:{doc_id}"
            await rdb.json().set(cache_key, '$', result)
            await rdb.expire(cache_key, 3600)  # TTL: 1 hour
            json_res = {"result": result['statistics']['flights']['canceled']}
    except HTTPException:
        # Already a well-formed HTTP error (e.g. the 404 above); pass through.
        raise
    except Exception as err:
        raise HTTPException(status_code=400, detail=str(err)) from err
    else:
        return json_res
    finally:
        # Release the distributed lock only if one was actually acquired.
        if lock and lock.valid:
            await lock_mgr.unlock(lock)
Source
Copyright ©1993-2024 Joey E Whelan, All rights reserved.