Sunday, December 15, 2019

Workflow for moving an existing VS Code project to Github

Summary

This post is a step by step instruction on how to push an existing Visual Studio Code project to Github.  I'll demonstrate steps from both the GUI and command line to accomplish this.

Step 1:  Create the Github Repository

Screen-shot below of a basic repo.  Note that I use Github's stock Node.js .gitignore file.


Resulting repo:


Step 2:  Create a Local Git Repository

Command-line and GUI methods below:
$ git init .

 

Step 3:  Add Remote (Github) Repository

git remote add origin https://github.com/joeywhelan/containertest

Step 4:  Pull from Github

This step will pull down the existing files in the start-up repo - most importantly, the .gitignore file.

Command-line + GUI methods:

$ git pull origin main

Result below.  Note all the untracked files from the node_modules directory are now hidden via the .gitignore from Github.


Step 5:  Add all of the local files to the local Git Staging Area

$ git add .

Results:

Step 6:  Commit all to the local Git repo.

$ git commit -m "first commit" .

Results:

Step 7:  Rename the local 'master' branch to main and push local files to Github. 

$ git branch -m master main
$ git push -u origin main

Results:

Copyright ©1993-2024 Joey E Whelan, All rights reserved.

Monday, December 9, 2019

Google Cloud Run Quickstart

Summary

This post is a continuation of the previous quick start on containerizing a Node.js app.  In this post, I'll use the same simple app + Dockerfile and deploy the app to Google's Cloud Run platform.

Cloud Run is an alternate method for server-less app deployments.  Google Cloud Functions is other method.  Whereas Cloud Functions has very specific language requirements, Cloud Run has none.  Whatever you build in a container is fair game.  The only requirement is the app needs to respond to HTTP requests as a trigger.  Cloud Run provides similar auto-scaling capabilities as Cloud Functions making it an excellent choice for a self-managing app deployment.

Step 1:  Create a Google Cloud Project

 

 

Step 2:  Enable Cloud Build and Cloud Run APIs

 

Step 3:  Build the Container

$ gcloud config set project cloudrun-quickstart-261521 
Updated property [core/project].
$ gcloud builds submit --tag gcr.io/cloudrun-quickstart-261521/cloudrun-quickstartCreating temporary tarball archive of 331 file(s) totalling 1.6 MiB before compression.
Uploading tarball of [.] to [gs://cloudrun-quickstart-261521_cloudbuild/source/1575934972.69-7637137b7675474ab861a2f4185529c6.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/cloudrun-quickstart-261521/builds/7591715a-3e15-4323-b570-0c2dc191fb3a].
Logs are available at [https://console.cloud.google.com/gcr/builds/7591715a-3e15-4323-b570-0c2dc191fb3a?project=945345104488].
...
DONE
--------------------------------------------------------------------------------------------------------------------------------------------

ID                                    CREATE_TIME                DURATION  SOURCE                                                                                                IMAGES                                                           STATUS
7591715a-3e15-4323-b570-0c2dc191fb3a  2019-12-09T23:42:55+00:00  57S       gs://cloudrun-quickstart-261521_cloudbuild/source/1575934972.69-7637137b7675474ab861a2f4185529c6.tgz  gcr.io/cloudrun-quickstart-261521/cloudrun-quickstart (+1 more)  SUCCESS

Step 4:  Deploy to Cloud Run

Screen shots below of the Cloud Run console.  Note in the second screen shot the concurrency controls available.  You can specify the number of requests per container and the max container auto-scale growth.




Step 5:  Execute

$ curl -i https://cloudrun-quickstart-6saiqefrtq-uc.a.run.app 
HTTP/2 200 
x-powered-by: Express
content-type: text/html; charset=utf-8
etag: W/"b-Kq5sNclPz7QV2+lfQIuc6R7oRu0"
x-cloud-trace-context: 4f70f43d41086a3d8ea2320a166a5f89;o=1
date: Mon, 09 Dec 2019 23:58:10 GMT
server: Google Frontend
content-length: 11
alt-svc: quic=":443"; ma=2592000; v="46,43",h3-Q050=":443"; ma=2592000,h3-Q049=":443"; ma=2592000,h3-Q048=":443"; ma=2592000,h3-Q046=":443"; ma=2592000,h3-Q043=":443"; ma=2592000

hello world

Step 6:  Clean up

Copyright ©1993-2024 Joey E Whelan, All rights reserved.

Node.js on Docker Quickstart



Summary

This post is a basic primer on getting a Node app containerized.

Step 1:  Create the app

Below is a basic HTTP server implementation using Express.
'use strict';

const express = require('express');
const app = express();
const port = process.env.PORT || 8080;

app.get('/', function(req, res) {
 res.send('hello world')
});

app.listen(port, () => {
    console.log(`listening on ${port}`);
});
Below is the resulting package.json containing the app dependencies. This file is created via the 'npm init' command. 'npm install --save' causes the dependencies to be updated for each module used in the app.
{
  "name": "containertest",
  "version": "1.0.0",
  "description": "simple express server",
  "main": "server.js",
  "repository": "none",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1",
    "start": "node server.js"
  },
  "author": "joey whelan",
  "license": "MIT",
  "dependencies": {
    "express": "^4.17.1"
  }
}

Step 2:  Create the Dockerfile and .dockerignore

Below is the Dockerfile to support the above app.  It's written per best practices to cause the app source code to be added as the last layer to enable caching of modules.
FROM node:lts
WORKDIR /usr/src/app
COPY package*.json ./
RUN npm install
COPY . .
EXPOSE 8080
CMD ["npm", "start"]
.dockerignore below.
node_modules
npm-debug.log

Step 3:  Build the Docker image

$ docker build -t containertest .
Sending build context to Docker daemon  19.97kB
Step 1/7 : FROM node:lts
lts: Pulling from library/node
844c33c7e6ea: Pull complete 
ada5d61ae65d: Pull complete 
f8427fdf4292: Pull complete 
f025bafc4ab8: Pull complete 
7a9577c07934: Pull complete 
9b4289f800f5: Pull complete 
55e3fcab47b9: Pull complete 
c7a94e331913: Pull complete 
bb9efc0c132a: Pull complete 
Digest: sha256:88ee7d2a5e18d359b4b5750ecb50a9b238ab467397c306aeb9955f4f11be44ce
Status: Downloaded newer image for node:lts
 ---> 7be6a8478f5f
Step 2/7 : WORKDIR /usr/src/app
 ---> df2833d84c36
Removing intermediate container 11f00574e18a
Step 3/7 : COPY package*.json ./
 ---> a892506a76df
Removing intermediate container 07fc76863a41
Step 4/7 : RUN npm install
 ---> Running in 7abdbc6d6e64
added 50 packages from 37 contributors and audited 126 packages in 1.197s
found 0 vulnerabilities

 ---> 9ab0afa5e750
Removing intermediate container 7abdbc6d6e64
Step 5/7 : COPY . .
 ---> 39620b323b38
Removing intermediate container f2692a5064a0
Step 6/7 : EXPOSE 8080
 ---> Running in 4e758301eaad
 ---> 076d56510119
Removing intermediate container 4e758301eaad
Step 7/7 : CMD npm start
 ---> Running in 80f1a6be42cc
 ---> c9c55b4ddca7
Removing intermediate container 80f1a6be42cc
Successfully built c9c55b4ddca7
Successfully tagged containertest:latest
Resulting images below:
$ docker images
REPOSITORY          TAG                 IMAGE ID            CREATED             SIZE
containertest       latest              c9c55b4ddca7        3 minutes ago       911MB
node                lts                 7be6a8478f5f        2 weeks ago         908MB

Step 4:  Run the container

$ docker run -p 8080:8080 -d containertest
d8d98f3fff8e752c186f99599ca475682790e3a5645d16705a398404ddf9ec74
Resulting running container below:
$ docker ps
CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS              PORTS                    NAMES
d8d98f3fff8e        containertest       "docker-entrypoint..."   2 minutes ago       Up 2 minutes        0.0.0.0:8080->8080/tcp   thirsty_dijkstra
Execution of HTTP request against the containerized server below:
$ curl -v localhost:8080/
*   Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET / HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.58.0
> Accept: */*
> 
< HTTP/1.1 200 OK
< X-Powered-By: Express
< Content-Type: text/html; charset=utf-8
< Content-Length: 11
< ETag: W/"b-Kq5sNclPz7QV2+lfQIuc6R7oRu0"
< Date: Mon, 09 Dec 2019 16:44:36 GMT
< Connection: keep-alive
< 
* Connection #0 to host localhost left intact
hello world

Step 5:  Stop the container

Stop command results below.  Note it's possible to abbreviate the container ID.
$ docker stop d8
d8
$ docker ps
CONTAINER ID        IMAGE               COMMAND             CREATED             STATUS              PORTS               NAMES

Step 6:  Clean up

Commands below to delete the container and its image.
$ docker ps -a
CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS                     PORTS               NAMES
d8d98f3fff8e        containertest       "docker-entrypoint..."   17 minutes ago      Exited (0) 4 minutes ago                       thirsty_dijkstra
$ docker rm d8
d8
$ docker ps -a
CONTAINER ID        IMAGE               COMMAND             CREATED             STATUS              PORTS               NAMES
$ docker images
REPOSITORY          TAG                 IMAGE ID            CREATED             SIZE
containertest       latest              c9c55b4ddca7        26 minutes ago      911MB
node                lts                 7be6a8478f5f        2 weeks ago         908MB
$ docker rmi c9
Untagged: containertest:latest
Deleted: sha256:c9c55b4ddca7aac2d12a93c66ee1adf95b5e2b4b09a7b93aac2e0981b45006be
Deleted: sha256:076d565101195290512b90eba0d25f16a3798db0f1cf49aea4d096e926d03250
Deleted: sha256:39620b323b38b824b8f074183eda49339748f2516a70d19718d821a086826234
Deleted: sha256:20407ee7b27893df082e6fa7eddb9608517d53a930beaf426c37ac2453949714
Deleted: sha256:9ab0afa5e750b610d08ed12258972e8d880d8acdd8b3034bd96add8c5daea705
Deleted: sha256:b972df701627963f9fa4dbb2ef1c20148cdddd8a8922aea6c3ba8e2ceca62c27
Deleted: sha256:a892506a76df6ceaaff88b3fe14ee30de477fc9596cb8236aeeee0b3a0106e76
Deleted: sha256:4fab789311ef158be2b924dcdaa1646802900913e07d645f95adb299ee09c506
Deleted: sha256:df2833d84c365c86e3c5218cc997d3ec958e1e4f68eb47cb82483cbd2a14c738
Deleted: sha256:43dd5d9ada9dc352d7fdf5cd3b179cd0855681851eef378a5c82b3ce682bc17e
$ docker images
REPOSITORY          TAG                 IMAGE ID            CREATED             SIZE
node                lts                 7be6a8478f5f        2 weeks ago         908MB

Copyright ©1993-2024 Joey E Whelan, All rights reserved.

Sunday, October 27, 2019

Trading on Trump's Tweets


Summary

There have been several articles during President Trump's term regarding his use of Twitter and his influence on stock prices.  Below is one such article:

https://fortune.com/2017/02/24/trump-tweet-stocks/ 

This post explores development of a programmatic analysis of Trump's tweets that are focused on publicly traded companies.  I use a variety of APIs to ferret out the tweets of interest and then take action on them.  In this exercise, I simply generate an alert email; however, one could envision automated trading as the action.

This post represents the culmination of the my Twitter API blog series:

Architecture

This is a Node-based architecture that uses various publicly accessible REST APIs.

 

Processing Logic

 

 

Code Excerpt - Tweet Processing

The function below accepts tweet text as input and then sends that text through Google's NL engine for entity analysis. If the top entity, ranked by salience, is an "ORGANIZATION" - then there's a chance the tweet is regarding a company. The next step is to use the IEX Cloud API to determine if the entity does in fact corresponds to a publicly traded company.  If so, then perform full processing of the tweet:  gather further NL + stock analytics and then package them for an email alert.

async function processTweet(tweet) {
    logger.debug(`processTweet()`);
    try {
        const esnt = await entitySentiment(GOOGLE_KEY, ENTITY_SENTIMENT_URL, tweet);

        if (esnt.type === 'ORGANIZATION') { //entity corresponds to a type that might be a company
            let stock;
            if (Array.isArray(symbolArray)) {
                stock = symbolArray.find(obj => {
                    return obj.name.match(esnt.name);
                });
                if (stock) {  //name corresponds to a publicly traded company - fetch full tweet sentiment
                    //and stock data
                    const snt = await sentiment(GOOGLE_KEY, SENTIMENT_URL, tweet);
                    const data = await getStockData(IEX_KEY, STOCK_URL, stock.symbol);

                    let analytics = {};
                    analytics.tweet = tweet;
                    analytics.name = esnt.name;
                    analytics.salience = esnt.salience;
                    analytics.entitySentiment = esnt.entitySentiment;
                    analytics.documentSentiment = snt;
                    let mag = (analytics.entitySentiment.magnitude + analytics.documentSentiment.magnitude) / 2;
                    let score = (analytics.entitySentiment.score + analytics.documentSentiment.score) / 2;
                    analytics.aggregate = mag * score;
                    analytics.symbol = stock.symbol;
                    analytics.data = data;
                    sendEmail(SENDGRID_KEY, SENDGRID_URL, analytics);
                }
            }
        }
    }
    catch(err) {
        logger.error(err);
    }
}

Code Excerpt - Fetch Stock Data

Excerpt below exercises IEX's API to fetch a few simple stock data items.  This API is quite rich.  There is significantly more analytics available that what I've pulled below:  current stock price and previous day's history.

async function getStockData(token, url, symbol) {
    logger.debug(`getStockData() - name:${symbol}`);
    
    let data = {};
    const price = await getPrice(token, url, symbol);
    const previous = await getPrevious(token, url, symbol);
    data.current_price = price;
    data.date = previous.date;
    data.open = previous.open;
    data.close = previous.close;
    data.high = previous.high;
    data.low = previous.low
    return data;
}

Code Excerpt - Test

Test function below submits a tweet President Trump unleashed on Harley-Davidson on June 25, 2018.
async function test() {
    symbolArray = await getSymbols(IEX_KEY, SYMBOL_URL);
    const tweet1 = "Surprised that Harley-Davidson, of all companies, would be the first to wave the White Flag. I
fought hard for them and ultimately they will not pay tariffs selling into the E.U., 
which has hurt us badly on trade, down $151 Billion. Taxes just a Harley excuse - be patient!";
    await processTweet(tweet1);
}

test()
.then(() => {
    console.log('complete');
});

Results

Excerpt below of the raw email text that was generated.

Date: Sun, 27 Oct 2019 17:54:52 +0000 (UTC)
From: twitterTrade@example.com
Mime-Version: 1.0
To: joey.whelan@gmail.com
Message-ID: 
Content-type: multipart/alternative; boundary="----------=_1572198892-24558-282"
Subject: Twitter Trade Alert - Negative Tweet: Harley-Davidson

{
    "tweet": "Surprised that Harley-Davidson, of all companies, would be th=
e first to wave the White Flag. I fought hard for them and ultimately they =
will not pay tariffs selling into the E.U., which has hurt us badly on trad=
e, down $151 Billion. Taxes just a Harley excuse - be patient!",
    "name": "Harley-Davidson",
    "salience": 0.35687405,
    "entitySentiment": {
        "magnitude": 0.4,
        "score": 0
    },
    "documentSentiment": {
        "magnitude": 0.9,
        "score": -0.1
    },
    "aggregate": -0.0325,
    "symbol": "HOG",
    "data": {
        "current_price": 39.39,
        "date": "2019-10-25",
        "open": 38.64,
        "close": 39.39,
        "high": 39.69,
        "low": 38.64
    }
}

Source

https://github.com/joeywhelan/twitterTrade

Copyright ©1993-2024 Joey E Whelan, All rights reserved.

Thursday, October 24, 2019

Twitter Filtered Stream


Summary

This post discusses my use of  Twitter Developer Labs (beta) APIs for creating a real-time tweet feed.  The APIs are all HTTP-based.  The actual streaming tweet feed is a HTTP connection that, in theory, never ends.

Architecture

The diagram below depicts the overall flow for this exercise.
  • An API token has to be fetched to call any of the Twitter APIs.
  • Fetch any existing tweet filter rules
  • Delete them
  • Add new filtering rules
  • Start streaming a tweet feed based on those filtering rules


Fetch API Token

I discussed the steps for that in this post.

Get Existing Filter Rules

The code below fetches any existing filtering rules in place for the given account associated with the bearer token.

const RULES_URL  = 'https://api.twitter.com/labs/1/tweets/stream/filter/rules';
async function getRules(token, url) {
    console.debug(`${(new Date()).toISOString()} getRules()`);
    
    try {
        const response = await fetch(url, {
            method: 'GET',
            headers: {
            'Authorization' : 'Bearer ' + token
            }
        });
        if (response.ok) {
            const json = await response.json();
            return json;
        }
        else {
            throw new Error(`response status: ${response.status} ${response.statusText}`);    
        }
    }
    catch (err) {
        console.error(`${(new Date()).toISOString()} getRules() - ${err}`);
        throw err;
    }
}

Delete Existing Filter Rules

Passing an array of filter IDs, delete that array from Twitter for the account associated with the bear token.

async function deleteRules(token, ids, url) {
    console.debug(`${(new Date()).toISOString()} deleteRules()`);
 
    const body = {
        'delete' : {
            'ids': ids
        }
    };
    try {
        const response = await fetch(url, {
            method: 'POST',
            headers: {
                'Content-Type' : 'application/json',
                'Authorization' : 'Bearer ' + token
            },
            body: JSON.stringify(body)
        });
        if (response.ok) {
            const json = await response.json();
            return json.meta.summary.deleted;
        }
        else {
            throw new Error(`response status: ${response.status} ${response.statusText}`);    
        }
    }
    catch (err) {
        console.error(`${(new Date()).toISOString()} deleteRules() - ${err}`);
        throw err;
    }
}

Add New Filtering Rules

The code below adds an array of filtering rules to a given account.  Example array with a single rule below.  That rule targets tweets from the President and filters out any retweets or quotes.
const RULES = [{'value' : 'from:realDonaldTrump -is:retweet -is:quote'}];
async function setRules(token, rules, url) {
    console.debug(`${(new Date()).toISOString()} setRules()`);
 
    const body = {'add' : rules};
    try {
        const response = await fetch(url, {
            method: 'POST',
            headers: {
                'Content-Type'  : 'application/json',
                'Authorization' : 'Bearer ' + token
            },
            body: JSON.stringify(body)
        });
        if (response.ok) {
            const json = await response.json();
            return json.meta.summary.created;
        }
        else {
            throw new Error(`response status: ${response.status} ${response.statusText}`);    
        }
    }
    catch (err) {
        console.error(`${(new Date()).toISOString()} setRules() - ${err}`);
        throw err;
    }
}

Stream Tweets

Below is an excerpt of the main streaming logic.  A link to the full source repo is at the bottom of this blog.  This excerpt follows happy path of a HTTP 200 response and starts up a theoretically never-ending reader stream to Twitter with tweets that match the filter criteria built up previously.  Twitter sends heartbeats on this connection every 20 seconds.
                g_reader = response.body;
                g_reader.on('data', (chunk) => {
                    try {
                        const json = JSON.parse(chunk);
                        let text = json.data.text.replace(/\r?\n|\r|@|#/g, ' ');  //remove newlines, @ and # from tweet text
                        console.log(`${(new Date()).toISOString()} tweet: ${text}`);
                    }
                    catch (err) {
                        //heartbeat will generate a json parse error.  No action necessary; continue to read the stream.
                        console.debug(`${(new Date()).toISOString()} stream() - heartbeat received`);
                    } 
                    finally {
                        g_backoff = 0;
                        clearTimeout(abortTimer);
                        abortTimer = setTimeout(() => { controller.abort(); }, ABORT_TIMEOUT * 1000);
                    } 
                });

Results

2019-10-24T14:01:01.906Z filter()
2019-10-24T14:01:01.909Z getTwitterToken()
2019-10-24T14:01:02.166Z clearAllRules()
2019-10-24T14:01:02.166Z getRules()
2019-10-24T14:01:02.353Z deleteRules()
2019-10-24T14:01:02.604Z number of rules deleted: 1
2019-10-24T14:01:02.605Z setRules()
2019-10-24T14:01:02.902Z number of rules added: 1
2019-10-24T14:01:02.903Z stream()
2019-10-24T14:01:03.179Z stream() - 200 response
2019-10-24T14:01:23.177Z stream() - heartbeat received
...
2019-10-24T14:20:03.657Z stream() - heartbeat received
2019-10-24T14:20:12.959Z tweet: The Federal Reserve is derelict in its duties if it 
doesn’t lower the Rate and even, ideally, stimulate. Take a look around the World at our 
competitors. Germany and others are  actually GETTING PAID to borrow money. Fed was way too 
fast to raise, and way too slow to cut!
2019-10-24T14:20:23.660Z stream() - heartbeat received

Source

https://github.com/joeywhelan/twitterFilter

Copyright ©1993-2024 Joey E Whelan, All rights reserved.

Wednesday, October 16, 2019

Twitter Analytics with Google Natural Language


Summary

I'll be taking Twitter tweets and processing them through Google's Natural Language APIs in this post.  The NL APIs provide the ability to parse text into 'entities' and/or determining 'sentiment' of the entity and surrounding text.  I'll be using a combination of the two to analyze some tweets.

Entity + Sentiment Analysis

Lines 4-23:  REST call to Google's NL entity-sentiment endpoint.  The response from that endpoint is an array of entities in rank order of 'salience' (relevance).  Entities have types, such as organization, person, etc.  Net, the tweet gets parsed into entities and I'm pulling the #1 entity from the tweet as ranked by salience.

const ENTITY_SENTIMENT_URL = 'https://language.googleapis.com/v1beta2/documents:analyzeEntitySentiment?key=' + GOOGLE_KEY;

    try {
        const response = await fetch(ENTITY_SENTIMENT_URL, {
            method : 'POST',
            body : JSON.stringify(body),
            headers: {'Content-Type' : 'application/json; charset=utf-8'},
        });

        if (response.ok) {
            const json = await response.json();
            const topSalience = json.entities[0];
            const results = {
                'name' : topSalience.name,
                'type' : topSalience.type,
                'salience' : topSalience.salience,
                'entitySentiment' : topSalience.sentiment
            }
            return results;
        }
        else {
            let msg = (`response status: ${response.status}`);
            throw new Error(msg);
        }
    }
    catch (err) {
        ts = new Date();
        let msg = (`${ts.toISOString()} entitySentiment() - ${err}`);
        console.error(msg)
        throw err;
    }

Sentiment Analysis

Lines 1-22 implement a sentiment analysis of the entire tweet text.
    try {
        const response = await fetch(SENTIMENT_URL, {
            method : 'POST',
            body : JSON.stringify(body),
            headers: {'Content-Type' : 'application/json; charset=utf-8'},
        });

        if (response.ok) {
            const json = await response.json();
            return json.documentSentiment;
        }
        else {
            let msg = (`response status: ${response.status}`);
            throw new Error(msg);
        }
    }
    catch (err) {
        ts = new Date();
        let msg = (`${ts.toISOString()} sentiment() - ${err}`);
        console.error(msg)
        throw err;
    }

Blending

Lines 1-16:  This function calls upon both of the Google NL API functions above and provides a blended analysis of the tweet text.  Below, I took the product of the averages of the magnitude (amount of emotion) and score (positive vs negative emotion) between the entity-sentiment and overall sentiment to arrive at an aggregate figure.  There are certainly other ways to combine these factors.

async function analyze(tweet) {
    const esnt = await entitySentiment(tweet);
    const snt = await sentiment(tweet);

    let results = {};
    results.tweet = tweet;
    results.name = esnt.name;
    results.type = esnt.type;
    results.salience = esnt.salience;
    results.entitySentiment = esnt.entitySentiment;
    results.documentSentiment = snt;
    let mag = (results.entitySentiment.magnitude + results.documentSentiment.magnitude) / 2;
    let score = (results.entitySentiment.score + results.documentSentiment.score) / 2;
    results.aggregate = mag * score;
    return results;
}

Execution

Lines 1-4:  Simple function for reading a JSON-formatted file of tweets.
Lines 6-17:  Reads a file containing an array of tweets and process each through the Google NL functions mentioned above.
async function readTweetFile(file) {
    let tweets = await fsp.readFile(file);
    return JSON.parse(tweets);
}

readTweetFile(INFILE)
.then(tweets => {
    for (let i=0; i < tweets.length; i++) {
        analyze(tweets[i].text)
        .then(json => {
            console.log(JSON.stringify(json, null, 4));
        });
    }
})
.catch(err => {
    console.error(err);
});

Results

Below is an example of a solid negative sentiment from Donald Trump regarding Adam Schiff.  Schiff is accurately identified as a 'Person' entity.  Note the negative scores + high emotion (magnitude) in both the entity sentiment and overall sentiment analysis.

{
    "tweet": "Shifty Adam Schiff wants to rest his entire case on a Whistleblower who he now
     says can’t testify, & the reason he can’t testify is that he is afraid to do so 
     because his account of the Presidential telephone call is a fraud & 
     totally different from the actual transcribed call...",
    "name": "Adam Schiff",
    "type": "PERSON",
    "salience": 0.6048015,
    "entitySentiment": {
        "magnitude": 3.2,
        "score": -0.5
    },
    "documentSentiment": {
        "magnitude": 0.9,
        "score": -0.9
    },
    "aggregate": -1.435
}

Below is another accurately detected 'person' entity with a positive statement from the President.
{
    "tweet": "Kevin McAleenan has done an outstanding job as Acting Secretary of
Homeland Security. We have worked well together with Border Crossings being way down.
Kevin now, after many years in Government, wants to spend more time with his family and
go to the private sector....",
    "name": "Kevin McAleenan",
    "type": "PERSON",
    "salience": 0.6058554,
    "entitySentiment": {
        "magnitude": 0.4,
        "score": 0
    },
    "documentSentiment": {
        "magnitude": 1.2,
        "score": 0.3
    },
    "aggregate": 0.12
}

Source

https://github.com/joeywhelan/twitterAnalytics

Copyright ©1993-2024 Joey E Whelan, All rights reserved.

Sunday, October 13, 2019

Twitter Premium Search API - Node.js


Summary

In this post I'll demonstrate how to use the Twitter Premium Search API.   This is a pure REST API with two different search modes:  past 30 days or full archive search since Twitter existed (2006).

The API has a very limited 'free' mode for Developers to try out.  Limits are imposed on usage:  number of API requests, tweets pulled per month and rate of API calls.  To do anything of significance with this API, you're faced with paying for Twitter's API subscription.  That gets pretty pricey quickly with the cheapest tier currently at $99/month.  This post is based on usage of the 'free'/sandbox tier.

Main Loop

Line 1 fetches a bearer token for accessing the Twitter APIs.  I covered this topic in a previous post.

Lines 4-19 implement a while loop that fetches batches of tweets for a given search query.  For the Twitter free/sandbox environment, you can pull up to 100 tweets per API call.  Each tweet in the batch is evaluated to determine if it was 140 or 280 character tweet.  The tweet text is formatted and then that and the created_date are added to a JSON array.  That array is ultimately written to file.

Line 20 is a self-imposed delay on calls to the Twitter API.  If you bust their rate limits, you'll get a HTTP 429 error.

        const token = await getTwitterToken(AUTH_URL);
        let next = null;
        
        do {
            const batch = await getTweetBatch(token, url, query, fromDate, maxResults, next);
            for (let i=0; i < batch.results.length; i++) {  //loop through the page/batch of results
                let tweet = {};
                if (batch.results[i].truncated) {  //determine if this is a 140 or 280 character tweet
                    tweet.text = batch.results[i].extended_tweet.full_text.trim();
                }
                else {
                    tweet.text = batch.results[i].text.trim();
                }

                tweet.text = tweet.text.replace(/\r?\n|\r|@|#/g, ' ');  //remove newlines, @ and # from tweet text
                tweet.created_at = batch.results[i].created_at;
                tweets.push(tweet);
            }
            next = batch.next;
            await rateLimiter(3);  //rate limit twitter api calls to 1 per 3 seconds/20 per minute
        }
        while (next);

Tweet Batch Fetch

Lines 1-26 set up a node fetch to the Twitter REST API end point.  If this was a call with a 'next' parameter (meaning multiple pages of tweets on a single search), I add that parameter to the fetch.

    const body = {
        'query' : query,
        'fromDate' : fromDate,
        'maxResults' : maxResults
    };
    if (next) {
        body.next = next;
    }

    try {
        const response = await fetch(url, {
            method: 'POST',
            headers: {
            'Authorization' : 'Bearer ' + token
            },
            body: JSON.stringify(body)
        });
        if (response.ok) {
            const json = await response.json();
            return json;
        }
        else {
            let msg = (`authorization request response status: ${response.status}`);
            throw new Error(msg);    
        }
    }

Usage

let query = 'from:realDonaldTrump -RT';  //get tweets originated from Donald Trump, filter out his retweets
let url = SEARCH_URL + THIRTY_DAY_LABEL;  //30day search
let fromDate = '201910010000'; //search for tweets within the current month (currently, Oct 2019)
search(url, query, fromDate, 100)  //100 is the max results per request for the sandbox environment 
.then(total => {
    console.log('total tweets: ' + total);
})
.catch(err => {
    console.error(err);
});

Output

Snippet of the resulting JSON array from the function call above.
[
    {
        "text": "We have become a far greater Economic Power than ever before, and we are using that power for WORLD PEACE!",
        "created_at": "Sun Oct 13 14:32:37 +0000 2019"
    },
    {
        "text": "Where’s Hunter? He has totally disappeared! Now looks like he has raided and scammed even more countries! Media is AWOL.",
        "created_at": "Sun Oct 13 14:15:55 +0000 2019"
    },

Source

https://github.com/joeywhelan/twitterSearch

Copyright ©1993-2024 Joey E Whelan, All rights reserved.

Sunday, September 22, 2019

Fetching an Application-only Twitter API Token via Node


Summary


This short post demonstrates the steps necessary to fetch an app-only bearer token via Twitter's OAuth2 interface using Node.js.  That token would then be subsequently used to access Twitter's APIs.  This post follows the steps explained on the Twitter developer site here.

Set-up

  1. Create a developer account as described here.  
  2. Create an 'application' as described here.
  3. At this point, you have a 'Consumer Key' and 'Consumer Secret'.  Those two strings will be used in the code shown below.

Code

 

Create the Consumer Token

Per the Twitter documentation, the Consumer Key and Secret need to be URL encoded, concatentated, and then base64-encoded.
const CONSUMER_KEY = process.env.CONSUMER_KEY;
const CONSUMER_SECRET = process.env.CONSUMER_SECRET;

function urlEncode (str) {
    return encodeURIComponent(str)
      .replace(/!/g, '%21')
      .replace(/'/g, '%27')
      .replace(/\(/g, '%28')
      .replace(/\)/g, '%29')
      .replace(/\*/g, '%2A')
}

const consumerToken = btoa(urlEncode(CONSUMER_KEY) + ':' + urlEncode(CONSUMER_SECRET));

Fetch the Bearer Token

Code below uses node-fetch to execute a HTTP POST to the Twitter OAuth2 interface.  If the fetch is successful, the bearer token is inside a JSON object returned by that interface.

    return fetch(url, {
        method: 'POST',
        headers: {
            'Authorization' : 'Basic ' + consumerToken,
            'Content-Type' : 'application/x-www-form-urlencoded;charset=UTF-8'
        }, 
        body : 'grant_type=client_credentials'
    })
    .then(response => {
        if (response.ok) {
            return response.json();
        }
        else {
            throw new Error('Response Status: ' + response.status);
        }
    })
    .then(json => {
        if (json.token_type == 'bearer') {
            return json.access_token;
        }
        else {
            throw new Error('Invalid token type: ' + json.token_type);
        }
    });  

Source


Full source here: https://github.com/joeywhelan/authTest

Copyright ©1993-2024 Joey E Whelan, All rights reserved.

Sunday, July 21, 2019

API Development with GCP


Summary

This post covers my use of Google Cloud Platform (GCP) to build a scalable piece of middleware, theoretically, infinitely scalable.  The notional use case here is a store locator microservice for a company that has thousands of locations.  The service will provide the closest store location for a given ZIP code or GPS coordinates.  The location will be returned as a URL representing the directions on Google Maps.

Overall Architecture

For this particular application, I utilized GCP-based services for the entirety of the core app.  I additionally leveraged the services of Auth0 for authentication services.  The API is realized in a REST structure - two GET-based services.

GCP Architecture

I used Google App Engine Flex (GAE) for the application core.  I provided front-end API support with Cloud Endpoints.  Caching of store and  ZIP coordinates utilizes Google's cloud implementation of Redis - Cloud Memorystore.  A combination of Cloud Functions and Cloud Storage provides the ability for real-time updating of the cache with simple configuration file modifications.

API Proxy + Authentication Layer

All requests into the webserver written in node.js on GAE Flexible are proxied by Cloud Endpoints.  Endpoints provides redirection to HTTPS and authentication proxying, among other things.  I chose utilize the OAuth services provided by Auth0.  They in turn proxy the JWT-based (RSA256) authentication that provides security for the API.



Configuration of the Cloud Endpoints is accomplished via an OpenAPI ver 2.0 YAML file.

swagger: "2.0"
info:
  title: "Store Locator"
  description: "Generate a Google maps directions URL to the nearest 
  store based on user's current ZIP code or coordinates"
  version: "1.0.0"
host: "youraccount.appspot.com"
consumes:
- "text/plain"
produces:
- "text/uri-list"
schemes:
  - "https"
securityDefinitions:
  auth0_jwk:
    authorizationUrl: ""
    flow: "implicit"
    type: "oauth2"
    x-google-issuer: "https://yourcaccount.auth0.com/"
    x-google-jwks_uri: "https://youraccount.auth0.com/.well-known/jwks.json"
    x-google-audiences: "https://locatorUser1"
security:
  - auth0_jwk: []
paths:
  /locator/zip:
    get:
      summary: "Find nearest store by ZIP code"
      operationId: "ZIP"
      description: "ZIP code"
      parameters:
        -
          name: zip
          in: query
          required: true
          type: string
      responses:
        200:
          description: "Google Maps URL with directions from input ZIP to nearest store"
          schema: 
            type: string
        404:
          description: "Error Message"
          schema:
            type: string
  /locator/coordinates:
    get:
      summary: "Find nearest store by coordinates (latitude, longitude)"
      operationId: "Coordinates"
      description: "Latitude, Longitude"
      parameters:
        -
          name: coordinates
          in: query
          required: true
          type: string
      responses:
        200:
          description: "Google Maps URL with directions from input coordinates to nearest store"
          schema: 
            type: string
        404:
          description: "Error Message"
          schema:
            type: string


Application Layer

The heart of this is a node.js application that realizes two HTTP GET paths in Express.  The paths allow for a search of the closest store based on the user's current ZIP code or GPS coordinates (latitude + longitude).  Cloud Memorystore keeps an updated set of coordinates for the ZIP codes and store locations.

Code Snippet - Get Closest Store by Coordinates

I use a filter based on the haversine formula to narrow down the list of closest store candidates.  That formula finds 'as the crow flies' distances.  Once those candidates have been found, I then send them into a Google Maps API service (Distance Matrix) for actual driving distances.  Ultimately, the API call returns a URL that corresponds to the actual driving directions between the origin and store coordinates.

/**
 * Performs the Haversine formula to generate the great circle distance between two coordinates.
 * @param {object} coord1 - latitude & longitude
 * @param {object} coord2 - latitude & longitude
 * @return {int} - great circle distance between the two coordinates
 */
function haversine(coord1, coord2) {
 let lat1 = coord1.lat;
 let lon1 = coord1.long;
 let lat2 = coord2.lat;
 let lon2 = coord2.long;
 const R = 3961;  //miles
    const degRad = Math.PI/180;
    const dLat = (lat2-lat1)*degRad;
    const dLon = (lon2-lon1)*degRad;
 
 lat1 *= degRad;
    lat2 *= degRad;
    const a = Math.sin(dLat/2) * Math.sin(dLat/2) + 
        Math.sin(dLon/2) * Math.sin(dLon/2) * Math.cos(lat1) * Math.cos(lat2);
    const c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1-a));
    return R * c;
}

/**
 * Fetches a configurable number of stores that are closest to a given coordinate.
 * @param {object} origin - latitude & longitude
 * @param {int} numVals - number of closest stores to return
 * @return {array} - array of the numVal closest stores
 */
function getStoresByCoord(origin, numVals) {
 console.log(`getStoresByCoord(JSON.stringify(origin), numVals)`);
 let distances = [];
 
 if (numVals > storeList.length) numVals = storeList.length;

 //performs a haversine dist calc between the origin and each of the stores
 for (let i=0; i < storeList.length; i++) {
  const dist = haversine(origin, {'lat' : storeList[i].lat, 'long' : storeList[i].long});
  const val = {'index': i, 'distance': dist};
  distances.push(val);
 }

 let stores = [];
 distances.sort(compareDist);
 for (let i = 0; i < numVals; i++) {
  stores.push(storeList[distances[i].index]);
 }
 
 return stores;
}

/**
 * Fetches the closest store location based on the user's lat/long
 * Performs an initial filtering based ZIP code only, then refines a configurable number of closest
 * stores using actual driving distance from the Google Maps Distance Matrix
 * API.
 * @param {string} origin - lat/long of origin location
 * @param {array of strings} stores - lat/long(s) of store locations
 * @return {string} - URL of Google map with nearest store
 */
function getClosestStore(origin, stores) {
 console.log(`getClosestStore(JSON.stringify(origin))`);
 
 let dests = [];
 for (const store of stores) {
  dests.push({lat: store.lat, lng: store.long});
 }
 return mapsClient.distanceMatrix({
  'origins': [{lat: origin.lat, lng: origin.long}],
  'destinations': dests
 })
 .asPromise()
 .then((response) => {
  if (response.status == 200 && response.json.status == 'OK') {
   let minIndex;
   let minDist;
   const elements = response.json.rows[0].elements
   for (let i=0; i < elements.length; i++) {
    if (minDist == null || elements[i].distance.value < minDist) {
     minIndex = i;
     minDist = elements[i].distance.value;
    }
   }
   return stores[minIndex];
  }
  else {
   throw new Error('invalid return status on Google distanceMatrix API call');
  }
 })
 .catch(err => {
  console.error(`getClosestStore(): ${JSON.stringify(err)}`);
  throw err;
 });
}

/**
 * Creates a google maps url with the directions from an origin to a store location.
 * @param {object} origin - latitude & longitude
 * @param {object} store - object containing store address info, to include latitude & longitude
 * @return {string} - Google Maps URL showing directions from origin to store location
 */
function getDirections(origin, store) {
 return MAPSURL + `&origin=origin.lat, origin.long` + `&destination=store.lat, store.long`;
}

/**
 * Fetches the closest store location based on the user's lat/long.
 * @param {string} coordinates - lat/long of user's current location
 * @return {string} - URL of Google map with nearest store
 */
app.get('/locator/coordinates', (request, response) => {
 const vals = request.query.coordinates.split(',');
 const origin = {'lat' : vals[0], 'long' : vals[1]};
 const stores = getStoresByCoord(origin, 3);
 getClosestStore(origin, stores)
 .then(store => {
  const url = getDirections(origin, store);
  response.status(200).send(url);
 })
 .catch(err => {
  response.status(404).send(err.message);
 });
});

Caching Layer

Google Cloud Memorystore is a cloud implementation of Redis.  I use this service to provide a real-time updatable set of ZIP code and store location coordinates.

Configuration + Persistence Layer

This app is able to get to real-time updates on stores and ZIP codes simply by reloading files into Google Cloud Storage.  This allows move/add/deletes of stores without any modification of code or restarts of the application.  I created a Cloud Function that is triggered on modification of either the stores or ZIP code files.  After being triggered, the Cloud Function updates the Memorystore cache with the latest information.

Code Snippet - Cloud Function (gcsMonitor.js)


/**
* Public function for reading the Store location file from Google Cloud Storage
* and loading into Google CloudMemory(redis).  
* Will propagate exceptions.
*/
function loadStoreCache() {
 console.log(`loadStoreCache() executing`);
 const bucket = storage.bucket(gcpBucket);
 const stream = bucket.file(gcpStoreFile).createReadStream();
 const client = redis.createClient(REDISPORT, REDISHOST);
 client.on("error", function (err) {
  console.log("loadStoreCache() Redis error:" + err);
 });

 csv()
 .fromStream(stream)
 .subscribe((json) => {
  let hashKey;
  for (let [key, value] of Object.entries(json)) {
   if (key === 'storeNum') {
    hashKey = 'store:' + value;
   }
   else {
    console.log(`loadStoreCache() inserting hashKey`);
    client.hset(hashKey, key, value, (err, reply) => {
     if (err) {
      console.error(`loadStoreCache() Error: ${err}`);
     }
     
    });
   }
  }
 })
 .on('done', (err) => {
  client.quit();
  console.log(`loadStoreCache() complete`);
 });
};

Test Client

As discussed, the API uses Auth0 (proxied by Endpoints) for authentication.  The code below fetches a JWT token from Auth0 and then performs an API call with that token in the header.
function fetchToken() {

    const body = {
        'client_id': clientId,
        'client_secret': clientSecret,
        'audience': audience,
        'grant_type': 'client_credentials'
    };

    return fetch(tokenUrl, {
        method: 'POST',
        headers: {
            'Content-Type': 'application/json'
        },
        body: JSON.stringify(body)
    })
    .then(response => {
        if (response.ok) {
            return response.json();
        }
        else {
            console.error('fetchToken Error: ' + response.status);
        }
    }) 
    .then(json => {
        return json.access_token;
    })   
}

const apiUrl = 'https://yourapp.appspot.com/locator/coordinates/?coordinates=37.1464,-94.4630'
function authTest() {
    return fetchToken()
    .then(token => {
        fetch(apiUrl, {
            method: 'GET',
            headers: {
                'Authorization': 'Bearer ' + token
            }
        })
        .then(response => {
            if (response.ok) {
                return response.text();
            }
            else {
                console.error(response.status);
            }
        })
        .then(text => {
            console.log('Response: ' + text);
        })
    });
}

authTest();


Results

$ node testClient.js
Response: https://www.google.com/maps/dir/?api=1&origin=37.1464, -94.4630&destination=37.0885389, -94.5144897

Source


Copyright ©1993-2024 Joey E Whelan, All rights reserved.

Saturday, March 2, 2019

Event Sourcing with Redis Streams


Summary

Streams were an addition to the Redis 5.0 release.  Redis streams are roughly analogous to a log file: an append-only data structure.  Redis streams also have some producer/consumer capabilities that are roughly analogous to Kafka streams, with some important differences.  A full explanation of Redis streams here.

Event sourcing is one of those old topics that has been brought back to life again in a new context.  The new context is state persistence and messaging for microservice architectures.  A full explanation of event sourcing here.

This post is about my adventures at implementing event sourcing via Redis streams.  The example is simple and contrived - an account microservice that allows deposits and withdrawals.  I made a rough attempt at implementing this in a 'domain-driven design' model, but I didn't fully adhere to that model as I didn't care for the level of abstraction necessary.

Even though the scenario is simple, my overall impression of event sourcing is - it's hard.  It's hard to think about code in an event-driven manner and it's hard to implement it correctly in a distributed architecture.

Overall Architecture

Diagram below of the high-level architecture:  REST-based microservice that leverages Redis for the event store and MongoDB for event data aggregation.

Service Architecture

Microserver application arch below.  I implemented this with a Node.js HTTP server for the REST routes and an Account Service class that has an event store client and an array of account aggregates.

Projection Architecture

Architecture for the aggregating data from the event store below.  Again, Node.js implementation with a Redis client that realizes an event store functionality and a MongoDB client for aggregating data from events.

Event Store Architecture

A rough outline of what the event store implementation looks like.  I use Redis objects, in particular, the stream object to realize event sourcing functionality such as fetching an event, publishing an event, subscribing for events, etc.

Code Snippets

Creating an Account - accountService.js

The code below leverages a Redis Set object to ensure unique account ID's.  A JSON object is then created with the corresponding 'create' event and then 'published' to Redis.

 create(id) {
  return this._client.addId(id, 'accountId')
  .then(isUnique => {
   logger.debug(`AccountService.create - id:${id} - isUnique:${isUnique}`);
   if (isUnique) {
    const newEvent = {'id' : id, 'version': 0, 'type': 'create'};
    return this._client.publish('accountStream', newEvent);
   }
   else {
    return new Promise((resolve, reject) => {
     resolve(null);
    });
   }
  })
  .then (results => {
   if (results && results.length === 2) {  //results is an array.  first item is the new version number of the aggregate, 
             //second is the timestamp of the create event that was published
    logger.debug(`AccountService.create - id:${id} - results:${results}`);
    const version = results[0];
    const timestamp = results[1];
    const account = new Account(id, version, timestamp);
    this._accounts[id] = account;  //add the new account to the cache
    return {'id' : id};
   }
   else {
    throw new Error('Attempting to create an account id that already exists');
   }
  })
  .catch(err => {
   logger.error(`AccountService.create - id:${id} - ${err}`);
   throw err;
  });
 }

Making a deposit to an Account - accountService.js

The code below attempts to load the account from cache or replay of events if not in cache.  Business logic for a deposit is implemented in the account aggregate (account.js).  If the aggregate allows the deposit, then an event is published.  If the publishing of the event fails (concurrency conflict), the deposit is rolled back and an error is thrown.

 deposit(id, amount) {
  let account;
  
  return this._loadAccount(id) //attempt to load the account from cache and/or rehydrate from events
  .then(result => {
   account = result;
   account.deposit(amount);
   const newEvent = {'id' : id, 'version' : account.version, 'type': 'deposit', 'amount': amount};
   return this._client.publish('accountStream', newEvent);
  })
  .then(results => {
   logger.debug(`AccountService.deposit - id:${id}, amount:${amount} - results:${results}`);
   if (results) {
    account.version = results[0];
    account.timestamp = results[1];
    this._accounts[id] = account; //update the account cache
    return {'id': id, 'amount': amount};
   }
   else {
    account.withdraw(amount); //rolling back aggregate due to unsuccessful publishing of deposit event
    return null;
   }
  })
  .catch(err => {
   logger.error(`PlayerService.deposit - id:${id}, amount:${amount} - ${err}`);
   throw err;
  });
 }

Publishing an event - eventStoreClient.js

The code below implements concurrency control to the event store with Redis' 'watch' method.  Only one process will be permitted to publish an event with a given 'version' number.

 publish(streamName, event) { 
  logger.debug(`EventStoreClient.publish`);
  this._client.watch(event.id); //watch the id (account)
  return this._getAsync(event.id)  //fetch the current version from a Redis key with that ID 
  .then(result => { 
   if (!result || parseInt(result) === parseInt(event.version)) {  //key doesn't exist or versions match
    event.version += 1;  //increment version number prior to publishing the event
    logger.debug(`EventStoreClient.publish - streamName:${streamName}, event:${JSON.stringify(event)}\
     - result:${result}`);
    return new Promise((resolve, reject) => {
     this._client.multi()  //atomic transaction that increments the version and adds event to stream
     .incr(event.id)
     .xadd(streamName, '*', 'event', JSON.stringify(event))
     .exec((err, replies) => {
      if (err) {
       reject(err);
      }
      else {
       resolve(replies);
      }
     });
    });
   }
   else {  //covers the scenario where a concurrent access causes a mismatch with event version numbers
     //return null and then it's up to the client to make another publish attempt
    return new Promise((resolve, reject) => {
     resolve(null);
    });
   }
  })
  .catch(err => {
   logger.error(`EventStoreClient.publish - streamName:${streamName}, event:${event} - ${err}`);
   throw err;
  });
 }

Subscribing for events - eventStoreClient.js

The Redis streams implementation doesn't provide standard pub/sub functionality.  Subscriber behavior can be emulated though using Node.js event emitters coupled with Redis consumer groups.  A given consumer group is read below periodically via setInterval.  If new events are present, they're emitted via the emitter.  The 'subscriber' would implement the corresponding Node event handler for the emitter returned by this function.  This is precisely how the 'projector' is implemented for aggregating event data to a MongoDB database.

 subscribe(streamName, consumerName) { 
  let emitter;
  let groupName = streamName + 'Group';
  logger.debug(`EventStoreClient.subscribe - streamName:${streamName}, groupName:${groupName}, consumerName:${consumerName}`);
  
  if (this._emitters[streamName] && this._emitters[streamName][groupName]) {
   emitter = this._emitters[streamName][groupName];
  }
  else {
   this._client.xgroup('CREATE', streamName, groupName, '0', (err) => {});  //attempt to create Redis group
   emitter = new events.EventEmitter();
   let obj = setInterval(() => {
    this._readGroup(streamName, groupName, consumerName)
    .then(eventList => {
     if (eventList.length > 0) {
      emitter.emit('event', eventList);
     }
    })
    .catch(err => {
     logger.error(`EventStoreClient.subscribe - streamName:${streamName}, groupName:${groupName},\
     consumerName:${consumerName} - ${err}`);
     throw err;     
    });
   }, this._readInterval);
   if (!this._emitters[streamName]) {
    this._emitters[streamName] = {};
   }
   this._emitters[streamName][groupName] = emitter;
   this._intervals.push(obj); 
  }
  
  return emitter;
 }

Sample Results

Below the state of a Redis instance after the following actions:  Create account, Deposit $100, Withdraw $100
127.0.0.1:6379> xrange accountStream - +
1) 1) "1551312621884-0"
   2) 1) "event"
      2) "{\"id\":\"JohnDoe\",\"version\":1,\"type\":\"create\"}"
2) 1) "1551312827949-0"
   2) 1) "event"
      2) "{\"id\":\"JohnDoe\",\"version\":2,\"type\":\"deposit\",\"amount\":100}"
3) 1) "1551312847014-0"
   2) 1) "event"
      2) "{\"id\":\"JohnDoe\",\"version\":3,\"type\":\"withdraw\",\"amount\":100}"
Below is the corresponding state of a MongoDB collection being used for event data aggregation.
> db.accountCollection.find()
{ "_id" : "JohnDoe", "funds" : 0, "timestamps" : [ "1551312621884-0", "1551312827949-0", "1551312847014-0" ] }

Source

Full source w/comments here: https://github.com/joeywhelan/redisStreamEventStore

Copyright ©1993-2024 Joey E Whelan, All rights reserved.

NICE inContact WorkItem Routing


Summary

I'll demonstrate the use of the NICE inContact Work Item API in this post.  That API provides the ability to bring non-native tasks into the routing engine.  The 'workitem' to be routed will be email from Google Gmail.  I'll cover the use of the Gmail API in addition to WorkItem.

This is a contrived example with, at best, demo-grade code.

Overall Architecture

Below is a depiction of the overall set up of this exercise.  Emails are pulled from Gmail via API, submitted to InContact's routing engine via API, then delivered to an agent target via a routing a strategy.


Application Architecture

Below is the application layout.  

Routing Logic

Picture of the routing strategy used for this demo.  This a very simple strategy that pulls a work item from queue and then pops a web page with the email contents to an agent.


Code Snippets


Main Procedure

Code below performs the OAuth2 handshake with Google, gets a list of messages currently in the Inbox, then packages up an array of Promises each of which pulls message contents from Gmail and submit them to the Work Item API.
function processMessage(id) {
 return gmail.getMessage(id)
 .then(msg => {
  return workItem.sendEmail(msg.id, msg.from, msg.payload);
 })
 .then(contactId => {
  return {msgId : id, contactId : contactId};
 });
}

gmail.authorize()
.then(_ => {
 return gmail.getMessageList();
})
.then((msgs) => {
 let p = [];
 msgs.forEach(msg => {
  p.push(processMessage(msg.id));
 });
 return Promise.all(p);
})
.then(results => {
 console.log(results);
})
.catch((err) => {
 console.log(err);
});

Gmail GetMessageList and GetMessage

getMessageList() pulls a list of Gmail IDs currently with the INBOX label.  getMessage() pulls the content of a message for a given Gmail ID.

 getMessageList() {
  console.log('getMessageList()');
  const auth = this.oAuth2Client;
  const gmail = google.gmail({version: 'v1', auth});
  return new Promise((resolve, reject) => {
   gmail.users.messages.list({userId: 'me', labelIds: ['INBOX']}, (err, res) => {
    if (err) {
     reject(err);
    }
    resolve(res);
   });
  })
  .then((res) => {
   return res.data.messages;
  })
  .catch((err) => {
   console.error('getMessageList() - ' + err.message);
   throw err;
  });
 }


 getMessage(id) {
  console.log('getMessage() - id: ' + id);
  const auth = this.oAuth2Client;
  const gmail = google.gmail({version: 'v1', auth});
  return new Promise((resolve, reject) => {
   gmail.users.messages.get({userId: 'me', 'id': id}, (err, res) => {
    if (err) {
     reject(err);
    }
    resolve(res);
   });
  })
  .then((res) => {
   const id = res.data.id;
   const arr = (res.data.payload.headers.find(o => o.name === 'From')).value
   .match(/([a-zA-Z0-9._-]+@[a-zA-Z0-9._-]+\.[a-zA-Z0-9._-]+)/gi);
   let from;
   if (arr) {
    from = arr[0];
   }
   else {
    from = '';
   }
   let payload = '';
   if (res.data.payload.body.data) {
    payload = atob(res.data.payload.body.data);
   }
   return {id: id, from: from, payload: payload};
  })
  .catch((err) => {
   console.error('getMessage() - ' + err.message);
   throw err;
  });
 }


WorkItem API Call

This consists of a simple POST with the workItem params in the body.

 postWorkItem(workItemURL, token, id, from, payload, type) {
  console.log('postWorkItem() - url: ' + workItemURL + ' from: ' + from);
  const body = {
    'pointOfContact': this.poc,
    'workItemId': id,
    'workItemPayload': payload,
    'workItemType': type,
    'from': from
  };
 
  return fetch(workItemURL, {
   method: 'POST',
   body: JSON.stringify(body),
   headers: {
    'Content-Type' : 'application/json', 
    'Authorization' : 'bearer ' + token
   },
   cache: 'no-store',
   mode: 'cors'
  })
  .then(response => {
   if (response.ok) {
    return response.json();
   }
   else {
    const msg = 'response status: ' + response.status;
    throw new Error(msg);
   }
  })
  .then(json => {
    return json.contactId;
  })
  .catch(err => {
   console.error('postWorkItem() - ' + err.message);
   throw err;
  });
 }

Results


Source

Full source w/comments here: https://github.com/joeywhelan/workitem

Copyright ©1993-2024 Joey E Whelan, All rights reserved.