Saturday, October 15, 2022

Web Crawler with Redis Indexing

Summary

I'll be demonstrating a simple web crawler implementation in Node.js in this post.  The crawled results are stored in Redis and indexed with RediSearch.  Apache Tika is used for document parsing.

Architecture

High Level


Detailed


Crawler POST End Point


 
/**
 * Crawl endpoint.  Starts a Worker thread (spider.js) that crawls the given fqdn, extracts text via Tika,
 * and then stores the resulting text in Redis as JSON documents.
 * This returns immediately and provides a taskID for the Worker thread; the crawl itself runs asynchronously.
 *
 * Request body: { fqdn: string }
 * Responses:
 *   201 { taskID }  - the Worker was started
 *   400 { error }   - fqdn is missing/invalid, or the Worker could not be constructed
 */
app.post('/crawl', (req, res) => {
    const fqdn = req.body?.fqdn;
    console.log(`app - POST /crawl ${fqdn}`);

    // Reject bad input up front rather than spawning a Worker that can only fail.
    if (typeof fqdn !== 'string' || fqdn.trim() === '') {
        console.error(`app - POST /crawl ${fqdn} - missing or invalid fqdn`);
        res.status(400).json({ 'error': 'fqdn is required' });
        return;
    }

    const taskID = uuidv4();
    try {
        const worker = new Worker('./app/spider.js', { workerData : { 'fqdn': fqdn, 'taskID': taskID }});
        // Failures inside the Worker surface later as an 'error' event, which this try/catch cannot see.
        // Without a listener that event would be thrown as an uncaught exception in the main thread.
        worker.on('error', (err) => {
            console.error(`app - POST /crawl ${fqdn} - task ${taskID} - ${err.message}`);
        });
        res.status(201).json({'taskID': taskID});
    }
    catch (err) {
        console.error(`app - POST /crawl ${fqdn} - ${err.message}`);
        res.status(400).json({ 'error': err.message });
    }
});

Text Extraction with Tika

    async extract(doc, data, hash) {
        const stream = Readable.from(data);  //get a stream from the arrayBuffer obj
        const response = await axios({  //send that stream to Tika for automatic mime-type detection and text extraction
            method: 'PUT',
            url: `${tikaUrl}/tika`,
            data: stream,
            responseType: 'text',
            headers: {
                'Content-Type': 'application/octet-stream',
                'Accept': 'text/plain'
            }
        });
        const json = { "doc": doc, "text": response.data, "hash": hash };
        await this.client.json.set(`${PREFIX}:${doc}`, '.', json);
    }

Index Creation

/**
 * Creates the RediSearch index 'docIdx' over JSON documents whose keys start with 'DOC'.
 * The '$.doc' and '$.text' paths are indexed as TEXT fields under the aliases 'doc' and 'text'.
 * An error raised while creating the index is logged and not rethrown.
 */
async function buildIndex() {
    console.log(`app - buildIndex`);
    const rc = await clientFactory();
    try {
        // Both indexed attributes are TEXT fields; only the alias differs
        const asText = (alias) => ({ type: redis.SchemaFieldTypes.TEXT, AS: alias });
        const schema = {
            '$.doc': asText('doc'),
            '$.text': asText('text')
        };
        const options = { ON: 'JSON', PREFIX: 'DOC' };
        await rc.ft.create('docIdx', schema, options);
    }
    catch(err) {
        console.error(`app - buildIndex - ${err.message}`);
    }
}

Source


Copyright ©1993-2024 Joey E Whelan, All rights reserved.

Redis Lookaside Cache with MongoDB

Summary

In this post, I'll demonstrate how to implement caching with Redis in front of a MongoDB database.  All code is Python using the redis-py client lib.  The cache is implemented as a REST API server using the FastAPI framework.

Architecture

High Level


Detailed


Data Set

A snippet of the data set is below

 
[
    {
        "airport": {
            "code": "ATL",
            "name": "Atlanta, GA: Hartsfield-Jackson Atlanta International"
        },
        "time": {
            "label": "2003/06",
            "month": 6,
            "month_name": "June",
            "year": 2003
        },
        "statistics": {
            "num_delays": {
                "carrier": 1009,
                "late_aircraft": 1275,
                "national_aviation_system": 3217,
                "security": 17,
                "weather": 328
            },
            "carriers": {
                "names": [
                    "American Airlines Inc.",
                    "JetBlue Airways",
                    "Continental Air Lines Inc.",
                    "Delta Air Lines Inc.",
                    "Atlantic Southeast Airlines",
                    "AirTran Airways Corporation",
                    "America West Airlines Inc.",
                    "Northwest Airlines Inc.",
                    "ExpressJet Airlines Inc.",
                    "United Air Lines Inc.",
                    "US Airways Inc."
                ],
                "total": 11
            },
            "flights": {
                "canceled": 216,
                "delayed": 5843,
                "diverted": 27,
                "on_time": 23974,
                "total": 30060
            },
            "minutes_delayed": {
                "carrier": 61606,
                "late_aircraft": 68335,
                "national_aviation_system": 118831,
                "security": 518,
                "total": 268764,
                "weather": 19474
            }
        }
    },

Data Loading


 
async def startup_event():
    """Reset MongoDB and the Redis cache to a known state.

    Mongo: drops the airlines.delays collection, reloads it from data.json and creates the
    compound index on airport code, year and month.
    Redis: flushes the current database and rebuilds the 'idx' search index over JSON keys
    prefixed with 'delayStat:'.
    """
    #mongo startup
    col = mdb.airlines.delays
    # drop() is awaited like the other collection calls so the old data is gone before the reload starts
    await col.drop()
    with open('data.json') as json_file:
        data = load(json_file)
    await col.insert_many(data)
    await col.create_index([('airport.code', 1),('time.year', 1),('time.month', 1)])

    #redis startup: empty the cache and rebuild the index
    await rdb.flushdb()
    idx_def = IndexDefinition(index_type=IndexType.JSON, prefix=['delayStat:'])
    schema = (TextField('$.airport.code', as_name='airport', sortable=True),
                NumericField('$.time.year', as_name='year', sortable=True),
                NumericField('$.time.month', as_name='month', sortable=True))
    await rdb.ft('idx').create_index(schema, definition=idx_def)

Data Fetch Logic


 
async def get_cancellations(airport: str, year: int, month: int):
    """Return the number of canceled flights for an airport in a given year and month.

    Lookaside-cache read: the Redis search index is queried first. On a miss the document is
    read from MongoDB under a per-key distributed lock, written to Redis with a 1 hour TTL and
    then returned.

    Args:
        airport: airport code matched against the indexed 'airport' field.
        year: value matched against the indexed 'year' field / Mongo 'time.year'.
        month: value matched against the indexed 'month' field / Mongo 'time.month'.

    Returns:
        dict of the form {"result": <canceled flight count>}.

    Raises:
        HTTPException: 404 when the data is in neither Redis nor MongoDB; 400 for any other failure.
    """
    # The lock is only acquired on a cache miss, but the finally block inspects it on every path,
    # so it must be bound before the try (otherwise a cache hit fails with UnboundLocalError).
    lock = None
    try:
        # NOTE(review): airport is interpolated into the query unescaped - confirm callers validate it
        result, duration = await time_func(rdb.ft('idx').search,
                Query(f"(@airport:{airport}) (@year:[{year} {year}]) (@month:[{month} {month}])"))

        if len(result.docs) > 0:  # cache hit
            jsonval = loads(result.docs[0].json)
            metrics.incr_hits(duration)
            json_res = {"result": jsonval['statistics']['flights']['canceled']}
        else:  # cache miss
            col = mdb.airlines.delays
            lock = await lock_mgr.lock(f"lock:{airport}:{year}:{month}")  #fine-grained, distributed lock
            result, duration = await time_func(col.find_one,
                { "airport.code": airport,
                    "time.year": {"$eq": year},
                    "time.month": {"$eq": month}
            })
            metrics.incr_misses(duration)
            if result:
                doc_id = result.pop('_id')  # this field can't be serialized and needs to be removed
                cache_key = f"delayStat:{doc_id}"
                await rdb.json().set(cache_key, '$', result)  #add val to cache and set TTL to 1 hour
                await rdb.expire(cache_key, 3600)
                json_res = {"result": result['statistics']['flights']['canceled']}
            else: # not found in cache or db
                raise HTTPException(status_code=404, detail=f"Data not found for {airport} {year} {month}")
    except HTTPException:
        # already carries the intended status code (e.g. the 404 above); pass it through untouched
        raise
    except Exception as err:
        raise HTTPException(status_code=400, detail=str(err))
    else:
        return json_res
    finally:
        if lock and lock.valid:
            await lock_mgr.unlock(lock)

Source


Copyright ©1993-2024 Joey E Whelan, All rights reserved.

RediSearch - ioredis client lib

Summary

I'll be demonstrating various Redis Search operations with the ioredis (Nodejs) client lib in this post.

Data Set Creation

A random set of objects is created and then saved in Redis as hashes.
    // Queue one hash write per generated item, then send the whole batch with exec().
    const pipeline = client.pipeline();
    for (let i=0; i < NUM; i++) {
        // Shuffle a copy so the shared COLORS array is not reordered as a side effect,
        // then keep a random-length prefix (possibly empty) as this item's tags.
        const shuffled = [...COLORS].sort(() => .5 - Math.random());
        const colors = shuffled.slice(0, Math.floor(Math.random() * COLORS.length));
        const fields = {
            'textField': `text${Math.floor(Math.random() * NUM)}`, 
            'numericField': Math.floor(Math.random() * NUM), 
            'tagField': colors
        };
        // Commands on a pipeline are only queued here, so there is nothing to await until exec().
        pipeline.hmset(`item:${i}`, fields);
    }
    await pipeline.exec();

Index Creation

    // Build the 'idx' index over every hash whose key starts with 'item:'.
    // Each schema entry is: field name, field type, then optional attributes.
    const schemaArgs = [
        'textField', 'TEXT', 'SORTABLE',
        'numericField', 'NUMERIC', 'SORTABLE',
        'tagField', 'TAG'
    ];
    await client.call('FT.CREATE', 'idx', 'ON', 'HASH', 'PREFIX', '1', 'item:', 'SCHEMA', ...schemaArgs);

Search Operations

	//Exact-match lookup on the Text field
    const textQuery = '@textField:text1';
    let result = await client.call('FT.SEARCH', 'idx', textQuery);

    //Range lookup on the Numeric field
    //NOTE(review): the documented range form is [min max]; confirm the comma-separated form is accepted
    const numericQuery = '@numericField:[1,3]';
    result = await client.call('FT.SEARCH', 'idx', numericQuery);

    //Tag lookup on the Tag field
    const tagQuery = '@tagField:{blue}';
    result = await client.call('FT.SEARCH', 'idx', tagQuery);

Aggregation Operations

    //Group every hash in the index by its text field value and count the members of each group
    let result = await client.call(
        'FT.AGGREGATE', 'idx', '*',
        'GROUPBY', '1', '@textField',
        'REDUCE', 'COUNT', '0', 'AS', 'CNT'
    );

    //Filter on a numeric range, then add the square root of the numeric field to each match
    const upper = Math.floor(Math.random() * NUM) + 1;
    const sqrtQuery = `@numericField:[0,${upper}]`;
    result = await client.call(
        'FT.AGGREGATE', 'idx', sqrtQuery,
        'APPLY', 'SQRT(@numericField)', 'AS', 'SQRT'
    );

    //Match either of two tag values and read the results through a CURSOR.
    //COUNT 2 limits each read to 2 values to illustrate looping on a cursor.
    const pageSize = '2';
    result = await client.call(
        'FT.AGGREGATE', 'idx', '@tagField:{ yellow | red }',
        'LOAD', '3', '@textField', '@numericField', '@tagField',
        'WITHCURSOR', 'COUNT', pageSize
    );
    console.log('FT.AGGREGATE idx @tagField:{ yellow | red } LOAD 3 @textField @numericField @tagField WITHCURSOR COUNT 2');
    let [items, cursor] = result;
    for (;;) {
        //Only array entries are printed; anything else in the reply is skipped
        for (const row of items) {
            if (!Array.isArray(row)) {
                continue;
            }
            console.log(JSON.stringify(row))
        }
        //A falsy cursor ends the loop; otherwise fetch the next page
        if (!cursor) {
            break;
        }
        result = await client.call('FT.CURSOR', 'READ', 'idx', cursor, 'COUNT', pageSize);
        [items, cursor] = result;
    }

Index Alteration

    //Add a sortable TEXT field named 'newField' to the existing 'idx' schema
    await client.call('FT.ALTER', 'idx', 'SCHEMA', 'ADD', 'newField', 'TEXT', 'SORTABLE');
    //Fetch the index info after the change
    let result = await client.call('FT.INFO', 'idx');

Source


Copyright ©1993-2024 Joey E Whelan, All rights reserved.

Redis + Mongo Document (JSON/Search) Commands Comparison

Summary

This post will demonstrate equivalent Create, Read, Update and Delete (CRUD) commands in MongoDB and Redis.  I provide examples from Mongo's shell (mongosh), Redis CLI, node-redis client lib, and redis-py client lib.

Sample Data Set

Below is a snippet of the JSON document that is used in all the examples.

[
    { 
        "item": "journal", 
        "qty": 25, 
        "tags": ["blank", "red"], 
        "dim_cm": [ 14, 21 ], 
        "size": { 
            "h": 14, 
            "w": 21, 
            "uom": "cm"
        }, 
        "status": "E"  
    },
    { 
        "item": "notebook", 
        "qty": 50, 
        "tags": ["red", "blank"], 
        "dim_cm": [ 14, 21 ], 
        "size": { 
            "h": 14, 
            "w": 21, 
            "uom": "cm" 
        }, 
        "status": "E" 
    },

Create Operation

Mongo Shell

// Seed documents; _id values 1-5 are assigned explicitly.
const inventoryDocs = [
   {
      _id: 1, item: "journal", qty: 25,
      tags: ["blank", "red"], dim_cm: [ 14, 21 ],
      size: { h: 14, w: 21, uom: "cm" },
      status: "E"
   },
   {
      _id: 2, item: "notebook", qty: 50,
      tags: ["red", "blank"], dim_cm: [ 14, 21 ],
      size: { h: 14, w: 21, uom: "cm" },
      status: "E"
   },
   {
      _id: 3, item: "paper", qty: 100,
      tags: ["red", "blank", "plain"], dim_cm: [ 14, 21 ],
      size: { h: 19, w: 22.85, uom: "cm" },
      status: "B"
   },
   {
      _id: 4, item: "planner", qty: 75,
      tags: ["blank", "red"], dim_cm: [ 22.85, 30 ],
      status: "C"
   },
   {
      _id: 5, item: "postcard", qty: 45,
      tags: ["blue"], dim_cm: [ 10, 15.25 ],
      status: "D"
   }
];
db.inventory.insertMany(inventoryDocs)

// One single-field ascending index for each of qty, item, tags, size and status.
const inventoryIndexKeys = [
   {qty: 1},
   {item: 1},
   {tags: 1},
   {size: 1},
   {status: 1}
];
db.inventory.createIndexes(inventoryIndexKeys)

Redis CLI

JSON.SET inventory:1 . '{"item":"journal","qty":25,"tags":["blank","red"],"dim_cm":[14,21],"size":{"h":14,"w":21,"uom":"cm"},"status":"E"}'
JSON.SET inventory:2 . '{"item":"notebook","qty":50,"tags":["red","blank"],"dim_cm":[14,21],"size":{"h":14,"w":21,"uom":"cm"},"status":"E"}'
JSON.SET inventory:3 . '{"item":"paper","qty":100,"tags":["red","blank","plain"],"dim_cm":[14,21],"size":{"h":19,"w":22.85,"uom":"cm"},"status":"B"}'
JSON.SET inventory:4 . '{"item":"planner","qty":75,"tags":["blank","red"],"dim_cm":[22.85,30],"status":"C"}'
JSON.SET inventory:5 . '{"item":"postcard","qty":45,"tags":["blue"],"dim_cm":[10,15.25],"status":"D"}'

FT.CREATE inventoryIdx ON JSON PREFIX 1 inventory: SCHEMA $.item AS item TEXT $.qty AS qty NUMERIC $.tags.* AS tags TAG $.dim_cm[0] AS dim_cm_0 NUMERIC $.dim_cm[1] AS dim_cm_1 NUMERIC $.status AS status TEXT $.size.h AS sizeh NUMERIC $.size.w AS sizew NUMERIC $.size.uom AS sizeuom TEXT

node-redis (Javascript)

// Store each document under the next inventory:<n> key
for (const item of dataset) {
   await client.json.set(`inventory:${itemNum++}`, '.', item);
}

// Every schema entry maps a JSON path to a field type plus the alias used in queries
const asField = (type, alias) => ({ type, AS: alias });
const inventorySchema = {
   '$.item': asField(SchemaFieldTypes.TEXT, 'item'),
   '$.qty': asField(SchemaFieldTypes.NUMERIC, 'qty'),
   '$.tags.*': asField(SchemaFieldTypes.TAG, 'tags'),
   '$.dim_cm[0]': asField(SchemaFieldTypes.NUMERIC, 'dim_cm_0'),
   '$.dim_cm[1]': asField(SchemaFieldTypes.NUMERIC, 'dim_cm_1'),
   '$.status': asField(SchemaFieldTypes.TEXT, 'status'),
   '$.size.h': asField(SchemaFieldTypes.NUMERIC, 'sizeh'),
   '$.size.w': asField(SchemaFieldTypes.NUMERIC, 'sizew'),
   '$.size.uom': asField(SchemaFieldTypes.TEXT, 'sizeuom')
};

// Index JSON documents whose keys start with 'inventory:'
await client.ft.create('inventoryIdx', inventorySchema, { ON: 'JSON', PREFIX: 'inventory:' });

redis-py (Python)

# Keys are numbered sequentially starting at inventory:1
itemNum = 1

for dataItem in dataset:
   key = f'inventory:{itemNum}'
   client.json().set(key, '.', dataItem)
   itemNum += 1

# Index the JSON documents stored under the inventory: prefix
index_def = IndexDefinition(index_type=IndexType.JSON, prefix=['inventory:'])

# (field class, JSON path, alias) for each indexed attribute
field_specs = (
   (TextField, '$.item', 'item'),
   (NumericField, '$.qty', 'qty'),
   (TagField, '$.tags.*', 'tags'),
   (NumericField, '$.dim_cm[0]', 'dim_cm_0'),
   (NumericField, '$.dim_cm[1]', 'dim_cm_1'),
   (TextField, '$.status', 'status'),
   (NumericField, '$.size.h', 'sizeh'),
   (NumericField, '$.size.w', 'sizew'),
   (TextField, '$.size.uom', 'sizeuom'),
)
schema = tuple(field_cls(path, as_name=alias) for field_cls, path, alias in field_specs)
client.ft('inventoryIdx').create_index(schema, definition=index_def)

Read Operation - Compound Query

Mongo Shell

// Compound filter: status is "E" AND (qty is less than 30 OR item matches the regex /^pa/)
db.inventory.find( {
     status: "E",
     $or: [ { qty: { $lt: 30 } }, { item: /^pa/ } ]
} )

Redis CLI

FT.SEARCH inventoryIdx '(@status:E) ((@qty:[-inf (30])|(@item:pa*))'

node-redis (Javascript)

// Query: status E, combined with either qty in [-inf, 30) or item prefixed with "pa"
await client.ft.search('inventoryIdx', '(@status:E) ((@qty:[-inf (30])|(@item:pa*))');

redis-py (Python)

# Query: status E, combined with either qty in [-inf, 30) or item prefixed with "pa"; reads .docs from the result
client.ft('inventoryIdx').search(Query('(@status:E) ((@qty:[-inf (30])|(@item:pa*))')).docs

Update Operation

Mongo Shell

// Increment qty by 5 and rename the item on the document whose _id is 2.
// updateOne() is used because the legacy update() helper is deprecated in mongosh;
// with an operator document and no multi option the two target the same single document.
db.inventory.updateOne(
   { _id: 2 },
   {
     $inc: { qty: 5 },
     $set: {
       item: "spiral notebook",
     }
   }
)

Redis CLI

MULTI
JSON.NUMINCRBY inventory:2 '.qty' 5
JSON.SET inventory:2 .item '"spiral notebook"'
EXEC

node-redis (Javascript)

// Queue the qty increment and the item rename on one multi() chain and run them together with exec()
await client
   .multi()
   .json.numIncrBy('inventory:2', '.qty', 5)
   .json.set('inventory:2', '.item', 'spiral notebook' )
   .exec();

redis-py (Python)

# Queue the qty increment and the item rename on one pipeline, then send both with execute()
pipe = client.pipeline()
pipe.json().numincrby('inventory:2', '.qty', 5)
pipe.json().set('inventory:2', '.item', 'spiral notebook') 
pipe.execute()

Delete Operation

Mongo Shell

// Delete the document whose _id is 2
db.inventory.deleteOne( { "_id" : 2 } )

Redis CLI

JSON.DEL inventory:2

node-redis (Javascript)

// Delete the JSON document stored at key inventory:2
await client.json.del('inventory:2');

redis-py (Python)

# Delete the JSON document stored at key inventory:2
client.json().delete('inventory:2')

Source (with more examples)

https://github.com/Redislabs-Solution-Architects/docdevdemo

Copyright ©1993-2024 Joey E Whelan, All rights reserved.