Sunday, December 31, 2023

Basic ACD with Redis Enterprise

Summary

This post covers a basic contact ACD (automatic call distributor) implementation I built on Redis data structures.  The applications are written in Python.  The client interface is a REST API implemented with FastAPI.  An internal Python app (Dispatcher) monitors and administers the ACD data structures in Redis.  Each architectural component runs in a Docker container.


Application Architecture



Data Structures


Contact, Queue


Contacts are implemented as Redis JSON objects.  Each contact has an associated array of skills necessary to service that contact.  Example:  English language proficiency.

A single queue for all contacts is implemented as a Redis Sorted Set.  The members of the set are the Redis key names of the contacts.  The associated scores are millisecond timestamps of the time the contact entered the queue.  This allows for FIFO queue management: the contact with the lowest score, which is the oldest one, is popped first.
resp_type: RESPONSE_TYPE = None
result: str = None
contact_key: str = f'contact:{str(uuid4())}'
try:
    await client.json().set(contact_key, '$', {'skills': skills, 'state': CONTACT_STATE.QUEUED.value, 'agent': None})
    await client.zadd('queue', mapping={ contact_key: round(time.time()*1000) }) #time in ms
    resp_type = RESPONSE_TYPE.OK
    result = contact_key
except Exception as err:
    result = f'create_contact - {err}'
    resp_type = RESPONSE_TYPE.ERR
finally:
    return Response(resp_type, result)
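
To make the FIFO behavior concrete, here is a minimal in-memory sketch of how timestamp scores order a sorted set.  `SortedSetModel` is a hypothetical stand-in written for this illustration, not a Redis client; its `zadd` and `zpopmin` methods only mimic the semantics of the Redis commands of the same name, and the contact keys and timestamps are made up.

```python
from typing import Optional


class SortedSetModel:
    """In-memory stand-in for a Redis Sorted Set (illustration only)."""

    def __init__(self) -> None:
        self._scores: dict[str, float] = {}

    def zadd(self, member: str, score: float) -> None:
        # Adding an existing member simply overwrites its score.
        self._scores[member] = score

    def zpopmin(self) -> Optional[tuple[str, float]]:
        # Lowest score wins; ties fall back to lexicographic member order.
        if not self._scores:
            return None
        member = min(self._scores, key=lambda m: (self._scores[m], m))
        return member, self._scores.pop(member)


queue = SortedSetModel()
# Scores are arrival times in milliseconds (made-up values).
queue.zadd('contact:a', 1_704_000_000_000)
queue.zadd('contact:b', 1_704_000_000_250)
queue.zadd('contact:c', 1_704_000_000_100)

order: list[str] = []
while (item := queue.zpopmin()) is not None:
    order.append(item[0])

print(order)  # ['contact:a', 'contact:c', 'contact:b']
```

Insertion order does not matter here: contact:c was added last but arrived before contact:b, so it is served first.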


Agent


Agents are implemented as Redis JSON objects.  Agent metadata is stored as simple properties.  Agent skills are maintained as an array.  The redis-py Lock class is used to ensure mutually exclusive access to agent objects.
resp_type: RESPONSE_TYPE = None
result: str = None
try:
    lock: Lock = Lock(redis=client, name=f'{agent_key}:lock', timeout=LOCK_TIMEOUT, blocking_timeout=BLOCK_TIME)
    lock_acquired: bool = await lock.acquire()
    if lock_acquired:
        exists: int = await client.exists(agent_key)
        if exists:
            result = f'create_agent - agent {agent_key} already exists'
            resp_type = RESPONSE_TYPE.ERR
        else:
            agent_obj: dict = { 'id': agent_key, 'fname': fname, 'lname': lname, 'skills': skills, 'state': AGENT_STATE.UNAVAILABLE.value }
            await client.json().set(agent_key, '$', agent_obj)
            result = agent_key
            resp_type = RESPONSE_TYPE.OK
    else:
        resp_type = RESPONSE_TYPE.LOCKED
except Exception as err:
    result = f'create_agent - {err}'
    resp_type = RESPONSE_TYPE.ERR
finally:
    # owned() is true only if this lock instance holds the lock; locked() is also
    # true when another process holds it, and release() would then raise
    if await lock.owned():
        await lock.release()
    return Response(resp_type, result)
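
As a rough illustration of the mutual exclusion idea, here is a toy, single-process model of a token-based lock with a timeout.  `LockStoreModel` and its methods are hypothetical names invented for this sketch; it is not how redis-py implements its lock, only a way to show why a lock should be released only by the holder whose token matches.

```python
import time
from typing import Optional
from uuid import uuid4


class LockStoreModel:
    """Toy model of a token-based lock with expiry (illustration only)."""

    def __init__(self) -> None:
        # lock name -> (owner token, expiry time in seconds)
        self._held: dict[str, tuple[str, float]] = {}

    def acquire(self, name: str, ttl: float, now: Optional[float] = None) -> Optional[str]:
        now = time.monotonic() if now is None else now
        current = self._held.get(name)
        if current is not None and current[1] > now:
            return None  # a live lock is held by someone else
        token = uuid4().hex
        self._held[name] = (token, now + ttl)
        return token

    def release(self, name: str, token: str) -> bool:
        current = self._held.get(name)
        if current is None or current[0] != token:
            return False  # not the owner: leave the lock untouched
        del self._held[name]
        return True


store = LockStoreModel()
first = store.acquire('agent:1:lock', ttl=5, now=0)    # succeeds
second = store.acquire('agent:1:lock', ttl=5, now=1)   # blocked, returns None
third = store.acquire('agent:1:lock', ttl=5, now=6)    # first lock expired, succeeds
stale_release = store.release('agent:1:lock', first)   # False: token no longer matches
owner_release = store.release('agent:1:lock', third)   # True
```

The `now` parameter exists only so the example is deterministic; the expiry is what keeps a crashed holder from blocking an agent object forever.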


Agent Availability


Redis Sorted Sets are also used to track Agent availability.  A sorted set is created per skill.  The members of that set are the Redis keys for the agents that are available with the associated skill.  The associated scores are millisecond timestamps of the time the agent became available.  This use of sorted sets allows for multi-skill routing to the longest available agent (LAA).


resp_type: RESPONSE_TYPE = None
result: str = None
try:
    lock: Lock = Lock(redis=client, name=f'{agent_key}:lock', timeout=LOCK_TIMEOUT, blocking_timeout=BLOCK_TIME)
    lock_acquired: bool = await lock.acquire()
    if lock_acquired:
        exists: int = await client.exists(agent_key)
        if not exists:
            result = f'set_agent_state - {agent_key} does not exist'
            resp_type = RESPONSE_TYPE.ERR
        else:
            current_state = (await client.json().get(agent_key, '$.state'))[0]
            if AGENT_STATE(current_state) != state:
                skills: list[list[str]] = await client.json().get(agent_key, '$.skills')
                # add the agent to, or remove it from, the availability set of each of its skills
                for skill in skills[0]:
                    match state:
                        case AGENT_STATE.AVAILABLE:
                            await client.zadd(f'{{availAgentsSkill}}:{skill}', mapping={ agent_key: round(time.time()*1000) })
                            await client.json().set(agent_key, '$.state', AGENT_STATE.AVAILABLE.value)
                        case AGENT_STATE.UNAVAILABLE:
                            await client.zrem(f'{{availAgentsSkill}}:{skill}', agent_key)
                            await client.json().set(agent_key, '$.state', AGENT_STATE.UNAVAILABLE.value)
                        case _:
                            raise Exception(f'invalid agent state parameter: {state}')
                result = agent_key
                resp_type = RESPONSE_TYPE.OK
            else:
                result = f'set_agent_state - {agent_key} already in {AGENT_STATE(current_state)}'
                resp_type = RESPONSE_TYPE.ERR
    else:
        resp_type = RESPONSE_TYPE.LOCKED
except Exception as err:
    result = f'set_agent_state - {err}'
    resp_type = RESPONSE_TYPE.ERR
finally:
    # release only a lock this instance actually holds
    if await lock.owned():
        await lock.release()
    return Response(resp_type, result)
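
The per-skill availability sets and the longest-available-agent selection can be sketched in plain Python.  This is an in-memory model under my own assumptions, not Redis code: the function names are hypothetical, plain dicts stand in for the sorted sets, and the ordering imitates an intersection that sums each agent's scores and sorts ascending.

```python
AvailSets = dict[str, dict[str, int]]  # skill -> {agent_key: available_since_ms}


def set_available(avail: AvailSets, agent_key: str, skills: list[str], now_ms: int) -> None:
    # One entry per skill, scored by the time the agent became available.
    for skill in skills:
        avail.setdefault(skill, {})[agent_key] = now_ms


def set_unavailable(avail: AvailSets, agent_key: str, skills: list[str]) -> None:
    for skill in skills:
        avail.get(skill, {}).pop(agent_key, None)


def longest_available(avail: AvailSets, required: list[str]) -> list[str]:
    """Agents holding every required skill, longest available first."""
    sets = [avail.get(skill, {}) for skill in required]
    if not sets:
        return []
    common = set(sets[0]).intersection(*sets[1:])
    # Summed scores are smallest for the agent who has waited longest.
    return sorted(common, key=lambda a: (sum(s[a] for s in sets), a))


avail: AvailSets = {}
set_available(avail, 'agent:1', ['english', 'billing'], now_ms=1000)
set_available(avail, 'agent:2', ['english'], now_ms=500)
set_available(avail, 'agent:3', ['english', 'billing'], now_ms=800)

candidates = longest_available(avail, ['english', 'billing'])
print(candidates)  # ['agent:3', 'agent:1']

set_unavailable(avail, 'agent:3', ['english', 'billing'])
remaining = longest_available(avail, ['english', 'billing'])
print(remaining)  # ['agent:1']
```

agent:2 has been available the longest overall but lacks the billing skill, so it never appears in the intersection.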

Operations


Agent Targeting 


Routing of contacts to agents is performed by multiple Dispatcher processes.  Each Dispatcher runs an infinite loop that does the following:
  • Pop the oldest contact from the queue.
  • Perform an intersection of the availability sets for the skills necessary for that contact.
  • If one or more agents are available, assign the first agent whose state can be set to unavailable to this contact.
  • If there are no agents available with the necessary skills, put the contact back in the queue.

while True:
    try:
        response: list[tuple] = await client.bzpopmin('queue') # using a sorted set as a fifo queue
        contact_key: str = response[1].decode('utf-8')
        timestamp: int = int(response[2])
        skills: list[list[str]] = await client.json().get(contact_key, '$.skills')
        avail_keys: list[str] = [f'{{availAgentsSkill}}:{skill}' for skill in skills[0]]
        agents: list[str] = await client.zinter(avail_keys)
        agents = [agent.decode('utf-8') for agent in agents]
        found: bool = False
        for agent in agents:
            response: Response = await ops.set_agent_state(client, agent, AGENT_STATE.UNAVAILABLE)
            if response.resp_type == RESPONSE_TYPE.OK:
                found = True
                await client.json().mset([(contact_key, '$.agent', agent),
                                          (contact_key, '$.state', CONTACT_STATE.ASSIGNED.value)])
                logger.info(f'{contact_key} assigned to {agent}')
                break
        if not found:
            # check if the contact has been abandoned
            state: list[int] = (await client.json().get(contact_key, '$.state'))[0]
            if CONTACT_STATE(state) != CONTACT_STATE.COMPLETE:
                # no agent avail. put contact back on queue with a 1 sec decelerator to allow other contacts to bubble up
                await client.zadd('queue', mapping={ contact_key: timestamp+1000 })
                logger.info(f'{contact_key} queued')
                await asyncio.sleep(uniform(0, 2))
    except Exception as err:
        if str(err) != "Connection closed by server.":
            logger.error(err)
            raise err
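
The effect of the 1 second decelerator can be seen with a small in-memory sketch.  The helper names `pop_oldest` and `requeue` are hypothetical, a plain dict stands in for the queue sorted set, and the scores are made-up millisecond values.

```python
def pop_oldest(queue: dict[str, int]) -> tuple[str, int]:
    """Remove and return the (member, score) pair with the lowest score."""
    member = min(queue, key=lambda m: (queue[m], m))
    return member, queue.pop(member)


def requeue(queue: dict[str, int], member: str, popped_score: int, penalty_ms: int = 1000) -> None:
    # Put the contact back with its popped score plus the penalty.
    queue[member] = popped_score + penalty_ms


queue = {'contact:a': 0, 'contact:b': 400, 'contact:c': 1500}

member, score = pop_oldest(queue)  # contact:a, score 0
requeue(queue, member, score)      # no agent found: contact:a now scores 1000

next_up = [pop_oldest(queue)[0] for _ in range(3)]
print(next_up)  # ['contact:b', 'contact:a', 'contact:c']
```

Only contacts that arrived within the penalty window move ahead of the requeued contact.  Because the penalty is added to the score that was popped, each failed attempt pushes the contact back by another second.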

Source