Saturday, October 15, 2022

Redis Lookaside Cache with MongoDB

Summary

In this post, I'll demonstrate how to implement a lookaside cache with Redis in front of a MongoDB database. All code is Python and uses the redis-py client library. The application is implemented as a REST API server using the FastAPI framework.

Architecture

High Level


Detailed


Data Set

A snippet of the data set (the first record of a JSON array) is below:

 
[
    {
        "airport": {
            "code": "ATL",
            "name": "Atlanta, GA: Hartsfield-Jackson Atlanta International"
        },
        "time": {
            "label": "2003/06",
            "month": 6,
            "month_name": "June",
            "year": 2003
        },
        "statistics": {
            "num_delays": {
                "carrier": 1009,
                "late_aircraft": 1275,
                "national_aviation_system": 3217,
                "security": 17,
                "weather": 328
            },
            "carriers": {
                "names": [
                    "American Airlines Inc.",
                    "JetBlue Airways",
                    "Continental Air Lines Inc.",
                    "Delta Air Lines Inc.",
                    "Atlantic Southeast Airlines",
                    "AirTran Airways Corporation",
                    "America West Airlines Inc.",
                    "Northwest Airlines Inc.",
                    "ExpressJet Airlines Inc.",
                    "United Air Lines Inc.",
                    "US Airways Inc."
                ],
                "total": 11
            },
            "flights": {
                "canceled": 216,
                "delayed": 5843,
                "diverted": 27,
                "on_time": 23974,
                "total": 30060
            },
            "minutes_delayed": {
                "carrier": 61606,
                "late_aircraft": 68335,
                "national_aviation_system": 118831,
                "security": 518,
                "total": 268764,
                "weather": 19474
            }
        }
    },

Data Loading


 
async def startup_event():
    """Reset MongoDB and the Redis cache to a known state.

    Mongo: drop the ``airlines.delays`` collection, reload it from
    ``data.json`` and create a compound index on airport code, year and
    month (the fields the cache-miss lookup filters on).

    Redis: flush the current database (emptying the cache) and create the
    ``idx`` search index over JSON documents whose keys start with
    ``delayStat:``, indexing airport code, year and month.
    """
    # --- mongo startup ---
    col = mdb.airlines.delays
    # The collection API is async here (insert_many/create_index are awaited),
    # so drop() must be awaited too -- otherwise the drop never executes and
    # every restart appends another copy of the data set.
    await col.drop()
    with open('data.json', encoding='utf-8') as json_file:
        data = load(json_file)
    await col.insert_many(data)
    await col.create_index([('airport.code', 1), ('time.year', 1), ('time.month', 1)])

    # --- redis startup: empty the cache and rebuild the search index ---
    await rdb.flushdb()
    idx_def = IndexDefinition(index_type=IndexType.JSON, prefix=['delayStat:'])
    schema = (
        TextField('$.airport.code', as_name='airport', sortable=True),
        NumericField('$.time.year', as_name='year', sortable=True),
        NumericField('$.time.month', as_name='month', sortable=True),
    )
    await rdb.ft('idx').create_index(schema, definition=idx_def)

Data Fetch Logic


 
async def get_cancellations(airport: str, year: int, month: int):
    """Return the number of canceled flights for one airport and month.

    Lookaside-cache flow: query the Redis search index ``idx`` first. On a
    miss, take a per-(airport, year, month) lock via ``lock_mgr``, read the
    record from MongoDB, store it in Redis as JSON under
    ``delayStat:<mongo _id>`` with a one-hour TTL, and return the value.

    Args:
        airport: Airport code, e.g. ``"ATL"``.
        year: Year to look up (matched against ``time.year``).
        month: Month number to look up (matched against ``time.month``).

    Returns:
        ``{"result": <statistics.flights.canceled>}``.

    Raises:
        HTTPException: 404 when the record is in neither the cache nor the
            database; 400 wrapping any other error.
    """
    # Bound up-front: the cache-hit path never acquires a lock, and the
    # ``finally`` clause below must not reference an unassigned name.
    lock = None
    try:
        # ``airport`` comes from the request; backslash-escape every
        # non-alphanumeric character so it cannot inject RediSearch query
        # syntax (``|``, ``*``, parentheses, ...). Plain codes are unchanged.
        safe_airport = "".join(c if c.isalnum() else "\\" + c for c in airport)
        query = Query(
            f"(@airport:{safe_airport}) (@year:[{year} {year}]) (@month:[{month} {month}])"
        )
        result, duration = await time_func(rdb.ft('idx').search, query)

        if result.docs:  # cache hit
            jsonval = loads(result.docs[0].json)
            metrics.incr_hits(duration)
            json_res = {"result": jsonval['statistics']['flights']['canceled']}
        else:  # cache miss
            col = mdb.airlines.delays
            # Fine-grained distributed lock keyed on the requested record.
            # NOTE(review): the cache is not re-checked after acquiring the
            # lock, so concurrent misses each still read Mongo; the cache
            # write below is idempotent because the key is the Mongo _id.
            lock = await lock_mgr.lock(f"lock:{airport}:{year}:{month}")
            result, duration = await time_func(col.find_one, {
                "airport.code": airport,
                "time.year": {"$eq": year},
                "time.month": {"$eq": month},
            })
            metrics.incr_misses(duration)
            if not result:  # not found in cache or db
                raise HTTPException(status_code=404, detail=f"Data not found for {airport} {year} {month}")
            # _id is not JSON-serializable, so strip it from the document
            # and reuse it as the cache key suffix.
            doc_id = result.pop('_id')
            cache_key = f"delayStat:{doc_id}"
            await rdb.json().set(cache_key, '$', result)
            await rdb.expire(cache_key, 3600)  # TTL: 1 hour
            json_res = {"result": result['statistics']['flights']['canceled']}
    except HTTPException:
        # Already a client-facing error (e.g. the 404 above); pass through.
        raise
    except Exception as err:
        raise HTTPException(status_code=400, detail=str(err)) from err
    else:
        return json_res
    finally:
        if lock and lock.valid:
            await lock_mgr.unlock(lock)

Source


Copyright ©1993-2024 Joey E Whelan, All rights reserved.