Saturday, October 15, 2022

Web Crawler with Redis Indexing

Summary

In this post, I'll demonstrate a simple web crawler implemented in Node.js.  The crawled results are stored in Redis and indexed with RediSearch.  Apache Tika is used for document parsing.

Architecture

High Level


Detailed


Crawler POST Endpoint


  /**
   * Crawl endpoint. Starts a Worker thread (spider.js) that crawls the given fqdn, extracts text via Tika,
   * and then stores the resulting text in Redis as JSON documents.
   * This returns immediately and provides a taskID for the Worker thread.
   *
   * Request body: { fqdn: string }
   * Responses:
   *   201 { taskID } - the Worker was created
   *   400 { error }  - fqdn is missing/blank, or the Worker could not be constructed
   */
  app.post('/crawl', (req, res) => {
    const { fqdn } = req.body || {};
    console.log(`app - POST /crawl ${fqdn}`);

    // Reject unusable input before spending a thread on it.
    if (typeof fqdn !== 'string' || fqdn.trim() === '') {
      res.status(400).json({ 'error': 'fqdn is required' });
      return;
    }

    const taskID = uuidv4();
    try {
      const worker = new Worker('./app/spider.js', { workerData: { 'fqdn': fqdn, 'taskID': taskID } });
      // Failures inside the Worker arrive asynchronously as an 'error' event, which the
      // surrounding try/catch cannot see. With no listener attached, that event is
      // rethrown in this thread as an uncaught exception, so it is logged here instead.
      worker.on('error', (err) => {
        console.error(`app - POST /crawl ${fqdn} - task ${taskID} - ${err.message}`);
      });
      res.status(201).json({ 'taskID': taskID });
    }
    catch (err) {
      // Only synchronous failures (e.g. the Worker constructor throwing) land here.
      console.error(`app - POST /crawl ${fqdn} - ${err.message}`);
      res.status(400).json({ 'error': err.message });
    }
  });

Text Extraction with Tika

  1. async extract(doc, data, hash) {
  2. const stream = Readable.from(data); //get a stream from the arrayBuffer obj
  3. const response = await axios({ //send that stream to Tika for automatic mime-type detection and text extraction
  4. method: 'PUT',
  5. url: `${tikaUrl}/tika`,
  6. data: stream,
  7. responseType: 'text',
  8. headers: {
  9. 'Content-Type': 'application/octet-stream',
  10. 'Accept': 'text/plain'
  11. }
  12. });
  13. const json = { "doc": doc, "text": response.data, "hash": hash };
  14. await this.client.json.set(`${PREFIX}:${doc}`, '.', json);
  15. }

Index Creation

  /**
   * Creates the 'docIdx' search index over JSON documents whose keys start with 'DOC'.
   * Both the `$.doc` and `$.text` paths are indexed as TEXT fields, aliased 'doc' and 'text'.
   * A failure while creating the index is logged and not rethrown.
   *
   * @returns {Promise<void>}
   */
  async function buildIndex() {
    console.log(`app - buildIndex`);
    const redisClient = await clientFactory();
    try {
      const schema = {
        '$.doc': { type: redis.SchemaFieldTypes.TEXT, AS: 'doc' },
        '$.text': { type: redis.SchemaFieldTypes.TEXT, AS: 'text' }
      };
      const indexOptions = { ON: 'JSON', PREFIX: 'DOC' };
      await redisClient.ft.create('docIdx', schema, indexOptions);
    }
    catch (err) {
      console.error(`app - buildIndex - ${err.message}`);
    }
  }

Source


Copyright ©1993-2024 Joey E Whelan, All rights reserved.

Redis Lookaside Cache with MongoDB

Summary

In this post, I'll demonstrate how to implement a lookaside cache with Redis in front of a MongoDB database.  All code is written in Python and uses the redis-py client library.  It is implemented as a REST API server built with FastAPI.

Architecture

High Level


Detailed


Data Set

A snippet of the data set is shown below:

  1. [
  2. {
  3. "airport": {
  4. "code": "ATL",
  5. "name": "Atlanta, GA: Hartsfield-Jackson Atlanta International"
  6. },
  7. "time": {
  8. "label": "2003/06",
  9. "month": 6,
  10. "month_name": "June",
  11. "year": 2003
  12. },
  13. "statistics": {
  14. "num_delays": {
  15. "carrier": 1009,
  16. "late_aircraft": 1275,
  17. "national_aviation_system": 3217,
  18. "security": 17,
  19. "weather": 328
  20. },
  21. "carriers": {
  22. "names": [
  23. "American Airlines Inc.",
  24. "JetBlue Airways",
  25. "Continental Air Lines Inc.",
  26. "Delta Air Lines Inc.",
  27. "Atlantic Southeast Airlines",
  28. "AirTran Airways Corporation",
  29. "America West Airlines Inc.",
  30. "Northwest Airlines Inc.",
  31. "ExpressJet Airlines Inc.",
  32. "United Air Lines Inc.",
  33. "US Airways Inc."
  34. ],
  35. "total": 11
  36. },
  37. "flights": {
  38. "canceled": 216,
  39. "delayed": 5843,
  40. "diverted": 27,
  41. "on_time": 23974,
  42. "total": 30060
  43. },
  44. "minutes_delayed": {
  45. "carrier": 61606,
  46. "late_aircraft": 68335,
  47. "national_aviation_system": 118831,
  48. "security": 518,
  49. "total": 268764,
  50. "weather": 19474
  51. }
  52. }
  53. },

Data Loading


  async def startup_event():
      """Reset MongoDB and the Redis cache to a known state.

      Mongo: drops the airlines.delays collection, reloads it from data.json and
      creates a compound index on airport code, year and month.
      Redis: flushes the current database and creates the 'idx' search index over
      JSON keys prefixed with 'delayStat:'.
      """
      # mongo startup: reset the collection to an empty state, then load data from file
      col = mdb.airlines.delays
      # drop() is awaited like the other collection calls below, so the old
      # documents are gone before the reload begins.
      await col.drop()
      with open('data.json') as json_file:
          data = load(json_file)
      await col.insert_many(data)
      await col.create_index([('airport.code', 1), ('time.year', 1), ('time.month', 1)])

      # redis startup: empty the cache and rebuild the index
      await rdb.flushdb()
      idx_def = IndexDefinition(index_type=IndexType.JSON, prefix=['delayStat:'])
      schema = (
          TextField('$.airport.code', as_name='airport', sortable=True),
          NumericField('$.time.year', as_name='year', sortable=True),
          NumericField('$.time.month', as_name='month', sortable=True),
      )
      await rdb.ft('idx').create_index(schema, definition=idx_def)

Data Fetch Logic


  async def get_cancellations(airport: str, year: int, month: int):
      """Return the number of canceled flights for an airport in a given month.

      The Redis search index is queried first. On a miss, the record is read from
      Mongo under a per-airport/year/month lock, written to Redis with a one hour
      TTL, and then returned.

      Args:
          airport: airport code to look up.
          year: year of the statistics record.
          month: month of the statistics record.

      Returns:
          dict of the form {"result": <canceled flight count>}.

      Raises:
          HTTPException: 404 when the record is in neither the cache nor the db;
              400 for any other failure.
      """
      # Bound before the try so the finally clause is safe on the cache-hit path,
      # where no lock is ever acquired.
      lock = None
      try:
          result, duration = await time_func(
              rdb.ft('idx').search,
              Query(f"(@airport:{airport}) (@year:[{year} {year}]) (@month:[{month} {month}])"))

          if len(result.docs) > 0:  # cache hit
              jsonval = loads(result.docs[0].json)
              metrics.incr_hits(duration)
              json_res = {"result": jsonval['statistics']['flights']['canceled']}
          else:  # cache miss
              col = mdb.airlines.delays
              lock = await lock_mgr.lock(f"lock:{airport}:{year}:{month}")  # fine-grained, distributed lock
              result, duration = await time_func(
                  col.find_one,
                  {"airport.code": airport,
                   "time.year": {"$eq": year},
                   "time.month": {"$eq": month}
                   })
              metrics.incr_misses(duration)
              if result:
                  doc_id = result.pop('_id')  # this field can't be serialized and needs to be removed
                  await rdb.json().set(f"delayStat:{doc_id}", '$', result)  # add val to cache and set TTL to 1 hour
                  await rdb.expire(f"delayStat:{doc_id}", 3600)
                  json_res = {"result": result['statistics']['flights']['canceled']}
              else:  # not found in cache or db
                  raise HTTPException(status_code=404, detail=f"Data not found for {airport} {year} {month}")
      except HTTPException:
          # Already the response we want (e.g. the 404 above); pass it through untouched.
          raise
      except Exception as err:
          raise HTTPException(status_code=400, detail=str(err)) from err
      else:
          return json_res
      finally:
          if lock and lock.valid:
              await lock_mgr.unlock(lock)

Source


Copyright ©1993-2024 Joey E Whelan, All rights reserved.

RediSearch - ioredis client lib

Summary

In this post, I'll demonstrate various Redis Search operations using the ioredis (Node.js) client library.

Data Set Creation

A random set of objects is created and then saved in Redis as hashes.
  // Queue NUM hashes (item:0 .. item:NUM-1) on one pipeline and send them in a single round trip.
  const pipeline = client.pipeline();
  for (let i = 0; i < NUM; i++) {
    // Shuffle a copy (Fisher-Yates) so the shared COLORS array keeps its original order.
    const shuffled = [...COLORS];
    for (let j = shuffled.length - 1; j > 0; j--) {
      const k = Math.floor(Math.random() * (j + 1));
      [shuffled[j], shuffled[k]] = [shuffled[k], shuffled[j]];
    }
    // Keep a random-length prefix of the shuffle; this can be empty and never holds every color.
    const colors = shuffled.slice(0, Math.floor(Math.random() * shuffled.length));
    const fields = {
      'textField': `text${Math.floor(Math.random() * NUM)}`,
      'numericField': Math.floor(Math.random() * NUM),
      // Joined explicitly so the tags are stored as a single comma-separated string value.
      'tagField': colors.join(',')
    };
    // Commands are only queued here; nothing is sent until exec(), so there is nothing to await.
    pipeline.hmset(`item:${i}`, fields);
  }
  await pipeline.exec();

Index Creation

  1. await client.call('FT.CREATE', 'idx', 'ON', 'HASH', 'PREFIX', '1', 'item:', 'SCHEMA',
  2. 'textField', 'TEXT', 'SORTABLE',
  3. 'numericField', 'NUMERIC', 'SORTABLE',
  4. 'tagField', 'TAG'
  5. );

Search Operations

  1. //Search for exact match in a Text field
  2. let result = await client.call('FT.SEARCH', 'idx', '@textField:text1');
  3.  
  4. //Search for a range in a Numeric field
  5. result = await client.call('FT.SEARCH', 'idx', '@numericField:[1,3]');
  6.  
  7. //Search for a match in a Tag Field
  8. result = await client.call('FT.SEARCH', 'idx', '@tagField:{blue}');

Aggregation Operations

  //Group every hash in the index by its text field and count the members of each group
  let result = await client.call(
    'FT.AGGREGATE', 'idx', '*',
    'GROUPBY', '1', '@textField',
    'REDUCE', 'COUNT', '0', 'AS', 'CNT'
  );

  //Match a numeric range, then apply the SQRT function to the numeric field of each match
  const upper = Math.floor(Math.random() * NUM) + 1;
  result = await client.call(
    'FT.AGGREGATE', 'idx', `@numericField:[0,${upper}]`,
    'APPLY', 'SQRT(@numericField)', 'AS', 'SQRT'
  );

  //Match the logical OR of two tag values and read the results through a CURSOR.
  //COUNT is limited to 2 so that several cursor reads are needed, illustrating the loop.
  result = await client.call(
    'FT.AGGREGATE', 'idx', '@tagField:{ yellow | red }',
    'LOAD', '3', '@textField', '@numericField', '@tagField',
    'WITHCURSOR', 'COUNT', '2'
  );
  console.log('FT.AGGREGATE idx @tagField:{ yellow | red } LOAD 3 @textField @numericField @tagField WITHCURSOR COUNT 2');

  //Each reply is a pair: [items, cursor]
  let [items, cursor] = result;
  for (;;) {
    //Only the array entries of a batch are printed
    items
      .filter((entry) => Array.isArray(entry))
      .forEach((row) => console.log(JSON.stringify(row)));

    //A falsy cursor ends the loop
    if (!cursor) {
      break;
    }
    result = await client.call('FT.CURSOR', 'READ', 'idx', cursor, 'COUNT', '2');
    [items, cursor] = result;
  }

Index Alteration

  1. await client.call('FT.ALTER', 'idx', 'SCHEMA', 'ADD', 'newField', 'TEXT', 'SORTABLE');
  2. let result = await client.call('FT.INFO', 'idx');

Source


Copyright ©1993-2024 Joey E Whelan, All rights reserved.

Redis + Mongo Document (JSON/Search) Commands Comparison

Summary

This post will demonstrate equivalent Create, Read, Update and Delete (CRUD) commands in MongoDB and Redis.  I provide examples from Mongo's shell (mongosh), Redis CLI, node-redis client lib, and redis-py client lib.

Sample Data Set

Below is a snippet of the JSON document that is used in all the examples.

  1. [
  2. {
  3. "item": "journal",
  4. "qty": 25,
  5. "tags": ["blank", "red"],
  6. "dim_cm": [ 14, 21 ],
  7. "size": {
  8. "h": 14,
  9. "w": 21,
  10. "uom": "cm"
  11. },
  12. "status": "E"
  13. },
  14. {
  15. "item": "notebook",
  16. "qty": 50,
  17. "tags": ["red", "blank"],
  18. "dim_cm": [ 14, 21 ],
  19. "size": {
  20. "h": 14,
  21. "w": 21,
  22. "uom": "cm"
  23. },
  24. "status": "E"
  25. },

Create Operation

Mongo Shell

  1. db.inventory.insertMany([
  2. { _id: 1, item: "journal", qty: 25, tags: ["blank", "red"], dim_cm: [ 14, 21 ], size: { h: 14, w: 21, uom: "cm" }, status: "E" },
  3. { _id: 2, item: "notebook", qty: 50, tags: ["red", "blank"], dim_cm: [ 14, 21 ], size: { h: 14, w: 21, uom: "cm" }, status: "E" },
  4. { _id: 3, item: "paper", qty: 100, tags: ["red", "blank", "plain"], dim_cm: [ 14, 21 ], size: { h: 19, w: 22.85, uom: "cm" },status: "B" },
  5. { _id: 4, item: "planner", qty: 75, tags: ["blank", "red"], dim_cm: [ 22.85, 30 ], status: "C" },
  6. { _id: 5, item: "postcard", qty: 45, tags: ["blue"], dim_cm: [ 10, 15.25 ], status: "D" }
  7. ])
  8.  
  9. db.inventory.createIndexes([
  10. {qty: 1},
  11. {item: 1},
  12. {tags: 1},
  13. {size: 1},
  14. {status: 1}
  15. ])

Redis CLI

  1. JSON.SET inventory:1 . '{"item":"journal","qty":25,"tags":["blank","red"],"dim_cm":[14,21],"size":{"h":14,"w":21,"uom":"cm"},"status":"E"}'
  2. JSON.SET inventory:2 . '{"item":"notebook","qty":50,"tags":["red","blank"],"dim_cm":[14,21],"size":{"h":14,"w":21,"uom":"cm"},"status":"E"}'
  3. JSON.SET inventory:3 . '{"item":"paper","qty":100,"tags":["red","blank","plain"],"dim_cm":[14,21],"size":{"h":19,"w":22.85,"uom":"cm"},"status":"B"}'
  4. JSON.SET inventory:4 . '{"item":"planner","qty":75,"tags":["blank","red"],"dim_cm":[22.85,30],"status":"C"}'
  5. JSON.SET inventory:5 . '{"item":"postcard","qty":45,"tags":["blue"],"dim_cm":[10,15.25],"status":"D"}'
  6.  
  7. FT.CREATE inventoryIdx ON JSON PREFIX 1 inventory: SCHEMA $.item AS item TEXT $.qty AS qty NUMERIC $.tags.* AS tags TAG $.dim_cm[0] AS dim_cm_0 NUMERIC $.dim_cm[1] AS dim_cm_1 NUMERIC $.status AS status TEXT $.size.h AS sizeh NUMERIC $.size.w AS sizew NUMERIC $.size.uom AS sizeuom TEXT

node-redis (Javascript)

  1. for (let item of dataset) {
  2. await client.json.set(`inventory:${itemNum++}`, '.', item);
  3. }
  4. await client.ft.create('inventoryIdx', {
  5. '$.item': {
  6. type: SchemaFieldTypes.TEXT,
  7. AS: 'item'
  8. },
  9. '$.qty': {
  10. type: SchemaFieldTypes.NUMERIC,
  11. AS: 'qty'
  12. },
  13. '$.tags.*': {
  14. type: SchemaFieldTypes.TAG,
  15. AS: 'tags'
  16. },
  17. '$.dim_cm[0]': {
  18. type: SchemaFieldTypes.NUMERIC,
  19. AS: 'dim_cm_0'
  20. },
  21. '$.dim_cm[1]': {
  22. type: SchemaFieldTypes.NUMERIC,
  23. AS: 'dim_cm_1',
  24. },
  25. '$.status': {
  26. type: SchemaFieldTypes.TEXT,
  27. AS: 'status'
  28. },
  29. '$.size.h': {
  30. type: SchemaFieldTypes.NUMERIC,
  31. AS: 'sizeh'
  32. },
  33. '$.size.w': {
  34. type: SchemaFieldTypes.NUMERIC,
  35. AS: 'sizew'
  36. },
  37. '$.size.uom': {
  38. type: SchemaFieldTypes.TEXT,
  39. AS: 'sizeuom'
  40. }
  41. }, {
  42. ON: 'JSON',
  43. PREFIX: 'inventory:'
  44. });

redis-py (Python)

  1. itemNum = 1
  2.  
  3. for dataItem in dataset:
  4. client.json().set(f'inventory:{itemNum}', '.', dataItem)
  5. itemNum += 1
  6.  
  7. index_def = IndexDefinition(
  8. index_type=IndexType.JSON,
  9. prefix=['inventory:']
  10. )
  11. schema = ( TextField('$.item', as_name='item'),
  12. NumericField('$.qty', as_name='qty'),
  13. TagField('$.tags.*', as_name='tags'),
  14. NumericField('$.dim_cm[0]', as_name='dim_cm_0'),
  15. NumericField('$.dim_cm[1]', as_name='dim_cm_1'),
  16. TextField('$.status', as_name='status'),
  17. NumericField('$.size.h', as_name='sizeh'),
  18. NumericField('$.size.w', as_name='sizew'),
  19. TextField('$.size.uom', as_name='sizeuom')
  20. )
  21. client.ft('inventoryIdx').create_index(schema,definition=index_def)

Read Operation - Compound Query

Mongo Shell

  1. db.inventory.find( {
  2. status: "E",
  3. $or: [ { qty: { $lt: 30 } }, { item: /^pa/ } ]
  4. } )

Redis CLI

  1. FT.SEARCH inventoryIdx '(@status:E) ((@qty:[-inf (30])|(@item:pa*))'

node-redis (Javascript)

  1. await client.ft.search('inventoryIdx', '(@status:E) ((@qty:[-inf (30])|(@item:pa*))');

redis-py (Python)

  1. client.ft('inventoryIdx').search(Query('(@status:E) ((@qty:[-inf (30])|(@item:pa*))')).docs

Update Operation

Mongo Shell

  // Increment qty by 5 and rename the item on the document whose _id is 2.
  // updateOne() is used in place of the deprecated update(); like update() without
  // the multi option, it modifies at most one matching document.
  db.inventory.updateOne(
    { _id: 2 },
    {
      $inc: { qty: 5 },
      $set: {
        item: "spiral notebook"
      }
    }
  )

Redis CLI

  1. MULTI
  2. JSON.NUMINCRBY inventory:2 '.qty' 5
  3. JSON.SET inventory:2 .item '"spiral notebook"'
  4. EXEC

node-redis (Javascript)

  1. await client
  2. .multi()
  3. .json.numIncrBy('inventory:2', '.qty', 5)
  4. .json.set('inventory:2', '.item', 'spiral notebook' )
  5. .exec();

redis-py (Python)

  1. pipe = client.pipeline()
  2. pipe.json().numincrby('inventory:2', '.qty', 5)
  3. pipe.json().set('inventory:2', '.item', 'spiral notebook')
  4. pipe.execute()

Delete Operation

Mongo Shell

  1. db.inventory.deleteOne( { "_id" : 2 } )

Redis CLI

  1. JSON.DEL inventory:2

node-redis (Javascript)

  1. await client.json.del('inventory:2');

redis-py (Python)

  1. client.json().delete('inventory:2')

Source (with more examples)

https://github.com/Redislabs-Solution-Architects/docdevdemo

Copyright ©1993-2024 Joey E Whelan, All rights reserved.