Friday, December 30, 2022

Redis Vector Similarity Search

Summary

I'll show some examples of how to use vector similarity search (VSS) in Redis.  I'll generate embeddings from pictures, store the resulting vectors in Redis, and then perform searches against those stored vectors.


Architecture




Data Set

I used the Fashion Dataset on Kaggle for the photos to vectorize and store in Redis.

Application

The sample app is written in Python and organized as a single class.  That class does a one-time load of the dataset photos as vectors and stores the vectors in a file.  That file is subsequently used to store the vectors and their associated photo IDs in Redis.  Vector searches of other photos can then be performed against Redis.

Embedding

The code below reads a directory containing the dataset photos, vectorizes each photo, and then writes those vectors to file.  As mentioned, this is a one-time operation.
        if (not os.path.exists(VECTOR_FILE) and len(os.listdir(IMAGE_DIR)) > 0):
            img2vec = Img2Vec(cuda=False)
            images: list = os.listdir(IMAGE_DIR)
            images = images[0:NUM_IMAGES]
            with open(VECTOR_FILE, 'w') as outfile:
                for image in images:
                    img: Image = Image.open(f'{IMAGE_DIR}/{image}').convert('RGB').resize((224, 224))
                    vector: list = img2vec.get_vec(img)
                    id: str = os.path.splitext(image)[0]
                    json.dump({'image_id': id, 'image_vector': vector.tolist()}, outfile)
                    outfile.write('\n')
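The vector file written above holds one JSON object per line (a JSON Lines layout).  Here is a minimal sketch of that write/read round trip.  The in-memory buffer, the image IDs, and the three-element vectors are assumptions for illustration; the real file holds 512-dimension embeddings.

```python
import io
import json

def write_vectors(records, outfile):
    """Write one JSON object per line (JSON Lines)."""
    for image_id, vector in records:
        json.dump({'image_id': image_id, 'image_vector': vector}, outfile)
        outfile.write('\n')

def read_vectors(infile):
    """Read the file back into a dict keyed by image ID."""
    return {str(obj['image_id']): obj['image_vector']
            for obj in map(json.loads, infile)}

# Hypothetical IDs and short vectors stand in for real embeddings.
buffer = io.StringIO()
write_vectors([('1001', [0.1, 0.2, 0.3]), ('1002', [0.4, 0.5, 0.6])], buffer)
buffer.seek(0)
vectors = read_vectors(buffer)
print(sorted(vectors))
```

Because each line parses on its own, the file can be streamed without loading every vector into memory at once.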

Redis Data Loading

Redis supports VSS for both the JSON and Hash data types.  I parameterized a function to allow the creation of Redis VSS indices for either data type.  One important difference between the two:  vectors can be stored as is (an array of floats) in JSON documents, but must be serialized to a binary BLOB (here, FLOAT32 bytes) for Hashes.
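To make the BLOB idea concrete, here is a minimal sketch that packs a list of floats into FLOAT32 bytes using only the standard library.  It assumes little-endian byte order, which is what NumPy's tobytes() produces on typical x86 and ARM machines; the helper names are mine, not part of any Redis client.

```python
import struct

def floats_to_blob(vector):
    """Pack a list of floats as little-endian FLOAT32 bytes (4 bytes each)."""
    return struct.pack(f'<{len(vector)}f', *vector)

def blob_to_floats(blob):
    """Unpack FLOAT32 bytes back into a list of Python floats."""
    count = len(blob) // 4
    return list(struct.unpack(f'<{count}f', blob))

vector = [0.5, -1.25, 2.0]       # values chosen to be exact in FLOAT32
blob = floats_to_blob(vector)
print(len(blob))                 # 12
print(blob_to_floats(blob))      # [0.5, -1.25, 2.0]
```

A 512-dimension FLOAT32 vector packs to 2048 bytes this way, while the JSON form keeps the same values as a readable array.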


    def _get_images(self) -> None:
        # Fills self.image_dict in place: FLOAT32 bytes for Hashes, float lists for JSON.

        with open(VECTOR_FILE, 'r') as infile:
            for line in infile:
                obj: object = json.loads(line)
                id: str = str(obj['image_id'])
                match self.object_type:
                    case OBJECT_TYPE.HASH:
                        self.image_dict[id] = np.array(obj['image_vector'], dtype=np.float32).tobytes()
                    case OBJECT_TYPE.JSON:
                        self.image_dict[id] = obj['image_vector']

    def _load_db(self) -> None:

        self.connection.flushdb()
        self._get_images()

        match self.object_type:
            case OBJECT_TYPE.HASH:
                schema = [ VectorField('image_vector', 
                                self.index_type.value, 
                                {   "TYPE": 'FLOAT32', 
                                    "DIM": 512, 
                                    "DISTANCE_METRIC": self.metric_type.value
                                }
                            ),
                            TagField('image_id')
                ]
                idx_def = IndexDefinition(index_type=IndexType.HASH, prefix=['key:'])
                self.connection.ft('idx').create_index(schema, definition=idx_def)

                pipe: Connection = self.connection.pipeline()
                for id, vec in self.image_dict.items():
                    pipe.hset(f'key:{id}', mapping={'image_id': id, 'image_vector': vec})
                pipe.execute()
            case OBJECT_TYPE.JSON:
                schema = [ VectorField('$.image_vector', 
                                self.index_type.value, 
                                {   "TYPE": 'FLOAT32', 
                                    "DIM": 512, 
                                    "DISTANCE_METRIC": self.metric_type.value
                                },  as_name='image_vector'
                            ),
                            TagField('$.image_id', as_name='image_id')
                ]
                idx_def: IndexDefinition = IndexDefinition(index_type=IndexType.JSON, prefix=['key:'])
                self.connection.ft('idx').create_index(schema, definition=idx_def)
                pipe: Connection = self.connection.pipeline()
                for id, vec in self.image_dict.items():
                    pipe.json().set(f'key:{id}', '$', {'image_id': id, 'image_vector': vec})
                pipe.execute()

Search

With vectors loaded and indices created in Redis, a vector search looks the same for either JSON or Hashes.  The search vector must be serialized to a BLOB.  Searches can be strictly for vector similarity (KNN search) or a combination of VSS and a traditional Redis Search query (hybrid search).  The function below is parameterized to support both.

 
    def search(self, query_vector: list, search_type: SEARCH_TYPE, hyb_str=None) -> list:
        match search_type:
            case SEARCH_TYPE.VECTOR:
                q_str = f'*=>[KNN {TOPK} @image_vector $vec_param AS vector_score]'
            case SEARCH_TYPE.HYBRID:
                q_str = f'(@image_id:{{{hyb_str}}})=>[KNN {TOPK} @image_vector $vec_param AS vector_score]'
        
        q = Query(q_str)\
            .sort_by('vector_score')\
            .paging(0,TOPK)\
            .return_fields('vector_score','image_id')\
            .dialect(2)
        # query_vector must be passed as a FLOAT32 BLOB (bytes), not a list of floats
        params_dict = {"vec_param": query_vector}

        results = self.connection.ft('idx').search(q, query_params=params_dict)
        return results
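For intuition about what a KNN query returns, here is a minimal brute-force sketch in plain Python.  It assumes cosine distance (1 minus cosine similarity) as the metric and uses made-up two-element vectors; it illustrates the ranking only and is not how Redis implements its indices.

```python
import math

def cosine_distance(a, b):
    """1 - cosine similarity; smaller means more similar."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)

def knn(query, stored, top_k):
    """Score every stored vector, sort ascending by distance, keep top_k."""
    scored = [(image_id, cosine_distance(query, vec))
              for image_id, vec in stored.items()]
    scored.sort(key=lambda pair: pair[1])
    return scored[:top_k]

# Hypothetical stored vectors keyed by image ID.
stored = {'a': [1.0, 0.0], 'b': [0.0, 1.0], 'c': [1.0, 1.0]}
results = knn([1.0, 0.1], stored, top_k=2)
print([image_id for image_id, _ in results])  # ['a', 'c']
```

The ascending sort plays the role of sort_by('vector_score') in the query above: the lowest distance is the best match.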

Source

https://github.com/Redislabs-Solution-Architects/vss-ops

Copyright ©1993-2024 Joey E Whelan, All rights reserved.

Monday, December 26, 2022

Redis Search with FHIR Data

Summary

This post will demonstrate search functionality in Redis Enterprise (RE) with FHIR data.  I'll generate FHIR patient bundles with the Synthea application.  Then I'll build a three-node RE cluster and sharded Redis DB via Docker scripting and RE REST API.  Finally, I'll show multiple search scenarios on that healthcare data.

Overall Architecture







Data Generation

The shell script below pulls the Synthea jar file, if necessary, and then creates FHIR patient record bundles for every US state.  One to ten bundles are randomly created for each state.
if [ ! -f synthea-with-dependencies.jar ]
then
    wget -q https://github.com/synthetichealth/synthea/releases/download/master-branch-latest/synthea-with-dependencies.jar
fi

STATES=("Alabama" "Alaska" "Arizona" "Arkansas" "California" "Colorado" "Connecticut" 
"Delaware" "District of Columbia" "Florida" "Georgia" "Hawaii" "Idaho" "Illinois"
"Indiana" "Iowa" "Kansas" "Kentucky" "Louisiana"  "Maine" "Montana" "Nebraska" 
"Nevada" "New Hampshire" "New Jersey" "New Mexico" "New York" "North Carolina"
"North Dakota" "Ohio" "Oklahoma" "Oregon" "Maryland" "Massachusetts" "Michigan" 
"Minnesota" "Mississippi" "Missouri" "Pennsylvania" "Rhode Island" "South Carolina"
"South Dakota" "Tennessee" "Texas" "Utah" "Vermont" "Virginia" "Washington" 
"West Virginia" "Wisconsin" "Wyoming")

MAX_POP=10

for state in "${STATES[@]}"; do   
  pop=$(($RANDOM%$MAX_POP + 1))
  java -jar synthea-with-dependencies.jar -c ./syntheaconfig.txt -p $pop "$state"
done
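The per-state loop can also be sketched in Python.  This version only builds the argument list for each Synthea run (it does not launch Java), and the function name is my own.

```python
import random

MAX_POP = 10
JAR = 'synthea-with-dependencies.jar'

def synthea_command(state, rng=random):
    """Build the Synthea command line for one state with a random population of 1..MAX_POP."""
    pop = rng.randint(1, MAX_POP)  # same range as $RANDOM % MAX_POP + 1
    return ['java', '-jar', JAR, '-c', './syntheaconfig.txt', '-p', str(pop), state]

cmd = synthea_command('New Mexico')
print(cmd)
```

Passing the list to subprocess.run(cmd, check=True) would execute it.  Keeping the state as a single list element preserves multi-word names such as "New Mexico", which is what the quoting of "$state" does in the shell script.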

RE Build

This shell script uses a docker-compose file to create a 3-node Redis Enterprise cluster.  It pulls down the latest GA copies of the Search and JSON modules, executes the compose script, assembles a cluster, loads the Search and JSON modules via REST API, and finally creates a 2-shard, replicated database on the cluster via REST API.
SEARCH_LATEST=redisearch.Linux-ubuntu18.04-x86_64.2.6.3.zip
JSON_LATEST=rejson.Linux-ubuntu18.04-x86_64.2.4.2.zip

if [ ! -f $SEARCH_LATEST ]
then
    wget -q https://redismodules.s3.amazonaws.com/redisearch/$SEARCH_LATEST
fi 

if [ ! -f $JSON_LATEST ]
then
    wget https://redismodules.s3.amazonaws.com/rejson/$JSON_LATEST
fi 

echo "Launch Redis Enterprise docker containers"
docker compose up -d
echo "*** Wait for Redis Enterprise to come up ***"
curl -s -o /dev/null --retry 5 --retry-all-errors --retry-delay 3 -f -k -u "redis@redis.com:redis" https://localhost:19443/v1/bootstrap
echo "*** Build Cluster ***"
docker exec -it re1 /opt/redislabs/bin/rladmin cluster create name cluster.local username redis@redis.com password redis
docker exec -it re2 /opt/redislabs/bin/rladmin cluster join nodes 192.168.20.2 username redis@redis.com password redis
docker exec -it re3 /opt/redislabs/bin/rladmin cluster join nodes 192.168.20.2 username redis@redis.com password redis
echo "*** Load Modules ***"
curl -s -o /dev/null -k -u "redis@redis.com:redis" https://localhost:19443/v1/modules -F module=@$SEARCH_LATEST
curl -s -o /dev/null -k -u "redis@redis.com:redis" https://localhost:19443/v1/modules -F module=@$JSON_LATEST
echo "*** Build FHIR DB ***"
curl -s -o /dev/null -k -u "redis@redis.com:redis" https://localhost:19443/v1/bdbs -H "Content-Type:application/json" -d @fhirdb.json

RE Architecture

The diagram below depicts the resulting RE architecture that is created.  Two shards (labeled M1 and M2) and their replicas (R1 and R2) are distributed across the cluster.




Below are screenshots of the admin interfaces for the RE cluster and the database that were created.











Search Examples

Below are snippets of some of the search/aggregation examples, implemented in both JavaScript and Python.

Medical Facility Geographic Search


Below are the Redis index and search commands to find the closest medical facility (that is in the database) to a geographic coordinate.  In this case, the coordinates are for Woodland Park, CO.

Index - JavaScript

        await this.client.ft.create('location_idx', {
            '$.status': {
                type: SchemaFieldTypes.TAG,
                AS: 'status'  
            },
            '$.name': {
                type: SchemaFieldTypes.TEXT,
                AS: 'name'
            },
            '$.address.city': {
                type: SchemaFieldTypes.TAG,
                AS: 'city'
            },
            '$.address.state': {
                type: SchemaFieldTypes.TAG,
                AS: 'state'
            },
            '$.position.longitude': {
                type: SchemaFieldTypes.NUMERIC,
                AS: 'longitude'
            },
            '$.position.latitude': {
                type: SchemaFieldTypes.NUMERIC,
                AS: 'latitude'
            }
        }, { ON: 'JSON', PREFIX: 'Location:'});

Index - Python

        idx_def = IndexDefinition(index_type=IndexType.JSON, prefix=['Location:'])
        schema = [  TagField('$.status', as_name='status'),
            TextField('$.name', as_name='name'),
            TagField('$.address.city', as_name='city'),
            TagField('$.address.state', as_name='state'),
            NumericField('$.position.longitude', as_name='longitude'),
            NumericField('$.position.latitude', as_name='latitude')
        ]
        connection.ft('location_idx').create_index(schema, definition=idx_def)

Search - JavaScript

        result = await this.client.ft.aggregate('location_idx','@status:{active}', {
            LOAD: ['@name', '@city', '@state', '@longitude', '@latitude'],
            STEPS: [
                    {   type: AggregateSteps.APPLY,
                        expression: 'geodistance(@longitude, @latitude, -105.0569, 38.9939)', 
                        AS: 'meters' 
                    },
                    {   type: AggregateSteps.APPLY ,
                        expression: 'ceil(@meters*0.000621371)', 
                        AS: 'miles' 
                    },
                    {
                        type: AggregateSteps.SORTBY,
                        BY: {
                            BY: '@miles', 
                            DIRECTION: 'ASC' 
                        }
                    },
                    {
                        type: AggregateSteps.LIMIT,
                        from: 0, 
                        size: 1
                    }
            ]
        });

Search - Python

        request = AggregateRequest('@status:{active}')\
        .load('@name', '@city', '@state', '@longitude', '@latitude')\
        .apply(meters='geodistance(@longitude, @latitude, -105.0569, 38.9939)')\
        .apply(miles='ceil(@meters*0.000621371)')\
        .sort_by(Asc('@miles'))\
        .limit(0,1)
        result = connection.ft('location_idx').aggregate(request)

Results

[[b'name', b'ARETI COMPREHENSIVE PRIMARY CARE', b'city', b'COLORADO SPRINGS', b'state', b'CO', 
b'longitude', b'-104.768591624', b'latitude', b'38.9006726282', b'meters', b'27009.43', b'miles', b'17']]
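As a sanity check on the result above, the distance can be approximated with the haversine formula.  This is a minimal sketch: the Earth radius constant is my assumption of the value Redis uses for geo calculations, so the meters figure should land close to, but not necessarily exactly on, the value Redis returned.

```python
import math

EARTH_RADIUS_M = 6372797.560856  # assumption: the radius Redis uses for geo math

def geodistance(lon1, lat1, lon2, lat2):
    """Great-circle distance in meters between two lon/lat points (haversine)."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))

# Facility coordinates from the result, then the Woodland Park search point.
meters = geodistance(-104.768591624, 38.9006726282, -105.0569, 38.9939)
miles = math.ceil(meters * 0.000621371)
print(round(meters), miles)
```

The two APPLY steps in the aggregation do the same thing: compute meters first, then convert to miles and round up.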

Medication Prescriptions


Below are the index and search commands to compile a list of the Top 3 physicians prescribing opioids by script count.

Index - JavaScript

        await this.client.ft.create('medicationRequest_idx', {
            '$.status': {
                type: SchemaFieldTypes.TAG,
                AS: 'status'
            },
            '$.medicationCodeableConcept.text': {
                type: SchemaFieldTypes.TEXT,
                AS: 'drug'
            },
            '$.requester.display': {
                type: SchemaFieldTypes.TEXT,
                AS: 'prescriber',
                SORTABLE: true
            },
            '$.reasonReference[*].display': {
                type: SchemaFieldTypes.TEXT,
                AS: 'reason'
            }
        }, {ON: 'JSON', PREFIX: 'MedicationRequest:'});

Index - Python

        idx_def = IndexDefinition(index_type=IndexType.JSON, prefix=['MedicationRequest:'])
        schema = [  TagField('$.status', as_name='status'),
            TextField('$.medicationCodeableConcept.text', as_name='drug'),
            TextField('$.requester.display', as_name='prescriber', sortable=True),
            TextField('$.reasonReference[*].display', as_name='reason')
        ]
        connection.ft('medicationRequest_idx').create_index(schema, definition=idx_def)

Search - JavaScript

        const opioids = 'Hydrocodone|Oxycodone|Oxymorphone|Morphine|Codeine|Fentanyl|Hydromorphone|Tapentadol|Methadone';
        result = await this.client.ft.aggregate('medicationRequest_idx', `@drug:${opioids}`, {
            STEPS: [
                {   type: AggregateSteps.GROUPBY,
                    properties: ['@prescriber'],
                    REDUCE: [
                        {   type: AggregateGroupByReducers.COUNT,
                            property: '@prescriber',
                            AS: 'opioids_prescribed'
                        }
                    ]   
                },
                {
                    type: AggregateSteps.SORTBY,
                    BY: { 
                        BY: '@opioids_prescribed', 
                        DIRECTION: 'DESC' 
                    }
                },
                {
                    type: AggregateSteps.LIMIT,
                    from: 0, 
                    size: 3
                }
            ]
        });

Search - Python

        opioids = 'Hydrocodone|Oxycodone|Oxymorphone|Morphine|Codeine|Fentanyl|Hydromorphone|Tapentadol|Methadone'
        request = AggregateRequest(f'@drug:{opioids}')\
        .group_by('@prescriber', reducers.count().alias('opioids_prescribed'))\
        .sort_by(Desc('@opioids_prescribed'))\
        .limit(0,3)
        result = connection.ft('medicationRequest_idx').aggregate(request)

Results

[[b'prescriber', b'Dr. Aja848 McKenzie376', b'opioids_prescribed', b'53'], 
[b'prescriber', b'Dr. Jaquelyn689 Bernier607', b'opioids_prescribed', b'52'], 
[b'prescriber', b'Dr. Aurora248 Kessler503', b'opioids_prescribed', b'49']]
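The group-by, count, sort, and limit pipeline above can be sketched in plain Python with collections.Counter.  The records and prescriber names below are hypothetical stand-ins for MedicationRequest documents, and the word match is a simplification of full-text matching.

```python
from collections import Counter

OPIOIDS = {'hydrocodone', 'oxycodone', 'oxymorphone', 'morphine', 'codeine',
           'fentanyl', 'hydromorphone', 'tapentadol', 'methadone'}

def top_prescribers(requests, limit=3):
    """Count opioid prescriptions per prescriber and return the top `limit`."""
    counts = Counter(
        r['prescriber'] for r in requests
        if any(word.lower() in OPIOIDS for word in r['drug'].split())
    )
    return counts.most_common(limit)

# Hypothetical sample data.
requests = [
    {'prescriber': 'Dr. A', 'drug': 'Oxycodone Hydrochloride 5 MG Oral Tablet'},
    {'prescriber': 'Dr. A', 'drug': 'Fentanyl 100 MCG Injection'},
    {'prescriber': 'Dr. B', 'drug': 'Codeine Phosphate 30 MG Oral Tablet'},
    {'prescriber': 'Dr. B', 'drug': 'Ibuprofen 200 MG Oral Tablet'},
    {'prescriber': 'Dr. C', 'drug': 'Amoxicillin 250 MG Oral Capsule'},
]
top = top_prescribers(requests)
print(top)  # [('Dr. A', 2), ('Dr. B', 1)]
```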

Insurer Claim Values


Below are the index and search commands to find the Top 3 insurers by total claim dollar value.

Index - JavaScript


        await this.client.ft.create('claims_idx', {
            '$.status': {
                type: SchemaFieldTypes.TAG,
                AS: 'status'
            },
            '$.insurance[*].coverage.display': {
                type: SchemaFieldTypes.TEXT,
                AS: 'insurer',
                SORTABLE: true    
            },
            '$.total.value': {
                type: SchemaFieldTypes.NUMERIC,
                AS: 'value'
            }
        }, {ON: 'JSON', PREFIX: 'Claim:'});

Index - Python


        idx_def = IndexDefinition(index_type=IndexType.JSON, prefix=['Claim:'])
        schema = [  TagField('$.status', as_name='status'),
            TextField('$.insurance[*].coverage.display', as_name='insurer', sortable=True),
            NumericField('$.total.value', as_name='value')
        ]
        connection.ft('claims_idx').create_index(schema, definition=idx_def)

Search - JavaScript

        result = await this.client.ft.aggregate('claims_idx', '@status:{active}', {
            STEPS: [
                {   type: AggregateSteps.GROUPBY,
                    properties: ['@insurer'],
                    REDUCE: [{   
                        type: AggregateGroupByReducers.SUM,
                        property: '@value',
                        AS: 'total_value'
                }]},
                {
                    type: AggregateSteps.FILTER,
                    expression: '@total_value > 0'
                },
                {   type: AggregateSteps.SORTBY,
                    BY: { 
                    BY: '@total_value', 
                    DIRECTION: 'DESC' 
                }},
                {   type: AggregateSteps.LIMIT,
                    from: 0, 
                    size: 3
                }
            ]
        });

Search - Python

        request = AggregateRequest('@status:{active}')\
        .group_by('@insurer', reducers.sum('@value').alias('total_value'))\
        .filter('@total_value > 0')\
        .sort_by(Desc('@total_value'))\
        .limit(0,3)
        result = connection.ft('claims_idx').aggregate(request)

Results

[[b'insurer', b'Medicare', b'total_value', b'29841923.54'], [b'insurer', b'NO_INSURANCE', b'total_value', b'9749265.48'], 
[b'insurer', b'UnitedHealthcare', b'total_value', b'8859141.59']]
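The same aggregation shape (filter on status, group by insurer, sum, drop zero totals, sort descending, limit) can be sketched in plain Python.  The claim records here are hypothetical.

```python
from collections import defaultdict

def top_insurers(claims, limit=3):
    """Sum active claim values per insurer and return the largest totals."""
    totals = defaultdict(float)
    for claim in claims:
        if claim['status'] == 'active':
            totals[claim['insurer']] += claim['value']
    ranked = sorted(
        ((name, total) for name, total in totals.items() if total > 0),
        key=lambda pair: pair[1],
        reverse=True,
    )
    return ranked[:limit]

# Hypothetical sample data.
claims = [
    {'insurer': 'Medicare', 'status': 'active', 'value': 100.0},
    {'insurer': 'Medicare', 'status': 'active', 'value': 50.0},
    {'insurer': 'NO_INSURANCE', 'status': 'active', 'value': 75.0},
    {'insurer': 'Aetna', 'status': 'active', 'value': 0.0},
    {'insurer': 'Cigna', 'status': 'cancelled', 'value': 500.0},
]
ranked = top_insurers(claims)
print(ranked)  # [('Medicare', 150.0), ('NO_INSURANCE', 75.0)]
```

The zero-total insurer is dropped by the filter step and the cancelled claim never reaches the group-by, mirroring the '@status:{active}' query and the '@total_value > 0' filter.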

Source



Saturday, October 15, 2022

Web Crawler with Redis Indexing

Summary

I'll be demonstrating a simple web crawler implementation in Node.js in this post.  The crawled results are then stored in Redis and indexed with RediSearch.  Apache Tika is used for document parsing.

Architecture

High Level


Detailed


Crawler POST End Point


 
/**
 * Crawl endpoint.  Starts a Worker thread (spider.js) that crawls the given fqdn, extracts text via Tika,
 * and then stores the resulting text in Redis as JSON documents.
 * This returns immediately and provides a taskID for the Worker thread.
 */
app.post('/crawl', (req, res) => {
    console.log(`app - POST /crawl ${req.body.fqdn}`);
    const taskID = uuidv4();
    try {
        new Worker('./app/spider.js', { workerData : { 'fqdn': req.body.fqdn, 'taskID': taskID }});
        res.status(201).json({'taskID': taskID});
    }
    catch (err) {
        console.error(`app - POST /crawl ${req.body.fqdn} - ${err.message}`)
        res.status(400).json({ 'error': err.message });
    }
});
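The "start work in the background, return a task ID right away" pattern can be sketched in Python with a thread.  This is an analogy to the Node.js Worker above, not a port of it: the spider function is a placeholder and the results dict is a hypothetical stand-in for Redis.

```python
import threading
import uuid

results = {}  # stand-in for the datastore the worker writes to

def spider(fqdn, task_id):
    """Placeholder for the crawl; a real worker would fetch and parse pages."""
    results[task_id] = f'crawled {fqdn}'

def start_crawl(fqdn):
    """Start the crawl on a background thread and return immediately."""
    task_id = str(uuid.uuid4())
    worker = threading.Thread(target=spider, args=(fqdn, task_id), daemon=True)
    worker.start()
    return {'taskID': task_id}, worker

response, worker = start_crawl('example.com')
worker.join()  # only so this demo can print the outcome
print(results[response['taskID']])  # crawled example.com
```

A caller would normally keep only the task ID and poll for status later, rather than joining the thread.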

Text Extraction with Tika

    async extract(doc, data, hash) {
        const stream = Readable.from(data);  //get a stream from the arrayBuffer obj
        const response = await axios({  //send that stream to Tika for automatic mime-type detection and text extraction
            method: 'PUT',
            url: `${tikaUrl}/tika`,
            data: stream,
            responseType: 'text',
            headers: {
                'Content-Type': 'application/octet-stream',
                'Accept': 'text/plain'
            }
        });
        const json = { "doc": doc, "text": response.data, "hash": hash };
        await this.client.json.set(`${PREFIX}:${doc}`, '.', json);
    }

Index Creation

async function buildIndex() {
    console.log(`app - buildIndex`);
    let rc = await clientFactory();
    try {
        await rc.ft.create('docIdx', {
            '$.doc': {
                type: redis.SchemaFieldTypes.TEXT,
                AS: 'doc'
            },
            '$.text': {
                type: redis.SchemaFieldTypes.TEXT,
                AS: 'text'
            }   
        }, {
            ON: 'JSON',
            PREFIX: 'DOC'
        });
    }
    catch(err) { 
        console.error(`app - buildIndex - ${err.message}`); 
    }
}

Source



Redis Lookaside Cache with MongoDB

Summary

In this post, I'll demonstrate how to implement caching with Redis against a Mongo database.  All code is Python using the redis-py client lib.  This is implemented as a REST API server using the FastAPI framework.

Architecture

High Level


Detailed


Data Set

A snippet of the data set is below.

 
[
    {
        "airport": {
            "code": "ATL",
            "name": "Atlanta, GA: Hartsfield-Jackson Atlanta International"
        },
        "time": {
            "label": "2003/06",
            "month": 6,
            "month_name": "June",
            "year": 2003
        },
        "statistics": {
            "num_delays": {
                "carrier": 1009,
                "late_aircraft": 1275,
                "national_aviation_system": 3217,
                "security": 17,
                "weather": 328
            },
            "carriers": {
                "names": [
                    "American Airlines Inc.",
                    "JetBlue Airways",
                    "Continental Air Lines Inc.",
                    "Delta Air Lines Inc.",
                    "Atlantic Southeast Airlines",
                    "AirTran Airways Corporation",
                    "America West Airlines Inc.",
                    "Northwest Airlines Inc.",
                    "ExpressJet Airlines Inc.",
                    "United Air Lines Inc.",
                    "US Airways Inc."
                ],
                "total": 11
            },
            "flights": {
                "canceled": 216,
                "delayed": 5843,
                "diverted": 27,
                "on_time": 23974,
                "total": 30060
            },
            "minutes_delayed": {
                "carrier": 61606,
                "late_aircraft": 68335,
                "national_aviation_system": 118831,
                "security": 518,
                "total": 268764,
                "weather": 19474
            }
        }
    },

Data Loading


 
async def startup_event():
    #mongo startup
    """Reset the db to an empty state and then load data from file
    """
    col = mdb.airlines.delays
    await col.drop()  # await so the drop completes before the reload
    with open('data.json') as json_file:
        data = load(json_file)
    await col.insert_many(data)
    await col.create_index([('airport.code', 1),('time.year', 1),('time.month', 1)])
    
    #redis startup
    """Empty cache and rebuild index.
    """
    await rdb.flushdb()
    idx_def = IndexDefinition(index_type=IndexType.JSON, prefix=['delayStat:'])
    schema = (TextField('$.airport.code', as_name='airport', sortable=True),
                NumericField('$.time.year', as_name='year', sortable=True),
                NumericField('$.time.month', as_name='month', sortable=True))
    await rdb.ft('idx').create_index(schema, definition=idx_def)

Data Fetch Logic


 
async def get_cancellations(airport: str, year: int, month: int):
    lock = None  # only acquired on a cache miss; the finally block checks it
    try:
        result, duration = await time_func(rdb.ft('idx').search,
                Query(f"(@airport:{airport}) (@year:[{year} {year}]) (@month:[{month} {month}])"))

        if len(result.docs) > 0:  # cache hit
            jsonval = loads(result.docs[0].json)
            metrics.incr_hits(duration)
            json_res = {"result": jsonval['statistics']['flights']['canceled']}
        else:  # cache miss
            col = mdb.airlines.delays
            lock = await lock_mgr.lock(f"lock:{airport}:{year}:{month}")  #fine-grained, distributed lock
            result, duration = await time_func(col.find_one, 
                { "airport.code": airport,
                    "time.year": {"$eq": year},
                    "time.month": {"$eq": month}
            })
            metrics.incr_misses(duration)
            if result:
                id = result.pop('_id')  # this field can't be serialized and needs to be removed
                await rdb.json().set(f"delayStat:{id}", '$', result)  #add val to cache and set TTL to 1 hour
                await rdb.expire(f"delayStat:{id}", 3600)
                json_res = {"result": result['statistics']['flights']['canceled']}
            else: # not found in cache or db
                raise HTTPException(status_code=404, detail=f"Data not found for {airport} {year} {month}")
    except Exception as err:
        if isinstance(err, HTTPException):
            raise err
        else:
            raise HTTPException(status_code=400, detail=str(err))
    else:
        return json_res
    finally:
        if lock and lock.valid:
            await lock_mgr.unlock(lock)
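Stripped of the web framework and the databases, the lookaside pattern comes down to a few lines.  This is a minimal sketch with dicts standing in for Redis and Mongo; the class name and the tuple key format are my own, and the distributed lock is left out for brevity.

```python
import time

class LookasideCache:
    """Check the cache first; on a miss, read the database and cache the value with a TTL."""

    def __init__(self, db, ttl_seconds=3600, clock=time.monotonic):
        self.db = db
        self.ttl = ttl_seconds
        self.clock = clock
        self.cache = {}  # key -> (expires_at, value)
        self.hits = 0
        self.misses = 0

    def get(self, key):
        entry = self.cache.get(key)
        if entry and entry[0] > self.clock():  # cache hit, not yet expired
            self.hits += 1
            return entry[1]
        self.misses += 1                        # cache miss: go to the database
        value = self.db.get(key)
        if value is None:
            raise KeyError(key)                 # not in cache or database
        self.cache[key] = (self.clock() + self.ttl, value)
        return value

# Cancellation count for ATL in 2003/06, taken from the data set snippet.
db = {('ATL', 2003, 6): 216}
cache = LookasideCache(db)
first = cache.get(('ATL', 2003, 6))   # miss, loads from db
second = cache.get(('ATL', 2003, 6))  # hit
print(first, second, cache.hits, cache.misses)  # 216 216 1 1
```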

Source



RediSearch - ioredis client lib

Summary

I'll be demonstrating various Redis Search operations with the ioredis (Node.js) client lib in this post.

Data Set Creation

A random set of objects is created and then saved in Redis as hashes.
    const pipeline = client.pipeline();
    for (let i=0; i < NUM; i++) {
        const colors = (COLORS.sort(() => .5 - Math.random())).slice(0, Math.floor(Math.random() * COLORS.length))
        const fields = {
            'textField': `text${Math.floor(Math.random() * NUM)}`, 
            'numericField': Math.floor(Math.random() * NUM), 
            'tagField': colors
        };
        await pipeline.hmset(`item:${i}`, fields);
    }
    await pipeline.exec();

Index Creation

    await client.call('FT.CREATE', 'idx', 'ON', 'HASH', 'PREFIX', '1', 'item:', 'SCHEMA', 
        'textField', 'TEXT', 'SORTABLE',
        'numericField', 'NUMERIC', 'SORTABLE',
        'tagField', 'TAG'
    );

Search Operations

    //Search for exact match in a Text field
    let result = await client.call('FT.SEARCH', 'idx', '@textField:text1');

    //Search for a range in a Numeric field
    result = await client.call('FT.SEARCH', 'idx', '@numericField:[1,3]');

    //Search for a match in a Tag Field
    result = await client.call('FT.SEARCH', 'idx', '@tagField:{blue}');

Aggregation Operations

    //Aggregate on a text field across all hashes in the index
    let result = await client.call('FT.AGGREGATE', 'idx', '*', 'GROUPBY', '1', '@textField', 
        'REDUCE', 'COUNT', '0', 'AS', 'CNT');
        
    //Search on a numeric range and then apply the SQRT function to a numeric field in the matches
    const upper = Math.floor(Math.random() * NUM) + 1;
    result = await client.call('FT.AGGREGATE', 'idx', `@numericField:[0,${upper}]`, 
        'APPLY', 'SQRT(@numericField)', 'AS', 'SQRT');
        
    
    //Search for logical OR of two tag values and use a CURSOR.  Limit return
    //to 2 values to illustrate looping on a cursor.
    result = await client.call('FT.AGGREGATE', 'idx', '@tagField:{ yellow | red }', 
        'LOAD', '3', '@textField', '@numericField', '@tagField',
        'WITHCURSOR', 'COUNT', '2'
    );
    console.log('FT.AGGREGATE idx @tagField:{ yellow | red } LOAD 3 @textField @numericField @tagField WITHCURSOR COUNT 2');
    let items = result[0];
    let cursor = result[1];
    while (true) {
        for (let item of items) {
            if (Array.isArray(item)) {
                console.log(JSON.stringify(item))
            }
        }
        if (cursor) {
            result = await client.call('FT.CURSOR', 'READ', 'idx', cursor, 'COUNT', '2');
            items = result[0];
            cursor = result[1];
        }
        else {
            break;
        }
    }
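The cursor loop follows a general pattern: keep reading pages until the server returns a cursor ID of 0.  Here is a minimal Python sketch of that control flow.  The fake page table stands in for FT.CURSOR READ, and it omits the leading count element that the real reply carries in each page (the Array.isArray check above skips it).

```python
def read_all(first_reply, read_next):
    """Collect rows from the first reply, then follow the cursor until it is 0."""
    items, cursor = first_reply
    rows = list(items)
    while cursor:  # a cursor ID of 0 means the result set is exhausted
        items, cursor = read_next(cursor)
        rows.extend(items)
    return rows

# Hypothetical server state: cursor ID -> (page of rows, next cursor ID).
pages = {7: (['c', 'd'], 9), 9: (['e'], 0)}
rows = read_all((['a', 'b'], 7), lambda cursor: pages[cursor])
print(rows)  # ['a', 'b', 'c', 'd', 'e']
```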

Index Alteration

    await client.call('FT.ALTER', 'idx', 'SCHEMA', 'ADD', 'newField', 'TEXT', 'SORTABLE');
    let result = await client.call('FT.INFO', 'idx');

Source



Redis + Mongo Document (JSON/Search) Commands Comparison

Summary

This post will demonstrate equivalent Create, Read, Update and Delete (CRUD) commands in MongoDB and Redis.  I provide examples from Mongo's shell (mongosh), Redis CLI, node-redis client lib, and redis-py client lib.

Sample Data Set

Below is a snippet of the JSON document that is used in all the examples.

[
    { 
        "item": "journal", 
        "qty": 25, 
        "tags": ["blank", "red"], 
        "dim_cm": [ 14, 21 ], 
        "size": { 
            "h": 14, 
            "w": 21, 
            "uom": "cm"
        }, 
        "status": "E"  
    },
    { 
        "item": "notebook", 
        "qty": 50, 
        "tags": ["red", "blank"], 
        "dim_cm": [ 14, 21 ], 
        "size": { 
            "h": 14, 
            "w": 21, 
            "uom": "cm" 
        }, 
        "status": "E" 
    },

Create Operation

Mongo Shell

db.inventory.insertMany([
{ _id: 1, item: "journal", qty: 25, tags: ["blank", "red"], dim_cm: [ 14, 21 ], size: { h: 14, w: 21, uom: "cm" }, status: "E"  },
{ _id: 2, item: "notebook", qty: 50, tags: ["red", "blank"], dim_cm: [ 14, 21 ], size: { h: 14, w: 21, uom: "cm" }, status: "E" },
{ _id: 3, item: "paper", qty: 100, tags: ["red", "blank", "plain"], dim_cm: [ 14, 21 ], size: { h: 19, w: 22.85, uom: "cm" },status: "B"  },
{ _id: 4, item: "planner", qty: 75, tags: ["blank", "red"], dim_cm: [ 22.85, 30 ], status: "C" },
{ _id: 5, item: "postcard", qty: 45, tags: ["blue"], dim_cm: [ 10, 15.25 ], status: "D" }
])

db.inventory.createIndexes([
{qty: 1},
{item: 1},
{tags: 1},
{size: 1},
{status: 1}
])

Redis CLI

JSON.SET inventory:1 . '{"item":"journal","qty":25,"tags":["blank","red"],"dim_cm":[14,21],"size":{"h":14,"w":21,"uom":"cm"},"status":"E"}'
JSON.SET inventory:2 . '{"item":"notebook","qty":50,"tags":["red","blank"],"dim_cm":[14,21],"size":{"h":14,"w":21,"uom":"cm"},"status":"E"}'
JSON.SET inventory:3 . '{"item":"paper","qty":100,"tags":["red","blank","plain"],"dim_cm":[14,21],"size":{"h":19,"w":22.85,"uom":"cm"},"status":"B"}'
JSON.SET inventory:4 . '{"item":"planner","qty":75,"tags":["blank","red"],"dim_cm":[22.85,30],"status":"C"}'
JSON.SET inventory:5 . '{"item":"postcard","qty":45,"tags":["blue"],"dim_cm":[10,15.25],"status":"D"}'

FT.CREATE inventoryIdx ON JSON PREFIX 1 inventory: SCHEMA $.item AS item TEXT $.qty AS qty NUMERIC $.tags.* AS tags TAG $.dim_cm[0] AS dim_cm_0 NUMERIC $.dim_cm[1] AS dim_cm_1 NUMERIC $.status AS status TEXT $.size.h AS sizeh NUMERIC $.size.w AS sizew NUMERIC $.size.uom AS sizeuom TEXT

node-redis (Javascript)

let itemNum = 1;
for (let item of dataset) {
   await client.json.set(`inventory:${itemNum++}`, '.', item);
}
    
await client.ft.create('inventoryIdx', {
   '$.item': {
      type: SchemaFieldTypes.TEXT,
      AS: 'item'
   },
   '$.qty': {
      type: SchemaFieldTypes.NUMERIC,
      AS: 'qty'
   },
   '$.tags.*': {
      type: SchemaFieldTypes.TAG,
      AS: 'tags'
   },
   '$.dim_cm[0]': {
      type: SchemaFieldTypes.NUMERIC,
      AS: 'dim_cm_0'
   },
   '$.dim_cm[1]': {
      type: SchemaFieldTypes.NUMERIC,
      AS: 'dim_cm_1',
   },
   '$.status': {
      type: SchemaFieldTypes.TEXT,
      AS: 'status'
   },
   '$.size.h': {
      type: SchemaFieldTypes.NUMERIC,
      AS: 'sizeh'
   },
   '$.size.w': {
      type: SchemaFieldTypes.NUMERIC,
      AS: 'sizew'
   },
   '$.size.uom': {
      type: SchemaFieldTypes.TEXT,
      AS: 'sizeuom'
   }
}, {
   ON: 'JSON',
   PREFIX: 'inventory:'
});

redis-py (Python)

itemNum = 1

for dataItem in dataset:
   client.json().set(f'inventory:{itemNum}', '.', dataItem)
   itemNum += 1

index_def = IndexDefinition(
   index_type=IndexType.JSON,
        prefix=['inventory:']
)
schema = (  TextField('$.item', as_name='item'),
            NumericField('$.qty', as_name='qty'),
            TagField('$.tags.*', as_name='tags'),
            NumericField('$.dim_cm[0]', as_name='dim_cm_0'),
            NumericField('$.dim_cm[1]', as_name='dim_cm_1'),
            TextField('$.status', as_name='status'),
            NumericField('$.size.h', as_name='sizeh'),
            NumericField('$.size.w', as_name='sizew'),
            TextField('$.size.uom', as_name='sizeuom')
)
client.ft('inventoryIdx').create_index(schema,definition=index_def)

Read Operation - Compound Query

Mongo Shell

db.inventory.find( {
     status: "E",
     $or: [ { qty: { $lt: 30 } }, { item: /^pa/ } ]
} )

Redis CLI

FT.SEARCH inventoryIdx '(@status:E) ((@qty:[-inf (30])|(@item:pa*))'

node-redis (Javascript)

await client.ft.search('inventoryIdx', '(@status:E) ((@qty:[-inf (30])|(@item:pa*))');

redis-py (Python)

client.ft('inventoryIdx').search(Query('(@status:E) ((@qty:[-inf (30])|(@item:pa*))')).docs
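
The query string is identical across all three clients, so it can help to see how it is assembled from its parts.  Below is a minimal sketch in plain Python.  The helper name build_compound_query and its parameters are hypothetical and not part of redis-py.  It follows the query syntax used above: a space between clauses is an implicit AND, | is OR, and ( in front of a number makes that range bound exclusive.  The sketch does no escaping of its inputs.

```python
def build_compound_query(status: str, max_qty: int, item_prefix: str) -> str:
    """Assemble the compound query string shown above (hypothetical helper)."""
    status_clause = f'(@status:{status})'
    # "(" before the number makes the upper bound exclusive: qty < max_qty
    qty_clause = f'(@qty:[-inf ({max_qty}])'
    # A trailing * requests a prefix match on the text field
    item_clause = f'(@item:{item_prefix}*)'
    # Space = implicit AND between clauses, | = OR inside the second group
    return f'{status_clause} ({qty_clause}|{item_clause})'


query = build_compound_query('E', 30, 'pa')
print(query)
```

The resulting string can be passed to Query() in the same way as the literal in the redis-py line above.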

Update Operation

Mongo Shell

db.inventory.update(
   { _id: 2 },
   {
     $inc: { qty: 5 },
     $set: {
       item: "spiral notebook",
     }
   }
)

Redis CLI

MULTI
JSON.NUMINCRBY inventory:2 '.qty' 5
JSON.SET inventory:2 .item '"spiral notebook"'
EXEC

node-redis (Javascript)

await client
   .multi()
   .json.numIncrBy('inventory:2', '.qty', 5)
   .json.set('inventory:2', '.item', 'spiral notebook' )
   .exec();

redis-py (Python)

pipe = client.pipeline()
pipe.json().numincrby('inventory:2', '.qty', 5)
pipe.json().set('inventory:2', '.item', 'spiral notebook') 
pipe.execute()

Delete Operation

Mongo Shell

db.inventory.deleteOne( { "_id" : 2 } )

Redis CLI

JSON.DEL inventory:2

node-redis (Javascript)

await client.json.del('inventory:2');

redis-py (Python)

client.json().delete('inventory:2')

Source (with more examples)

https://github.com/Redislabs-Solution-Architects/docdevdemo

Copyright ©1993-2024 Joey E Whelan, All rights reserved.

Monday, May 30, 2022

RedisJSON - CRUD Ops

Summary

I cover a shopping cart application use case in this post.  I implement a full CRUD set for the users, products, and carts using the RedisJSON module.

Architecture

The server side is implemented as an Express.js app with the node-redis module.  The test client is also Node-based, with node-fetch as the HTTP client and a random data generator I built with the uniqueNamesGenerator module.


Code Snippets

Random Data Generation

    static generateUser() {
        return {
            "userID": uuidv4(),
            "lastName":  RandomData.#getLastName(),
            "firstName": RandomData.#getFirstName(),
            "street": RandomData.#getStreet(),
            "city": RandomData.#getCity(),
            "state": RandomData.#getState(),
            "zip": RandomData.#getZip()    
        };
    };
    
    static #getFirstName() {
        return uniqueNamesGenerator({
            dictionaries:[names],
            length: 1
        });
    };

    static #getLastName() {
        return uniqueNamesGenerator({
            dictionaries: [adjectives],
            length: 1,
            style: 'capital'
        });
    };


Create User - Client-side

async function createUser(dbType, user) {
    const response = await fetch(`${SERVER.url}/${dbType}/user`, {
        method: 'POST',
        body: JSON.stringify(user),
        headers: {
            'Content-Type': 'application/json',
            'Authorization': AUTH
        }
    });
    return await response.json();
};

const user = RandomData.generateUser();
const res = await createUser('redis', user);


Update Cart - Server-side

app.patch('/:dbType/cart/:cartID', async (req, res) => {
    switch (req.params.dbType) {
        case 'redis':
            try {
                var client = await redisConnect();
                const updatedItem = req.body;
                const items = await client.json.get(`cart:${req.params.cartID}`, {path:'.items'}); 
                const newItems = [];
                let found = false
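                for (let item of items) {
                    if (updatedItem.sku == item.sku) {
                        found = true;
                        if (updatedItem.quantity == 0) {
                            continue;  // a quantity of 0 removes the item from the cart
                        }
                        else {
                            newItems.push(updatedItem);  // replace the existing item
                        }
                    }
                    else {
                        newItems.push(item);  // keep every other item, including those after the match
                    }
                }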
                if (!found) {
                    newItems.push(updatedItem)
                }           
                const val = await client.json.set(`cart:${req.params.cartID}`, `.items`, newItems);

                if (val == 'OK') {
                    console.log(`200: Cart ${req.params.cartID} updated`);
                    res.status(200).json({'cartID': req.params.cartID});
                }
                else {
                    throw new Error(`Cart ${req.params.cartID} not fully updated`);
                }
            }
            catch (err) {
                console.error(`400: ${err.message}`);
                res.status(400).json({error: err.message});
            }
            finally {
                await client.quit();
            };
            break;
        default:
            const msg = 'Unknown DB Type';
            console.error(`400: ${msg}`);
            res.status(400).json({error: msg});
            break;
    };
});
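
The item-merge rule inside that handler is easier to test when pulled out as a pure function.  Below is a rough sketch in Python, assuming the intended rule is: replace the item with a matching SKU, drop it when the new quantity is 0, keep all other items, and append the item when no SKU matches.  The function name merge_cart_items and the sample cart are hypothetical.

```python
def merge_cart_items(items: list, updated_item: dict) -> list:
    """Return a new item list with updated_item merged in (hypothetical helper)."""
    new_items = []
    found = False
    for item in items:
        if item['sku'] == updated_item['sku']:
            found = True
            # A quantity of 0 means "remove"; anything else replaces the old entry
            if updated_item['quantity'] != 0:
                new_items.append(updated_item)
        else:
            new_items.append(item)
    if not found:
        # No matching SKU in the cart, so the item is appended as the handler does
        new_items.append(updated_item)
    return new_items


cart = [
    {'sku': 'a', 'quantity': 1},
    {'sku': 'b', 'quantity': 2},
    {'sku': 'c', 'quantity': 3},
]
print(merge_cart_items(cart, {'sku': 'b', 'quantity': 5}))
```

The function builds a new list rather than editing in place, which matches the handler's approach of writing the whole items array back with a single JSON.SET.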


Source



Saturday, May 21, 2022

Azure Container Apps

Summary

This is a continuation of a previous post on proxying a SOAP API to REST.  In this post, I'll deploy the containerized proxy to Azure Container Apps and front end it with Azure API Management (APIM).

Architecture



Code

Proxy App

I modified the Python FastAPI app slightly to serve up an OpenAPI file.  That file is used by APIM during provisioning.

from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse
from zeep import Client
import logging

logging.getLogger('zeep').setLevel(logging.ERROR)
client = Client('https://www.w3schools.com/xml/tempconvert.asmx?wsdl')
app = FastAPI()

@app.get("/openapi.yml")
async def openapi():
    return FileResponse("openapi.yml")

@app.get("/CelsiusToFahrenheit")
async def celsiusToFahrenheit(temp: int): 
    try:
        soapResponse = client.service.CelsiusToFahrenheit(temp)
        fahrenheit = int(round(float(soapResponse),0))
    except:
        raise HTTPException(status_code=400, detail="SOAP request error")
    else:
        return {"temp": fahrenheit}


@app.get("/FahrenheitToCelsius")
async def fahrenheitToCelsius(temp: int): 
    try:
        soapResponse = client.service.FahrenheitToCelsius(temp)
        celsius = int(round(float(soapResponse),0))
    except:
        raise HTTPException(status_code=400, detail="SOAP request error")
    else:
        return {"temp": celsius}

OpenAPI Spec


swagger: '2.0'
info:
  title: apiproxy
  description: REST to SOAP proxy
  version: 1.0.0
schemes:
  - http
produces:
  - application/json
paths:
  /CelsiusToFahrenheit:
    get:
      summary: Convert celsius temp to fahrenheit
      parameters:
        - name: temp
          in: query
          required: true
          type: integer
      responses:
        '200':
          description: converted temp
          schema: 
            type: object
            properties:
              temp:
                type: integer
        '400':
          description: General error
  /FahrenheitToCelsius:
    get:
      summary: Convert fahrenheit temp to celsius
      parameters:
        - name: temp
          in: query
          required: true
          type: integer
      responses:
        '200':
          description: converted temp
          schema: 
            type: object
            properties:
              temp:
                type: integer
        '400':
          description: General error

Deployment


Create + Configure Azure Container Registry





Visual Studio Code - Build Image in Azure







Create + Configure Azure Container App






Execution

Deploy and Test Container App in APIM




Sunday, May 15, 2022

RedisTimeSeries

Summary

I'll be covering an IoT data feed use case in this post.  That data feed originates from a Tempest weather station and is stored on a Redis instance as TimeSeries data.  The application is implemented as a container on Google Cloud Run.  The Redis TimeSeries data is visualized using Grafana.

Architecture



Application


ExpressJS REST API Server

Very simple server-side app to start and stop the data flow.
app.post('/start', async (req, res) => {
    try {
        if (!tc) {
            tc = new TempestClient();
            await tc.start();
            res.status(201).json({'message': 'success'});
        }
        else {
            throw new Error('tempest client already instantiated');
        }
    }
    catch (err) {
        res.status(400).json({error: err.message})
    };
});

app.post('/stop', async (req, res) => {
    try {
        if (tc) {
            await tc.stop();
            tc = null;
            res.status(201).json({'message': 'success'});
        }
        else {
            throw new Error('tempest client does not exist');    
        }
    }
    catch (err) {
        res.status(400).json({error: err.message})
    };
});

Tempest Client

WeatherFlow publishes both REST and WebSocket APIs.  In this case, I used the WebSocket interface to provide a 3-second feed of wind data from the weather station.

    async start() {
        if (!this.ts && !this.ws) {
            this.ts = new TimeSeriesClient(redis.user, redis.password, redis.url);
            await this.ts.connect();
            this.ws = new WebSocket(`${tempest.url}?token=${tempest.password}`);

            this.ws.on('open', () => {
                console.log('Websocket opened');
                this.ws.send(JSON.stringify(this.wsRequest));
            });
        
            this.ws.on('message', async (data) => {
                const obj = JSON.parse(data);
                if ("ob" in obj) {
                    const time = Date.now()
                    const speed = Number(obj.ob[1] * MS_TO_MPH).toFixed(1);
                    const direction = obj.ob[2];
                    console.log(`time: ${time} speed: ${speed} direction: ${direction}`);
                    await this.ts.update(tempest.deviceId, time, speed, direction);                
                }
             });

            this.ws.on('close', async () => {
                console.log('Websocket closed')
                await this.ts.quit();
                this.ts = null;
                this.ws = null;
            });

            this.ws.on('error', (err) => {
                console.error('ws err: ' + err);
                this.ws.close();  // ts and ws are released in the 'close' handler
            });
        }
    }

    async stop() {
        this.ws.close();
    }
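
The message handler above does two things per observation: it converts wind speed from m/s to mph and it picks out the direction.  Below is a minimal Python sketch of that step.  The value of MS_TO_MPH is an assumption (the post does not show the constant), the function name parse_wind is hypothetical, and the sample message is illustrative rather than captured from a station.

```python
import json

MS_TO_MPH = 2.23694  # assumed conversion factor; the post does not show its value


def parse_wind(message: str):
    """Return (speed_mph, direction) for a wind observation, else None."""
    obj = json.loads(message)
    if 'ob' not in obj:
        return None  # not an observation message, so nothing to store
    speed_mph = round(obj['ob'][1] * MS_TO_MPH, 1)  # index 1 holds speed in m/s
    direction = obj['ob'][2]                        # index 2 holds direction
    return speed_mph, direction


# Illustrative message only
sample = '{"type": "rapid_wind", "ob": [1652640000, 10.0, 270]}'
print(parse_wind(sample))
```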

Redis TimeSeries Client

I used the Node-Redis client to implement a function that performs a TimeSeries Add.  
    async update(deviceId, time, speed, direction) {
        await this.client.ts.add(`wind_direction:${deviceId}`, time, direction);
        await this.client.ts.add(`wind_speed:${deviceId}`, time, speed);
    }
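
Each observation therefore produces two TS.ADD calls, one per series, with the device ID embedded in the key.  The sketch below lists those commands as plain argument lists without connecting to Redis.  The function name ts_add_commands and the sample device ID are hypothetical.

```python
def ts_add_commands(device_id, time_ms, speed, direction):
    """List the two TS.ADD commands issued per observation (hypothetical helper)."""
    return [
        # TS.ADD key timestamp value
        ['TS.ADD', f'wind_direction:{device_id}', time_ms, direction],
        ['TS.ADD', f'wind_speed:{device_id}', time_ms, speed],
    ]


for command in ts_add_commands(12345, 1652640000000, 22.4, 270):
    print(command)
```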

Deployment

Dockerfile


FROM node:18-slim
WORKDIR /usr/src/app
COPY package*.json ./
RUN npm install
COPY . .
EXPOSE 8080
CMD ["npm", "start"]

Redis Cloud + Insight




Google Cloud Code Integration with VS Code

The app container is deployed to Cloud Run using the Cloud Code tools.








Grafana Data Connection to Redis



Execution

CURL POST To Start Data Flow


curl -X POST https://redis-demo-y6pby4qk2a-uc.a.run.app/start -u yourUser:yourPassword

Redis Insight Real-time Feed


Cloud Run Console



Grafana Dashboard



Source



Stable Marriage Problem - Python

Summary

This short post covers a Python implementation of the Gale-Shapley algorithm for the Stable Marriage Problem (SMP).  Stable matching is a well-studied problem in multiple fields.  It has applications in nearly any two-sided market scenario.

Code Snippets

Preference Generator

Below is a Python function to generate a random set of preferences for two classes.  Those preferences are then used by Gale-Shapley to determine a stable matching.
import random

def generate_prefs(class1, class2):
    if len(class1) != len(class2):
        raise Exception("Invalid input: unequal list sizes")

    prefs = {}
    for item in class1:
        random.shuffle(class2)
        prefs[item] = class2.copy()
    
    for item in class2:
        random.shuffle(class1)
        prefs[item] = class1.copy()

    return dict(sorted(prefs.items()))

Gale-Shapley

def gale_shapley(prefs, proposers):
    matches = []
    while len(proposers) > 0:  #terminating condition - all proposers are matched
        proposer = proposers.pop(0)  #Each round - proposer is popped from the free list
        proposee = prefs[proposer].pop(0)  #Each round - the proposer's top preference is popped
        matchLen = len(matches)
        found = False
        
        for index in range(matchLen):  
            match = matches[index]
            if proposee in match:  #proposee is already matched
                found = True
                temp = match.copy()
                temp.remove(proposee)
                matchee = temp.pop()
                if prefs[proposee].index(proposer) < prefs[proposee].index(matchee):  #proposer is a higher preference 
                    matches.remove(match)  #remove old match
                    matches.append([proposer, proposee])  #create new match with proposer
                    proposers.append(matchee)  #add the previous proposer to the free list of proposers
                else:
                    proposers.append(proposer)  #proposer wasn't a higher preference, so gets put back on free list
                break
            else:
                continue
        if not found:  #proposee was not previously matched so is automatically matched to proposer
            matches.append([proposer, proposee])
        else:
            continue
    return matches

Output

Below is a sample run with two three-member classes: (a1, a2, a3) and (b1, b2, b3).
Preferences
{'a1': ['b2', 'b3', 'b1'],
 'a2': ['b3', 'b2', 'b1'],
 'a3': ['b1', 'b2', 'b3'],
 'b1': ['a2', 'a1', 'a3'],
 'b2': ['a1', 'a3', 'a2'],
 'b3': ['a3', 'a1', 'a2']}

Matches
[['a3', 'b1'], ['a1', 'b2'], ['a2', 'b3']]
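
A matching is stable when no two members from opposite classes both prefer each other over their assigned partners.  Below is a small sketch that checks this for the sample run.  The function name is_stable is hypothetical and is not part of the original source.  It expects the full preference lists as printed above.

```python
def is_stable(prefs: dict, matches: list) -> bool:
    """Return True if no blocking pair exists in the matching."""
    partner = {}
    for a, b in matches:
        partner[a] = b
        partner[b] = a
    for person, ranking in prefs.items():
        current = ranking.index(partner[person])
        # Everyone this person ranks above their current partner
        for other in ranking[:current]:
            # Blocking pair: other also ranks person above their own partner
            if prefs[other].index(person) < prefs[other].index(partner[other]):
                return False
    return True


prefs = {'a1': ['b2', 'b3', 'b1'],
         'a2': ['b3', 'b2', 'b1'],
         'a3': ['b1', 'b2', 'b3'],
         'b1': ['a2', 'a1', 'a3'],
         'b2': ['a1', 'a3', 'a2'],
         'b3': ['a3', 'a1', 'a2']}
matches = [['a3', 'b1'], ['a1', 'b2'], ['a2', 'b3']]
print(is_stable(prefs, matches))
```

In the sample run every proposer ends up with their first choice, so no proposer has anyone they would rather be matched with.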

Source



Sunday, February 20, 2022

API Proxy - Docker

Summary

This post covers a machine-to-machine use case where the provider has only a SOAP interface to their services but the client can only support a REST interface.  One possible solution to that dilemma is to deploy middleware in the form of a proxy between the two entities.

Concept

The diagram below depicts the overall architectural concept.  The SOAP API is mediated by a REST API.



App Architecture

The REST proxy is implemented via the FastAPI framework.  SOAP interactions are managed via the Zeep package.  The HTTP server is provided by Uvicorn.  The entire package is deployed as a container via Docker.  The SOAP service used here is a simple toy example.


Environment Set Up

I'm using Visual Studio Code to develop this in Python.  A Python virtual environment can be set up with the command below:

python3 -m venv env

Code

The Python code below implements a REST proxy for two different SOAP endpoints.

from fastapi import FastAPI, HTTPException
from zeep import Client

client = Client('https://www.w3schools.com/xml/tempconvert.asmx?wsdl')
app = FastAPI()

@app.get("/CelsiusToFahrenheit")
async def celsiusToFahrenheit(temp: int): 
    try:
        soapResponse = client.service.CelsiusToFahrenheit(temp)
        fahrenheit = int(round(float(soapResponse),0))
    except:
        raise HTTPException(status_code=400, detail="SOAP request error")
    else:
        return {"temp": fahrenheit}


@app.get("/FahrenheitToCelsius")
async def fahrenheitToCelsius(temp: int): 
    try:
        soapResponse = client.service.FahrenheitToCelsius(temp)
        celsius = int(round(float(soapResponse),0))
    except:
        raise HTTPException(status_code=400, detail="SOAP request error")
    else:
        return {"temp": celsius}

Container Set Up

Python library requirements for the app are stored to a text file with the command below:

pip freeze > requirements.txt
The Dockerfile is below:

FROM python:3.9-slim
COPY proxy.py proxy.py
COPY requirements.txt requirements.txt
RUN pip install --no-cache-dir --upgrade -r requirements.txt
CMD ["uvicorn", "proxy:app", "--host", "0.0.0.0", "--port", "80"]
Screenshot of the resulting container in Visual Studio Code's Docker tool:

Execution


$ curl 'http://localhost/FahrenheitToCelsius?temp=32'
{"temp":0}

Source

