Sunday, February 26, 2017

IoT Data Pipeline - Part 3: Kafka Integration


Summary

In this post I'll scale up the architecture by simulating multiple IoT devices and then consolidate the resulting data stream into Apache Kafka.

IoT Data Pipeline - Part 2: Node/Java Crypto Interop
IoT Data Pipeline - Part 3:  Kafka Integration

Implementation

The diagram below depicts the overall setup I'm trying to achieve here.  I wanted to scale up the number of devices generating data streams to the IoT cloud and then consolidate those streams into a scalable streaming platform - Apache Kafka.

As mentioned in the first post, this exercise is somewhat contrived.  I scaled up the data generators by creating device simulators.  On the receiving side of the IoT cloud, multiple REST clients poll for data (one per device) and write it into Kafka.

A lower-level diagram of the code is below:


Node.js - Device Server Code Modifications


var devices = [];
for (var i=0; i < properties.deviceCount; i++) {
 devices.push('pws_' + i);
}

function rand(min, max) {
 // the +1 shifts the range, e.g. rand(-0.5, 0.5) yields a multiplier in [0.5, 1.5)
 return Math.random() * (max - min) + min + 1;
}
function randomize(data) {
 for (var prop in data) {
  if (data.hasOwnProperty(prop)) {
   var rn = rand(-0.5, 0.5);
   data[prop] = (data[prop] * rn).toFixed(2);
  }
 }
 return data;
}

function thingspace(requrl) {
 var query = url.parse(requrl, true).query;
 var wx_data = {'windspeedmph' : query.windspeedmph, 'solarradiation' : query.solarradiation};
 
 var options = {
   host : 'thingspace.io',
   port : '443',
   headers: {'Content-Type' : 'application/json'},
   method : 'POST'
 };
 
 async.map(devices, function(device, callback){
  var iv = crypto.randomBytes(16);
  var cipher = crypto.createCipheriv('aes-256-cbc', key, iv);
  wx_data = randomize(wx_data);
  wx_data.id = device;
  var cipherText = cipher.update(JSON.stringify(wx_data), 'utf8','hex') + cipher.final('hex');
  options.path = '/dweet/for/' + device;
  
  var retVal = '';
  var req = https.request(options, function(res) {
   res.on('data', function(chunk) {
    retVal += chunk;
   });
   res.on('end', function() {
    retVal = JSON.parse(retVal);
    callback(null, retVal.this);
   });
  });

  req.on('error', function(err1) {
   logger.error('File: main.js, Method: thingspace(), Message err1: ', err1.message);
   callback(err1, null);
  });
  req.write(JSON.stringify({'iv': iv.toString('base64'), 'mesg' : cipherText}));
  req.end(); 
 }, 
 function (err, results){
  if (err) {
   logger.error('File: main.js, Method: thingspace(), Message err: ', err.message);
  }
  else {
   logger.debug('Exiting - File: main.js, Method: thingspace()');
  }
 });
}


function handler(req, res) {
 wunderground(req.url, res);
 thingspace(req.url);
}

var httpServer = http.createServer(handler).listen(9080);
logger.debug('File: main.js, Listening on port 9080');
devices array:  Set up an array of simulated IoT device names.
rand()/randomize():  A couple of functions that generate randomized metrics for the simulated devices.  Data from the one real device is fed into these functions and random data for the simulators is derived from it.
wx_data:  The two metrics being sent from the devices are solar radiation and wind speed.
thingspace():  This is where the device simulation happens.  Using the Async module, send out multiple HTTPS REST requests (one per simulated device) to the IoT cloud.
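For reference, the randomize step boils down to multiplying each metric by a random factor in [0.5, 1.5).  Here's a minimal Java sketch of the same jitter logic (class and method names are mine, not from the device server):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class Jitter {
    private static final Random RAND = new Random();

    // Equivalent of the Node rand(-0.5, 0.5): a multiplier in [0.5, 1.5)
    static double factor() {
        return RAND.nextDouble() + 0.5;
    }

    // Scale every metric in the map by an independent random factor
    static Map<String, Double> randomize(Map<String, Double> data) {
        Map<String, Double> out = new HashMap<>();
        for (Map.Entry<String, Double> e : data.entrySet()) {
            out.put(e.getKey(), e.getValue() * factor());
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Double> wx = new HashMap<>();
        wx.put("windspeedmph", 10.0);
        wx.put("solarradiation", 650.0);
        System.out.println(randomize(wx));
    }
}
```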

Java - Custom JSON Serializer/Deserializer for Kafka

Kafka operates on byte arrays.  It ships with serializers for a few basic types such as Strings.  If you want to pass anything other than that, you have to roll your own.

public class JSONSerializer implements Serializer<JSONObject> {
 @Override
 public void close() {
 }

 @Override
 public void configure(Map<String, ?> configs, boolean isKey) {
 }

 @Override
 public byte[] serialize(String topic, JSONObject json) {
  byte[] byteArr = null;

  try {
   // JSONObject extends HashMap, so standard Java serialization works
   ByteArrayOutputStream out_byte = new ByteArrayOutputStream();
   ObjectOutputStream out_object = new ObjectOutputStream(out_byte);
   out_object.writeObject(json);
   out_object.close();  // flush buffered data before grabbing the bytes
   byteArr = out_byte.toByteArray();
  }
  catch (Exception e) {
   e.printStackTrace();
  }

  return byteArr;
 }
}
serialize():  Convert the JSON object into a byte array.

public class JSONDeserializer implements Deserializer<JSONObject> {
 @Override
 public void close() {
 }

 @Override
 public void configure(Map<String, ?> configs, boolean isKey) {
 }

 @Override
 public JSONObject deserialize(String topic, byte[] bytes) {
  JSONObject json = null;

  try {
   ByteArrayInputStream bIstream = new ByteArrayInputStream(bytes);
   ObjectInputStream in = new ObjectInputStream(bIstream);
   json = (JSONObject) in.readObject();
  }
  catch (Exception e) {
   e.printStackTrace();
  }
  return json;
 }
}
deserialize():  Convert a byte array back into a JSON object.
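The round trip can be sanity-checked without a broker.  This standalone sketch (not the pwskafka classes) exercises the same ObjectOutputStream/ObjectInputStream conversion on a plain HashMap, which stands in for JSONObject since JSONObject extends it:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;

public class RoundTrip {
    // Same technique as the custom serializer: object -> bytes
    static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.toByteArray();
    }

    // Same technique as the custom deserializer: bytes -> object
    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        HashMap<String, Object> dweet = new HashMap<>();
        dweet.put("id", "pws_0");
        dweet.put("windspeedmph", "4.52");
        HashMap<?, ?> back = (HashMap<?, ?>) deserialize(serialize(dweet));
        System.out.println(back.equals(dweet));  // true
    }
}
```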

Java - REST Clients/Kafka Producer


public class DweetProducer implements Runnable {

 private static final Logger logger = LogManager.getLogger(DweetProducer.class);
 private static final String propFile = "pwskafka.properties";
 private static final String dweetUri = "https://thingspace.io";
 private static final String streamPath = "/listen/for/dweets/from";
 private static byte[] salt;
 private static String password;
 private static String topic;
 private static String bootstrap;
 private static Producer<String, JSONObject> producer;
 private String device;
 
 static {
  try {
   FileInputStream fis = new FileInputStream(propFile);
   Properties props = new Properties();
   props.load(fis);
   fis.close();
   salt = props.getProperty("salt").getBytes();
   password = props.getProperty("password");
   bootstrap = props.getProperty("bootstrap");
   topic = props.getProperty("topic");
   props.clear();
      props.put("bootstrap.servers", DweetProducer.bootstrap); 
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "pwskafka.JSONSerializer");
      producer = new KafkaProducer<>(props);
  }
  catch (Exception e) {
   logger.error(e);
   System.exit(1);
  }
 }
 
 public DweetProducer(String device) {
  this.device = device;
  logger.debug("Producer for " + device  + " starting.");
 }
  
 private JSONObject decryptDweet(byte[] iv, String cipherText) throws GeneralSecurityException, 
 UnsupportedEncodingException, ParseException  {
  KeySpec spec = new PBEKeySpec(DweetProducer.password.toCharArray(), DweetProducer.salt, 100000, 256);
  SecretKeyFactory f = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
  SecretKey tmp = f.generateSecret(spec);
  SecretKey key = new SecretKeySpec(tmp.getEncoded(), "AES");
  
  Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
        cipher.init(Cipher.DECRYPT_MODE, key,  new IvParameterSpec(iv));
        String val = new String(cipher.doFinal(DatatypeConverter.parseHexBinary(cipherText)), "UTF-8");
        JSONParser parser = new JSONParser();
        return (JSONObject) parser.parse(val); 
 }
 
 public void run() { 
  JSONObject dweet = null;
  Client client = ClientBuilder.newClient();
  WebTarget target = client.target(dweetUri).path(streamPath).path(this.device);
  Response response =  target.request(MediaType.APPLICATION_JSON).get(Response.class);
  ChunkedInput<String> chunkedInput =
          response.readEntity(new GenericType<ChunkedInput<String>>() {});
  String chunk;
  JSONParser parser = new JSONParser();
  
  while ((chunk = chunkedInput.read()) != null) {
      if (!chunk.chars().allMatch( Character::isDigit )) {
       try {
        chunk = (String) parser.parse(chunk);
        JSONObject obj = (JSONObject) parser.parse(chunk);
        JSONObject content = (JSONObject) obj.get("content");
        byte[] iv = DatatypeConverter.parseBase64Binary((String) content.get("iv"));
     String cipherText = (String) content.get("mesg");
     dweet = decryptDweet(iv, cipherText);
     DweetProducer.producer.send(new ProducerRecord<String, JSONObject>(DweetProducer.topic, this.device, dweet));
     logger.debug(dweet);
       }
       catch (Exception e) {
        logger.error(e);
       }
      }
  }
 }
 
 public static void main(String[] args) { 
  if (args != null && args.length == 2) {
   int startDeviceNum = Integer.parseInt(args[0]);
   int endDeviceNum = Integer.parseInt(args[1]);
   if (startDeviceNum > endDeviceNum) {
    logger.error("Usage:  arg 1: starting device number, arg 2: ending device number");
    System.exit(1);
   }
   ExecutorService pool;
   int cores = Runtime.getRuntime().availableProcessors();
   pool = Executors.newFixedThreadPool(cores);
   String device = null;
   
   for (int i = startDeviceNum; i <= endDeviceNum; i++){
    device = "pws_" + i;
    pool.execute(new DweetProducer(device));
   }
   pool.shutdown();
  }
  else {
   logger.error("Usage:  arg 1: starting device number, arg 2: ending device number");
   System.exit(1);
  }
 }

}
DweetProducer implements Runnable:  Setting up a multi-threaded class.
producer:  The KafkaProducer class is thread-safe, so we only need one instance that can be shared by all the threads.
static initializer:  Load all the configuration variables from a properties file and create the Producer.  The value.serializer property tells the Producer to serialize JSON objects using the custom serializer.
decryptDweet():  AES decryption routine.  Same code as was discussed in Part 2.
run():  Thread code for the HTTP clients.  Each client reads HTTP chunked input from the IoT cloud's streaming interface, which was discussed in Part 1, and sends each decrypted device reading (dweet) to Kafka.
main():  Using the Java ExecutorService, set up a thread pool using all available cores.  Each thread executes an HTTP client sharing the common Kafka producer.
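The decryption scheme can also be exercised end to end without the IoT cloud.  This is a standalone sketch of the same PBKDF2 key derivation and AES-CBC cipher used in decryptDweet(); the salt, password, plaintext, and zeroed IV here are made-up test values for the demo only:

```java
import java.security.spec.KeySpec;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;

public class CryptoRoundTrip {
    // Derive an AES-256 key from a password and salt, as decryptDweet() does
    static SecretKey deriveKey(String password, byte[] salt) throws Exception {
        KeySpec spec = new PBEKeySpec(password.toCharArray(), salt, 100000, 256);
        SecretKeyFactory f = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
        return new SecretKeySpec(f.generateSecret(spec).getEncoded(), "AES");
    }

    // One helper for both directions; mode is ENCRYPT_MODE or DECRYPT_MODE
    static byte[] crypt(int mode, SecretKey key, byte[] iv, byte[] data) throws Exception {
        Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
        cipher.init(mode, key, new IvParameterSpec(iv));
        return cipher.doFinal(data);
    }

    public static void main(String[] args) throws Exception {
        byte[] salt = "test-salt".getBytes("UTF-8");  // made-up test value
        byte[] iv = new byte[16];                     // fixed IV for the demo only
        SecretKey key = deriveKey("test-password", salt);

        String plain = "{\"windspeedmph\":\"4.52\",\"id\":\"pws_0\"}";
        byte[] cipherText = crypt(Cipher.ENCRYPT_MODE, key, iv, plain.getBytes("UTF-8"));
        String back = new String(crypt(Cipher.DECRYPT_MODE, key, iv, cipherText), "UTF-8");
        System.out.println(back.equals(plain));  // true
    }
}
```

In the real pipeline the Node side generates a fresh random IV per message and ships it alongside the ciphertext, which is why decryptDweet() takes the IV as a parameter.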

Test Kafka Consumer

public class DweetConsumer {
 
 public static void main(String[] args) throws Exception{
    String topicName = "pws_metrics";
    
    Properties props = new Properties();
    props.put("bootstrap.servers", "webserv3:9092"); 
    props.put("group.id", "group_01");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "pwskafka.JSONDeserializer");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    
    KafkaConsumer<String, JSONObject> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topicName));
    
    while (true) {
     ConsumerRecords<String, JSONObject> records = consumer.poll(100);
     for (ConsumerRecord<String, JSONObject> record : records) {
      System.out.println("Received: " + record.key() + " " + record.value());
     }
    }
   
  }
}
value.deserializer:  Tell the Consumer how to deserialize the incoming byte array back into a JSON object.
subscribe():  Set up a Kafka Consumer and subscribe to the IoT topic the Producer is writing records to.
poll loop:  Poll for incoming records and print them to the console.
