UNVEILING THE POWER OF MIXTURE OF WORKFLOWS FOR FINANCIAL DATA INSIGHTS

M K Pavan Kumar · Published in GoPenAI · 9 min read · Aug 19, 2024


In an era where financial data is both abundant and complex, extracting
meaningful insights from banking statements requires more than traditional
analysis methods. The Mixture of Workflows (MoW) approach introduces a powerful
solution by deploying multiple specialized AI agentic workflows, each optimized
for a different aspect of financial data interpretation. This article explores
how integrating compact models like Phi-3, Qwen-2, Gemma-2 and StableLM-2 within
a robust Kafka- and Qdrant-based architecture allows for precise, context-aware
analysis of financial (SEC 10-K) data. Through hands-on coding examples and
architectural insights, you'll discover how this approach can transform
financial statement analysis, delivering clarity and actionable intelligence
for informed decision-making.


Mixture of Workflows (MoW). Created by the author, M K Pavan Kumar.


THE ARCHITECTURE:

The architecture, titled "Mixture of Workflows (MoW)", integrates multiple
workflows, each of which is internally powered by a corrective RAG process. The
entire setup, including all language models and workflow steps, is driven by
Ollama, with additional support from LlamaIndex and Qdrant for efficient
processing and retrieval.

In this architecture, Azure Blob Storage serves as the data source, feeding into
Kafka for data streaming. The streamed data then reaches a series of workflows,
each leveraging a different model for processing: phi3:mini in Workflow-1,
qwen2:1.5b in Workflow-2, gemma2:2b in Workflow-3, and stablelm2:1.6b in
Workflow-4. Each workflow handles a specific part of the incoming request.

The outputs of these workflows are aggregated by llama3.1:8b in an aggregator
workflow, which generates a consolidated response. The corrective RAG process
within each workflow retrieves relevant nodes, checks their relevance, filters
the retrieved nodes, and then passes them on for final processing. The results
of these steps determine the best possible response to the user's request.

Overall, this architecture enables robust, scalable, and accurate processing of
requests, driven by the combined power of Ollama, the models above, and
efficient data retrieval with Qdrant.


C-RAG METHODOLOGY:

Corrective Retrieval Augmented Generation (C-RAG) is an advanced framework
designed to enhance the robustness of Retrieval-Augmented Generation (RAG)
systems by addressing the issue of inaccurate document retrieval, which can
significantly impact the quality of generated responses. The C-RAG system
introduces a retrieval evaluator that assesses the relevance of documents
retrieved for a given query. Based on this assessment, it categorizes the
retrievals into three confidence levels: Correct, Incorrect, and Ambiguous.


Taken from the official C-RAG paper, cited below.

If a document is deemed Correct, it undergoes a refinement process to extract
the most relevant knowledge strips, ensuring that only high-quality information
contributes to the generation. For Incorrect documents, the system discards them
and instead performs a web search to gather more reliable external knowledge. In
cases where the confidence is Ambiguous, the system combines both internal
refinement and external searches to produce a more accurate result.

This corrective mechanism makes C-RAG particularly effective in scenarios where
the initial retrieval might be flawed, allowing for a more resilient and
adaptable generation process that dynamically adjusts its approach based on the
quality of the retrieved information. The system leverages lightweight models
like T5-large for the retrieval evaluator, making it efficient without
requiring extensive additional human or LLM-based annotations.

We have slightly modified the C-RAG implementation, as shown below: ambiguous
and incorrect results are simply filtered out rather than re-searched, just to
keep the code base simple.


Created by the author, M K Pavan Kumar.
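
Conceptually, this simplified corrective step reduces to a filter over the
evaluator's verdicts. A minimal sketch (the function below is illustrative and
not part of the project's code):

def corrective_filter(chunks: list[str], verdicts: list[str]) -> list[str]:
    """Simplified corrective step: instead of re-searching ambiguous or
    incorrect retrievals (as the original C-RAG paper does), keep only the
    chunks the evaluator graded as relevant and drop everything else."""
    return [
        chunk
        for chunk, verdict in zip(chunks, verdicts)
        if "yes" in verdict.lower()
    ]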


STREAM DATA TO QDRANT:

Refer to the official Qdrant documentation on how to stream data into Qdrant
using Qdrant's Kafka sink connector:

How to Setup Seamless Data Streaming with Kafka and Qdrant — Qdrant
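
For orientation, registering the connector with Kafka Connect looks roughly
like the sketch below. The connector class and property names follow Qdrant's
connector documentation, but treat the URLs, topic name and exact keys as
placeholders to be verified against the guide linked above:

import requests

# Hedged sketch: register the Qdrant sink connector via the Kafka Connect REST API.
connector = {
    "name": "qdrant-sink",
    "config": {
        "connector.class": "io.qdrant.kafka.QdrantSinkConnector",  # per Qdrant's docs
        "qdrant.grpc.url": "http://localhost:6334",  # Qdrant gRPC endpoint
        "qdrant.api.key": "",                        # empty for a local instance
        "topics": "financial-docs",                  # illustrative topic name
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
    },
}

# Kafka Connect's REST API conventionally listens on port 8083
response = requests.post("http://localhost:8083/connectors", json=connector)
response.raise_for_status()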


THE IMPLEMENTATION:

Below is an explanation of the project scaffolding.

data/:

 * Contains the source data for the analysis, such as financial reports. In
   this case, it includes the file OpenText-Reports-Q4-F-2024-Results.pdf,
   which serves as the input for the workflows.

rag_core/:

 * This directory houses the core Retrieval-Augmented Generation (RAG)
   operations powered by Qdrant. It contains:
 * rag_operations.py: the primary functions and methods that define the RAG
   process (a minimal sketch appears later in this article).

workflows/:

 * This is the heart of the project, where the different workflows are
   defined. Each workflow analyses a different aspect of the banking data
   using its own agent.
 * base_financial_analyser_agent.py: defines the base class for the financial
   analysis agents, serving as a foundation for the specialized agents.
 * financial_aggregator_agent.py: aggregates the results from the different
   financial analysis workflows into a comprehensive output.
 * gemma2_financial_analyser_agent.py: implements the financial analysis using
   the Gemma-2 model.
 * phi3_financial_analyser_agent.py: implements the financial analysis using
   the Phi-3 model.
 * qwen2_financial_analyser_agent.py: implements the financial analysis using
   the Qwen-2 model.
 * stablelm2_financial_analyser_agent.py: implements the financial analysis
   using the StableLM-2 model.

utils/:

 * prompts.py: contains the prompt templates used by the agents during the
   analysis.
 * workflow_events.py: defines the events that coordinate the execution and
   flow of data within the workflows.

driver.py:

 * The main entry point for the project. It initializes the workflows, loads
   the data, and executes the analysis.

requirements.txt:

 * Lists the Python dependencies required for the project, ensuring that all
   necessary packages are installed for the project to function correctly (a
   plausible listing is sketched below).


Project scaffolding and structure.
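
For reference, a plausible requirements.txt for this stack is shown below. The
package names are the published LlamaIndex integrations and the Qdrant client,
but the exact set and pinned versions in the original project may differ:

llama-index
llama-index-llms-ollama
llama-index-embeddings-ollama
llama-index-vector-stores-qdrant
qdrant-client
nest-asyncio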


IMPLEMENTATION OF MIXTURE OF WORKFLOWS (MOW)

Code: workflows/base_financial_analyser_agent.py

from llama_index.llms.ollama import Ollama
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.core import Settings
from llama_index.core.query_pipeline import QueryPipeline
from llama_index.core.workflow import (
    Workflow,
    Context,
    StartEvent,
    step
)
from workflows.utils.workflow_events import RetrieveEvent, TextExtractEvent, RelevanceEvalEvent
from rag_core.rag_operations import RAGOperations
from workflows.utils.prompts import DEFAULT_RELEVANCY_PROMPT_TEMPLATE
import logging

logging.basicConfig(level=logging.INFO)


class BaseFinancialAnalyserAgent(Workflow):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.eval_llm = Ollama(model='llama3.1', base_url='http://localhost:11434', temperature=0.8,
                               request_timeout=300)
        Settings.embed_model = OllamaEmbedding(model_name='all-minilm:33m', base_url='http://localhost:11434')

    @step(pass_context=True)
    async def ingest_and_retrieve_docs(self, ctx: Context, ev: StartEvent) -> RetrieveEvent:
        """Ingest docs, initialize the relevancy pipeline, and retrieve relevant nodes."""
        try:
            # ingest: set up RAG operations and the relevancy-grading pipeline
            rag_ops = RAGOperations()
            ctx.data['rag_ops'] = rag_ops
            ctx.data["relevancy_pipeline"] = QueryPipeline(
                chain=[DEFAULT_RELEVANCY_PROMPT_TEMPLATE, self.eval_llm]
            )

            # retrieve: fetch the nodes relevant to the user query
            user_query = ev.get("user_query")
            retrieved_nodes = rag_ops.create_retriever().retrieve(str_or_query_bundle=user_query)
            ctx.data["retrieved_nodes"] = retrieved_nodes
            ctx.data["user_query"] = user_query
            return RetrieveEvent(retrieved_nodes=retrieved_nodes)
        except Exception as e:
            logging.error(str(e))

    @step(pass_context=True)
    async def evaluate_relevance(self, ctx: Context, ev: RetrieveEvent) -> RelevanceEvalEvent:
        """Evaluate relevancy of retrieved nodes with the user query."""
        try:
            retrieved_nodes = ev.retrieved_nodes
            user_query = ctx.data["user_query"]

            relevancy_results = []
            for node in retrieved_nodes:
                relevancy = ctx.data["relevancy_pipeline"].run(context_str=node.text, query_str=user_query)
                relevancy_results.append(relevancy.message.content.lower().strip())

            ctx.data["relevancy_results"] = relevancy_results
            return RelevanceEvalEvent(relevant_results=relevancy_results)
        except Exception as e:
            logging.error(str(e))

    @step(pass_context=True)
    async def extract_relevant_text(self, ctx: Context, ev: RelevanceEvalEvent) -> TextExtractEvent:
        """Extract relevant texts from retrieved documents."""
        try:
            retrieved_nodes = ctx.data["retrieved_nodes"]
            relevancy_results = ev.relevant_results

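            # keep only chunks whose relevancy verdict contains "**yes**"
            # (the eval LLM is expected to answer with a bolded yes/no)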
            relevant_texts = [
                retrieved_nodes[i].text
                for i, result in enumerate(relevancy_results)
                if "**yes**" in result
            ]

            result = "\n".join(relevant_texts)
            return TextExtractEvent(relevant_text=result)
        except Exception as e:
            logging.error(str(e))

The above code defines a class called BaseFinancialAnalyserAgent that extends
Workflow and analyzes financial documents through a series of asynchronous
steps, powered by Ollama's language models and embeddings. Upon initialization,
two models are configured: llama3.1 as the evaluation LLM for grading the
relevance of retrieved documents, and all-minilm:33m as the global embedding
model. The first step, ingest_and_retrieve_docs, ingests documents, sets up a
relevancy pipeline, and retrieves relevant nodes based on the user query. The
second step, evaluate_relevance, assesses the relevance of these nodes by
comparing their content with the user query using the configured relevancy
pipeline. Finally, the extract_relevant_text step filters out and concatenates
only the text segments that were judged relevant in the previous step.
Throughout the process, any exceptions are logged to aid troubleshooting.
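
The article does not list rag_core/rag_operations.py, but from how it is used
above (create_retriever() returning a LlamaIndex retriever whose retrieve()
accepts str_or_query_bundle), a minimal sketch could look like the following.
The collection name and top-k are assumptions; the retriever reads from the
Qdrant collection that the Kafka sink streams into:

from llama_index.core import VectorStoreIndex
from llama_index.vector_stores.qdrant import QdrantVectorStore
from qdrant_client import QdrantClient


class RAGOperations:
    """Thin wrapper around a Qdrant-backed LlamaIndex retriever."""

    def __init__(self):
        # connect to the local Qdrant instance fed by the Kafka sink connector
        self.client = QdrantClient(url="http://localhost:6333")
        self.vector_store = QdrantVectorStore(
            client=self.client,
            collection_name="financial_docs",  # assumed collection name
        )
        # build an index view over the existing collection (no re-ingestion);
        # embeddings come from the global Settings.embed_model set in the base agent
        self.index = VectorStoreIndex.from_vector_store(vector_store=self.vector_store)

    def create_retriever(self):
        # top-k of 5 is an illustrative default
        return self.index.as_retriever(similarity_top_k=5)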

Code: workflows/phi3_financial_analyser_agent.py

Following the same pattern, we can create gemma2_financial_analyser_agent.py,
qwen2_financial_analyser_agent.py and stablelm2_financial_analyser_agent.py
simply by changing the query_llm to the respective model in Ollama; a sketch of
the Gemma-2 variant appears after the explanation below.

from llama_index.llms.ollama import Ollama
from llama_index.core import Document, SummaryIndex
from llama_index.core.workflow import (
    Context,
    StopEvent,
    step
)
from workflows.utils.workflow_events import TextExtractEvent
from workflows.base_financial_analyser_agent import BaseFinancialAnalyserAgent
import logging

logging.basicConfig(level=logging.INFO)


class Phi3FinancialAnalyserAgent(BaseFinancialAnalyserAgent):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.query_llm = Ollama(model='phi3:mini',
                                base_url='http://localhost:11434',
                                temperature=0.2, request_timeout=300,
                                system_prompt="You are an AI assistant that answers user questions. "
                                              "Strictly use the provided context to answer the query; "
                                              "do not rely on your prior knowledge. If you don't find "
                                              "the answer, respond with 'I don't know.'")

    @step(pass_context=True)
    async def query_result(self, ctx: Context, ev: TextExtractEvent) -> StopEvent:
        """Get result with relevant text."""
        try:
            relevant_text = ev.relevant_text
            user_query = ctx.data["user_query"]

            documents = [Document(text=relevant_text)]
            # print(f"Documents: {documents}")
            index = SummaryIndex.from_documents(documents)
            query_engine = index.as_query_engine(llm=self.query_llm)
            result = query_engine.query(user_query)
            # print(f"Result: {result.response}")
            return StopEvent(result=str(result.response))
        except Exception as e:
            logging.error(str(e))

The code defines a class called Phi3FinancialAnalyserAgent, which extends
BaseFinancialAnalyserAgent to perform the financial analysis using the
phi3:mini model. The query_llm is initialized with parameters that instruct the
language model to strictly adhere to the provided context when answering user
queries. The query_result method, an asynchronous step in the workflow, takes
the relevant text extracted in the previous steps and processes it to generate
a response to the user's query: it creates a Document from the relevant text,
constructs a SummaryIndex from it, and then queries the index's query engine
with the user's input using the phi3:mini model. The final result is wrapped in
a StopEvent, and any exceptions raised during the process are logged.
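
For completeness, here is what the Gemma-2 variant mentioned above could look
like. Only the model name changes, while the query_result step mirrors the
Phi-3 agent; this is a sketch rather than the project's exact file, and the
Qwen-2 and StableLM-2 agents follow the same pattern with qwen2:1.5b and
stablelm2:1.6b:

from llama_index.llms.ollama import Ollama
from llama_index.core import Document, SummaryIndex
from llama_index.core.workflow import (
    Context,
    StopEvent,
    step
)
from workflows.utils.workflow_events import TextExtractEvent
from workflows.base_financial_analyser_agent import BaseFinancialAnalyserAgent
import logging

logging.basicConfig(level=logging.INFO)


class Gemma2FinancialAnalyserAgent(BaseFinancialAnalyserAgent):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # identical to the Phi-3 agent except for the model name
        self.query_llm = Ollama(model='gemma2:2b',
                                base_url='http://localhost:11434',
                                temperature=0.2, request_timeout=300)

    @step(pass_context=True)
    async def query_result(self, ctx: Context, ev: TextExtractEvent) -> StopEvent:
        """Answer the user query from the filtered relevant text."""
        try:
            documents = [Document(text=ev.relevant_text)]
            index = SummaryIndex.from_documents(documents)
            query_engine = index.as_query_engine(llm=self.query_llm)
            result = query_engine.query(ctx.data["user_query"])
            return StopEvent(result=str(result.response))
        except Exception as e:
            logging.error(str(e))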

Code: workflows/utils/prompts.py

This prompt template is taken from the C-RAG code base of LlamaIndex.

from llama_index.core import PromptTemplate

DEFAULT_RELEVANCY_PROMPT_TEMPLATE = PromptTemplate(
    template="""As a grader, your task is to evaluate the relevance of a document retrieved in response to a user's question.

    Retrieved Document:
    -------------------
    {context_str}

    User Question:
    --------------
    {query_str}

    Evaluation Criteria:
    - Consider whether the document contains keywords or topics related to the user's question.
    - The evaluation should not be overly stringent; the primary objective is to identify and filter out clearly irrelevant retrievals.

    Decision:
    - Assign a binary score to indicate the document's relevance.
    - Use 'yes' if the document is relevant to the question, or 'no' if it is not.

    Please provide your binary score ('yes' or 'no') below to indicate the document's relevance to the user question."""
)

Code: workflows/utils/workflow_events.py

from llama_index.core.workflow import Event
from llama_index.core.schema import NodeWithScore


class RetrieveEvent(Event):
    """Retrieve event (gets retrieved nodes)."""

    retrieved_nodes: list[NodeWithScore]


class RelevanceEvalEvent(Event):
    """Relevance evaluation event (gets results of relevance evaluation)."""

    relevant_results: list[str]


class TextExtractEvent(Event):
    """Text extract event. Extracts relevant text and concatenates."""

    relevant_text: str

Code: project_root/driver.py

from workflows.phi3_financial_analyser_agent import Phi3FinancialAnalyserAgent
from workflows.gemma2_financial_analyser_agent import Gemma2FinancialAnalyserAgent
from workflows.qwen2_financial_analyser_agent import Qwen2FinancialAnalyserAgent
from workflows.stablelm2_financial_analyser_agent import Stablelm2FinancialAnalyserAgent
from workflows.financial_aggregator_agent import FinancialAggregatorAgent
import nest_asyncio

# Apply the nest_asyncio
nest_asyncio.apply()


async def main():
    w1 = Phi3FinancialAnalyserAgent(timeout=300, verbose=True)
    w2 = Gemma2FinancialAnalyserAgent(timeout=300, verbose=True)
    w3 = Qwen2FinancialAnalyserAgent(timeout=300, verbose=True)
    w4 = Stablelm2FinancialAnalyserAgent(timeout=300, verbose=True)
    w5 = FinancialAggregatorAgent(timeout=300, verbose=True)
    user_query = "what are Fourth Quarter Highlights?"

    result_1 = await w1.run(user_query=user_query)
    result_2 = await w2.run(user_query=user_query)
    result_3 = await w3.run(user_query=user_query)
    result_4 = await w4.run(user_query=user_query)
    print(f"Phi3 Result: {result_1}")
    print(f"Gemma2 Result: {result_2}")
    print(f"Qwen2 Result: {result_3}")
    print(f"StableLM2 Result: {result_4}")

    summary = await w5.run(slm_results=[result_1, result_2, result_3, result_4])

    print(f"Final Summary: {summary}")


if __name__ == '__main__':
    import asyncio

    asyncio.run(main=main())

The code defines an asynchronous main function that runs a series of financial
analysis workflows using different agents (Phi3FinancialAnalyserAgent,
Gemma2FinancialAnalyserAgent, Qwen2FinancialAnalyserAgent, and
Stablelm2FinancialAnalyserAgent) to process a user query regarding Fourth
Quarter Highlights. Each agent processes the query independently, and their
results are printed. Finally, a FinancialAggregatorAgent aggregates these
results into a final summary, which is then printed. The program uses
nest_asyncio to allow the asynchronous tasks to run smoothly in environments
that typically do not support nested event loops. The entire process is executed
within an asyncio event loop when the script is run.
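
The article does not include financial_aggregator_agent.py. Based on the
driver's call, w5.run(slm_results=[...]), and the architecture description, a
plausible minimal implementation asks llama3.1 to consolidate the four answers;
the prompt wording and step wiring below are assumptions, not the project's
exact code:

from llama_index.llms.ollama import Ollama
from llama_index.core.workflow import (
    Workflow,
    Context,
    StartEvent,
    StopEvent,
    step
)


class FinancialAggregatorAgent(Workflow):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # llama3.1 (8b) consolidates the answers produced by the four SLM workflows
        self.aggregator_llm = Ollama(model='llama3.1', base_url='http://localhost:11434',
                                     temperature=0.2, request_timeout=300)

    @step(pass_context=True)
    async def aggregate(self, ctx: Context, ev: StartEvent) -> StopEvent:
        """Merge the per-model answers into one consolidated response."""
        slm_results = ev.get("slm_results")
        joined = "\n\n---\n\n".join(str(r) for r in slm_results)
        prompt = ("The following are answers to the same financial question, each "
                  "produced by a different small language model. Consolidate them "
                  "into a single, coherent, de-duplicated answer:\n\n" + joined)
        response = self.aggregator_llm.complete(prompt)
        return StopEvent(result=str(response))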


THE OUTPUT:


Output showing the execution of the independent SLM workflows and the LLM aggregator.


THE CONCLUSION:

The Mixture of Workflows (MoW) architecture, powered by Ollama, LlamaIndex, and
Qdrant, stands out for its innovative integration of Corrective Retrieval
Augmented Generation (C-RAG) mechanisms, ensuring high accuracy and relevance in
data processing. By employing a modular and adaptive approach, MoW dynamically
corrects and refines information retrieval, making it a powerful and reliable
solution for complex data analysis tasks, particularly in financial contexts.
This architecture exemplifies the future of data processing, where intelligent
systems are capable of self-correction and delivering consistently high-quality
insights.

References:

Corrective RAG — LlamaIndex documentation

Corrective RAG (CRAG) — LangChain (langchain-ai.github.io)

Corrective Retrieval Augmented Generation — arXiv:2401.15884 (arxiv.org)



