Saturday, March 2, 2019

Event Sourcing with Redis Streams


Summary

Streams were an addition to the Redis 5.0 release.  A Redis stream is roughly analogous to a log file: an append-only data structure.  Streams also have producer/consumer capabilities roughly analogous to those of Kafka, with some important differences.  A full explanation of Redis streams is available in the Redis documentation.
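
As a quick taste, here's a hypothetical redis-cli session (entry IDs are server-generated millisecond timestamps, so yours will differ):

127.0.0.1:6379> xadd mystream * type "deposit" amount "100"
"1551312000000-0"
127.0.0.1:6379> xrange mystream - +
1) 1) "1551312000000-0"
   2) 1) "type"
      2) "deposit"
      3) "amount"
      4) "100"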

Event sourcing is one of those old topics that has been brought back to life in a new context.  The new context is state persistence and messaging for microservice architectures.  A full explanation of event sourcing is beyond the scope of this post, but in short: rather than storing an entity's current state, you store the immutable sequence of events that produced it.
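
To make that concrete, a minimal sketch (plain JavaScript, using the same event shapes this post produces) that derives an account's state by folding over its event history:

function replay(events) {
  return events.reduce((account, event) => {
    switch (event.type) {
      case 'create':
        return {id: event.id, funds: 0};
      case 'deposit':
        return {...account, funds: account.funds + event.amount};
      case 'withdraw':
        return {...account, funds: account.funds - event.amount};
      default:
        return account;
    }
  }, null);
}

const history = [
  {id: 'JohnDoe', type: 'create'},
  {id: 'JohnDoe', type: 'deposit', amount: 100},
  {id: 'JohnDoe', type: 'withdraw', amount: 100}
];
console.log(replay(history)); //{ id: 'JohnDoe', funds: 0 }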

This post is about my adventures in implementing event sourcing via Redis streams.  The example is simple and contrived - an account microservice that allows deposits and withdrawals.  I made a rough attempt at implementing this in a 'domain-driven design' style, but I didn't fully adhere to that model as I didn't care for the level of abstraction necessary.

Even though the scenario is simple, my overall impression of event sourcing is: it's hard.  It's hard to reason about code in an event-driven manner, and it's hard to implement correctly in a distributed architecture.

Overall Architecture

Below is a diagram of the high-level architecture: a REST-based microservice that leverages Redis as the event store and MongoDB for event data aggregation.

Service Architecture

Microservice application architecture below.  I implemented this with a Node.js HTTP server for the REST routes and an AccountService class that holds an event store client and a cache of account aggregates.
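
For illustration, the route wiring could look roughly like this (I'm assuming Express purely for brevity; the paths and status codes here are mine, not necessarily the actual implementation's):

const express = require('express');
const AccountService = require('./accountService');  //hypothetical module path

const app = express();
app.use(express.json());
const accountService = new AccountService();  //constructor args omitted

app.post('/account', (req, res) => {
  accountService.create(req.body.id)
  .then(result => res.status(201).json(result))
  .catch(err => res.status(400).json({error: err.message}));
});

app.post('/account/:id/deposit', (req, res) => {
  accountService.deposit(req.params.id, req.body.amount)
  .then(result => res.json(result))
  .catch(err => res.status(409).json({error: err.message}));
});

app.listen(8080);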

Projection Architecture

Architecture for aggregating data from the event store below.  Again, a Node.js implementation: a Redis client that realizes the event store functionality and a MongoDB client for aggregating data from events.
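
A sketch of what that projector could look like (names are illustrative; the event list shape matches the subscribe() implementation shown later, and the document shape matches the MongoDB sample at the end of this post):

//Assumes an EventStoreClient instance (eventStoreClient) and a MongoDB
//collection handle (collection) already exist.
const emitter = eventStoreClient.subscribe('accountStream', 'projector1');
emitter.on('event', eventList => {
  eventList.forEach(item => {
    const event = item.event;
    const delta = event.type === 'deposit' ? event.amount :
      event.type === 'withdraw' ? -event.amount : 0;
    collection.updateOne(
      {_id: event.id},
      {$inc: {funds: delta}, $push: {timestamps: item.timestamp}},
      {upsert: true}
    );
  });
});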

Event Store Architecture

A rough outline of the event store implementation below.  I use Redis objects, in particular the stream object, to realize event sourcing functionality such as fetching events, publishing events, subscribing for events, etc.
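
A skeleton of that client, consistent with the snippets below (the member names are taken from the code in this post; the constructor details are my assumption, based on the node_redis client of that era):

const redis = require('redis');
const {promisify} = require('util');
const events = require('events');

class EventStoreClient {
  constructor(options) {
    this._client = redis.createClient(options);
    this._getAsync = promisify(this._client.get).bind(this._client);  //used by publish()
    this._emitters = {};       //EventEmitter per stream/group
    this._intervals = [];      //setInterval handles, retained for shutdown
    this._readInterval = 1000; //ms between consumer group reads (assumed value)
  }

  //addId(id, setName)              - SADD-based uniqueness check
  //publish(streamName, event)      - optimistic-concurrency append (below)
  //subscribe(streamName, consumer) - EventEmitter fed by XREADGROUP (below)
}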

Code Snippets

Creating an Account - accountService.js

The code below leverages a Redis Set to ensure unique account IDs.  A JSON object is then created for the corresponding 'create' event and 'published' to the Redis stream.

 create(id) {
   return this._client.addId(id, 'accountId')
   .then(isUnique => {
     logger.debug(`AccountService.create - id:${id} - isUnique:${isUnique}`);
     if (isUnique) {
       const newEvent = {'id': id, 'version': 0, 'type': 'create'};
       return this._client.publish('accountStream', newEvent);
     }
     else {
       return Promise.resolve(null);
     }
   })
   .then(results => {
     //results is an array: the first item is the new version number of the
     //aggregate, the second is the timestamp of the create event that was published
     if (results && results.length === 2) {
       logger.debug(`AccountService.create - id:${id} - results:${results}`);
       const version = results[0];
       const timestamp = results[1];
       const account = new Account(id, version, timestamp);
       this._accounts[id] = account;  //add the new account to the cache
       return {'id': id};
     }
     else {
       throw new Error('Attempting to create an account id that already exists');
     }
   })
   .catch(err => {
     logger.error(`AccountService.create - id:${id} - ${err}`);
     throw err;
   });
 }
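
addId() isn't shown in this post's snippets; a plausible implementation (my sketch) uses SADD, which returns 1 when the member is newly added and 0 when it already exists, mapping cleanly onto the isUnique boolean above:

 addId(id, setName) {
   return new Promise((resolve, reject) => {
     this._client.sadd(setName, id, (err, reply) => {
       if (err) {
         reject(err);
       }
       else {
         resolve(reply === 1);  //true only if the id was not already in the set
       }
     });
   });
 }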

Making a deposit to an Account - accountService.js

The code below attempts to load the account from cache, or rehydrates it by replaying events if it's not cached.  Business logic for a deposit is implemented in the account aggregate (account.js).  If the aggregate allows the deposit, an event is published.  If publishing the event fails (concurrency conflict), the deposit is rolled back on the aggregate and an error is thrown.

 deposit(id, amount) {
   let account;

   return this._loadAccount(id) //attempt to load the account from cache and/or rehydrate from events
   .then(result => {
     account = result;
     account.deposit(amount);
     const newEvent = {'id': id, 'version': account.version, 'type': 'deposit', 'amount': amount};
     return this._client.publish('accountStream', newEvent);
   })
   .then(results => {
     logger.debug(`AccountService.deposit - id:${id}, amount:${amount} - results:${results}`);
     if (results) {
       account.version = results[0];
       account.timestamp = results[1];
       this._accounts[id] = account; //update the account cache
       return {'id': id, 'amount': amount};
     }
     else {
       account.withdraw(amount); //roll back the aggregate - the deposit event wasn't published
       throw new Error('Concurrency conflict - deposit event was not published');
     }
   })
   .catch(err => {
     logger.error(`AccountService.deposit - id:${id}, amount:${amount} - ${err}`);
     throw err;
   });
 }
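
_loadAccount() isn't shown here.  A sketch of the cache-or-replay behavior described above (getEvents() and Account.apply() are hypothetical helpers standing in for the real fetch-and-replay code):

 _loadAccount(id) {
   if (this._accounts[id]) {
     return Promise.resolve(this._accounts[id]);  //cache hit
   }
   //cache miss: rehydrate by replaying the account's event history
   return this._client.getEvents('accountStream', id)  //hypothetical fetch helper
   .then(events => {
     if (!events || events.length === 0) {
       throw new Error(`Account ${id} does not exist`);
     }
     const account = new Account(id, 0, null);
     events.forEach(event => account.apply(event));  //hypothetical replay method
     this._accounts[id] = account;
     return account;
   });
 }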

Publishing an event - eventStoreClient.js

The code below implements optimistic concurrency control on the event store with Redis' WATCH command.  Only one process will be permitted to publish an event with a given version number.

 publish(streamName, event) {
   logger.debug(`EventStoreClient.publish`);
   this._client.watch(event.id); //watch the key holding this account's version counter
   return this._getAsync(event.id)  //fetch the current version from a Redis key with that ID
   .then(result => {
     if (!result || parseInt(result) === parseInt(event.version)) {  //key doesn't exist or versions match
       event.version += 1;  //increment version number prior to publishing the event
       logger.debug(`EventStoreClient.publish - streamName:${streamName}, event:${JSON.stringify(event)}\
        - result:${result}`);
       return new Promise((resolve, reject) => {
         this._client.multi()  //atomic transaction that increments the version and adds the event to the stream
         .incr(event.id)
         .xadd(streamName, '*', 'event', JSON.stringify(event))
         .exec((err, replies) => {
           if (err) {
             reject(err);
           }
           else {
             //if the watched key changed between WATCH and EXEC, the transaction
             //aborts and replies is null - callers treat that the same as a
             //version mismatch, i.e. a concurrency conflict
             resolve(replies);
           }
         });
       });
     }
     else {  //a concurrent writer has already bumped the version number
       //return null; it's up to the caller to make another publish attempt
       return Promise.resolve(null);
     }
   })
   .catch(err => {
     logger.error(`EventStoreClient.publish - streamName:${streamName}, event:${JSON.stringify(event)} - ${err}`);
     throw err;
   });
 }
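
Since publish() resolves to null on a conflict rather than throwing, callers choose the retry policy.  A naive, bounded retry wrapper (my sketch, not from this post) around a service command might look like the following; a real implementation would retry only on conflict errors and refresh the cached aggregate before retrying:

function withRetry(command, attempts = 3) {
  return command().catch(err => {
    if (attempts <= 1) {
      throw err;  //give up after the configured number of attempts
    }
    return withRetry(command, attempts - 1);
  });
}

//e.g. withRetry(() => accountService.deposit('JohnDoe', 100));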

Subscribing for events - eventStoreClient.js

The Redis streams implementation doesn't provide standard pub/sub functionality.  Subscriber behavior can be emulated, though, using Node.js event emitters coupled with Redis consumer groups.  Below, a given consumer group is read periodically via setInterval.  If new events are present, they're emitted via the emitter.  The 'subscriber' then implements the corresponding Node event handler on the emitter returned by this function.  This is precisely how the 'projector' is implemented for aggregating event data into a MongoDB database.

 subscribe(streamName, consumerName) {
   let emitter;
   let groupName = streamName + 'Group';
   logger.debug(`EventStoreClient.subscribe - streamName:${streamName}, groupName:${groupName}, consumerName:${consumerName}`);

   if (this._emitters[streamName] && this._emitters[streamName][groupName]) {
     emitter = this._emitters[streamName][groupName];
   }
   else {
     this._client.xgroup('CREATE', streamName, groupName, '0', (err) => {});  //attempt to create the consumer group; errors (e.g. group already exists) are ignored
     emitter = new events.EventEmitter();
     let obj = setInterval(() => {
       this._readGroup(streamName, groupName, consumerName)
       .then(eventList => {
         if (eventList.length > 0) {
           emitter.emit('event', eventList);
         }
       })
       .catch(err => {
         //log only - throwing inside the interval callback would just surface
         //as an unhandled promise rejection
         logger.error(`EventStoreClient.subscribe - streamName:${streamName}, groupName:${groupName},\
         consumerName:${consumerName} - ${err}`);
       });
     }, this._readInterval);
     if (!this._emitters[streamName]) {
       this._emitters[streamName] = {};
     }
     this._emitters[streamName][groupName] = emitter;
     this._intervals.push(obj);  //retain the handle so intervals can be cleared on shutdown
   }

   return emitter;
 }
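
_readGroup() isn't shown; a sketch of it using XREADGROUP (the reply parsing assumes each entry's field list is ['event', <json>], which matches what publish() writes):

 _readGroup(streamName, groupName, consumerName) {
   return new Promise((resolve, reject) => {
     this._client.xreadgroup('GROUP', groupName, consumerName, 'COUNT', 10,
       'STREAMS', streamName, '>', (err, replies) => {
       if (err) {
         reject(err);
         return;
       }
       const eventList = [];
       if (replies) {  //replies is null when there are no new entries
         replies[0][1].forEach(entry => {  //replies[0] is [streamName, [[id, fields], ...]]
           const id = entry[0];
           const event = JSON.parse(entry[1][1]);  //fields are ['event', <json>]
           eventList.push({timestamp: id, event: event});
           this._client.xack(streamName, groupName, id);  //mark the entry as processed
         });
       }
       resolve(eventList);
     });
   });
 }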

Sample Results

Below is the state of a Redis instance after the following actions:  create account, deposit $100, withdraw $100.
127.0.0.1:6379> xrange accountStream - +
1) 1) "1551312621884-0"
   2) 1) "event"
      2) "{\"id\":\"JohnDoe\",\"version\":1,\"type\":\"create\"}"
2) 1) "1551312827949-0"
   2) 1) "event"
      2) "{\"id\":\"JohnDoe\",\"version\":2,\"type\":\"deposit\",\"amount\":100}"
3) 1) "1551312847014-0"
   2) 1) "event"
      2) "{\"id\":\"JohnDoe\",\"version\":3,\"type\":\"withdraw\",\"amount\":100}"
Below is the corresponding state of a MongoDB collection being used for event data aggregation.
> db.accountCollection.find()
{ "_id" : "JohnDoe", "funds" : 0, "timestamps" : [ "1551312621884-0", "1551312827949-0", "1551312847014-0" ] }

Source

Full source w/comments here: https://github.com/joeywhelan/redisStreamEventStore

Copyright ©1993-2024 Joey E Whelan, All rights reserved.

NICE inContact WorkItem Routing


Summary

I'll demonstrate the use of the NICE inContact WorkItem API in this post.  That API provides the ability to bring non-native tasks into the inContact routing engine.  The 'work item' to be routed here will be email from Google Gmail, so I'll cover the use of the Gmail API in addition to the WorkItem API.

This is a contrived example with, at best, demo-grade code.

Overall Architecture

Below is a depiction of the overall setup of this exercise.  Emails are pulled from Gmail via API, submitted to inContact's routing engine via API, then delivered to an agent target via a routing strategy.


Application Architecture

Below is the application layout.  

Routing Logic

Picture of the routing strategy used for this demo below.  This is a very simple strategy that pulls a work item from queue and then pops a web page with the email contents to an agent.


Code Snippets


Main Procedure

Code below performs the OAuth2 handshake with Google, gets a list of messages currently in the Inbox, then packages up an array of Promises, each of which pulls message contents from Gmail and submits them to the WorkItem API.

function processMessage(id) {
  return gmail.getMessage(id)
  .then(msg => {
    return workItem.sendEmail(msg.id, msg.from, msg.payload);
  })
  .then(contactId => {
    return {msgId: id, contactId: contactId};
  });
}

gmail.authorize()
.then(_ => {
  return gmail.getMessageList();
})
.then((msgs) => {
  let p = [];
  msgs.forEach(msg => {
    p.push(processMessage(msg.id));
  });
  return Promise.all(p);
})
.then(results => {
  console.log(results);
})
.catch((err) => {
  console.log(err);
});

Gmail GetMessageList and GetMessage

getMessageList() pulls the list of Gmail message IDs currently carrying the INBOX label.  getMessage() pulls the content of a message for a given ID.

 getMessageList() {
   console.log('getMessageList()');
   const auth = this.oAuth2Client;
   const gmail = google.gmail({version: 'v1', auth});
   return new Promise((resolve, reject) => {
     gmail.users.messages.list({userId: 'me', labelIds: ['INBOX']}, (err, res) => {
       if (err) {
         reject(err);  //don't fall through to resolve on error
       }
       else {
         resolve(res);
       }
     });
   })
   .then((res) => {
     return res.data.messages || [];  //the messages property is absent when the inbox is empty
   })
   .catch((err) => {
     console.error('getMessageList() - ' + err.message);
     throw err;
   });
 }


 getMessage(id) {
   console.log('getMessage() - id: ' + id);
   const auth = this.oAuth2Client;
   const gmail = google.gmail({version: 'v1', auth});
   return new Promise((resolve, reject) => {
     gmail.users.messages.get({userId: 'me', 'id': id}, (err, res) => {
       if (err) {
         reject(err);  //don't fall through to resolve on error
       }
       else {
         resolve(res);
       }
     });
   })
   .then((res) => {
     const id = res.data.id;
     //extract the bare email address from the 'From' header
     const arr = (res.data.payload.headers.find(o => o.name === 'From')).value
     .match(/([a-zA-Z0-9._-]+@[a-zA-Z0-9._-]+\.[a-zA-Z0-9._-]+)/gi);
     let from;
     if (arr) {
       from = arr[0];
     }
     else {
       from = '';
     }
     let payload = '';
     if (res.data.payload.body.data) {
       //the body is base64url-encoded; decode with Buffer (atob isn't available in older Node versions)
       payload = Buffer.from(res.data.payload.body.data, 'base64').toString('utf8');
     }
     return {id: id, from: from, payload: payload};
   })
   .catch((err) => {
     console.error('getMessage() - ' + err.message);
     throw err;
   });
 }


WorkItem API Call

This consists of a simple POST with the work item parameters in the body.

 postWorkItem(workItemURL, token, id, from, payload, type) {
   console.log('postWorkItem() - url: ' + workItemURL + ' from: ' + from);
   const body = {
     'pointOfContact': this.poc,
     'workItemId': id,
     'workItemPayload': payload,
     'workItemType': type,
     'from': from
   };

   return fetch(workItemURL, {
     method: 'POST',
     body: JSON.stringify(body),
     headers: {
       'Content-Type': 'application/json',
       'Authorization': 'Bearer ' + token
     },
     cache: 'no-store',
     mode: 'cors'
   })
   .then(response => {
     if (response.ok) {
       return response.json();
     }
     else {
       const msg = 'response status: ' + response.status;
       throw new Error(msg);
     }
   })
   .then(json => {
     return json.contactId;
   })
   .catch(err => {
     console.error('postWorkItem() - ' + err.message);
     throw err;
   });
 }
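
sendEmail(), called from the main procedure, isn't shown; presumably it obtains an API token and delegates to postWorkItem().  A hypothetical shape (getToken(), auth.baseURL, and this.workItemPath are placeholders for inContact's OAuth handshake and endpoint construction, which I'm not reproducing here):

 sendEmail(id, from, payload) {
   return this.getToken()  //placeholder for the inContact token request
   .then(auth => {
     //auth.baseURL and this.workItemPath are hypothetical names
     return this.postWorkItem(auth.baseURL + this.workItemPath, auth.token,
       id, from, payload, 'email');
   });
 }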

Results

(Screenshot omitted: the email work item delivered to an agent.)

Source

Full source w/comments here: https://github.com/joeywhelan/workitem

Copyright ©1993-2024 Joey E Whelan, All rights reserved.