Saturday, October 15, 2022

Redis Lookaside Cache with MongoDB


In this post, I'll demonstrate how to implement a lookaside (cache-aside) cache with Redis in front of a MongoDB database.  All code is Python, using the redis-py client library.  The application is implemented as a REST API server using the FastAPI framework.


High Level


Data Set

A snippet of the data set is below:

    {
        "airport": {
            "code": "ATL",
            "name": "Atlanta, GA: Hartsfield-Jackson Atlanta International"
        },
        "time": {
            "label": "2003/06",
            "month": 6,
            "month_name": "June",
            "year": 2003
        },
        "statistics": {
            "num_delays": {
                "carrier": 1009,
                "late_aircraft": 1275,
                "national_aviation_system": 3217,
                "security": 17,
                "weather": 328
            },
            "carriers": {
                "names": [
                    "American Airlines Inc.",
                    "JetBlue Airways",
                    "Continental Air Lines Inc.",
                    "Delta Air Lines Inc.",
                    "Atlantic Southeast Airlines",
                    "AirTran Airways Corporation",
                    "America West Airlines Inc.",
                    "Northwest Airlines Inc.",
                    "ExpressJet Airlines Inc.",
                    "United Air Lines Inc.",
                    "US Airways Inc."
                ],
                "total": 11
            },
            "flights": {
                "canceled": 216,
                "delayed": 5843,
                "diverted": 27,
                "on_time": 23974,
                "total": 30060
            },
            "minutes_delayed": {
                "carrier": 61606,
                "late_aircraft": 68335,
                "national_aviation_system": 118831,
                "security": 518,
                "total": 268764,
                "weather": 19474
            }
        }
    }

Data Loading

async def startup_event():
    """Reset both data stores and reload the data set from ``data.json``.

    MongoDB: empty the ``airlines.delays`` collection, bulk-insert the
    records read from ``data.json``, and create a compound index on
    airport code, year and month (the fields the fetch logic filters on).

    Redis: flush the current database (discarding any cached documents)
    and create the ``idx`` search index over JSON documents whose keys
    start with ``delayStat:``.
    """
    # --- mongo startup ---
    col = mdb.airlines.delays
    # Start from an empty collection as promised above; without this, every
    # restart would insert another full copy of the data set.
    await col.drop()
    with open('data.json', encoding='utf-8') as json_file:
        data = load(json_file)
    await col.insert_many(data)
    await col.create_index([('airport.code', 1), ('time.year', 1), ('time.month', 1)])

    # --- redis startup ---
    await rdb.flushdb()
    idx_def = IndexDefinition(index_type=IndexType.JSON, prefix=['delayStat:'])
    schema = (
        TextField('$.airport.code', as_name='airport', sortable=True),
        NumericField('$.time.year', as_name='year', sortable=True),
        NumericField('$.time.month', as_name='month', sortable=True),
    )
    # NOTE(review): create_index fails if an index named 'idx' already exists;
    # this assumes FLUSHDB above also removed it — confirm for your Redis version.
    await rdb.ft('idx').create_index(schema, definition=idx_def)

Data Fetch Logic

async def _cached_cancellations(query):
    """Return the canceled-flight count from the first cached document
    matching *query*, or None on a cache miss."""
    result, _duration = await time_func(rdb.ft('idx').search, query)
    if not result.docs:
        return None
    jsonval = loads(result.docs[0].json)
    return jsonval['statistics']['flights']['canceled']


async def get_cancellations(airport: str, year: int, month: int):
    """Return ``{"result": <canceled flights>}`` for an airport/year/month.

    Lookaside read: query the Redis search index first. On a miss, take a
    per-key distributed lock, re-check the cache, then load the record from
    MongoDB and cache it as ``delayStat:<_id>`` with a one-hour TTL.

    Raises:
        HTTPException: 404 if the record is in neither Redis nor MongoDB;
            400 for an invalid airport code or any other error.
    """
    # The airport value is interpolated into a search query string; allowing
    # only letters/digits prevents query-syntax injection from the request.
    if not airport.isalnum():
        raise HTTPException(status_code=400, detail=f"Invalid airport code: {airport}")
    query = Query(f"(@airport:{airport}) (@year:[{year} {year}]) (@month:[{month} {month}])")

    lock = None  # bound up front so the finally clause can always test it
    try:
        canceled = await _cached_cancellations(query)
        if canceled is None:  # cache miss
            # Fine-grained, distributed lock: serializes DB loads for this key.
            lock = await lock_mgr.lock(f"lock:{airport}:{year}:{month}")
            # Another worker may have filled the cache while we waited.
            canceled = await _cached_cancellations(query)
        if canceled is None:
            col = mdb.airlines.delays
            result, _duration = await time_func(col.find_one, {
                "airport.code": airport,
                "time.year": {"$eq": year},
                "time.month": {"$eq": month},
            })
            if not result:  # not found in cache or db
                raise HTTPException(status_code=404, detail=f"Data not found for {airport} {year} {month}")
            # The Mongo _id (presumably an ObjectId) can't be JSON-serialized.
            doc_id = result.pop('_id')
            key = f"delayStat:{doc_id}"
            await rdb.json().set(key, '$', result)  # add val to cache ...
            await rdb.expire(key, 3600)             # ... with a 1-hour TTL
            canceled = result['statistics']['flights']['canceled']
    except HTTPException:
        raise
    except Exception as err:
        raise HTTPException(status_code=400, detail=str(err)) from err
    finally:
        # Always release the lock, including on 404 and other error paths.
        if lock and lock.valid:
            await lock_mgr.unlock(lock)
    return {"result": canceled}


Copyright ©1993-2024 Joey E Whelan, All rights reserved.