Tuesday, December 31, 2024

Gloo Gateway on Kind

Summary

In this post, I'll cover a demo deployment of Gloo Gateway in a Kind K8s environment.
  • 3-worker-node Kind K8s cluster
  • 2 dummy REST microservices implemented in Node.js with Express.js
  • OSS deployment of Gloo Gateway providing ingress to the two microservices

Architecture

Deployment

Kind



echo -e "\n*** Deploy Cloud Provider Kind ***"
docker run -d --rm --name cloud-provider-kind --network kind \
  -v /var/run/docker.sock:/var/run/docker.sock registry.k8s.io/cloud-provider-kind/cloud-controller-manager:v0.4.0

echo -e "\n*** Deploy Kind Cluster ***"
kind create cluster --config=$PWD/kind/config.yaml --name demo-kind-cluster

if [ -z "$(docker image ls -q ms1:1.0)" ]
then
  echo -e "\n*** Build Microservice 1 Container ***"
  docker build --no-cache --tag ms1:1.0 $PWD/ms1
fi

if [ -z "$(docker image ls -q ms2:1.0)" ]
then
  echo -e "\n*** Build Microservice 2 Container ***"
  docker build --no-cache --tag ms2:1.0 $PWD/ms2
fi

echo -e "\n*** Load Microservice Images ***"
kind --name demo-kind-cluster load docker-image ms1:1.0
kind --name demo-kind-cluster load docker-image ms2:1.0
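The kind/config.yaml referenced by the script isn't shown in the post; for a 3-worker-node cluster it would look roughly like this (a sketch with default networking, not the post's actual file):

```yaml
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
  - role: control-plane
  - role: worker
  - role: worker
  - role: worker
```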

Gloo Gateway



echo -e "\n*** Deploy Gloo Gateway ***"
./glooctl install gateway
echo -e "\n*** Create Routes to Microservices ***"
./glooctl add route --name vs1 --path-exact /ms1 --dest-name default-ms1-service-8000 --prefix-rewrite /service1
./glooctl add route --name vs1 --path-exact /ms2 --dest-name default-ms2-service-9000 --prefix-rewrite /service2
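Under the hood, glooctl add route creates or updates a VirtualService custom resource. The two commands above should produce something along these lines (a sketch; field layout per Gloo's gateway.solo.io/v1 API, values inferred from the commands):

```yaml
apiVersion: gateway.solo.io/v1
kind: VirtualService
metadata:
  name: vs1
  namespace: gloo-system
spec:
  virtualHost:
    domains:
      - '*'
    routes:
      - matchers:
          - exact: /ms1
        options:
          prefixRewrite: /service1
        routeAction:
          single:
            upstream:
              name: default-ms1-service-8000
              namespace: gloo-system
      - matchers:
          - exact: /ms2
        options:
          prefixRewrite: /service2
        routeAction:
          single:
            upstream:
              name: default-ms2-service-9000
              namespace: gloo-system
```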

Microservices



echo -e "\n*** Deploy Microservices ***"
kubectl apply -f $PWD/ms1/ms1.yaml
kubectl apply -f $PWD/ms2/ms2.yaml
kubectl rollout status deployment/ms1-app
kubectl rollout status deployment/ms2-app
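The manifests themselves aren't shown in the post. Given the deployment name (ms1-app) and the upstream name Gloo derives (default-ms1-service-8000, i.e., namespace-service-port), ms1.yaml is presumably along these lines (a sketch; labels, replica count, and image tag are assumptions):

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ms1-app
spec:
  replicas: 1
  selector:
    matchLabels:
      app: ms1
  template:
    metadata:
      labels:
        app: ms1
    spec:
      containers:
        - name: ms1
          image: ms1:1.0
          ports:
            - containerPort: 8000
---
apiVersion: v1
kind: Service
metadata:
  name: ms1-service
spec:
  selector:
    app: ms1
  ports:
    - port: 8000
      targetPort: 8000
```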

Source

Thursday, February 22, 2024

Redis RAG with Nvidia NeMo Guardrails

Summary


This post covers the usage of guardrails in the context of a RAG application using Redis Stack as the vector store.
  • Nvidia's NeMo Guardrails package is used for the railed implementation.
  • LangChain LCEL is used for the non-railed implementation.
  • Content from the online Redis vector search documentation is used for the RAG content.
  • The GUI is implemented with Chainlit.


Application Architecture


This bot operates within a Chainlit app.  It has two modes of operation:
  • 'chain' - no guardrails
  • 'rails' - NeMo Guardrails in place for both user inputs and LLM outputs
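The post doesn't show the mode-switching code itself; conceptually it's just a lookup from mode name to handler. A stdlib-only sketch of that dispatch (the callables here are stand-ins for the actual LCEL chain and LLMRails objects, and in the real app the selected mode would live in the Chainlit user session):

```python
from typing import Callable, Dict

def make_dispatcher(chain: Callable[[str], str], rails: Callable[[str], str]) -> Callable[[str, str], str]:
    """Returns a function that routes a user message to the selected mode's handler."""
    modes: Dict[str, Callable[[str], str]] = {'chain': chain, 'rails': rails}
    def dispatch(mode: str, message: str) -> str:
        if mode not in modes:
            raise ValueError(f'unknown mode: {mode}')
        return modes[mode](message)
    return dispatch

# Stub handlers standing in for the real chain/rails implementations
dispatch = make_dispatcher(
    chain=lambda msg: f'chain answer to: {msg}',
    rails=lambda msg: f'railed answer to: {msg}'
)
```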


Screenshots


Bot without rails

This first screenshot shows the bot operating with no guardrails.  It does just fine until an off-topic question is posed - then it cheerfully deviates from its purpose.



Bot with rails

Same series of questions here with guardrails enabled.  Note that it keeps the user on topic now.



Code Snippets

Non-railed chain (LCEL)


retriever: BaseRetriever = redis.Redis.from_existing_index(
    OpenAIEmbeddings(model='text-embedding-3-small', dimensions=512),
    index_name=os.getenv('INDEX_NAME'),
    redis_url=os.getenv('REDIS_URL'),
    schema=os.getenv('SCHEMA')
).as_retriever(search_type='similarity_score_threshold', search_kwargs={'score_threshold': 0.5})

chain: Runnable = (
    { 'chat_history': RunnablePassthrough(), 'input': RunnablePassthrough() }
    | hub.pull('joeywhelan/rephrase')
    | ChatOpenAI(model_name='gpt-3.5-turbo', temperature=0)
    | StrOutputParser()
    | RunnableParallel({ 'question': RunnablePassthrough() })
    | { 'context': itemgetter('question') | retriever, 'question': itemgetter('question') }
    | hub.pull('rlm/rag-prompt')
    | ChatOpenAI(model_name='gpt-3.5-turbo', temperature=0)
    | StrOutputParser()
)

Railed with NeMo Guardrails



config: RailsConfig = RailsConfig.from_path('./guardrails')
rails: LLMRails = LLMRails(config, verbose=False)

The Colang definitions loaded from the ./guardrails directory:

define user asks question specifically about redis vector search
  "how does redis index vectors?"
  "what is the purpose of the EF_RUNTIME parameter?"
  "what redis data structures can be used for vector search?"
  "what vector distance metrics does redis support?"

define flow vector_question
  user asks question specifically about redis vector search
  # Given the following conversation and a follow up question, rephrase the follow up question to be a standalone question.
  $rephrased = ...
  $answer = execute rag(question=$rephrased)
  bot $answer

Source


Copyright  ©2024 Joey E Whelan, All rights reserved.

Monday, January 15, 2024

Change Data Capture w/Redis Enterprise

Summary

Redis Enterprise has the capability for continuous data integration with 3rd-party data sources.  This capability is enabled via the Redis Data Integration (RDI) product.  With RDI, change data capture (CDC) for ingress can be achieved with all the major SQL databases.  In the other direction, updates to Redis can be continuously written to 3rd-party targets via RDI's write-behind functionality.

This post covers a demo-grade environment of Redis Enterprise + RDI with ingress and write-behind integrations with the following SQL databases:  Oracle, MS SQL, Postgres, and MySQL.  All components are containerized and run from a Docker environment.

Architecture


Ingress



Write-behind



Code Snippets

Docker Compose - Redis Enterprise Node


# RE Cluster - Node 1
re1:
  image: redislabs/redis:latest
  container_name: re1
  restart: unless-stopped
  tty: true
  cap_add:
    - sys_resource
  ports:
    - 12000
    - 8443
    - 9443
    - 8070
  profiles: ["mysql", "postgres", "sqlserver", "oracle_lm", "oracle_xs"]
  networks:
    re_cluster:
      ipv4_address: 192.168.20.2

Docker Compose - Oracle Enterprise


oracle_lm:
  image: container-registry.oracle.com/database/enterprise:latest
  container_name: oracle_lm
  ports:
    - 1521
  depends_on:
    - re1
    - re2
    - re3
  environment:
    - ORACLE_SID=ORCLCDB
    - ORACLE_EDITION=enterprise
    - ORACLE_PWD=Password1
    - INIT_SGA_SIZE=1024
    - INIT_PGA_SIZE=1024
  volumes:
    - $PWD/conf/oracle_lm/scripts:/opt/oracle/scripts/startup
  profiles: ["oracle_lm"]
  networks:
    - re_cluster

Docker Compose - Debezium


debezium:
  build:
    context: $PWD/conf/debezium
    args:
      INSTANT_CLIENT: $INSTANT_CLIENT
  container_name: debezium
  volumes:
    - $PWD/conf/$SOURCE_DB/application.properties:/debezium/conf/application.properties
  profiles: ["debezium"]
  networks:
    - re_cluster
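The mounted application.properties configures Debezium Server with a Redis sink pointing at the RDI database and the per-profile source connector. For the postgres profile it would look roughly like this (a sketch: the sink address and password match the RDI settings used elsewhere in this post, while the source hostname, credentials, database name, and topic prefix are assumptions):

```properties
debezium.sink.type=redis
debezium.sink.redis.address=192.168.20.3:13000
debezium.sink.redis.password=redis
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=demo
debezium.source.topic.prefix=demo
```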

RDI Ingress w/Prometheus Integration


echo "*** Build Redis DI DB for Ingress ***"
./redis-di create --silent --cluster-host 192.168.20.2 --cluster-api-port 9443 --cluster-user redis@redis.com \
  --cluster-password redis --rdi-port 13000 --rdi-password redis

echo "*** Deploy Redis DI for Ingress ***"
./redis-di deploy --dir ./conf/$SOURCE_DB/ingest --rdi-host 192.168.20.3 --rdi-port 13000 --rdi-password redis

echo "*** Start Debezium ***"
SOURCE_DB=$SOURCE_DB INSTANT_CLIENT=$INSTANT_CLIENT docker compose --profile debezium up -d

echo "*** Start Redis DI Monitor ***"
./redis-di monitor --rdi-host 192.168.20.3 --rdi-port 13000 --rdi-password redis &

Source


Copyright ©1993-2024 Joey E Whelan, All rights reserved.

Monday, January 8, 2024

Document AI with Apache Airflow

Summary

In this post, I cover an approach to a document AI problem using a task flow implemented in Apache Airflow.  The particular problem is de-duplication of invoices, which comes up in the payment-provider space.  I use Azure AI Document Intelligence for OCR, Azure OpenAI for vector embeddings, and Redis Enterprise for vector search.

Architecture



Code Snippets


File Sensor DAG


@task.sensor(task_id="check_inbox", mode="reschedule", timeout=10, executor_config=executor_config_volume_mount)
def check_inbox() -> PokeReturnValue:
    """ File sensor for the invoices inbox. If files are detected in the inbox, a cascade of processing tasks is
        triggered: OCR, Embed, Dedup.
    """
    storage_var = Variable.get("storage", deserialize_json=True, default_var=None)
    if not isinstance(storage_var, dict):  # hack for an apparent bug in airflow
        storage_var = json.loads(storage_var)
    inbox_path = storage_var['inbox']
    inbox_files = list(map(lambda file: os.path.join(inbox_path, file), os.listdir(inbox_path)))
    logging.info(f'Number of files to be processed: {len(inbox_files)}')
    if len(inbox_files) > 0:
        return PokeReturnValue(is_done=True, xcom_value=inbox_files)
    else:
        return PokeReturnValue(is_done=False)
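Stripped of the Airflow machinery, the poke logic reduces to a directory listing. A stdlib-only sketch of the same check, with PokeReturnValue replaced by a plain tuple for illustration:

```python
import os
import tempfile

def poke_inbox(inbox_path: str):
    """Returns (is_done, files): done when at least one file is waiting in the inbox."""
    files = [os.path.join(inbox_path, f) for f in os.listdir(inbox_path)]
    return (len(files) > 0, files)

# Demo against a throwaway directory
inbox = tempfile.mkdtemp()
assert poke_inbox(inbox) == (False, [])    # empty inbox: sensor keeps waiting
open(os.path.join(inbox, 'invoice1.pdf'), 'w').close()
done, files = poke_inbox(inbox)            # one file present: sensor fires
```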

OCR DAG


@task(task_id='parse_invoice', executor_config=executor_config_volume_mount)
def parse_invoice(inbox_file: str) -> dict:
    """ OCR is performed on each of the invoices in the inbox. The result of OCR is a space-delimited string of a
        configurable number of invoice fields.
    """
    from invoice.lib.ocr import ocr
    invoice = ocr(inbox_file)
    invoice['file'] = inbox_file
    logging.info(f'Invoice: {pprint.pformat(invoice)}')
    return invoice

OCR Client (Azure AI Doc Intelligence)


from azure.ai.formrecognizer import DocumentAnalysisClient
from azure.core.credentials import AzureKeyCredential
from tenacity import retry, wait_random_exponential, stop_after_attempt

@retry(wait=wait_random_exponential(min=10, max=60), stop=stop_after_attempt(3))
def ocr(filepath: str) -> dict:
    """ Executes Azure Form Recognizer OCR and returns a Python dict that includes a text string
        of space-separated values from the input invoice.
    """
    formrec_var = Variable.get("formrec", deserialize_json=True, default_var=None)
    if not isinstance(formrec_var, dict):  # hack for an apparent bug in airflow
        formrec_var = json.loads(formrec_var)
    key = formrec_var["key"]
    endpoint = formrec_var["endpoint"]
    vector_fields = formrec_var["fields"]
    client = DocumentAnalysisClient(endpoint=endpoint, credential=AzureKeyCredential(key))
    with open(filepath, "rb") as f:
        poller = client.begin_analyze_document("prebuilt-invoice", document=f, locale="en-US")
    invoice = (poller.result()).documents[0]
    return stringify(invoice, vector_fields)
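The stringify helper isn't shown in the post; per the docstring it reduces the analyzed invoice to a space-separated string of the configured fields. A hypothetical, stdlib-only reconstruction (the real Azure AnalyzedDocument exposes a .fields mapping of DocumentField objects with a .value attribute; both are stubbed here, and the returned keys are inferred from how the dict is consumed downstream):

```python
class FakeField:
    """Stub for Azure's DocumentField; only .value is used here."""
    def __init__(self, value):
        self.value = value

class FakeInvoice:
    """Stub for Azure's AnalyzedDocument; only .fields is used here."""
    def __init__(self, fields):
        self.fields = fields

def stringify(invoice, vector_fields) -> dict:
    """Joins the selected invoice field values into one space-separated string."""
    values = []
    for name in vector_fields:
        field = invoice.fields.get(name)
        if field is not None and field.value is not None:
            values.append(str(field.value))
    return {'ocr': ' '.join(values), 'customer_name': str(invoice.fields['CustomerName'].value)}

invoice = FakeInvoice({'CustomerName': FakeField('Acme'), 'InvoiceTotal': FakeField(110.0)})
result = stringify(invoice, ['CustomerName', 'InvoiceTotal'])
```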

Embedding DAG


@task(task_id='embed_invoice')
def embed_invoice(invoice: dict) -> dict:
    """ Accepts an invoice dict that includes a text field of the OCR output
        and adds an OpenAI embedding (array of floats) to that dict.
    """
    from invoice.lib.embed import get_embedding
    vector = get_embedding(invoice['ocr'])
    invoice['vector'] = vector
    logging.info(f'Invoice: {invoice["file"]}, Vector len: {len(invoice["vector"])}')
    return invoice

Embedding Client (Azure OpenAI)


from tenacity import retry, wait_random_exponential, stop_after_attempt

@retry(wait=wait_random_exponential(min=3, max=100), stop=stop_after_attempt(10))
def get_embedding(text: str) -> list[float]:
    """ Fetches an Azure OpenAI embedding for the given text, with exponential-backoff retries. """
    response = openai.Embedding.create(
        input=text,
        engine="EmbeddingModel"
    )
    return response['data'][0]['embedding']

Vector Search DAG


@task(task_id='dedup_invoice', executor_config=executor_config_volume_mount)
def dedup_invoice(invoice: dict) -> None:
    """ Sends the invoice dict into a Redis VSS lookup to determine disposition: process it, or call it a duplicate.
    """
    from invoice.lib.vss import dedup
    result = dedup(invoice)
    logging.info(f'Invoice: {invoice["file"]}, Result: {result}')

Vector Search Client (Redis Enterprise)


def dedup(invoice: dict) -> str:
    """ Accepts a Python dict that includes a vector for a given invoice file. That vector is sent into
        Redis VSS to determine disposition. If there's another invoice in Redis within a given vector distance
        of the input invoice, this invoice is disposed as a duplicate and moved to the 'dups' directory.
        Otherwise, it is disposed as a net-new invoice and moved to the 'processed' directory.
    """
    re_var = Variable.get("re", deserialize_json=True, default_var=None)
    if not isinstance(re_var, dict):  # hack for an apparent bug in airflow
        re_var = json.loads(re_var)
    storage_var = Variable.get("storage", deserialize_json=True, default_var=None)
    if not isinstance(storage_var, dict):  # hack for an apparent bug in airflow
        storage_var = json.loads(storage_var)

    creds = redis.UsernamePasswordCredentialProvider(re_var['user'], re_var['pwd'])
    client = redis.Redis(host=re_var['host'], port=re_var['port'], credential_provider=creds)
    try:
        client.ft(re_var['vector_index']).info()
    except redis.ResponseError:  # index doesn't exist yet; create it
        idx_def = IndexDefinition(index_type=IndexType.HASH, prefix=[re_var['vector_prefix']])
        schema = [
            TextField('customer_name'),
            VectorField('vector',
                'HNSW',
                { 'TYPE': re_var['vector_type'], 'DIM': re_var['vector_dim'], 'DISTANCE_METRIC': re_var['vector_metric'] }
            )
        ]
        client.ft(re_var['vector_index']).create_index(schema, definition=idx_def)

    vec = np.array(invoice['vector'], dtype=np.float32).tobytes()
    q = Query(f'@customer_name:({invoice["customer_name"]}) => [KNN 1 @vector $query_vec AS score]')\
        .return_fields('score')\
        .dialect(2)
    results = client.ft(re_var['vector_index']).search(q, query_params={'query_vec': vec})
    docs = results.docs
    if len(docs) > 0 and 1 - float(docs[0].score) > re_var['vector_similarity_bound']:
        shutil.move(invoice['file'], storage_var['dups'])
        logging.info(f'Duplicate invoice:{os.path.basename(invoice["file"])}, Similarity:{round(1 - float(docs[0].score), 2)}')
        return 'duplicate'
    else:
        if len(docs) > 0:
            similarity = round(1 - float(docs[0].score), 2)
        else:
            similarity = 'N/A'
        client.hset(f'invoice:{uuid.uuid4()}',
            mapping={'customer_name': invoice['customer_name'], 'file': os.path.basename(invoice['file']), 'vector': vec})
        shutil.move(invoice['file'], storage_var['processed'])
        logging.info(f'Processed invoice:{os.path.basename(invoice["file"])}, Similarity:{similarity}')
        return 'processed'
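The disposition decision hinges on converting the vector distance Redis returns into a similarity (1 - distance) and comparing it to the configured bound. Isolated from Redis, the logic is just the following (the 0.95 bound is an assumed example value, not the post's configuration):

```python
from typing import Optional

def disposition(score: Optional[float], similarity_bound: float = 0.95) -> str:
    """score is the vector distance returned by Redis VSS (None when no neighbor exists);
    similarity = 1 - distance. Above the bound => duplicate, otherwise net-new."""
    if score is not None and 1 - score > similarity_bound:
        return 'duplicate'
    return 'processed'

print(disposition(0.02))   # near-identical invoice
print(disposition(0.30))   # sufficiently different invoice
print(disposition(None))   # first invoice seen for this customer
```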

Source

