Monday, January 15, 2024

Change Data Capture w/Redis Enterprise

Summary

Redis Enterprise supports continuous data integration with 3rd-party data sources via the Redis Data Integration (RDI) product. With RDI, change data capture (CDC) can be achieved for ingress from all the major SQL databases. In the other direction, updates to Redis can be continuously written to 3rd-party targets via RDI's write-behind functionality.

This post covers a demo-grade environment of Redis Enterprise + RDI with ingress and write-behind integrations with the following SQL databases:  Oracle, MS SQL, Postgres, and MySQL.  All components are containerized and run from a Docker environment.

Architecture


Ingress



Write-behind



Code Snippets

Docker Compose - Redis Enterprise Node


#RE Cluster - Node 1
re1:
  image: redislabs/redis:latest
  container_name: re1
  restart: unless-stopped
  tty: true
  cap_add:
    - sys_resource
  ports:
    - 12000
    - 8443
    - 9443
    - 8070
  profiles: ["mysql", "postgres", "sqlserver", "oracle_lm", "oracle_xs"]
  networks:
    re_cluster:
      ipv4_address: 192.168.20.2

Docker Compose - Oracle Enterprise


oracle_lm:
  image: container-registry.oracle.com/database/enterprise:latest
  container_name: oracle_lm
  ports:
    - 1521
  depends_on:
    - re1
    - re2
    - re3
  environment:
    - ORACLE_SID=ORCLCDB
    - ORACLE_EDITION=enterprise
    - ORACLE_PWD=Password1
    - INIT_SGA_SIZE=1024
    - INIT_PGA_SIZE=1024
  volumes:
    - $PWD/conf/oracle_lm/scripts:/opt/oracle/scripts/startup
  profiles: ["oracle_lm"]
  networks:
    - re_cluster

Docker Compose - Debezium


debezium:
  build:
    context: $PWD/conf/debezium
    args:
      INSTANT_CLIENT: $INSTANT_CLIENT
  container_name: debezium
  volumes:
    - $PWD/conf/$SOURCE_DB/application.properties:/debezium/conf/application.properties
  profiles: ["debezium"]
  networks:
    - re_cluster
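The `application.properties` file mounted above configures Debezium Server per source database. The post doesn't show its contents; the following is a minimal sketch for a Postgres source streaming into the Redis sink. The property names come from Debezium Server's documentation, but the hostnames, credentials, and prefix here are demo assumptions, not values from the post.

```properties
# Sink: stream change events into Redis (hypothetical address/credentials)
debezium.sink.type=redis
debezium.sink.redis.address=192.168.20.2:12000

# Source: Postgres connector (hostname/user/password are demo assumptions)
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.topic.prefix=postgres
```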

RDI Ingress w/Prometheus Integration


echo "*** Build Redis DI DB for Ingress ***"
./redis-di create --silent --cluster-host 192.168.20.2 --cluster-api-port 9443 --cluster-user redis@redis.com \
--cluster-password redis --rdi-port 13000 --rdi-password redis
echo "*** Deploy Redis DI for Ingress ***"
./redis-di deploy --dir ./conf/$SOURCE_DB/ingest --rdi-host 192.168.20.3 --rdi-port 13000 --rdi-password redis
echo "*** Start Debezium ***"
SOURCE_DB=$SOURCE_DB INSTANT_CLIENT=$INSTANT_CLIENT docker compose --profile debezium up -d
echo "*** Start Redis DI Monitor ***"
./redis-di monitor --rdi-host 192.168.20.3 --rdi-port 13000 --rdi-password redis &
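Once Debezium is streaming, the ingested data can be spot-checked with redis-py. This is a hedged sketch, not part of the original scripts: the host and port match the demo values above, while the `emp:` key prefix is a placeholder for whatever keys your RDI job writes.

```python
def count_matching(keys, prefix: str) -> int:
    """Count keys that start with the given prefix."""
    return sum(1 for k in keys if k.startswith(prefix))

if __name__ == "__main__":
    import redis  # pip install redis
    client = redis.Redis(host="192.168.20.2", port=12000, decode_responses=True)
    keys = list(client.scan_iter(match="*", count=500))  # non-blocking SCAN
    print(count_matching(keys, "emp:"))  # 'emp:' is a hypothetical prefix
```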

Source


Copyright ©1993-2024 Joey E Whelan, All rights reserved.

Monday, January 8, 2024

Document AI with Apache Airflow

Summary

In this post, I cover an approach to a document AI problem using a task flow implemented in Apache Airflow. The particular problem is de-duplication of invoices, which comes up in the payment provider space. I use Azure AI Document Intelligence for OCR, Azure OpenAI for vector embeddings, and Redis Enterprise for vector search.

Architecture



Code Snippets


File Sensor DAG


@task.sensor(task_id="check_inbox", mode="reschedule", timeout=10, executor_config=executor_config_volume_mount)
def check_inbox() -> PokeReturnValue:
    """ File sensor for the invoices inbox. If files are detected in the inbox, a cascade of processing tasks
    is triggered: OCR, Embed, Dedup.
    """
    storage_var = Variable.get("storage", deserialize_json=True, default_var=None)
    if not isinstance(storage_var, dict):  # hack for an apparent bug in airflow
        storage_var = json.loads(storage_var)
    inbox_path = storage_var['inbox']
    inbox_files = list(map(lambda file: os.path.join(inbox_path, file), os.listdir(inbox_path)))
    logging.info(f'Number of files to be processed: {len(inbox_files)}')
    if len(inbox_files) > 0:
        return PokeReturnValue(is_done=True, xcom_value=inbox_files)
    else:
        return PokeReturnValue(is_done=False)

OCR DAG


@task(task_id='parse_invoice', executor_config=executor_config_volume_mount)
def parse_invoice(inbox_file: str) -> dict:
    """ OCR is performed on each of the invoices in the inbox. The result of OCR is a space-delimited string of a
    configurable number of invoice fields.
    """
    from invoice.lib.ocr import ocr
    invoice = ocr(inbox_file)
    invoice['file'] = inbox_file
    logging.info(f'Invoice: {pprint.pformat(invoice)}')
    return invoice

OCR Client (Azure AI Doc Intelligence)


@retry(wait=wait_random_exponential(min=10, max=60), stop=stop_after_attempt(3))
def ocr(filepath: str) -> dict:
    """ Executes Azure Form Recognizer OCR and returns a Python dict that includes a text string
    of space-separated values from the input invoice.
    """
    formrec_var = Variable.get("formrec", deserialize_json=True, default_var=None)
    if not isinstance(formrec_var, dict):  # hack for an apparent bug in airflow
        formrec_var = json.loads(formrec_var)
    key = formrec_var["key"]
    endpoint = formrec_var["endpoint"]
    vector_fields = formrec_var["fields"]
    client = DocumentAnalysisClient(endpoint=endpoint, credential=AzureKeyCredential(key))
    with open(filepath, "rb") as f:
        poller = client.begin_analyze_document("prebuilt-invoice", document=f, locale="en-US")
    invoice = (poller.result()).documents[0]
    return stringify(invoice, vector_fields)
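The `stringify` helper isn't shown in the post. A plausible sketch given the docstring - join the configured invoice fields into one space-separated string for embedding. The real implementation reads fields off the Form Recognizer document object; this illustration works on a plain dict, and the field names are assumptions:

```python
def stringify(invoice_fields: dict, vector_fields: list[str]) -> dict:
    """Join the configured invoice fields into a single space-separated
    string under 'ocr', ready for embedding; keep the customer name for
    the downstream Redis search filter."""
    values = [str(invoice_fields[f]) for f in vector_fields if f in invoice_fields]
    return {
        'ocr': ' '.join(values),
        'customer_name': str(invoice_fields.get('CustomerName', ''))
    }
```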

Embedding DAG


@task(task_id='embed_invoice')
def embed_invoice(invoice: dict) -> dict:
    """ Accepts an invoice dict that includes a text field of the OCR output
    and adds an OpenAI embedding (array of floats) to that dict.
    """
    from invoice.lib.embed import get_embedding
    vector = get_embedding(invoice['ocr'])
    invoice['vector'] = vector
    logging.info(f'Invoice: {invoice["file"]}, Vector len: {len(invoice["vector"])}')
    return invoice

Embedding Client (Azure OpenAI)


@retry(wait=wait_random_exponential(min=3, max=100), stop=stop_after_attempt(10))
def get_embedding(text: str) -> list[float]:
    response = openai.Embedding.create(
        input=text,
        engine="EmbeddingModel"
    )
    return response['data'][0]['embedding']
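Downstream, the dedup function packs this embedding into a float32 byte string before writing it to Redis. The round trip is a sketch worth seeing in isolation (numpy is already a dependency of the VSS client; the 3-element vector stands in for a full-size OpenAI embedding):

```python
import numpy as np

embedding = [0.1, 0.2, 0.3]  # stand-in for a full-size OpenAI embedding
vec_bytes = np.array(embedding, dtype=np.float32).tobytes()  # 4 bytes per float
restored = np.frombuffer(vec_bytes, dtype=np.float32)        # lossless round trip
```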

Vector Search DAG


@task(task_id='dedup_invoice', executor_config=executor_config_volume_mount)
def dedup_invoice(invoice: dict) -> None:
    """ Sends the invoice dict into a Redis VSS lookup to determine disposition - process it or call it a duplicate.
    """
    from invoice.lib.vss import dedup
    result = dedup(invoice)
    logging.info(f'Invoice: {invoice["file"]}, Result: {result}')

Vector Search Client (Redis Enterprise)


def dedup(invoice: dict) -> str:
    """ Accepts a Python dict that includes a vector of a given invoice file. That vector is then sent into
    Redis VSS to determine disposition. If there's another invoice in Redis within a given vector distance of the
    input invoice, this invoice is treated as a duplicate and moved to the 'dups' directory. Otherwise, it is
    treated as a net new invoice and moved to the 'processed' directory.
    """
    re_var = Variable.get("re", deserialize_json=True, default_var=None)
    if not isinstance(re_var, dict):  # hack for an apparent bug in airflow
        re_var = json.loads(re_var)
    storage_var = Variable.get("storage", deserialize_json=True, default_var=None)
    if not isinstance(storage_var, dict):  # hack for an apparent bug in airflow
        storage_var = json.loads(storage_var)
    creds = redis.UsernamePasswordCredentialProvider(re_var['user'], re_var['pwd'])
    client = redis.Redis(host=re_var['host'], port=re_var['port'], credential_provider=creds)
    try:
        client.ft(re_var['vector_index']).info()
    except redis.ResponseError:  # index doesn't exist yet - create it
        idx_def = IndexDefinition(index_type=IndexType.HASH, prefix=[re_var['vector_prefix']])
        schema = [
            TextField('customer_name'),
            VectorField('vector',
                'HNSW',
                { 'TYPE': re_var['vector_type'], 'DIM': re_var['vector_dim'], 'DISTANCE_METRIC': re_var['vector_metric'] }
            )
        ]
        client.ft(re_var['vector_index']).create_index(schema, definition=idx_def)
    vec = np.array(invoice['vector'], dtype=np.float32).tobytes()
    q = Query(f'@customer_name:({invoice["customer_name"]}) => [KNN 1 @vector $query_vec AS score]')\
        .return_fields('score')\
        .dialect(2)
    results = client.ft(re_var['vector_index']).search(q, query_params={'query_vec': vec})
    docs = results.docs
    if len(docs) > 0 and 1 - float(docs[0].score) > re_var['vector_similarity_bound']:
        shutil.move(invoice['file'], storage_var['dups'])
        logging.info(f'Duplicate invoice:{os.path.basename(invoice["file"])}, Similarity:{round(1 - float(docs[0].score), 2)}')
        return 'duplicate'
    else:
        if len(docs) > 0:
            similarity = round(1 - float(docs[0].score), 2)
        else:
            similarity = 'N/A'
        client.hset(f'invoice:{uuid.uuid4()}',
            mapping={'customer_name': invoice['customer_name'], 'file': os.path.basename(invoice['file']), 'vector': vec})
        shutil.move(invoice['file'], storage_var['processed'])
        logging.info(f'Processed invoice:{os.path.basename(invoice["file"])}, Similarity:{similarity}')
        return 'processed'
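The duplicate decision hinges on converting the cosine distance that Redis returns as `score` back into a similarity. A small sketch of that rule, factored out for clarity (the function name and sample values are mine):

```python
def is_duplicate(score: float, similarity_bound: float) -> bool:
    """Redis VSS returns cosine *distance*; similarity = 1 - distance.
    The nearest neighbor marks a duplicate when its similarity exceeds the bound."""
    return 1.0 - score > similarity_bound

# e.g. with a bound of 0.9: a near-identical invoice (distance 0.02) is flagged
# as a duplicate, while a merely similar one (distance 0.25) is processed as new.
```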

Source

