AGENTIC AI IN THE ENTERPRISE: STRATEGY, ARCHITECTURE, AND IMPLEMENTATION – PART 3

This is a continuation of my previous post, which can be found here.

Let us recap the key takeaways from our previous post –

Enterprise AI built on the Model Context Protocol (MCP) relies on an open standard that enables AI systems to securely and consistently access enterprise data and tools. MCP replaces brittle “N×M” integrations between models and systems with a standardized client–server pattern: an MCP host (e.g., IDE or chatbot) runs an MCP client that communicates via JSON-RPC with lightweight MCP servers, which wrap external systems. Servers expose three asset types (Resources for data, Tools for actions, and Prompts for templates) behind permissions, access control, and auditability. This design enables real-time context, reduces hallucinations, supports model- and cloud-agnostic interoperability, and accelerates “build once, integrate everywhere” deployment. A typical flow (e.g., retrieving a customer’s latest order) encompasses intent parsing, authorized tool invocation, query translation/execution, and the return of a normalized JSON result to the model for natural-language delivery. MCP introduces modest performance overhead (RPC hops, JSON (de)serialization, network transit) and scale considerations (request volume, large result sets, context-window pressure). Mitigations include in-memory/semantic caching, optimized SQL with indexing, pagination, and filtering, connection pooling, and horizontal scaling with load balancing. In practice, small latency costs are often outweighed by the benefits of higher accuracy, stronger governance, and a decoupled, scalable architecture.

Compared to other approaches, the Model Context Protocol (MCP) offers a uniquely standardized and secure framework for AI-tool integration, shifting from brittle, custom-coded connections to a universal plug-and-play model. It is not a replacement for underlying systems, such as APIs or databases, but instead acts as an intelligent, secure abstraction layer designed explicitly for AI agents.

Custom API integration was the traditional method for connecting AI to enterprise systems before standards like MCP emerged.

  • Custom API integrations (traditional): Each AI application requires a custom-built connector for every external system it needs to access, leading to an N x M integration problem (the number of connectors grows multiplicatively with the number of models and systems, so 5 models and 20 systems need 100 connectors). This approach is resource-intensive, challenging to maintain, and prone to breaking when underlying APIs change.
  • MCP: The standardized protocol eliminates the N x M problem by creating a universal interface. Tool creators build a single MCP server for their system, and any MCP-compatible AI agent can instantly access it. This process decouples the AI model from the underlying implementation details, drastically reducing integration and maintenance costs.
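
The difference in integration effort can be sketched with simple arithmetic. The function names below are hypothetical, and the MCP count assumes one client per AI application and one server per external system.

```python
def custom_connectors(num_models: int, num_systems: int) -> int:
    """Point-to-point integration: one connector per (model, system) pair."""
    return num_models * num_systems


def mcp_components(num_models: int, num_systems: int) -> int:
    """MCP (assumed): one client per AI application plus one server per system."""
    return num_models + num_systems


if __name__ == "__main__":
    for n, m in [(2, 5), (5, 20), (10, 50)]:
        print(
            f"{n} models x {m} systems: "
            f"custom={custom_connectors(n, m)}, mcp={mcp_components(n, m)}"
        )
```

The gap widens quickly as either side grows, which is why the maintenance cost of custom connectors becomes the dominant concern at enterprise scale.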

For more detailed information, please refer to the following link.

RAG is a technique that retrieves static documents to augment an LLM’s knowledge, while MCP focuses on live interactions. They are complementary, not competing. 

  • RAG:
    • Focus: Retrieving and summarizing static, unstructured data, such as documents, manuals, or knowledge bases.
    • Best for: Providing background knowledge and general information, as in a policy lookup tool or customer service bot.
    • Data type: Unstructured, static knowledge.
  • MCP:
    • Focus: Accessing and acting on real-time, structured, and dynamic data from databases, APIs, and business systems.
    • Best for: Agentic use cases involving real-world actions, like pulling live sales reports from a CRM or creating a ticket in a project management tool.
    • Data type: Structured, real-time, and dynamic data.

Before MCP, platforms like OpenAI offered proprietary plugin systems to extend LLM capabilities.

  • LLM plugins:
    • Proprietary: Tied to a specific AI vendor (e.g., OpenAI).
    • Limited: Rely on the vendor’s API function-calling mechanism, which focuses on call formatting but not standardized execution.
    • Centralized: Managed by the AI vendor, creating a risk of vendor lock-in.
  • MCP:
    • Open standard: Based on a public, interoperable protocol (JSON-RPC 2.0), making it model-agnostic and usable across different platforms.
    • Infrastructure layer: Provides a standardized infrastructure for agents to discover and use any compliant tool, regardless of the underlying LLM.
    • Decentralized: Promotes a flexible ecosystem and reduces the risk of vendor lock-in. 

The “agent factory” pattern: Azure focuses on providing managed services for building and orchestrating AI agents, tightly integrated with its enterprise security and governance features. The MCP architecture is a core component of the Azure AI Foundry, serving as a secure, managed “agent factory.” 

  • AI orchestration layer: The Azure AI Agent Service, within Azure AI Foundry, acts as the central host and orchestrator. It provides the control plane for creating, deploying, and managing multiple specialized agents, and it natively supports the MCP standard.
  • AI model layer: Agents in the Foundry can be powered by various models, including those from Azure OpenAI Service, commercial models from partners, or open-source models.
  • MCP server and tool layer: MCP servers are deployed using serverless functions, such as Azure Functions or Azure Logic Apps, to wrap existing enterprise systems. These servers expose tools for interacting with enterprise data sources like SharePoint, Azure AI Search, and Azure Blob Storage.
  • Data and security layer: Data is secured using Microsoft Entra ID (formerly Azure AD) for authentication and access control, with robust security policies enforced via Azure API Management. Access to data sources, such as databases and storage, is managed securely through private networks and Managed Identity. 

The “composable serverless agent” pattern: AWS emphasizes a modular, composable, and serverless approach, leveraging its extensive portfolio of services to build sophisticated, flexible, and scalable AI solutions. The MCP architecture here aligns with the principle of creating lightweight, event-driven services that AI agents can orchestrate. 

  • AI orchestration layer: Amazon Bedrock Agents or custom agent frameworks deployed via AWS Fargate or Lambda act as the MCP hosts. Bedrock Agents provide built-in orchestration, while custom agents offer greater flexibility and customization options.
  • AI model layer: The models are sourced from Amazon Bedrock, which provides a wide selection of foundation models.
  • MCP server and tool layer: MCP servers are deployed as serverless AWS Lambda functions. AWS offers pre-built MCP servers for many of its services, including the AWS Serverless MCP Server for managing serverless applications and the AWS Lambda Tool MCP Server for invoking existing Lambda functions as tools.
  • Data and security layer: Access is tightly controlled using AWS Identity and Access Management (IAM) roles and policies, with fine-grained permissions for each MCP server. Private data sources like databases (Amazon DynamoDB) and storage (Amazon S3) are accessed securely within a Virtual Private Cloud (VPC). 

The “unified workbench” pattern: GCP focuses on providing a unified, open, and data-centric platform for AI development. The MCP architecture on GCP integrates natively with the Vertex AI platform, treating MCP servers as first-class tools that can be dynamically discovered and used within a single workbench. 

  • AI orchestration layer: The Vertex AI Agent Builder serves as the central environment for building and managing conversational AI and other agents. It orchestrates workflows and manages tool invocation for agents.
  • AI model layer: Agents use foundation models available through the Vertex AI Model Garden or the Gemini API.
  • MCP server and tool layer: MCP servers are deployed as containerized microservices on Cloud Run or managed by services like App Engine. These servers contain tools that interact with GCP services, such as BigQuery, Cloud Storage, and Cloud SQL. GCP offers pre-built MCP server implementations, such as the GCP MCP Toolbox, for integration with its databases.
  • Data and security layer: Vertex AI Vector Search and other data sources are encapsulated within the MCP server tools to provide contextual information. Access to these services is managed by Identity and Access Management (IAM) and secured through virtual private clouds. The MCP server can leverage Vertex AI Context Caching for improved performance.

Note that each pattern above refers to the native services of its respective cloud. Other, better-suited services can be used in place of the ones mentioned here. This is a concept-level comparison rather than a guide to industry-specific implementation approaches.


We’ll conclude this post here & continue with a deeper dive in the next post.

Till then, Happy Avenging! 🙂

AGENTIC AI IN THE ENTERPRISE: STRATEGY, ARCHITECTURE, AND IMPLEMENTATION – PART 2

This is a continuation of my previous post, which can be found here.

Let us recap the key takeaways from our previous post –

Agentic AI refers to autonomous systems that pursue goals with minimal supervision by planning, reasoning about next steps, utilizing tools, and maintaining context across sessions. Core capabilities include goal-directed autonomy, interaction with tools and environments (e.g., APIs, databases, devices), multi-step planning and reasoning under uncertainty, persistence, and deliberate decision-making.

Architecturally, three modules coordinate intelligent behavior: Sensing (perception pipelines that acquire multimodal data, extract salient patterns, and recognize entities/events); Observation/Deliberation (objective setting, strategy formation, and option evaluation relative to resources and constraints); and Action (execution via software interfaces, communications, or physical actuation to deliver outcomes). These functions are enabled by machine learning, deep learning, computer vision, natural language processing, planning/decision-making, uncertainty reasoning, and simulation/modeling.

At enterprise scale, open standards align autonomy with governance: the Model Context Protocol (MCP) grants an agent secure, principled access to enterprise tools and data (vertical integration), while Agent-to-Agent (A2A) enables specialized agents to coordinate, delegate, and exchange information (horizontal collaboration). Together, MCP and A2A help organizations transition from isolated pilots to scalable programs, delivering end-to-end automation, faster integration, enhanced security and auditability, vendor-neutral interoperability, and adaptive problem-solving that responds to real-time context.

Great! Let’s dive into this topic now.

Enterprise AI with MCP refers to the application of the Model Context Protocol (MCP), an open standard, to enable AI systems to securely and consistently access external enterprise data and applications. 

Before MCP, enterprise AI integration was characterized by a “many-to-many” or “N x M” problem. Companies had to build custom, fragile, and costly integrations between each AI model and every proprietary data source, which was not scalable. These limitations left AI agents with limited, outdated, or siloed information, restricting their potential impact. 
MCP addresses this by offering a standardized architecture for AI and data systems to communicate with each other.

The MCP framework uses a client-server architecture to enable communication between AI models and external tools and data sources. 

  • MCP Host: The AI-powered application or environment, such as an AI-enhanced IDE or a generative AI chatbot like Anthropic’s Claude or OpenAI’s ChatGPT, where the user interacts.
  • MCP Client: A component within the host application that manages the connection to MCP servers.
  • MCP Server: A lightweight service that wraps around an external system (e.g., a CRM, database, or API) and exposes its capabilities to the AI client in a standardized format, typically using JSON-RPC 2.0. 

An MCP server provides AI clients with three key types of assets:

  • Resources: Structured or unstructured data that an AI can access, such as files, documents, or database records.
  • Tools: The functionality to perform specific actions within an external system, like running a database query or sending an email.
  • Prompts: Pre-defined text templates or workflows to help guide the AI’s actions. 

Key benefits of MCP for the enterprise include:

  • Standardized integration: Developers can build integrations against a single, open standard, which dramatically reduces the complexity and time required to deploy and scale AI initiatives.
  • Enhanced security and governance: MCP incorporates native support for security and compliance measures. It provides permission models, access control, and auditing capabilities to ensure AI systems only access data and tools within specified boundaries.
  • Real-time contextual awareness: By connecting AI agents to live enterprise data sources, MCP ensures they have access to the most current and relevant information, which reduces hallucinations and improves the accuracy of AI outputs.
  • Greater interoperability: MCP is model-agnostic & can be used with a variety of AI models (e.g., Anthropic’s Claude or OpenAI’s models) and across different cloud environments. This approach helps enterprises avoid vendor lock-in.
  • Accelerated development: The “build once, integrate everywhere” approach enables internal teams to focus on innovation instead of writing custom connectors for every system.
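
To make the three asset types (Resources, Tools, and Prompts) concrete, here is a minimal in-memory sketch. The class ToyMCPServer, its method names, and the sample entries are all hypothetical; this is not the MCP SDK, only an illustration of how the three kinds of assets differ.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict


@dataclass
class ToyMCPServer:
    """Hypothetical in-memory stand-in for an MCP server's three asset types."""

    resources: Dict[str, str] = field(default_factory=dict)  # URI -> data
    tools: Dict[str, Callable[..., Any]] = field(default_factory=dict)  # name -> action
    prompts: Dict[str, str] = field(default_factory=dict)  # name -> template

    def read_resource(self, uri: str) -> str:
        """Resources: return data the model may read."""
        return self.resources[uri]

    def call_tool(self, name: str, **arguments: Any) -> Any:
        """Tools: perform an action with the given arguments."""
        return self.tools[name](**arguments)

    def get_prompt(self, name: str, **values: str) -> str:
        """Prompts: fill a pre-defined template."""
        return self.prompts[name].format(**values)


server = ToyMCPServer()
server.resources["file:///policies/returns.txt"] = "Returns accepted within 30 days."
server.tools["add"] = lambda a, b: a + b
server.prompts["summarize"] = "Summarize the following text for {audience}: {text}"
```

A real server would add the permission checks and audit logging described above around each of these three entry points.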

Let us understand one sample case & the flow of activities.

A customer support agent uses an AI assistant to get information about a customer’s recent orders. The AI assistant utilizes an MCP-compliant client to communicate with an MCP server, which is connected to the company’s PostgreSQL database.

1. User request: The support agent asks the AI assistant, “What was the most recent order placed by Priyanka Chopra Jonas?”

2. AI model processes intent: The AI assistant, running on an MCP host, analyzes the natural language query. It recognizes that to answer this question, it needs to perform a database query. It then identifies the appropriate tool from the MCP server’s capabilities. 

3. Client initiates tool call: The AI assistant’s MCP client sends a JSON-RPC request to the MCP server connected to the PostgreSQL database. The request specifies the tool to be used, such as get_customer_orders, and includes the necessary parameters: 

{
  "jsonrpc": "2.0",
  "method": "tools/call",
  "params": {
    "name": "get_customer_orders",
    "arguments": {
      "customer_name": "Priyanka Chopra Jonas",
      "sort_by": "order_date",
      "sort_order": "desc",
      "limit": 1
    }
  },
  "id": "12345"
}

4. Server handles the request: The MCP server receives the request and performs several key functions: 

  • Authentication and authorization: The server verifies that the AI client and the user have permission to query the database.
  • Query translation: The server translates the standardized MCP request into a specific SQL query for the PostgreSQL database.
  • Query execution: The server executes the SQL query against the database.

SELECT order_id, order_date, total_amount
FROM orders
WHERE customer_name = 'Priyanka Chopra Jonas'
ORDER BY order_date DESC
LIMIT 1;
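
The server-side handling in step 4 can be sketched as follows. This is only an illustration under stated assumptions: an in-memory SQLite database stands in for PostgreSQL, the ALLOWED_ROLES set is a hypothetical permission model, and the sample rows are made up. The point it shows is that user-supplied values should be passed as query parameters rather than concatenated into the SQL string.

```python
import sqlite3

ALLOWED_ROLES = {"support_agent"}  # hypothetical permission model


def get_customer_orders(conn, role, customer_name, limit=1):
    """Authorize the caller, then run a parameterized query."""
    if role not in ALLOWED_ROLES:
        raise PermissionError(f"role {role!r} may not query orders")
    rows = conn.execute(
        "SELECT order_id, order_date, total_amount "
        "FROM orders WHERE customer_name = ? "
        "ORDER BY order_date DESC LIMIT ?",
        (customer_name, limit),  # values are bound, never concatenated
    ).fetchall()
    return [
        {"order_id": r[0], "order_date": r[1], "total_amount": r[2]} for r in rows
    ]


# In-memory SQLite stands in for the PostgreSQL database in this sketch.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (order_id TEXT, customer_name TEXT, "
    "order_date TEXT, total_amount REAL)"
)
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?, ?)",
    [
        ("98001", "Priyanka Chopra Jonas", "2025-07-02", 250.00),
        ("98765", "Priyanka Chopra Jonas", "2025-08-25", 11025.50),
    ],
)
latest = get_customer_orders(conn, "support_agent", "Priyanka Chopra Jonas")
```

With a PostgreSQL driver the placeholder syntax would differ (for example, %s in psycopg), but the same principle of bound parameters applies.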

5. Database returns data: The PostgreSQL database executes the query and returns the requested data to the MCP server. 

6. Server formats the response: The MCP server receives the raw database output and formats it into a standardized JSON response that the MCP client can understand.

{
  "jsonrpc": "2.0",
  "result": {
    "data": [
      {
        "order_id": "98765",
        "order_date": "2025-08-25",
        "total_amount": 11025.50
      }
    ]
  },
  "id": "12345"
}
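
A small sketch of step 6 is shown here. The helper names make_response and make_error are hypothetical; the envelope fields (jsonrpc, result, error, id) and the -32602 "Invalid params" code come from the JSON-RPC 2.0 specification, while the result.data layout simply mirrors the example above.

```python
import json


def make_response(request_id, rows):
    """Wrap query rows in a JSON-RPC 2.0 success envelope, echoing the request id."""
    return {"jsonrpc": "2.0", "result": {"data": rows}, "id": request_id}


def make_error(request_id, code, message):
    """Build a JSON-RPC 2.0 error envelope for a failed request."""
    return {
        "jsonrpc": "2.0",
        "error": {"code": code, "message": message},
        "id": request_id,
    }


rows = [{"order_id": "98765", "order_date": "2025-08-25", "total_amount": 11025.50}]
payload = json.dumps(make_response("12345", rows))
bad_params = make_error("12345", -32602, "customer_name is required")
```

Echoing the same id in the response is what lets the client match each result to the request that produced it.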

7. Client returns data to the model: The MCP client receives the JSON response and passes it back to the AI assistant’s language model. 

8. AI model generates final response: The language model incorporates this real-time data into its response and presents it to the user in a natural, conversational format. 

“Priyanka Chopra Jonas’s most recent order was placed on August 25, 2025, with an order ID of 98765, for a total of $11,025.50.”

Using the Model Context Protocol (MCP) for database access introduces a layer of abstraction that affects performance in several ways. While it adds some latency and processing overhead, strategic implementation can mitigate these effects. For AI applications, the benefits often outweigh the costs, particularly in terms of improved accuracy, security, and scalability.

The MCP architecture introduces extra communication steps between the AI agent and the database, each adding a small amount of latency. 

  • RPC overhead: The JSON-RPC call from the AI’s client to the MCP server adds a small processing and network delay. This is an out-of-process request, as opposed to a simple local function call.
  • JSON serialization: Request and response data must be serialized and deserialized into JSON format, which requires processing time.
  • Network transit: For remote MCP servers, the data must travel over the network, adding latency. However, for a local or on-premise setup, this is minimal. The physical location of the MCP server relative to the AI model and the database is a significant factor.
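
One way to put a number on the JSON serialization cost is to time a round trip locally. The helper below is a rough sketch with a made-up sample payload; the result depends entirely on the machine and payload size, so treat it as a measuring aid rather than a benchmark.

```python
import json
import time


def json_round_trip_seconds(payload, repeats=1000):
    """Average time to serialize and deserialize `payload` once."""
    start = time.perf_counter()
    for _ in range(repeats):
        decoded = json.loads(json.dumps(payload))
    elapsed = time.perf_counter() - start
    assert decoded == payload  # the round trip must preserve the data
    return elapsed / repeats


# Hypothetical payload: 100 small order records.
sample = {"data": [{"order_id": str(i), "total_amount": i * 1.5} for i in range(100)]}

if __name__ == "__main__":
    print(f"avg round trip: {json_round_trip_seconds(sample) * 1e6:.1f} microseconds")
```

Comparing this figure with the measured network and database time for the same request shows which of the three overheads is worth optimizing first.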

The performance impact scales with the complexity and volume of the AI agent’s interactions. 

  • High request volume: A single AI agent working on a complex task might issue dozens of parallel database queries. In high-traffic scenarios, managing numerous simultaneous connections can strain system resources and require robust infrastructure.
  • Excessive data retrieval: A significant performance risk is an AI agent retrieving a massive dataset in a single query. This process can consume a large number of tokens, fill the AI’s context window, and cause bottlenecks at the database and client levels.
  • Context window usage: Tool definitions and the results of tool calls consume space in the AI’s context window. If a large number of tools are in use, this can limit the AI’s “working memory,” resulting in slower and less effective reasoning. 

Caching is a crucial strategy for mitigating the performance overhead of MCP. 

  • In-memory caching: The MCP server can cache results from frequent or expensive database queries in memory (e.g., using Redis or Memcached). This approach enables repeat requests to be served almost instantly without requiring a database hit.
  • Semantic caching: Advanced techniques can cache the results of previous queries and serve them for semantically similar future requests, reducing token consumption and improving speed for conversational applications. 
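
The in-memory caching idea can be sketched with a small time-to-live cache. TTLCache and slow_query are hypothetical names, and a plain dictionary stands in for Redis or Memcached; a production setup would also need eviction and invalidation when the underlying data changes.

```python
import time


class TTLCache:
    """Tiny in-process cache with per-entry expiry (stand-in for Redis/Memcached)."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock  # injectable so expiry can be tested without sleeping
        self._store = {}

    def get_or_compute(self, key, compute):
        entry = self._store.get(key)
        now = self.clock()
        if entry is not None and now < entry[0]:
            return entry[1]  # cache hit: skip the database
        value = compute()  # cache miss or expired: run the expensive query
        self._store[key] = (now + self.ttl, value)
        return value


calls = []


def slow_query():
    """Pretend database call that records how often it runs."""
    calls.append(1)
    return {"order_id": "98765"}


cache = TTLCache(ttl_seconds=30)
key = ("get_customer_orders", "Priyanka Chopra Jonas")
first = cache.get_or_compute(key, slow_query)
second = cache.get_or_compute(key, slow_query)  # served from the cache
```

The cache key combines the tool name and its arguments, so two different customers never share an entry.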

Designing the MCP server and its database interactions for efficiency is critical. 

  • Optimized SQL: The MCP server should generate optimized SQL queries. Database indexes should be utilized effectively to expedite lookups and minimize load.
  • Pagination and filtering: To prevent a single query from overwhelming the system, the MCP server should implement pagination. The AI agent can be prompted to use filtering parameters to retrieve only the necessary data.
  • Connection pooling: This technique reuses existing database connections instead of opening a new one for each request, thereby reducing latency and database load. 
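
Pagination on the server side might look like the sketch below. It assumes a simple LIMIT/OFFSET scheme, a hypothetical MAX_PAGE_SIZE cap, and an in-memory SQLite table standing in for the real database.

```python
import sqlite3

MAX_PAGE_SIZE = 50  # hypothetical server-side cap on rows per call


def fetch_page(conn, page_size, offset=0):
    """Return one page of order ids plus the offset of the next page (or None)."""
    page_size = max(1, min(page_size, MAX_PAGE_SIZE))
    rows = conn.execute(
        "SELECT order_id FROM orders ORDER BY order_id LIMIT ? OFFSET ?",
        (page_size + 1, offset),  # fetch one extra row to detect a further page
    ).fetchall()
    has_more = len(rows) > page_size
    items = [r[0] for r in rows[:page_size]]
    return {"items": items, "next_offset": offset + page_size if has_more else None}


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER PRIMARY KEY)")
conn.executemany("INSERT INTO orders VALUES (?)", [(i,) for i in range(1, 8)])
first = fetch_page(conn, page_size=3)
```

Capping the page size on the server means an agent cannot fill its context window with one oversized result, even if it asks for more.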

For large-scale enterprise deployments, scaling is essential for maintaining performance. 

  • Multiple servers: The workload can be distributed across various MCP servers. One server could handle read requests, and another could handle writes.
  • Load balancing: A reverse proxy or other load-balancing solution can distribute incoming traffic across MCP server instances. Autoscaling can dynamically add or remove servers in response to demand.
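
The simplest load-balancing policy, round robin, can be sketched in a few lines. RoundRobinBalancer and the instance addresses are hypothetical; in practice a reverse proxy or a cloud load balancer would do this work, usually with health checks added.

```python
import itertools


class RoundRobinBalancer:
    """Cycle through MCP server instances in a fixed order."""

    def __init__(self, instances):
        instances = list(instances)
        if not instances:
            raise ValueError("at least one instance is required")
        self._cycle = itertools.cycle(instances)

    def next_instance(self):
        """Return the instance that should receive the next request."""
        return next(self._cycle)


balancer = RoundRobinBalancer(["mcp-1:8080", "mcp-2:8080", "mcp-3:8080"])
picked = [balancer.next_instance() for _ in range(4)]
```

After the third request the balancer wraps around to the first instance again, so each server receives an equal share of the traffic.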

For AI-driven tasks, a slight increase in latency for database access is often a worthwhile trade-off for significant gains. 

  • Improved accuracy: Accessing real-time, high-quality data through MCP leads to more accurate and relevant AI responses, reducing “hallucinations”.
  • Scalable ecosystem: The standardization of MCP reduces development overhead and allows for a more modular, scalable ecosystem, which saves significant engineering resources compared to building custom integrations.
  • Decoupled architecture: The MCP server decouples the AI model from the database, allowing each to be optimized and scaled independently. 

We’ll conclude this post here & continue with a deeper dive in the next post.

Till then, Happy Avenging! 🙂

Agentic AI in the Enterprise: Strategy, Architecture, and Implementation – Part 1

Today, we won’t be discussing any specific solution. Instead, we’ll begin a series of posts on Agentic AI & its implementation in the enterprise landscape.

So, hang tight! We’re about to launch a new venture as part of our knowledge drive.

Agentic AI refers to artificial intelligence systems that can act autonomously to achieve goals, making decisions and taking actions without constant human oversight. Unlike traditional AI, which responds to prompts, agentic AI can plan, reason about next steps, utilize tools, and work toward objectives over extended periods of time.

Key characteristics of agentic AI include:

  • Autonomy and Goal-Directed Behavior: These systems can pursue objectives independently, breaking down complex tasks into smaller steps and executing them sequentially.
  • Tool Use and Environment Interaction: Agentic AI can interact with external systems, APIs, databases, and software tools to gather information and perform actions in the real world.
  • Planning and Reasoning: They can develop multi-step strategies, adapt their approach based on feedback, and reason through problems to find solutions.
  • Persistence: Unlike single-interaction AI, agentic systems can maintain context and continue working on tasks across multiple interactions or sessions.
  • Decision Making: They can evaluate options, weigh trade-offs, and make choices about how to proceed when faced with uncertainty.

Agentic AI systems have several interconnected components that work together to enable intelligent behaviour. Each element plays a crucial role in the overall functioning of the AI system, and they must interact seamlessly to achieve desired outcomes. Let’s explore each of these components in more detail.

The sensing module serves as the AI’s eyes and ears, enabling it to understand its surroundings and make informed decisions. Think of it as the system that helps the AI “see” and “hear” the world around it, much like how humans use their senses.

  • Gathering Information: The system collects data from multiple sources, including cameras for visual information, microphones for audio, sensors for physical touch, and digital systems for data. This step provides the AI with a comprehensive understanding of what’s happening.
  • Making Sense of Data: Raw information from sensors can be messy and overwhelming. This component processes the data to identify the essential patterns and details that actually matter for making informed decisions.
  • Recognizing What’s Important: Utilizing advanced techniques such as computer vision (for images), natural language processing (for text and speech), and machine learning (for data patterns), the system identifies and understands objects, people, events, and situations within the environment.

This sensing capability enables AI systems to transition from merely following pre-programmed instructions to genuinely understanding their environment and making informed decisions based on real-world conditions. It’s the difference between a basic automated system and an intelligent agent that can adapt to changing situations.

The observation module serves as the AI’s decision-making center, where it sets objectives, develops strategies, and selects the most effective actions to take. This step is where the AI transforms what it perceives into purposeful action, much like humans think through problems and devise plans.

  • Setting Clear Objectives: The system establishes specific goals and desired outcomes, giving the AI a clear sense of direction and purpose. This approach helps ensure all actions are working toward meaningful results rather than random activity.
  • Strategic Planning: Using information about its own capabilities and the current situation, the AI creates step-by-step plans to reach its goals. It considers potential obstacles, available resources, and different approaches to find the most effective path forward.
  • Intelligent Decision-Making: When faced with multiple options, the system evaluates each choice against the current circumstances, established goals, and potential outcomes. It then selects the action most likely to move the AI closer to achieving its objectives.

This observation capability is what transforms an AI from a simple tool that follows commands into an intelligent system that can work independently toward business goals. It enables the AI to handle complex, multi-step tasks and adapt its approach when conditions change, making it valuable for a wide range of applications, from customer service to project management.

The action module serves as the AI’s hands and voice, turning decisions into real-world results. This step is where the AI actually puts its thinking and planning into action, carrying out tasks that make a tangible difference in the environment.

  • Control Systems: The system utilizes various tools to interact with the world, including motors for physical movement, speakers for communication, network connections for digital tasks, and software interfaces for system operation. These serve as the AI’s means of reaching out and making adjustments.
  • Task Implementation: Once the observation module determines the action to take, this component executes the actual task. Whether it’s sending an email, moving a robotic arm, updating a database, or scheduling a meeting, this module handles the execution from start to finish.

This action capability is what makes AI systems truly useful in business environments. Without it, an AI could analyze data and make significant decisions, but it couldn’t help solve problems or complete tasks. The action module bridges the gap between artificial intelligence and real-world impact, enabling AI to automate processes, respond to customers, manage systems, and deliver measurable business value.
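
The three modules described above can be sketched as a simple sense, deliberate, act pipeline. Everything here is a toy assumption: the event fields, the refund rule, and the outbox list that stands in for a real ticketing or messaging system.

```python
def sense(raw_event):
    """Sensing: reduce a raw event to the features that matter."""
    text = raw_event["text"].lower()
    return {"mentions_refund": "refund" in text, "customer": raw_event["customer"]}


def deliberate(observation):
    """Observation: choose the action that best serves the goal."""
    if observation["mentions_refund"]:
        return ("open_ticket", {"customer": observation["customer"], "queue": "billing"})
    return ("send_reply", {"customer": observation["customer"]})


def act(action, outbox):
    """Action: carry out the chosen step (appends to a list instead of a real system)."""
    name, params = action
    outbox.append({"action": name, **params})
    return outbox[-1]


outbox = []
result = act(deliberate(sense({"customer": "C-42", "text": "I want a REFUND"})), outbox)
```

In a real agent each stage would be far richer (models for perception, a planner for deliberation, tool calls for action), but the hand-off between the three stays the same.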

The technologies primarily involved in Agentic AI are as follows:

1. Machine Learning
2. Deep Learning
3. Computer Vision
4. Natural Language Processing (NLP)
5. Planning and Decision-Making
6. Uncertainty and Reasoning
7. Simulation and Modeling

In an enterprise setting, agentic AI systems utilize the Model Context Protocol (MCP) and the Agent-to-Agent (A2A) protocol as complementary, open standards to achieve autonomous, coordinated, and secure workflows. An MCP-enabled agent gains the ability to access and manipulate enterprise tools and data. At the same time, A2A allows a network of these agents to collaborate on complex tasks by delegating and exchanging information.

This combined approach allows enterprises to move from isolated AI experiments to strategic, scalable, and secure AI programs.

Protocol | Function in Agentic AI | Focus | Example use case
Model Context Protocol (MCP) | Equips a single AI agent with the tools and data it needs to perform a specific job. | Vertical integration: connecting agents to enterprise systems like databases, CRMs, and APIs. | A sales agent uses MCP to query the company CRM for a client’s recent purchase history.
Agent-to-Agent (A2A) | Enables multiple specialized agents to communicate, delegate tasks, and collaborate on a larger, multi-step goal. | Horizontal collaboration: allowing agents from different domains to work together seamlessly. | An orchestrating agent uses A2A to delegate parts of a complex workflow to specialized HR, IT, and sales agents.

Key benefits of combining MCP and A2A include:

  • End-to-end automation: Agents can handle tasks from start to finish, including complex, multi-step workflows, autonomously.
  • Greater agility and speed: Enterprise-wide adoption of these protocols reduces the cost and complexity of integrating AI, accelerating deployment timelines for new applications.
  • Enhanced security and governance: Enterprise AI platforms built on these open standards incorporate robust security policies, centralized access controls, and comprehensive audit trails.
  • Vendor neutrality and interoperability: As open standards, MCP and A2A allow AI agents to work together seamlessly, regardless of the underlying vendor or platform.
  • Adaptive problem-solving: Agents can dynamically adjust their strategies and collaborate based on real-time data and contextual changes, leading to more resilient and efficient systems.
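
The division of labor between the two protocols can be illustrated with a toy sketch. It does not implement either protocol: the Agent and Orchestrator classes are hypothetical, each agent's tools dictionary stands in for MCP-provided capabilities, and the orchestrator's delegation stands in for A2A messaging.

```python
class Agent:
    """Hypothetical agent: `tools` stands in for MCP-provided capabilities."""

    def __init__(self, name, tools):
        self.name = name
        self.tools = tools  # tool name -> callable

    def handle(self, task, **kwargs):
        """Vertical step: use one of this agent's own tools."""
        return {"agent": self.name, "result": self.tools[task](**kwargs)}


class Orchestrator:
    """Hypothetical coordinator: delegation stands in for A2A messaging."""

    def __init__(self, agents):
        self.agents = {agent.name: agent for agent in agents}

    def run(self, plan):
        """Horizontal step: hand each task to the agent that owns it."""
        return [self.agents[name].handle(task, **kwargs) for name, task, kwargs in plan]


hr = Agent("hr", {"create_profile": lambda employee: f"profile created for {employee}"})
it = Agent("it", {"provision_laptop": lambda employee: f"laptop ordered for {employee}"})
results = Orchestrator([hr, it]).run(
    [
        ("hr", "create_profile", {"employee": "A. Rao"}),
        ("it", "provision_laptop", {"employee": "A. Rao"}),
    ]
)
```

Each agent only knows its own tools, while the orchestrator only knows which agent to ask, which mirrors the vertical and horizontal split in the table above.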

We will discuss this topic further in our upcoming posts.

Till then, Happy Avenging! 🙂

Real-time video summary assistance App – Part 2

As a continuation of the previous post, I would like to continue my discussion about the implementation of MCP protocols among agents. But before that, I want to add the quick demo one more time to recap our objectives.

Let us recap the process flow –

Also, recall how the scripts are grouped, as shown in the previous post –

Message-Chaining Protocol (MCP) Implementation:

    clsMCPMessage.py
    clsMCPBroker.py

YouTube Transcript Extraction:

    clsYouTubeVideoProcessor.py

Language Detection:

    clsLanguageDetector.py

Translation Services & Agents:

    clsTranslationAgent.py
    clsTranslationService.py

Documentation Agent:

    clsDocumentationAgent.py
    
Research Agent:

    clsResearchAgent.py

Great! Now, we’ll continue with the main discussion.


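import re

# Third-party package: pip install youtube-transcript-api
from youtube_transcript_api import YouTubeTranscriptApi


def extract_youtube_id(youtube_url):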
    """Extract YouTube video ID from URL"""
    youtube_id_match = re.search(r'(?:v=|\/)([0-9A-Za-z_-]{11}).*', youtube_url)
    if youtube_id_match:
        return youtube_id_match.group(1)
    return None

def get_youtube_transcript(youtube_url):
    """Get transcript from YouTube video"""
    video_id = extract_youtube_id(youtube_url)
    if not video_id:
        return {"error": "Invalid YouTube URL or ID"}
    
    try:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        
        # First try to get manual transcripts
        try:
            transcript = transcript_list.find_manually_created_transcript(["en"])
            transcript_data = transcript.fetch()
            print(f"Debug - Manual transcript format: {type(transcript_data)}")
            if transcript_data and len(transcript_data) > 0:
                print(f"Debug - First item type: {type(transcript_data[0])}")
                print(f"Debug - First item sample: {transcript_data[0]}")
            return {"text": transcript_data, "language": "en", "auto_generated": False}
        except Exception as e:
            print(f"Debug - No manual transcript: {str(e)}")
            # If no manual English transcript, try any available transcript
            try:
                available_transcripts = list(transcript_list)
                if available_transcripts:
                    transcript = available_transcripts[0]
                    print(f"Debug - Using transcript in language: {transcript.language_code}")
                    transcript_data = transcript.fetch()
                    print(f"Debug - Auto transcript format: {type(transcript_data)}")
                    if transcript_data and len(transcript_data) > 0:
                        print(f"Debug - First item type: {type(transcript_data[0])}")
                        print(f"Debug - First item sample: {transcript_data[0]}")
                    return {
                        "text": transcript_data, 
                        "language": transcript.language_code, 
                        "auto_generated": transcript.is_generated
                    }
                else:
                    return {"error": "No transcripts available for this video"}
            except Exception as e:
                return {"error": f"Error getting transcript: {str(e)}"}
    except Exception as e:
        return {"error": f"Error getting transcript list: {str(e)}"}

# ----------------------------------------------------------------------------------
# YouTube Video Processor
# ----------------------------------------------------------------------------------

class clsYouTubeVideoProcessor:
    """Process YouTube videos using the agent system"""
    
    def __init__(self, documentation_agent, translation_agent, research_agent):
        self.documentation_agent = documentation_agent
        self.translation_agent = translation_agent
        self.research_agent = research_agent
    
    def process_youtube_video(self, youtube_url):
        """Process a YouTube video"""
        print(f"Processing YouTube video: {youtube_url}")
        
        # Extract transcript
        transcript_result = get_youtube_transcript(youtube_url)
        
        if "error" in transcript_result:
            return {"error": transcript_result["error"]}
        
        # Start a new conversation
        conversation_id = self.documentation_agent.start_processing()
        
        # Process transcript segments
        transcript_data = transcript_result["text"]
        transcript_language = transcript_result["language"]
        
        print(f"Debug - Type of transcript_data: {type(transcript_data)}")
        
        # For each segment, detect language and translate if needed
        processed_segments = []
        
        try:
            # Make sure transcript_data is a list of dictionaries with text and start fields
            if isinstance(transcript_data, list):
                for idx, segment in enumerate(transcript_data):
                    print(f"Debug - Processing segment {idx}, type: {type(segment)}")
                    
                    # Extract text properly based on the type
                    if isinstance(segment, dict) and "text" in segment:
                        text = segment["text"]
                        start = segment.get("start", 0)
                    else:
                        # Try to access attributes for non-dict types
                        try:
                            text = segment.text
                            start = getattr(segment, "start", 0)
                        except AttributeError:
                            # If all else fails, convert to string
                            text = str(segment)
                            start = idx * 5  # Arbitrary timestamp
                    
                    print(f"Debug - Extracted text: {text[:30]}...")
                    
                    # Create a standardized segment
                    std_segment = {
                        "text": text,
                        "start": start
                    }
                    
                    # Process through translation agent
                    translation_result = self.translation_agent.process_text(text, conversation_id)
                    
                    # Use translated text for documentation; set it before the copy
                    # below so that processed_text is kept in the stored segment
                    std_segment["processed_text"] = translation_result.get("final_text", text)
                    
                    # Update segment with translation information
                    segment_with_translation = {
                        **std_segment,
                        "translation_info": translation_result
                    }
                    
                    processed_segments.append(segment_with_translation)
            else:
                # If transcript_data is not a list, treat it as a single text block
                print("Debug - Transcript is not a list, treating as single text")
                text = str(transcript_data)
                std_segment = {
                    "text": text,
                    "start": 0
                }
                
                translation_result = self.translation_agent.process_text(text, conversation_id)
                
                # Set processed_text before copying so the stored segment includes it
                std_segment["processed_text"] = translation_result.get("final_text", text)
                
                segment_with_translation = {
                    **std_segment,
                    "translation_info": translation_result
                }
                
                processed_segments.append(segment_with_translation)
                
        except Exception as e:
            print(f"Debug - Error processing transcript: {str(e)}")
            return {"error": f"Error processing transcript: {str(e)}"}
        
        # Process the transcript with the documentation agent
        documentation_result = self.documentation_agent.process_transcript(
            processed_segments,
            conversation_id
        )
        
        return {
            "youtube_url": youtube_url,
            "transcript_language": transcript_language,
            "processed_segments": processed_segments,
            "documentation": documentation_result,
            "conversation_id": conversation_id
        }

Let us understand this step-by-step:

Part 1: Getting the YouTube Transcript

def extract_youtube_id(youtube_url):
    ...

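This extracts the 11-character video ID from a YouTube link, whether the ID follows v= or appears as a path segment (as in youtu.be short links). It returns None when no ID is found.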
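
The regular expression in extract_youtube_id accepts any 11-character run that follows a slash, so a path such as /feed/subscriptions would produce a false ID. A stricter variant can parse the URL first. The sketch below is only an illustration and not the code used in this project; the name parse_video_id and the list of accepted hosts and path prefixes are assumptions.

```python
import re
from urllib.parse import urlparse, parse_qs

_ID_RE = re.compile(r"[0-9A-Za-z_-]{11}")

def parse_video_id(url_or_id):
    """Return the 11-character video ID, or None if none can be found."""
    candidate = url_or_id.strip()
    # A bare ID is accepted as-is
    if _ID_RE.fullmatch(candidate):
        return candidate

    parsed = urlparse(candidate)
    host = (parsed.hostname or "").lower()
    if host.startswith("www."):
        host = host[4:]
    elif host.startswith("m."):
        host = host[2:]

    if host == "youtu.be":
        # Short links carry the ID as the first path segment
        video_id = parsed.path.lstrip("/").split("/")[0]
    elif host in ("youtube.com", "music.youtube.com"):
        if parsed.path == "/watch":
            video_id = parse_qs(parsed.query).get("v", [""])[0]
        else:
            # Paths of the form /embed/ID, /shorts/ID, /live/ID
            parts = parsed.path.strip("/").split("/")
            known = ("embed", "shorts", "live")
            video_id = parts[1] if len(parts) == 2 and parts[0] in known else ""
    else:
        return None

    return video_id if _ID_RE.fullmatch(video_id) else None
```

A URL without a scheme (for example youtube.com/watch?v=...) is rejected by this sketch, because urlparse does not report a hostname for it.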

def get_youtube_transcript(youtube_url):
    ...
  • This gets the actual spoken content of the video.
  • It tries to get a manually created English transcript first (written by humans).
  • If that is not available, it falls back to the first transcript listed for the video, in whatever language that is; this one may be auto-generated (created by YouTube’s AI).
  • If nothing is found, it gives back an error message like: “No transcripts available for this video.”
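
The selection order described above can be separated from the API calls, which makes it testable without network access. The following is a minimal sketch under that assumption: choose_transcript is a hypothetical helper, and the SimpleNamespace objects are stand-ins that only carry the language_code and is_generated attributes used in the listing.

```python
from types import SimpleNamespace

def choose_transcript(transcripts, preferred_languages=("en",)):
    """Pick a manual transcript in a preferred language, else the first one listed."""
    transcripts = list(transcripts)
    for lang in preferred_languages:
        for t in transcripts:
            if t.language_code == lang and not t.is_generated:
                return t
    # Fallback: whatever comes first, manual or auto-generated
    return transcripts[0] if transcripts else None

# Stand-in objects; the real library returns richer transcript objects
available = [
    SimpleNamespace(language_code="hi", is_generated=True),
    SimpleNamespace(language_code="en", is_generated=False),
]
picked = choose_transcript(available)
```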

Part 2: Processing the Video with Agents

class clsYouTubeVideoProcessor:
    ...

This is like the control center that tells each intelligent agent what to do with the transcript. Here are the detailed steps:

1. Start the Process

def process_youtube_video(self, youtube_url):
    ...
  • The system starts with a YouTube video link.
  • It prints a message like: “Processing YouTube video: [link]”

2. Extract the Transcript

  • The system runs the get_youtube_transcript() function.
  • If it fails, it returns an error (e.g., invalid link or no subtitles available).

3. Start a “Conversation”

  • The documentation agent begins a new session, tracked by a unique conversation ID.
  • Think of this like opening a new folder in a shared team workspace to store everything related to this video.

4. Go Through Each Segment of the Transcript

  • The spoken text is often broken into small parts (segments), like subtitles.
  • For each part:
    • It reads the text.
    • It finds out the time at which that part was spoken.
    • It sends the text to the translation agent, which detects the language and translates the text into English if needed.
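
Because a transcript segment may arrive as a dictionary, as an object with attributes, or as something else entirely, the extraction logic can be isolated in one small function. This is a sketch of the same three-way fallback used in the listing; normalize_segment and its fallback_spacing parameter are hypothetical names.

```python
from types import SimpleNamespace

def normalize_segment(segment, idx, fallback_spacing=5):
    """Return {'text', 'start'} for a dict, an object with attributes, or anything else."""
    if isinstance(segment, dict) and "text" in segment:
        return {"text": segment["text"], "start": segment.get("start", 0)}
    if hasattr(segment, "text"):
        return {"text": segment.text, "start": getattr(segment, "start", 0)}
    # Last resort: stringify and invent an evenly spaced timestamp
    return {"text": str(segment), "start": idx * fallback_spacing}

raw = [
    {"text": "hello", "start": 1.5},
    SimpleNamespace(text="world", start=4.0),
    "loose text",
]
segments = [normalize_segment(s, i) for i, s in enumerate(raw)]
```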

5. Translate (if needed)

  • If the translation agent returns a translated version, that text is stored as the segment's processed text.
  • Otherwise, the original text is kept as the processed text.
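
One detail of this step is easy to get wrong in Python: copying a dictionary with ** takes a snapshot, so a key added to the source dictionary afterwards is missing from the copy. Below is a minimal sketch of the step as a pure function that sets the processed text as part of the copy. attach_translation is a hypothetical helper, not part of the original class.

```python
def attach_translation(std_segment, translation_result):
    """Return a new segment dict carrying processed_text and translation_info."""
    original = std_segment["text"]
    # Fall back to the original text when no final_text was returned
    processed = translation_result.get("final_text", original)
    return {
        **std_segment,
        "processed_text": processed,
        "translation_info": translation_result,
    }

segment = {"text": "namaste", "start": 0}
translated = attach_translation(segment, {"final_text": "hello"})
untouched = attach_translation(segment, {"translation": {"provider": "none"}})
```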

6. Prepare for Documentation

  • Once every segment has been through translation, the full list of segments is passed to the documentation agent.
  • This agent might:
    • Summarize the content,
    • Highlight important terms,
    • Structure it into a readable format.

7. Return the Final Result

The system gives back a structured package with:

  • The video link
  • The original language
  • The transcript in parts (processed and translated)
  • A documentation summary
  • The conversation ID (for tracking or further updates)
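
A caller has to check for the error case before reading the other fields, since a failed run returns only an "error" key. The sketch below shows one way to consume the package; render_result is a hypothetical helper, and the key names are the ones returned by process_youtube_video and process_transcript in the listings.

```python
def render_result(result):
    """Turn the processor's return value into a short plain-text report."""
    if "error" in result:
        return f"Failed: {result['error']}"
    lines = [
        f"Video: {result['youtube_url']}",
        f"Transcript language: {result['transcript_language']}",
        f"Segments: {len(result['processed_segments'])}",
        f"Conversation: {result['conversation_id']}",
        "",
        result["documentation"]["summary"],
    ]
    return "\n".join(lines)
```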

class clsDocumentationAgent:
    """Documentation Agent built with LangChain"""
    
    def __init__(self, agent_id: str, broker: clsMCPBroker):
        self.agent_id = agent_id
        self.broker = broker
        self.broker.register_agent(agent_id)
        
        # Initialize LangChain components
        self.llm = ChatOpenAI(
            model="gpt-4-0125-preview",
            temperature=0.1,
            api_key=OPENAI_API_KEY
        )
        
        # Create tools
        self.tools = [
            clsSendMessageTool(sender_id=self.agent_id, broker=self.broker)
        ]
        
        # Set up LLM with tools
        self.llm_with_tools = self.llm.bind(
            tools=[tool.tool_config for tool in self.tools]
        )
        
        # Setup memory
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True
        )
        
        # Create prompt
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a Documentation Agent for YouTube video transcripts. Your responsibilities include:
                1. Process YouTube video transcripts
                2. Identify key points, topics, and main ideas
                3. Organize content into a coherent and structured format
                4. Create concise summaries
                5. Request research information when necessary
                
                When you need additional context or research, send a request to the Research Agent.
                Always maintain a professional tone and ensure your documentation is clear and organized.
            """),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])
        
        # Create agent
        self.agent = (
            {
                "input": lambda x: x["input"],
                "chat_history": lambda x: self.memory.load_memory_variables({})["chat_history"],
                "agent_scratchpad": lambda x: format_to_openai_tool_messages(x["intermediate_steps"]),
            }
            | self.prompt
            | self.llm_with_tools
            | OpenAIToolsAgentOutputParser()
        )
        
        # Create agent executor
        self.agent_executor = AgentExecutor(
            agent=self.agent,
            tools=self.tools,
            verbose=True,
            memory=self.memory
        )
        
        # Video data
        self.current_conversation_id = None
        self.video_notes = {}
        self.key_points = []
        self.transcript_segments = []
        
    def start_processing(self) -> str:
        """Start processing a new video"""
        self.current_conversation_id = str(uuid.uuid4())
        self.video_notes = {}
        self.key_points = []
        self.transcript_segments = []
        
        return self.current_conversation_id
    
    def process_transcript(self, transcript_segments, conversation_id=None):
        """Process a YouTube transcript"""
        if not conversation_id:
            conversation_id = self.start_processing()
        self.current_conversation_id = conversation_id
        
        # Store transcript segments
        self.transcript_segments = transcript_segments
        
        # Process segments
        processed_segments = []
        for segment in transcript_segments:
            processed_result = self.process_segment(segment)
            processed_segments.append(processed_result)
        
        # Generate summary
        summary = self.generate_summary()
        
        return {
            "processed_segments": processed_segments,
            "summary": summary,
            "conversation_id": conversation_id
        }
    
    def process_segment(self, segment):
        """Process individual transcript segment"""
        text = segment.get("text", "")
        start = segment.get("start", 0)
        
        # Use LangChain agent to process the segment
        result = self.agent_executor.invoke({
            "input": f"Process this video transcript segment at timestamp {start}s: {text}. If research is needed, send a request to the research_agent."
        })
        
        # Update video notes
        timestamp = start
        self.video_notes[timestamp] = {
            "text": text,
            "analysis": result["output"]
        }
        
        return {
            "timestamp": timestamp,
            "text": text,
            "analysis": result["output"]
        }
    
    def handle_mcp_message(self, message: clsMCPMessage) -> Optional[clsMCPMessage]:
        """Handle an incoming MCP message"""
        if message.message_type == "research_response":
            # Process research information received from Research Agent
            research_info = message.content.get("text", "")
            
            result = self.agent_executor.invoke({
                "input": f"Incorporate this research information into video analysis: {research_info}"
            })
            
            # Send acknowledgment back to Research Agent
            response = clsMCPMessage(
                sender=self.agent_id,
                receiver=message.sender,
                message_type="acknowledgment",
                content={"text": "Research information incorporated into video analysis."},
                reply_to=message.id,
                conversation_id=message.conversation_id
            )
            
            self.broker.publish(response)
            return response
        
        elif message.message_type == "translation_response":
            # Process translation response from Translation Agent
            translation_result = message.content
            
            # Process the translated text
            if "final_text" in translation_result:
                text = translation_result["final_text"]
                original_text = translation_result.get("original_text", "")
                language_info = translation_result.get("language", {})
                
                result = self.agent_executor.invoke({
                    "input": f"Process this translated text: {text}\nOriginal language: {language_info.get('language', 'unknown')}\nOriginal text: {original_text}"
                })
                
                # Update notes with translation information
                for timestamp, note in self.video_notes.items():
                    if note["text"] == original_text:
                        note["translated_text"] = text
                        note["language"] = language_info
                        break
            
            return None
        
        return None
    
    def run(self):
        """Run the agent to listen for MCP messages"""
        print(f"Documentation Agent {self.agent_id} is running...")
        while True:
            message = self.broker.get_message(self.agent_id, timeout=1)
            if message:
                self.handle_mcp_message(message)
            time.sleep(0.1)
    
    def generate_summary(self) -> str:
        """Generate a summary of the video"""
        if not self.video_notes:
            return "No video data available to summarize."
        
        all_notes = "\n".join([f"{ts}: {note['text']}" for ts, note in self.video_notes.items()])
        
        result = self.agent_executor.invoke({
            "input": f"Generate a concise summary of this YouTube video, including key points and topics:\n{all_notes}"
        })
        
        return result["output"]

Let us understand the key methods in a step-by-step manner:

The Documentation Agent is like a smart assistant that reads a YouTube video's transcript, takes notes, pulls out important ideas, and creates a summary, much like a professional note-taker trained to help educators, researchers, and content creators. It works with a team of other assistants, like a Translation Agent and a Research Agent, and they all talk to each other through a messaging system.

1. Starting to Work on a New Video

def start_processing(self) -> str:
    ...

When a new video is being processed:

  • A new conversation ID is created.
  • Old notes, key points, and transcript segments are cleared to start fresh.

2. Processing the Whole Transcript

def process_transcript(...):
    ...

This is where the assistant:

  • Takes in the full transcript (what was said in the video), already split into small parts (like subtitles).
  • Sends each part to the language model for analysis.
  • Collects the results.
  • Finally, creates a summary of all the main ideas.

3. Processing One Transcript Segment at a Time

def process_segment(self, segment):
    ...

For each chunk of the video:

  • The assistant reads the text and timestamp.
  • It asks GPT-4 to analyze the segment; the prompt also tells the model to send a request to the Research Agent if more context is needed.
  • It saves that analysis along with the original text and timestamp.

4. Handling Incoming Messages from Other Agents

def handle_mcp_message(self, message):
    ...

The assistant can also receive messages from teammates (other agents):

If the message is a research response from the Research Agent:

  • It reads the new information and asks the model to incorporate it into the video analysis.
  • It replies with an acknowledgment message to say it got the research.

If the message is a translation response from the Translation Agent:

  • It takes the translated version of a transcript segment.
  • It finds the note whose text matches the original and updates it with the translated text and its language.

This is like a team of assistants emailing back and forth to make sure the notes are complete and accurate.
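
The if/elif chain in handle_mcp_message can also be written as a lookup table from message type to handler, which keeps each reaction in its own method. The following is a sketch under stated assumptions: Message is a stand-in for clsMCPMessage with the field names used in the listing, and NoteTaker is a hypothetical agent that skips the LLM call entirely.

```python
import uuid
from dataclasses import dataclass, field
from typing import Callable, Dict, Optional

@dataclass
class Message:
    """Stand-in for clsMCPMessage; field names follow its usage in the listing."""
    sender: str
    receiver: str
    message_type: str
    content: dict
    reply_to: Optional[str] = None
    conversation_id: Optional[str] = None
    id: str = field(default_factory=lambda: str(uuid.uuid4()))

class NoteTaker:
    """Hypothetical agent that routes messages through a handler table."""

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.notes = []
        self.handlers: Dict[str, Callable[[Message], Optional[Message]]] = {
            "research_response": self._on_research,
            "translation_response": self._on_translation,
        }

    def handle(self, message: Message) -> Optional[Message]:
        # Unknown message types are ignored, as in the original method
        handler = self.handlers.get(message.message_type)
        return handler(message) if handler else None

    def _on_research(self, message: Message) -> Message:
        self.notes.append(message.content.get("text", ""))
        # Acknowledge, linking the reply to the message it answers
        return Message(
            sender=self.agent_id,
            receiver=message.sender,
            message_type="acknowledgment",
            content={"text": "Research information incorporated."},
            reply_to=message.id,
            conversation_id=message.conversation_id,
        )

    def _on_translation(self, message: Message) -> None:
        self.notes.append(message.content.get("final_text", ""))
        return None

agent = NoteTaker("documentation_agent")
incoming = Message(
    sender="research_agent",
    receiver="documentation_agent",
    message_type="research_response",
    content={"text": "sample research text"},
    conversation_id="c1",
)
ack = agent.handle(incoming)
```

The reply_to and conversation_id fields are what let the receiving agent match an acknowledgment to the message that triggered it.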

5. Summarizing the Whole Video

def generate_summary(self):
    ...

After going through all the transcript parts, the agent asks GPT-4 to create a short, clean summary that identifies:

  • Main ideas
  • Key talking points
  • Structure of the content

The result is meant to be clear and professional enough to use in learning materials or documentation.


    class clsResearchAgent:
        """Research Agent built with AutoGen"""
        
        def __init__(self, agent_id: str, broker: clsMCPBroker):
            self.agent_id = agent_id
            self.broker = broker
            self.broker.register_agent(agent_id)
            
            # Configure AutoGen directly with API key
            if not OPENAI_API_KEY:
                print("Warning: OPENAI_API_KEY not set for ResearchAgent")
                
            # Create config list directly instead of loading from file
            config_list = [
                {
                    "model": "gpt-4-0125-preview",
                    "api_key": OPENAI_API_KEY
                }
            ]
            # Create AutoGen assistant for research
            self.assistant = AssistantAgent(
                name="research_assistant",
                system_message="""You are a Research Agent for YouTube videos. Your responsibilities include:
                    1. Research topics mentioned in the video
                    2. Find relevant information, facts, references, or context
                    3. Provide concise, accurate information to support the documentation
                    4. Focus on delivering high-quality, relevant information
                    
                    Respond directly to research requests with clear, factual information.
                """,
                llm_config={"config_list": config_list, "temperature": 0.1}
            )
            
            # Create user proxy to handle message passing
            self.user_proxy = UserProxyAgent(
                name="research_manager",
                human_input_mode="NEVER",
                code_execution_config={"work_dir": "coding", "use_docker": False},
                default_auto_reply="Working on the research request..."
            )
            
            # Current conversation tracking
            self.current_requests = {}
        
        def handle_mcp_message(self, message: clsMCPMessage) -> Optional[clsMCPMessage]:
            """Handle an incoming MCP message"""
            if message.message_type == "request":
                # Process research request from Documentation Agent
                request_text = message.content.get("text", "")
                
                # Use AutoGen to process the research request
                def research_task():
                    self.user_proxy.initiate_chat(
                        self.assistant,
                        message=f"Research request for YouTube video content: {request_text}. Provide concise, factual information."
                    )
                    # Return last assistant message (chat_messages is keyed by the agent object, not its name)
                    return self.assistant.chat_messages[self.user_proxy][-1]["content"]
                
                # Execute research task
                research_result = research_task()
                
                # Send research results back to Documentation Agent
                response = clsMCPMessage(
                    sender=self.agent_id,
                    receiver=message.sender,
                    message_type="research_response",
                    content={"text": research_result},
                    reply_to=message.id,
                    conversation_id=message.conversation_id
                )
                
                self.broker.publish(response)
                return response
            
            return None
        
        def run(self):
            """Run the agent to listen for MCP messages"""
            print(f"Research Agent {self.agent_id} is running...")
            while True:
                message = self.broker.get_message(self.agent_id, timeout=1)
                if message:
                    self.handle_mcp_message(message)
                time.sleep(0.1)
    

Let us understand the key methods in detail.

1. Receiving and Responding to Research Requests

def handle_mcp_message(self, message):
    ...

When the Research Agent gets a message of type "request" (a question or request for info), it:

  1. Reads the message to see what needs to be researched.
  2. Asks GPT-4, through an AutoGen assistant, for helpful, accurate info about that topic.
  3. Sends the answer back as a "research_response" to whoever asked the question (usually the Documentation Agent).
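
The request and response round trip can be tried out without any LLM by swapping in an in-memory broker and a plain function. This is a minimal sketch, not the article's broker: InMemoryBroker only imitates the register_agent, publish, and get_message calls seen in the listings, messages are plain dictionaries here, and lookup is a hypothetical stand-in for the GPT-4 call.

```python
import queue

class InMemoryBroker:
    """Hypothetical stand-in for clsMCPBroker: one FIFO queue per registered agent."""

    def __init__(self):
        self._queues = {}

    def register_agent(self, agent_id):
        self._queues.setdefault(agent_id, queue.Queue())

    def publish(self, message):
        # Delivery is keyed on the receiver's ID
        self._queues[message["receiver"]].put(message)

    def get_message(self, agent_id, timeout=1):
        try:
            return self._queues[agent_id].get(timeout=timeout)
        except queue.Empty:
            return None

def answer_request(broker, agent_id, lookup):
    """Take one request off the queue, answer it, and publish the response."""
    request = broker.get_message(agent_id, timeout=0.1)
    if request is None or request["message_type"] != "request":
        return None
    response = {
        "sender": agent_id,
        "receiver": request["sender"],
        "message_type": "research_response",
        "content": {"text": lookup(request["content"]["text"])},
        "reply_to": request["id"],
        "conversation_id": request["conversation_id"],
    }
    broker.publish(response)
    return response

broker = InMemoryBroker()
broker.register_agent("documentation_agent")
broker.register_agent("research_agent")
broker.publish({
    "id": "m1",
    "sender": "documentation_agent",
    "receiver": "research_agent",
    "message_type": "request",
    "content": {"text": "MCP"},
    "conversation_id": "c1",
})
answer_request(broker, "research_agent", lambda topic: f"Notes on {topic}")
reply = broker.get_message("documentation_agent", timeout=0.1)
```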

      class clsTranslationAgent:
          """Agent for language detection and translation"""
          
          def __init__(self, agent_id: str, broker: clsMCPBroker):
              self.agent_id = agent_id
              self.broker = broker
              self.broker.register_agent(agent_id)
              
              # Initialize language detector
              self.language_detector = clsLanguageDetector()
              
              # Initialize translation service
              self.translation_service = clsTranslationService()
          
          def process_text(self, text, conversation_id=None):
              """Process text: detect language and translate if needed, handling mixed language content"""
              if not conversation_id:
                  conversation_id = str(uuid.uuid4())
              
              # Detect language with support for mixed language content
              language_info = self.language_detector.detect(text)
              
              # Decide if translation is needed
              needs_translation = True
              
              # Pure English content doesn't need translation
              if language_info["language_code"] == "en-IN" or language_info["language_code"] == "unknown":
                  needs_translation = False
              
              # For mixed language, check if it's primarily English
              if language_info.get("is_mixed", False) and language_info.get("languages", []):
                  english_langs = [
                      lang for lang in language_info.get("languages", []) 
                      if lang["language_code"] == "en-IN" or lang["language_code"].startswith("en-")
                  ]
                  
                  # If the highest confidence language is English and > 60% confident, don't translate
                  if english_langs and english_langs[0].get("confidence", 0) > 0.6:
                      needs_translation = False
              
              if needs_translation:
                  # Translate using the appropriate service based on language detection
                  translation_result = self.translation_service.translate(text, language_info)
                  
                  return {
                      "original_text": text,
                      "language": language_info,
                      "translation": translation_result,
                      "final_text": translation_result.get("translated_text", text),
                      "conversation_id": conversation_id
                  }
              else:
                  # Already English or unknown language, return as is
                  return {
                      "original_text": text,
                      "language": language_info,
                      "translation": {"provider": "none"},
                      "final_text": text,
                      "conversation_id": conversation_id
                  }
          
          def handle_mcp_message(self, message: clsMCPMessage) -> Optional[clsMCPMessage]:
              """Handle an incoming MCP message"""
              if message.message_type == "translation_request":
                  # Process translation request from Documentation Agent
                  text = message.content.get("text", "")
                  
                  # Process the text
                  result = self.process_text(text, message.conversation_id)
                  
                  # Send translation results back to requester
                  response = clsMCPMessage(
                      sender=self.agent_id,
                      receiver=message.sender,
                      message_type="translation_response",
                      content=result,
                      reply_to=message.id,
                      conversation_id=message.conversation_id
                  )
                  
                  self.broker.publish(response)
                  return response
              
              return None
          
          def run(self):
              """Run the agent to listen for MCP messages"""
              print(f"Translation Agent {self.agent_id} is running...")
              while True:
                  message = self.broker.get_message(self.agent_id, timeout=1)
                  if message:
                      self.handle_mcp_message(message)
                  time.sleep(0.1)

Let us understand the key methods in a step-by-step manner:

1. Understanding and Translating Text

def process_text(...):
    ...

This is the core job of the agent. Here’s what it does with any piece of text:

Step 1: Detect the Language

  • It tries to figure out the language of the input text.
  • It can handle cases where more than one language is mixed together, which is common in casual speech or subtitles.

Step 2: Decide Whether to Translate

  • If the text is detected as English (language code en-IN), or the language cannot be identified, it decides not to translate.
  • If the text mixes languages and the detected English portion has a confidence above 60%, it also skips translation.
  • In every other case, it translates the text into English.

Step 3: Translate (if needed)

  • If translation is required, it uses the translation service to do the job.
  • Then it packages all the information: the original text, the detected language, the translation details, the final text, and the conversation ID.

Step 4: Return the Results

  • If no translation is needed, it returns the same package with the original text as the final text and the translation provider set to “none.”
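
The decision in Step 2 can be pulled out into a pure function, which makes the threshold easy to test. The sketch below mirrors the conditions in process_text; needs_translation and english_threshold are hypothetical names, and the sample language code hi-IN is an assumption about the detector's output format.

```python
def needs_translation(language_info, english_threshold=0.6):
    """Mirror of the decision in process_text, as a pure function."""
    code = language_info["language_code"]
    if code == "en-IN" or code == "unknown":
        return False
    if language_info.get("is_mixed", False) and language_info.get("languages", []):
        english = [
            lang for lang in language_info["languages"]
            if lang["language_code"].startswith("en-")
        ]
        # Only the first English entry is checked, as in the original method
        if english and english[0].get("confidence", 0) > english_threshold:
            return False
    return True

mixed_mostly_english = {
    "language_code": "hi-IN",
    "is_mixed": True,
    "languages": [
        {"language_code": "hi-IN", "confidence": 0.3},
        {"language_code": "en-IN", "confidence": 0.7},
    ],
}
mixed_mostly_hindi = {
    "language_code": "hi-IN",
    "is_mixed": True,
    "languages": [
        {"language_code": "hi-IN", "confidence": 0.6},
        {"language_code": "en-IN", "confidence": 0.4},
    ],
}
```

Note that the threshold is compared with a strict greater-than, so an English confidence of exactly 0.6 still leads to translation.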

2. Receiving Messages and Responding

def handle_mcp_message(...):
    ...

The agent listens for messages from other agents. When another agent sends a translation request:

  • It takes the text from the message.
  • Runs it through the process_text function (as explained above).
  • Sends the translated (or original) result back to the agent that asked.

      class clsTranslationService:
          """Translation service using multiple providers with support for mixed languages"""
          
          def __init__(self):
              # Initialize Sarvam AI client
              self.sarvam_api_key = SARVAM_API_KEY
              self.sarvam_url = "https://api.sarvam.ai/translate"
              
              # Initialize Google Cloud Translation client using simple HTTP requests
              self.google_api_key = GOOGLE_API_KEY
              self.google_translate_url = "https://translation.googleapis.com/language/translate/v2"
          
          def translate_with_sarvam(self, text, source_lang, target_lang="en-IN"):
              """Translate text using Sarvam AI (for Indian languages)"""
              if not self.sarvam_api_key:
                  return {"error": "Sarvam API key not set"}
              
              headers = {
                  "Content-Type": "application/json",
                  "api-subscription-key": self.sarvam_api_key
              }
              
              payload = {
                  "input": text,
                  "source_language_code": source_lang,
                  "target_language_code": target_lang,
                  "speaker_gender": "Female",
                  "mode": "formal",
                  "model": "mayura:v1"
              }
              
              try:
                  # A timeout keeps a stalled connection from blocking the agent indefinitely
                  response = requests.post(self.sarvam_url, headers=headers, json=payload, timeout=30)
                  if response.status_code == 200:
                      return {"translated_text": response.json().get("translated_text", ""), "provider": "sarvam"}
                  else:
                      return {"error": f"Sarvam API error: {response.text}", "provider": "sarvam"}
              except Exception as e:
                  return {"error": f"Error calling Sarvam API: {str(e)}", "provider": "sarvam"}
          
          def translate_with_google(self, text, target_lang="en"):
              """Translate text using Google Cloud Translation API with direct HTTP request"""
              if not self.google_api_key:
                  return {"error": "Google API key not set"}
              
              try:
                  # Using the translation API v2 with API key
                  params = {
                      "key": self.google_api_key,
                      "q": text,
                      "target": target_lang
                  }
                  
                  # A timeout keeps a stalled connection from blocking the agent indefinitely
                  response = requests.post(self.google_translate_url, params=params, timeout=30)
                  if response.status_code == 200:
                      data = response.json()
                      translation = data.get("data", {}).get("translations", [{}])[0]
                      return {
                          "translated_text": translation.get("translatedText", ""),
                          "detected_source_language": translation.get("detectedSourceLanguage", ""),
                          "provider": "google"
                      }
                  else:
                      return {"error": f"Google API error: {response.text}", "provider": "google"}
              except Exception as e:
                  return {"error": f"Error calling Google Translation API: {str(e)}", "provider": "google"}
          
          def translate(self, text, language_info):
              """Translate text to English based on language detection info"""
              # If already English or unknown language, return as is
              if language_info["language_code"] == "en-IN" or language_info["language_code"] == "unknown":
                  return {"translated_text": text, "provider": "none"}
              
              # Handle mixed language content
              if language_info.get("is_mixed", False) and language_info.get("languages", []):
                  # Strategy for mixed language:
                  # 1. If one of the languages is English, send the full text to Google, which handles English code-mixing well
                  # 2. If no English but contains Indian languages, use Sarvam with the highest-confidence Indian language as source
                  # 3. Otherwise, use Google Translate for the primary detected language
                  
                  has_english = False
                  has_indian = False
                  
                  for lang in language_info.get("languages", []):
                      if lang["language_code"] == "en-IN" or lang["language_code"].startswith("en-"):
                          has_english = True
                      if lang.get("is_indian", False):
                          has_indian = True
                  
                  if has_english:
                      # Contains English - use Google for full text as it handles code-mixing well
                      return self.translate_with_google(text)
                  elif has_indian:
                      # Contains Indian languages - use Sarvam
                      # Use the highest confidence Indian language as source
                      indian_langs = [lang for lang in language_info.get("languages", []) if lang.get("is_indian", False)]
                      if indian_langs:
                          # Sort by confidence
                          indian_langs.sort(key=lambda x: x.get("confidence", 0), reverse=True)
                          source_lang = indian_langs[0]["language_code"]
                          return self.translate_with_sarvam(text, source_lang)
                      else:
                          # Fallback to primary language
                          if language_info["is_indian"]:
                              return self.translate_with_sarvam(text, language_info["language_code"])
                          else:
                              return self.translate_with_google(text)
                  else:
                      # No English, no Indian languages - use Google for primary language
                      return self.translate_with_google(text)
              else:
                  # Not mixed language - use standard approach
                  if language_info["is_indian"]:
                      # Use Sarvam AI for Indian languages
                      return self.translate_with_sarvam(text, language_info["language_code"])
                  else:
                      # Use Google for other languages
                      return self.translate_with_google(text)

      This Translation Service is like a smart translator that knows how to:

      • Detect what language the text is written in,
      • Choose the best translation provider depending on the language (especially for Indian languages),
      • And then translate the text into English.

      It supports mixed-language content (such as Hindi-English in one sentence) and uses either Google Translate or Sarvam AI, a translation service designed for Indian languages.

      Now, let us understand the key methods in a step-by-step manner:

      1. Translating Using Google Translate

      def translate_with_google(...)
      

      This function uses Google Translate:

      • It sends the text, asks for English as the target language, and gets a translation back.
      • It also detects the source language automatically.
      • If successful, it returns the translated text and the detected original language.
      • If there’s an error, it returns a message saying what went wrong.

      Best For: Non-Indian languages (like Spanish, French, Chinese) and mixed content that includes English.
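
As a rough illustration of the response handling described above, the sketch below turns a Translation API v2-style JSON body into the same dictionary shape that the method returns. The function name parse_google_response and the sample payload are hypothetical; only the data → translations → translatedText / detectedSourceLanguage path is taken from the code in this post.

```python
from typing import Any, Dict


def parse_google_response(data: Dict[str, Any]) -> Dict[str, str]:
    """Pull the first translation out of a v2-style response body."""
    # "or [{}]" also covers an empty translations list, not only a missing key.
    translations = data.get("data", {}).get("translations") or [{}]
    first = translations[0]
    return {
        "translated_text": first.get("translatedText", ""),
        "detected_source_language": first.get("detectedSourceLanguage", ""),
        "provider": "google",
    }


# Hypothetical sample body, shaped like the fields the method reads.
sample = {
    "data": {
        "translations": [
            {"translatedText": "Hello, world", "detectedSourceLanguage": "es"}
        ]
    }
}
result = parse_google_response(sample)
print(result["translated_text"], result["detected_source_language"])
```

Keeping the parsing in a small pure function like this makes it easy to test without calling the API.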

      2. Main Translation Logic

      def translate(self, text, language_info)
      

      This is the decision-maker. Here’s how it works:

      Case 1: No Translation Needed

      If the text is already in English or the language is unknown, it simply returns the original text.

      Case 2: Mixed Language (e.g., Hindi + English)

      If the text contains more than one language:

      • ✅ If one part is English → use Google Translate (it’s good with mixed languages).
      • ✅ If it includes Indian languages only → use Sarvam AI (better at handling Indian content).
      • ✅ If it’s neither English nor Indian → use Google Translate.

      In the Indian-language case, the service sorts the detected Indian languages by confidence and passes the highest-confidence one to Sarvam AI as the source language.

      Case 3: Single Language

      If the text is only in one language:

      • ✅ If it’s an Indian language (like Bengali, Tamil, or Marathi), use Sarvam AI.
      • ✅ If it’s any other language, use Google Translate.
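
The three cases above can be condensed into a single routing function. This is a minimal sketch and not the class's actual method: choose_provider is a hypothetical name, it only returns the provider label instead of calling any API, and the sample language codes other than "en-IN" are assumptions.

```python
from typing import Any, Dict


def choose_provider(language_info: Dict[str, Any]) -> str:
    """Return 'none', 'google', or 'sarvam' for a language-detection result."""
    code = language_info["language_code"]
    if code in ("en-IN", "unknown"):
        return "none"  # Case 1: nothing to translate

    languages = language_info.get("languages", [])
    if language_info.get("is_mixed", False) and languages:
        # Case 2: mixed content
        if any(lang["language_code"].startswith("en-") for lang in languages):
            return "google"  # English is part of the mix
        if any(lang.get("is_indian", False) for lang in languages):
            return "sarvam"  # Indian languages only
        return "google"      # neither English nor Indian

    # Case 3: single language
    return "sarvam" if language_info.get("is_indian", False) else "google"


hinglish = {
    "language_code": "hi-IN", "is_indian": True, "is_mixed": True,
    "languages": [
        {"language_code": "hi-IN", "is_indian": True},
        {"language_code": "en-IN", "is_indian": False},
    ],
}
tamil = {"language_code": "ta-IN", "is_indian": True}
french = {"language_code": "fr-FR", "is_indian": False}

for name, info in (("hinglish", hinglish), ("tamil", tamil), ("french", french)):
    print(name, "->", choose_provider(info))
```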

      So, we’ve done it.

      I’ve included the complete working solutions for you in the GitHub Link.

      We’ll cover detailed performance testing, optimized configurations & many other useful details in our next post.

      Till then, Happy Avenging! 🙂

      Real-time video summary assistance App – Part 1

      Today, we’ll discuss another topic in our two-part series. We will understand the importance of the MCP protocol for communicating between agents.

      This will be an in-depth, highly technical discussion, illustrated with easy-to-understand visuals.

      But, before that, let us understand the demo first.

      Isn’t it exciting?


      Let us first understand in easy language about the MCP protocol.

      MCP (Multi-Agent Communication Protocol) is a custom message exchange system that facilitates structured and scalable communication among multiple AI agents operating within an application. These agents collaborate asynchronously or in real-time to complete complex tasks by sharing results, context, and commands through a common messaging layer.

      How MCP Protocol Helps:

      Feature | Benefit
      Agent-Oriented Architecture | Each agent handles a focused task, improving modularity and scalability.
      Event-Driven Message Passing | Agents communicate based on triggers, not polling, leading to faster and more efficient responses.
      Structured Communication Format | All messages follow a standard format (e.g., JSON) with metadata for sender, recipient, type, and payload.
      State Preservation | Agents maintain context across messages using memory (e.g., ConversationBufferMemory) to ensure coherence.

      How It Works (Step-by-Step):

      • 📥 User uploads or streams a video.
      • 🧑‍💻 MCP Protocol triggers the Transcription Agent to start converting audio into text.
      • 🌐 Translation Agent receives this text (if a different language is needed).
      • 🧾 Summarization Agent receives the translated or original transcript and generates a concise summary.
      • 📚 Research Agent checks for references or terminology used in the video.
      • 📄 Documentation Agent compiles the output into a structured report.
      • 🔁 All communication between agents flows through MCP, ensuring consistent message delivery and coordination.
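
The hand-off order above can be sketched with plain functions standing in for the agents. Everything here is hypothetical: real agents would call speech-to-text, translation, and LLM services and exchange messages through the broker, whereas this sketch simply passes a shared dictionary from one stub to the next.

```python
from typing import Callable, Dict, List

# Each stub "agent" receives the shared context and returns it with one more field.
Context = Dict[str, str]


def transcription_agent(ctx: Context) -> Context:
    return {**ctx, "transcript": f"transcript of {ctx['video']}"}


def translation_agent(ctx: Context) -> Context:
    # Pass-through stub: a real agent would translate only when needed.
    return {**ctx, "english_text": ctx["transcript"]}


def summarization_agent(ctx: Context) -> Context:
    return {**ctx, "summary": ctx["english_text"][:20]}


def research_agent(ctx: Context) -> Context:
    return {**ctx, "references": "none found"}


def documentation_agent(ctx: Context) -> Context:
    report = f"Summary: {ctx['summary']} | Refs: {ctx['references']}"
    return {**ctx, "report": report}


PIPELINE: List[Callable[[Context], Context]] = [
    transcription_agent,
    translation_agent,
    summarization_agent,
    research_agent,
    documentation_agent,
]


def run_pipeline(video: str) -> Context:
    ctx: Context = {"video": video}
    for agent in PIPELINE:
        ctx = agent(ctx)
    return ctx


result = run_pipeline("webinar.mp4")
print(result["report"])
```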

      Now, let us understand the solution that we intend to implement:

      This app provides live summarization and contextual insights from videos such as webinars, interviews, or YouTube recordings using multiple cooperating AI agents. These agents may include:

      • Transcription Agent: Converts spoken words to text.
      • Translation Agent: Translates text to different languages (if needed).
      • Summarization Agent: Generates concise summaries.
      • Research Agent: Finds background or supplementary data related to the discussion.
      • Documentation Agent: Converts outputs into structured reports or learning materials.

      We need to understand one more thing before deep diving into the code. Part of a conversation may be mixed, for example part Hindi & part English. In that case, the app breaks the sentences into chunks & then converts all of them into the same language. Hence, the following rules are applied while translating the sentences –


      Now, we will go through the basic frame of the system & see how the principles discussed above map to the specific technologies used in this solution –

      1. Documentation Agent built with the LangChain framework
      2. Research Agent built with the AutoGen framework
      3. MCP Broker for seamless communication between agents

      Let us understand from the given picture the flow of the process that our app is trying to implement –


      Great! So, now, we’ll focus on some of the key Python scripts & go through their key features.

      But, before that, here is the group of scripts, each of which belongs to a specific task.

      • clsMCPMessage.py
      • clsMCPBroker.py
      • clsYouTubeVideoProcessor.py
      • clsLanguageDetector.py
      • clsTranslationAgent.py
      • clsTranslationService.py
      • clsDocumentationAgent.py
      • clsResearchAgent.py

      We’ll review some of these scripts in this post & continue with the rest in the next post.

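      import queue
      import time
      import uuid
      from typing import Any, Dict, List, Optional

      from pydantic import BaseModel, Field

      class clsMCPMessage(BaseModel):
          """Message format for MCP protocol"""
          id: str = Field(default_factory=lambda: str(uuid.uuid4()))
          timestamp: float = Field(default_factory=time.time)
          sender: str
          receiver: str
          message_type: str  # "request", "response", "notification"
          content: Dict[str, Any]
          reply_to: Optional[str] = None
          conversation_id: str
          # default_factory gives every message its own metadata dict
          metadata: Dict[str, Any] = Field(default_factory=dict)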
          
      class clsMCPBroker:
          """Message broker for MCP protocol communication between agents"""
          
          def __init__(self):
              self.message_queues: Dict[str, queue.Queue] = {}
              self.subscribers: Dict[str, List[str]] = {}
              self.conversation_history: Dict[str, List[clsMCPMessage]] = {}
          
          def register_agent(self, agent_id: str) -> None:
              """Register an agent with the broker"""
              if agent_id not in self.message_queues:
                  self.message_queues[agent_id] = queue.Queue()
                  self.subscribers[agent_id] = []
          
          def subscribe(self, subscriber_id: str, publisher_id: str) -> None:
              """Subscribe an agent to messages from another agent"""
              if publisher_id in self.subscribers:
                  if subscriber_id not in self.subscribers[publisher_id]:
                      self.subscribers[publisher_id].append(subscriber_id)
          
          def publish(self, message: clsMCPMessage) -> None:
              """Publish a message to its intended receiver"""
              # Store in conversation history
              if message.conversation_id not in self.conversation_history:
                  self.conversation_history[message.conversation_id] = []
              self.conversation_history[message.conversation_id].append(message)
              
              # Deliver to direct receiver
              if message.receiver in self.message_queues:
                  self.message_queues[message.receiver].put(message)
              
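              # Deliver to subscribers of the sender
              for subscriber in self.subscribers.get(message.sender, []):
                  # Skip the direct receiver (avoid duplicates) and any subscriber without a queue
                  if subscriber != message.receiver and subscriber in self.message_queues:
                      self.message_queues[subscriber].put(message)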
          
          def get_message(self, agent_id: str, timeout: Optional[float] = None) -> Optional[clsMCPMessage]:
              """Get a message for the specified agent"""
              try:
                  return self.message_queues[agent_id].get(timeout=timeout)
              except (queue.Empty, KeyError):
                  return None
          
          def get_conversation_history(self, conversation_id: str) -> List[clsMCPMessage]:
              """Get the history of a conversation"""
              return self.conversation_history.get(conversation_id, [])

      Imagine a system where different virtual agents (like robots or apps) need to talk to each other. To do that, they send messages back and forth—kind of like emails or text messages. This code is responsible for:

      • Making sure those messages are properly written (like filling out all parts of a form).
      • Making sure messages are delivered to the right people.
      • Keeping a record of conversations so you can go back and review what was said.

      This part (clsMCPMessage) is like a template or a form that every message needs to follow. Each message has:

      • ID: A unique identifier (a UUID string) so every message is different (like a serial number).
      • Time Sent: When the message was created.
      • Sender & Receiver: Who sent the message and who is supposed to receive it.
      • Type of Message: Is it a request, a response, or just a notification?
      • Content: The actual information or question the message is about.
      • Reply To: If this message is answering another one, this tells which one.
      • Conversation ID: So we know which group of messages belongs to the same conversation.
      • Extra Info (Metadata): Any other small details that might help explain the message.
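
To see how Reply To and Conversation ID tie messages together, here is a minimal sketch. It uses a standard-library dataclass as a stand-in for the pydantic model so that it runs without extra packages; the Message class, the make_reply helper, and the agent names are all hypothetical.

```python
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class Message:
    """Dataclass stand-in mirroring the fields of clsMCPMessage."""
    sender: str
    receiver: str
    message_type: str  # "request", "response", "notification"
    content: Dict[str, Any]
    conversation_id: str
    reply_to: Optional[str] = None
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: float = field(default_factory=time.time)
    metadata: Dict[str, Any] = field(default_factory=dict)


def make_reply(request: Message, content: Dict[str, Any]) -> Message:
    """Build a response that points back at the request it answers."""
    return Message(
        sender=request.receiver,
        receiver=request.sender,
        message_type="response",
        content=content,
        conversation_id=request.conversation_id,  # same conversation
        reply_to=request.id,                      # link to the original
    )


request = Message(
    sender="documentation_agent",
    receiver="research_agent",
    message_type="request",
    content={"task": "find background on the topic"},
    conversation_id="conv-1",
)
reply = make_reply(request, {"findings": "two related articles"})
print(reply.reply_to == request.id, reply.conversation_id)
```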

      This (clsMCPBroker) is the system (or “post office”) that makes sure messages get to where they’re supposed to go. Here’s what it does:

      1. Registering an Agent

      • Think of this like signing up a new user in the system.
      • Each agent gets their own personal mailbox (called a “message queue”) so others can send them messages.

      2. Subscribing to Another Agent

      • If Agent A wants to receive copies of messages from Agent B, they can “subscribe” to B.
      • This is like signing up for B’s newsletter—whenever B sends something, A gets a copy.

      3. Sending a Message

      • When someone sends a message:
        • It is saved into a conversation history (like keeping emails in your inbox).
        • It is delivered to the main person it was meant for.
        • And, if anyone subscribed to the sender, they get a copy too—unless they’re already the main receiver (to avoid sending duplicates).

      4. Receiving Messages

      • Each agent can check their personal mailbox to see if they got any new messages.
      • If there are no messages, the agent waits up to the given timeout and then moves on empty-handed; with no timeout, it waits until a message arrives.

      5. Viewing Past Conversations

      • You can look up all messages that were part of a specific conversation.
      • This is helpful for remembering what was said earlier.
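
The five steps above can be tried end to end with a condensed stand-in for the broker. This is a sketch under assumptions: MiniBroker is a hypothetical, trimmed-down class that uses plain dictionaries as messages, and the agent names are made up for the example.

```python
import queue
from typing import Dict, List, Optional


class MiniBroker:
    """Condensed stand-in for clsMCPBroker; messages are plain dicts here."""

    def __init__(self) -> None:
        self.queues: Dict[str, queue.Queue] = {}
        self.subscribers: Dict[str, List[str]] = {}
        self.history: Dict[str, List[dict]] = {}

    def register_agent(self, agent_id: str) -> None:
        if agent_id not in self.queues:
            self.queues[agent_id] = queue.Queue()
            self.subscribers[agent_id] = []

    def subscribe(self, subscriber_id: str, publisher_id: str) -> None:
        subs = self.subscribers.get(publisher_id)
        if subs is not None and subscriber_id not in subs:
            subs.append(subscriber_id)

    def publish(self, message: dict) -> None:
        # Record first, then deliver to the receiver and the sender's subscribers.
        self.history.setdefault(message["conversation_id"], []).append(message)
        targets = [message["receiver"]] + [
            s for s in self.subscribers.get(message["sender"], [])
            if s != message["receiver"]
        ]
        for agent_id in targets:
            if agent_id in self.queues:
                self.queues[agent_id].put(message)

    def get_message(self, agent_id: str, timeout: Optional[float] = None) -> Optional[dict]:
        try:
            return self.queues[agent_id].get(timeout=timeout)
        except (queue.Empty, KeyError):
            return None


broker = MiniBroker()
for agent in ("transcriber", "summarizer", "auditor"):
    broker.register_agent(agent)
broker.subscribe("auditor", "transcriber")  # auditor gets copies of transcriber's messages

msg = {"sender": "transcriber", "receiver": "summarizer",
       "conversation_id": "conv-1", "content": {"text": "hello"}}
broker.publish(msg)

to_summarizer = broker.get_message("summarizer", timeout=0.1)
to_auditor = broker.get_message("auditor", timeout=0.1)
nothing = broker.get_message("transcriber", timeout=0.1)  # empty mailbox
print(to_summarizer is msg, to_auditor is msg, nothing)
```

Passing a short timeout when reading keeps an agent with an empty mailbox from waiting forever.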

      In systems where many different smart tools or services need to work together and communicate, this kind of communication system makes sure everything is:

      • Organized
      • Delivered correctly
      • Easy to trace back when needed

      So, we’ll finish this post here & cover the remaining scripts in the next post.

      I’ll bring some more exciting topics in the coming days from the Python verse.

      Till then, Happy Avenging!  🙂

      Monitoring & evaluating the leading LLMs (both the established & new) by Python-based evaluator

      As we leap further into the field of Generative AI, one of the most frequent questions people face concerns performance & other evaluation factors. These factors determine whether the technology eventually bears fruit; otherwise, you will end up with technical debt.

      This post will discuss the key snippets of a Python-based app that monitors & evaluates LLMs. But before that, let us first view the demo.

      Isn’t it exciting?


      Let us deep dive into it. But first, here is the flow this solution will follow.

      So, the current application will invoke the industry bigshots and some relatively unknown or new LLMs.

      In this case, we’ll evaluate various models from Anthropic, OpenAI, DeepSeek, and Bharat GPT. Since Bharat GPT is open source, we’ll use the Hugging Face library and run it locally on my MacBook Pro M4 Max.

      The following are the KPIs we’re going to evaluate:

      Here is the list of dependent Python packages required to run this application –

      pip install certifi==2024.8.30
      pip install anthropic==0.42.0
      pip install huggingface-hub==0.27.0
      pip install nltk==3.9.1
      pip install moviepy==2.1.1
      pip install numpy==2.1.3
      pip install openai==1.59.3
      pip install pandas==2.2.3
      pip install pillow==11.1.0
      pip install pip==24.3.1
      pip install psutil==6.1.1
      pip install requests==2.32.3
      pip install rouge_score==0.1.2
      pip install scikit-learn==1.6.0
      pip install setuptools==70.2.0
      pip install tokenizers==0.21.0
      pip install torch==2.6.0.dev20250104
      pip install torchaudio==2.6.0.dev20250104
      pip install torchvision==0.22.0.dev20250104
      pip install tqdm==4.67.1
      pip install transformers==4.47.1
          @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
          def get_claude_response(self, prompt: str) -> str:
              response = self.anthropic_client.messages.create(
                  model=anthropic_model,
                  max_tokens=maxToken,
                  messages=[{"role": "user", "content": prompt}]
              )
              return response.content[0].text
      1. The Retry Mechanism
        • The @retry line means this function will automatically try again if it fails.
        • It will stop retrying after 3 attempts (stop_after_attempt(3)).
        • It will wait longer between retries, starting at 4 seconds and increasing up to 10 seconds (wait_exponential(multiplier=1, min=4, max=10)).
      2. The Function Purpose
        • The function takes a message, called prompt, as input (a string of text).
        • It uses Anthropic’s API to have a Claude model generate a response to this prompt.
      3. Sending the Message
        • Inside the function, the code self.anthropic_client.messages.create is the part that actually sends the prompt to the AI.
        • It specifies:
          • Which AI model to use (anthropic_model).
          • The maximum length of the response (controlled by maxToken).
          • The input message for the AI, which has a “role” (user) and the content of the prompt.
      4. Getting the Response
        • Once the AI generates a response, it’s saved as response.
        • The code retrieves the first part of the response (response.content[0].text) and sends it back to whoever called the function.
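
To make the retry behaviour concrete, here is a hand-rolled sketch of "try up to 3 times with a clamped exponential wait". It deliberately does not use the retry library from the post: backoff_waits and call_with_retry are hypothetical helpers, the exact wait schedule of the real decorator may differ between library versions, and this version re-raises the last error when it runs out of attempts.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def backoff_waits(attempts: int, multiplier: float = 1,
                  min_wait: float = 4, max_wait: float = 10) -> List[float]:
    """Waits between attempts: multiplier * 2**n, clamped to [min_wait, max_wait]."""
    return [
        max(min_wait, min(multiplier * 2 ** n, max_wait))
        for n in range(attempts - 1)
    ]


def call_with_retry(fn: Callable[[], T], attempts: int = 3,
                    sleep: Callable[[float], None] = time.sleep) -> T:
    waits = backoff_waits(attempts)
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(waits[attempt])
    raise RuntimeError("attempts must be at least 1")


# Demo: a call that fails twice and then succeeds. The sleeps are recorded
# instead of actually waiting, so the example finishes instantly.
calls = {"n": 0}
slept: List[float] = []


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


result = call_with_retry(flaky, sleep=slept.append)
print(result, calls["n"], slept)
```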

      Similarly, it will work for OpenAI as well.

          @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
          def get_deepseek_response(self, prompt: str) -> str:
              deepseek_api_key = self.deepseek_api_key
      
              headers = {
                  "Authorization": f"Bearer {deepseek_api_key}",
                  "Content-Type": "application/json"
                  }
              
              payload = {
                  "model": deepseek_model,  
                  "messages": [{"role": "user", "content": prompt}],
                  "max_tokens": maxToken
                  }
              
              response = requests.post(DEEPSEEK_API_URL, headers=headers, json=payload)
      
              if response.status_code == 200:
                  res = response.json()["choices"][0]["message"]["content"]
              else:
                  res = "API request failed with status code " + str(response.status_code) + ":" + str(response.text)
      
              return res
      1. Retry Mechanism:
        • The @retry line ensures the function will try again if it fails.
        • It will stop retrying after 3 attempts (stop_after_attempt(3)).
        • It waits between retries, starting at 4 seconds and increasing up to 10 seconds (wait_exponential(multiplier=1, min=4, max=10)).

      2. What the Function Does:
        • The function takes one input, prompt, which is the message or question you want to send to the AI.
        • It returns the AI’s response or an error message.

      3. Preparing to Communicate with the API:
        • API Key: It gets the API key for the DeepSeek service from self.deepseek_api_key.
        • Headers: These tell the API that the request will use the API key (for security) and that the data format is JSON (structured text).
        • Payload: This is the information sent to the AI. It includes:
          • Model: Specifies which version of the AI to use (deepseek_model).
          • Messages: The input message with the role “user” and your prompt.
          • Max Tokens: Defines the maximum size of the AI’s response (maxToken).

      4. Sending the Request:
        • It uses the requests.post() method to send the payload and headers to the DeepSeek API using the URL DEEPSEEK_API_URL.

      5. Processing the Response:
        • If the API responds successfully (status_code == 200):
          • It extracts the AI’s reply from the response data.
          • Specifically, it gets the first choice’s message content: response.json()["choices"][0]["message"]["content"].
        • If there’s an error:
          • It constructs an error message with the status code and detailed error text from the API.

      6. Returning the Result:
        • The function outputs either the AI’s response or the error message.
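
The request preparation and response handling above can be separated into two small pure functions, which makes them testable without a network call. This is a sketch only: build_chat_request and extract_reply are hypothetical helpers, the model name and key are placeholders, and the sample body merely imitates the choices → message → content path that the post's code reads.

```python
from typing import Any, Dict, Tuple


def build_chat_request(api_key: str, model: str, prompt: str,
                       max_tokens: int) -> Tuple[Dict[str, str], Dict[str, Any]]:
    """Return the (headers, payload) pair for a chat-completion style request."""
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": max_tokens,
    }
    return headers, payload


def extract_reply(status_code: int, body: Dict[str, Any]) -> str:
    """Return the first choice's text, or raise so that a caller can retry."""
    if status_code != 200:
        raise RuntimeError(f"API request failed with status code {status_code}: {body}")
    return body["choices"][0]["message"]["content"]


headers, payload = build_chat_request("PLACEHOLDER_KEY", "example-model", "Hi there", 256)
fake_body = {"choices": [{"message": {"role": "assistant", "content": "Hello!"}}]}
reply = extract_reply(200, fake_body)
print(payload["messages"][0]["content"], "->", reply)
```

One design difference is worth noting: this sketch raises on a non-200 status, so a retry decorator would see the failure, whereas returning the error text as a normal string looks like a success to the decorator.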
          @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
          def get_bharatgpt_response(self, prompt: str) -> str:
              try:
                  messages = [[{"role": "user", "content": prompt}]]
                  
                  response = pipe(messages, max_new_tokens=maxToken,)
      
                  # Extract 'content' field safely
                  res = next((entry.get("content", "")
                              for entry in response[0][0].get("generated_text", [])
                              if isinstance(entry, dict) and entry.get("role") == "assistant"
                              ),
                              None,
                              )
                  
                  return res
              except Exception as e:
                  x = str(e)
                  print('Error: ', x)
      
                  return ""
      1. Retry Mechanism:
        • The @retry ensures the function will try again if it fails.
        • It will stop retrying after 3 attempts (stop_after_attempt(3)).
        • The waiting time between retries starts at 4 seconds and increases exponentially up to 10 seconds (wait_exponential(multiplier=1, min=4, max=10)).
      2. What the Function Does:
        • The function takes one input, prompt, which is the message or question you want to send to BharatGPT.
        • It returns the AI’s response or an empty string if something goes wrong.
      3. Sending the Prompt:
        • Messages Structure: The function wraps the user’s prompt in a format that the BharatGPT AI understands:
        • messages = [[{"role": "user", "content": prompt}]]
        • This tells the AI that the prompt is coming from the “user.”
      4. Pipe Function:
        • It uses a pipe() method to send the messages to the AI system.
        • max_new_tokens=maxToken: Limits how long the AI’s response can be.
      5. Extracting the Response:
        • The response from the AI is in a structured format. The code looks for the first piece of text where:
        • The role is “assistant” (meaning it’s the AI’s reply).
        • The text is in the “content” field.
        • The next() function safely extracts this “content” field or returns None if it can’t find it.
      6. Error Handling:
        • If something goes wrong (e.g., the AI doesn’t respond or there’s a technical issue), the code:
        • Captures the error message in e.
        • Prints the error message: print('Error: ', x).
        • Returns an empty string ("") instead of crashing.
        • Because the exception is caught inside the function, the @retry decorator never sees a failure here, so in practice no retry takes place for this call.
      7. Returning the Result:
        • If everything works, the function gives you the AI’s response as plain text.
        • If there’s an error, it gives you an empty string, indicating no response was received.
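
The extraction step is easier to follow with a concrete input. In the sketch below, extract_assistant_text is a hypothetical helper and mock_response is a hand-written structure that only imitates the nested shape the post's code reads (a list of batches, each a list with a "generated_text" chat history); no model is loaded.

```python
from typing import Any, List, Optional


def extract_assistant_text(response: List[Any]) -> Optional[str]:
    """Return the first assistant message's content, or None if there is none."""
    generated = response[0][0].get("generated_text", [])
    return next(
        (
            entry.get("content", "")
            for entry in generated
            if isinstance(entry, dict) and entry.get("role") == "assistant"
        ),
        None,
    )


# Hand-written stand-in for the pipeline output of a single batched prompt.
mock_response = [[{
    "generated_text": [
        {"role": "user", "content": "What is MCP?"},
        {"role": "assistant", "content": "A messaging layer between agents."},
    ]
}]]
no_reply = [[{"generated_text": [{"role": "user", "content": "What is MCP?"}]}]]

print(extract_assistant_text(mock_response))
print(extract_assistant_text(no_reply))
```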

          def get_model_response(self, model_name: str, prompt: str) -> ModelResponse:
              """Get response from specified model with metrics"""
              start_time = time.time()
              start_memory = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
      
              try:
                  # Model-specific API calls
                  if model_name == "claude-3":
                      response_content = self.get_claude_response(prompt)
                  elif model_name == "gpt4":
                      response_content = self.get_gpt4_response(prompt)
                  elif model_name == "deepseek-chat":
                      response_content = self.get_deepseek_response(prompt)
                  elif model_name == "bharat-gpt":
                      response_content = self.get_bharatgpt_response(prompt)
                  else:
                      # Fail with a clear message instead of an unbound-variable error
                      raise ValueError(f"Unsupported model: {model_name}")
      
                  # Count the tokens in the response with the BERT tokenizer
                  token_count = len(self.bert_tokenizer.encode(response_content))
                  
                  end_memory = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
                  memory_usage = end_memory - start_memory
                  
                  return ModelResponse(
                      content=response_content,
                      response_time=time.time() - start_time,
                      token_count=token_count,
                      memory_usage=memory_usage
                  )
              except Exception as e:
                  logging.error(f"Error getting response from {model_name}: {str(e)}")
                  return ModelResponse(
                      content="",
                      response_time=0,
                      token_count=0,
                      memory_usage=0,
                      error=str(e)
                  )

      Start Tracking Time and Memory:

        • The function starts a timer (start_time) to measure how long it takes to get a response.
        • It also checks how much memory is being used at the beginning (start_memory).

        Choose the AI Model:

        • Based on the model_name provided, the function selects the appropriate method to get a response:
          • "claude-3" → Calls get_claude_response(prompt).
          • "gpt4" → Calls get_gpt4_response(prompt).
          • "deepseek-chat" → Calls get_deepseek_response(prompt).
          • "bharat-gpt" → Calls get_bharatgpt_response(prompt).

        Process the Response:

        • Once the response is received, the function calculates:
          • Token Count: The number of tokens (small chunks of text) in the response using a tokenizer.
          • Memory Usage: The difference between memory usage after the response (end_memory) and before it (start_memory).

        Return the Results:

        • The function bundles all the information into a ModelResponse object:
          • The AI’s reply (content).
          • How long the response took (response_time).
          • The number of tokens in the reply (token_count).
          • How much memory was used (memory_usage).

        Handle Errors:

        • If something goes wrong (e.g., the AI doesn’t respond), the function:
          • Logs the error message.
          • Returns an empty response with default values and the error message.
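
The measure-then-call pattern above can be sketched with the standard library alone. Note the assumptions: CallMetrics and measure_call are hypothetical names, the two "models" are stubs, and tracemalloc is swapped in for the process-level memory reading used in the post, so it only sees Python-level allocations rather than the whole process.

```python
import time
import tracemalloc
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class CallMetrics:
    content: str
    response_time: float    # seconds
    peak_memory_mb: float   # peak Python allocations during the call
    error: Optional[str] = None


def measure_call(fn: Callable[[str], str], prompt: str) -> CallMetrics:
    """Run fn(prompt) while timing it and tracking peak allocations."""
    tracemalloc.start()
    start = time.perf_counter()
    try:
        content, error = fn(prompt), None
    except Exception as exc:
        content, error = "", str(exc)
    elapsed = time.perf_counter() - start
    _, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return CallMetrics(content, elapsed, peak / 1024 / 1024, error)


def fake_model(prompt: str) -> str:
    return prompt.upper()


def broken_model(prompt: str) -> str:
    raise TimeoutError("model did not respond")


ok = measure_call(fake_model, "hello")
failed = measure_call(broken_model, "hello")
print(ok.content, ok.error)
print(repr(failed.content), failed.error)
```
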
            def evaluate_text_quality(self, generated: str, reference: str) -> Dict[str, float]:
                """Evaluate text quality metrics"""
                # Cosine similarity of sentence embeddings (reported under the key 'bert_score')
                gen_embedding = self.sentence_model.encode([generated])
                ref_embedding = self.sentence_model.encode([reference])
                bert_score = cosine_similarity(gen_embedding, ref_embedding)[0][0]
        
                # BLEU Score
                generated_tokens = word_tokenize(generated.lower())
                reference_tokens = word_tokenize(reference.lower())
                bleu = sentence_bleu([reference_tokens], generated_tokens)
        
                # METEOR Score
                meteor = meteor_score([reference_tokens], generated_tokens)
        
                return {
                    'bert_score': bert_score,
                    'bleu_score': bleu,
                    'meteor_score': meteor
                }

        Inputs:

        • generated: The text produced by the AI.
        • reference: The correct or expected version of the text.

        Calculating BERTScore:

        • Converts the generated and reference texts into numerical embeddings (mathematical representations) using a pre-trained model (self.sentence_model.encode).
        • Measures the similarity between the two embeddings using cosine similarity. This gives the bert_score, which ranges from -1 (opposite meaning) to 1 (very similar). Strictly speaking, this is an embedding-similarity proxy and not the token-level BERTScore metric.

        Calculating BLEU Score:

        • Breaks the generated and reference texts into individual words (tokens) using word_tokenize.
        • Converts both texts to lowercase for consistent comparison.
        • Calculates the BLEU Score (sentence_bleu), which checks how many words or phrases in the generated text overlap with the reference. BLEU values range from 0 (no match) to 1 (perfect match).

        Calculating METEOR Score:

        • Also uses the tokenized versions of generated and reference texts.
        • Calculates the METEOR Score (meteor_score), which considers exact matches, synonyms, and word order. Scores range from 0 (no match) to 1 (perfect match).

        Returning the Results:

        • Combines the three scores into a dictionary with the keys 'bert_score', 'bleu_score', and 'meteor_score'.
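
To get a feel for what these scores measure, here is a dependency-free sketch of two of the building blocks. Both functions are hypothetical simplifications: cosine works on toy vectors instead of real sentence embeddings, and unigram_precision is only the clipped 1-gram part of BLEU (no higher n-grams, no brevity penalty), so its values will not match sentence_bleu.

```python
import math
from collections import Counter
from typing import List, Sequence


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity of two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def unigram_precision(generated: List[str], reference: List[str]) -> float:
    """Share of generated tokens found in the reference, with clipped counts."""
    if not generated:
        return 0.0
    ref_counts = Counter(reference)
    matched = sum(
        min(count, ref_counts[token])
        for token, count in Counter(generated).items()
    )
    return matched / len(generated)


same_direction = cosine([1.0, 2.0], [2.0, 4.0])   # parallel vectors
opposite = cosine([1.0, 0.0], [-1.0, 0.0])        # opposite vectors
full_match = unigram_precision("the cat sat".split(), "the cat sat on the mat".split())
partial = unigram_precision("the the dog".split(), "the cat".split())
print(round(same_direction, 3), opposite, full_match, round(partial, 3))
```

The clipping in unigram_precision is what stops a response from scoring well by repeating one matching word many times.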

        Similarly, other functions are developed.

            def run_comprehensive_evaluation(self, evaluation_data: List[Dict]) -> pd.DataFrame:
                """Run comprehensive evaluation on all metrics"""
                results = []
                
                for item in evaluation_data:
                    prompt = item['prompt']
                    reference = item['reference']
                    task_criteria = item.get('task_criteria', {})
                    
                    for model_name in self.model_configs.keys():
                        # Get multiple responses to evaluate reliability
                        responses = [
                            self.get_model_response(model_name, prompt)
                            for _ in range(3)  # Get 3 responses for reliability testing
                        ]
                        
                        # Use the best response for other evaluations
                        best_response = max(responses, key=lambda x: len(x.content) if not x.error else 0)
                        
                        if best_response.error:
                            logging.error(f"Error in model {model_name}: {best_response.error}")
                            continue
                        
                        # Gather all metrics
                        metrics = {
                            'model': model_name,
                            'prompt': prompt,
                            'response': best_response.content,
                            **self.evaluate_text_quality(best_response.content, reference),
                            **self.evaluate_factual_accuracy(best_response.content, reference),
                            **self.evaluate_task_performance(best_response.content, task_criteria),
                            **self.evaluate_technical_performance(best_response),
                            **self.evaluate_reliability(responses),
                            **self.evaluate_safety(best_response.content)
                        }
                        
                        # Add business impact metrics using task performance
                        metrics.update(self.evaluate_business_impact(
                            best_response,
                            metrics['task_completion']
                        ))
                        
                        results.append(metrics)
                
                return pd.DataFrame(results)
        • Input:
          • evaluation_data: A list of test cases, where each case is a dictionary containing:
            • prompt: The question or input to the AI model.
            • reference: The ideal or expected answer.
            • task_criteria (optional): Additional rules or requirements for the task.
        • Initialize Results:
          • An empty list results is created to store the evaluation metrics for each model and test case.
        • Iterate Through Test Cases:
          • For each item in the evaluation_data:
            • Extract the prompt, reference, and task_criteria.
        • Evaluate Each Model:
          • Loop through all available AI models (self.model_configs.keys()).
          • Generate three responses for each model to test reliability.
        • Select the Best Response:
          • Out of the three responses, pick the one with the most content (best_response), ignoring responses with errors.
        • Handle Errors:
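          • If the selected best_response still carries an error (which can happen only when no attempt produced non-empty content without an error), log the issue and skip further evaluation for that model.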
        • Evaluate Metrics:
          • Using the best_response, calculate a variety of metrics, including:
            • Text Quality: How similar the response is to the reference.
            • Factual Accuracy: Whether the response is factually correct.
            • Task Performance: How well it meets task-specific criteria.
            • Technical Performance: Evaluate time, memory, or other system-related metrics.
            • Reliability: Check consistency across multiple responses.
            • Safety: Ensure the response is safe and appropriate.
        • Evaluate Business Impact:
          • Add metrics for business impact (e.g., how well the task was completed, using task_completion as a key factor).
        • Store Results:
          • Add the calculated metrics for this model and prompt to the results list.
        • Return Results as a DataFrame:
          • Convert the results list into a structured table (a pandas DataFrame) for easy analysis and visualization.
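
The "Select the Best Response" and "Handle Errors" steps above can be sketched in isolation. This is only a minimal illustration: the ModelResponse dataclass and the pick_best_response helper are hypothetical stand-ins for the response object and the inline max(...) call used in the loop, not the author's actual classes.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ModelResponse:
    # Hypothetical stand-in for the response object used in the loop above.
    content: str
    error: Optional[str] = None


def pick_best_response(responses: List[ModelResponse]) -> ModelResponse:
    # Errored responses score 0, so any non-empty successful response wins.
    return max(responses, key=lambda r: len(r.content) if not r.error else 0)


responses = [
    ModelResponse(content="", error="timeout"),
    ModelResponse(content="Short answer."),
    ModelResponse(content="A longer, more detailed answer."),
]
best = pick_best_response(responses)
print(best.content)

# If every attempt failed, the first errored response is returned,
# which is why the loop still has to check best_response.error.
all_failed = pick_best_response([ModelResponse("", "x"), ModelResponse("", "y")])
print(all_failed.error)
```

Note that "most content" is only a proxy for "best"; a longer response is not necessarily a better one, so this selection rule is a design choice worth revisiting for your own use case.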

        Great! So, now, we’ve explained the code.

        Let us understand the final outcome of this run & what we can conclude from that.

        1. BERT Score (Semantic Understanding):
          • GPT4 leads slightly at 0.8322 (83.22%)
          • Bharat-GPT close second at 0.8118 (81.18%)
          • Claude-3 at 0.8019 (80.19%)
          • DeepSeek-Chat at 0.7819 (78.19%)
          Think of this like a “comprehension score” – how closely each response matches the reference in meaning. All models score well, with only about 5 percentage points between best and worst.
        2. BLEU Score (Word-for-Word Accuracy):
          • Bharat-GPT leads at 0.0567 (5.67%)
          • Claude-3 at 0.0344 (3.44%)
          • GPT4 at 0.0306 (3.06%)
          • DeepSeek-Chat lowest at 0.0189 (1.89%)
          These low scores suggest models use different wording than references, which isn’t necessarily bad.
        3. METEOR Score (Meaning Preservation):
          • Bharat-GPT leads at 0.4684 (46.84%)
          • Claude-3 close second at 0.4507 (45.07%)
          • GPT4 at 0.2960 (29.60%)
          • DeepSeek-Chat at 0.2652 (26.52%)
          This shows how well models maintain meaning while using different words.
        4. Response Time (Speed):
          • Claude-3 fastest: 4.40 seconds
          • Bharat-GPT: 6.35 seconds
          • GPT4: 6.43 seconds
          • DeepSeek-Chat slowest: 8.52 seconds
        5. Safety and Reliability:
          • Error Rate: Perfect 0.0 for all models
          • Toxicity: All very safe (below 0.15%)
            • Claude-3 safest at 0.0007
            • GPT4 at 0.0008
            • Bharat-GPT at 0.0012
            • DeepSeek-Chat at 0.0014
        6. Cost Efficiency:
          • Claude-3 most economical: $0.0019 per response
          • Bharat-GPT close: $0.0021
          • GPT4: $0.0038
          • DeepSeek-Chat highest: $0.0050
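
To see why the BLEU scores in item 2 are so low even when the answers are good, here is a rough sketch of clipped unigram precision, the basic building block of BLEU. It is not the full BLEU metric (which combines n-gram precisions with a brevity penalty), and the sentences are made-up examples, not data from this run.

```python
from collections import Counter


def clipped_unigram_precision(candidate: str, reference: str) -> float:
    """Share of candidate words that also appear in the reference (clipped)."""
    cand_tokens = candidate.lower().split()
    if not cand_tokens:
        return 0.0
    ref_counts = Counter(reference.lower().split())
    cand_counts = Counter(cand_tokens)
    # Each candidate word is credited at most as often as it occurs in the reference.
    matched = sum(min(count, ref_counts[word]) for word, count in cand_counts.items())
    return matched / len(cand_tokens)


# Illustrative sentences only.
reference = "the order was shipped on monday"
verbatim = "the order was shipped on monday"
paraphrase = "your package left our warehouse at the start of the week"

verbatim_score = clipped_unigram_precision(verbatim, reference)
paraphrase_score = clipped_unigram_precision(paraphrase, reference)
print(verbatim_score, round(paraphrase_score, 4))
```

The paraphrase carries the same meaning but shares almost no words with the reference, so a word-overlap metric punishes it heavily. That is the pattern behind low BLEU alongside healthy BERT scores.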

        Key Takeaways by Model:

        1. Claude-3: ✓ Fastest responses ✓ Most cost-effective ✓ Excellent meaning preservation ✓ Lowest toxicity
        2. Bharat-GPT: ✓ Best BLEU and METEOR scores ✓ Strong semantic understanding ✓ Cost-effective ✗ Moderate response time
        3. GPT4: ✓ Best semantic understanding ✓ Good safety metrics ✗ Higher cost ✗ Moderate response time
        4. DeepSeek-Chat: ✗ Generally lower performance ✗ Slowest responses ✗ Highest cost ✗ Slightly higher toxicity

        Reliability of These Statistics:

        Strong Points:

        • Comprehensive metric coverage
        • Consistent patterns across evaluations
        • Zero error rates show reliability
        • Clear differentiation between models

        Limitations:

        • BLEU scores are quite low across all models
        • Doesn’t measure creative or innovative responses
        • May not reflect specific use case performance
        • Single snapshot rather than long-term performance

        Final Observation:

        1. Best Overall Value: Claude-3
          • Fast, cost-effective, safe, good performance
        2. Best for Accuracy: Bharat-GPT
          • Highest meaning preservation and precision
        3. Best for Understanding: GPT4
          • Strongest semantic comprehension
        4. Consider Your Priorities: 
          • Speed → Choose Claude-3
          • Cost → Choose Claude-3 or Bharat-GPT
          • Accuracy → Choose Bharat-GPT
          • Understanding → Choose GPT4

        These statistics provide reliable comparative data but should be part of a broader decision-making process that includes your specific needs, budget, and use cases.
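
The "Consider Your Priorities" mapping above can be reproduced directly from the reported numbers. The sketch below copies the figures from this post; the priorities table, and in particular reading "accuracy" as the METEOR score and "understanding" as the BERT score, is my own assumption for illustration.

```python
# Figures copied from the results reported above.
results = {
    "Claude-3": {"bert": 0.8019, "bleu": 0.0344, "meteor": 0.4507, "seconds": 4.40, "cost": 0.0019},
    "Bharat-GPT": {"bert": 0.8118, "bleu": 0.0567, "meteor": 0.4684, "seconds": 6.35, "cost": 0.0021},
    "GPT4": {"bert": 0.8322, "bleu": 0.0306, "meteor": 0.2960, "seconds": 6.43, "cost": 0.0038},
    "DeepSeek-Chat": {"bert": 0.7819, "bleu": 0.0189, "meteor": 0.2652, "seconds": 8.52, "cost": 0.0050},
}

# Assumed mapping: which metric represents each priority, and whether lower is better.
priorities = {
    "speed": ("seconds", True),
    "cost": ("cost", True),
    "accuracy": ("meteor", False),
    "understanding": ("bert", False),
}


def choose_model(priority: str) -> str:
    metric, lower_is_better = priorities[priority]
    pick = min if lower_is_better else max
    return pick(results, key=lambda model: results[model][metric])


for priority in priorities:
    print(priority, "->", choose_model(priority))
```

Picking a single winner per metric hides how close some of the numbers are (for example, the cost of Claude-3 and Bharat-GPT), so treat the output as a starting point rather than a verdict.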


        For the Bharat GPT model, we tested this locally on my MacBook Pro 4 Max, and the configuration is as follows –

        I’ve also tried the API version, & its performance was similar to the stats that we got by running the model locally. Unfortunately, they haven’t made the API version public yet.

        So, apart from Anthropic & OpenAI, I’ll keep watching this new LLM (Bharat GPT) for its overall stats in the coming days.


        So, we’ve done it.

        You can find the detailed code at the GitHub link.

        I’ll bring some more exciting topics in the coming days from the Python verse.

        Till then, Happy Avenging! 🙂

        Building solutions using LLM AutoGen in Python – Part 3

        Before we dive into the details of this post, here are the links to the previous two posts in this series.

        Building solutions using LLM AutoGen in Python – Part 1

        Building solutions using LLM AutoGen in Python – Part 2

        For reference, we’ll share the demo first, before we dive deep into the follow-up analysis in the section below –


        In this post, we will understand the initial code generated & then the revised code to compare them for a better understanding of the impact of revised prompts.

        But, before that, let us broadly understand the communication types between the agents.

        • Agents Involved: Agent1, Agent2
        • Flow:
          • Agent1 sends a request directly to Agent2.
          • Agent2 processes the request and sends the response back to Agent1.
        • Use Case: Simple query-response interactions without intermediaries.
        • Agents Involved: UserAgent, Mediator, SpecialistAgent1, SpecialistAgent2
        • Flow:
          • UserAgent sends input to Mediator.
          • Mediator delegates tasks to SpecialistAgent1 and SpecialistAgent2.
          • Specialists process tasks and return results to Mediator.
          • Mediator consolidates results and sends them back to UserAgent.
        • Agents Involved: Broadcaster, AgentA, AgentB, AgentC
        • Flow:
          • Broadcaster sends a message to multiple agents simultaneously.
          • Agents that find the message relevant (AgentA, AgentC) acknowledge or respond.
        • Use Case: System-wide notifications or alerts.
        • Agents Involved: Supervisor, Worker1, Worker2
        • Flow:
          • Supervisor assigns tasks to Worker1 and Worker2.
          • Workers execute tasks and report progress back to Supervisor.
        • Use Case: Task delegation in structured organizations.
        • Agents Involved: Publisher, Subscriber1, Topic
        • Flow:
          • Publisher publishes an event or message to a Topic.
          • Subscriber1, who is subscribed to the Topic, receives the event.
        • Use Case: Decoupled systems where publishers and subscribers do not need direct knowledge of each other.
        • Agents Involved: TriggerEvent, ReactiveAgent, NextStep
        • Flow:
          • An event occurs (TriggerEvent).
          • ReactiveAgent detects the event and acts.
          • The action leads to the NextStep in the process.
        • Use Case: Systems that need to respond to asynchronous events or changes in the environment.
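
To make one of these patterns concrete, here is a minimal in-memory sketch of the Publisher / Topic / Subscriber1 flow described above. The TopicBus class and its method names are hypothetical and are not part of AutoGen; they only illustrate the decoupling between publisher and subscriber.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List


class TopicBus:
    """Hypothetical in-memory topic registry; not an AutoGen API."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Callable[[str], None]]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Callable[[str], None]) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, message: str) -> int:
        # The publisher only knows the topic name, never the subscribers.
        handlers = self._subscribers.get(topic, [])
        for handler in handlers:
            handler(message)
        return len(handlers)


received: List[str] = []
bus = TopicBus()
bus.subscribe("orders", lambda msg: received.append(f"Subscriber1 got: {msg}"))

delivered = bus.publish("orders", "order-created")   # one subscriber on this topic
ignored = bus.publish("billing", "invoice-created")  # nobody subscribed here
print(received, delivered, ignored)
```

The same skeleton extends to the broadcast case by subscribing several agents to one topic and letting each decide whether the message is relevant.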

        Now that we understand the basic communication types, let us look at the first code that AutoGen generated & the last one (which satisfies our need) –

        # filename: simple_snake.py (Generated by AutoGen)
        
        import pygame
        import time
        import random
         
        snake_speed = 15
         
        # Window color
        white = pygame.Color(255, 255, 255)
         
        # Snake color
        green = pygame.Color(0, 255, 0)
         
        snake_position = [100, 50]
         
        # defining first 4 blocks 
        # of snake body
        snake_body = [ [100, 50], 
                       [90, 50],
                       [80, 50],
                       [70, 50]
                    ]
        # fruit position
        fruit_position = [random.randrange(1, (1000//10)) * 10, 
                          random.randrange(1, (600//10)) * 10]
        fruit_spawn = True
         
        direction = 'RIGHT'
        change_to = direction
         
        score = 0
         
        # Initialising pygame
        pygame.init()
         
        # Initialise game window
        win = pygame.display.set_mode((1000, 600))
        pygame.display.set_caption("Snake game for kids")
         
        # FPS (frames per second) controller
        fps_controller = pygame.time.Clock()
         
          
        while True:
            # handling key events
            for event in pygame.event.get():
                if event.type == pygame.KEYDOWN:
                    if event.key == pygame.K_UP:
                        change_to = 'UP'
                    if event.key == pygame.K_DOWN:
                        change_to = 'DOWN'
                    if event.key == pygame.K_LEFT:
                        change_to = 'LEFT'
                    if event.key == pygame.K_RIGHT:
                        change_to = 'RIGHT'
        
            # If two keys pressed simultaneously
            # we don't want snake to move into two
            # directions simultaneously
            if change_to == 'UP' and direction != 'DOWN':
                direction = 'UP'
            if change_to == 'DOWN' and direction != 'UP':
                direction = 'DOWN'
            if change_to == 'LEFT' and direction != 'RIGHT':
                direction = 'LEFT'
            if change_to == 'RIGHT' and direction != 'LEFT':
                direction = 'RIGHT'
         
            # Moving the snake
            if direction == 'UP':
                snake_position[1] -= 10
            if direction == 'DOWN':
                snake_position[1] += 10
            if direction == 'LEFT':
                snake_position[0] -= 10
            if direction == 'RIGHT':
                snake_position[0] += 10
         
            # Snake body growing mechanism
            # if fruits and snakes collide then scores
            # will increase by 10
            snake_body.insert(0, list(snake_position))
            if snake_position[0] == fruit_position[0] and snake_position[1] == fruit_position[1]:
                score += 10
                fruit_spawn = False
            else:
                snake_body.pop()
                 
            if not fruit_spawn:
                fruit_position = [random.randrange(1, (1000//10)) * 10, 
                                  random.randrange(1, (600//10)) * 10]
                 
            fruit_spawn = True
            win.fill(white)
            
            for pos in snake_body:
                pygame.draw.rect(win, green, pygame.Rect(pos[0], pos[1], 10, 10))
            pygame.draw.rect(win, white, pygame.Rect(
            fruit_position[0], fruit_position[1], 10, 10))
         
            # Game Over conditions
            if snake_position[0] < 0 or snake_position[0] > 1000-10:
                break
            if snake_position[1] < 0 or snake_position[1] > 600-10:
                break
         
            # Touching the snake body
            for block in snake_body[1:]:
                if snake_position[0] == block[0] and snake_position[1] == block[1]:
                    break
            
            # refresh game screen
            pygame.display.update()
        
            # Frame Per Second /Refresh rate
            fps_controller.tick(snake_speed)
        
        # displaying final score after game over
        print(f"You scored {score} in the game.")

        Now, let us recall the prompt refinement that we did (we already shared it in our last post as well) –

        I want to place the snake within the boundary of the visual wall & it should bounce each wall & then hit the target. And, it should increase the size. And, every time when it hits the target it will increase the score by 10. And, the application will run for 2 minutes, which will be displayed on the screen. This will exit once 2 minute is over.
        # filename: simple_snake.py (Generated by AutoGen)
        
        import pygame
        import time
        import random
        import math
        
        pygame.init()
        
        white = (255, 255, 255)
        yellow = (255, 255, 102)
        green = (0, 255, 0)
        red = (255, 0, 0)
        black = (0, 0, 0)
        blue = (0, 0, 255)
        
        dis_width = 800
        dis_height = 600
        
        dis = pygame.display.set_mode((dis_width, dis_height))
        pygame.display.set_caption('Snake Game')
        
        clock = pygame.time.Clock()
        snake_block = 10
        snake_speed = 30
        font_style = pygame.font.SysFont(None, 50)
        score_font = pygame.font.SysFont(None, 35)
        
        def our_snake(snake_block, snake_List):
            for x in snake_List:
                pygame.draw.rect(dis, green, [x[0], x[1], snake_block, snake_block])
        
        def message(msg,color):
            mesg = font_style.render(msg, True, color)
            dis.blit(mesg, [dis_width / 3, dis_height / 3])
        
        def gameLoop():  # creating a function
            game_over = False
            game_close = False
        
            # snake starting coordinates
            x1 = dis_width / 2
            y1 = dis_height / 2
        
            # snake initial movement direction
            x1_change = 0
            y1_change = 0
        
            # initialize snake length and list of coordinates
            snake_List = []
            Length_of_snake = 1
        
            # random starting point for the food
            foodx = round(random.randrange(0, dis_width - snake_block) / 10.0) * 10.0
            foody = round(random.randrange(0, dis_height - snake_block) / 10.0) * 10.0
        
            # initialize score
            score = 0
        
            # store starting time
            start_time = time.time()
        
            while not game_over:
        
                # Remaining time
                elapsed_time = time.time() - start_time
                remaining_time = 120 - elapsed_time  # 2 minutes game
                if remaining_time <= 0:
                    game_over = True
        
                # event handling loop
                for event in pygame.event.get():
                    if event.type == pygame.QUIT:
                        game_over = True  # when closing window
                    if event.type == pygame.MOUSEBUTTONUP:
                        # get mouse click coordinates
                        pos = pygame.mouse.get_pos()
        
                        # calculate new direction vector from snake to click position
                        x1_change = pos[0] - x1
                        y1_change = pos[1] - y1
        
                        # normalize direction vector
                        norm = math.sqrt(x1_change ** 2 + y1_change ** 2)
                        if norm != 0:
                            x1_change /= norm
                            y1_change /= norm
        
                        # multiply direction vector by step size
                        x1_change *= snake_block
                        y1_change *= snake_block
        
                x1 += x1_change
                y1 += y1_change
                dis.fill(white)
                pygame.draw.rect(dis, red, [foodx, foody, snake_block, snake_block])
                pygame.draw.rect(dis, green, [x1, y1, snake_block, snake_block])
                snake_Head = []
                snake_Head.append(x1)
                snake_Head.append(y1)
                snake_List.append(snake_Head)
                if len(snake_List) > Length_of_snake:
                    del snake_List[0]
        
                our_snake(snake_block, snake_List)
        
                # Bounces the snake back if it hits the edge
                if x1 < 0 or x1 > dis_width:
                    x1_change *= -1
                if y1 < 0 or y1 > dis_height:
                    y1_change *= -1
        
                # Display score
                value = score_font.render("Your Score: " + str(score), True, black)
                dis.blit(value, [0, 0])
        
                # Display remaining time
                time_value = score_font.render("Remaining Time: " + str(int(remaining_time)), True, blue)
                dis.blit(time_value, [0, 30])
        
                pygame.display.update()
        
                # Increase score and length of snake when snake gets the food
                if abs(x1 - foodx) < snake_block and abs(y1 - foody) < snake_block:
                    foodx = round(random.randrange(0, dis_width - snake_block) / 10.0) * 10.0
                    foody = round(random.randrange(0, dis_height - snake_block) / 10.0) * 10.0
                    Length_of_snake += 1
                    score += 10
        
                # Snake movement speed
                clock.tick(snake_speed)
        
            pygame.quit()
            quit()
        
        gameLoop()
        

        Now, let us understand the difference here –

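        The first program is a snake game controlled by the arrow keys that ends when the snake hits a wall. It also checks for a collision with its own body, but that break only exits the inner for loop, so the game keeps running. The second program uses mouse clicks for control, bounces off the walls instead of ending, grows the snake & adds 10 to the score on each hit, includes a 2-minute timer, and displays the remaining time.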
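
The wall-bounce behaviour is the key change between the two versions, so here is a small sketch of it without pygame. The step_with_bounce helper is hypothetical. Unlike the generated code, it also clamps the position and uses the window width minus the block size as the upper limit (both are my assumptions), so the block never leaves the play area.

```python
def step_with_bounce(pos: float, velocity: float, lower: float, upper: float):
    """Advance one axis by one step and flip the velocity at a wall."""
    new_pos = pos + velocity
    if new_pos < lower or new_pos > upper:
        velocity = -velocity
        # Clamp so the position never leaves the play area.
        new_pos = max(lower, min(new_pos, upper))
    return new_pos, velocity


# Assumed values matching the second program: 800 px wide window, 10 px block.
dis_width, snake_block = 800, 10
upper = dis_width - snake_block

x, dx = 785, 10
x, dx = step_with_bounce(x, dx, 0, upper)  # hits the right wall and turns around
print(x, dx)
x, dx = step_with_bounce(x, dx, 0, upper)  # now moving left
print(x, dx)
```

The same function is applied independently to the y axis with the window height.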

        So, we’ve done it. 🙂

        You can find the detailed code at the following GitHub link.


        I’ll bring some more exciting topics in the coming days from the Python verse.

        Till then, Happy Avenging! 🙂

        Building & deploying a RAG architecture rapidly using Langflow & Python

        I’ve been looking for a solution that can help deploy any RAG solution involving Python faster. It would be more effective if an available UI helped deliver the solution faster. And, here comes the solution that does exactly what I needed – “LangFlow.”

        Before delving into the details, I strongly recommend taking a look at the demo. It’s a great way to get a comprehensive understanding of LangFlow and its capabilities in deploying RAG architecture rapidly.

        Demo

        This describes the entire architecture; hence, I’ll share the architecture components I used to build the solution.

        To know more about RAG-Architecture, please refer to the following link.

        First, we parse the data from the source website URL (in this case, I’m referring to my photography website to extract the text of one of my blogs) and then embed it into a new collection inside a newly created Astra DB, where I will be storing the vector embeddings.

        As you can see from the above diagram, I configured this flow within 5 minutes. In no time, it gives you the full functionality of a complete solution (the underlying Python application) that extracts chunks, converts them into embeddings, and finally stores them inside the Astra DB.
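
For readers who want to picture the "extracts chunks" step, here is a minimal sketch of fixed-size chunking with overlap. The chunk_text helper, the chunk size, and the overlap are illustrative assumptions; the actual LangFlow text-splitter component may split differently.

```python
from typing import List


def chunk_text(text: str, chunk_size: int = 200, overlap: int = 40) -> List[str]:
    """Split text into fixed-size character chunks that share `overlap` characters."""
    if chunk_size <= 0 or not 0 <= overlap < chunk_size:
        raise ValueError("chunk_size must be positive and overlap smaller than chunk_size")
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last chunk already reaches the end of the text
    return chunks


sample_chunks = chunk_text("abcdefghij", chunk_size=4, overlap=1)
print(sample_chunks)
```

The overlap keeps a sentence that straddles a chunk boundary visible in both neighbouring chunks, which usually helps retrieval quality.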

        Now, let us understand the next phase, where, based on the question asked in the chatbot, I need to convert that question into a vector embedding & then run a similarity search against the vector DB to bring back the relevant vectors, as shown below –
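
Conceptually, that similarity search boils down to comparing the question's embedding with each stored embedding. The sketch below uses plain cosine similarity on tiny made-up vectors. In the real flow, the embeddings come from the embedding model and Astra DB performs the search on its side; the helper names and toy data here are assumptions.

```python
import math
from typing import List, Tuple


def cosine_similarity(a: List[float], b: List[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query_vec: List[float], store: List[Tuple[str, List[float]]], k: int = 2):
    """Return the k stored chunks most similar to the query vector."""
    scored = [(cosine_similarity(query_vec, vec), text) for text, vec in store]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return scored[:k]


# Toy 3-dimensional "embeddings", invented purely for illustration.
store = [
    ("chunk about camera lenses", [0.9, 0.1, 0.0]),
    ("chunk about travel routes", [0.1, 0.9, 0.1]),
    ("chunk about photo editing", [0.7, 0.2, 0.3]),
]
query = [1.0, 0.0, 0.0]

matches = top_k(query, store, k=2)
for score, text in matches:
    print(round(score, 3), text)
```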

        You need to configure this entire flow by dragging the necessary widgets from the left-side panel as marked in the Blue-Box shown below –

        For this specific use case, we’ve created an instance of Astra DB & then created an empty vector collection. Also, we need to ensure that we generate the API key & assign the right roles to the token. After successfully creating the token, you need to copy the endpoint, token & collection details & paste them into the corresponding fields of the Astra DB components inside LangFlow. Think of it as a framework where one needs to provide all the necessary information to build & run the entire flow successfully.

        Following are some of the important snapshots from the Astra-DB –

        Step – 1

        Step – 2

        Once you run the vector DB population, this will insert extracted text & then convert it into vectors, which will show in the following screenshot –

        You can see the sample vectors along with the text chunks inside the Astra DB data explorer as shown below –

        Some of the critical components are highlighted in the blue box; they are important for monitoring the vector embeddings.

        Now, here is how you can modify the current Python code of any available widgets or build your own widget by using the custom widget.

        The first step is to click the code button highlighted in the Red-box as shown below –

        Clicking that button opens the detailed Python code representing the entire widget build & its functionality. This is the place where you can add to the code, modify it, or keep it as it is, depending upon your need, as shown below –

        Once you have built the entire solution, you must click the final compile button (shown in the red box), which will compile all the individual widgets. However, you can also compile the individual widgets as you build the solution, so that you can pinpoint any potential problem at that very step.

        Let us understand one sample code of a widget. In this case, we will take vector embedding insertion into the Astra DB. Let us see the code –

        from typing import List, Optional, Union
        from langchain_astradb import AstraDBVectorStore
        from langchain_astradb.utils.astradb import SetupMode
        
        from langflow.custom import CustomComponent
        from langflow.field_typing import Embeddings, VectorStore
        from langflow.schema import Record
        from langchain_core.retrievers import BaseRetriever
        
        
        class AstraDBVectorStoreComponent(CustomComponent):
            display_name = "Astra DB"
            description = "Builds or loads an Astra DB Vector Store."
            icon = "AstraDB"
            field_order = ["token", "api_endpoint", "collection_name", "inputs", "embedding"]
        
            def build_config(self):
                return {
                    "inputs": {
                        "display_name": "Inputs",
                        "info": "Optional list of records to be processed and stored in the vector store.",
                    },
                    "embedding": {"display_name": "Embedding", "info": "Embedding to use"},
                    "collection_name": {
                        "display_name": "Collection Name",
                        "info": "The name of the collection within Astra DB where the vectors will be stored.",
                    },
                    "token": {
                        "display_name": "Token",
                        "info": "Authentication token for accessing Astra DB.",
                        "password": True,
                    },
                    "api_endpoint": {
                        "display_name": "API Endpoint",
                        "info": "API endpoint URL for the Astra DB service.",
                    },
                    "namespace": {
                        "display_name": "Namespace",
                        "info": "Optional namespace within Astra DB to use for the collection.",
                        "advanced": True,
                    },
                    "metric": {
                        "display_name": "Metric",
                        "info": "Optional distance metric for vector comparisons in the vector store.",
                        "advanced": True,
                    },
                    "batch_size": {
                        "display_name": "Batch Size",
                        "info": "Optional number of records to process in a single batch.",
                        "advanced": True,
                    },
                    "bulk_insert_batch_concurrency": {
                        "display_name": "Bulk Insert Batch Concurrency",
                        "info": "Optional concurrency level for bulk insert operations.",
                        "advanced": True,
                    },
                    "bulk_insert_overwrite_concurrency": {
                        "display_name": "Bulk Insert Overwrite Concurrency",
                        "info": "Optional concurrency level for bulk insert operations that overwrite existing records.",
                        "advanced": True,
                    },
                    "bulk_delete_concurrency": {
                        "display_name": "Bulk Delete Concurrency",
                        "info": "Optional concurrency level for bulk delete operations.",
                        "advanced": True,
                    },
                    "setup_mode": {
                        "display_name": "Setup Mode",
                        "info": "Configuration mode for setting up the vector store, with options like 'Sync', 'Async', or 'Off'.",
                        "options": ["Sync", "Async", "Off"],
                        "advanced": True,
                    },
                    "pre_delete_collection": {
                        "display_name": "Pre Delete Collection",
                        "info": "Boolean flag to determine whether to delete the collection before creating a new one.",
                        "advanced": True,
                    },
                    "metadata_indexing_include": {
                        "display_name": "Metadata Indexing Include",
                        "info": "Optional list of metadata fields to include in the indexing.",
                        "advanced": True,
                    },
                    "metadata_indexing_exclude": {
                        "display_name": "Metadata Indexing Exclude",
                        "info": "Optional list of metadata fields to exclude from the indexing.",
                        "advanced": True,
                    },
                    "collection_indexing_policy": {
                        "display_name": "Collection Indexing Policy",
                        "info": "Optional dictionary defining the indexing policy for the collection.",
                        "advanced": True,
                    },
                }
        
            def build(
                self,
                embedding: Embeddings,
                token: str,
                api_endpoint: str,
                collection_name: str,
                inputs: Optional[List[Record]] = None,
                namespace: Optional[str] = None,
                metric: Optional[str] = None,
                batch_size: Optional[int] = None,
                bulk_insert_batch_concurrency: Optional[int] = None,
                bulk_insert_overwrite_concurrency: Optional[int] = None,
                bulk_delete_concurrency: Optional[int] = None,
                setup_mode: str = "Sync",
                pre_delete_collection: bool = False,
                metadata_indexing_include: Optional[List[str]] = None,
                metadata_indexing_exclude: Optional[List[str]] = None,
                collection_indexing_policy: Optional[dict] = None,
            ) -> Union[VectorStore, BaseRetriever]:
                try:
                    setup_mode_value = SetupMode[setup_mode.upper()]
                except KeyError:
                    raise ValueError(f"Invalid setup mode: {setup_mode}")
                if inputs:
                    documents = [_input.to_lc_document() for _input in inputs]
        
                    vector_store = AstraDBVectorStore.from_documents(
                        documents=documents,
                        embedding=embedding,
                        collection_name=collection_name,
                        token=token,
                        api_endpoint=api_endpoint,
                        namespace=namespace,
                        metric=metric,
                        batch_size=batch_size,
                        bulk_insert_batch_concurrency=bulk_insert_batch_concurrency,
                        bulk_insert_overwrite_concurrency=bulk_insert_overwrite_concurrency,
                        bulk_delete_concurrency=bulk_delete_concurrency,
                        setup_mode=setup_mode_value,
                        pre_delete_collection=pre_delete_collection,
                        metadata_indexing_include=metadata_indexing_include,
                        metadata_indexing_exclude=metadata_indexing_exclude,
                        collection_indexing_policy=collection_indexing_policy,
                    )
                else:
                    vector_store = AstraDBVectorStore(
                        embedding=embedding,
                        collection_name=collection_name,
                        token=token,
                        api_endpoint=api_endpoint,
                        namespace=namespace,
                        metric=metric,
                        batch_size=batch_size,
                        bulk_insert_batch_concurrency=bulk_insert_batch_concurrency,
                        bulk_insert_overwrite_concurrency=bulk_insert_overwrite_concurrency,
                        bulk_delete_concurrency=bulk_delete_concurrency,
                        setup_mode=setup_mode_value,
                        pre_delete_collection=pre_delete_collection,
                        metadata_indexing_include=metadata_indexing_include,
                        metadata_indexing_exclude=metadata_indexing_exclude,
                        collection_indexing_policy=collection_indexing_policy,
                    )
        
                return vector_store
        

        Method: build_config:

        • This method defines the configuration options for the component.
        • Each configuration option includes a display_name and info, which provides details about the option.
        • Some options are marked as advanced, indicating they are optional and more complex.

        Method: build:

        • This method is used to create an instance of the Astra DB Vector Store.
        • It takes several parameters, including embedding, token, api_endpoint, collection_name, and various optional parameters.
        • It converts the setup_mode string to an enum value.
        • If inputs are provided, they are converted to a format suitable for storing in the vector store.
        • Depending on whether inputs are provided, a new vector store from documents can be created, or an empty vector store can be initialized with the given configurations.
        • Finally, it returns the created vector store instance.
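
The two decisions described in the build step (the string-to-enum conversion and the branch on inputs) can be sketched in isolation. This is only a minimal sketch, not the component's actual code: the local SetupMode enum and the helpers resolve_setup_mode and choose_build_path are hypothetical stand-ins so that the example runs without LangFlow or Astra DB installed.

```python
from enum import Enum


class SetupMode(Enum):
    """Hypothetical stand-in for the setup-mode enum used by the vector store."""
    SYNC = "Sync"
    ASYNC = "Async"
    OFF = "Off"


def resolve_setup_mode(setup_mode: str) -> SetupMode:
    """Convert a user-facing string such as 'Sync' into the matching enum member."""
    try:
        return SetupMode[setup_mode.upper()]
    except KeyError as exc:
        raise ValueError(f"Invalid setup mode: {setup_mode}") from exc


def choose_build_path(inputs) -> str:
    """Name the construction path: load documents if inputs exist, else start empty."""
    return "from_documents" if inputs else "empty_store"


mode = resolve_setup_mode("Sync")
path = choose_build_path([])
```

Failing fast on an unknown mode string keeps a typo in the configuration from silently creating a store with the wrong setup behavior.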

        And here is the screenshot of the run –

        And these are the last steps to run the integrated chatbot, as shown below –

        As one can see, the highlighted left side shows the reference text & chunks, & the right side shows the actual response.


        So, we’ve done it. And here is the fun fact: I built this entire workflow by myself within 35 minutes. 😛

        I’ll bring some more exciting topics in the coming days from the Python verse.

        To learn more about LangFlow, please click here.

        To learn about Astra DB, you need to click the following link.

        To learn about my blog & photography, you can click the following url.

        Till then, Happy Avenging!  🙂

        Building a real-time Gen AI Improvement Matrices (GAIIM) using Python, UpTrain, Open AI & React

        How can RAG work better for enterprise-level Gen AI use cases? What does it take to make an LLM work more efficiently and to check & validate its responses for bias, hallucination & more?

        This post (after a short gap) discusses some of the burning issues that many AI architects are trying to explore. I’ve considered a recently formed AI start-up from India that has developed an open-source framework for evaluating the challenges you face with your LLMs. It integrates easily with your existing models and helps you understand them better, including their limitations. You will get plenty of insights about it.

        But, before we dig deep, why not see the demo first –

        Isn’t it exciting? Let’s deep dive into the flow of events.


        Let’s explore the broad-level architecture/flow –

        Let us understand the steps of the above architecture. First, our Python application exposes an API that interacts with OpenAI and UpTrain AI to fetch all the LLM KPIs, based on the input from the React app named “Evaluation.”

        Once the response is received from UpTrain AI, the Python application organizes the results into a more readable form without changing the core details returned by the APIs, and then shares them back with the React interface.

        Let’s examine the React app’s sample inputs to better understand what is passed to the Python-based API. This API is a wrapper that calls multiple UpTrain APIs, accumulates their output into one response by parsing & reorganizing the data with the help of OpenAI, & shares that back.

        Highlighted in RED are some of the critical inputs you need to provide to get most of the KPIs. And, here are the sample text inputs for your reference –

        Q. Enter input question.
        A. What are the four largest moons of Jupiter?
        Q. Enter the context document.
        A. Jupiter, the largest planet in our solar system, boasts a fascinating array of moons. Among these, the four largest are collectively known as the Galilean moons, named after the renowned astronomer Galileo Galilei, who first observed them in 1610. These four moons, Io, Europa, Ganymede, and Callisto, hold significant scientific interest due to their unique characteristics and diverse geological features.
        Q. Enter LLM response.
        A. The four largest moons of Jupiter, known as the Galilean moons, are Io, Europa, Ganymede, and Marshmello.
        Q. Enter the persona response.
        A. strict and methodical teacher
        Q. Enter the guideline.
        A. Response shouldn’t contain any specific numbers
        Q. Enter the ground truth.
        A. The Jupiter is the largest & gaseous planet in the solar system.
        Q. Choose the evaluation method.
        A. llm
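
As a rough sketch, the sample inputs above could be packaged into a JSON request body like this. The key names are the ones the Flask route in this post reads from the request; the helper build_evaluation_payload is hypothetical, and the context string is shortened for readability. The LLM response field is left out because the route shown in this post generates that response itself.

```python
import json


def build_evaluation_payload(question, context, persona, guideline,
                             ground_truth, method="llm"):
    """Assemble the request body using the keys the /evaluate route reads."""
    return {
        "question": question,
        "context": context,
        "personaResponse": persona,
        "guideline": guideline,
        "groundTruth": ground_truth,
        "evaluationMethod": method,
    }


payload = build_evaluation_payload(
    question="What are the four largest moons of Jupiter?",
    # Shortened version of the sample context document
    context="The four largest moons of Jupiter are Io, Europa, Ganymede, and Callisto.",
    persona="strict and methodical teacher",
    guideline="Response shouldn't contain any specific numbers",
    ground_truth="The Jupiter is the largest & gaseous planet in the solar system.",
)

# Serialize to the JSON text that would be sent over HTTP
body = json.dumps(payload)
```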

        Once you fill in these inputs, the app should look like the screenshot below –


        Let us understand the sample packages that are required for this task.

        pip install Flask==3.0.3
        pip install Flask-Cors==4.0.0
        pip install numpy==1.26.4
        pip install openai==1.17.0
        pip install pandas==2.2.2
        pip install uptrain==0.6.13

        Note that we’re not going to discuss the entire script here, only the relevant parts. However, you can get the complete scripts in the GitHub repository.

        def askFeluda(context, question):
            try:
                # Combine the context and the question into a single prompt.
                prompt_text = f"{context}\n\n Question: {question}\n Answer:"
        
                # Retrieve conversation history from the session or database
                conversation_history = []
        
                # Add the new message to the conversation history
                conversation_history.append(prompt_text)
        
                # Call OpenAI API with the updated conversation
                response = client.with_options(max_retries=0).chat.completions.create(
                    messages=[
                        {
                            "role": "user",
                            "content": prompt_text,
                        }
                    ],
                    model=cf.conf['MODEL_NAME'],
                    max_tokens=150,  # You can adjust this based on how long you expect the response to be
                    temperature=0.3,  # Adjust for creativity. Lower values make responses more focused and deterministic
                    top_p=1,
                    frequency_penalty=0,
                    presence_penalty=0
                )
        
                # Extract the content from the first choice's message
                chat_response = response.choices[0].message.content
        
                # Return the generated response text with surrounding whitespace removed
                return chat_response.strip()
            except Exception as e:
                return f"An error occurred: {str(e)}"

        This function serves two purposes: it either asks the supplied question together with its context, or it receives the UpTrain results and summarizes the JSON into more easily readable plain text. For our test, we’ve used “gpt-3.5-turbo”.
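
Separating the prompt construction from the API call makes that part easy to check offline. The sketch below reuses the prompt layout from askFeluda; the helper names build_prompt and build_messages are hypothetical and are not part of the original script.

```python
def build_prompt(context: str, question: str) -> str:
    """Combine context and question using the same layout as askFeluda."""
    return f"{context}\n\n Question: {question}\n Answer:"


def build_messages(context: str, question: str) -> list:
    """Wrap the prompt in the single user message passed to the chat API."""
    return [{"role": "user", "content": build_prompt(context, question)}]


messages = build_messages(
    "Io, Europa, Ganymede, and Callisto are the Galilean moons.",
    "What are the four largest moons of Jupiter?",
)
```

The resulting list can be passed as the messages argument of the chat completion call, so the network call stays a thin wrapper around logic that can be tested without an API key.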

        def evalContextRelevance(question, context, resFeluda, personaResponse):
            try:
                data = [{
                    'question': question,
                    'context': context,
                    'response': resFeluda
                }]
        
                results = eval_llm.evaluate(
                    data=data,
                    checks=[
                        Evals.CONTEXT_RELEVANCE,
                        Evals.FACTUAL_ACCURACY,
                        Evals.RESPONSE_COMPLETENESS,
                        Evals.RESPONSE_RELEVANCE,
                        CritiqueTone(llm_persona=personaResponse),
                        Evals.CRITIQUE_LANGUAGE,
                        Evals.VALID_RESPONSE,
                        Evals.RESPONSE_CONCISENESS
                    ]
                )
        
                return results
            except Exception as e:
                x = str(e)
        
                return x

        The above method invokes UpTrain’s evaluation model to get all the stats for your LLM response. In this post, we’ve captured the following KPIs –

        - Context Relevance Explanation
        - Factual Accuracy Explanation
        - Guideline Adherence Explanation
        - Response Completeness Explanation
        - Response Fluency Explanation
        - Response Relevance Explanation
        - Response Tonality Explanation
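
        # Function to extract and print all the keys and their values
        def extractPrintedData(data):
            # Default every field so that a check missing from the UpTrain output
            # cannot leave a variable unbound when the result dictionary is built
            s_1_key_val = s_2_key_val = s_3_key_val = s_4_key_val = None
            s_5_key_val = s_6_key_val = s_7_key_val = s_8_key_val = None
            s_1_val = s_2_val = s_3_val = s_4_val = None
            s_5_val = s_6_val = s_8_val = None

            for entry in data:
                print("Parsed Data:")
                for key, value in entry.items():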
                    if key == 'score_context_relevance':
                        s_1_key_val = value
                    elif key == 'explanation_context_relevance':
                        cleaned_value = preprocessParseData(value)
                        print(f"{key}: {cleaned_value}\n")
                        s_1_val = cleaned_value
                    elif key == 'score_factual_accuracy':
                        s_2_key_val = value
                    elif key == 'explanation_factual_accuracy':
                        cleaned_value = preprocessParseData(value)
                        print(f"{key}: {cleaned_value}\n")
                        s_2_val = cleaned_value
                    elif key == 'score_response_completeness':
                        s_3_key_val = value
                    elif key == 'explanation_response_completeness':
                        cleaned_value = preprocessParseData(value)
                        print(f"{key}: {cleaned_value}\n")
                        s_3_val = cleaned_value
                    elif key == 'score_response_relevance':
                        s_4_key_val = value
                    elif key == 'explanation_response_relevance':
                        cleaned_value = preprocessParseData(value)
                        print(f"{key}: {cleaned_value}\n")
                        s_4_val = cleaned_value
                    elif key == 'score_critique_tone':
                        s_5_key_val = value
                    elif key == 'explanation_critique_tone':
                        cleaned_value = preprocessParseData(value)
                        print(f"{key}: {cleaned_value}\n")
                        s_5_val = cleaned_value
                    elif key == 'score_fluency':
                        s_6_key_val = value
                    elif key == 'explanation_fluency':
                        cleaned_value = preprocessParseData(value)
                        print(f"{key}: {cleaned_value}\n")
                        s_6_val = cleaned_value
                    elif key == 'score_valid_response':
                        s_7_key_val = value
                    elif key == 'score_response_conciseness':
                        s_8_key_val = value
                    elif key == 'explanation_response_conciseness':
                        print('Raw Value: ', value)
                        cleaned_value = preprocessParseData(value)
                        print(f"{key}: {cleaned_value}\n")
                        s_8_val = cleaned_value
        
            print('$'*200)
        
            results = {
                "Factual_Accuracy_Score": s_2_key_val,
                "Factual_Accuracy_Explanation": s_2_val,
                "Context_Relevance_Score": s_1_key_val,
                "Context_Relevance_Explanation": s_1_val,
                "Response_Completeness_Score": s_3_key_val,
                "Response_Completeness_Explanation": s_3_val,
                "Response_Relevance_Score": s_4_key_val,
                "Response_Relevance_Explanation": s_4_val,
                "Response_Fluency_Score": s_6_key_val,
                "Response_Fluency_Explanation": s_6_val,
                "Response_Tonality_Score": s_5_key_val,
                "Response_Tonality_Explanation": s_5_val,
                "Guideline_Adherence_Score": s_8_key_val,
                "Guideline_Adherence_Explanation": s_8_val,
                "Response_Match_Score": s_7_key_val
                # Add other evaluations similarly
            }
        
            return results

        The above method parses the initial data returned by UpTrain before it is sent to OpenAI for a more readable summary, without changing any of the text that UpTrain returned.
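
The same key-to-label mapping can also be written in a table-driven style, which avoids the long if/elif chain. This is only a sketch of an alternative, not the script's implementation: the FIELD_MAP table mirrors the mapping used by extractPrintedData, and clean_text is a hypothetical stand-in for preprocessParseData, whose body is not shown in this post.

```python
# Output label -> suffix of the UpTrain key it is read from
# (mirrors the mapping used in extractPrintedData)
FIELD_MAP = {
    "Factual_Accuracy": "factual_accuracy",
    "Context_Relevance": "context_relevance",
    "Response_Completeness": "response_completeness",
    "Response_Relevance": "response_relevance",
    "Response_Fluency": "fluency",
    "Response_Tonality": "critique_tone",
    "Guideline_Adherence": "response_conciseness",
}


def clean_text(value) -> str:
    """Hypothetical stand-in for preprocessParseData: collapse whitespace."""
    return " ".join(str(value).split())


def extract_scores(entry: dict) -> dict:
    """Build the response dictionary, using None for any missing check."""
    results = {}
    for label, suffix in FIELD_MAP.items():
        results[f"{label}_Score"] = entry.get(f"score_{suffix}")
        explanation = entry.get(f"explanation_{suffix}")
        results[f"{label}_Explanation"] = (
            clean_text(explanation) if explanation is not None else None
        )
    results["Response_Match_Score"] = entry.get("score_valid_response")
    return results


sample = {
    "score_context_relevance": 1.0,
    "explanation_context_relevance": "  The context \n covers the question. ",
}
parsed = extract_scores(sample)
```

Adding a new KPI then means adding one row to the table rather than another branch.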

        @app.route('/evaluate', methods=['POST'])
        def evaluate():
            data = request.json
        
            if not data:
                return jsonify({'error': 'No data provided'}), 400
        
            # Extracting input data for processing (just an example of logging received data)
            question = data.get('question', '')
            context = data.get('context', '')
            llmResponse = ''
            personaResponse = data.get('personaResponse', '')
            guideline = data.get('guideline', '')
            groundTruth = data.get('groundTruth', '')
            evaluationMethod = data.get('evaluationMethod', '')
        
            print('question:')
            print(question)
        
            llmResponse = askFeluda(context, question)
            print('='*200)
            print('Response from Feluda::')
            print(llmResponse)
            print('='*200)
        
            # Getting Context LLM
            cLLM = evalContextRelevance(question, context, llmResponse, personaResponse)
        
            print('&'*200)
            print('cLLM:')
            print(cLLM)
            print(type(cLLM))
            print('&'*200)
        
            results = extractPrintedData(cLLM)
        
            print('JSON::')
            print(results)
        
            resJson = jsonify(results)
        
            return resJson

        The above function is the main method. It first receives all the input parameters from the React app, then invokes the functions one by one to get the LLM response and the LLM performance stats, and finally organizes the results before sending them back to the React app.
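
For a quick test without the React app, a client could build the POST request as sketched below. The helper build_evaluate_request is hypothetical, and the base URL http://localhost:5000 is an assumption (Flask's default development address); adjust it to wherever the API is actually running.

```python
import json
import urllib.request


def build_evaluate_request(payload: dict,
                           base_url: str = "http://localhost:5000"):
    """Prepare (but do not send) a JSON POST request for the /evaluate route."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/evaluate",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request_obj = build_evaluate_request(
    {"question": "What are the four largest moons of Jupiter?"}
)

# To actually send it once the Flask app is running:
# with urllib.request.urlopen(request_obj) as resp:
#     print(json.loads(resp.read()))
```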

        For any other scripts, please refer to the above-mentioned GitHub link.


        Let us see some of the screenshots of the test run –


        So, we’ve done it.

        I’ll bring some more exciting topics in the coming days from the Python verse.

        Till then, Happy Avenging! 🙂

        Enabling & Exploring Stable Diffusion – Part 1

        This new solution evaluates the power of Stable Diffusion. We will build the solution as we progress, refining our prompt from scratch using Stable Diffusion & Python. This post opens new opportunities for IT companies & business start-ups looking to deliver solutions with better performance than the paid Stability AI API. This project is for advanced Python developers, data science newbies exploring Stable Diffusion & AI evangelists.

        In a series of posts, I’ll explain and focus on the Stable Diffusion API and a custom solution using the Python-based SDK of Stable Diffusion.

        But, before that, let us view the video that it generates from the prompt by using the third-party API:

        Prompt to Video

        And, let us understand the prompt that we supplied to create the above video –

        Isn’t it exciting?

        However, I want to stress this point: the video generated by the Stable Diffusion (Stability AI) API applies the animation effect only partially. The animation applies to the clouds, but not to the waves. But, I must admit, the quality of the video is quite good.


        Let us understand the code and how we run the solution. We can then compare its performance with the other solutions in the subsequent posts of this series.

        As you know, we’re exploring the code base for the third-party API, which executes a series of API calls that create a video out of the prompt.

        Let us understand some of the important snippets –

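        import base64
        import os

        import requests

        # Note: api_host and engine_id are referenced below as module-level
        # settings; they are defined elsewhere in the complete script.
        class clsStabilityAIAPI: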
            def __init__(self, STABLE_DIFF_API_KEY, OUT_DIR_PATH, FILE_NM, VID_FILE_NM):
                self.STABLE_DIFF_API_KEY = STABLE_DIFF_API_KEY
                self.OUT_DIR_PATH = OUT_DIR_PATH
                self.FILE_NM = FILE_NM
                self.VID_FILE_NM = VID_FILE_NM
        
            def delFile(self, fileName):
                try:
                    # Deleting the intermediate image
                    os.remove(fileName)
        
                    return 0 
                except Exception as e:
                    x = str(e)
                    print('Error: ', x)
        
                    return 1
        
            def generateText2Image(self, inputDescription):
                try:
                    STABLE_DIFF_API_KEY = self.STABLE_DIFF_API_KEY
                    fullFileName = self.OUT_DIR_PATH + self.FILE_NM
                    
                    if STABLE_DIFF_API_KEY is None:
                        raise Exception("Missing Stability API key.")
                    
                    response = requests.post(f"{api_host}/v1/generation/{engine_id}/text-to-image",
                                            headers={
                                                "Content-Type": "application/json",
                                                "Accept": "application/json",
                                                "Authorization": f"Bearer {STABLE_DIFF_API_KEY}"
                                                },
                                                json={
                                                    "text_prompts": [{"text": inputDescription}],
                                                    "cfg_scale": 7,
                                                    "height": 1024,
                                                    "width": 576,
                                                    "samples": 1,
                                                    "steps": 30,
                                                    },)
                    
                    if response.status_code != 200:
                        raise Exception("Non-200 response: " + str(response.text))
                    
                    data = response.json()
        
                    for i, image in enumerate(data["artifacts"]):
                        with open(fullFileName, "wb") as f:
                            f.write(base64.b64decode(image["base64"]))      
                    
                    return fullFileName
        
                except Exception as e:
                    x = str(e)
                    print('Error: ', x)
        
                    return 'N/A'
        
            def image2VideoPassOne(self, imgNameWithPath):
                try:
                    STABLE_DIFF_API_KEY = self.STABLE_DIFF_API_KEY
        
                    # Open the image in a context manager so the file handle is always closed
                    with open(imgNameWithPath, "rb") as imgFile:
                        response = requests.post("https://api.stability.ai/v2beta/image-to-video",
                                                headers={"authorization": f"Bearer {STABLE_DIFF_API_KEY}"},
                                                files={"image": imgFile},
                                                data={"seed": 0,"cfg_scale": 1.8,"motion_bucket_id": 127},
                                                )
                    
                    print('First Pass Response:')
                    print(str(response.text))
                    
                    genID = response.json().get('id')
        
                    return genID 
                except Exception as e:
                    x = str(e)
                    print('Error: ', x)
        
                    return 'N/A'
        
            def image2VideoPassTwo(self, genId):
                try:
                    generation_id = genId
                    STABLE_DIFF_API_KEY = self.STABLE_DIFF_API_KEY
                    fullVideoFileName = self.OUT_DIR_PATH + self.VID_FILE_NM
        
                    response = requests.request("GET", f"https://api.stability.ai/v2beta/image-to-video/result/{generation_id}",
                                                headers={
                                                    'accept': "video/*",  # Use 'application/json' to receive base64 encoded JSON
                                                    'authorization': f"Bearer {STABLE_DIFF_API_KEY}"
                                                    },) 
                    
                    print('Retrieve Status Code: ', str(response.status_code))
                    
                    if response.status_code == 202:
                        print("Generation in-progress, try again in 10 seconds.")
        
                        return 5
                    elif response.status_code == 200:
                        print("Generation complete!")
                        with open(fullVideoFileName, 'wb') as file:
                            file.write(response.content)
        
                        print("Successfully Retrieved the video file!")
        
                        return 0
                    else:
                        raise Exception(str(response.json()))
                    
                except Exception as e:
                    x = str(e)
                    print('Error: ', x)
        
                    return 1

        Now, let us understand the code –

        The __init__ function is called when an object of the class is created. It initializes four properties:

        • STABLE_DIFF_API_KEY: the API key for Stability AI services.
        • OUT_DIR_PATH: the folder path to save files.
        • FILE_NM: the name of the generated image file.
        • VID_FILE_NM: the name of the generated video file.

        The delFile function deletes the file specified by fileName.

        • If successful, it returns 0.
        • If an error occurs, it logs the error and returns 1.

        The generateText2Image function generates an image based on a text description:

        • Sends a request to the Stability AI text-to-image endpoint using the API key.
        • Saves the resulting image to a file.
        • Returns the file’s path on success or 'N/A' if an error occurs.

        The image2VideoPassOne function uploads an image to start the video creation, which is the first phase:

        • Sends the image to Stability AI’s image-to-video endpoint.
        • Logs the response and extracts the id (generation ID) for the next phase.
        • Returns the id if successful or 'N/A' on failure.

        The image2VideoPassTwo function uses the genId to retrieve the generated video, which is the second phase:

        • Checks the video generation status from the Stability AI endpoint.
        • If complete, saves the video file and returns 0.
        • If still processing, returns 5.
        • Logs and returns 1 for any errors.
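
The first two steps can be chained as sketched below. This is a minimal sketch under the return conventions described above ('N/A' signals a failure); the helper submit_video_job and the FakeStabilityAPI stub are hypothetical and exist only so that the flow can be exercised without network calls or an API key.

```python
def submit_video_job(api, prompt: str):
    """Run text-to-image, then submit the image for video generation.

    `api` is any object exposing generateText2Image and image2VideoPassOne.
    Returns the generation ID, or None if either step failed.
    """
    image_path = api.generateText2Image(prompt)
    if image_path == 'N/A':
        return None

    gen_id = api.image2VideoPassOne(image_path)
    # The ID can also be missing when the API response carries no 'id' field
    if gen_id in ('N/A', None):
        return None

    return gen_id


class FakeStabilityAPI:
    """Offline stand-in that mimics the successful return values."""

    def generateText2Image(self, inputDescription):
        return "/tmp/sample.png"

    def image2VideoPassOne(self, imgNameWithPath):
        return "gen-123"


gen_id = submit_video_job(FakeStabilityAPI(), "Clouds drifting over ocean waves")
```

With the real class, the returned ID is what gets passed to image2VideoPassTwo to poll for the finished video.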

        As you can see, the code is pretty simple to understand. We handle unforeseen network issues, as well as the case where the video is not yet ready after the job submission, in the following lines of the main calling script (generateText2VideoAPI.py) –

        waitTime = 10
        time.sleep(waitTime)
        
        # Failed case retry
        retries = 1
        success = False
        
        try:
            while not success:
                try:
                    z = r1.image2VideoPassTwo(gID)
                except Exception as e:
                    # Treat an unexpected error as a failed attempt so that z is always defined
                    print('Error: ', str(e))
                    z = 1

                if z == 0:
                    success = True
                else:
                    wait = retries * 2 * 15
                    str_R1 = "Retry failed! Waiting " + str(wait) + " seconds and retrying!"

                    print(str_R1)

                    time.sleep(wait)
                    retries += 1

                    # Checking maximum retries (only after a failed attempt)
                    if retries >= maxRetryNo:
                        raise Exception('Maximum number of retries reached.')
        except Exception as e:
            print('Error: ', str(e))
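
The same polling idea can be packaged as a small reusable function. This is a sketch rather than the script's code: the name poll_until_ready and the injectable sleep parameter are assumptions made for illustration, while the wait formula (attempt * 2 * 15 seconds) and the return-code convention (0 means the video is ready) are taken from the snippets above.

```python
import time


def poll_until_ready(fetch, max_retries: int, sleep=time.sleep) -> bool:
    """Call fetch() until it returns 0 or max_retries attempts have been made.

    fetch returns 0 on success and any other code otherwise (for example,
    5 while the video is still being generated). Returns True on success
    and False once the attempts are exhausted.
    """
    for attempt in range(1, max_retries + 1):
        try:
            status = fetch()
        except Exception:
            # An unexpected error counts as a failed attempt
            status = 1

        if status == 0:
            return True

        # Wait longer after each failure, but not after the final attempt
        if attempt < max_retries:
            sleep(attempt * 2 * 15)

    return False


# Offline demonstration: two "in progress" codes, then success.
# The waits are recorded in a list instead of actually sleeping.
codes = iter([5, 5, 0])
recorded_waits = []
ready = poll_until_ready(lambda: next(codes), max_retries=5,
                         sleep=recorded_waits.append)
```

In the calling script, fetch would be something like lambda: r1.image2VideoPassTwo(gID). Passing sleep as a parameter keeps the function testable without real delays.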

        And let us see what the run looks like –

        Let us understand the CPU utilization –

        As you can see, CPU utilization is minimal since most tasks are at the API end.


        So, we’ve done it. 🙂

        Please find the next posts in this series below:

        Enabling & Exploring Stable Diffusion – Part 2

        Enabling & Exploring Stable Diffusion – Part 3

        Please let me know your feedback after reviewing all the posts! 🙂