Thursday, October 24, 2019

Twitter Filtered Stream


Summary

This post discusses my use of the Twitter Developer Labs (beta) APIs to create a real-time tweet feed. The APIs are all HTTP-based. The streaming tweet feed itself is an HTTP connection that, in theory, never ends.

Architecture

The diagram below depicts the overall flow for this exercise.
  • Fetch an API token, which is required to call any of the Twitter APIs.
  • Fetch any existing tweet filter rules.
  • Delete them.
  • Add new filtering rules.
  • Start streaming a tweet feed based on those filtering rules.
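
The steps above can be sketched as a single driver function. This is only a rough sketch, not the code from the repo: the step functions are injected so the ordering can be exercised without network access. Their signatures, the `urls` object, and the use of the name `filter` (borrowed from the log output in the Results section) are assumptions on my part.

```javascript
// Hypothetical driver for the five steps. The step functions are passed in
// as an object so they can be swapped for stubs; their signatures are assumed.
async function filter(steps, rules, urls) {
    const token = await steps.getTwitterToken();                   // step 1: fetch API token
    const deleted = await steps.clearAllRules(token, urls.rules);  // steps 2-3: fetch and delete old rules
    console.log(`${(new Date()).toISOString()} number of rules deleted: ${deleted}`);
    const added = await steps.setRules(token, rules, urls.rules);  // step 4: add new rules
    console.log(`${(new Date()).toISOString()} number of rules added: ${added}`);
    return steps.stream(token, urls.stream);                       // step 5: start the tweet stream
}
```

Each step waits for the previous one because the rule calls need the token, and the stream should only start once the new rules are in place.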


Fetch API Token

I discussed the steps for that in this post.

Get Existing Filter Rules

The code below fetches any existing filtering rules for the account associated with the bearer token.

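const fetch = require('node-fetch');  //assumed dependency: node-fetch v2, whose response.body is a Node.js stream
const RULES_URL = 'https://api.twitter.com/labs/1/tweets/stream/filter/rules';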
async function getRules(token, url) {
    console.debug(`${(new Date()).toISOString()} getRules()`);
    
    try {
        const response = await fetch(url, {
            method: 'GET',
            headers: {
                'Authorization' : 'Bearer ' + token
            }
        });
        if (response.ok) {
            const json = await response.json();
            return json;
        }
        else {
            throw new Error(`response status: ${response.status} ${response.statusText}`);    
        }
    }
    catch (err) {
        console.error(`${(new Date()).toISOString()} getRules() - ${err}`);
        throw err;
    }
}

Delete Existing Filter Rules

Given an array of rule IDs, the code below deletes those rules from Twitter for the account associated with the bearer token.

async function deleteRules(token, ids, url) {
    console.debug(`${(new Date()).toISOString()} deleteRules()`);
 
    const body = {
        'delete' : {
            'ids': ids
        }
    };
    try {
        const response = await fetch(url, {
            method: 'POST',
            headers: {
                'Content-Type' : 'application/json',
                'Authorization' : 'Bearer ' + token
            },
            body: JSON.stringify(body)
        });
        if (response.ok) {
            const json = await response.json();
            return json.meta.summary.deleted;
        }
        else {
            throw new Error(`response status: ${response.status} ${response.statusText}`);    
        }
    }
    catch (err) {
        console.error(`${(new Date()).toISOString()} deleteRules() - ${err}`);
        throw err;
    }
}
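
The log output in the Results section shows a clearAllRules() step that ties the fetch and delete functions together. A minimal sketch of how such a step might look follows. The response shape (a `data` array of objects with an `id` field, absent when no rules exist) is an assumption about the Labs rules endpoint. The two trailing parameters are hypothetical, added only so the function can be tried with stubs; by default they fall back to the getRules() and deleteRules() functions above.

```javascript
// Sketch only: fetch the current rules, collect their IDs, delete them.
// fetchRules and removeRules are hypothetical injection points for testing.
async function clearAllRules(token, url, fetchRules = getRules, removeRules = deleteRules) {
    console.debug(`${(new Date()).toISOString()} clearAllRules()`);

    const json = await fetchRules(token, url);
    // Assumed response shape: { data: [ { id, value }, ... ] }
    const ids = (json.data || []).map((rule) => rule.id);
    if (ids.length === 0) {
        return 0;  //nothing to delete, so skip the POST entirely
    }
    return removeRules(token, ids, url);  //resolves to the number of rules deleted
}
```

Skipping the delete call when there are no rules avoids posting an empty ID array.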

Add New Filtering Rules

The code below adds an array of filtering rules to the account associated with the bearer token. The example array contains a single rule, which matches tweets from the President and excludes retweets and quote tweets.
const RULES = [{'value' : 'from:realDonaldTrump -is:retweet -is:quote'}];
async function setRules(token, rules, url) {
    console.debug(`${(new Date()).toISOString()} setRules()`);
 
    const body = {'add' : rules};
    try {
        const response = await fetch(url, {
            method: 'POST',
            headers: {
                'Content-Type'  : 'application/json',
                'Authorization' : 'Bearer ' + token
            },
            body: JSON.stringify(body)
        });
        if (response.ok) {
            const json = await response.json();
            return json.meta.summary.created;
        }
        else {
            throw new Error(`response status: ${response.status} ${response.statusText}`);    
        }
    }
    catch (err) {
        console.error(`${(new Date()).toISOString()} setRules() - ${err}`);
        throw err;
    }
}
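
Since each rule is just an object with a `value` string, the rules array can also be generated. The helper below is hypothetical (it is not in the repo) and uses only the operators from the example rule above to build one rule per account name.

```javascript
// Hypothetical helper: builds a rule matching original tweets from one account,
// using the same operators as the example rule (from:, -is:retweet, -is:quote).
function originalTweetsFrom(screenName) {
    const name = String(screenName).replace(/^@/, '');  //tolerate a leading @
    return { 'value': `from:${name} -is:retweet -is:quote` };
}

const generatedRules = ['realDonaldTrump'].map(originalTweetsFrom);
console.log(JSON.stringify(generatedRules));
```

The resulting array has the same shape as RULES and could be passed to setRules() in its place.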

Stream Tweets

Below is an excerpt of the main streaming logic. A link to the full source repo is at the bottom of this post. The excerpt follows the happy path of an HTTP 200 response and starts a theoretically never-ending reader stream of tweets from Twitter that match the filter rules set up previously. Twitter sends heartbeats on this connection every 20 seconds.
                g_reader = response.body;
                g_reader.on('data', (chunk) => {
                    try {
                        const json = JSON.parse(chunk);
                        let text = json.data.text.replace(/\r?\n|\r|@|#/g, ' ');  //remove newlines, @ and # from tweet text
                        console.log(`${(new Date()).toISOString()} tweet: ${text}`);
                    }
                    catch (err) {
                        //A heartbeat is not valid JSON, so it triggers a parse error.  No action necessary; continue to read the stream.
                        console.debug(`${(new Date()).toISOString()} stream() - heartbeat received`);
                    } 
                    finally {
                        //Any data, tweet or heartbeat, shows the connection is alive: reset the backoff and re-arm the abort timer.
                        g_backoff = 0;
                        clearTimeout(abortTimer);
                        abortTimer = setTimeout(() => { controller.abort(); }, ABORT_TIMEOUT * 1000);
                    } 
                });
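
The excerpt treats every exception as a heartbeat, which also hides malformed or unexpected payloads. One possible refinement is to classify each chunk explicitly. The sketch below assumes that a heartbeat arrives as a chunk containing only whitespace (such as a bare CRLF) and, like the excerpt, that each chunk holds one complete message. The function name classifyChunk and its return shape are hypothetical.

```javascript
// Sketch: decide whether a chunk is a heartbeat, a tweet, or something else.
// Assumes heartbeats are whitespace-only and one chunk equals one message.
function classifyChunk(chunk) {
    const raw = chunk.toString('utf8');
    if (raw.trim() === '') {
        return { kind: 'heartbeat' };
    }
    try {
        const json = JSON.parse(raw);
        if (json.data && typeof json.data.text === 'string') {
            //remove newlines, @ and # from tweet text, as in the excerpt
            return { kind: 'tweet', text: json.data.text.replace(/\r?\n|\r|@|#/g, ' ') };
        }
        return { kind: 'other', payload: json };  //valid JSON, but not a tweet
    }
    catch (err) {
        return { kind: 'error', error: err };     //neither heartbeat nor valid JSON
    }
}
```

The 'data' handler could then switch on `kind` and log real parse errors separately from heartbeats, while still resetting the abort timer for every chunk.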

Results

2019-10-24T14:01:01.906Z filter()
2019-10-24T14:01:01.909Z getTwitterToken()
2019-10-24T14:01:02.166Z clearAllRules()
2019-10-24T14:01:02.166Z getRules()
2019-10-24T14:01:02.353Z deleteRules()
2019-10-24T14:01:02.604Z number of rules deleted: 1
2019-10-24T14:01:02.605Z setRules()
2019-10-24T14:01:02.902Z number of rules added: 1
2019-10-24T14:01:02.903Z stream()
2019-10-24T14:01:03.179Z stream() - 200 response
2019-10-24T14:01:23.177Z stream() - heartbeat received
...
2019-10-24T14:20:03.657Z stream() - heartbeat received
2019-10-24T14:20:12.959Z tweet: The Federal Reserve is derelict in its duties if it 
doesn’t lower the Rate and even, ideally, stimulate. Take a look around the World at our 
competitors. Germany and others are  actually GETTING PAID to borrow money. Fed was way too 
fast to raise, and way too slow to cut!
2019-10-24T14:20:23.660Z stream() - heartbeat received

Source

https://github.com/joeywhelan/twitterFilter

Copyright ©1993-2024 Joey E Whelan, All rights reserved.