Thursday, May 29, 2014

Distributed Job Scheduling with Redis/Node.js

Summary
I'll be discussing a distributed job scheduling implementation utilizing Node.js as the application engine and Redis for the underlying data structures.

In this implementation, I'll be using an event-driven model (no polling) with a separate Dispatcher process for assigning workers to jobs.  Events are announced via the Redis publisher/subscriber model.  Available job and worker queues are realized via Redis Lists.

Implementation

Figure 1 below depicts my overall approach for this application.
  • Test Runner - Node.js application that kicks off a job generator process.  Jobs are generated at a configurable rate.
  • Job Generator - Node.js application that inserts jobs into the available job queue and notifies the Dispatcher of the available job.
  • Redis - provides the shared data structures and pub/sub mechanism for notifications.
  • Dispatcher - Node.js application that assigns available workers to jobs.
  • Worker - Node.js application that simulates working on a job.
Figure 1

Figure 2 below depicts the data structures and overall flow of this application.  Redis List structures are used for FIFO queuing.

Figure 2

Code Snippets

main.js - This is the hub of the application.  Node's cluster mechanism is used to create a single dispatcher process and (number of CPUs - 1) worker processes, with a minimum of one worker.  Each worker is assigned a name made up of its host machine name and its process id on that host.  Various internal counters and data structures are reset on start up (jobId, jobQueue, etc).  Node domains are used for resilience.

var cluster = require('cluster');
var domain = require('domain');
var redis = require('redis');
var os = require('os');
var logger = require('./logger');
var timestamper = require('./timeStamper');
var worker = require('./worker');
var dispatcher = require('./dispatcher');
var utilities = require('./utilities');
var properties = require('./properties');
var numProcesses = os.cpus().length;

// Entry point. The cluster master resets the shared Redis state and forks the
// child processes; every forked child runs its assigned role inside a domain.
if (cluster.isMaster)
{
    var client = redis.createClient(properties.redisServer.port, properties.redisServer.host);

    // Clear the start timestamp, the job id counter and both queues in a single
    // MULTI/EXEC transaction before any child process is started.
    var multi = client.multi();
    multi.del('start');
    multi.del('jobId');
    multi.del('jobQueue');
    multi.del('workerQueue');
    multi.exec(function (err, res){
        if (err)
            throw err;

        // One CPU slot is reserved for the dispatcher; the rest become workers.
        // At least one worker is always forked, even on a single-CPU host.
        var workerCount = Math.max(numProcesses - 1, 1);

        cluster.fork({processType : 'dispatcher'});
        for (var i = 0; i < workerCount; i++)
        {
            cluster.fork({processType : 'worker'});
        }

        // The master only needed Redis for the reset above. The connection is
        // owned by the client; the Multi object has nothing of its own to close.
        client.quit();
    });

    // NOTE(review): 'disconnect' is emitted when a child's IPC channel closes,
    // which may also happen on a graceful disconnect - confirm that logging it
    // as a death is intended.
    cluster.on('disconnect', function(worker) {
        logger.error('File: main.js, Worker %d died', worker.process.pid);
    });
}
else
{
    // Child process: the role was passed by the master through the environment.
    logger.debug('File: main.js, ProcessType %s launched', process.env.processType);
    initDomain(process.env.processType);
}


/**
 * Runs this child process's role (dispatcher or worker) inside a domain so that
 * an unexpected error leads to an orderly shutdown instead of an abrupt crash.
 *
 * Two Redis connections are opened: a general-purpose client and a second one
 * that is handed to the role as its pub/sub subscriber.
 *
 * @param {string} processType - 'dispatcher' or 'worker'; any other value
 *     starts nothing.
 */
function initDomain(processType)
{
    logger.debug('Entering - File: main.js, Method: initDomain, processType:%s, pid:%d', processType, process.pid);
    var d = domain.create();
    var client = redis.createClient(properties.redisServer.port, properties.redisServer.host);
    var subscriber = redis.createClient(properties.redisServer.port, properties.redisServer.host);

    // Both clients are created outside d.run(), so they are not implicitly bound
    // to the domain. Add them explicitly so that errors they emit, or that their
    // event handlers throw, reach the 'error' handler below.
    d.add(client);
    d.add(subscriber);

    d.on('error', function (err) {
        try
        {
            logger.error('Crash: ' + err.message);

            // Failsafe: force an exit if the graceful shutdown has not finished
            // within 10 seconds. unref() keeps this timer from holding the
            // process open on its own.
            var killtimer = setTimeout(function() {process.exit(1);}, 10000);
            killtimer.unref();

            client.quit();
            subscriber.quit();
            cluster.worker.disconnect();
        }
        catch (exc)
        {
            // The cleanup itself failed; report on the console and let the
            // failsafe timer (if it was armed) end the process.
            console.log(timestamper() + 'Error encountered during crash recovery: ' + exc.message);
        }
    });

    switch (processType)
    {
        case 'dispatcher':
            d.run(function() {
                new dispatcher(client, subscriber).start();
            });
            break;

        case 'worker':
            // Workers are named "<hostname>:<pid>".
            d.run(function() {
                new worker(os.hostname() + ':' + process.pid, client, subscriber).start();
            });
            break;
    }

    logger.debug('Exiting - File: main.js, Method: initDomain, processType:%s', processType);
}



worker.js - Below is the start-up code for a worker.  A worker uses two Redis client connections: a general-use connection and a second connection dedicated to messaging via pub/sub.

/**
 * Starts the worker.
 *
 * Subscribes to a pub/sub channel named after this worker's id. Once the
 * subscription is confirmed the worker announces itself as available; every
 * message received on the channel is treated as a job, processed with doWork,
 * after which the worker announces itself as available again.
 *
 * Errors reported by either Redis connection are logged and rethrown.
 */
worker.prototype.start = function()
{
    logger.debug('Entering - File: worker.js, Method: start, workerId:%s', this.workerId);

    var that = this;

    this.client.on('error', function (err){
        logger.error('File: worker.js, Method: start, redis client error: ' + err.message);
        throw err;
    });
    this.subscriber.on('error', function (err){
        logger.error('File: worker.js, Method: start, subscriber client error: ' + err.message);
        throw err;
    });

    // Only advertise availability once the subscription is in place.
    this.subscriber.on('subscribe', function(channel, count){
        setWorkerAvailable(that.client, that.workerId);
    });

    this.subscriber.on('message', function(channel, msg){
        logger.debug('worker %s received job %s', that.workerId, msg);
        doWork(msg, function(){
            setWorkerAvailable(that.client, that.workerId);

            // Loose equality is deliberate: msg is the published payload and
            // presumably a string, while numJobs may be configured as a number.
            if (msg == properties.numJobs)
            {
                // This was the last job of the run: report the elapsed time
                // relative to the timestamp stored under the 'start' key.
                that.client.get('start', function (err, res){
                    if (err)
                    {
                        // The duration is diagnostic only, so a failed read is
                        // logged rather than turned into a crash.
                        logger.error('File: worker.js, Method: start, client.get error: ' + err.message);
                        return;
                    }
                    var duration = new Date().getTime() - res;
                    logger.debug('*****duration(ms): ' + duration);
                });
            }
        });
    });

    // Listeners are registered above before the subscription is requested.
    this.subscriber.subscribe(this.workerId);  //subscribe to a message channel specific to this worker

    logger.debug('Exiting - File: worker.js, Method: start, workerId:%s', this.workerId);
};


/**
 * Marks a worker as available: pushes its id onto the 'workerQueue' list and,
 * once the push has succeeded, asks the dispatcher notifier to announce it.
 *
 * @param {Object} client - Redis client used for the push and the notification.
 * @param {string} workerId - id of the worker that is now free.
 */
function setWorkerAvailable(client, workerId)
{
    logger.debug('Entering - File: worker.js, Method: setWorkerAvailable, workerId: %s', workerId);

    var onQueued = function (pushErr, reply) {
        if (!pushErr)
        {
            var announcement = {'type' : 'worker', 'id' : workerId};
            utils.notifyDispatcher(client, JSON.stringify(announcement));
            return;
        }

        // A failed push is logged and then rethrown from the callback.
        logger.error('File: worker.js, Method: setWorkerAvailable, client.lpush error: ' + pushErr.message);
        throw pushErr;
    };

    client.lpush('workerQueue', workerId, onQueued);

    logger.debug('Exiting - File: worker.js, Method: setWorkerAvailable, workerId: %s', workerId);
}

dispatcher.js - The dispatcher process provides the 'brains' of the operation.  It listens for job/worker events and then assigns jobs to available workers on a first come, first served basis.  A Lua script is used to ensure atomicity of the job/worker assignment step.

// Lua script executed inside Redis to pair one job with one worker.
// KEYS[1] is the job list and KEYS[2] is the worker list. When both lists are
// non-empty the script pops one entry from the right end of each and returns
// them as {job, worker}; otherwise it returns nil and leaves both lists alone.
// The trailing backslashes are JavaScript line continuations, so the script is
// sent to Redis as a single line.
var dispatchScript = 'local jobQueueLen = redis.call(\'llen\', KEYS[1]) \
                      local workerQueueLen = redis.call(\'llen\', KEYS[2]) \
                      if (jobQueueLen > 0 and workerQueueLen > 0) then \
                        local job = redis.call(\'rpop\', KEYS[1]) \
                        local worker = redis.call(\'rpop\', KEYS[2]) \
                        return {job, worker} \
                      else \
                        return nil \
                      end';

/**
 * Reacts to a queue notification by attempting a single job/worker pairing.
 *
 * Runs dispatchScript against the 'jobQueue' and 'workerQueue' lists and, when
 * the script hands back a two-element reply, forwards the pair to notifyWorker.
 *
 * @param {Object} client - Redis client used to run the script.
 * @param {*} msg - the notification payload; not inspected here.
 */
function messageHandler(client, msg)
{
    logger.debug('Entering - File: dispatcher.js, Method: messageHandler');

    var onDispatched = function (evalErr, pair) {
        if (evalErr)
        {
            logger.error('File: dispatcher.js, Method: messageHandler, redis eval error: ' + evalErr.message);
            throw evalErr;
        }

        // An empty reply means there was nothing to pair up.
        var paired = Boolean(pair) && pair.length == 2;
        if (!paired)
        {
            return;
        }
        notifyWorker(client, pair[0], pair[1]);
    };

    client.eval(dispatchScript, 2, 'jobQueue', 'workerQueue', onDispatched);

    logger.debug('Exiting - File: dispatcher.js, Method: messageHandler');
}



generator.js - This process simulates job insertions into the job queue.  It notifies the dispatcher accordingly when a job has become available.

/**
 * Creates one job: obtains the next job id by incrementing the 'jobId'
 * counter, pushes that id onto the 'jobQueue' list and then asks the
 * dispatcher notifier to announce the new job.
 *
 * A failure in either Redis call is logged and rethrown from its callback.
 */
generator.prototype.generate = function()
{
    logger.debug('Entering - File: generator.js, Method: generate');

    var owner = this;

    var onIdAllocated = function (incrErr, jobId) {
        if (incrErr)
        {
            logger.error('File: generator.js, Method: generate, client.incr error: ' + incrErr.message);
            throw incrErr;
        }

        var onJobQueued = function (pushErr, reply) {
            if (pushErr)
            {
                logger.error('File: generator.js, Method: generate, client.lpush error: ' + pushErr.message);
                throw pushErr;
            }

            logger.debug('File: generator.js, Method: generate, jobId %d placed on queue', jobId);

            var announcement = {'type' : 'job', 'id' : jobId};
            utils.notifyDispatcher(owner.client, JSON.stringify(announcement));
        };

        owner.client.lpush('jobQueue', jobId, onJobQueued);
    };

    owner.client.incr('jobId', onIdAllocated);

    logger.debug('Exiting - File: generator.js, Method: generate');
};


utilities.js - This code provides a common mechanism for the worker and generator processes to notify the dispatcher of available jobs and/or workers.  A Lua script is used to optimize the dispatcher notification, and to make it atomic, such that a broadcast only goes out if there is at least 1 job and 1 worker currently available.

// Lua script executed inside Redis to decide whether the dispatcher needs to
// be told anything. KEYS[1] is the job list, KEYS[2] is the worker list and
// KEYS[3] is the pub/sub channel; ARGV[1] is the message to publish.
// When both lists are non-empty the message is published and the result of
// PUBLISH is returned; otherwise nothing is published and -1 is returned.
// The trailing backslashes are JavaScript line continuations, so the script is
// sent to Redis as a single line.
var notifyScript = 'local jobQueueLen = redis.call(\'llen\', KEYS[1]) \
                    local workerQueueLen = redis.call(\'llen\', KEYS[2]) \
                    if (jobQueueLen > 0 and workerQueueLen > 0) then \
                        return redis.call(\'publish\', KEYS[3], ARGV[1]) \
                    else \
                        return -1 \
                    end';

/**
 * Announces a newly available job or worker on the 'queueMessages' channel.
 *
 * The announcement is made through notifyScript, which only publishes while
 * both 'jobQueue' and 'workerQueue' hold at least one entry. If the script
 * replies 0, the same announcement is attempted again one second later.
 *
 * @param {Object} client - Redis client used to run the script.
 * @param {string} msg - the message to publish.
 */
utilities.notifyDispatcher = function(client, msg)
{
    logger.debug('Entering - File: utilities.js, Method: notifyDispatcher, msg: %s', msg);

    var resend = function () {
        utilities.notifyDispatcher(client, msg);
    };

    var onScriptReply = function (evalErr, receivers) {
        if (evalErr)
        {
            logger.error('File: utilities.js, Method: notifyDispatcher, client.eval error: ' + evalErr.message);
            throw evalErr;
        }

        logger.debug('File: utilities.js, Method: notifyDispatcher, client eval res:' + receivers);

        // A reply of 0 means the message was published but nobody received it.
        if (receivers == 0)
        {
            setTimeout(resend, 1000);
        }
    };

    client.eval(notifyScript, 3, 'jobQueue', 'workerQueue', 'queueMessages', msg, onScriptReply);

    logger.debug('Exiting - File: utilities.js, Method: notifyDispatcher, msg: %s', msg);
};