Thursday, January 19, 2017

IoT Data Pipeline - Part 1: Device Set up


Summary

In this post I'll describe my first steps in building an IoT data pipeline.  The scenario is somewhat contrived, but it uses infrastructure I already have combined with no-cost interfaces.  The IoT 'device' in this scenario is a Personal Weather Station (PWS) I have installed at my place.  It's made by Ambient Weather - Model WS-1400-IP.  I integrated it with a cloud storage service from Verizon using a bit of Node.js code.

IoT Data Pipeline - Part 1: Device Set up
IoT Data Pipeline - Part 2: Node/Java Crypto Interop
IoT Data Pipeline - Part 3: Kafka Integration

Device

The 'IoT' device I used was an existing WS-1400-IP PWS I installed last year.  The PWS has a solar-powered outdoor unit with various sensors for temperature, wind speed/direction, precipitation, etc.  There's an additional indoor unit with a wireless receiver, Ethernet port, and firmware to run a web stack.  The indoor unit receives data from the outdoor unit via wireless transmissions.  Sensor data is then output to an internal HTTP server that can be accessed via the Ethernet interface.  Optionally, the firmware supports transmission of the data to the Wunderground network.

Below is a picture of the PWS, photo-bombed by my mules.


The firmware that was installed on my PWS left much to be desired as far as access to the raw data stream.  By default, the PWS can send an HTTP GET with the data as a query string to Wunderground.  Unfortunately, that destination address is hard-coded into the firmware - making it impossible to redirect it for my own use.  There are various folks out there that have hacked around that limitation either by issuing commands directly to the firmware or with IP routing redirects - examples here.  I ended up putting another firmware load on the unit, not necessarily supported for my model, that provided a web interface for directing output to somewhere other than Wunderground.

Implementation

Now with access to the PWS data stream, I was in a position to do something with it.  I decided to build a simple Node.js server for accepting data from the PWS.  The PWS sends its data in the clear as a query string via HTTP GET.  That plaintext data includes your Wunderground username and password - which is less than desirable, but it is what it is.
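
Since the credentials ride along in every request URL, it is worth keeping them out of any logs the server writes.  Below is a minimal sketch of one way to do that.  The parameter names 'ID' and 'PASSWORD' and the '/update' path are assumptions for illustration, not values taken from the PWS firmware; adjust them to whatever your station actually sends.

```javascript
const { URL } = require('url');

// Assumed names of the query parameters that carry credentials.
const SECRET_PARAMS = ['ID', 'PASSWORD'];

// Return the request URL with credential values masked, safe for logging.
function redactUrl(requrl) {
  // The base is only needed so a relative URL can be parsed.
  const u = new URL(requrl, 'http://localhost');
  for (const name of SECRET_PARAMS) {
    if (u.searchParams.has(name)) {
      u.searchParams.set(name, 'REDACTED');
    }
  }
  return u.pathname + u.search;
}

// Hypothetical request URL, for illustration only.
console.log(redactUrl('/update?ID=station1&PASSWORD=secret&tempf=50.4'));
```

Only the logged copy is altered; the original URL would still be forwarded untouched.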

The Node server in this case simulates a device that can then communicate with Verizon's IoT infrastructure.  Verizon ThingSpace has a freely available (as of now) REST API for messaging from various IoT-enabled devices.  Verizon refers to a message from a device as a 'dweet'.  Additionally, the Node server sends a copy of the PWS data to Wunderground.  I actually like the Wunderground presentation and wanted to keep that functional while this server was in operation.  Finally, I wrote a simple HTTP client in Node to fetch dweets from ThingSpace for testing.

Below is a figure depicting the test architecture I put together.

Node Server Code

Main Server

function handler(req, res) {
 wunderground(req.url, res);
 thingspace(req.url);
}

var httpServer = http.createServer(handler).listen(9080);
logger.debug('File: main.js, Listening on port 9080');
Lines 1-4: Super simple request handler.  Call two functions (to be discussed next) to send the data to Wunderground and ThingSpace.
Line 6: Instantiate HTTP server and listen on a non-reserved port.
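
The fan-out in the handler can be written so that a failure in one destination does not affect the other.  This is a sketch only, not the code I ran: makeHandler() and startServer() are hypothetical helper names, and console stands in for the logger.

```javascript
const http = require('http');

// Build a request handler that relays each device request to every sink.
// `primary` owns the HTTP response; `secondaries` only receive the URL.
function makeHandler(primary, secondaries) {
  return function handler(req, res) {
    primary(req.url, res);
    secondaries.forEach(function (sink) {
      try {
        sink(req.url);
      } catch (err) {
        // A synchronous failure in one sink should not stop the others.
        console.error('sink failed:', err.message);
      }
    });
  };
}

// Start listening; not invoked in this sketch.
function startServer(handler, port) {
  return http.createServer(handler).listen(port);
}
```

With the functions from this post it would be wired up as startServer(makeHandler(wunderground, [thingspace]), 9080).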

Send Data to Wunderground

function wunderground(requrl, res) {
 var retVal = '';
 var options = {
  host : 'rtupdate.wunderground.com',
  port : '80',
  path : requrl,
  method : 'GET'
 };
 
 var fwdreq = http.request(options, function(fwdres) {
  fwdres.on('data', function(chunk) {
   retVal += chunk;
  });
  fwdres.on('end', function() {
   res.end(retVal);
   logger.debug('Exiting - File: main.js, Method: wunderground()', retVal.trim());
  });
 });
 fwdreq.on('error', function(err1) {
  logger.error('File: main.js, Method: wunderground(), Message err1: ', err1.message);
 });
 fwdreq.end();
}
Line 1:  I pass in the original request URL, which carries all the PWS data in its query string.  The 'res' parameter is the Response object from the original request.  Successful transmissions to Wunderground result in an HTTP 200 status with 'success' in the response body.
Lines 3-8:  Set up an options object for forwarding an HTTP GET request to Wunderground.
Lines 10-23:  HTTP request set up using core Node functionality.
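
One thing the function above does not do is answer the PWS when the forwarded request fails, so the original request is left open.  A minimal sketch of closing it out with an error status follows.  The forward() and replyUpstreamError() names and the choice of a 502 status are assumptions for illustration.

```javascript
const http = require('http');

// Answer the original requester when the upstream call fails.
function replyUpstreamError(res, err) {
  if (!res.headersSent) {
    res.statusCode = 502; // Bad Gateway: the upstream host could not be reached
  }
  res.end('upstream error: ' + err.message);
}

// Forward a GET to `host` and relay the upstream body back on `res`.
function forward(requrl, res, host) {
  const options = { host: host, port: 80, path: requrl, method: 'GET' };
  const fwdreq = http.request(options, function (fwdres) {
    let body = '';
    fwdres.setEncoding('utf8');
    fwdres.on('data', function (chunk) { body += chunk; });
    fwdres.on('end', function () { res.end(body); });
  });
  fwdreq.on('error', function (err) { replyUpstreamError(res, err); });
  fwdreq.end();
}
```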

Send Data to ThingSpace

function thingspace(requrl) {
 var query = url.parse(requrl, true).query;
 var wx_data = {'tempf' : query.tempf, 'humidity': query.humidity, 
   'winddir' : query.winddir, 'windspeedmph' : query.windspeedmph, 
   'rainin' : query.rainin, 'solarradiation' : query.solarradiation};
 var options = {
   host : 'thingspace.io',
   port : '443',
   path : '/dweet/for/' + properties.device,
   headers: {'Content-Type' : 'application/json'},
   method : 'POST'
 };
 
 var cipher = crypto.createCipher('aes256', properties.password);
 var cipherText = cipher.update(JSON.stringify(wx_data), 'utf8','hex') + cipher.final('hex');
 
 var retVal = '';
 var req = https.request(options, function(res) {
  res.on('data', function(chunk) {
   retVal += chunk;
  });
  res.on('end', function() {
   retVal = JSON.parse(retVal);
   logger.debug('Exiting - File: main.js, Method:thingspace()', retVal.this);
  });
 });

 req.on('error', function(err1) {
  logger.error('File: main.js, Method: thingspace(), Message err1: ', err1.message);
 });
 req.write(JSON.stringify({'mesg' : cipherText}));
 req.end();
}
Line 1:  Similar to the Wunderground function, I've passed the original HTTP Get URL from the PWS to this function.  It contains the PWS data in its query string.
Lines 2-5:  For this exercise, I parsed out a handful of the telemetry the PWS transmits.  There are many more.
Lines 6-12:  Setting up the options for a REST call to ThingSpace.  In this case, it's an HTTP POST, over HTTPS, with a JSON body.  The device name becomes part of the path for putting/getting data from ThingSpace.
Lines 14-15:  By default, anybody can see anything on ThingSpace if they know the device name.  Here I scramble the PWS data with AES 256 encryption.  Someone can still see the device transmitting data on ThingSpace, but it's all hex garbage.
Lines 18-32:  Again, standard/core Node HTTP request functionality.  Line 24 -  The 'this' property of the JSON object returned by ThingSpace indicates success/failure of the request.  Line 31 - sends the previously encrypted data inside a JSON object to ThingSpace.
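
A note on the crypto calls: crypto.createCipher() derives its key from the password without a salt and was deprecated in later Node releases.  The sketch below shows the same encrypt/decrypt round trip using crypto.createCipheriv() with a scrypt-derived key and a random IV.  It is an alternative under stated assumptions, not the code used in this post, and its output is not compatible with createCipher().  The 'salt:iv:ciphertext' message layout and the function names are my own choices for illustration.

```javascript
const crypto = require('crypto');

// Encrypt a telemetry object; returns 'saltHex:ivHex:cipherHex'.
function encryptTelemetry(obj, password) {
  const salt = crypto.randomBytes(16);
  const iv = crypto.randomBytes(16);
  const key = crypto.scryptSync(password, salt, 32); // 32 bytes for AES-256
  const cipher = crypto.createCipheriv('aes-256-cbc', key, iv);
  const ct = cipher.update(JSON.stringify(obj), 'utf8', 'hex') + cipher.final('hex');
  return [salt.toString('hex'), iv.toString('hex'), ct].join(':');
}

// Reverse of encryptTelemetry(); returns the original object.
function decryptTelemetry(mesg, password) {
  const parts = mesg.split(':');
  const key = crypto.scryptSync(password, Buffer.from(parts[0], 'hex'), 32);
  const decipher = crypto.createDecipheriv('aes-256-cbc', key, Buffer.from(parts[1], 'hex'));
  const plaintext = decipher.update(parts[2], 'hex', 'utf8') + decipher.final('utf8');
  return JSON.parse(plaintext);
}

const mesg = encryptTelemetry({ tempf: '50.4', humidity: '22' }, 'example-password');
console.log(decryptTelemetry(mesg, 'example-password'));
```

Because the salt and IV are random, the same reading encrypts to a different message each time.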

Sample Output

2017-1-19 15:38:42.425 - debug: Exiting - File: main.js, Method: wunderground() success
2017-1-19 15:38:42.652 - debug: Exiting - File: main.js, Method:thingspace() succeeded

ThingSpace Test Client - Get Latest Dweet

This is an example of another ThingSpace REST API call. This one returns the latest dweet published by a device.
function getDweet() {
 var options = {
   host : 'thingspace.io',
   port : '443',
   path : '/get/latest/dweet/for/' + properties.device,
   method : 'GET'
 };
 var retVal = '';
 var req = https.request(options, function(res) {
  res.on('data', function(chunk) {
   retVal += chunk;
  });
  res.on('end', function() {
   console.log(res.statusCode);
   var obj = JSON.parse(retVal);
   console.log(obj);
   console.log(obj.with[0].content);
   var decipher = crypto.createDecipher('aes256', properties.password);
   var plaintext = decipher.update(obj.with[0].content.mesg, 'hex', 'utf8') + decipher.final('utf8');
   console.log(plaintext);
  });
 });

 req.on('error', function(err1) {
  console.log(err1);
 });

 req.end();
}
Lines 2-7:  Options set up for an HTTP GET.  As discussed, by default all you need is a valid device name to read any device's data on ThingSpace.  A 'lock' feature is also available, for a price.
Lines 9-12:  Standard Node HTTP request.  Lines 18-19 decrypt the data that was AES 256-encrypted before the HTTP POST.

Sample Output

200
{ this: 'succeeded',
  by: 'getting',
  the: 'dweets',
  with: 
   [ { thing: 'pwstest2017',
       created: '2017-01-19T22:59:46.642Z',
       content: [Object] } ] }
{ mesg: 'ad739f288341ac5ccb6c7b81b73b2bafda96e68ab7ebc17d9969a97d02670b1f28e9e82e0e08d5df813a991bbb1a32260d6825bf92deaef66c37bcf'}
{"tempf":"50.4","humidity":"22","winddir":"237","windspeedmph":"3.58","rainin":"0.00","solarradiation":"31.93"}
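
The test client indexes straight into obj.with[0], which throws if the call did not succeed or no dweet exists yet.  Here is a small sketch of pulling the encrypted message out more defensively, based only on the response shape shown in the sample output above.  The extractLatestMesg() name is hypothetical, and the shape of a failure response is an assumption.

```javascript
// Return the hex-encoded 'mesg' from a latest-dweet response body,
// or null when the response does not have the expected shape.
function extractLatestMesg(body) {
  let obj;
  try {
    obj = JSON.parse(body);
  } catch (err) {
    return null; // not JSON at all
  }
  if (!obj || obj.this !== 'succeeded' || !Array.isArray(obj.with) || obj.with.length === 0) {
    return null;
  }
  const content = obj.with[0].content;
  return content && typeof content.mesg === 'string' ? content.mesg : null;
}

// Example using the shape from the sample output (message shortened).
const sampleBody = JSON.stringify({
  this: 'succeeded',
  by: 'getting',
  the: 'dweets',
  with: [{ thing: 'pwstest2017', created: '2017-01-19T22:59:46.642Z', content: { mesg: 'ad739f28' } }]
});
console.log(extractLatestMesg(sampleBody));
```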

ThingSpace Test Client - Stream Dweets

The ThingSpace REST API also supports a subscription model for a device's dweets.  ThingSpace sends a 'chunked' response to each subscriber every time the device dweets.  
function streamDweet() {
 var options = {
   host : 'thingspace.io',
   port : '443',
   path : '/listen/for/dweets/from/' + properties.device,
   method : 'GET'
 };
 
 var req = https.request(options, function(res) {   
  res.on('data', function(chunk) {
   var str = JSON.parse(chunk.toString('utf8',3).trim());
   var obj = JSON.parse(str); 
   var decipher = crypto.createDecipher('aes256', properties.password);
   var plaintext = decipher.update(obj.content.mesg, 'hex', 'utf8') + decipher.final('utf8');
   console.log(plaintext); 
  });
  
  res.on('end', function() {
   console.log('end', res.statusCode);
  });
 });
 
 req.on('error', function(err1) {
  console.log('err', err1);
 });
 
 req.end();
}
Lines 9-16:  Standard Node request.  It was a little bit of a pain parsing the ThingSpace response.  In fact I needed two calls to JSON.parse() to get a legit JSON object out of the response.  Same decrypt steps in Lines 13, 14.
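
The two JSON.parse() calls make sense if each chunk carries a JSON string literal whose contents are themselves JSON text.  The sketch below isolates that double decode and locates the payload by its surrounding quotes rather than a fixed byte offset.  The exact framing ThingSpace uses is an assumption here, and the prefix in the example chunk is made up for illustration.

```javascript
// Decode one streamed chunk into an object, or return null if it
// does not contain a JSON string literal wrapping JSON text.
function parseDweetChunk(chunk) {
  const text = chunk.toString('utf8');
  const start = text.indexOf('"');
  const end = text.lastIndexOf('"');
  if (start === -1 || end <= start) {
    return null;
  }
  try {
    const inner = JSON.parse(text.slice(start, end + 1)); // pass 1: string literal -> JSON text
    return JSON.parse(inner);                             // pass 2: JSON text -> object
  } catch (err) {
    return null;
  }
}

// Hypothetical chunk: a short prefix, then a doubly encoded dweet.
const dweet = { thing: 'pwstest2017', content: { mesg: 'ad739f28' } };
const chunk = Buffer.from('6f\r\n' + JSON.stringify(JSON.stringify(dweet)) + '\r\n');
console.log(parseDweetChunk(chunk).content.mesg);
```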

Sample Output

{"tempf":"48.0","humidity":"23","winddir":"199","windspeedmph":"4.03","rainin":"0.00","solarradiation":"26.55"}

Copyright ©1993-2024 Joey E Whelan, All rights reserved.