Summary
This post covers a contact ACD implementation I've done utilizing Redis data structures. The applications are written in Python. The client interface is implemented as REST API via FastAPI. An internal Python app (Dispatcher) is used to monitor and administer the ACD data structures in Redis. Docker containers are used for architectural components.
- Part 1: Basic ACD with Redis Enterprise
- Part 2: HAProxy with Redis Enterprise
Application Architecture
Data Structures
Contact, Queue
Contacts are implemented as Redis JSON objects. Each contact has an associated array of skills necessary to service that contact. Example: English language proficiency.
A single queue for all contacts is implemented as a Redis Sorted Set. The members of the set are the Redis key names of the contacts. The associated scores are millisecond timestamps of the time the contact entered the queue. This allows for FIFO queue management
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
resp_type: RESPONSE_TYPE = None | |
result: str = None | |
contact_key: str = f'contact:{str(uuid4())}' | |
try: | |
await client.json().set(contact_key, '$', {'skills': skills, 'state': CONTACT_STATE.QUEUED.value, 'agent': None}) | |
await client.zadd('queue', mapping={ contact_key: round(time.time()*1000) }) #time in ms | |
resp_type = RESPONSE_TYPE.OK | |
result = contact_key | |
except Exception as err: | |
result = f'create_contact - {err}' | |
resp_type = RESPONSE_TYPE.ERR | |
finally: | |
return Response(resp_type, result) |
Agent
Agents are implemented as Redis JSON objects. Agent meta-data is stored as simple properties. Agent skills are maintained as arrays. The redis-py implementation of Redlock is used to ensure mutual exclusion to agent objects.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
resp_type: RESPONSE_TYPE = None | |
result: str = None | |
try: | |
lock: Lock = Lock(redis=client, name=f'{agent_key}:lock', timeout=LOCK_TIMEOUT, blocking_timeout=BLOCK_TIME) | |
lock_acquired: bool = await lock.acquire() | |
if lock_acquired: | |
exists: int = await client.exists(agent_key) | |
if exists: | |
result = f'create_agent - agent {agent_key} already exists' | |
resp_type = RESPONSE_TYPE.ERR | |
else: | |
agent_obj: dict = { 'id': agent_key, 'fname': fname, 'lname': lname, 'skills': skills, 'state': AGENT_STATE.UNAVAILABLE.value } | |
await client.json().set(agent_key, '$', agent_obj) | |
result = agent_key | |
resp_type = RESPONSE_TYPE.OK | |
else: | |
resp_type = RESPONSE_TYPE.LOCKED | |
except Exception as err: | |
result = f'create_agent - {err}' | |
resp_type = RESPONSE_TYPE.ERR | |
finally: | |
if await lock.locked(): | |
await lock.release() | |
return Response(resp_type, result) |
Agent Availability
Redis Sorted Sets are also used to track Agent availability. A sorted set is created per skill. The members of that set are the Redis keys for the agents that are available with the associated skill. The associated scores are millisecond timestamps of the time the agent became available. This use of sorted sets allows for multi-skill routing to the longest available agent (LAA).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
try: | |
lock: Lock = Lock(redis=client, name=f'{agent_key}:lock', timeout=LOCK_TIMEOUT, blocking_timeout=BLOCK_TIME) | |
lock_acquired: bool = await lock.acquire() | |
if lock_acquired: | |
exists: int = await client.exists(agent_key) | |
if not exists: | |
result = f'set_agent_state - {agent_key} does not exist' | |
resp_type = RESPONSE_TYPE.ERR | |
else: | |
current_state = (await client.json().get(agent_key, '$.state'))[0] | |
if AGENT_STATE(current_state) != state: | |
skills: list[list[str]] = await client.json().get(agent_key, '$.skills') | |
for skill in skills[0]: | |
match state: | |
case AGENT_STATE.AVAILABLE: | |
await client.zadd(f'{{availAgentsSkill}}:{skill}', mapping={ agent_key: round(time.time()*1000) }) | |
await client.json().set(agent_key, '$.state', AGENT_STATE.AVAILABLE.value) | |
case AGENT_STATE.UNAVAILABLE: | |
await client.zrem(f'{{availAgentsSkill}}:{skill}', agent_key) | |
await client.json().set(agent_key, '$.state', AGENT_STATE.UNAVAILABLE.value) | |
case _: | |
raise Exception(f'invalid agent state parameter: {state}') | |
result = agent_key | |
resp_type = RESPONSE_TYPE.OK | |
else: | |
result = f'set_agent_state - {agent_key} already in {AGENT_STATE(current_state)}' | |
resp_type = RESPONSE_TYPE.ERR | |
else: | |
resp_type = RESPONSE_TYPE.LOCKED | |
except Exception as err: | |
result = f'set_agent_state - {err}' | |
resp_type = RESPONSE_TYPE.ERR | |
finally: | |
if await lock.locked(): | |
await lock.release() | |
return Response(resp_type, result) |
Operations
Agent Targeting
Routing of contacts to agents is performed by multiple Dispatcher processes. Each Dispatcher is running an infinite loop that does the following:
- Pop the oldest contact from the queue
- Perform an intersection of the availability sets for the skills necessary for that contact
- If there are agent(s) available, assign that agent to this contact and set the agent to unavailable.
- If there are no agents available with the necessary skills, put the contact back in the queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
while True: | |
try: | |
response: list[tuple] = await client.bzpopmin('queue') # using a sorted set as a fifo queue | |
contact_key: str = response[1].decode('utf-8') | |
timestamp: int = int(response[2]) | |
skills: list[list[str]] = await client.json().get(contact_key, '$.skills') | |
avail_keys: list[str] = [f'{{availAgentsSkill}}:{skill}' for skill in skills[0]] | |
agents: list[str] = await client.zinter(avail_keys) | |
agents = [agent.decode('utf-8') for agent in agents] | |
found: bool = False | |
for agent in agents: | |
response: Response = await ops.set_agent_state(client, agent, AGENT_STATE.UNAVAILABLE) | |
if response.resp_type == RESPONSE_TYPE.OK: | |
found = True | |
await client.json().mset([(contact_key, '$.agent', agent), | |
(contact_key, '$.state', CONTACT_STATE.ASSIGNED.value)]) | |
logger.info(f'{contact_key} assigned to {agent}') | |
break | |
if not found: | |
# check if the contact has been abandoned | |
state: list[int] = (await client.json().get(contact_key, '$.state'))[0] | |
if CONTACT_STATE(state) != CONTACT_STATE.COMPLETE: | |
# no agent avail. put contact back on queue with a 1 sec decelerator to allow other contacts to bubble up | |
await client.zadd('queue', mapping={ contact_key: timestamp+1000 }) | |
logger.info(f'{contact_key} queued') | |
await asyncio.sleep(uniform(0, 2)) | |
except Exception as err: | |
if str(err) != "Connection closed by server.": | |
logger.error(err) | |
raise err |