Sunday, December 31, 2023

HAProxy with Redis Enterprise

Summary

This is Part 2 of a two-part series on the implementation of a contact center ACD using Redis data structures.  This part is focused on the network configuration: specifically, HAProxy load balancing with VRRP redundancy in a Redis Enterprise environment.  To boot, I explain some of the complexities of doing this inside a Docker container environment.

Network Architecture


Load Balancing Configuration


HAProxy w/Keepalived

Docker Container

Below are the Dockerfile and associated Docker Compose script for two instances of HAProxy with keepalived.  Note that the default start-up for the HAProxy container is overridden with a CMD that starts both keepalived and haproxy.

FROM haproxytech/haproxy-ubuntu:latest
USER root
RUN apt-get update
RUN apt-get install -y keepalived
# psmisc provides killall, used by the keepalived health-check script
RUN apt-get install -y psmisc
# override the image's default start-up to run both keepalived and haproxy
CMD service keepalived start; haproxy -f /usr/local/etc/haproxy/haproxy.cfg
lb1:
  build:
    context: .
    dockerfile: $PWD/haproxy/Dockerfile
  container_name: lb1
  cap_add:
    - NET_ADMIN
  ports:
    - 8000
    - 8443
    - 9443
    - 12000
  profiles: ["loadbalancer"]
  volumes:
    - $PWD/haproxy/haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro
    - $PWD/haproxy/server.pem:/usr/local/etc/haproxy/server.pem
    - $PWD/haproxy/keepalived1.conf:/etc/keepalived/keepalived.conf
  networks:
    - re_cluster
lb2:
  build:
    context: .
    dockerfile: $PWD/haproxy/Dockerfile
  container_name: lb2
  cap_add:
    - NET_ADMIN
  ports:
    - 8000
    - 8443
    - 9443
    - 12000
  profiles: ["loadbalancer"]
  volumes:
    - $PWD/haproxy/haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro
    - $PWD/haproxy/server.pem:/usr/local/etc/haproxy/server.pem
    - $PWD/haproxy/keepalived2.conf:/etc/keepalived/keepalived.conf
  networks:
    - re_cluster
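
With the 'loadbalancer' profile assigned to both services, the pair can be built and started with a single Compose command.  A minimal sketch, run from the directory containing the compose file:

docker compose --profile loadbalancer up -d --build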

Keepalived Config

VRRP redundancy of the two HAProxy instances is implemented with keepalived.  Below is the config for the Master instance.  The Backup instance is identical except for the priority.

global_defs {
    script_user nobody
    enable_script_security
}

vrrp_script chk_haproxy {
    script "/usr/bin/killall -0 haproxy"
    interval 2
    weight 2
}

vrrp_instance VI_1 {
    state MASTER
    interface eth0
    virtual_router_id 51
    priority 101
    advert_int 1
    authentication {
        auth_type PASS
        auth_pass passwd
    }
    virtual_ipaddress {
        192.168.20.100
    }
    track_script {
        chk_haproxy
    }
}
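
Failover can be sanity-checked from the Docker host.  A hedged sketch, assuming iproute2 is present in the image (killall comes from the psmisc package installed in the Dockerfile above):

# the VIP should be held by the master (lb1)
docker exec lb1 ip addr show eth0 | grep 192.168.20.100
# kill haproxy on lb1; chk_haproxy fails, lb1's effective priority drops, and lb2 takes over
docker exec lb1 killall haproxy
docker exec lb2 ip addr show eth0 | grep 192.168.20.100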

Web Servers

I'll start with the simplest load-balancing scenario: a web farm.


Docker Container

Below are the Dockerfile and associated Docker Compose scripting for a two-server deployment of a Python FastAPI app.  Note that no IP addresses are assigned; multiple instances are deployed via the Docker Compose 'replicas' setting.

FROM python:3.10-slim
WORKDIR /app
COPY ./requirements.txt ./
RUN pip install --no-cache-dir --upgrade -r ./requirements.txt
COPY ./restapi/log_conf.yaml ./src/main.py ./src/operations.py ./src/response.py ./src/states.py ./
COPY ./.env ./
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--log-config=log_conf.yaml"]
rest:
  build:
    context: .
    dockerfile: $PWD/restapi/Dockerfile
  deploy:
    replicas: 2
  ports:
    - 8000
  profiles: ["rest"]
  networks:
    - re_cluster


HAProxy Config

Below are the frontend and backend configurations.  Note the use of Docker's embedded DNS server to enable dynamic mapping of the web servers via an HAProxy server template.

resolvers docker
    nameserver dns1 127.0.0.11:53

frontend rest_fe
    mode http
    bind :8000
    default_backend rest_be

backend rest_be
    mode http
    balance roundrobin
    server-template restapi- 2 rest:8000 check resolvers docker init-addr none
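
Round-robin distribution can be verified by hitting the frontend repeatedly.  A hedged example, assuming the FastAPI app serves a route at / and the requests originate from a host or container that can reach the VIP on the re_cluster network:

# successive requests should alternate between the two replicas
for i in 1 2 3 4; do curl -s http://192.168.20.100:8000/; done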

Redis Enterprise Components

Redis Enterprise can provide its own load balancing via internal DNS servers.  For those that do not want to use DNS, external load balancing is also supported; the official Redis documentation covers its general configuration.  Here, I'm going to go into detail on the specifics of setting this up with the HAProxy load balancer in a Docker environment.

Docker Containers

A three-node cluster is provisioned below.  Note the ports that are opened:
  • 8443 - Redis Enterprise Admin Console
  • 9443 - Redis Enterprise REST API
  • 12000 - The client port configured for the database.

re1:
  image: redislabs/redis:latest
  container_name: re1
  restart: unless-stopped
  tty: true
  cap_add:
    - sys_resource
  ports:
    - 8443
    - 9443
    - 12000
  profiles: ["redis"]
  networks:
    re_cluster:
      ipv4_address: 192.168.20.2
re2:
  image: redislabs/redis:latest
  container_name: re2
  restart: unless-stopped
  tty: true
  cap_add:
    - sys_resource
  ports:
    - 8443
    - 9443
    - 12000
  profiles: ["redis"]
  networks:
    re_cluster:
      ipv4_address: 192.168.20.3
re3:
  image: redislabs/redis:latest
  container_name: re3
  restart: unless-stopped
  tty: true
  cap_add:
    - sys_resource
  ports:
    - 8443
    - 9443
    - 12000
  profiles: ["redis"]
  networks:
    re_cluster:
      ipv4_address: 192.168.20.4
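
The compose file only launches the containers; the nodes still have to be formed into a cluster (presumably in the repo's start.sh).  A minimal sketch with rladmin; the cluster name and credentials here are placeholders:

# bootstrap the cluster on the first node
docker exec -it re1 /opt/redislabs/bin/rladmin cluster create name mycluster.local username admin@example.com password admin
# join the remaining nodes to it
docker exec -it re2 /opt/redislabs/bin/rladmin cluster join nodes 192.168.20.2 username admin@example.com password admin
docker exec -it re3 /opt/redislabs/bin/rladmin cluster join nodes 192.168.20.2 username admin@example.com password admin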

RE Database Configuration

Below is a JSON config that can be used via the RE REST API to create a Redis database.  Note the proxy policy: "all-nodes" enables a database client connection point on all the Redis nodes.

{
    "name": "redb",
    "type": "redis",
    "memory_size": 10000000,
    "port": 12000,
    "authentication_redis_pass": "redis",
    "proxy_policy": "all-nodes",
    "sharding": true,
    "shards_count": 2,
    "shards_placement": "sparse",
    "shard_key_regex": [{"regex": ".*\\{(?<tag>.*)\\}.*"}, {"regex": "(?<tag>.*)"}],
    "replication": false,
    "module_list": [{
        "module_name": "ReJSON",
        "module_args": ""
    }]
}
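That database can then be created by POSTing the JSON to the REST API on any node.  A hedged example; the credentials are placeholders, and -k skips verification of the cluster's self-signed certificate:

curl -k -u admin@example.com:admin -H 'Content-Type: application/json' -X POST https://192.168.20.2:9443/v1/bdbs -d @lb-redb.json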

RE Cluster Configuration

In the start.sh script, the command below is added to configure redirects in the cluster (per the Redis documentation).

docker exec -it re1 /opt/redislabs/bin/rladmin cluster config handle_redirects enabled

HAProxy Config - RE Admin Console

Redis Enterprise has a web interface for configuration and monitoring (TLS, port 8443).  I configure back-to-back TLS sessions below with a local SSL cert for the front end.  Additionally, I configure 'sticky' sessions via cookies.

frontend redisadmin_fe
    mode http
    bind :8443 ssl crt /usr/local/etc/haproxy/server.pem
    default_backend redisadmin_be

backend redisadmin_be
    mode http
    balance leastconn
    cookie SERVER_USED insert indirect nocache
    server re1 re1:8443 check cookie re1 ssl verify none
    server re2 re2:8443 check cookie re2 ssl verify none
    server re3 re3:8443 check cookie re3 ssl verify none

HAProxy Config - RE REST API

Redis Enterprise provides a REST API for programmatic configuration and provisioning (TLS, port 9443).  For this scenario, I simply pass the TLS sessions through HAProxy via TCP.

frontend redisrest_fe
    mode tcp
    bind :9443
    default_backend redisrest_be

backend redisrest_be
    mode tcp
    balance roundrobin
    server re1 re1:9443 check
    server re2 re2:9443 check
    server re3 re3:9443 check

HAProxy Config - RE Database

A Redis Enterprise database can have a configurable client connection port.  In this case, I've configured it to 12000 (TCP).  Note in the backend configuration I've set up a Layer 7 health check that will attempt to create an authenticated Redis client connection, send a Redis PING, and then close that connection.

frontend redb_fe
    mode tcp
    bind :12000
    default_backend redb_be

backend redb_be
    mode tcp
    balance roundrobin
    option tcp-check
    tcp-check send AUTH\ redis\r\n
    tcp-check expect string +OK
    tcp-check send PING\r\n
    tcp-check expect string +PONG
    tcp-check send QUIT\r\n
    tcp-check expect string +OK
    server re1 re1:12000 check
    server re2 re2:12000 check
    server re3 re3:12000 check
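A quick end-to-end test of the database path through the load balancer; the port and password come from the database config above, and the VIP must be reachable from wherever redis-cli runs:

redis-cli -h 192.168.20.100 -p 12000 -a redis PING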

Source


https://github.com/redis-developer/basic-acd

Copyright ©1993-2024 Joey E Whelan, All rights reserved.

Basic ACD with Redis Enterprise

Summary

This post covers a contact center ACD implementation I've done utilizing Redis data structures.  The applications are written in Python.  The client interface is implemented as a REST API via FastAPI.  An internal Python app (Dispatcher) is used to monitor and administer the ACD data structures in Redis.  Docker containers are used for the architectural components.


Application Architecture



Data Structures


Contact, Queue


Contacts are implemented as Redis JSON objects.  Each contact has an associated array of skills necessary to service that contact.  Example:  English language proficiency.

A single queue for all contacts is implemented as a Redis Sorted Set.  The members of the set are the Redis key names of the contacts.  The associated scores are millisecond timestamps of the time the contact entered the queue.  This allows for FIFO queue management.  Below is the contact-creation logic.
async def create_contact(client, skills: list[str]) -> Response:
    resp_type: RESPONSE_TYPE = None
    result: str = None
    contact_key: str = f'contact:{str(uuid4())}'
    try:
        await client.json().set(contact_key, '$', {'skills': skills, 'state': CONTACT_STATE.QUEUED.value, 'agent': None})
        await client.zadd('queue', mapping={ contact_key: round(time.time()*1000) }) #time in ms
        resp_type = RESPONSE_TYPE.OK
        result = contact_key
    except Exception as err:
        result = f'create_contact - {err}'
        resp_type = RESPONSE_TYPE.ERR
    finally:
        return Response(resp_type, result)
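The resulting structures are easy to inspect with redis-cli.  A hedged sketch, assuming the database from the companion post (port 12000, password 'redis'); the contact key is a placeholder:

# oldest-first view of the queue; scores are enqueue timestamps in ms
redis-cli -p 12000 -a redis ZRANGE queue 0 -1 WITHSCORES
# fetch a queued contact's JSON object
redis-cli -p 12000 -a redis JSON.GET contact:<uuid>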


Agent


Agents are implemented as Redis JSON objects.  Agent metadata is stored as simple properties.  Agent skills are maintained as arrays.  The redis-py implementation of Redlock is used to ensure mutual exclusion on agent objects.  Below is the agent-creation logic.
async def create_agent(client, agent_key: str, fname: str, lname: str, skills: list[str]) -> Response:
    resp_type: RESPONSE_TYPE = None
    result: str = None
    try:
        lock: Lock = Lock(redis=client, name=f'{agent_key}:lock', timeout=LOCK_TIMEOUT, blocking_timeout=BLOCK_TIME)
        lock_acquired: bool = await lock.acquire()
        if lock_acquired:
            exists: int = await client.exists(agent_key)
            if exists:
                result = f'create_agent - agent {agent_key} already exists'
                resp_type = RESPONSE_TYPE.ERR
            else:
                agent_obj: dict = { 'id': agent_key, 'fname': fname, 'lname': lname, 'skills': skills, 'state': AGENT_STATE.UNAVAILABLE.value }
                await client.json().set(agent_key, '$', agent_obj)
                result = agent_key
                resp_type = RESPONSE_TYPE.OK
        else:
            resp_type = RESPONSE_TYPE.LOCKED
    except Exception as err:
        result = f'create_agent - {err}'
        resp_type = RESPONSE_TYPE.ERR
    finally:
        if await lock.locked():
            await lock.release()
        return Response(resp_type, result)


Agent Availability


Redis Sorted Sets are also used to track Agent availability.  A sorted set is created per skill.  The members of that set are the Redis keys for the agents that are available with the associated skill.  The associated scores are millisecond timestamps of the time the agent became available.  This use of sorted sets allows for multi-skill routing to the longest available agent (LAA).  Below is the agent state-change logic, which adds or removes the agent from each per-skill availability set.


async def set_agent_state(client, agent_key: str, state: AGENT_STATE) -> Response:
    resp_type: RESPONSE_TYPE = None
    result: str = None
    try:
        lock: Lock = Lock(redis=client, name=f'{agent_key}:lock', timeout=LOCK_TIMEOUT, blocking_timeout=BLOCK_TIME)
        lock_acquired: bool = await lock.acquire()
        if lock_acquired:
            exists: int = await client.exists(agent_key)
            if not exists:
                result = f'set_agent_state - {agent_key} does not exist'
                resp_type = RESPONSE_TYPE.ERR
            else:
                current_state = (await client.json().get(agent_key, '$.state'))[0]
                if AGENT_STATE(current_state) != state:
                    skills: list[list[str]] = await client.json().get(agent_key, '$.skills')
                    for skill in skills[0]:
                        match state:
                            case AGENT_STATE.AVAILABLE:
                                await client.zadd(f'{{availAgentsSkill}}:{skill}', mapping={ agent_key: round(time.time()*1000) })
                                await client.json().set(agent_key, '$.state', AGENT_STATE.AVAILABLE.value)
                            case AGENT_STATE.UNAVAILABLE:
                                await client.zrem(f'{{availAgentsSkill}}:{skill}', agent_key)
                                await client.json().set(agent_key, '$.state', AGENT_STATE.UNAVAILABLE.value)
                            case _:
                                raise Exception(f'invalid agent state parameter: {state}')
                    result = agent_key
                    resp_type = RESPONSE_TYPE.OK
                else:
                    result = f'set_agent_state - {agent_key} already in {AGENT_STATE(current_state)}'
                    resp_type = RESPONSE_TYPE.ERR
        else:
            resp_type = RESPONSE_TYPE.LOCKED
    except Exception as err:
        result = f'set_agent_state - {err}'
        resp_type = RESPONSE_TYPE.ERR
    finally:
        if await lock.locked():
            await lock.release()
        return Response(resp_type, result)
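
The curly braces in the set names are a Redis hash tag: every per-skill set hashes to the same slot, so multi-key operations such as ZINTER work against a sharded database.  The intersection the Dispatcher performs can be reproduced in redis-cli (the skill names here are hypothetical):

# agents available for BOTH skills, longest-available (lowest timestamp) first
redis-cli -p 12000 -a redis ZINTER 2 '{availAgentsSkill}:english' '{availAgentsSkill}:spanish' WITHSCORES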

Operations


Agent Targeting 


Routing of contacts to agents is performed by multiple Dispatcher processes.  Each Dispatcher runs an infinite loop that does the following:
  • Pop the oldest contact from the queue
  • Perform an intersection of the availability sets for the skills necessary for that contact
  • If an agent is available, assign that agent to the contact and set the agent to unavailable
  • If no agents are available with the necessary skills, put the contact back in the queue

while True:
    try:
        response: tuple = await client.bzpopmin('queue') # using a sorted set as a fifo queue
        contact_key: str = response[1].decode('utf-8')
        timestamp: int = int(response[2])
        skills: list[list[str]] = await client.json().get(contact_key, '$.skills')
        avail_keys: list[str] = [f'{{availAgentsSkill}}:{skill}' for skill in skills[0]]
        agents: list[str] = await client.zinter(avail_keys)
        agents = [agent.decode('utf-8') for agent in agents]
        found: bool = False
        for agent in agents:
            response: Response = await ops.set_agent_state(client, agent, AGENT_STATE.UNAVAILABLE)
            if response.resp_type == RESPONSE_TYPE.OK:
                found = True
                await client.json().mset([(contact_key, '$.agent', agent),
                                          (contact_key, '$.state', CONTACT_STATE.ASSIGNED.value)])
                logger.info(f'{contact_key} assigned to {agent}')
                break
        if not found:
            # check if the contact has been abandoned
            state: int = (await client.json().get(contact_key, '$.state'))[0]
            if CONTACT_STATE(state) != CONTACT_STATE.COMPLETE:
                # no agent avail. put contact back on queue with a 1 sec decelerator to allow other contacts to bubble up
                await client.zadd('queue', mapping={ contact_key: timestamp+1000 })
                logger.info(f'{contact_key} queued')
            await asyncio.sleep(uniform(0, 2))
    except Exception as err:
        if str(err) != "Connection closed by server.":
            logger.error(err)
            raise err

Source