Sunday, May 15, 2022

RedisTimeSeries

Summary

In this post, I'll cover an IoT data feed use case.  The feed originates from a Tempest weather station and is stored in a Redis instance as TimeSeries data.  The application that ingests the feed runs as a container on Google Cloud Run, and the stored TimeSeries data is visualized using Grafana.

Architecture



Application


ExpressJS REST API Server

A very simple server-side app that exposes two endpoints, one to start and one to stop the data flow.
app.post('/start', async (req, res) => {
    try {
        if (!tc) {
            tc = new TempestClient();
            await tc.start();
            res.status(201).json({'message': 'success'});
        }
        else {
            throw new Error('tempest client already instantiated');
        }
    }
    catch (err) {
        res.status(400).json({error: err.message})
    };
});

app.post('/stop', async (req, res) => {
    try {
        if (tc) {
            await tc.stop();
            tc = null;
            res.status(201).json({'message': 'success'});
        }
        else {
            throw new Error('tempest client does not exist');    
        }
    }
    catch (err) {
        res.status(400).json({error: err.message})
    };
});
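
The curl command in the Execution section passes credentials with -u, which suggests HTTP Basic authentication sits in front of these routes. That part isn't shown in the post, so the following is only a minimal sketch of how it might look as Express middleware. The names basicAuth, API_USER, and API_PASSWORD are hypothetical and not taken from the original app.

```javascript
// Hypothetical sketch: HTTP Basic auth middleware for the /start and /stop routes.
// API_USER and API_PASSWORD are assumed environment variables, not names from the original app.
const crypto = require('crypto');

function safeEqual(a, b) {
    // Hash both sides so timingSafeEqual always compares equal-length buffers.
    const ha = crypto.createHash('sha256').update(String(a)).digest();
    const hb = crypto.createHash('sha256').update(String(b)).digest();
    return crypto.timingSafeEqual(ha, hb);
}

function basicAuth(user, password) {
    return (req, res, next) => {
        const header = req.headers.authorization || '';
        const [scheme, encoded] = header.split(' ');
        if (scheme === 'Basic' && encoded) {
            // Credentials arrive as base64("user:password").
            const decoded = Buffer.from(encoded, 'base64').toString('utf8');
            const idx = decoded.indexOf(':');
            const u = decoded.slice(0, idx);
            const p = decoded.slice(idx + 1);
            if (idx > -1 && safeEqual(u, user) && safeEqual(p, password)) {
                return next();
            }
        }
        // Anything else is rejected before it reaches the route handlers.
        res.set('WWW-Authenticate', 'Basic realm="tempest"');
        res.status(401).json({error: 'unauthorized'});
    };
}

// Assumed wiring, registered before the routes:
// app.use(basicAuth(process.env.API_USER, process.env.API_PASSWORD));
```

Registering the middleware ahead of the route definitions means unauthenticated requests never reach the handlers that create or tear down the Tempest client.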

Tempest Client

WeatherFlow publishes both a REST and a WebSocket API.  In this case, I used the WebSocket interface, which provides a feed of wind data from the weather station every 3 seconds.

    async start() {
        if (!this.ts && !this.ws) {
            this.ts = new TimeSeriesClient(redis.user, redis.password, redis.url);
            await this.ts.connect();
            this.ws = new WebSocket(`${tempest.url}?token=${tempest.password}`);

            this.ws.on('open', () => {
                console.log('Websocket opened');
                this.ws.send(JSON.stringify(this.wsRequest));
            });
        
            this.ws.on('message', async (data) => {
                const obj = JSON.parse(data);
                if ("ob" in obj) {
                    const time = Date.now()
                    const speed = Number(obj.ob[1] * MS_TO_MPH).toFixed(1);
                    const direction = obj.ob[2];
                    console.log(`time: ${time} speed: ${speed} direction: ${direction}`);
                    await this.ts.update(tempest.deviceId, time, speed, direction);                
                }
             });

            this.ws.on('close', async () => {
                console.log('Websocket closed')
                await this.ts.quit();
                this.ts = null;
                this.ws = null;
            });

            this.ws.on('error', (err) => {
                // Log and close the socket; the 'close' handler above does the cleanup.
                // Nulling this.ts here would make that handler call quit() on null.
                console.error('ws err: ' + err);
                this.ws.close();
            });
        }
    }

    async stop() {
        this.ws.close();
    }
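
The wsRequest object and the MS_TO_MPH constant are used above but not shown. The sketch below is my assumption of what they might look like, based on WeatherFlow's published rapid wind messages: a listen_rapid_start request, and a rapid_wind message whose ob array holds epoch seconds, wind speed in m/s, and direction in degrees. The helper names buildRapidWindRequest and parseRapidWind are hypothetical.

```javascript
// Conversion factor: 1 m/s is roughly 2.236936 mph.
const MS_TO_MPH = 2.236936;

// Hypothetical helper: builds the subscription request sent when the socket opens.
// The 'id' value is an arbitrary client-chosen string (assumed default here).
function buildRapidWindRequest(deviceId, requestId = 'rapid-wind') {
    return { type: 'listen_rapid_start', device_id: deviceId, id: requestId };
}

// Hypothetical helper: turns one raw WebSocket message into a wind sample,
// or returns null for messages that carry no observation (acks, etc.).
function parseRapidWind(raw) {
    const obj = JSON.parse(raw);
    if (!Array.isArray(obj.ob) || obj.ob.length < 3) {
        return null;
    }
    const [epochSeconds, speedMs, direction] = obj.ob;
    return {
        stationTime: epochSeconds * 1000,                 // station timestamp in ms
        speed: Number((speedMs * MS_TO_MPH).toFixed(1)),  // mph, one decimal place
        direction                                         // degrees
    };
}
```

Note that the message handler above stamps each sample with Date.now() rather than the station's own timestamp; stationTime is included here only to show that the alternative is available in the message.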

Redis TimeSeries Client

I used the Node-Redis client to implement a function that performs a TimeSeries add (TS.ADD) for each of the two series, wind direction and wind speed.
    async update(deviceId, time, speed, direction) {
        await this.client.ts.add(`wind_direction:${deviceId}`, time, direction);
        await this.client.ts.add(`wind_speed:${deviceId}`, time, speed);
    }
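
Reading the data back mirrors the add. The following is a minimal sketch, assuming a connected Node-Redis v4 client that exposes the TimeSeries commands under client.ts, as update() does above. The function name recentWindSpeed is hypothetical; the key follows the same wind_speed:deviceId scheme.

```javascript
// Hypothetical helper: average wind speed over the last `windowMs` milliseconds.
// `client` is assumed to be a connected node-redis v4 client (createClient from 'redis').
async function recentWindSpeed(client, deviceId, windowMs, now = Date.now()) {
    // TS.RANGE over [now - windowMs, now]; node-redis returns { timestamp, value } objects.
    const samples = await client.ts.range(`wind_speed:${deviceId}`, now - windowMs, now);
    if (samples.length === 0) {
        return null; // no data in the window
    }
    const total = samples.reduce((sum, s) => sum + s.value, 0);
    return { count: samples.length, average: total / samples.length };
}
```

Taking the client as a parameter keeps the helper easy to exercise with a stub, without a live Redis connection.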

Deployment

Dockerfile


FROM node:18-slim
WORKDIR /usr/src/app
COPY package*.json ./
RUN npm install
COPY . .
EXPOSE 8080
CMD ["npm", "start"]

Redis Cloud + Insight




Google Cloud Code Integration with VS Code

The app container is deployed to Cloud Run using the Cloud Code tools.








Grafana Data Connection to Redis



Execution

CURL POST To Start Data Flow


curl -X POST https://redis-demo-y6pby4qk2a-uc.a.run.app/start -u yourUser:yourPassword

Redis Insight Real-time Feed


Cloud Run Console



Grafana Dashboard



Source


Copyright ©1993-2024 Joey E Whelan, All rights reserved.