UNVEILING THE POWER OF MIXTURE OF WORKFLOWS FOR FINANCIAL DATA INSIGHTS

M K Pavan Kumar · Published in GoPenAI · 9 min read · Aug 19, 2024

In an era where financial data is both abundant and complex, extracting meaningful insights from banking statements requires more than traditional analysis methods. The Mixture of Workflows (MoW) approach introduces a powerful solution by deploying multiple specialized AI agentic workflows, each optimized for a different aspect of financial data interpretation. This article explores how integrating compact models like Phi-3, Qwen-2, Gemma-2, and StableLM-2 within a robust Kafka- and Qdrant-based architecture allows for precise, context-aware analysis of financial (SEC 10-K) data. Through hands-on coding examples and architectural insights, you'll discover how this approach can revolutionize financial statement analysis, delivering clarity and actionable intelligence to drive informed decision-making.

Mixture of Workflows (MoW). created by author, M K Pavan Kumar

THE ARCHITECTURE:

The architecture titled "Mixture of Workflows (MoW)" integrates multiple workflows, each of which is internally powered by a corrective RAG process. The entire setup, including all language models and workflow steps, is driven by Ollama, with the additional support of LlamaIndex and Qdrant to ensure efficient processing and retrieval.

In this architecture, Azure Blob Storage serves as the data source, feeding into Kafka for data streaming. The streamed data then reaches a series of workflows, each leveraging a different model for processing:

* phi3:mini in Workflow-1
* qwen2:1.5b in Workflow-2
* gemma2:2b in Workflow-3
* stablelm2:1.6b in Workflow-4

Each workflow handles the incoming request independently, and their outputs are aggregated by llama3.1:8b in an aggregator workflow, ensuring that a consolidated response is generated. The corrective RAG process within each workflow retrieves the relevant nodes, checks their relevance, filters the retrieved nodes, and then passes them on for final processing. The results of these processes determine the best possible response to the user's request. Overall, this architecture allows for robust, scalable, and accurate processing of requests, driven by the combined power of Ollama, the models above, and efficient data retrieval with Qdrant.

C-RAG METHODOLOGY:

Corrective Retrieval Augmented Generation (C-RAG) is an advanced framework designed to enhance the robustness of Retrieval-Augmented Generation (RAG) systems by addressing inaccurate document retrieval, which can significantly degrade the quality of generated responses. C-RAG introduces a retrieval evaluator that assesses the relevance of the documents retrieved for a given query and, based on this assessment, categorizes each retrieval into one of three confidence levels: Correct, Incorrect, and Ambiguous.

taken from the official C-RAG paper, cited below.

If a document is deemed Correct, it undergoes a refinement process to extract the most relevant knowledge strips, ensuring that only high-quality information contributes to the generation. Incorrect documents are discarded, and the system instead performs a web search to gather more reliable external knowledge. When the confidence is Ambiguous, the system combines internal refinement with an external search to produce a more accurate result.
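To make the three-way routing concrete, here is a minimal, illustrative sketch of the C-RAG control flow. It is not the paper's reference implementation: evaluate_relevance, refine_to_knowledge_strips, web_search, and generate_answer are hypothetical placeholders for the evaluator, refinement, search, and generation components.

def corrective_rag(query: str, retrieved_docs: list[str]) -> str:
    """Illustrative C-RAG routing over a batch of retrieved documents."""
    knowledge = []
    for doc in retrieved_docs:
        verdict = evaluate_relevance(query, doc)  # hypothetical evaluator (e.g. T5-large)
        if verdict == "correct":
            # Keep only the most relevant strips of a good document.
            knowledge.append(refine_to_knowledge_strips(query, doc))
        elif verdict == "incorrect":
            # Discard the document and fall back to external knowledge.
            knowledge.append(web_search(query))
        else:  # "ambiguous"
            # Combine internal refinement with an external search.
            knowledge.append(refine_to_knowledge_strips(query, doc))
            knowledge.append(web_search(query))
    return generate_answer(query, "\n".join(knowledge))  # hypothetical generator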
This corrective mechanism makes C-RAG particularly effective in scenarios where the initial retrieval might be flawed, allowing for a more resilient and adaptable generation process that dynamically adjusts its approach based on the quality of the retrieved information. The system uses a lightweight model such as T5-large for the retrieval evaluator, keeping it efficient without requiring extensive additional human or LLM-based annotations.

We have slightly modified the C-RAG implementation, as illustrated below: ambiguous and incorrect results are filtered out directly instead of being searched again, just to keep the code base simple.

created by author, M K Pavan Kumar

STREAM DATA TO QDRANT:

Refer to the official Qdrant documentation on how to stream data into Qdrant using Qdrant's Kafka sink connector: How to Setup Seamless Data Streaming with Kafka and Qdrant — Qdrant

THE IMPLEMENTATION:

Below is an explanation of the project scaffolding (a reconstructed directory tree follows this list).

data/:
* Contains the source data necessary for the analysis, such as financial reports. In this case, it includes a file named OpenText-Reports-Q4-F-2024-Results.pdf, which serves as the input for the workflows.

rag_core/:
* Houses the core Retrieval-Augmented Generation (RAG) operations powered by Qdrant. It contains rag_operations.py, which defines the primary functions and methods of the RAG process.

workflows/:
* The heart of the project, where the different workflows are defined. Each workflow analyses a different aspect of the banking data using its own agent.
* base_financial_analyser_agent.py: Defines the base class for the financial analysis agents, serving as a foundation for the specialized agents.
* financial_aggregator_agent.py: Aggregates the results from the different financial analysis workflows to produce a comprehensive output.
* gemma2_financial_analyser_agent.py: Implements the financial analysis using the Gemma-2 model.
* phi3_financial_analyser_agent.py: Implements the financial analysis using the Phi-3 model.
* qwen2_financial_analyser_agent.py: Implements the financial analysis using the Qwen-2 model.
* stablelm2_financial_analyser_agent.py: Implements the financial analysis using the StableLM-2 model.

utils/:
* prompts.py: Contains the prompts used by the agents during the analysis.
* workflow_events.py: Defines the events that coordinate execution and data flow within the workflows.

driver.py:
* The main entry point of the project, responsible for initializing the workflows, loading the data, and executing the analysis.

requirements.txt:
* Lists the Python dependencies required for the project, ensuring that all necessary packages are installed for everything to function correctly.
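The scaffolding above (shown in the original post as a screenshot captioned "project scaffolding and structure") can be reconstructed roughly as follows. The placement of utils/ inside workflows/ is inferred from the import statements later in the article (from workflows.utils...); the rest of the layout is an assumption based on the file names mentioned:

project_root/
├── data/
│   └── OpenText-Reports-Q4-F-2024-Results.pdf
├── rag_core/
│   └── rag_operations.py
├── workflows/
│   ├── base_financial_analyser_agent.py
│   ├── financial_aggregator_agent.py
│   ├── gemma2_financial_analyser_agent.py
│   ├── phi3_financial_analyser_agent.py
│   ├── qwen2_financial_analyser_agent.py
│   ├── stablelm2_financial_analyser_agent.py
│   └── utils/
│       ├── prompts.py
│       └── workflow_events.py
├── driver.py
└── requirements.txt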
IMPLEMENTATION OF MIXTURE OF WORKFLOWS (MOW)

code: workflows/base_financial_analyser_agent.py

from llama_index.llms.ollama import Ollama
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.core import Settings
from llama_index.core.query_pipeline import QueryPipeline
from llama_index.core.workflow import (
    Workflow,
    Context,
    StartEvent,
    step
)
from workflows.utils.workflow_events import RetrieveEvent, TextExtractEvent, RelevanceEvalEvent
from rag_core.rag_operations import RAGOperations
from workflows.utils.prompts import DEFAULT_RELEVANCY_PROMPT_TEMPLATE
import logging

logging.basicConfig(level=logging.INFO)


class BaseFinancialAnalyserAgent(Workflow):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # llama3.1 judges the relevance of retrieved nodes.
        self.eval_llm = Ollama(model='llama3.1', base_url='http://localhost:11434',
                               temperature=0.8, request_timeout=300)
        # all-minilm:33m embeds queries against the vector store.
        Settings.embed_model = OllamaEmbedding(model_name='all-minilm:33m',
                                               base_url='http://localhost:11434')

    @step(pass_context=True)
    async def ingest_and_retrieve_docs(self, ctx: Context, ev: StartEvent) -> RetrieveEvent:
        """Ingest docs, initialize the relevancy pipeline, and retrieve nodes."""
        try:
            rag_ops = RAGOperations()
            ctx.data['rag_ops'] = rag_ops
            ctx.data["relevancy_pipeline"] = QueryPipeline(
                chain=[DEFAULT_RELEVANCY_PROMPT_TEMPLATE, self.eval_llm]
            )
            # Retrieve the relevant nodes for the user query.
            rag_ops: RAGOperations = ctx.data['rag_ops']
            user_query = ev.get("user_query")
            retrieved_nodes = rag_ops.create_retriever().retrieve(str_or_query_bundle=user_query)
            ctx.data["retrieved_nodes"] = retrieved_nodes
            ctx.data["user_query"] = user_query
            return RetrieveEvent(retrieved_nodes=retrieved_nodes)
        except Exception as e:
            logging.error(str(e))

    @step(pass_context=True)
    async def evaluate_relevance(self, ctx: Context, ev: RetrieveEvent) -> RelevanceEvalEvent:
        """Evaluate the relevancy of the retrieved nodes against the user query."""
        try:
            retrieved_nodes = ev.retrieved_nodes
            user_query = ctx.data["user_query"]
            relevancy_results = []
            for node in retrieved_nodes:
                relevancy = ctx.data["relevancy_pipeline"].run(context_str=node.text,
                                                               query_str=user_query)
                relevancy_results.append(relevancy.message.content.lower().strip())
            ctx.data["relevancy_results"] = relevancy_results
            return RelevanceEvalEvent(relevant_results=relevancy_results)
        except Exception as e:
            logging.error(str(e))

    @step(pass_context=True)
    async def extract_relevant_text(self, ctx: Context, ev: RelevanceEvalEvent) -> TextExtractEvent:
        """Extract and concatenate the relevant texts from the retrieved documents."""
        try:
            retrieved_nodes = ctx.data["retrieved_nodes"]
            relevancy_results = ev.relevant_results
            relevant_texts = [
                retrieved_nodes[i].text
                for i, result in enumerate(relevancy_results)
                if "**yes**" in result
            ]
            result = "\n".join(relevant_texts)
            return TextExtractEvent(relevant_text=result)
        except Exception as e:
            logging.error(str(e))

The above code defines a class called BaseFinancialAnalyserAgent that extends a LlamaIndex Workflow and is designed to analyze financial documents through a series of asynchronous steps, powered by Ollama's language models and embeddings. Upon initialization, llama3.1 is configured as the evaluation LLM for judging the relevance of retrieved documents, and all-minilm:33m is set as the global embedding model. The first step, ingest_and_retrieve_docs, ingests the documents, sets up a relevancy pipeline, and retrieves the nodes relevant to the user query. The second step, evaluate_relevance, assesses the relevance of these nodes by comparing their content with the user query using the configured relevancy pipeline. Finally, the extract_relevant_text step filters the retrieved documents and keeps only the text segments judged relevant in the previous step. Throughout the process, any exceptions are logged to aid in troubleshooting.
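rag_operations.py itself is not reproduced in the post. Given that the documents are streamed into Qdrant via the Kafka sink connector, a minimal sketch of what RAGOperations might look like follows; the collection name, Qdrant URL, and top-k value are assumptions, not taken from the original:

from qdrant_client import QdrantClient
from llama_index.core import VectorStoreIndex
from llama_index.vector_stores.qdrant import QdrantVectorStore


class RAGOperations:
    """Sketch: exposes a retriever over an existing Qdrant collection."""

    def __init__(self, collection_name: str = "financial_reports"):  # assumed name
        client = QdrantClient(url="http://localhost:6333")  # assumed local Qdrant
        vector_store = QdrantVectorStore(client=client, collection_name=collection_name)
        # Build an index view over the already-populated collection; query
        # embeddings come from Settings.embed_model set in the base agent.
        self.index = VectorStoreIndex.from_vector_store(vector_store=vector_store)

    def create_retriever(self):
        # The returned retriever supports the .retrieve(str_or_query_bundle=...)
        # call used in BaseFinancialAnalyserAgent.
        return self.index.as_retriever(similarity_top_k=3)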
code: workflows/phi3_financial_analyser_agent.py

Similar to phi3_financial_analyser_agent.py, we can create gemma2_financial_analyser_agent.py, qwen2_financial_analyser_agent.py, and stablelm2_financial_analyser_agent.py just by changing query_llm to the respective model in Ollama (a sketch of one such sibling agent follows the explanation below).

from llama_index.llms.ollama import Ollama
from llama_index.core import Document, SummaryIndex
from llama_index.core.workflow import (
    Context,
    StopEvent,
    step
)
from workflows.utils.workflow_events import TextExtractEvent
from workflows.base_financial_analyser_agent import BaseFinancialAnalyserAgent
import logging

logging.basicConfig(level=logging.INFO)


class Phi3FinancialAnalyserAgent(BaseFinancialAnalyserAgent):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.query_llm = Ollama(model='phi3:mini', base_url='http://localhost:11434',
                                temperature=0.2, request_timeout=300,
                                system_prompt="You are an AI assistant that answers user questions. "
                                              "Strictly use the provided context to answer the query; "
                                              "don't rely on your prior knowledge. If you don't find "
                                              "the answer, please respond 'I don't know.'")

    @step(pass_context=True)
    async def query_result(self, ctx: Context, ev: TextExtractEvent) -> StopEvent:
        """Get the result using the relevant text."""
        try:
            relevant_text = ev.relevant_text
            user_query = ctx.data["user_query"]
            documents = [Document(text=relevant_text)]
            # print(f"Documents: {documents}")
            index = SummaryIndex.from_documents(documents)
            query_engine = index.as_query_engine(llm=self.query_llm)
            result = query_engine.query(user_query)
            # print(f"Result: {result.response}")
            return StopEvent(result=str(result.response))
        except Exception as e:
            logging.error(str(e))

The code defines a class called Phi3FinancialAnalyserAgent, which extends BaseFinancialAnalyserAgent to perform the financial analysis using the phi3:mini model. query_llm is initialized with parameters that make the language model adhere strictly to the provided context when answering user queries. The query_result method, the final asynchronous step in the workflow, takes the relevant text extracted in the previous steps and generates a response to the user's query: it wraps the relevant text in a Document, builds a SummaryIndex over it, and queries that index's query engine with the user's input using phi3:mini. The final result is returned in a StopEvent, and any exception during the process is logged.
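For completeness, here is what one of those sibling agents presumably looks like; this Gemma-2 sketch mirrors the Phi-3 agent with only the Ollama model tag changed (the exact system prompt wording is assumed):

from llama_index.llms.ollama import Ollama
from llama_index.core import Document, SummaryIndex
from llama_index.core.workflow import Context, StopEvent, step
from workflows.utils.workflow_events import TextExtractEvent
from workflows.base_financial_analyser_agent import BaseFinancialAnalyserAgent


class Gemma2FinancialAnalyserAgent(BaseFinancialAnalyserAgent):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Only the model tag differs from the Phi-3 agent.
        self.query_llm = Ollama(model='gemma2:2b', base_url='http://localhost:11434',
                                temperature=0.2, request_timeout=300,
                                system_prompt="Answer strictly from the provided context; "
                                              "if the answer is not there, say 'I don't know.'")

    @step(pass_context=True)
    async def query_result(self, ctx: Context, ev: TextExtractEvent) -> StopEvent:
        # Same final step as the Phi-3 agent: query the filtered context.
        documents = [Document(text=ev.relevant_text)]
        index = SummaryIndex.from_documents(documents)
        result = index.as_query_engine(llm=self.query_llm).query(ctx.data["user_query"])
        return StopEvent(result=str(result.response))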
code: workflows/utils/prompts.py

This prompt template is taken from the C-RAG code base of LlamaIndex.

from llama_index.core import PromptTemplate

DEFAULT_RELEVANCY_PROMPT_TEMPLATE = PromptTemplate(
    template="""As a grader, your task is to evaluate the relevance of a document retrieved in response to a user's question.

    Retrieved Document:
    -------------------
    {context_str}

    User Question:
    --------------
    {query_str}

    Evaluation Criteria:
    - Consider whether the document contains keywords or topics related to the user's question.
    - The evaluation should not be overly stringent; the primary objective is to identify and filter out clearly irrelevant retrievals.

    Decision:
    - Assign a binary score to indicate the document's relevance.
    - Use 'yes' if the document is relevant to the question, or 'no' if it is not.

    Please provide your binary score ('yes' or 'no') below to indicate the document's relevance to the user question."""
)

code: workflows/utils/workflow_events.py

from llama_index.core.workflow import Event
from llama_index.core.schema import NodeWithScore


class RetrieveEvent(Event):
    """Retrieve event (carries the retrieved nodes)."""
    retrieved_nodes: list[NodeWithScore]


class RelevanceEvalEvent(Event):
    """Relevance evaluation event (carries the results of the relevance evaluation)."""
    relevant_results: list[str]


class TextExtractEvent(Event):
    """Text extract event (carries the concatenated relevant text)."""
    relevant_text: str

code: project_root/driver.py

from workflows.phi3_financial_analyser_agent import Phi3FinancialAnalyserAgent
from workflows.gemma2_financial_analyser_agent import Gemma2FinancialAnalyserAgent
from workflows.qwen2_financial_analyser_agent import Qwen2FinancialAnalyserAgent
from workflows.stablelm2_financial_analyser_agent import Stablelm2FinancialAnalyserAgent
from workflows.financial_aggregator_agent import FinancialAggregatorAgent
import nest_asyncio

# Apply nest_asyncio to allow nested event loops.
nest_asyncio.apply()


async def main():
    w1 = Phi3FinancialAnalyserAgent(timeout=300, verbose=True)
    w2 = Gemma2FinancialAnalyserAgent(timeout=300, verbose=True)
    w3 = Qwen2FinancialAnalyserAgent(timeout=300, verbose=True)
    w4 = Stablelm2FinancialAnalyserAgent(timeout=300, verbose=True)
    w5 = FinancialAggregatorAgent(timeout=300, verbose=True)

    user_query = "what are Fourth Quarter Highlights?"
    result_1 = await w1.run(user_query=user_query)
    result_2 = await w2.run(user_query=user_query)
    result_3 = await w3.run(user_query=user_query)
    result_4 = await w4.run(user_query=user_query)

    print(f"Phi3 Result: {result_1}")
    print(f"Gemma2 Result: {result_2}")
    print(f"Qwen2 Result: {result_3}")
    print(f"StableLM2 Result: {result_4}")

    summary = await w5.run(slm_results=[result_1, result_2, result_3, result_4])
    print(f"Final Summary: {summary}")


if __name__ == '__main__':
    import asyncio
    asyncio.run(main())

The code defines an asynchronous main function that runs the four financial analysis workflows (Phi3FinancialAnalyserAgent, Gemma2FinancialAnalyserAgent, Qwen2FinancialAnalyserAgent, and Stablelm2FinancialAnalyserAgent) against a user query about Fourth Quarter Highlights. Each agent processes the query independently, and its result is printed. Finally, a FinancialAggregatorAgent aggregates these results into a final summary, which is also printed. The program uses nest_asyncio so that the asynchronous tasks run smoothly in environments that do not normally support nested event loops, and the entire process is executed within an asyncio event loop when the script is run.
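financial_aggregator_agent.py is not reproduced in the post. Based on the architecture description (llama3.1:8b consolidating the four SLM answers) and the w5.run(slm_results=...) call above, a minimal sketch might look like this; the aggregation prompt wording is an assumption:

from llama_index.llms.ollama import Ollama
from llama_index.core.workflow import Workflow, Context, StartEvent, StopEvent, step


class FinancialAggregatorAgent(Workflow):
    """Sketch: consolidates the per-SLM answers into one response with llama3.1."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.aggregator_llm = Ollama(model='llama3.1', base_url='http://localhost:11434',
                                     temperature=0.2, request_timeout=300)

    @step(pass_context=True)
    async def aggregate(self, ctx: Context, ev: StartEvent) -> StopEvent:
        # The driver passes the four workflow outputs as `slm_results`.
        slm_results = ev.get("slm_results")
        numbered = "\n\n".join(f"Answer {i + 1}:\n{r}" for i, r in enumerate(slm_results))
        prompt = ("Consolidate the following answers from different models into a "
                  f"single coherent, de-duplicated summary:\n\n{numbered}")  # assumed wording
        response = await self.aggregator_llm.acomplete(prompt)
        return StopEvent(result=str(response))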
THE OUTPUT:

output showing the execution of the independent SLM workflows and the LLM aggregator

THE CONCLUSION:

The Mixture of Workflows (MoW) architecture, powered by Ollama, LlamaIndex, and Qdrant, stands out for its innovative integration of Corrective Retrieval Augmented Generation (C-RAG) mechanisms, ensuring high accuracy and relevance in data processing. By employing a modular and adaptive approach, MoW dynamically corrects and refines information retrieval, making it a powerful and reliable solution for complex data analysis tasks, particularly in financial contexts. This architecture exemplifies the future of data processing, where intelligent systems are capable of self-correction and of delivering consistently high-quality insights.

References:
* Corrective RAG — LlamaIndex documentation
* Corrective RAG (CRAG) — LangGraph (langchain-ai.github.io)
* Corrective Retrieval Augmented Generation — arXiv:2401.15884 (arxiv.org)