Thursday, June 8, 2017

Secure DNS Conversion


Summary


In this article I'll discuss a little distraction I've been working on lately.  There are a large number of articles out there about the inherent insecurity of DNS.  Your requests/responses are clear text that run (predominantly) over UDP.  Every ISP, DNS provider, etc. basically has a record of whatever Internet sites you visit.

In a few lines of code (~200), I show one way to keep your DNS traffic private.  Part 1 hinges on Google's DNS-over-HTTPS service.  I convert each UDP DNS request into an HTTPS request to Google, yielding an encrypted DNS request/response system.  To raise this to another level of paranoia (for those who don't even trust Google with their DNS traffic), I show how to tunnel those requests to Google through Tor.

I made little attempt to faithfully follow the DNS RFC in this implementation.  Just wasn't necessary for my limited use-case.

Implementation - Part 1

Below is a diagram of the overall architecture I was looking for.  I use a local caching DNS server for my own network.  I reconfigure that DNS server to send all of its requests to my DNS converter, which translates UDP DNS requests into HTTPS calls to Google's DNS-over-HTTPS service.


Python Implementation


cfgParser = configparser.ConfigParser()
cfgParser.optionxform = str
cfgParser.read('sdns.cfg')  
host = cfgParser.get('ConfigData', 'host')
port = int(cfgParser.get('ConfigData', 'port'))
server = socketserver.UDPServer((host, port), DNSHandler)
server.serve_forever()
Lines 1-5:  Read the host IP address and port for the new DNS converter/forwarder.  DNS typically runs on port 53.
Lines 8-9:  Start a UDP server on that IP address and port.  DNSHandler is a class that contains the code to process the DNS request and generate the response.
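With the server up, you can exercise it without a full resolver by hand-rolling the wire-format request the handler parses.  A minimal sketch (the transaction ID and name are arbitrary illustrative values):

```python
import struct

def build_query(name, tid=0x1234):
    # Header: ID, flags (RD=1 only), QDCOUNT=1, AN/NS/ARCOUNT=0
    header = struct.pack('>HHHHHH', tid, 0x0100, 1, 0, 0, 0)
    # Question: length-prefixed labels, zero terminator, QTYPE=1 (A), QCLASS=1 (IN)
    qname = b''.join(bytes([len(p)]) + p.encode() for p in name.split('.')) + b'\x00'
    return header + qname + struct.pack('>HH', 1, 1)

query = build_query('example.com')
```

Sending it is just `socket.socket(socket.AF_INET, socket.SOCK_DGRAM).sendto(query, (host, port))`.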
class DNSHandler(socketserver.BaseRequestHandler):
    
    def handle(self):
        data = self.request[0]
        socket = self.request[1]
        response = self.__createResponse(data)
        socket.sendto(response, self.client_address)
Lines 3-7:  Main code body.  Accepts the request, processes it, and sends out the response.

    def __createResponse(self, data):
        tid = data[0:2] #transaction id
        opcode = data[2] & 0b01111000   #data[2] is the flags field. bits 2-5 is the opcode  
        name, queryType, question = self.__processQuestion(data[12:]) 
        
        if opcode == 0 and queryType == '1':  #RFC departure.  Only processing standard queries (0) and 'A' query types.  
            flags, numbers, records = self.__getRecords(name)
            response = tid + flags + numbers + question + records
        else:
            #qr (response), recursion desired, recursion avail bits set.  set the rcode to 'not implemented'
            flags = ((0b100000011000 << 4) | 4).to_bytes(2, byteorder='big') 
            numbers = (0).to_bytes(8, byteorder='big')
            response = tid + flags + numbers
 
        return response
Lines 2-3:  Parse out the DNS transaction ID and flags field from the request.
Line 4:  Send the rest of the DNS request to another function for parsing out the question.
Lines 6-8:  If it's a DNS request I chose to implement - a standard query (opcode 0) for an 'A' record - process it.
Lines 10-13:  If it's not a request I implemented, return a DNS error.
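To make the bit arithmetic on the error path concrete: the 12 flag bits (QR=1, opcode=0, RD=1, RA=1, everything else 0) get shifted left four places to make room for the 4-bit RCODE, where 4 means "not implemented".  A standalone sketch of that step:

```python
def error_flags(rcode):
    # qr=1, opcode=0000, aa=0, tc=0, rd=1, ra=1, z/ad/cd=0
    bits = 0b100000011000
    # Shift left 4 to append the 4-bit response code
    return ((bits << 4) | rcode).to_bytes(2, byteorder='big')
```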

    def __processQuestion(self, quesData):
        i = 0
        name = ''
        
        while True:
            count = int.from_bytes(quesData[i:i+1], byteorder='big')
            i = i+1
            if count == 0:
                break
            else:
                name = name + str(quesData[i:i+count],'utf-8') + '.'
                i = i + count
            
        name = name[:-1]
        queryType = str(int.from_bytes(quesData[i:i+2], byteorder='big'))
        question = quesData[0:i+4]
   
        return name, queryType, question
Lines 5-12:  Loop through the labels in the DNS question (it has a specific format; see the RFC).
Lines 14-16: Set up 3 return variables with the domain name in question, query type, and the entire question byte array (for the response to be sent later).
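The label format that loop walks is simple: each label is prefixed by its length, and a zero byte terminates the name.  A round-trip sketch of the encoding:

```python
def encode_qname(name):
    # Each label prefixed with its length; a zero byte terminates the name
    return b''.join(bytes([len(p)]) + p.encode() for p in name.split('.')) + b'\x00'

def decode_qname(data):
    i, labels = 0, []
    while data[i] != 0:
        count = data[i]
        labels.append(data[i + 1:i + 1 + count].decode())
        i += 1 + count
    return '.'.join(labels), i + 1  # decoded name, bytes consumed
```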

    def __getRecords(self, name): 
        payload = {'name' : name, 'type' : '1'}
        data = requests.get(GOOGLE_DNS, params=payload).json()
   
        flags = self.__getFlags(data)
        records = bytes(0)
        count = 0
        if 'Answer' in data:
            for answer in data['Answer']:
                if answer['type'] == 1:
                    count = count + 1
                    name = (0xc00c).to_bytes(2, byteorder='big') #RFC departure.  Hard-coded offset to domain name in initial question.
                    rectype = (1).to_bytes(2, byteorder='big')
                    classtype = (1).to_bytes(2, byteorder='big')
                    ttl = answer['TTL'].to_bytes(4, byteorder='big')
                    length = (4).to_bytes(2, byteorder='big') #4 byte IP addresses only
                    quad = list(map(int, answer['data'].split('.')))
                    res = bytes(0)
                    for i in quad:
                        res = res + i.to_bytes(1, byteorder='big')
                    records = records + name + rectype + classtype + ttl + length + res
        
        nques = (1).to_bytes(2, byteorder='big') #hard coded to 1
        nans = (count).to_bytes(2, byteorder='big')
        nath = (0).to_bytes(2, byteorder='big')    #hard coded to 0
        nadd = (0).to_bytes(2, byteorder='big') #hard coded to 0
        numbers = nques + nans + nath + nadd
     
        return flags, numbers, records
Lines 2-3:  Send the DNS request to Google's HTTPS service.
Line 5:  Construct the flags field for the response based on the request flags field.
Lines 8-21:  Construct the resource records for the response from the JSON returned by Google's DNS-over-HTTPS service.
Lines 23-27:  Construct the number of records field for the response.
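Each answer record built above follows a fixed layout: a 2-byte compression pointer back to the name at offset 12, then type, class, TTL, RDLENGTH, and the 4 address bytes.  A standalone sketch of that packing (the IP and TTL are illustrative):

```python
def pack_a_record(ip, ttl):
    name = (0xc00c).to_bytes(2, 'big')  # compression pointer to name at offset 12
    rectype = (1).to_bytes(2, 'big')    # type A
    classtype = (1).to_bytes(2, 'big')  # class IN
    rdata = bytes(int(octet) for octet in ip.split('.'))
    return (name + rectype + classtype + ttl.to_bytes(4, 'big')
            + len(rdata).to_bytes(2, 'big') + rdata)

record = pack_a_record('93.184.216.34', 300)
```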
    def __getFlags(self, data):
        flags = 0b100000 #qr=1, opcode=0000, aa=0
        flags = (flags << 1) | data['TC'] #set tc bit
        flags = (flags << 1) | data['RD'] #set rd bit
        flags = (flags << 1) | data['RA'] #set ra bit
        flags = flags << 1 #One zero
        flags = (flags << 1) | data['AD'] #set ad bit
        flags = (flags << 1) | data['CD'] #set cd bit
        flags = ((flags << 4) | data['Status']).to_bytes(2, byteorder='big') 
 
        return flags
Lines 2-9:  Construct the flags field of the DNS response based on the JSON response from Google (and some hard-coded values).

Implementation - Part 2

Now, to prevent even Google from tracking your DNS activity, we can send all this HTTPS traffic on a world-wide trip through the Tor network.  You need to set up a local Tor controller instance and then add the lines of code below to direct the HTTPS requests through that network.  Here's an excellent explanation of that.


Python Implementation


PROXIES = {'http':  'socks5://127.0.0.1:9050',
           'https': 'socks5://127.0.0.1:9050'} 

data = requests.get(GOOGLE_DNS, params=payload, proxies=PROXIES).json()
Lines 1-2:  Tor uses port 9050 as its local SOCKS port.
Line 4: Simply modify the HTTPS Get to Google (from __getRecords()) with these proxies.
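One caveat worth flagging: with the socks5:// scheme, requests resolves dns.google.com locally before connecting, so that one lookup escapes Tor.  The socks5h:// scheme (supported by requests via PySocks) hands hostname resolution to the proxy itself:

```python
# 'socks5h' (vs 'socks5') makes Tor resolve hostnames, avoiding a local DNS leak
PROXIES = {'http':  'socks5h://127.0.0.1:9050',
           'https': 'socks5h://127.0.0.1:9050'}
```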
def renew():
    with Controller.from_port(port = 9051) as controller:
        controller.authenticate(password="test")
        controller.signal(Signal.NEWNYM)

  
scheduler = BackgroundScheduler()
scheduler.add_job(renew, 'interval', hours=1)
scheduler.start()
Lines 1-4:  For some extra privacy, signal your Tor controller to build a new circuit (changing your apparent IP address) periodically.
Lines 7-9:  Set that change interval with a scheduler.

Summary

This was a fairly simplistic DNS server implementation in Python.  I didn't attempt to implement the full RFC, as it's not necessary for my use case.  You get secure DNS traffic from this, but at a price in latency.  The HTTPS conversion and Tor overhead amount to roughly a factor-of-10 increase in DNS latency (the vast majority of that attributable to Tor).  I don't find it noticeable for my use, though.

Full source code here.

Sunday, February 26, 2017

IoT Data Pipeline - Part 3: Kafka Integration


Summary

In this post I'll scale up the architecture by simulating multiple IoT devices and then consolidate the resulting data stream into Apache Kafka.

IoT Data Pipeline - Part 2: Node/Java Crypto Interop
IoT Data Pipeline - Part 3:  Kafka Integration

Implementation

Below is a diagram depicting the overall setup I'm trying to achieve here.  I wanted to scale up the number of devices generating data streams to the IoT cloud and then consolidate those streams into a scalable streaming architecture - namely Kafka.

As mentioned in the first post, this exercise is somewhat contrived.  I scaled up the data generators by creating device simulators.  On the receiving side of the IoT cloud, multiple REST clients poll for data (one per device) and then write into Kafka.

Lower level diagram of the code below:


Node.js - Device Server Code Modifications


var devices = [];
for (var i=0; i < properties.deviceCount; i++) {
 devices.push('pws_' + i);
}

function rand(min, max) {
 return Math.random() * (max - min) + min + 1;
}
function randomize(data) {
 for (var prop in data){
  if (data.hasOwnProperty(prop)) {
   var rn = rand(-.5, .5);
   data[prop] = (data[prop] * rn).toFixed(2);
  }
 }
 return data;
}

function thingspace(requrl) {
 var query = url.parse(requrl, true).query;
 var wx_data = {'windspeedmph' : query.windspeedmph, 'solarradiation' : query.solarradiation};
 
 var options = {
   host : 'thingspace.io',
   port : '443',
   headers: {'Content-Type' : 'application/json'},
   method : 'POST'
 };
 
 async.map(devices, function(device, callback){
  var iv = crypto.randomBytes(16);
  var cipher = crypto.createCipheriv('aes-256-cbc', key, iv);
  wx_data = randomize(wx_data);
  wx_data.id = device;
  var cipherText = cipher.update(JSON.stringify(wx_data), 'utf8','hex') + cipher.final('hex');
  options.path = '/dweet/for/' + device;
  
  var retVal = '';
  var req = https.request(options, function(res) {
   res.on('data', function(chunk) {
    retVal += chunk;
   });
   res.on('end', function() {
    retVal = JSON.parse(retVal);
    callback(null, retVal.this);
   });
  });

  req.on('error', function(err1) {
   logger.error('File: main.js, Method: thingspace(), Message err1: ', err1.message);
   callback(err1, null);
  });
  req.write(JSON.stringify({'iv': iv.toString('base64'), 'mesg' : cipherText}));
  req.end(); 
 }, 
 function (err, results){
  if (err) {
   logger.error('File: main.js, Method: thingspace(), Message err: ', err.message);
  }
  else {
   logger.debug('Exiting - File: main.js, Method: thingspace()');
  }
 });
}


function handler(req, res) {
 wunderground(req.url, res);
 thingspace(req.url);
}

var httpServer = http.createServer(handler).listen(9080);
logger.debug('File: main.js, Listening on port 9080');
Lines 1-4:  Set up an array of simulated IoT devices.
Lines 6-17:  Couple functions to generate random data metrics to the simulated devices.  Data from the one real device is fed into these functions and random data for the simulators is generated from that.
Line 21:  The two metrics being sent from the devices are solar radiation and wind speed.
Lines 30-47:  This is where the device simulation happens.  Using the Async module, send out multiple HTTP REST requests (1 per simulated device) to the IoT cloud.
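One subtlety in rand() above: the `+ 1` shifts the output, so rand(-.5, .5) is really a multiplier drawn from [0.5, 1.5) - each simulated device reports the real reading scaled by up to roughly ±50%.  A quick Python check of the same formula:

```python
import random

def rand(lo, hi):
    # Same formula as the Node helper: uniform in [lo + 1, hi + 1)
    return random.random() * (hi - lo) + lo + 1

samples = [rand(-0.5, 0.5) for _ in range(1000)]
```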

Java - Custom JSON Serializer/Deserializer for Kafka

Kafka operates with byte arrays.  It has a built-in serializer for Strings.  If you want to pass anything other than that, you have to roll your own.

public class JSONSerializer implements Serializer<JSONObject> {
 @Override
  public void close() {
  
  }
 
 @Override
  public void configure(Map<String, ?> arg0, boolean arg1) {
  
  }
 
 @Override
 public byte[] serialize(String s, JSONObject json) {
  byte[] byteArr = null;
  
     try {
      ByteArrayOutputStream out_byte = new ByteArrayOutputStream();
      ObjectOutputStream out_object = new ObjectOutputStream(out_byte);
      out_object.writeObject(json);
      byteArr = out_byte.toByteArray();
     }
     catch (Exception e) {
      e.printStackTrace();
     }
     
     return byteArr;
 }
}
Lines 17-20:  Convert the JSON object into a byte array.

public class JSONDeserializer implements Deserializer<JSONObject> {
 @Override
  public void close() {
  
  }
 
 @Override
  public void configure(Map<String, ?> arg0, boolean arg1) {
  
  }
 
 @Override
 public JSONObject deserialize(String s, byte[] bytes) {
  JSONObject json = null;
  
  try {
   ByteArrayInputStream bIstream= new ByteArrayInputStream(bytes);
   ObjectInputStream in = new ObjectInputStream(bIstream);
   json = (JSONObject) in.readObject();
  }
  catch (Exception e) {
   e.printStackTrace();
  }
  return json;
 }
}
Lines 17-19:  Convert a byte array into a JSON object.
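A caveat on these two classes: ObjectOutputStream emits Java's native serialization format, so the resulting bytes are only readable by JVM consumers.  If cross-language consumers matter, plain UTF-8 JSON text does the same job portably - sketched below in Python purely as an illustration of the idea, not of the code above:

```python
import json

def serialize(obj):
    # Language-neutral: JSON text encoded as UTF-8 bytes
    return json.dumps(obj).encode('utf-8')

def deserialize(data):
    return json.loads(data.decode('utf-8'))
```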

Java - REST Clients/Kafka Producer


public class DweetProducer implements Runnable {

 private static final Logger logger = LogManager.getLogger(DweetProducer.class);
 private static final String propFile = "pwskafka.properties";
 private static final String dweetUri = "https://thingspace.io";
 private static final String streamPath = "/listen/for/dweets/from";
 private static byte[] salt;
 private static String password;
 private static String topic;
 private static String bootstrap;
 private static Producer<String, JSONObject> producer;
 private String device;
 
 static {
  try {
   FileInputStream fis = new FileInputStream(propFile);
   Properties props = new Properties();
   props.load(fis);
   fis.close();
   salt = props.getProperty("salt").getBytes();
   password = props.getProperty("password");
   bootstrap = props.getProperty("bootstrap");
   topic = props.getProperty("topic");
   props.clear();
      props.put("bootstrap.servers", DweetProducer.bootstrap); 
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "pwskafka.JSONSerializer");
      producer = new KafkaProducer<>(props);
  }
  catch (Exception e) {
   logger.error(e);
   System.exit(1);
  }
 }
 
 public DweetProducer(String device) {
  this.device = device;
  logger.debug("Producer for " + device  + " starting.");
 }
  
 private JSONObject decryptDweet(byte[] iv, String cipherText) throws GeneralSecurityException, 
 UnsupportedEncodingException, ParseException  {
  KeySpec spec = new PBEKeySpec(DweetProducer.password.toCharArray(), DweetProducer.salt, 100000, 256);
  SecretKeyFactory f = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
  SecretKey tmp = f.generateSecret(spec);
  SecretKey key = new SecretKeySpec(tmp.getEncoded(), "AES");
  
  Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
        cipher.init(Cipher.DECRYPT_MODE, key,  new IvParameterSpec(iv));
        String val = new String(cipher.doFinal(DatatypeConverter.parseHexBinary(cipherText)), "UTF-8");
        JSONParser parser = new JSONParser();
        return (JSONObject) parser.parse(val); 
 }
 
 public void run() { 
  JSONObject dweet = null;
  Client client = ClientBuilder.newClient();
  WebTarget target = client.target(dweetUri).path(streamPath).path(this.device);
  Response response =  target.request(MediaType.APPLICATION_JSON).get(Response.class);
  ChunkedInput<String> chunkedInput =
          response.readEntity(new GenericType<ChunkedInput<String>>() {});
  String chunk;
  JSONParser parser = new JSONParser();
  
  while ((chunk = chunkedInput.read()) != null) {
      if (!chunk.chars().allMatch( Character::isDigit )) {
       try {
        chunk = (String) parser.parse(chunk);
        JSONObject obj = (JSONObject) parser.parse(chunk);
        JSONObject content = (JSONObject) obj.get("content");
        byte[] iv = DatatypeConverter.parseBase64Binary((String) content.get("iv"));
     String cipherText = (String) content.get("mesg");
     dweet = decryptDweet(iv, cipherText);
     DweetProducer.producer.send(new ProducerRecord<>(DweetProducer.topic, this.device, dweet));
     logger.debug(dweet);
       }
       catch (Exception e) {
        logger.error(e);
       }
      }
  }
 }
 
 public static void main(String[] args) { 
  if (args != null && args.length == 2) {
   int startDeviceNum = Integer.parseInt(args[0]);
   int endDeviceNum = Integer.parseInt(args[1]);
   if (startDeviceNum > endDeviceNum) {
    logger.error("Usage:  arg 1: starting device number, arg 2: ending device number");
    System.exit(1);
   }
   ExecutorService pool;
   int cores = Runtime.getRuntime().availableProcessors();
   pool = Executors.newFixedThreadPool(cores);
   String device = null;
   
   for (int i = startDeviceNum; i <= endDeviceNum; i++){
    device = "pws_" + i;
    pool.execute(new DweetProducer(device));
   }
   pool.shutdown();
  }
  else {
   logger.error("Usage:  arg 1: starting device number, arg 2: ending device number");
   System.exit(1);
  }
 }

}
Line 1:  Setting up a multi-threaded class.
Line 12:  The Kafka Producer class is thread-safe, so we only need 1 instance that can be shared by all the threads.
Lines 14-34:  Load all the configuration variables from a properties file and create the Producer.  Line 27:  Tell the Producer how to serialize JSON objects using the custom serializer.
Lines 41-53:  AES decryption routine.  Same code as was discussed in Part 2.
Lines 55-82:  Thread code for HTTP clients.  Each client is doing HTTP chunked input using the IoT cloud's streaming interface.  This was discussed in Part 1.  Line 74:  Send the device data (dweet) to Kafka.
Lines 84-107:  Using the Java ExecutorService, set up a thread pool using all available cores.  Each thread will be executing a HTTP client with a common Kafka producer.

Test Kafka Consumer

public class DweetConsumer {
 
 public static void main(String[] args) throws Exception{
    String topicName = "pws_metrics";
    
    Properties props = new Properties();
    props.put("bootstrap.servers", "webserv3:9092"); 
    props.put("group.id", "group_01");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "pwskafka.JSONDeserializer");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    
    KafkaConsumer<String, JSONObject> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topicName));
    
    while (true) {
     ConsumerRecords<String, JSONObject> records = consumer.poll(100);
     for (ConsumerRecord<String, JSONObject> record : records) {
      System.out.println("Received: " + record.key() + " " + record.value());
     }
    }
   
  }
}
Line 10:  Tell the Consumer how to deserialize the incoming byte array back into a JSON object.
Lines 15-16:  Set up a Kafka Consumer and subscribe to the IoT topic the Producer is creating records for.
Lines 18-23:  Poll for incoming records and print them to console.

Sunday, January 29, 2017

IoT Data Pipeline - Part 2: Node/Java Crypto Interop


Summary

I wasn't planning on dedicating an entire post to encryption, but this task proved to be enough of a pain that I decided it was justified.  Maybe this will help others bypass that pain.

This project developed into a mixed-language solution:  Node on the server side (simulated device) and Java on the client/analytics side.  Getting the Node-side encryption to work with the Java-side decryption is the focus of this post.

IoT Data Pipeline - Part 2: Node/Java Crypto Interop
IoT Data Pipeline - Part 3: Kafka Integration

Implementation

Below is a diagram depicting the high-level steps I used to get a functioning encryption system between Node and Java.  There are likely other ways to go about this, but this is what worked for me.

Node Server Code

I've highlighted the crypto changes I made relative to the original server code in the first blog post.
var key = crypto.pbkdf2Sync(properties.password, properties.salt, 100000, 32, 'sha256');

function thingspace(requrl) {
 var query = url.parse(requrl, true).query;
 var wx_data = {'tempf' : query.tempf, 'humidity': query.humidity, 
   'winddir' : query.winddir, 'windspeedmph' : query.windspeedmph, 
   'rainin' : query.rainin, 'solarradiation' : query.solarradiation};
 var options = {
   host : 'thingspace.io',
   port : '443',
   path : '/dweet/for/' + properties.device,
   headers: {'Content-Type' : 'application/json'},
   method : 'POST'
 };
 
 var iv = crypto.randomBytes(16);
 var cipher = crypto.createCipheriv('aes-256-cbc', key, iv);
 var cipherText = cipher.update(JSON.stringify(wx_data), 'utf8','hex') + cipher.final('hex');
 
 var retVal = '';
 var req = https.request(options, function(res) {
  res.on('data', function(chunk) {
   retVal += chunk;
  });
  res.on('end', function() {
   retVal = JSON.parse(retVal);
   logger.debug('Exiting - File: main.js, Method:thingspace()', retVal.this);
  });
 });

 req.on('error', function(err1) {
  logger.error('File: main.js, Method: thingspace(), Message err1: ', err1.message);
 });
 req.write(JSON.stringify({'iv': iv.toString('base64'), 'mesg' : cipherText}));
 req.end();
}
Line 1:  Create a secure 256-bit key with PBKDF2.
Line 16:  Generate a random 128-bit initialization vector.  Buffer of 16 bytes.
Line 17-18:  Encrypt the weather data (stringified JSON object) with AES 256 with UTF-8 input encoding and hex output encoding.
Line 34:  Encode the IV bytes into a Base64 string.  Then, put it and the encrypted message inside a JSON object.  Finally, stringify that object and send it out in the body of an HTTP POST.  There's no harm in sending the IV along with the encrypted message.

Java Client Code

Similar to the node server sample code, I've highlighted the crypto-interesting areas.

 public Object getDweet(String device) {
  
  Object dweet = null;
  Client client = ClientBuilder.newClient();
  WebTarget target = client.target(dweetUri).path(getPath).path(device);
  Response response =  target.request(MediaType.APPLICATION_JSON).get(Response.class);
  
  if(response.getStatus() == 200) {
   String message = response.readEntity(String.class);
   JSONParser parser = new JSONParser();
   try {
    JSONObject obj = (JSONObject) parser.parse(message);
    JSONObject item = (JSONObject) ((JSONArray)obj.get("with")).get(0);
    JSONObject content = (JSONObject)item.get("content");
    byte[] iv = DatatypeConverter.parseBase64Binary((String) content.get("iv"));
    String cipherText = (String) content.get("mesg");
    dweet = decryptDweet(iv, cipherText);
   }
   catch (Exception e) {
    e.printStackTrace();
   }
  }  
  return dweet;
 }

 private Object decryptDweet(byte[] iv, String cipherText) throws GeneralSecurityException, 
 UnsupportedEncodingException, ParseException  {
  KeySpec spec = new PBEKeySpec(this.password.toCharArray(), this.salt, 100000, 256);
  SecretKeyFactory f = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
  SecretKey tmp = f.generateSecret(spec);
  SecretKey key = new SecretKeySpec(tmp.getEncoded(), "AES");
  
  Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
                cipher.init(Cipher.DECRYPT_MODE, key,  new IvParameterSpec(iv));
                String val = new String(cipher.doFinal(DatatypeConverter.parseHexBinary(cipherText)), "UTF-8");
                JSONParser parser = new JSONParser();
                return parser.parse(val);
 }
Lines 12-14: Parse out the JSON object within the HTTP response body.  I'm using the json.simple toolkit for this.
Line 15:  Decode the Base64 string-encoded IV back to a byte array.
Lines 16-17:  Pass the encrypted text and the IV as input to a decrypt function.
Lines 28-31:  Generate the PBKDF2 secret key.
Lines 33-34:  Set up the AES decrypt cipher, with the IV.
Line 35:  Decrypt the message with hex input and UTF-8 output.
Lines 36-37:  Return the decrypted message as a JSON object.
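The hinge of the whole interop is that both sides run PBKDF2 with identical parameters - SHA-256, 100,000 iterations, a 32-byte (256-bit) key - over the same password and salt.  Any third implementation with those parameters must land on the same key; for instance, with Python's standard library (the password and salt values here are made up):

```python
import hashlib

def derive_key(password, salt):
    # Mirrors Node's crypto.pbkdf2Sync(password, salt, 100000, 32, 'sha256')
    # and Java's PBEKeySpec(password, salt, 100000, 256) with PBKDF2WithHmacSHA256
    return hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(),
                               100000, dklen=32)

key = derive_key('correct horse', 'pepper')  # illustrative values only
```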

Thursday, January 19, 2017

IoT Data Pipeline - Part 1: Device Set up


Summary

In this post I'll describe my beginning steps in building an IoT data pipeline.  The scenario is somewhat contrived, but it uses infrastructure I have combined with no-cost interfaces.  The IoT 'device' in this scenario is a Personal Weather Station (PWS) I have installed at my place.  It's made by Ambient Weather - Model WS-1400-IP.  I integrated it with a cloud storage service from Verizon using a bit of Node.js code.

IoT Data Pipeline - Part 1: Device Set up
IoT Data Pipeline - Part 2: Node/Java Crypto Interop
IoT Data Pipeline - Part 3: Kafka Integration

Device

The 'IoT' device I used was an existing WS-1400-IP PWS I installed last year.  The PWS has an outdoor unit with various sensors for temperature, wind speed/direction, precipitation, etc.  It's solar-powered.  There's an additional indoor unit with a wireless receiver, an Ethernet port, and firmware to run a web stack.  The indoor unit communicates with the outdoor unit via wireless transmissions.  Sensor data is then output to an internal HTTP server that can be accessed via the Ethernet interface.  Optionally, the firmware supports transmission of the data to the Wunderground network.

Below is a picture of the PWS, photo-bombed by my mules.


The firmware that was installed on my PWS left much to be desired as far as access to the raw data stream.  By default, the PWS can send an HTTP GET with the data as a query string to Wunderground.  Unfortunately, that destination address is hard-coded into the firmware - making it impossible to redirect it for my own use.  There are various folks out there that have hacked around that limitation either by issuing commands directly to the firmware or with IP routing redirects - examples here.  I ended up putting another firmware load on the unit, not necessarily supported for my model, that provided a web interface for directing output to somewhere other than Wunderground.

Implementation

Now with access to the PWS data stream, I was in a position to do something with it.  I decided to build a simple Node.js server for accepting data from the PWS.  The PWS sends its data in the clear, as a query string via HTTP GET.  That plaintext data includes your Wunderground username and password - which is less than desirable, but it is what it is.

The Node server in this case simulates a device that can communicate with Verizon's IoT infrastructure.  Verizon ThingSpace has a freely available (as of now) REST API for messaging from various IoT-enabled devices.  Verizon refers to a message from a device as a 'dweet'.  Additionally, the Node server sends a copy of the PWS data to Wunderground.  I actually like the Wunderground presentation and wanted to keep that functional while this server was in operation.  Finally, I wrote a simple HTTP client in Node to fetch dweets from ThingSpace for testing.

Below is a figure depicting the test architecture I put together.

Node Server Code

Main Server

function handler(req, res) {
 wunderground(req.url, res);
 thingspace(req.url);
}

var httpServer = http.createServer(handler).listen(9080);
logger.debug('File: main.js, Listening on port 9080');
Lines 1-4: Super simple request handler.  Calls two functions (discussed next) to send the data to Wunderground and ThingSpace.
Line 6: Instantiate an HTTP server and listen on a non-reserved port.

Send Data to Wunderground

function wunderground(requrl, res) {
 var retVal = '';
 var options = {
  host : 'rtupdate.wunderground.com',
  port : '80',
  path : requrl,
  method : 'GET'
 };
 
 var fwdreq = http.request(options, function(fwdres) {
  fwdres.on('data', function(chunk) {
   retVal += chunk;
  });
  fwdres.on('end', function() {
   res.end(retVal);
   logger.debug('Exiting - File: main.js, Method: wunderground()', retVal.trim());
  });
 });
 fwdreq.on('error', function(err1) {
  logger.error('File: main.js, Method: wunderground(), Message err1: ', err1.message);
 });
 fwdreq.end(retVal);
}
Line 1:  I pass in the original request URL, which contains all the PWS data as a query string, as a parameter.  The 'res' parameter is the Response object from the original request.  A successful transmission to Wunderground results in an HTTP 200 status with 'success' in the response body.
Lines 3-8:  Set up an options object for forwarding an HTTP GET request to Wunderground.
Lines 10-23:  HTTP request set up using core Node functionality.

Send Data to ThingSpace

function thingspace(requrl) {
 var query = url.parse(requrl, true).query;
 var wx_data = {'tempf' : query.tempf, 'humidity': query.humidity, 
   'winddir' : query.winddir, 'windspeedmph' : query.windspeedmph, 
   'rainin' : query.rainin, 'solarradiation' : query.solarradiation};
 var options = {
   host : 'thingspace.io',
   port : '443',
   path : '/dweet/for/' + properties.device,
   headers: {'Content-Type' : 'application/json'},
   method : 'POST'
 };
 
 var cipher = crypto.createCipher('aes256', properties.password);
 var cipherText = cipher.update(JSON.stringify(wx_data), 'utf8','hex') + cipher.final('hex');
 
 var retVal = '';
 var req = https.request(options, function(res) {
  res.on('data', function(chunk) {
   retVal += chunk;
  });
  res.on('end', function() {
   retVal = JSON.parse(retVal);
   logger.debug('Exiting - File: main.js, Method:thingspace()', retVal.this);
  });
 });

 req.on('error', function(err1) {
  logger.error('File: main.js, Method: thingspace(), Message err1: ', err1.message);
 });
 req.write(JSON.stringify({'mesg' : cipherText}));
 req.end();
}
Line 1:  Similar to the Wunderground function, I've passed the original HTTP Get URL from the PWS to this function.  It contains the PWS data in its query string.
Lines 2-5:  For this exercise, I parsed out a handful of the telemetry the PWS transmits.  There are many more.
Lines 6-12:  Setting up the options for a REST call to ThingSpace.  In this case, it's an HTTP POST, over HTTPS, with a JSON body.  The device name becomes part of the path for putting/getting data from ThingSpace.
Lines 14-15:  By default, anybody can see anything on ThingSpace if they know the device name.  Here I scramble the PWS data with AES-256 encryption.  Someone can still see the device transmitting data on ThingSpace, but it's all hex garbage.
Lines 18-32:  Again, standard/core Node HTTP request functionality.  Line 24:  The 'this' property of the JSON object returned by ThingSpace indicates success/failure of the request.  Line 31:  Sends the previously encrypted data inside a JSON object to ThingSpace.

Sample Output

2017-1-19 15:38:42.425 - debug: Exiting - File: main.js, Method: wunderground() success
2017-1-19 15:38:42.652 - debug: Exiting - File: main.js, Method:thingspace() succeeded

ThingSpace Test Client - Get Latest Dweet

This is an example of another ThingSpace REST API call. This one returns the latest dweet published by a device.
function getDweet() {
 var options = {
   host : 'thingspace.io',
   port : '443',
   path : '/get/latest/dweet/for/' + properties.device,
   method : 'GET'
 };
 var retVal = '';
 var req = https.request(options, function(res) {
  res.on('data', function(chunk) {
   retVal += chunk;
  });
  res.on('end', function() {
   console.log(res.statusCode);
   var obj = JSON.parse(retVal);
   console.log(obj);
   console.log(obj.with[0].content);
   var decipher = crypto.createDecipher('aes256', properties.password);
   var plaintext = decipher.update(obj.with[0].content.mesg, 'hex', 'utf8') + decipher.final('utf8');
   console.log(plaintext);
  });
 });

 req.on('error', function(err1) {
  console.log(err1);
 });

 req.end();
}
Lines 2-7:  Options set up for an HTTP GET.  As discussed, by default all you need is a valid device name to read any device's data on ThingSpace.  There is 'lock' functionality available as well, for a price.
Lines 9-12:  Standard Node HTTP request.  Lines 18-19:  Decrypt the data that was scrambled with AES-256 during the HTTP POST.

Sample Output

200
{ this: 'succeeded',
  by: 'getting',
  the: 'dweets',
  with: 
   [ { thing: 'pwstest2017',
       created: '2017-01-19T22:59:46.642Z',
       content: [Object] } ] }
{ mesg: 'ad739f288341ac5ccb6c7b81b73b2bafda96e68ab7ebc17d9969a97d02670b1f28e9e82e0e08d5df813a991bbb1a32260d6825bf92deaef66c37bcf'}
{"tempf":"50.4","humidity":"22","winddir":"237","windspeedmph":"3.58","rainin":"0.00","solarradiation":"31.93"}

ThingSpace Test Client - Stream Dweets

The ThingSpace REST API also supports a subscription model for a device's dweets.  ThingSpace sends a 'chunked' response to each subscriber every time the device dweets.
function streamDweet() {
 var options = {
   host : 'thingspace.io',
   port : '443',
   path : '/listen/for/dweets/from/' + properties.device,
   method : 'GET'
 };
 
 var req = https.request(options, function(res) {   
  res.on('data', function(chunk) {
   var str = JSON.parse(chunk.toString('utf8',3).trim());
   var obj = JSON.parse(str); 
   var decipher = crypto.createDecipher('aes256', properties.password);
   var plaintext = decipher.update(obj.content.mesg, 'hex', 'utf8') + decipher.final('utf8');
   console.log(plaintext); 
  });
  
  res.on('end', function() {
   console.log('end', res.statusCode);
  });
 });
 
 req.on('error', function(err1) {
  console.log('err', err1);
 });
 
 req.end();
}
Lines 9-16:  Standard Node request.  Parsing the ThingSpace response was a bit of a pain; it took two calls to JSON.parse() to get a legitimate JSON object out of the response.  Lines 13-14 are the same decryption steps as before.

Sample Output

{"tempf":"48.0","humidity":"23","winddir":"199","windspeedmph":"4.03","rainin":"0.00","solarradiation":"26.55"}

Friday, January 6, 2017

UPnP With VLANs

Summary

DLNA is a common standard for media devices today.  UPnP is the underlying discovery/control protocol for DLNA.  UPnP relies on IP multicast for device discovery.  I'll discuss how to make UPnP functional in a segmented network.  Specifically, I'll show how to configure a Cisco multi-layer switch to route multicast traffic between VLANs.

Environment

Figure 1 below depicts the target environment for this discussion.  A UPnP media server is connected to a Layer 3 switch via EtherChannel.  Clients (media players) are located on separate VLANs on that same switch. 

Figure 1
The Discovery protocol (SSDP) of UPnP relies on IP Multicast.  The multicast address used is 239.255.255.250.

Implementation

Step 1:  Turn on multicast routing.
switch(config)#ip multicast-routing distributed

Step 2:  Configure PIM on the interface(s) to the Media Server.  In this case, a Port Channel interface.  Sparse-Dense or Dense mode will work.
switch(config)#int port-channel1
switch(config-if)#ip pim sparse-dense-mode

Step 3:  Configure the VLAN interfaces to pass IGMP traffic.
switch(config-if)#int vlan2
switch(config-if)#ip pim passive

Step 4:  Verify multicast routing is functional.  192.168.1.3 represents the UPnP media server.  192.168.2.113 is a media player client of that server.
switch#show ip mroute
IP Multicast Routing Table
Flags: D - Dense, S - Sparse, B - Bidir Group, s - SSM Group, C - Connected,
       L - Local, P - Pruned, R - RP-bit set, F - Register flag,
       T - SPT-bit set, J - Join SPT, M - MSDP created entry, E - Extranet,
       X - Proxy Join Timer Running, A - Candidate for MSDP Advertisement,
       U - URD, I - Received Source Specific Host Report, 
       Z - Multicast Tunnel, z - MDT-data group sender, 
       Y - Joined MDT-data group, y - Sending to MDT-data group, 
       V - RD & Vector, v - Vector
Outgoing interface flags: H - Hardware switched, A - Assert winner
 Timers: Uptime/Expires
 Interface state: Interface, Next-Hop or VCD, State/Mode

(*, 239.255.255.250), 01:59:36/stopped, RP 0.0.0.0, flags: DC
  Incoming interface: Null, RPF nbr 0.0.0.0
  Outgoing interface list:
    Vlan6, Forward/Sparse-Dense, 01:31:17/00:02:41

(192.168.2.113, 239.255.255.250), 00:01:11/00:01:52, flags: PT
  Incoming interface: Vlan2, RPF nbr 0.0.0.0
  Outgoing interface list: Null

(192.168.1.3, 239.255.255.250), 01:59:36/00:02:53, flags: T
  Incoming interface: Port-channel1, RPF nbr 0.0.0.0
  Outgoing interface list:
    Vlan6, Forward/Sparse-Dense, 01:31:17/00:02:41

(*, 224.0.1.40), 03:51:35/00:02:28, RP 0.0.0.0, flags: DCL
  Incoming interface: Null, RPF nbr 0.0.0.0
  Outgoing interface list:
    Port-channel1, Forward/Sparse-Dense, 01:59:37/stopped
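As an end-to-end check from a media player's VLAN, an SSDP M-SEARCH probe can be sent to the UPnP multicast group; any media server reachable through the multicast routing above should answer with unicast HTTP responses.  Below is a minimal Python sketch (the MX value, search target, and timeout are illustrative defaults, not part of the switch configuration):

```python
import socket

# Standard SSDP multicast group and port from the UPnP architecture
SSDP_GROUP = ('239.255.255.250', 1900)

def build_msearch(mx=2, st='ssdp:all'):
    """Build an SSDP M-SEARCH discovery request."""
    lines = [
        'M-SEARCH * HTTP/1.1',
        'HOST: 239.255.255.250:1900',
        'MAN: "ssdp:discover"',
        'MX: %d' % mx,
        'ST: %s' % st,
        '', '',
    ]
    return '\r\n'.join(lines).encode('ascii')

def discover(timeout=3):
    """Send the probe, then collect unicast responses until timeout."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.settimeout(timeout)
    responses = []
    try:
        sock.sendto(build_msearch(), SSDP_GROUP)
        while True:
            data, addr = sock.recvfrom(65507)
            responses.append((addr[0], data.decode('utf-8', 'replace')))
    except socket.timeout:
        pass
    finally:
        sock.close()
    return responses
```

If the PIM configuration is working, discover() run from a client VLAN should return one or more responses containing the media server's LOCATION header.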

Sunday, January 1, 2017

AI-Driven Investing with Lending Club - Part 4: Third Party Data Integration


Summary

This is an extension of the series I wrote on my experience implementing a machine learning engine for selecting loans for investment from Lending Club.  In this article, I'll discuss the modifications I made to support integration of data outside of the Lending Club data set using the ZIP code field on loans.  The results were less than satisfying.  I believe this was mostly due to the three-digit limitation Lending Club puts on their ZIP code fields.

Data Source Selection

The ZIP code field on Lending Club's loan data opens up the possibility of linking a loan's origination location to a wide array of external data sets.  Government data sets are generally available at no cost.  Proprietary ones, from vendors such as CoreLogic, have a price tag.  Below are some of the sets I looked at:

  • Housing.  Zillow has numerous free sets here.  I focused on their foreclosure data.  This is a CSV file containing foreclosure data from 1998 to present.
  • Income.  The US Census Bureau maintains US household income data here.  While they provide a wide-range of income-related data points, I focused on median household income.  This data is structured as Excel files, per year.
  • Unemployment.  The Bureau of Labor Statistics maintains unemployment data here.  I used their County/State set.  It's structured similarly to the Census Bureau set - Excel files, per year.

Data Wrangling

With prospective data sets chosen, the real work begins.  Lending Club's loan data set only provides the first 3 digits of the loan origination's zip code.  Naturally, none of the external data sets I looked at organized their data on three-digit ZIPs.


  • Zillow.  Below is a snippet of their data headers.  RegionName corresponds to a 5-digit ZIP code.

    RegionID  RegionName  City      State
    61639     10025       New York  NY

  • Census Bureau.  Some of their data headers are below.  Like most government data, they use FIPS codes instead of ZIPs for locations.  The FIPS code consists of five digits, concatenating the State and County codes.

    State FIPS Code  County FIPS Code  Postal Code  Name
    01               000               AL           Alabama

  • Bureau of Labor Statistics.  Similarly to the Census Bureau, they use FIPS for the location identifier.  Some headers below:

    State FIPS Code  County FIPS Code  County Name/State Abbreviation
    01               001               Autauga County, AL


    Converting FIPS to ZIP

    I had to create a translation from FIPS to ZIP codes to use the government data.  That requires yet another data set.  HUD has a crosswalk between FIPS and ZIPs here.  Below is the header and a row from that set:

    COUNTY ZIP RES_RATIO BUS_RATIO OTH_RATIO TOT_RATIO
    01001 36051 0.031420007 0.014141414 0.010940919 0.029940933


    FIPS to ZIP Lookup Table

    Below is what I did to transform the FIPS/ZIP crosswalk into a look-up table based on 3 digit ZIPs.  All code is in Python 3.
    frame = pd.read_excel(filename, usecols=[0,1,2], converters={0 : lambda x: str(x), 1 : lambda x: str(x)})
    frame.columns = ['fips','zip','ratio']
    frame = frame.iloc[frame.groupby(['fips']).apply(lambda x: x['ratio'].idxmax())]
    frame.drop('ratio', axis=1, inplace=True)
    frame.set_index(['fips'], inplace=True)
    frame['zip'] = frame['zip'].map(lambda x: str(x).zfill(5)[0:3])
    
    Line 1:  The HUD crosswalk is in Excel format.  Read columns 0-2 of that file.  Column 0: FIPS code.  Column 1:  ZIP code.  Column 2:  Percentage of that FIPS area that is represented by residential addresses for that ZIP code.  Convert the FIPS and ZIP values to strings.
    Line 2: Rename those 3 columns.
    Line 3:  Thus begins the compromises that must be made to accommodate Lending Club's policy of only showing 3 digits of ZIP code.  Here, I'm grouping all the 5 digit ZIP's within a FIPS and then selecting the one that represents the largest percentage of residential addresses in that FIPS area.
    Line 4:  Drop the 'ratio' column as it's unneeded now.
    Line 5:  Set the FIPS column as the index.
    Line 6:  Add leading zeros, if necessary, and chop the last 2 digits off to yield a 3-digit ZIP code.

    Below are a few rows of the resulting frame.
    fips zip
    1001 360
    1003 365
    1005 360
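    The per-FIPS selection in Line 3 can be sketched on a toy crosswalk (the FIPS, ZIP, and ratio values below are made up, not real HUD data):

```python
import pandas as pd

# Toy stand-in for the HUD crosswalk (synthetic values)
frame = pd.DataFrame({
    'fips':  ['01001', '01001', '01003'],
    'zip':   ['36051', '36003', '36507'],
    'ratio': [0.03, 0.61, 0.88],
})

# For each FIPS, keep the row whose ZIP has the largest residential ratio
frame = frame.loc[frame.groupby('fips')['ratio'].idxmax()]
frame = frame.drop(columns='ratio').set_index('fips')

# Truncate to the 3-digit ZIP prefix that Lending Club exposes
frame['zip'] = frame['zip'].str.zfill(5).str[0:3]
```

    Here FIPS 01001 maps to '360' (from ZIP 36003, the higher ratio) and 01003 maps to '365'.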


    Example 3rd Party Data Extraction:  BLS Unemployment Data Set

    With the FIPS/ZIP crosswalk in hand, I'm now in a position to manipulate the government data sets.  Below is what I did for the BLS unemployment data to create a lookup table based on 3-digit ZIP code and Year.
    for filename in os.listdir(data_dir):
        if filename.startswith('laucnty'):
            temp = pd.read_excel(os.path.join(data_dir,filename), skiprows=6, usecols=[1,2,4,9], \
                                converters={1 : lambda x: str(x), 2: lambda x: str(x)}, header=None) 
            temp.columns = ['stFIPS', 'coFIPS', 'year', 'rate']
            temp['rate'] = temp['rate'].apply(lambda x: pd.to_numeric(x, errors='coerce')) 
            temp['FIPS'] = temp['stFIPS'] + temp['coFIPS']
            year = int(temp.at[0,'year'])
            temp.drop(['stFIPS', 'coFIPS', 'year'], axis=1, inplace=True)
            temp.rename(columns={'rate' : str(year)}, inplace=True)
            temp = temp[['FIPS',str(year)]]
            temp.dropna(inplace=True)
            temp.set_index(['FIPS'],inplace=True)
            frame = pd.concat([frame,temp], join='inner', axis=1)
                    
    xwalk = getCrosswalk(CROSSWALK_DIR)       
    frame = pd.concat([frame,xwalk], join='inner', axis=1)
    frame = frame.groupby(['zip']).mean().round(4).reset_index()
    frame = frame.apply(pd.to_numeric)
    frame.set_index(['zip'],inplace=True)
    frame = frame.reindex(range(0,1000))
    frame.fillna(frame.mean().round(4),inplace=True)
    frame.index = frame.index.map(lambda x: str(x).zfill(3)[0:3])
    
    Line 1:  Iterate through the BLS files.  There's one file per year.
    Line 2:  The BLS files start with that prefix.
    Line 3:  These are Excel files.  Skip the 6 lines of whitespace/header.  Only use the State FIPS, County FIPS, Year, and Unemployment Rate columns.  Convert the FIPS columns to strings to maintain leading zeros.
    Line 5:  Name the columns accordingly.
    Line 6:  Force the rate column to float values.
    Line 7:  Create a new 'FIPS' column by concatenating the State and County FIPS codes.
    Line 8:  Read the first row's value under the year and convert it to an int.
    Line 9:  Drop the unnecessary columns.
    Line 10-11:  Rename the rate column as a Year.  Reset the frame to just 2 columns:  FIPS and Year.
    Line 12:  Drop all NaN rows.
    Line 13:  Set the index to the FIPS column.
    Line 14:  Concat the resulting temp frame, representing one year's worth of unemployment data, to a master frame by joining on the FIPS code.
    Line 16:  Create a FIPS/ZIP crosswalk frame as was discussed above.
    Line 17:  Concatenate the 3-digit ZIPs of that crosswalk to the master frame by joining on the FIPS code.
    Line 18:  Another compromise for 3 digit ZIPs.  Group the unemployment data on the resulting 3 digit ZIPs and set the groups to their average.
    Line 19:  Set all values to numeric - in particular, I want the ZIP to be turned into an int.
    Line 20:  Set the ZIP as the frame index.
    Line 21:  Fill in rows for the entire 3 digit range 0 - 999.  Rows (ZIPs) that were not present prior will be filled with NaN's.  Not every 3 digit number is a valid ZIP code; however, this step ensures all valid ZIP's that were missing in the BLS data set are now represented.
    Line 22:  Down the Year columns, fill in NaN's created in Line 21 with that column's (year's) average.  This is a compromise for missing data in a ZIP code.
    Line 23:  Cast the 3-digit ZIP's back into strings and add leading zeros where necessary.

    A few rows from the resulting frame below:
    zip 2007 2008 2009 2010 2011 2012 2013 2014 2015
    009 8.6625 9.35 12.2125 11.8875 11.45 9.9375 9.2625 9.225 8.325
    010 4.8 5.6 7.85 8.6 7.7 7.15 7.45 6.4 5.6
    011 4.8546 5.8157 9.089 9.4514 8.8045 8.0227 7.5131 6.3701 5.629
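    The reindex/fillna compromise of Lines 21-22 can be demonstrated on a toy frame (the rates below are made up):

```python
import pandas as pd

# Made-up unemployment rates for two 3-digit ZIP prefixes across two years
frame = pd.DataFrame({'2007': [4.0, 6.0], '2008': [5.0, 9.0]}, index=[10, 11])

# Fill in rows for the full 3-digit range; new rows start as NaN
frame = frame.reindex(range(0, 1000))

# Replace the NaNs with each year column's average (NaNs are skipped by mean)
frame = frame.fillna(frame.mean().round(4))

# Restore the zero-padded string form of the ZIP prefix
frame.index = frame.index.map(lambda x: str(x).zfill(3))
```

    ZIP '010' keeps its real value of 4.0 for 2007, while a previously missing prefix such as '012' picks up the 2007 column average of 5.0.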


    Integrate with the Lending Club Historical Data

    In Part 1 of this series, I demonstrated how to pull the Lending Club data into a Pandas frame.  Below, I'm now integrating the unemployment data to that frame using a nifty function in Pandas called lookup().
    unFrame = getExternalData() 
    frame['unemp'] = unFrame.lookup(frame.zip_code, frame.issue_d)
    
    Line 1:  Get the unemployment data frame created in the step above.
    Line 2:  Create a new column in the Lending Club data frame by collating the loan ZIP code and issue date (converted to a 4-digit year value) against the unemployment frame.
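    Note that DataFrame.lookup() was later deprecated and removed in newer pandas versions.  The same row-by-row collation can be reproduced with positional indexing; a sketch on made-up values:

```python
import pandas as pd

# Toy unemployment table: index = 3-digit ZIP prefix, columns = year
unFrame = pd.DataFrame({'2007': [4.0, 6.0], '2008': [5.0, 9.0]},
                       index=['010', '011'])

# Toy loan frame with ZIP prefix and issue year (synthetic values)
frame = pd.DataFrame({'zip_code': ['011', '010'], 'issue_d': ['2007', '2008']})

# Equivalent of unFrame.lookup(frame.zip_code, frame.issue_d):
# map each (zip, year) pair to row/column positions, then fancy-index
rows = unFrame.index.get_indexer(frame['zip_code'])
cols = unFrame.columns.get_indexer(frame['issue_d'])
frame['unemp'] = unFrame.to_numpy()[rows, cols]
```

    Each loan row picks up the single unemployment rate at the intersection of its ZIP prefix and issue year, exactly as lookup() did.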

    Analyzing Results

    The question that comes to mind now:  Is this external data meaningful, particularly in terms of predicting a loan's outcome?  In a univariate analysis, I have an independent variable (unemployment rate) that is continuous and a dependent variable (loan status) that is categorical - in fact, dichotomous (two values:  0 = default, 1 = paid).  This scenario doesn't lend itself to quantitative methods of correlation analysis such as Pearson, Spearman, regression, etc.  Analysis by graphing does provide some insight, however.
    frame[['unemp', 'loan_status']].boxplot(by='loan_status')
    plt.show()
    
    Line 1:  Create a boxplot graph of the unemployment rate in the 3-digit ZIP of a loan vs its eventual disposition: 0 = default, 1 = paid.
    Line 2:  Display the graph.

    Figure 1
    As can be seen in the graph, the results are very uninspiring.  With medians that are roughly the same, there's really no differentiation of a loan's disposition based on unemployment rate.  The huge number of outliers is also troubling.  I saw similar results with foreclosure rates and median household incomes.

    Nevertheless, I do believe a factor such as unemployment rates in an area does have a correlation to Lending Club loan disposition.  My opinion is that this correlation is lost with those last 2 digits of ZIP code that Lending Club masks in their data sets.