AGENTIC AI IN THE ENTERPRISE: STRATEGY, ARCHITECTURE, AND IMPLEMENTATION – PART 2

This is a continuation of my previous post, which can be found here.

Let us recap the key takeaways from our previous post:

Agentic AI refers to autonomous systems that pursue goals with minimal supervision by planning, reasoning about next steps, utilizing tools, and maintaining context across sessions. Core capabilities include goal-directed autonomy, interaction with tools and environments (e.g., APIs, databases, devices), multi-step planning and reasoning under uncertainty, persistence, and deliberate decision-making.

Architecturally, three modules coordinate intelligent behavior: Sensing (perception pipelines that acquire multimodal data, extract salient patterns, and recognize entities/events); Observation/Deliberation (objective setting, strategy formation, and option evaluation relative to resources and constraints); and Action (execution via software interfaces, communications, or physical actuation to deliver outcomes). These functions are enabled by machine learning, deep learning, computer vision, natural language processing, planning/decision-making, uncertainty reasoning, and simulation/modeling.

At enterprise scale, open standards align autonomy with governance: the Model Context Protocol (MCP) grants an agent secure, principled access to enterprise tools and data (vertical integration), while Agent-to-Agent (A2A) enables specialized agents to coordinate, delegate, and exchange information (horizontal collaboration). Together, MCP and A2A help organizations transition from isolated pilots to scalable programs, delivering end-to-end automation, faster integration, enhanced security and auditability, vendor-neutral interoperability, and adaptive problem-solving that responds to real-time context.

Great! Let’s dive into this topic now.

Enterprise AI with MCP refers to the application of the Model Context Protocol (MCP), an open standard, to enable AI systems to securely and consistently access external enterprise data and applications. 

Before MCP, enterprise AI integration was characterized by a “many-to-many” or “N x M” problem. Companies had to build custom, fragile, and costly integrations between each AI model and every proprietary data source, which was not scalable. These limitations left AI agents with limited, outdated, or siloed information, restricting their potential impact. 
MCP addresses this by offering a standardized architecture for AI and data systems to communicate with each other.

The MCP framework uses a client-server architecture to enable communication between AI models and external tools and data sources. 

  • MCP Host: The AI-powered application or environment, such as an AI-enhanced IDE or a generative AI chatbot like Anthropic’s Claude or OpenAI’s ChatGPT, where the user interacts.
  • MCP Client: A component within the host application that manages the connection to MCP servers.
  • MCP Server: A lightweight service that wraps around an external system (e.g., a CRM, database, or API) and exposes its capabilities to the AI client in a standardized format, typically using JSON-RPC 2.0. 

An MCP server exposes three kinds of capabilities to AI clients:

  • Resources: Structured or unstructured data that an AI can access, such as files, documents, or database records.
  • Tools: The functionality to perform specific actions within an external system, like running a database query or sending an email.
  • Prompts: Pre-defined text templates or workflows to help guide the AI’s actions.

For the enterprise, MCP offers several benefits:

  • Standardized integration: Developers can build integrations against a single, open standard, which dramatically reduces the complexity and time required to deploy and scale AI initiatives.
  • Enhanced security and governance: An MCP server acts as a single enforcement point for security and compliance measures. Permission models, access control, and auditing can be applied there to ensure AI systems only access data and tools within specified boundaries.
  • Real-time contextual awareness: By connecting AI agents to live enterprise data sources, MCP ensures they have access to the most current and relevant information, which reduces hallucinations and improves the accuracy of AI outputs.
  • Greater interoperability: MCP is model-agnostic & can be used with a variety of AI models (e.g., Anthropic’s Claude or OpenAI’s models) and across different cloud environments. This approach helps enterprises avoid vendor lock-in.
  • Accelerated development: The “build once, integrate everywhere” approach enables internal teams to focus on innovation instead of writing custom connectors for every system.
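
To make the three capability types described above (resources, tools, and prompts) a little more concrete, here is a minimal in-memory sketch. It does not use any real MCP SDK; `ToyMCPServer`, its `describe` method, and the sample entries are hypothetical names chosen only to show how the three types differ.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class ToyMCPServer:
    """Hypothetical stand-in for an MCP server (not a real SDK class)."""
    resources: Dict[str, str] = field(default_factory=dict)             # readable data, keyed by URI
    tools: Dict[str, Callable[..., Any]] = field(default_factory=dict)  # callable actions
    prompts: Dict[str, str] = field(default_factory=dict)               # reusable text templates

    def describe(self) -> Dict[str, List[str]]:
        """Return capability names so a client can discover what is offered."""
        return {
            "resources": sorted(self.resources),
            "tools": sorted(self.tools),
            "prompts": sorted(self.prompts),
        }


server = ToyMCPServer()
server.resources["file:///policies/returns.txt"] = "Returns are accepted within 30 days."
server.tools["add"] = lambda a, b: a + b
server.prompts["summarize"] = "Summarize the following text:\n{text}"

capabilities = server.describe()                                  # discovery
tool_result = server.tools["add"](2, 3)                           # a tool performs an action
prompt_text = server.prompts["summarize"].format(text="hello")   # a prompt is a filled-in template
```

The point of the sketch is the separation: resources are read, tools are executed, and prompts are templates the host can fill in.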

Let us understand one sample case & the flow of activities.

A customer support agent uses an AI assistant to get information about a customer’s recent orders. The AI assistant utilizes an MCP-compliant client to communicate with an MCP server, which is connected to the company’s PostgreSQL database.

1. User request: The support agent asks the AI assistant, “What was the most recent order placed by Priyanka Chopra Jonas?”

2. AI model processes intent: The AI assistant, running on an MCP host, analyzes the natural language query. It recognizes that to answer this question, it needs to perform a database query. It then identifies the appropriate tool from the MCP server’s capabilities. 

3. Client initiates tool call: The AI assistant’s MCP client sends a JSON-RPC request to the MCP server connected to the PostgreSQL database. The request specifies the tool to be used, such as get_customer_orders, and includes the necessary parameters: 

{
  "jsonrpc": "2.0",
  "id": "12345",
  "method": "tools/call",
  "params": {
    "name": "get_customer_orders",
    "arguments": {
      "customer_name": "Priyanka Chopra Jonas",
      "sort_by": "order_date",
      "sort_order": "desc",
      "limit": 1
    }
  }
}
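
A client could assemble such a request along the following lines. This is a minimal sketch, not the code of any particular MCP client: `build_tool_call` is a hypothetical helper, and it assumes the `tools/call` method shape from the published MCP specification, where the tool name and its arguments are nested under `params`.

```python
import json
from typing import Any, Dict


def build_tool_call(request_id: str, tool_name: str, arguments: Dict[str, Any]) -> str:
    """Serialize a JSON-RPC 2.0 request that asks a server to run one tool."""
    request = {
        "jsonrpc": "2.0",
        "id": request_id,            # lets the client match the response to this request
        "method": "tools/call",      # assumed method name, per the MCP specification
        "params": {"name": tool_name, "arguments": arguments},
    }
    return json.dumps(request)


payload = build_tool_call(
    "12345",
    "get_customer_orders",
    {
        "customer_name": "Priyanka Chopra Jonas",
        "sort_by": "order_date",
        "sort_order": "desc",
        "limit": 1,
    },
)
decoded = json.loads(payload)  # what the server sees after parsing
```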

4. Server handles the request: The MCP server receives the request and performs several key functions: 

  • Authentication and authorization: The server verifies that the AI client and the user have permission to query the database.
  • Query translation: The server translates the standardized MCP request into a specific SQL query for the PostgreSQL database.
  • Query execution: The server executes the SQL query against the database:

SELECT order_id, order_date, total_amount
FROM orders
WHERE customer_name = 'Priyanka Chopra Jonas'
ORDER BY order_date DESC
LIMIT 1;
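
The three server-side steps above can be sketched in a few lines. To keep the example runnable without a PostgreSQL instance, it uses Python's built-in `sqlite3` as a stand-in; the role name `support_agent`, the handler name, and the extra sample row are assumptions made for illustration. Note that the customer name is passed as a bound parameter rather than pasted into the SQL text, which is the safer pattern when the value originates from a model or a user.

```python
import sqlite3
from typing import Any, Dict, List, Set

ALLOWED_ROLES: Set[str] = {"support_agent"}  # hypothetical role name


def get_customer_orders(conn: sqlite3.Connection, role: str,
                        customer_name: str, limit: int = 1) -> List[Dict[str, Any]]:
    """Authorize the caller, then run a parameterized query."""
    if role not in ALLOWED_ROLES:
        raise PermissionError(f"role {role!r} may not query orders")
    # Placeholders (?) keep caller-supplied text out of the SQL string itself.
    cursor = conn.execute(
        "SELECT order_id, order_date, total_amount FROM orders "
        "WHERE customer_name = ? ORDER BY order_date DESC LIMIT ?",
        (customer_name, limit),
    )
    columns = [col[0] for col in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]


# In-memory database with made-up sample rows, standing in for PostgreSQL.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id TEXT, customer_name TEXT, "
             "order_date TEXT, total_amount REAL)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?, ?)",
    [("98001", "Priyanka Chopra Jonas", "2025-07-02", 450.00),
     ("98765", "Priyanka Chopra Jonas", "2025-08-25", 11025.50)],
)
latest = get_customer_orders(conn, "support_agent", "Priyanka Chopra Jonas")
```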

5. Database returns data: The PostgreSQL database executes the query and returns the requested data to the MCP server. 

6. Server formats the response: The MCP server receives the raw database output and formats it into a standardized JSON response that the MCP client can understand.

{
  "jsonrpc": "2.0",
  "id": "12345",
  "result": {
    "content": [
      {
        "type": "text",
        "text": "[{\"order_id\": \"98765\", \"order_date\": \"2025-08-25\", \"total_amount\": 11025.50}]"
      }
    ],
    "isError": false
  }
}
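
The formatting step on the server, and the matching unpacking step on the client, might look roughly like this. The sketch assumes the tool result carries a `content` list of text blocks plus an `isError` flag, as the MCP specification describes for tool results; `format_tool_result` and `extract_rows` are hypothetical helper names.

```python
import json
from typing import Any, Dict, List


def format_tool_result(request_id: str, rows: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Server side: wrap query rows in a JSON-RPC 2.0 response."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,  # must echo the id of the request being answered
        "result": {
            "content": [{"type": "text", "text": json.dumps(rows)}],
            "isError": False,
        },
    }


def extract_rows(response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Client side: recover the rows from the first text content block."""
    if "error" in response:
        raise RuntimeError(response["error"].get("message", "unknown error"))
    return json.loads(response["result"]["content"][0]["text"])


rows = [{"order_id": "98765", "order_date": "2025-08-25", "total_amount": 11025.50}]
response = format_tool_result("12345", rows)
round_tripped = extract_rows(response)
```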

7. Client returns data to the model: The MCP client receives the JSON response and passes it back to the AI assistant’s language model. 

8. AI model generates final response: The language model incorporates this real-time data into its response and presents it to the user in a natural, conversational format. 

“Priyanka Chopra Jonas’s most recent order was placed on August 25, 2025, with an order ID of 98765, for a total of $11,025.50.”

Using the Model Context Protocol (MCP) for database access introduces a layer of abstraction that affects performance in several ways. While it adds some latency and processing overhead, strategic implementation can mitigate these effects. For AI applications, the benefits often outweigh the costs, particularly in terms of improved accuracy, security, and scalability.

The MCP architecture introduces extra communication steps between the AI agent and the database, each adding a small amount of latency. 

  • RPC overhead: The JSON-RPC call from the AI’s client to the MCP server adds a small processing and network delay. This is an out-of-process request, as opposed to a simple local function call.
  • JSON serialization: Request and response data must be serialized and deserialized into JSON format, which requires processing time.
  • Network transit: For remote MCP servers, the data must travel over the network, adding latency. However, for a local or on-premise setup, this is minimal. The physical location of the MCP server relative to the AI model and the database is a significant factor.
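
To get a rough feel for the serialization cost mentioned above, one can time a JSON round trip locally. This is only a sketch: the payload is made up, `time_json_round_trip` is a hypothetical helper, and absolute numbers depend entirely on the machine and the payload size.

```python
import json
import time
from typing import Any


def time_json_round_trip(payload: Any, repeats: int = 1000) -> float:
    """Return the average seconds spent on one serialize + deserialize cycle."""
    start = time.perf_counter()
    for _ in range(repeats):
        json.loads(json.dumps(payload))
    return (time.perf_counter() - start) / repeats


# A made-up result set of 100 order rows.
sample_rows = [
    {"order_id": str(i), "order_date": "2025-08-25", "total_amount": 10.0 * i}
    for i in range(100)
]
avg_seconds = time_json_round_trip(sample_rows, repeats=200)
print(f"average JSON round trip: {avg_seconds * 1e6:.1f} microseconds")
```

Comparing this figure with the measured network and database time for the same request shows which of the three overheads dominates in a given deployment.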

The performance impact scales with the complexity and volume of the AI agent’s interactions. 

  • High request volume: A single AI agent working on a complex task might issue dozens of parallel database queries. In high-traffic scenarios, managing numerous simultaneous connections can strain system resources and require robust infrastructure.
  • Excessive data retrieval: A significant performance risk is an AI agent retrieving a massive dataset in a single query. This process can consume a large number of tokens, fill the AI’s context window, and cause bottlenecks at the database and client levels.
  • Context window usage: Tool definitions and the results of tool calls consume space in the AI’s context window. If a large number of tools are in use, this can limit the AI’s “working memory,” resulting in slower and less effective reasoning. 

Caching is a crucial strategy for mitigating the performance overhead of MCP. 

  • In-memory caching: The MCP server can cache results from frequent or expensive database queries in memory (e.g., using Redis or Memcached). This approach enables repeat requests to be served almost instantly without requiring a database hit.
  • Semantic caching: Advanced techniques can cache the results of previous queries and serve them for semantically similar future requests, reducing token consumption and improving speed for conversational applications. 
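
As an illustration of the in-memory caching idea above, here is a small time-to-live cache that an MCP server could place in front of an expensive query. It is a sketch under simple assumptions (single process, no size limit); `TTLCache` is a hypothetical class, and a production setup would more likely use Redis or Memcached as mentioned.

```python
import time
from typing import Any, Callable, Dict, Hashable, Tuple


class TTLCache:
    """Cache values for a fixed number of seconds, then recompute them."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so expiry can be exercised without sleeping
        self._store: Dict[Hashable, Tuple[float, Any]] = {}

    def get_or_compute(self, key: Hashable, compute: Callable[[], Any]) -> Any:
        now = self._clock()
        hit = self._store.get(key)
        if hit is not None and now - hit[0] < self._ttl:
            return hit[1]                    # fresh entry: skip the database
        value = compute()                    # miss or expired: run the real query
        self._store[key] = (now, value)
        return value


fake_now = [0.0]   # controllable clock for the demo
calls = []         # records how often the "database" was actually hit


def expensive_query() -> dict:
    calls.append(1)
    return {"order_id": "98765"}


cache = TTLCache(ttl_seconds=30, clock=lambda: fake_now[0])
first = cache.get_or_compute("latest_order", expensive_query)    # miss
second = cache.get_or_compute("latest_order", expensive_query)   # served from cache
fake_now[0] = 31.0
third = cache.get_or_compute("latest_order", expensive_query)    # expired, recomputed
```

The TTL is the trade-off knob: a longer value saves more database hits but serves staler data to the agent.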

Designing the MCP server and its database interactions for efficiency is critical. 

  • Optimized SQL: The MCP server should generate optimized SQL queries. Database indexes should be utilized effectively to expedite lookups and minimize load.
  • Pagination and filtering: To prevent a single query from overwhelming the system, the MCP server should implement pagination. The AI agent can be prompted to use filtering parameters to retrieve only the necessary data.
  • Connection pooling: This technique reuses existing database connections instead of opening a new one for each request, thereby reducing latency and database load. 
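
The pagination point above can be sketched with a cursor-based page fetch. The example again uses `sqlite3` as a runnable stand-in for the real database; `fetch_page`, `MAX_PAGE_SIZE`, and the integer `order_id` column are assumptions for the demo. The server caps the page size so that an agent cannot request an unbounded result set in one call.

```python
import sqlite3
from typing import Any, Dict, Optional

MAX_PAGE_SIZE = 50  # hypothetical server-side cap


def fetch_page(conn: sqlite3.Connection, page_size: int,
               cursor: Optional[int] = None) -> Dict[str, Any]:
    """Return one page of order ids plus a cursor for the next page (or None)."""
    page_size = max(1, min(page_size, MAX_PAGE_SIZE))
    # Assumes positive integer ids, so 0 works as the starting cursor.
    rows = conn.execute(
        "SELECT order_id FROM orders WHERE order_id > ? ORDER BY order_id LIMIT ?",
        (cursor if cursor is not None else 0, page_size + 1),  # fetch one extra row
    ).fetchall()
    has_more = len(rows) > page_size       # the extra row signals another page
    items = [row[0] for row in rows[:page_size]]
    return {"items": items, "next_cursor": items[-1] if has_more else None}


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER PRIMARY KEY)")
conn.executemany("INSERT INTO orders VALUES (?)", [(i,) for i in range(1, 6)])

page_one = fetch_page(conn, 2)
page_two = fetch_page(conn, 2, page_one["next_cursor"])
page_three = fetch_page(conn, 2, page_two["next_cursor"])
```

Filtering on an indexed key instead of using a growing OFFSET keeps each page equally cheap, which ties in with the indexing advice above.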

For large-scale enterprise deployments, scaling is essential for maintaining performance. 

  • Multiple servers: The workload can be distributed across various MCP servers. One server could handle read requests, and another could handle writes.
  • Load balancing: A reverse proxy or other load-balancing solution can distribute incoming traffic across MCP server instances. Autoscaling can dynamically add or remove servers in response to demand.
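
As a toy illustration of spreading requests across MCP server instances, here is a round-robin selector. In practice this job is usually done by a reverse proxy or a managed load balancer rather than application code; `RoundRobinBalancer` and the server names are hypothetical.

```python
from itertools import cycle
from typing import Iterable, List


class RoundRobinBalancer:
    """Hand out server addresses in a fixed rotation."""

    def __init__(self, servers: Iterable[str]):
        server_list: List[str] = list(servers)
        if not server_list:
            raise ValueError("at least one server is required")
        self._rotation = cycle(server_list)

    def next_server(self) -> str:
        return next(self._rotation)


balancer = RoundRobinBalancer(["mcp-a", "mcp-b", "mcp-c"])
picks = [balancer.next_server() for _ in range(4)]  # wraps around after the third pick
```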

For AI-driven tasks, a slight increase in latency for database access is often a worthwhile trade-off for significant gains. 

  • Improved accuracy: Accessing real-time, high-quality data through MCP leads to more accurate and relevant AI responses, reducing “hallucinations”.
  • Scalable ecosystem: The standardization of MCP reduces development overhead and allows for a more modular, scalable ecosystem, which saves significant engineering resources compared to building custom integrations.
  • Decoupled architecture: The MCP server decouples the AI model from the database, allowing each to be optimized and scaled independently. 

We’ll conclude this post here & continue with a deeper dive in the next post.

Till then, Happy Avenging! 🙂

Agentic AI in the Enterprise: Strategy, Architecture, and Implementation – Part 1

Today, we won’t be discussing a specific solution. Instead, we’ll discuss Agentic AI & its implementation in the Enterprise landscape over a series of upcoming posts.

So, hang tight! We’re about to launch a new venture as part of our knowledge drive.

Agentic AI refers to artificial intelligence systems that can act autonomously to achieve goals, making decisions and taking actions without constant human oversight. Unlike traditional AI, which responds to prompts, agentic AI can plan, reason about next steps, utilize tools, and work toward objectives over extended periods of time.

Key characteristics of agentic AI include:

  • Autonomy and Goal-Directed Behavior: These systems can pursue objectives independently, breaking down complex tasks into smaller steps and executing them sequentially.
  • Tool Use and Environment Interaction: Agentic AI can interact with external systems, APIs, databases, and software tools to gather information and perform actions in the real world.
  • Planning and Reasoning: They can develop multi-step strategies, adapt their approach based on feedback, and reason through problems to find solutions.
  • Persistence: Unlike single-interaction AI, agentic systems can maintain context and continue working on tasks across multiple interactions or sessions.
  • Decision Making: They can evaluate options, weigh trade-offs, and make choices about how to proceed when faced with uncertainty.

Agentic AI systems have several interconnected components that work together to enable intelligent behaviour. Each element plays a crucial role in the overall functioning of the AI system, and they must interact seamlessly to achieve desired outcomes. Let’s explore each of these components in more detail.

The sensing module serves as the AI’s eyes and ears, enabling it to understand its surroundings and make informed decisions. Think of it as the system that helps the AI “see” and “hear” the world around it, much like how humans use their senses.

  • Gathering Information: The system collects data from multiple sources, including cameras for visual information, microphones for audio, sensors for physical touch, and digital systems for data. This step provides the AI with a comprehensive understanding of what’s happening.
  • Making Sense of Data: Raw information from sensors can be messy and overwhelming. This component processes the data to identify the essential patterns and details that actually matter for making informed decisions.
  • Recognizing What’s Important: Utilizing advanced techniques such as computer vision (for images), natural language processing (for text and speech), and machine learning (for data patterns), the system identifies and understands objects, people, events, and situations within the environment.

This sensing capability enables AI systems to transition from merely following pre-programmed instructions to genuinely understanding their environment and making informed decisions based on real-world conditions. It’s the difference between a basic automated system and an intelligent agent that can adapt to changing situations.

The observation module serves as the AI’s decision-making center, where it sets objectives, develops strategies, and selects the most effective actions to take. This step is where the AI transforms what it perceives into purposeful action, much like humans think through problems and devise plans.

  • Setting Clear Objectives: The system establishes specific goals and desired outcomes, giving the AI a clear sense of direction and purpose. This approach helps ensure all actions are working toward meaningful results rather than random activity.
  • Strategic Planning: Using information about its own capabilities and the current situation, the AI creates step-by-step plans to reach its goals. It considers potential obstacles, available resources, and different approaches to find the most effective path forward.
  • Intelligent Decision-Making: When faced with multiple options, the system evaluates each choice against the current circumstances, established goals, and potential outcomes. It then selects the action most likely to move the AI closer to achieving its objectives.

This observation capability is what transforms an AI from a simple tool that follows commands into an intelligent system that can work independently toward business goals. It enables the AI to handle complex, multi-step tasks and adapt its approach when conditions change, making it valuable for a wide range of applications, from customer service to project management.

The action module serves as the AI’s hands and voice, turning decisions into real-world results. This step is where the AI actually puts its thinking and planning into action, carrying out tasks that make a tangible difference in the environment.

  • Control Systems: The system utilizes various tools to interact with the world, including motors for physical movement, speakers for communication, network connections for digital tasks, and software interfaces for system operation. These serve as the AI’s means of reaching out and making adjustments.
  • Task Implementation: Once the observation module determines the action to take, this component executes the actual task. Whether it’s sending an email, moving a robotic arm, updating a database, or scheduling a meeting, this module handles the execution from start to finish.

This action capability is what makes AI systems truly useful in business environments. Without it, an AI could analyze data and make significant decisions, but it couldn’t help solve problems or complete tasks. The action module bridges the gap between artificial intelligence and real-world impact, enabling AI to automate processes, respond to customers, manage systems, and deliver measurable business value.
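
The three modules described above fit together as a simple loop: sense, decide, act, repeat. The following toy sketch shows that loop for a made-up ticket queue. All names (`sense`, `deliberate`, `act`, `run_agent`, the `open_tickets` key) are hypothetical, and a real agent would replace each function with far richer logic.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Observation:
    queue_length: int


def sense(environment: Dict[str, int]) -> Observation:
    """Sensing: read the current state of the environment."""
    return Observation(queue_length=environment["open_tickets"])


def deliberate(observation: Observation, goal_max_queue: int) -> str:
    """Observation/deliberation: pick the action that moves toward the goal."""
    return "resolve_ticket" if observation.queue_length > goal_max_queue else "idle"


def act(environment: Dict[str, int], action: str) -> None:
    """Action: change the environment according to the decision."""
    if action == "resolve_ticket":
        environment["open_tickets"] -= 1


def run_agent(environment: Dict[str, int], goal_max_queue: int,
              max_steps: int = 10) -> List[str]:
    """Repeat sense -> deliberate -> act until the goal is met or steps run out."""
    history: List[str] = []
    for _ in range(max_steps):
        action = deliberate(sense(environment), goal_max_queue)
        history.append(action)
        if action == "idle":
            break
        act(environment, action)
    return history


environment = {"open_tickets": 3}
history = run_agent(environment, goal_max_queue=1)
```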

The technologies primarily involved in Agentic AI are as follows:

1. Machine Learning
2. Deep Learning
3. Computer Vision
4. Natural Language Processing (NLP)
5. Planning and Decision-Making
6. Uncertainty and Reasoning
7. Simulation and Modeling

In an enterprise setting, agentic AI systems utilize the Model Context Protocol (MCP) and the Agent-to-Agent (A2A) protocol as complementary, open standards to achieve autonomous, coordinated, and secure workflows. An MCP-enabled agent gains the ability to access and manipulate enterprise tools and data. At the same time, A2A allows a network of these agents to collaborate on complex tasks by delegating and exchanging information.

This combined approach allows enterprises to move from isolated AI experiments to strategic, scalable, and secure AI programs.

Protocol | Function in Agentic AI | Focus | Example use case
Model Context Protocol (MCP) | Equips a single AI agent with the tools and data it needs to perform a specific job. | Vertical integration: connecting agents to enterprise systems like databases, CRMs, and APIs. | A sales agent uses MCP to query the company CRM for a client’s recent purchase history.
Agent-to-Agent (A2A) | Enables multiple specialized agents to communicate, delegate tasks, and collaborate on a larger, multi-step goal. | Horizontal collaboration: allowing agents from different domains to work together seamlessly. | An orchestrating agent uses A2A to delegate parts of a complex workflow to specialized HR, IT, and sales agents.

Together, the two protocols provide the following benefits:

  • End-to-end automation: Agents can handle tasks from start to finish, including complex, multi-step workflows, autonomously.
  • Greater agility and speed: Enterprise-wide adoption of these protocols reduces the cost and complexity of integrating AI, accelerating deployment timelines for new applications.
  • Enhanced security and governance: Enterprise AI platforms built on these open standards incorporate robust security policies, centralized access controls, and comprehensive audit trails.
  • Vendor neutrality and interoperability: As open standards, MCP and A2A allow AI agents to work together seamlessly, regardless of the underlying vendor or platform.
  • Adaptive problem-solving: Agents can dynamically adjust their strategies and collaborate based on real-time data and contextual changes, leading to more resilient and efficient systems.
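
The vertical versus horizontal split described above can be pictured with a small toy model. It uses neither the real MCP nor the real A2A wire formats; `SpecialistAgent`, `Orchestrator`, and the tool names are hypothetical and only illustrate who talks to whom.

```python
from typing import Callable, Dict, List, Tuple

Tool = Callable[[str], str]


class SpecialistAgent:
    """One agent with its own tools (the MCP-style, vertical side)."""

    def __init__(self, name: str, tools: Dict[str, Tool]):
        self.name = name
        self.tools = tools

    def handle(self, tool_name: str, argument: str) -> str:
        return f"{self.name}: {self.tools[tool_name](argument)}"


class Orchestrator:
    """Delegates sub-tasks to specialists (the A2A-style, horizontal side)."""

    def __init__(self, agents: Dict[str, SpecialistAgent]):
        self.agents = agents

    def run(self, plan: List[Tuple[str, str, str]]) -> List[str]:
        # Each plan step names the agent, the tool it should use, and the input.
        return [self.agents[agent].handle(tool, arg) for agent, tool, arg in plan]


hr = SpecialistAgent("hr", {"create_record": lambda person: f"record created for {person}"})
it = SpecialistAgent("it", {"provision_laptop": lambda person: f"laptop ordered for {person}"})

results = Orchestrator({"hr": hr, "it": it}).run([
    ("hr", "create_record", "new hire"),
    ("it", "provision_laptop", "new hire"),
])
```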

We will discuss this topic further in our upcoming posts.

Till then, Happy Avenging! 🙂

Real-time video summary assistance App – Part 2

As a continuation of the previous post, I would like to continue my discussion about the implementation of MCP among agents. But before that, I want to show the quick demo one more time to recap our objectives.

Let us recap the process flow –

Also, recall how the scripts are grouped, as shown in the previous post:

Message-Chaining Protocol (MCP) Implementation:

    clsMCPMessage.py
    clsMCPBroker.py

YouTube Transcript Extraction:

    clsYouTubeVideoProcessor.py

Language Detection:

    clsLanguageDetector.py

Translation Services & Agents:

    clsTranslationAgent.py
    clsTranslationService.py

Documentation Agent:

    clsDocumentationAgent.py
    
Research Agent:

    clsDocumentationAgent.py

Great! Now, we’ll continue with the main discussion.


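import re

# Third-party dependency: youtube-transcript-api.
# Note: the static list_transcripts() call used below belongs to older releases
# of the library; newer releases may expose this differently.
from youtube_transcript_api import YouTubeTranscriptApi


def extract_youtube_id(youtube_url):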
    """Extract YouTube video ID from URL"""
    youtube_id_match = re.search(r'(?:v=|\/)([0-9A-Za-z_-]{11}).*', youtube_url)
    if youtube_id_match:
        return youtube_id_match.group(1)
    return None

def get_youtube_transcript(youtube_url):
    """Get transcript from YouTube video"""
    video_id = extract_youtube_id(youtube_url)
    if not video_id:
        return {"error": "Invalid YouTube URL or ID"}
    
    try:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        
        # First try to get manual transcripts
        try:
            transcript = transcript_list.find_manually_created_transcript(["en"])
            transcript_data = transcript.fetch()
            print(f"Debug - Manual transcript format: {type(transcript_data)}")
            if transcript_data and len(transcript_data) > 0:
                print(f"Debug - First item type: {type(transcript_data[0])}")
                print(f"Debug - First item sample: {transcript_data[0]}")
            return {"text": transcript_data, "language": "en", "auto_generated": False}
        except Exception as e:
            print(f"Debug - No manual transcript: {str(e)}")
            # If no manual English transcript, try any available transcript
            try:
                available_transcripts = list(transcript_list)
                if available_transcripts:
                    transcript = available_transcripts[0]
                    print(f"Debug - Using transcript in language: {transcript.language_code}")
                    transcript_data = transcript.fetch()
                    print(f"Debug - Auto transcript format: {type(transcript_data)}")
                    if transcript_data and len(transcript_data) > 0:
                        print(f"Debug - First item type: {type(transcript_data[0])}")
                        print(f"Debug - First item sample: {transcript_data[0]}")
                    return {
                        "text": transcript_data, 
                        "language": transcript.language_code, 
                        "auto_generated": transcript.is_generated
                    }
                else:
                    return {"error": "No transcripts available for this video"}
            except Exception as e:
                return {"error": f"Error getting transcript: {str(e)}"}
    except Exception as e:
        return {"error": f"Error getting transcript list: {str(e)}"}

# ----------------------------------------------------------------------------------
# YouTube Video Processor
# ----------------------------------------------------------------------------------

class clsYouTubeVideoProcessor:
    """Process YouTube videos using the agent system"""
    
    def __init__(self, documentation_agent, translation_agent, research_agent):
        self.documentation_agent = documentation_agent
        self.translation_agent = translation_agent
        self.research_agent = research_agent
    
    def process_youtube_video(self, youtube_url):
        """Process a YouTube video"""
        print(f"Processing YouTube video: {youtube_url}")
        
        # Extract transcript
        transcript_result = get_youtube_transcript(youtube_url)
        
        if "error" in transcript_result:
            return {"error": transcript_result["error"]}
        
        # Start a new conversation
        conversation_id = self.documentation_agent.start_processing()
        
        # Process transcript segments
        transcript_data = transcript_result["text"]
        transcript_language = transcript_result["language"]
        
        print(f"Debug - Type of transcript_data: {type(transcript_data)}")
        
        # For each segment, detect language and translate if needed
        processed_segments = []
        
        try:
            # Make sure transcript_data is a list of dictionaries with text and start fields
            if isinstance(transcript_data, list):
                for idx, segment in enumerate(transcript_data):
                    print(f"Debug - Processing segment {idx}, type: {type(segment)}")
                    
                    # Extract text properly based on the type
                    if isinstance(segment, dict) and "text" in segment:
                        text = segment["text"]
                        start = segment.get("start", 0)
                    else:
                        # Try to access attributes for non-dict types
                        try:
                            text = segment.text
                            start = getattr(segment, "start", 0)
                        except AttributeError:
                            # If all else fails, convert to string
                            text = str(segment)
                            start = idx * 5  # Arbitrary timestamp
                    
                    print(f"Debug - Extracted text: {text[:30]}...")
                    
                    # Create a standardized segment
                    std_segment = {
                        "text": text,
                        "start": start
                    }
                    
                    # Process through translation agent
                    translation_result = self.translation_agent.process_text(text, conversation_id)
                    
                    # Update segment with translation information
                    segment_with_translation = {
                        **std_segment,
                        "translation_info": translation_result
                    }
                    
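                    # Use translated text for documentation. Set it on the dict that is
                    # actually appended, since it was copied from std_segment above.
                    if "final_text" in translation_result and translation_result["final_text"] != text:
                        segment_with_translation["processed_text"] = translation_result["final_text"]
                    else:
                        segment_with_translation["processed_text"] = text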
                    
                    processed_segments.append(segment_with_translation)
            else:
                # If transcript_data is not a list, treat it as a single text block
                print(f"Debug - Transcript is not a list, treating as single text")
                text = str(transcript_data)
                std_segment = {
                    "text": text,
                    "start": 0
                }
                
                translation_result = self.translation_agent.process_text(text, conversation_id)
                segment_with_translation = {
                    **std_segment,
                    "translation_info": translation_result
                }
                
                # Set processed_text on the dict that is actually appended
                if "final_text" in translation_result and translation_result["final_text"] != text:
                    segment_with_translation["processed_text"] = translation_result["final_text"]
                else:
                    segment_with_translation["processed_text"] = text
                
                processed_segments.append(segment_with_translation)
                
        except Exception as e:
            print(f"Debug - Error processing transcript: {str(e)}")
            return {"error": f"Error processing transcript: {str(e)}"}
        
        # Process the transcript with the documentation agent
        documentation_result = self.documentation_agent.process_transcript(
            processed_segments,
            conversation_id
        )
        
        return {
            "youtube_url": youtube_url,
            "transcript_language": transcript_language,
            "processed_segments": processed_segments,
            "documentation": documentation_result,
            "conversation_id": conversation_id
        }

Let us understand this step-by-step:

Part 1: Getting the YouTube Transcript

def extract_youtube_id(youtube_url):
    ...

This extracts the unique 11-character video ID from common YouTube link formats, such as watch?v=<id> and youtu.be/<id> URLs.
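
Here is a quick, self-contained check of that behaviour. The function body repeats the regex from the script above; the sample video ID is just a placeholder value.

```python
import re
from typing import Optional


def extract_youtube_id(youtube_url: str) -> Optional[str]:
    """Same regex as in the post: 11 ID characters after 'v=' or a slash."""
    match = re.search(r'(?:v=|\/)([0-9A-Za-z_-]{11}).*', youtube_url)
    return match.group(1) if match else None


watch_id = extract_youtube_id("https://www.youtube.com/watch?v=dQw4w9WgXcQ")
short_id = extract_youtube_id("https://youtu.be/dQw4w9WgXcQ")
bare_id = extract_youtube_id("dQw4w9WgXcQ")   # no 'v=' or '/' prefix, so no match
no_id = extract_youtube_id("not a link")
```

One thing to note: because the pattern requires a `v=` or `/` prefix, a bare video ID returns None, even though the error message in get_youtube_transcript() mentions "URL or ID".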

def get_youtube_transcript(youtube_url):
    ...

  • This gets the actual spoken content of the video.
  • It tries to get a manually created English transcript first (written by humans).
  • If none exists, it falls back to the first available transcript in any language, which may be auto-generated (created by YouTube’s AI).
  • If nothing is found, it gives back an error message such as: “No transcripts available for this video.”
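
The selection order described in these bullets can be isolated into a small pure function. This sketch works on plain dictionaries instead of the library's transcript objects, so it runs without network access; `choose_transcript` and the dictionary keys are assumptions that mirror the `language_code` and `is_generated` attributes used in the script.

```python
from typing import Dict, List, Optional


def choose_transcript(available: List[Dict], preferred_language: str = "en") -> Optional[Dict]:
    """Prefer a manually created transcript in the preferred language,
    otherwise fall back to the first transcript that exists."""
    for transcript in available:
        if transcript["language_code"] == preferred_language and not transcript["is_generated"]:
            return transcript
    return available[0] if available else None


manual_en = {"language_code": "en", "is_generated": False}
auto_hi = {"language_code": "hi", "is_generated": True}

picked_manual = choose_transcript([auto_hi, manual_en])   # manual English wins
picked_fallback = choose_transcript([auto_hi])            # first available is used
picked_none = choose_transcript([])                       # nothing to pick
```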

Part 2: Processing the Video with Agents

class clsYouTubeVideoProcessor:
    ...

This is like the control center that tells each intelligent agent what to do with the transcript. Here are the detailed steps:

1. Start the Process

def process_youtube_video(self, youtube_url):
    ...

  • The system starts with a YouTube video link.
  • It prints a message like: “Processing YouTube video: [link]”

2. Extract the Transcript

  • The system runs the get_youtube_transcript() function.
  • If it fails, it returns an error (e.g., invalid link or no subtitles available).

3. Start a “Conversation”

  • The documentation agent begins a new session, tracked by a unique conversation ID.
  • Think of this like opening a new folder in a shared team workspace to store everything related to this video.

4. Go Through Each Segment of the Transcript

  • The spoken text is often broken into small parts (segments), like subtitles.
  • For each part:
    • It checks the text.
    • It finds out the time that part was spoken.
    • It sends it to the translation agent to clean up or translate the text.
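
The "check the text and find the time" part of this step boils down to normalizing each segment into one shape, whatever form it arrives in. Here is that logic pulled out as a standalone sketch; `normalize_segment` is a hypothetical helper name, and the 5-second spacing for the fallback timestamp is the same arbitrary value the script uses.

```python
from types import SimpleNamespace
from typing import Any, Dict


def normalize_segment(segment: Any, index: int) -> Dict[str, Any]:
    """Turn a dict, an object with attributes, or anything else into
    a {'text': ..., 'start': ...} dictionary."""
    if isinstance(segment, dict) and "text" in segment:
        return {"text": segment["text"], "start": segment.get("start", 0)}
    if hasattr(segment, "text"):
        return {"text": segment.text, "start": getattr(segment, "start", 0)}
    # Last resort: stringify and invent a timestamp from the position.
    return {"text": str(segment), "start": index * 5}


from_dict = normalize_segment({"text": "hello", "start": 1.5}, 0)
from_object = normalize_segment(SimpleNamespace(text="namaste", start=4.0), 1)
from_string = normalize_segment("plain text", 2)
```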

5. Translate (if needed)

  • If the translation agent finds a better or translated version, it replaces the original.
  • Otherwise, it keeps the original.

6. Prepare for Documentation

  • After all segments have been translated, the full list is passed to the documentation agent.
  • This agent might:
    • Summarize the content,
    • Highlight important terms,
    • Structure it into a readable format.

7. Return the Final Result

The system gives back a structured package with:

  • The video link
  • The original language
  • The transcript in parts (processed and translated)
  • A documentation summary
  • The conversation ID (for tracking or further updates)

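import uuid

# Import paths below assume a LangChain 0.1+ style installation
# (langchain, langchain-core, langchain-openai); adjust to the installed version.
from langchain.agents import AgentExecutor
from langchain.agents.format_scratchpad.openai_tools import format_to_openai_tool_messages
from langchain.agents.output_parsers.openai_tools import OpenAIToolsAgentOutputParser
from langchain.memory import ConversationBufferMemory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI

# clsMCPBroker, clsSendMessageTool, and OPENAI_API_KEY come from the project's
# own modules (not shown here).

class clsDocumentationAgent: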
    """Documentation Agent built with LangChain"""
    
    def __init__(self, agent_id: str, broker: clsMCPBroker):
        self.agent_id = agent_id
        self.broker = broker
        self.broker.register_agent(agent_id)
        
        # Initialize LangChain components
        self.llm = ChatOpenAI(
            model="gpt-4-0125-preview",
            temperature=0.1,
            api_key=OPENAI_API_KEY
        )
        
        # Create tools
        self.tools = [
            clsSendMessageTool(sender_id=self.agent_id, broker=self.broker)
        ]
        
        # Set up LLM with tools
        self.llm_with_tools = self.llm.bind(
            tools=[tool.tool_config for tool in self.tools]
        )
        
        # Setup memory
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True
        )
        
        # Create prompt
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a Documentation Agent for YouTube video transcripts. Your responsibilities include:
                1. Process YouTube video transcripts
                2. Identify key points, topics, and main ideas
                3. Organize content into a coherent and structured format
                4. Create concise summaries
                5. Request research information when necessary
                
                When you need additional context or research, send a request to the Research Agent.
                Always maintain a professional tone and ensure your documentation is clear and organized.
            """),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])
        
        # Create agent
        self.agent = (
            {
                "input": lambda x: x["input"],
                "chat_history": lambda x: self.memory.load_memory_variables({})["chat_history"],
                "agent_scratchpad": lambda x: format_to_openai_tool_messages(x["intermediate_steps"]),
            }
            | self.prompt
            | self.llm_with_tools
            | OpenAIToolsAgentOutputParser()
        )
        
        # Create agent executor
        self.agent_executor = AgentExecutor(
            agent=self.agent,
            tools=self.tools,
            verbose=True,
            memory=self.memory
        )
        
        # Video data
        self.current_conversation_id = None
        self.video_notes = {}
        self.key_points = []
        self.transcript_segments = []
        
    def start_processing(self) -> str:
        """Start processing a new video"""
        self.current_conversation_id = str(uuid.uuid4())
        self.video_notes = {}
        self.key_points = []
        self.transcript_segments = []
        
        return self.current_conversation_id
    
    def process_transcript(self, transcript_segments, conversation_id=None):
        """Process a YouTube transcript"""
        if not conversation_id:
            conversation_id = self.start_processing()
        self.current_conversation_id = conversation_id
        
        # Store transcript segments
        self.transcript_segments = transcript_segments
        
        # Process segments
        processed_segments = []
        for segment in transcript_segments:
            processed_result = self.process_segment(segment)
            processed_segments.append(processed_result)
        
        # Generate summary
        summary = self.generate_summary()
        
        return {
            "processed_segments": processed_segments,
            "summary": summary,
            "conversation_id": conversation_id
        }
    
    def process_segment(self, segment):
        """Process individual transcript segment"""
        text = segment.get("text", "")
        start = segment.get("start", 0)
        
        # Use LangChain agent to process the segment
        result = self.agent_executor.invoke({
            "input": f"Process this video transcript segment at timestamp {start}s: {text}. If research is needed, send a request to the research_agent."
        })
        
        # Update video notes
        timestamp = start
        self.video_notes[timestamp] = {
            "text": text,
            "analysis": result["output"]
        }
        
        return {
            "timestamp": timestamp,
            "text": text,
            "analysis": result["output"]
        }
    
    def handle_mcp_message(self, message: clsMCPMessage) -> Optional[clsMCPMessage]:
        """Handle an incoming MCP message"""
        if message.message_type == "research_response":
            # Process research information received from Research Agent
            research_info = message.content.get("text", "")
            
            result = self.agent_executor.invoke({
                "input": f"Incorporate this research information into video analysis: {research_info}"
            })
            
            # Send acknowledgment back to Research Agent
            response = clsMCPMessage(
                sender=self.agent_id,
                receiver=message.sender,
                message_type="acknowledgment",
                content={"text": "Research information incorporated into video analysis."},
                reply_to=message.id,
                conversation_id=message.conversation_id
            )
            
            self.broker.publish(response)
            return response
        
        elif message.message_type == "translation_response":
            # Process translation response from Translation Agent
            translation_result = message.content
            
            # Process the translated text
            if "final_text" in translation_result:
                text = translation_result["final_text"]
                original_text = translation_result.get("original_text", "")
                language_info = translation_result.get("language", {})
                
                result = self.agent_executor.invoke({
                    "input": f"Process this translated text: {text}\nOriginal language: {language_info.get('language', 'unknown')}\nOriginal text: {original_text}"
                })
                
                # Update notes with translation information
                for timestamp, note in self.video_notes.items():
                    if note["text"] == original_text:
                        note["translated_text"] = text
                        note["language"] = language_info
                        break
            
            return None
        
        return None
    
    def run(self):
        """Run the agent to listen for MCP messages"""
        print(f"Documentation Agent {self.agent_id} is running...")
        while True:
            message = self.broker.get_message(self.agent_id, timeout=1)
            if message:
                self.handle_mcp_message(message)
            time.sleep(0.1)
    
    def generate_summary(self) -> str:
        """Generate a summary of the video"""
        if not self.video_notes:
            return "No video data available to summarize."
        
        all_notes = "\n".join([f"{ts}: {note['text']}" for ts, note in self.video_notes.items()])
        
        result = self.agent_executor.invoke({
            "input": f"Generate a concise summary of this YouTube video, including key points and topics:\n{all_notes}"
        })
        
        return result["output"]

Let us understand the key methods in a step-by-step manner:

The Documentation Agent works like a smart assistant that reads a YouTube video’s transcript, takes notes, pulls out the important ideas, and creates a summary, much like a professional note-taker trained to help educators, researchers, and content creators. It works with a team of other assistants, such as the Translation Agent and the Research Agent, and they all talk to each other through a messaging system.

1. Starting to Work on a New Video

    def start_processing(self) -> str
    

    When a new video is being processed:

    • A new conversation ID is created.
    • Old notes and transcripts are cleared to start fresh.

    2. Processing the Whole Transcript

    def process_transcript(...)
    

    This is where the assistant:

    • Takes in the full transcript (what was said in the video), already split into small segments (like subtitles).
    • Sends each segment to the LLM-backed agent for analysis.
    • Collects the results.
    • Finally, creates a summary of all the main ideas.

    3. Processing One Transcript Segment at a Time

    def process_segment(self, segment)
    

    For each chunk of the video:

    • The assistant reads the text and timestamp.
    • It asks GPT-4 to analyze it and suggest important insights.
    • It saves that insight along with the original text and timestamp.

    4. Handling Incoming Messages from Other Agents

    def handle_mcp_message(self, message)
    

    The assistant can also receive messages from teammates (other agents):

    If the message is from the Research Agent:

    • It reads new information and adds it to its notes.
    • It replies with an acknowledgment message to confirm that it received the research.

    If the message is from the Translation Agent:

    • It takes the translated version of a transcript.
    • Updates its notes to reflect the translated text and its language.

    This is like a team of assistants emailing back and forth to make sure the notes are complete and accurate.
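The translation branch described above boils down to a small note update. The helper below is a minimal sketch of that one step; the function name `apply_translation` is hypothetical, while the keys (`final_text`, `original_text`, `language`, `translated_text`) follow the listing in this post.

```python
def apply_translation(video_notes: dict, translation_result: dict) -> bool:
    """Attach translated text to the first note whose text matches the original.

    Returns True when a note was updated, False otherwise.
    """
    if "final_text" not in translation_result:
        return False
    original_text = translation_result.get("original_text", "")
    for note in video_notes.values():
        if note["text"] == original_text:
            note["translated_text"] = translation_result["final_text"]
            note["language"] = translation_result.get("language", {})
            return True
    return False
```

Matching on the original text is simple, but if the same sentence appears twice in a video, only the first matching note is updated.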

    5. Summarizing the Whole Video

    def generate_summary(self)
    

    After going through all the transcript parts, the agent asks GPT-4 to create a short, clean summary that identifies:

    • Main ideas
    • Key talking points
    • Structure of the content

    The final result is clear, professional, and usable in learning materials or documentation.


    class clsResearchAgent:
        """Research Agent built with AutoGen"""
        
        def __init__(self, agent_id: str, broker: clsMCPBroker):
            self.agent_id = agent_id
            self.broker = broker
            self.broker.register_agent(agent_id)
            
            # Configure AutoGen directly with API key
            if not OPENAI_API_KEY:
                print("Warning: OPENAI_API_KEY not set for ResearchAgent")
                
            # Create config list directly instead of loading from file
            config_list = [
                {
                    "model": "gpt-4-0125-preview",
                    "api_key": OPENAI_API_KEY
                }
            ]
            # Create AutoGen assistant for research
            self.assistant = AssistantAgent(
                name="research_assistant",
                system_message="""You are a Research Agent for YouTube videos. Your responsibilities include:
                    1. Research topics mentioned in the video
                    2. Find relevant information, facts, references, or context
                    3. Provide concise, accurate information to support the documentation
                    4. Focus on delivering high-quality, relevant information
                    
                    Respond directly to research requests with clear, factual information.
                """,
                llm_config={"config_list": config_list, "temperature": 0.1}
            )
            
            # Create user proxy to handle message passing
            self.user_proxy = UserProxyAgent(
                name="research_manager",
                human_input_mode="NEVER",
                code_execution_config={"work_dir": "coding", "use_docker": False},
                default_auto_reply="Working on the research request..."
            )
            
            # Current conversation tracking
            self.current_requests = {}
        
        def handle_mcp_message(self, message: clsMCPMessage) -> Optional[clsMCPMessage]:
            """Handle an incoming MCP message"""
            if message.message_type == "request":
                # Process research request from Documentation Agent
                request_text = message.content.get("text", "")
                
                # Use AutoGen to process the research request
                def research_task():
                    self.user_proxy.initiate_chat(
                        self.assistant,
                        message=f"Research request for YouTube video content: {request_text}. Provide concise, factual information."
                    )
                    # Return last assistant message
                    return self.assistant.chat_messages[self.user_proxy.name][-1]["content"]
                
                # Execute research task
                research_result = research_task()
                
                # Send research results back to Documentation Agent
                response = clsMCPMessage(
                    sender=self.agent_id,
                    receiver=message.sender,
                    message_type="research_response",
                    content={"text": research_result},
                    reply_to=message.id,
                    conversation_id=message.conversation_id
                )
                
                self.broker.publish(response)
                return response
            
            return None
        
        def run(self):
            """Run the agent to listen for MCP messages"""
            print(f"Research Agent {self.agent_id} is running...")
            while True:
                message = self.broker.get_message(self.agent_id, timeout=1)
                if message:
                    self.handle_mcp_message(message)
                time.sleep(0.1)
    

    Let us understand the key methods in detail.

    1. Receiving and Responding to Research Requests

      def handle_mcp_message(self, message)
      

      When the Research Agent gets a message (like a question or request for info), it:

      1. Reads the message to see what needs to be researched.
      2. Asks GPT-4 to find helpful, accurate info about that topic.
      3. Sends the answer back to whoever asked the question (usually the Documentation Agent).
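The request/response pattern in the three steps above can be sketched without AutoGen. In this illustration, `Message` is a hypothetical stand-in for `clsMCPMessage` carrying only the fields used here, and `research` is any callable that answers a question; neither is the actual implementation from this post.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional
import uuid


@dataclass
class Message:
    """Hypothetical stand-in for clsMCPMessage (only the fields used here)."""
    sender: str
    receiver: str
    message_type: str
    content: dict
    reply_to: Optional[str] = None
    conversation_id: Optional[str] = None
    id: str = field(default_factory=lambda: str(uuid.uuid4()))


def answer_request(
    agent_id: str, request: Message, research: Callable[[str], str]
) -> Optional[Message]:
    """Build a research_response addressed to whoever sent the request."""
    if request.message_type != "request":
        return None  # Ignore anything that is not a research request.
    answer = research(request.content.get("text", ""))
    return Message(
        sender=agent_id,
        receiver=request.sender,          # Reply goes back to the asker.
        message_type="research_response",
        content={"text": answer},
        reply_to=request.id,              # Lets the asker match reply to request.
        conversation_id=request.conversation_id,
    )
```

Carrying `reply_to` and `conversation_id` on every reply is what lets the Documentation Agent tie an answer back to the question it asked.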

      class clsTranslationAgent:
          """Agent for language detection and translation"""
          
          def __init__(self, agent_id: str, broker: clsMCPBroker):
              self.agent_id = agent_id
              self.broker = broker
              self.broker.register_agent(agent_id)
              
              # Initialize language detector
              self.language_detector = clsLanguageDetector()
              
              # Initialize translation service
              self.translation_service = clsTranslationService()
          
          def process_text(self, text, conversation_id=None):
              """Process text: detect language and translate if needed, handling mixed language content"""
              if not conversation_id:
                  conversation_id = str(uuid.uuid4())
              
              # Detect language with support for mixed language content
              language_info = self.language_detector.detect(text)
              
              # Decide if translation is needed
              needs_translation = True
              
              # Pure English content doesn't need translation
              if language_info["language_code"] == "en-IN" or language_info["language_code"] == "unknown":
                  needs_translation = False
              
              # For mixed language, check if it's primarily English
              if language_info.get("is_mixed", False) and language_info.get("languages", []):
                  english_langs = [
                      lang for lang in language_info.get("languages", []) 
                      if lang["language_code"] == "en-IN" or lang["language_code"].startswith("en-")
                  ]
                  
                  # If the first English entry in the detected languages is > 60% confident, don't translate
                  if english_langs and english_langs[0].get("confidence", 0) > 0.6:
                      needs_translation = False
              
              if needs_translation:
                  # Translate using the appropriate service based on language detection
                  translation_result = self.translation_service.translate(text, language_info)
                  
                  return {
                      "original_text": text,
                      "language": language_info,
                      "translation": translation_result,
                      "final_text": translation_result.get("translated_text", text),
                      "conversation_id": conversation_id
                  }
              else:
                  # Already English or unknown language, return as is
                  return {
                      "original_text": text,
                      "language": language_info,
                      "translation": {"provider": "none"},
                      "final_text": text,
                      "conversation_id": conversation_id
                  }
          
          def handle_mcp_message(self, message: clsMCPMessage) -> Optional[clsMCPMessage]:
              """Handle an incoming MCP message"""
              if message.message_type == "translation_request":
                  # Process translation request from Documentation Agent
                  text = message.content.get("text", "")
                  
                  # Process the text
                  result = self.process_text(text, message.conversation_id)
                  
                  # Send translation results back to requester
                  response = clsMCPMessage(
                      sender=self.agent_id,
                      receiver=message.sender,
                      message_type="translation_response",
                      content=result,
                      reply_to=message.id,
                      conversation_id=message.conversation_id
                  )
                  
                  self.broker.publish(response)
                  return response
              
              return None
          
          def run(self):
              """Run the agent to listen for MCP messages"""
              print(f"Translation Agent {self.agent_id} is running...")
              while True:
                  message = self.broker.get_message(self.agent_id, timeout=1)
                  if message:
                      self.handle_mcp_message(message)
                  time.sleep(0.1)

      Let us understand the key methods in a step-by-step manner:

      1. Understanding and Translating Text:

      def process_text(...)
      

      This is the core job of the agent. Here’s what it does with any piece of text:

      Step 1: Detect the Language

      • It tries to figure out the language of the input text.
      • It can handle cases where more than one language is mixed together, which is common in casual speech or subtitles.

      Step 2: Decide Whether to Translate

      • If the text is clearly in English, or it’s unclear what the language is, it decides not to translate.
      • If the text is mostly in another language or has less than 60% confidence in being English, it will translate it into English.

      Step 3: Translate (if needed)

      • If translation is required, it uses the translation service to do the job.
      • Then it packages all the information: the original text, detected language, the translated version, and a unique conversation ID.

      Step 4: Return the Results

      • If no translation is needed, it returns the original text unchanged, with the translation provider marked as “none”.
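The decision in Step 2 can be isolated into a small pure function, which makes the 60% rule easy to test. This is a sketch that mirrors the logic of `process_text`; the function name `needs_translation` is hypothetical, and the shape of `language_info` is assumed to match what the language detector returns in this post.

```python
def needs_translation(language_info: dict) -> bool:
    """Return False for English or unknown text, True otherwise."""
    code = language_info.get("language_code", "unknown")
    # Pure English ("en-IN") or undetected language: leave the text alone.
    if code in ("en-IN", "unknown"):
        return False
    # Mixed content: skip translation when English is detected with > 60% confidence.
    if language_info.get("is_mixed", False):
        english = [
            lang for lang in language_info.get("languages", [])
            if lang.get("language_code", "").startswith("en-")
        ]
        if english and english[0].get("confidence", 0) > 0.6:
            return False
    return True
```

For example, a Hindi-English sentence where English is detected at 0.7 confidence is returned as is, while the same sentence at 0.4 confidence is sent for translation.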

      2. Receiving Messages and Responding

      def handle_mcp_message(...)
      

      The agent listens for messages from other agents. When someone asks it to translate something:

      • It takes the text from the message.
      • Runs it through the process_text function (as explained above).
      • Sends the translated (or original) result back to the agent that asked.

      class clsTranslationService:
          """Translation service using multiple providers with support for mixed languages"""
          
          def __init__(self):
              # Initialize Sarvam AI client
              self.sarvam_api_key = SARVAM_API_KEY
              self.sarvam_url = "https://api.sarvam.ai/translate"
              
              # Initialize Google Cloud Translation client using simple HTTP requests
              self.google_api_key = GOOGLE_API_KEY
              self.google_translate_url = "https://translation.googleapis.com/language/translate/v2"
          
          def translate_with_sarvam(self, text, source_lang, target_lang="en-IN"):
              """Translate text using Sarvam AI (for Indian languages)"""
              if not self.sarvam_api_key:
                  return {"error": "Sarvam API key not set"}
              
              headers = {
                  "Content-Type": "application/json",
                  "api-subscription-key": self.sarvam_api_key
              }
              
              payload = {
                  "input": text,
                  "source_language_code": source_lang,
                  "target_language_code": target_lang,
                  "speaker_gender": "Female",
                  "mode": "formal",
                  "model": "mayura:v1"
              }
              
              try:
                  # A timeout keeps the agent from hanging if the API does not respond
                  response = requests.post(self.sarvam_url, headers=headers, json=payload, timeout=30)
                  if response.status_code == 200:
                      return {"translated_text": response.json().get("translated_text", ""), "provider": "sarvam"}
                  else:
                      return {"error": f"Sarvam API error: {response.text}", "provider": "sarvam"}
              except Exception as e:
                  return {"error": f"Error calling Sarvam API: {str(e)}", "provider": "sarvam"}
          
          def translate_with_google(self, text, target_lang="en"):
              """Translate text using Google Cloud Translation API with direct HTTP request"""
              if not self.google_api_key:
                  return {"error": "Google API key not set"}
              
              try:
                  # Using the translation API v2 with API key
                  params = {
                      "key": self.google_api_key,
                      "q": text,
                      "target": target_lang
                  }
                  
                  # A timeout keeps the agent from hanging if the API does not respond
                  response = requests.post(self.google_translate_url, params=params, timeout=30)
                  if response.status_code == 200:
                      data = response.json()
                      translation = data.get("data", {}).get("translations", [{}])[0]
                      return {
                          "translated_text": translation.get("translatedText", ""),
                          "detected_source_language": translation.get("detectedSourceLanguage", ""),
                          "provider": "google"
                      }
                  else:
                      return {"error": f"Google API error: {response.text}", "provider": "google"}
              except Exception as e:
                  return {"error": f"Error calling Google Translation API: {str(e)}", "provider": "google"}
          
          def translate(self, text, language_info):
              """Translate text to English based on language detection info"""
              # If already English or unknown language, return as is
              if language_info["language_code"] == "en-IN" or language_info["language_code"] == "unknown":
                  return {"translated_text": text, "provider": "none"}
              
              # Handle mixed language content
              if language_info.get("is_mixed", False) and language_info.get("languages", []):
                  # Strategy for mixed language:
                  # 1. If one of the languages is English, use Google Translate on the full text, as it handles code-mixing well
                  # 2. If no English but contains Indian languages, use Sarvam with the highest-confidence Indian language as source
                  # 3. Otherwise, use Google Translate for the primary detected language
                  
                  has_english = False
                  has_indian = False
                  
                  for lang in language_info.get("languages", []):
                      if lang["language_code"] == "en-IN" or lang["language_code"].startswith("en-"):
                          has_english = True
                      if lang.get("is_indian", False):
                          has_indian = True
                  
                  if has_english:
                      # Contains English - use Google for full text as it handles code-mixing well
                      return self.translate_with_google(text)
                  elif has_indian:
                      # Contains Indian languages - use Sarvam
                      # Use the highest confidence Indian language as source
                      indian_langs = [lang for lang in language_info.get("languages", []) if lang.get("is_indian", False)]
                      if indian_langs:
                          # Sort by confidence
                          indian_langs.sort(key=lambda x: x.get("confidence", 0), reverse=True)
                          source_lang = indian_langs[0]["language_code"]
                          return self.translate_with_sarvam(text, source_lang)
                      else:
                          # Fallback to primary language
                          if language_info["is_indian"]:
                              return self.translate_with_sarvam(text, language_info["language_code"])
                          else:
                              return self.translate_with_google(text)
                  else:
                      # No English, no Indian languages - use Google for primary language
                      return self.translate_with_google(text)
              else:
                  # Not mixed language - use standard approach
                  if language_info["is_indian"]:
                      # Use Sarvam AI for Indian languages
                      return self.translate_with_sarvam(text, language_info["language_code"])
                  else:
                      # Use Google for other languages
                      return self.translate_with_google(text)

      This Translation Service is like a smart translator that knows how to:

      • Take the language information detected for the text,
      • Choose the best translation provider depending on the language (especially for Indian languages),
      • And then translate the text into English.

      It supports mixed-language content (such as Hindi-English in one sentence) and uses either Google Translate or Sarvam AI, a translation service designed for Indian languages.

      Now, let us understand the key methods in a step-by-step manner:

      1. Translating Using Google Translate

      def translate_with_google(...)
      

      This function uses Google Translate:

      • It sends the text, asks for English as the target language, and gets a translation back.
      • It also detects the source language automatically.
      • If successful, it returns the translated text and the detected original language.
      • If there’s an error, it returns a message saying what went wrong.

      Best For: Non-Indian languages (like Spanish, French, Chinese) and mixed-language content that includes English.
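To make the response handling concrete, here is a small sketch that parses a Translation API v2 style payload. The helper name `parse_google_translation` is hypothetical. One assumption worth flagging: the v2 API returns HTML-formatted text by default, so the sketch unescapes entities such as `&#39;`; the listing in this post does not do that.

```python
import html


def parse_google_translation(payload: dict) -> dict:
    """Extract the first translation from a v2-style response payload."""
    translations = payload.get("data", {}).get("translations", [])
    if not translations:
        # An empty list would otherwise raise IndexError on [0].
        return {"error": "No translations in response", "provider": "google"}
    first = translations[0]
    return {
        # Assumption: text may contain HTML entities, so unescape it.
        "translated_text": html.unescape(first.get("translatedText", "")),
        "detected_source_language": first.get("detectedSourceLanguage", ""),
        "provider": "google",
    }
```

Keeping the parsing separate from the HTTP call means the response handling can be checked with canned payloads, without an API key.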

      2. Main Translation Logic

      def translate(self, text, language_info)
      

      This is the decision-maker. Here’s how it works:

      Case 1: No Translation Needed

      If the text is already in English or the language is unknown, it simply returns the original text.

      Case 2: Mixed Language (e.g., Hindi + English)

      If the text contains more than one language:

      • ✅ If one part is English → use Google Translate (it’s good with mixed languages).
      • ✅ If it includes Indian languages only → use Sarvam AI (better at handling Indian content).
      • ✅ If it’s neither English nor Indian → use Google Translate.

      The service checks how confident it is about each language in the mix and chooses the most likely one to translate from.

      Case 3: Single Language

      If the text is only in one language:

      • ✅ If it’s an Indian language (like Bengali, Tamil, or Marathi), use Sarvam AI.
      • ✅ If it’s any other language, use Google Translate.
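The three cases above reduce to a routing decision that can be written as a pure function. The sketch below mirrors the branching in `translate` but only returns the provider name instead of calling any API; the name `choose_provider` is hypothetical, and the `language_info` shape is assumed to match the detector output used in this post.

```python
def choose_provider(language_info: dict) -> str:
    """Return "none", "google", or "sarvam" for the given detection result."""
    code = language_info.get("language_code", "unknown")
    # Case 1: already English or unknown, so nothing to translate.
    if code in ("en-IN", "unknown"):
        return "none"
    languages = language_info.get("languages", [])
    # Case 2: mixed-language content.
    if language_info.get("is_mixed", False) and languages:
        if any(l.get("language_code", "").startswith("en-") for l in languages):
            return "google"   # English in the mix
        if any(l.get("is_indian", False) for l in languages):
            return "sarvam"   # Indian languages only
        return "google"       # Neither English nor Indian
    # Case 3: single language.
    return "sarvam" if language_info.get("is_indian", False) else "google"
```

For instance, `choose_provider({"language_code": "bn-IN", "is_indian": True})` routes to Sarvam AI, while a French text with `is_indian` unset routes to Google Translate.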

      So, we’ve done it.

      I’ve included the complete working solutions for you in the GitHub Link.

      We’ll cover detailed performance testing, optimized configurations, and many other useful details in our next post.

      Till then, Happy Avenging! 🙂

      Creating a local LLM Cluster Server using Apple Silicon GPU

      Today, we’re going to discuss creating a local LLM server and then using it to run various popular LLMs. We will pool the local Apple GPUs through a new framework that binds all the available Apple Silicon devices into one big LLM server. This lets people run many large models that would otherwise be out of reach for lack of GPUs.

      This is certainly a new approach: one can create virtual computation layers by adding nodes to the resource pool, increasing the computation capacity.

      Why not witness a small demo to energize ourselves –

      Let us understand the scenario. I have one MacBook Pro M4 and two Mac Mini Pro M4 machines (base models), and I want to combine them and expose them as a cluster as follows –

      As you can see, I’ve connected my MacBook Pro to both Mac Minis using high-speed Thunderbolt cables for faster data transfer. I’ll be using an open-source framework called “Exo” to create the cluster.

      Also, you can see that my total computing capacity is 53.11 TFlops, which is slightly more than the last category.

      “Exo” is an open-source framework that helps you merge all your available devices into one large cluster of resources. This pools the computing power needed to handle complex tasks, including big LLMs that would otherwise require very expensive GPU-based servers.

      For more information on “Exo”, please refer to the following link.

      In our previous diagram, we can see that the framework also offers endpoints.

      • One option is a local ChatGPT-style interface, where any question you ask is answered by a model running on the combined computing power of all the devices.
      • The other is a standard LLM API endpoint, which users can integrate into their own solutions.
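As a rough illustration of the second option, the sketch below builds a chat request in the common ChatGPT-style format using only the standard library. The URL, port, and model name are assumptions for this sketch (taken from my reading of the Exo README), so check them against what your own node prints at startup before relying on them.

```python
import json
import urllib.request

# Assumption: the cluster exposes a ChatGPT-compatible endpoint at this address.
EXO_URL = "http://localhost:52415/v1/chat/completions"


def build_chat_request(
    prompt: str, model: str = "llama-3.2-3b", url: str = EXO_URL
) -> urllib.request.Request:
    """Build (but do not send) a chat-completions POST request."""
    body = {
        "model": model,  # Assumed model identifier; use one your cluster serves.
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.7,
    }
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def ask_cluster(prompt: str) -> str:
    """Send the request and return the assistant's reply text."""
    with urllib.request.urlopen(build_chat_request(prompt), timeout=120) as response:
        payload = json.load(response)
    return payload["choices"][0]["message"]["content"]
```

Calling `ask_cluster("What is MLX?")` only works once the cluster is up; `build_chat_request` can be inspected offline.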

      Let us see how the devices are connected together –


      To proceed with this, you need at least Python 3.12, Anaconda or Miniconda, and Xcode installed on all of your machines. You also need to install some Apple-specific MLX packages to get the best performance.
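Before going further, it can help to run a quick check on each machine. The script below is a minimal sketch, not part of Exo: the function name `check_prerequisites` and the specific checks are my own choices, and finding `xcode-select` on the PATH is only a rough proxy for having Xcode installed.

```python
import platform
import shutil
import sys


def check_prerequisites(min_python: tuple = (3, 12)) -> dict:
    """Report whether the basic tools mentioned above are available."""
    return {
        "python_ok": sys.version_info >= min_python,
        "apple_silicon": platform.system() == "Darwin"
        and platform.machine() == "arm64",
        "conda_found": shutil.which("conda") is not None,
        "git_found": shutil.which("git") is not None,
        # Rough proxy: the Xcode command-line tools ship xcode-select.
        "xcode_select_found": shutil.which("xcode-select") is not None,
    }


if __name__ == "__main__":
    for name, ok in check_prerequisites().items():
        print(f"{name}: {'ok' if ok else 'missing'}")
```

Running it on every node before the installation steps makes it easier to spot the one machine that is missing a tool.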

      Depending on your choice, you need to use the following link to download Anaconda or Miniconda.

      You can use the following link to download Python 3.12. I used Python 3.13 on some machines and Python 3.12 on others, and both worked without any problem.

      Sometimes, after installing Anaconda or Miniconda, the environment is not activated automatically. In that case, you may need to run the following command in the terminal -> source ~/.bash_profile

      To verify whether conda has been successfully installed and activated, type the following command –

      (base) satyaki_de@Satyakis-MacBook-Pro-Max Pandas % conda --version
      conda 24.11.3
      (base) satyaki_de@Satyakis-MacBook-Pro-Max Pandas % 
      (base) satyaki_de@Satyakis-MacBook-Pro-Max Pandas % 

      Once you have verified it, install the following supplemental packages on all the machines –

      satyaki_de@Satyakis-MacBook-Pro-Max Pandas % 
      satyaki_de@Satyakis-MacBook-Pro-Max Pandas % 
      satyaki_de@Satyakis-MacBook-Pro-Max Pandas % conda install anaconda::m4
      Channels:
       - defaults
       - anaconda
      Platform: osx-arm64
      Collecting package metadata (repodata.json): done
      Solving environment: done
      
      ## Package Plan ##
      
        environment location: /opt/anaconda3
      
        added / updated specs:
          - anaconda::m4
      
      
      The following packages will be downloaded:
      
          package                    |            build
          ---------------------------|-----------------
          m4-1.4.18                  |       h1230e6a_1         202 KB  anaconda
          ------------------------------------------------------------
                                                 Total:         202 KB
      
      The following NEW packages will be INSTALLED:
      
        m4                 anaconda/osx-arm64::m4-1.4.18-h1230e6a_1 
      
      
      Proceed ([y]/n)? y
      
      
      Downloading and Extracting Packages:
                                                                                                                                                                                                                            
      Preparing transaction: done
      Verifying transaction: done
      Executing transaction: done

      You can also install the MLX package on your machines –

      (base) satyakidemini2@Satyakis-Mac-mini-2 exo % 
      (base) satyakidemini2@Satyakis-Mac-mini-2 exo % pip install mlx
      Collecting mlx
        Downloading mlx-0.23.2-cp312-cp312-macosx_14_0_arm64.whl.metadata (5.3 kB)
      Downloading mlx-0.23.2-cp312-cp312-macosx_14_0_arm64.whl (27.6 MB)
         ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 27.6/27.6 MB 8.8 MB/s eta 0:00:00
      Installing collected packages: mlx
      Successfully installed mlx-0.23.2
      (base) satyakidemini2@Satyakis-Mac-mini-2 exo % 
      (base) satyakidemini2@Satyakis-Mac-mini-2 exo % 

      So far, we’ve installed all the important packages. Now, we need to set up the final “exo” framework on all the machines, as in our previous steps.

      First, we’ll clone the “exo” framework using the following command –

      (base) satyaki_de@Satyakis-MacBook-Pro-Max Pandas % 
      (base) satyaki_de@Satyakis-MacBook-Pro-Max Pandas % 
      (base) satyaki_de@Satyakis-MacBook-Pro-Max Pandas % git clone https://github.com/exo-explore/exo.git
      Cloning into 'exo'...
      remote: Enumerating objects: 9736, done.
      remote: Counting objects: 100% (411/411), done.
      remote: Compressing objects: 100% (148/148), done.
      remote: Total 9736 (delta 333), reused 263 (delta 263), pack-reused 9325 (from 3)
      Receiving objects: 100% (9736/9736), 12.18 MiB | 8.41 MiB/s, done.
      Resolving deltas: 100% (5917/5917), done.
      Updating files: 100% (178/178), done.
      Filtering content: 100% (9/9), 3.16 MiB | 2.45 MiB/s, done.
      (base) satyaki_de@Satyakis-MacBook-Pro-Max Pandas % 
      (base) satyaki_de@Satyakis-MacBook-Pro-Max Pandas % 

      The content of the “exo” folder should look like this –

      total 28672
      drwx------  1 satyaki_de  staff  1048576 Mar  9 17:06 docs
      -rwx------  1 satyaki_de  staff     1337 Mar  9 17:06 configure_mlx.sh
      -rwx------  1 satyaki_de  staff    11107 Mar  9 17:06 README.md
      -rwx------  1 satyaki_de  staff    35150 Mar  9 17:06 LICENSE
      drwx------  1 satyaki_de  staff  1048576 Mar  9 17:06 examples
      drwx------  1 satyaki_de  staff  1048576 Mar  9 17:06 exo
      drwx------  1 satyaki_de  staff  1048576 Mar  9 17:06 extra
      drwx------  1 satyaki_de  staff  1048576 Mar  9 17:06 scripts
      -rwx------  1 satyaki_de  staff      390 Mar  9 17:06 install.sh
      -rwx------  1 satyaki_de  staff      792 Mar  9 17:06 format.py
      drwx------  1 satyaki_de  staff  1048576 Mar  9 17:06 test
      -rwx------  1 satyaki_de  staff     2476 Mar  9 17:06 setup.py
      drwx------  1 satyaki_de  staff  1048576 Mar  9 17:10 build
      drwx------  1 satyaki_de  staff  1048576 Mar  9 17:17 exo.egg-info

      Similar commands need to be run on the other devices. Here, I’m showing one Mac mini example –

      (base) satyakidemini2@Satyakis-Mac-mini-2 Pandas % 
      (base) satyakidemini2@Satyakis-Mac-mini-2 Pandas % git clone https://github.com/exo-explore/exo.git
      Cloning into 'exo'...
      remote: Enumerating objects: 9736, done.
      remote: Counting objects: 100% (424/424), done.
      remote: Compressing objects: 100% (146/146), done.
      remote: Total 9736 (delta 345), reused 278 (delta 278), pack-reused 9312 (from 4)
      Receiving objects: 100% (9736/9736), 12.18 MiB | 6.37 MiB/s, done.
      Resolving deltas: 100% (5920/5920), done.
      (base) satyakidemini2@Satyakis-Mac-mini-2 Pandas % 

      After that, I’ll execute the following set of commands to install the framework –

      (base) satyaki_de@Satyakis-MacBook-Pro-Max Pandas % cd exo
      (base) satyaki_de@Satyakis-MacBook-Pro-Max exo % 
      (base) satyaki_de@Satyakis-MacBook-Pro-Max exo % 
      (base) satyaki_de@Satyakis-MacBook-Pro-Max exo % conda create --name exo1 python=3.13
      WARNING: A conda environment already exists at '/opt/anaconda3/envs/exo1'
      
      Remove existing environment?
      This will remove ALL directories contained within this specified prefix directory, including any other conda environments.
      
       (y/[n])? y
      
      Channels:
       - defaults
      Platform: osx-arm64
      Collecting package metadata (repodata.json): done
      Solving environment: done
      
      ## Package Plan ##
      
        environment location: /opt/anaconda3/envs/exo1
      
        added / updated specs:
          - python=3.13
      
      
      The following NEW packages will be INSTALLED:
      
        bzip2              pkgs/main/osx-arm64::bzip2-1.0.8-h80987f9_6 
        ca-certificates    pkgs/main/osx-arm64::ca-certificates-2025.2.25-hca03da5_0 
        expat              pkgs/main/osx-arm64::expat-2.6.4-h313beb8_0 
        libcxx             pkgs/main/osx-arm64::libcxx-14.0.6-h848a8c0_0 
        libffi             pkgs/main/osx-arm64::libffi-3.4.4-hca03da5_1 
        libmpdec           pkgs/main/osx-arm64::libmpdec-4.0.0-h80987f9_0 
        ncurses            pkgs/main/osx-arm64::ncurses-6.4-h313beb8_0 
        openssl            pkgs/main/osx-arm64::openssl-3.0.16-h02f6b3c_0 
        pip                pkgs/main/osx-arm64::pip-25.0-py313hca03da5_0 
        python             pkgs/main/osx-arm64::python-3.13.2-h4862095_100_cp313 
        python_abi         pkgs/main/osx-arm64::python_abi-3.13-0_cp313 
        readline           pkgs/main/osx-arm64::readline-8.2-h1a28f6b_0 
        setuptools         pkgs/main/osx-arm64::setuptools-75.8.0-py313hca03da5_0 
        sqlite             pkgs/main/osx-arm64::sqlite-3.45.3-h80987f9_0 
        tk                 pkgs/main/osx-arm64::tk-8.6.14-h6ba3021_0 
        tzdata             pkgs/main/noarch::tzdata-2025a-h04d1e81_0 
        wheel              pkgs/main/osx-arm64::wheel-0.45.1-py313hca03da5_0 
        xz                 pkgs/main/osx-arm64::xz-5.6.4-h80987f9_1 
        zlib               pkgs/main/osx-arm64::zlib-1.2.13-h18a0788_1 
      
      
      Proceed ([y]/n)? y
      
      
      Downloading and Extracting Packages:
      
      Preparing transaction: done
      Verifying transaction: done
      Executing transaction: done
      #
      # To activate this environment, use
      #
      #     $ conda activate exo1
      #
      # To deactivate an active environment, use
      #
      #     $ conda deactivate
      
      (base) satyaki_de@Satyakis-MacBook-Pro-Max exo % conda activate exo1
      (exo1) satyaki_de@Satyakis-MacBook-Pro-Max exo % 
      (exo1) satyaki_de@Satyakis-MacBook-Pro-Max exo % ls -lrt
      total 24576
      drwx------  1 satyaki_de  staff  1048576 Mar  9 17:06 docs
      -rwx------  1 satyaki_de  staff     1337 Mar  9 17:06 configure_mlx.sh
      -rwx------  1 satyaki_de  staff    11107 Mar  9 17:06 README.md
      -rwx------  1 satyaki_de  staff    35150 Mar  9 17:06 LICENSE
      drwx------  1 satyaki_de  staff  1048576 Mar  9 17:06 examples
      drwx------  1 satyaki_de  staff  1048576 Mar  9 17:06 exo
      drwx------  1 satyaki_de  staff  1048576 Mar  9 17:06 extra
      drwx------  1 satyaki_de  staff  1048576 Mar  9 17:06 scripts
      -rwx------  1 satyaki_de  staff      390 Mar  9 17:06 install.sh
      -rwx------  1 satyaki_de  staff      792 Mar  9 17:06 format.py
      drwx------  1 satyaki_de  staff  1048576 Mar  9 17:06 test
      -rwx------  1 satyaki_de  staff     2476 Mar  9 17:06 setup.py
      (exo1) satyaki_de@Satyakis-MacBook-Pro-Max exo % 
      (exo1) satyaki_de@Satyakis-MacBook-Pro-Max exo % 
      (exo1) satyaki_de@Satyakis-MacBook-Pro-Max exo % pip install .
      Processing /Volumes/WD_BLACK/PythonCourse/Pandas/exo
        Preparing metadata (setup.py) ... done
      Collecting tinygrad@ git+https://github.com/tinygrad/tinygrad.git@ec120ce6b9ce8e4ff4b5692566a683ef240e8bc8 (from exo==0.0.1)
        Cloning https://github.com/tinygrad/tinygrad.git (to revision ec120ce6b9ce8e4ff4b5692566a683ef240e8bc8) to /private/var/folders/26/dj11b57559b8r8rl6ztdpc840000gn/T/pip-install-q18fzk3r/tinygrad_7917114c483a4d9c83c795b69dbeb5c7
        Running command git clone --filter=blob:none --quiet https://github.com/tinygrad/tinygrad.git /private/var/folders/26/dj11b57559b8r8rl6ztdpc840000gn/T/pip-install-q18fzk3r/tinygrad_7917114c483a4d9c83c795b69dbeb5c7
        Running command git rev-parse -q --verify 'sha^ec120ce6b9ce8e4ff4b5692566a683ef240e8bc8'
        Running command git fetch -q https://github.com/tinygrad/tinygrad.git ec120ce6b9ce8e4ff4b5692566a683ef240e8bc8
        Running command git checkout -q ec120ce6b9ce8e4ff4b5692566a683ef240e8bc8
        Resolved https://github.com/tinygrad/tinygrad.git to commit ec120ce6b9ce8e4ff4b5692566a683ef240e8bc8
        Preparing metadata (setup.py) ... done
      Collecting aiohttp==3.10.11 (from exo==0.0.1)
      .
      .
      (Installed many more dependent packages)
      .
      .
      Downloading propcache-0.3.0-cp313-cp313-macosx_11_0_arm64.whl (44 kB)
      Building wheels for collected packages: exo, nuitka, numpy, uuid, tinygrad
        Building wheel for exo (setup.py) ... done
        Created wheel for exo: filename=exo-0.0.1-py3-none-any.whl size=901357 sha256=5665297f8ea09d06670c9dea91e40270acc4a3cf99a560bf8d268abb236050f7
        Stored in directory: /private/var/folders/26/dj118r8rl6ztdpc840000gn/T/pip-ephem-wheel-cache-0k8zloo3/wheels/b6/91/fb/c1c7d8ca90cf16b9cd8203c11bb512614bee7f6d34
        Building wheel for nuitka (pyproject.toml) ... done
        Created wheel for nuitka: filename=nuitka-2.5.1-cp313-cp313-macosx_11_0_arm64.whl size=3432720 sha256=ae5a280a1684fde98c334516ee8a99f9f0acb6fc2f625643b7f9c5c0887c2998
        Stored in directory: /Users/satyaki_de/Library/Caches/pip/wheels/f6/c9/53/9e37c6fb34c27e892e8357aaead46da610f82117ab2825
        Building wheel for numpy (pyproject.toml) ... done
        Created wheel for numpy: filename=numpy-2.0.0-cp313-cp313-macosx_15_0_arm64.whl size=4920701 sha256=f030b0aa51ec6628f708fab0af14ff765a46d210df89aa66dd8d9482e59b5
        Stored in directory: /Users/satyaki_de/Library/Caches/pip/wheels/e0/d3/66/30d07c18e56ac85e8d3ceaf22f093a09bae124a472b85d1
        Building wheel for uuid (setup.py) ... done
        Created wheel for uuid: filename=uuid-1.30-py3-none-any.whl size=6504 sha256=885103a90d1dc92d9a75707fc353f4154597d232f2599a636de1bc6d1c83d
        Stored in directory: /Users/satyaki_de/Library/Caches/pip/wheels/cc/9d/72/13ff6a181eacfdbd6d761a4ee7c5c9f92034a9dc8a1b3c
        Building wheel for tinygrad (setup.py) ... done
        Created wheel for tinygrad: filename=tinygrad-0.10.0-py3-none-any.whl size=1333964 sha256=1f08c5ce55aa3c87668675beb80810d609955a81b99d416459d2489b36a
        Stored in directory: /Users/satyaki_de/Library/Caches/pip/wheels/c7/bd/02/bd91c1303002619dad23f70f4c1f1c15d0c24c60b043e
      Successfully built exo nuitka numpy uuid tinygrad
      Installing collected packages: uuid, sentencepiece, nvidia-ml-py, zstandard, uvloop, urllib3, typing-extensions, tqdm, tinygrad, scapy, safetensors, regex, pyyaml, pygments, psutil, protobuf, propcache, prometheus-client, pillow, packaging, ordered-set, numpy, multidict, mlx, mdurl, MarkupSafe, idna, grpcio, fsspec, frozenlist, filelock, charset-normalizer, certifi, attrs, annotated-types, aiohappyeyeballs, aiofiles, yarl, requests, pydantic-core, opencv-python, nuitka, markdown-it-py, Jinja2, grpcio-tools, aiosignal, rich, pydantic, huggingface-hub, aiohttp, tokenizers, aiohttp_cors, transformers, mlx-lm, exo
      Successfully installed Jinja2-3.1.4 MarkupSafe-3.0.2 aiofiles-24.1.0 aiohappyeyeballs-2.5.0 aiohttp-3.10.11 aiohttp_cors-0.7.0 aiosignal-1.3.2 annotated-types-0.7.0 attrs-25.1.0 certifi-2025.1.31 charset-normalizer-3.4.1 exo-0.0.1 filelock-3.17.0 frozenlist-1.5.0 fsspec-2025.3.0 grpcio-1.67.0 grpcio-tools-1.67.0 huggingface-hub-0.29.2 idna-3.10 markdown-it-py-3.0.0 mdurl-0.1.2 mlx-0.22.0 mlx-lm-0.21.1 multidict-6.1.0 nuitka-2.5.1 numpy-2.0.0 nvidia-ml-py-12.560.30 opencv-python-4.10.0.84 ordered-set-4.1.0 packaging-24.2 pillow-10.4.0 prometheus-client-0.20.0 propcache-0.3.0 protobuf-5.28.1 psutil-6.0.0 pydantic-2.9.2 pydantic-core-2.23.4 pygments-2.19.1 pyyaml-6.0.2 regex-2024.11.6 requests-2.32.3 rich-13.7.1 safetensors-0.5.3 scapy-2.6.1 sentencepiece-0.2.0 tinygrad-0.10.0 tokenizers-0.20.3 tqdm-4.66.4 transformers-4.46.3 typing-extensions-4.12.2 urllib3-2.3.0 uuid-1.30 uvloop-0.21.0 yarl-1.18.3 zstandard-0.23.0
      (exo1) satyaki_de@Satyakis-MacBook-Pro-Max exo % 
      

      You need to perform the same process on the other available devices as well.

      Now, we’re ready to proceed with the final command –

      (.venv) (exo1) satyaki_de@Satyakis-MacBook-Pro-Max exo % exo
      /opt/anaconda3/envs/exo1/lib/python3.13/site-packages/google/protobuf/runtime_version.py:112: UserWarning: Protobuf gencode version 5.27.2 is older than the runtime version 5.28.1 at node_service.proto. Please avoid checked-in Protobuf gencode that can be obsolete.
        warnings.warn(
      None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.
      None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.
      Selected inference engine: None
      
        _____  _____  
       / _ \ \/ / _ \ 
      |  __/>  < (_) |
       \___/_/\_\___/ 
          
      Detected system: Apple Silicon Mac
      Inference engine name after selection: mlx
      Using inference engine: MLXDynamicShardInferenceEngine with shard downloader: SingletonShardDownloader
      [60771, 54631, 54661]
      Chat interface started:
       - http://127.0.0.1:52415
       - http://XXX.XXX.XX.XX:52415
       - http://XXX.XXX.XXX.XX:52415
       - http://XXX.XXX.XXX.XXX:52415
      ChatGPT API endpoint served at:
       - http://127.0.0.1:52415/v1/chat/completions
       - http://XXX.XXX.X.XX:52415/v1/chat/completions
       - http://XXX.XXX.XXX.XX:52415/v1/chat/completions
       - http://XXX.XXX.XXX.XXX:52415/v1/chat/completions
      has_read=True, has_write=True
      ╭────────────────────────────────────────────────────────────────────────────────────────────── Exo Cluster (2 nodes) ───────────────────────────────────────────────────────────────────────────────────────────────╮
      Received exit signal SIGTERM...
      Thank you for using exo.
      
        _____  _____  
       / _ \ \/ / _ \ 
      |  __/>  < (_) |
       \___/_/\_\___/ 
          
      

      Note that I’ve masked the IP addresses for security reasons.
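The log above shows a ChatGPT-style endpoint at /v1/chat/completions, so you can also talk to the cluster from a script. Below is a minimal sketch using only the standard library. The helper names are mine, the "model" value has to be a model that your exo cluster actually serves, and the response shape is assumed to follow the usual ChatGPT-compatible format.

```python
import json
import urllib.request

# Local endpoint printed by exo in the log above.
EXO_URL = "http://127.0.0.1:52415/v1/chat/completions"


def build_chat_request(prompt, model, url=EXO_URL):
    """Build (but do not send) a ChatGPT-style chat completion request."""
    payload = {
        "model": model,  # must match a model available on your cluster
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.7,
    }
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def ask(prompt, model, timeout=120):
    """Send the request and return the text of the first reply."""
    request = build_chat_request(prompt, model)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        body = json.load(response)
    # Assumes the usual ChatGPT-compatible response shape.
    return body["choices"][0]["message"]["content"]

# Example (needs a running exo node): ask("Hello!", model="<your-model-id>")
```

Keeping the request builder separate from the network call makes it easy to inspect the payload before sending anything to the cluster.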


      At the beginning, if we start exo on the main MacBook Pro Max, the “Exo” screen should look like this –

      And if you open the URL, you will see the following ChatGPT-like interface –

      Connecting the machines without a Thunderbolt bridge on the relevant port, or through a hub, may cause performance degradation. Hence, how you connect the machines plays a major role in the success of this setup. Even so, this is certainly a great approach to try.


      So, we’ve done it.

      We’ll cover detailed performance testing, optimized configurations & many other useful details in our next post.

      Till then, Happy Avenging! 🙂

      Building a real-time streamlit app by consuming events from Ably channels

      In this post, I’ll bring an exciting Streamlit app that presents a real-time dashboard by consuming all the events from an Ably channel.

      Once again, I’ll be using my IoT emulator, which feeds real-time events based on user inputs to the Ably channel that the Streamlit-based app subscribes to.

      However, I would like to share the run before we dig deep into this.


      Demo

      Isn’t this exciting? We can use our custom-built IoT emulator to push real-time events to the Ably queue & then transform those raw events into more meaningful KPIs. Let’s dive deep then.

      Let’s explore the broad-level architecture/flow –

      As you can see, the green box is a demo IoT application that generates events & pushes them into the Ably queue. At the same time, the Streamlit-based dashboard app consumes the events & transforms them into more meaningful metrics.
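To make the "raw events to KPIs" step concrete, here is a rough sketch of how a batch of consumed events could be summarized. It does not use the Ably SDK at all. It assumes each event has already been decoded into a dictionary with the "Timestamp", "Temperature", "Humidity" and "Pressure" fields used later in this post, and the function name and sample values are made up for illustration.

```python
from statistics import mean

KPI_FIELDS = ("Temperature", "Humidity", "Pressure")


def summarize_events(events, fields=KPI_FIELDS):
    """Turn raw event dicts into per-KPI latest/min/max/mean values."""
    if not events:
        return {}
    # Order by timestamp so that "latest" is the most recent reading.
    ordered = sorted(events, key=lambda e: e["Timestamp"])
    summary = {}
    for field in fields:
        values = [float(e[field]) for e in ordered if field in e]
        if not values:
            continue
        summary[field] = {
            "latest": round(values[-1], 2),
            "min": round(min(values), 2),
            "max": round(max(values), 2),
            "mean": round(mean(values), 2),
        }
    return summary


# Made-up sample events, 7 seconds apart.
sample = [
    {"Timestamp": "2024-01-01T10:00:00", "Temperature": 21.5, "Humidity": 40.0, "Pressure": 1012.0},
    {"Timestamp": "2024-01-01T10:00:07", "Temperature": 22.5, "Humidity": 42.0, "Pressure": 1011.0},
]
print(summarize_events(sample)["Temperature"])
# → {'latest': 22.5, 'min': 21.5, 'max': 22.5, 'mean': 22.0}
```

The actual app keeps the events in a pandas DataFrame. Plain dictionaries are used here only to keep the sketch free of dependencies.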

      Let us understand the sample packages that are required for this task.

      pip install ably==2.0.3
      pip install numpy==1.26.3
      pip install pandas==2.2.0
      pip install plotly==5.19.0
      pip install requests==2.31.0
      pip install streamlit==1.30.0
      pip install streamlit-autorefresh==1.0.1
      pip install streamlit-echarts==0.4.0

      Since this is an extension to our previous post, we’re not going to discuss other scripts, which we’ve already discussed over there. Instead, we will talk about the enhanced scripts & the new scripts that are required for this use case.

      1. app.py (This script will consume real-time streaming data coming from a hosted API source through a popular third-party service named Ably. Ably follows the pub/sub streaming concept, which might be extremely useful for any start-up. The events are then translated into meaningful KPIs in a Streamlit-based dashboard app.)

      Note that we’re not going to discuss the entire script here, only the relevant parts. However, you can get the complete scripts in the GitHub repository.

      import plotly.graph_objects as go  # provides go.Figure and go.Indicator
      
      def createHumidityGauge(humidity_value):
          fig = go.Figure(go.Indicator(
              mode = "gauge+number",
              value = humidity_value,
              domain = {'x': [0, 1], 'y': [0, 1]},
              title = {'text': "Humidity", 'font': {'size': 24}},
              gauge = {
                  'axis': {'range': [None, 100], 'tickwidth': 1, 'tickcolor': "darkblue"},
                  'bar': {'color': "darkblue"},
                  'bgcolor': "white",
                  'borderwidth': 2,
                  'bordercolor': "gray",
                  'steps': [
                      {'range': [0, 50], 'color': 'cyan'},
                      {'range': [50, 100], 'color': 'royalblue'}],
                  'threshold': {
                      'line': {'color': "red", 'width': 4},
                      'thickness': 0.75,
                      'value': humidity_value}
              }
          ))
      
          fig.update_layout(height=220, paper_bgcolor = "white", font = {'color': "darkblue", 'family': "Arial"}, margin=dict(t=0, l=5, r=5, b=0))
      
          return fig

      The above function creates a customized humidity gauge that visually represents a given humidity value, making it easy to read and understand at a glance.

      This code defines a function createHumidityGauge that creates a visual gauge (like a meter) to display a humidity value. Here’s a simple breakdown of what it does:

      1. Function Definition: It starts by defining a function named createHumidityGauge that takes one parameter, humidity_value, which is the humidity level you want to display on the gauge.
      2. Creating the Gauge: Inside the function, it creates a figure using Plotly (a plotting library) with a specific type of chart called an Indicator. This Indicator is set to display in “gauge+number” mode, meaning it shows both a gauge visual and the numeric value of the humidity.
      3. Setting Gauge Properties:
        • The value is set to the humidity_value parameter, so the gauge shows this humidity level.
        • The domain sets the position of the gauge on the plot, which is set to fill the available space ([0, 1] for both x and y axes).
        • The title is set to “Humidity” with a font size of 24, labeling the gauge.
        • The gauge section defines the appearance and behavior of the gauge, including:
          • An axis that goes from 0 to 100 (assuming humidity is measured as a percentage from 0% to 100%).
          • The color and style of the gauge’s bar and background.
          • Colored steps indicating different ranges of humidity (cyan for 0-50% and royal blue for 50-100%).
          • A threshold line that appears at the value of the humidity, marked in red to stand out.
      4. Finalizing the Gauge Appearance: The function then updates the layout of the figure to set its height, background color, font style, and margins to make sure the gauge looks nice and is visible.
      5. Returning the Figure: Finally, the function returns the fig object, which is the fully configured gauge, ready to be displayed.

      Other similar functions will repeat the same steps.
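Since the gauges differ only in their title, axis range and colour bands, the repeated steps could also be captured once. The sketch below is not what app.py does. It is a hypothetical helper (build_gauge_spec is my own name) that returns the same settings as a dictionary, which can then be passed to go.Indicator.

```python
def build_gauge_spec(title, value, axis_max, steps, bar_color="darkblue"):
    """Return keyword arguments for a Plotly Indicator gauge.

    `steps` is a list of (start, end, color) tuples for the colour bands.
    """
    return {
        "mode": "gauge+number",
        "value": value,
        "domain": {"x": [0, 1], "y": [0, 1]},
        "title": {"text": title, "font": {"size": 24}},
        "gauge": {
            "axis": {"range": [None, axis_max], "tickwidth": 1, "tickcolor": bar_color},
            "bar": {"color": bar_color},
            "bgcolor": "white",
            "borderwidth": 2,
            "bordercolor": "gray",
            "steps": [{"range": [lo, hi], "color": c} for lo, hi, c in steps],
            "threshold": {
                "line": {"color": "red", "width": 4},
                "thickness": 0.75,
                "value": value,
            },
        },
    }


# Same settings as the humidity gauge above; 63.2 is a made-up reading.
humidity_spec = build_gauge_spec(
    "Humidity", 63.2, axis_max=100,
    steps=[(0, 50, "cyan"), (50, 100, "royalblue")],
)
# With Plotly installed: fig = go.Figure(go.Indicator(**humidity_spec))
```

A temperature or pressure gauge would then only need a different title, axis maximum and set of bands.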

      import plotly.express as px  # provides px.line
      
      def createTemperatureLineChart(data):
          # Assuming 'data' is a DataFrame with a 'Timestamp' index and a 'Temperature' column
          fig = px.line(data, x=data.index, y='Temperature', title='Temperature Vs Time')
          fig.update_layout(height=270)  # Specify the desired height here
          return fig

      The above function takes a set of temperature data indexed by timestamp and creates a line chart that visually represents how the temperature changes over time.

      This code defines a function “createTemperatureLineChart” that creates a line chart to display temperature data over time. Here’s a simple summary of what it does:

      1. Function Definition: It starts with defining a function named createTemperatureLineChart that takes one parameter, data, which is expected to be a DataFrame (a type of data structure used in pandas, a Python data analysis library). This data frame should have a ‘Timestamp’ as its index (meaning each row represents a different point in time) and a ‘Temperature’ column containing temperature values.
      2. Creating the Line Chart: The function uses Plotly Express (a plotting library) to create a line chart with the following characteristics:
        • The x-axis represents time, taken from the DataFrame’s index (‘Timestamp’).
        • The y-axis represents temperature, taken from the ‘Temperature’ column in the DataFrame.
        • The chart is titled ‘Temperature Vs Time’, clearly indicating what the chart represents.
      3. Customizing the Chart: It then updates the layout of the chart to set a specific height (270 pixels) for the chart, making it easier to view.
      4. Returning the Chart: Finally, the function returns the fig object, which is the fully prepared line chart, ready to be displayed.

      Similar functions will repeat for other KPIs.

          st.sidebar.header("KPIs")
          selected_kpis = st.sidebar.multiselect(
              "Select KPIs", options=["Temperature", "Humidity", "Pressure"], default=["Temperature"]
          )

      The above code creates a sidebar with a multi-select drop-down, which lists the KPIs (“Temperature”, “Humidity”, “Pressure”) and selects “Temperature” by default.

          # Split the layout into columns for KPIs and graphs
          gauge_col, kpi_col, graph_col = st.columns(3)
      
          # Auto-refresh setup
          st_autorefresh(interval=7000, key='data_refresh')
      
          # Fetching real-time data
          data = getData(var1, DInd)
      
          st.markdown(
              """
              <style>
              .stEcharts { margin-bottom: -50px; }  /* Class might differ, inspect the HTML to find the correct class name */
              </style>
              """,
              unsafe_allow_html=True
          )
      
          # Display gauges at the top of the page
          gauges = st.container()
      
          with gauges:
              col1, col2, col3 = st.columns(3)
              with col1:
                  humidity_value = round(data['Humidity'].iloc[-1], 2)
                  humidity_gauge_fig = createHumidityGauge(humidity_value)
                  st.plotly_chart(humidity_gauge_fig, use_container_width=True)
      
              with col2:
                  temp_value = round(data['Temperature'].iloc[-1], 2)
                  temp_gauge_fig = createTempGauge(temp_value)
                  st.plotly_chart(temp_gauge_fig, use_container_width=True)
      
              with col3:
                  pressure_value = round(data['Pressure'].iloc[-1], 2)
                  pressure_gauge_fig = createPressureGauge(pressure_value)
                  st.plotly_chart(pressure_gauge_fig, use_container_width=True)
      
      
          # Next row for actual readings and charts side-by-side
          readings_charts = st.container()
      
      
          # Display KPIs and their trends
          with readings_charts:
              readings_col, graph_col = st.columns([1, 2])
      
              with readings_col:
                  st.subheader("Latest Readings")
                  if "Temperature" in selected_kpis:
                      # No "%" suffix: temperature is not a percentage
                      st.metric("Temperature", f"{temp_value:.2f}")
      
                  if "Humidity" in selected_kpis:
                      st.metric("Humidity", f"{humidity_value:.2f}%")
      
                  if "Pressure" in selected_kpis:
                      # No "%" suffix: pressure is not a percentage
                      st.metric("Pressure", f"{pressure_value:.2f}")
      
      
              # Graph placeholders for each KPI
              with graph_col:
                  if "Temperature" in selected_kpis:
                      temperature_fig = createTemperatureLineChart(data.set_index("Timestamp"))
      
                      # Display the Plotly chart in Streamlit with specified dimensions
                      st.plotly_chart(temperature_fig, use_container_width=True)
      
                  if "Humidity" in selected_kpis:
                      humidity_fig = createHumidityLineChart(data.set_index("Timestamp"))
      
                      # Display the Plotly chart in Streamlit with specified dimensions
                      st.plotly_chart(humidity_fig, use_container_width=True)
      
                  if "Pressure" in selected_kpis:
                      pressure_fig = createPressureLineChart(data.set_index("Timestamp"))
      
                      # Display the Plotly chart in Streamlit with specified dimensions
                      st.plotly_chart(pressure_fig, use_container_width=True)

      Here is a step-by-step summary of the above code:

      1. The code begins by splitting the Streamlit web page layout into three columns to separately display Key Performance Indicators (KPIs), gauges, and graphs.
      2. It sets up an auto-refresh feature with a 7-second interval, ensuring the data displayed is regularly updated without manual refreshes.
      3. Real-time data is fetched using a function called getData, which takes unspecified parameters var1 and DInd.
      4. A CSS style is injected into the Streamlit page to adjust the margin of Echarts elements, which may be used to improve the visual layout of the page.
      5. A container for gauges is created at the top of the page, with three columns inside it dedicated to displaying humidity, temperature, and pressure gauges.
      6. Each gauge (humidity, temperature, and pressure) is created by rounding the last value from the fetched data to two decimal places and then visualized using respective functions that create Plotly gauge charts.
      7. Below the gauges, another container is set up for displaying the latest readings and their corresponding graphs in a side-by-side layout, using two columns.
      8. The left column under “Latest Readings” displays the latest values for selected KPIs (temperature, humidity, pressure) as metrics.
      9. In the right column, for each selected KPI, a line chart is created using data with timestamps as indices and displayed using Plotly charts, allowing for a visual trend analysis.
      10. This structured approach enables a dynamic and interactive dashboard within Streamlit, offering real-time insights into temperature, humidity, and pressure with both numeric metrics and graphical trends, optimized for regular data refreshes and user interactivity.
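The "Latest Readings" logic described in step 8 can also be written as a small lookup, so that each KPI carries its own display unit. This is only a sketch: format_metrics is a hypothetical helper, and the units in DEFAULT_UNITS are assumptions, since the post does not state what units the emulator emits.

```python
# Hypothetical display units; adjust them to whatever your emulator emits.
DEFAULT_UNITS = {"Temperature": "°C", "Humidity": "%", "Pressure": "hPa"}


def format_metrics(latest, selected_kpis, units=DEFAULT_UNITS):
    """Return (label, text) pairs for the KPIs the user selected."""
    rows = []
    for kpi in selected_kpis:
        if kpi not in latest:
            continue  # no reading available yet for this KPI
        unit = units.get(kpi, "")
        rows.append((kpi, f"{latest[kpi]:.2f}{unit}"))
    return rows


# Made-up readings; only the selected KPIs are returned.
rows = format_metrics(
    {"Temperature": 22.456, "Humidity": 41.0, "Pressure": 1011.3},
    selected_kpis=["Temperature", "Humidity"],
)
print(rows)
# → [('Temperature', '22.46°C'), ('Humidity', '41.00%')]
```

In the app, each pair could then be rendered with st.metric(label, text) inside the readings column.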

      Let us understand some of the important screenshots of this application –


      So, we’ve done it.

      I’ll bring some more exciting topics in the coming days from the Python verse.

      Till then, Happy Avenging! 🙂

      Navigating the Future of Work: Insights from the Argyle AI Summit

      At the recent Argyle AI Summit, a prestigious event in the AI industry, I had the honor of participating as a speaker alongside esteemed professionals like Misha Leybovich from Google Labs. The summit, coordinated by Sylvia Das Chagas, a former senior AI conversation designer at CVS Health, provided an enlightening platform to discuss the evolving role of AI in talent management. Our session focused on the theme “Driving Talent with AI,” addressing some of the most pressing questions in the field. Frequently, relevant use cases were shared in detail to support these threads.

      To view the actual page, please click the following link.

      One of the critical topics we explored was AI’s impact on talent management in the upcoming year. AI’s influence in hiring and retention is becoming increasingly significant. For example, AI-powered tools can now analyze vast amounts of data to identify the best candidates for a role, going beyond traditional resume screening. In retention, AI is instrumental in identifying patterns that indicate an employee’s likelihood to leave, enabling proactive measures.

      A burning question in AI is how leaders address fears that AI might replace manual jobs. We discussed the importance of leaders framing AI as a complement to human skills rather than a replacement. AI enhances employee capabilities by automating mundane tasks, allowing employees to focus on more creative and strategic work.

      Regarding new AI tools that organizations should watch out for, the conversation highlighted tools that enhance remote collaboration and workplace inclusivity. Tools like virtual meeting assistants that can transcribe, translate, and summarize meetings in real time are becoming invaluable in today’s global work environment.

      AI’s role in boosting employee motivation and productivity was another focal point. We discussed how AI-driven career development programs can offer personalized learning paths, helping employees grow and stay motivated.

      Incorporating multiple languages in tools like ChatGPT was highlighted as a critical step towards inclusivity. This expansion allows a broader range of employees to interact with AI tools in their native language, fostering a more inclusive workplace environment.

      Lastly, we tackled the challenge of addressing employees’ reluctance to change. Emphasizing the importance of transparent communication and education about AI’s benefits was identified as key. Organizations can alleviate fears and encourage a more accepting attitude towards AI by involving employees in the AI implementation process and providing training.

      The Argyle AI Summit offered a compelling glimpse into the future of AI in talent management. The session provided valuable insights for leaders looking to harness AI’s potential to enhance talent management strategies by discussing real-world examples and strategies. To gain more in-depth knowledge and perspectives shared during this summit, I encourage interested parties to visit the recorded session link for a more comprehensive understanding.

      Or, you can directly view it from here –


      I would greatly appreciate your feedback on the insights shared during the summit. Your thoughts and perspectives are invaluable as we continue to explore and navigate the evolving landscape of AI in the workplace.

      RAG implementation of LLMs by using Python, Haystack & React (Part – 1)

      Today, I will share the first post of a multi-part series about building an end-to-end LLM application that feeds on source data through a RAG implementation. I’ll also use the OpenAI Python SDK and Haystack embeddings in this case.

      In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.

      Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

      Demo

      Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.

      As you can see, to enable this large & complex solution, we must first establish the capabilities to build applications powered by LLMs, Transformer models, vector search, and more. You can use state-of-the-art NLP models to perform question-answering, answer generation, semantic document search, or build tools capable of complex decision-making and query resolution. Steps 1 & 2 cover embedding the data & creating the informed repository. We’ll discuss them in the second part.

      Once you have the informed repository, the system can interact with the end-users. As part of the query (shown in step 3), the prompt & the question are shared with the process engine, which reduces the volume by fetching only the relevant context from our informed repository & returns that tuned context as part of the response (shown in steps 4, 5 & 6).

      Then, this tuned context is shared with OpenAI to produce a better response, along with a summary & concluding remarks that are user-friendly & easier for end-users to understand (shown in steps 8 & 9).
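
      The retrieve-then-generate flow described above can be sketched in a few lines. This is only a minimal illustration under my own assumptions: answer_with_rag, fake_retrieve & fake_generate are hypothetical names that do not appear in the project, and the stand-in functions replace the real vector search & the real OpenAI call so that the sketch runs without any external service.

```python
from typing import Callable, List


def answer_with_rag(question: str,
                    retrieve: Callable[[str], List[str]],
                    generate: Callable[[str, List[str]], str]) -> str:
    """Fetch the relevant context first, then ask the model to answer from it."""
    context = retrieve(question)            # steps 4-6: get the tuned context
    if not context:
        return "No relevant context found."
    return generate(question, context)      # steps 8-9: build the final response


# Hypothetical stand-ins (assumptions, not the project's real components).
def fake_retrieve(question: str) -> List[str]:
    corpus = ["Greek vases were often painted with myths.",
              "Egyptian art used strict proportions."]
    words = set(question.lower().split())
    # Keep only the documents that share at least one word with the question
    return [doc for doc in corpus if words & set(doc.lower().split())]


def fake_generate(question: str, context: List[str]) -> str:
    return f"Q: {question} | context used: {len(context)}"


print(answer_with_rag("tell me about greek vases", fake_retrieve, fake_generate))
```

      Passing the retriever & the generator in as functions keeps the flow easy to test; in the real solution, these two roles are played by Haystack & OpenAI.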

      The following are the important packages that are essential to this project –

      pip install farm-haystack==1.19.0
      pip install Flask==2.2.5
      pip install Flask-Cors==4.0.0
      pip install Flask-JWT-Extended==4.5.2
      pip install Flask-Session==0.5.0
      pip install openai==0.27.8
      pip install pandas==2.0.3
      pip install tensorflow==2.11.1

      We have a front-end built with React, back-end APIs built with Python Flask, and OpenAI to create this experience.

      Today, we’ll be going in reverse mode. We first discuss the main script & then explain all the other class scripts.

      • flaskServer.py (This is the main calling Python script to invoke the RAG-Server.)
      #########################################################
      #### Written By: SATYAKI DE                          ####
      #### Written On: 27-Jun-2023                         ####
      #### Modified On 28-Jun-2023                         ####
      ####                                                 ####
      #### Objective: This is the main calling             ####
      #### python script that will invoke the              ####
      #### shortcut application created inside MAC         ####
      #### environment including MacBook, IPad or IPhone.  ####
      ####                                                 ####
      #########################################################
      
      from flask import Flask, jsonify, request, session
      from flask_cors import CORS
      from werkzeug.security import check_password_hash, generate_password_hash
      from flask_jwt_extended import JWTManager, jwt_required, create_access_token
      import pandas as pd
      from clsConfigClient import clsConfigClient as cf
      import clsL as log
      import clsContentScrapper as csc
      import clsRAGOpenAI as crao
      import csv
      from datetime import timedelta
      import os
      import re
      import json
      
      ########################################################
      ################    Global Area   ######################
      ########################################################
      #Initiating Logging Instances
      clog = log.clsL()
      
      admin_key = cf.conf['ADMIN_KEY']
      secret_key = cf.conf['SECRET_KEY']
      session_path = cf.conf['SESSION_PATH']
      sessionFile = cf.conf['SESSION_CACHE_FILE']
      
      app = Flask(__name__)
      CORS(app)  # This will enable CORS for all routes
      app.config['JWT_SECRET_KEY'] = admin_key  # Change this!
      app.secret_key = secret_key
      
      jwt = JWTManager(app)
      
      users = cf.conf['USER_NM']
      passwd = cf.conf['USER_PWD']
      
      cCScrapper = csc.clsContentScrapper()
      cr = crao.clsRAGOpenAI()
      
      # Disabling warnings
      def warn(*args, **kwargs):
          pass
      
      import warnings
      warnings.warn = warn
      
      # Define the aggregation functions
      def join_unique(series):
          unique_vals = series.drop_duplicates().astype(str)
          return ', '.join(filter(lambda x: x != 'nan', unique_vals))
      
      # Building the preaggregate cache
      def groupImageWiki():
          try:
              base_path = cf.conf['OUTPUT_PATH']
              inputFile = cf.conf['CLEANED_FILE']
              outputFile = cf.conf['CLEANED_FILE_SHORT']
              subdir = cf.conf['SUBDIR_OUT']
              Ind = cf.conf['DEBUG_IND']
      
              inputCleanedFileLookUp = base_path + inputFile
      
              #Opening the file in dataframe
              df = pd.read_csv(inputCleanedFileLookUp)
              hash_values = df['Total_Hash'].unique()
      
              # Work on an explicit copy to avoid pandas' SettingWithCopyWarning
              dFin = df[['primaryImage','Wiki_URL','Total_Hash']].copy()
      
              # Ensure columns are strings and not NaN
              # Convert columns to string and replace 'nan' with an empty string
              dFin['primaryImage'] = dFin['primaryImage'].astype(str).replace('nan', '')
              dFin['Wiki_URL'] = dFin['Wiki_URL'].astype(str).replace('nan', '')
      
              # drop_duplicates() returns a new frame, so keep the result
              dFin = dFin.drop_duplicates()
      
              # Group by 'Total_Hash' and aggregate
              dfAgg = dFin.groupby('Total_Hash').agg({'primaryImage': join_unique,'Wiki_URL': join_unique}).reset_index()
      
              return dfAgg
      
          except Exception as e:
              x = str(e)
              print('Error: ', x)
      
              df = pd.DataFrame()
      
              return df
      
      resDf = groupImageWiki()
      
      ########################################################
      ################  End  Global Area  ####################
      ########################################################
      
      def extractRemoveUrls(hash_value):
          image_urls = ''
          wiki_urls = ''
          # Parse the inner message JSON string
          try:
      
              resDf['Total_Hash'] = resDf['Total_Hash'].astype(int)
              filtered_df = resDf[resDf['Total_Hash'] == int(hash_value)]
      
              if not filtered_df.empty:
                  image_urls = filtered_df['primaryImage'].values[0]
                  wiki_urls = filtered_df['Wiki_URL'].values[0]
      
              return image_urls, wiki_urls
      
          except Exception as e:
              x = str(e)
              print('extractRemoveUrls Error: ', x)
              return image_urls, wiki_urls
      
      def isIncomplete(line):
          """Check if a line appears to be incomplete."""
      
          # Check if the line ends with certain patterns indicating it might be incomplete.
          incomplete_patterns = [': [Link](', ': Approximately ', ': ']
          return any(line.endswith(pattern) for pattern in incomplete_patterns)
      
      def filterData(data):
          """Return only the complete lines from the data."""
      
          lines = data.split('\n')
          complete_lines = [line for line in lines if not isIncomplete(line)]
      
          return '\n'.join(complete_lines)
      
      def updateCounter(sessionFile):
          try:
              counter = 0
      
              # Check if the CSV file exists
              if os.path.exists(sessionFile):
                  with open(sessionFile, 'r') as f:
                      reader = csv.reader(f)
                      for row in reader:
                          # Assuming the counter is the first value in the CSV
                          counter = int(row[0])
      
              # Increment counter
              counter += 1
      
              # Write counter back to CSV
              with open(sessionFile, 'w', newline='') as f:
                  writer = csv.writer(f)
                  writer.writerow([counter])
      
              return counter
          except Exception as e:
              x = str(e)
              print('Error: ', x)
      
              return 1
      
      def getPreviousResult():
          try:
              fullFileName = session_path + sessionFile
              newCounterValue = updateCounter(fullFileName)
      
              return newCounterValue
          except Exception as e:
              x = str(e)
              print('Error: ', x)
      
              return 1
      
      @app.route('/login', methods=['POST'])
      def login():
          username = request.json.get('username', None)
          password = request.json.get('password', None)
      
          print('User Name: ', str(username))
          # Never log the password, even while debugging
      
          #if username not in users or not check_password_hash(users.get(username), password):
          if ((username not in users) or (password not in passwd)):
              return jsonify({'login': False}), 401
      
          access_token = create_access_token(identity=username)
          return jsonify(access_token=access_token)
      
      @app.route('/chat', methods=['POST'])
      def get_chat():
          try:
              #session["key"] = "1D98KI"
              #session_id = session.sid
              #print('Session Id: ', str(session_id))
      
              cnt = getPreviousResult()
              print('Running Session Count: ', str(cnt))
      
              username = request.json.get('username', None)
              message = request.json.get('message', None)
      
              print('User: ', str(username))
              print('Content: ', str(message))
      
              if cnt == 1:
                  retList = cCScrapper.extractCatalog()
              else:
                  hashValue, cleanedData = cr.getData(str(message))
                  print('Main Hash Value:', str(hashValue))
      
                  imageUrls, wikiUrls = extractRemoveUrls(hashValue)
                  print('Image URLs: ', str(imageUrls))
                  print('Wiki URLs: ', str(wikiUrls))
                  print('Clean Text:')
                  print(str(cleanedData))
                  retList = '{"records":[{"Id":"' + str(cleanedData) + '", "Image":"' + str(imageUrls) + '", "Wiki": "' + str(wikiUrls) + '"}]}'
      
              response = {
                  'message': retList
              }
      
              print('JSON: ', str(response))
              return jsonify(response)
      
          except Exception as e:
              x = str(e)
      
              response = {
                  'message': 'Error: ' + x
              }
              return jsonify(response)
      
      @app.route('/api/data', methods=['GET'])
      @jwt_required()
      def get_data():
          response = {
              'message': 'Hello from Flask!'
          }
          return jsonify(response)
      
      if __name__ == '__main__':
          app.run(debug=True)
      

      Let us understand some of the important sections of the above script –

      Function – login():

      The login function retrieves a ‘username’ and ‘password’ from a JSON request. If the username is missing from the configured users or the password is missing from the configured passwords, it returns a failure JSON response with a 401 status. Otherwise, it creates an access token and returns it in a JSON response.
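
      The commented-out line in login() hints at a hashed-password check. Below is a minimal sketch of that idea using only the standard library. The helper names hash_password, verify_password & is_valid_login, the salt$rounds$digest storage format & the idea that users maps a username to a stored hash are all my assumptions. In a Flask project, the already imported generate_password_hash & check_password_hash from werkzeug.security would be the more natural choice.

```python
import hashlib
import hmac
import os
from typing import Optional


def hash_password(password: str, salt: Optional[bytes] = None,
                  rounds: int = 100_000) -> str:
    """Return 'salt$rounds$digest' (hypothetical storage format)."""
    salt = salt or os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, rounds)
    return f"{salt.hex()}${rounds}${digest.hex()}"


def verify_password(stored: str, candidate: str) -> bool:
    """Recompute the digest with the stored salt and compare in constant time."""
    salt_hex, rounds, digest_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac("sha256", candidate.encode(),
                                 bytes.fromhex(salt_hex), int(rounds))
    return hmac.compare_digest(digest.hex(), digest_hex)


def is_valid_login(users: dict, username, password) -> bool:
    """users maps username -> stored hash (an assumption for this sketch)."""
    stored = users.get(username) if username else None
    if stored is None or password is None:
        return False
    return verify_password(stored, password)


users = {"alice": hash_password("s3cret")}
print(is_valid_login(users, "alice", "s3cret"))
```

      Tying each password to its own username avoids accepting a valid password that belongs to a different user.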

      Function – get_chat():

      The get_chat function retrieves the running session count and the user input from a JSON request. On the first call (a session count of 1), it returns the catalog data. Otherwise, it passes the user’s message to the RAG framework, which receives the refined response from OpenAI along with a hash value that is then used to look up the image URLs and wiki URLs. If an error arises, the function captures and returns the error as a JSON message.
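
      get_chat() assembles its “records” payload through string concatenation, which can produce invalid JSON when the answer contains quotes. As a hedged alternative, the same payload can be built with json.dumps (the json module is already imported in flaskServer.py). The function name build_records_payload is hypothetical.

```python
import json


def build_records_payload(cleaned_text: str, image_urls: str, wiki_urls: str) -> str:
    """Serialize one record with the same keys the front-end expects."""
    record = {"Id": cleaned_text, "Image": image_urls, "Wiki": wiki_urls}
    # json.dumps escapes quotes, backslashes and newlines for us
    return json.dumps({"records": [record]})


payload = build_records_payload('He said "hi"\nnext line',
                                "http://example.invalid/a.jpg",
                                "http://example.invalid/w1, http://example.invalid/w2")
print(json.loads(payload)["records"][0]["Id"])
```

      With this approach, the manual replacement of newline characters in generateAnswerWithGPT3 would most likely become unnecessary, since the serializer handles the escaping.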

      Function – updateCounter():

      The updateCounter function checks if a given CSV file exists and retrieves its counter value. It then increments the counter and writes it back to the CSV. If any errors occur, an error message is printed, and the function returns a value of 1.
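
      The read-increment-write cycle can be tried in isolation with the small sketch below. The function name update_counter & the temporary file are my assumptions; the logic follows the description above.

```python
import csv
import os
import tempfile


def update_counter(path: str) -> int:
    """Read the last counter from a one-column CSV, add one and write it back."""
    counter = 0
    if os.path.exists(path):
        with open(path, newline="") as f:
            for row in csv.reader(f):
                if row:                      # skip blank lines
                    counter = int(row[0])
    counter += 1
    with open(path, "w", newline="") as f:
        csv.writer(f).writerow([counter])
    return counter


# Hypothetical session file inside a throwaway directory
session_file = os.path.join(tempfile.mkdtemp(), "session.csv")
print(update_counter(session_file))
print(update_counter(session_file))
```

      Since the counter lives in a file, it survives a server restart. As far as I can tell from the script, that means the catalog branch (cnt == 1) only fires again after the session file is removed or reset.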

      Function – extractRemoveUrls():

      The extractRemoveUrls function attempts to filter a data frame, resDf, based on a provided hash value to extract image and wiki URLs. If the data frame contains matching entries, it retrieves the corresponding URLs. Any errors encountered are printed, but the function always returns the image and wiki URLs, even if they are empty.
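
      To make the lookup easier to follow, here is a pure-Python stand-in for the pandas group-by & the hash lookup. It is only a sketch: build_lookup & extract_urls are hypothetical names, the sample rows are invented, and, unlike the original join_unique, this version also drops empty strings.

```python
from collections import defaultdict


def join_unique(values):
    """Join distinct, non-empty, non-'nan' values in their first-seen order."""
    seen = []
    for v in values:
        v = str(v)
        if v and v != "nan" and v not in seen:
            seen.append(v)
    return ", ".join(seen)


def build_lookup(rows):
    """Group rows by Total_Hash -> (image URLs, wiki URLs)."""
    grouped = defaultdict(lambda: {"primaryImage": [], "Wiki_URL": []})
    for row in rows:
        key = int(row["Total_Hash"])
        grouped[key]["primaryImage"].append(row["primaryImage"])
        grouped[key]["Wiki_URL"].append(row["Wiki_URL"])
    return {k: (join_unique(v["primaryImage"]), join_unique(v["Wiki_URL"]))
            for k, v in grouped.items()}


def extract_urls(lookup, hash_value):
    """Always return a pair, even when the hash is missing or malformed."""
    try:
        return lookup.get(int(hash_value), ("", ""))
    except (TypeError, ValueError):
        return "", ""


rows = [
    {"Total_Hash": "101", "primaryImage": "a.jpg", "Wiki_URL": "w1"},
    {"Total_Hash": 101, "primaryImage": "a.jpg", "Wiki_URL": "w2"},
    {"Total_Hash": 202, "primaryImage": float("nan"), "Wiki_URL": "w3"},
]
lookup = build_lookup(rows)
print(extract_urls(lookup, "101"))
```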

      • clsContentScrapper.py (This is the main class that brings the default options for the users if they agree with the initial prompt by the bot.)
      #####################################################
      #### Written By: SATYAKI DE                      ####
      #### Written On: 27-May-2023                     ####
      #### Modified On 28-May-2023                     ####
      ####                                             ####
      #### Objective: This is the main calling         ####
      #### python class that will invoke the           ####
      #### LangChain of package to extract             ####
      #### the transcript from the YouTube videos &    ####
      #### then answer the questions based on the      ####
      #### topics selected by the users.               ####
      ####                                             ####
      #####################################################
      
      from langchain.document_loaders import YoutubeLoader
      from langchain.text_splitter import RecursiveCharacterTextSplitter
      from langchain.embeddings.openai import OpenAIEmbeddings
      from langchain.vectorstores import FAISS
      from langchain.chat_models import ChatOpenAI
      from langchain.chains import LLMChain
      
      from langchain.prompts.chat import (
          ChatPromptTemplate,
          SystemMessagePromptTemplate,
          HumanMessagePromptTemplate,
      )
      
      from googleapiclient.discovery import build
      
      import clsTemplate as ct
      from clsConfigClient import clsConfigClient as cf
      
      import os
      
      from flask import jsonify
      import requests
      
      ###############################################
      ###           Global Section                ###
      ###############################################
      open_ai_Key = cf.conf['OPEN_AI_KEY']
      os.environ["OPENAI_API_KEY"] = open_ai_Key
      embeddings = OpenAIEmbeddings(openai_api_key=open_ai_Key)
      
      YouTube_Key = cf.conf['YOUTUBE_KEY']
      youtube = build('youtube', 'v3', developerKey=YouTube_Key)
      
      # Disabling warnings
      def warn(*args, **kwargs):
          pass
      
      import warnings
      warnings.warn = warn
      
      ###############################################
      ###    End of Global Section                ###
      ###############################################
      
      class clsContentScrapper:
          def __init__(self):
              self.model_name = cf.conf['MODEL_NAME']
              self.temp_val = cf.conf['TEMP_VAL']
              self.max_cnt = int(cf.conf['MAX_CNT'])
              self.url = cf.conf['BASE_URL']
              self.header_token = cf.conf['HEADER_TOKEN']
      
          def extractCatalog(self):
              try:
                  base_url = self.url
                  header_token = self.header_token
      
                  url = base_url + '/departments'
      
                  print('Full URL: ', str(url))
      
                  payload={}
                  headers = {'Cookie': header_token}
      
                  # A timeout keeps the chat endpoint from hanging if the catalog API stalls
                  response = requests.request("GET", url, headers=headers, data=payload, timeout=30)
      
                  x = response.text
      
                  return x
              except Exception as e:
                  x = str(e)
                  print('Error: ', x)
      
                  return x
      

      Let us understand the core part of this class.

      Function – extractCatalog():

      The extractCatalog function uses specific headers to make a GET request to a constructed URL. The URL is derived by appending ‘/departments’ to a base_url, and a header token is used in the request headers. If successful, it returns the text of the response; if there’s an exception, it prints the error and returns the error message.
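
      The request construction can be separated from the network call, which makes it easy to check the URL & the header without hitting the real service. The sketch below uses the standard library’s urllib instead of requests so that it stays self-contained; build_catalog_request, fetch_catalog & the example.invalid host are my assumptions.

```python
import urllib.request


def build_catalog_request(base_url: str, header_token: str) -> urllib.request.Request:
    """Build (but do not send) the GET request for the departments catalog."""
    url = base_url.rstrip("/") + "/departments"
    return urllib.request.Request(url, headers={"Cookie": header_token}, method="GET")


def fetch_catalog(base_url: str, header_token: str, timeout: float = 10.0) -> str:
    """Send the request; urlopen raises an HTTPError on 4xx/5xx responses."""
    req = build_catalog_request(base_url, header_token)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.read().decode("utf-8")


# Placeholder host and token, used only to inspect the request object
req = build_catalog_request("https://example.invalid/api/", "sid=abc")
print(req.full_url)
```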

      • clsRAGOpenAI.py (This is the main class that builds the RAG-enabled context that is fed to OpenAI for a more refined response at a lower cost.)
      #########################################################
      #### Written By: SATYAKI DE                          ####
      #### Written On: 27-Jun-2023                         ####
      #### Modified On 28-Jun-2023                         ####
      ####                                                 ####
      #### Objective: This is the main calling             ####
      #### python script that will invoke the              ####
      #### shortcut application created inside MAC         ####
      #### environment including MacBook, IPad or IPhone.  ####
      ####                                                 ####
      #########################################################
      
      from haystack.document_stores.faiss import FAISSDocumentStore
      from haystack.nodes import DensePassageRetriever
      import openai
      
      from clsConfigClient import clsConfigClient as cf
      import clsL as log
      
      # Disabling warnings
      def warn(*args, **kwargs):
          pass
      
      import warnings
      warnings.warn = warn
      
      import os
      import re
      ###############################################
      ###           Global Section                ###
      ###############################################
      Ind = cf.conf['DEBUG_IND']
      queryModel = cf.conf['QUERY_MODEL']
      passageModel = cf.conf['PASSAGE_MODEL']
      
      #Initiating Logging Instances
      clog = log.clsL()
      
      os.environ["TOKENIZERS_PARALLELISM"] = "false"
      
      vectorDBFileName = cf.conf['VECTORDB_FILE_NM']
      
      indexFile = "vectorDB/" + str(vectorDBFileName) + '.faiss'
      indexConfig = "vectorDB/" + str(vectorDBFileName) + ".json"
      
      print('File: ', str(indexFile))
      print('Config: ', str(indexConfig))
      
      # Also, provide `config_path` parameter if you set it when calling the `save()` method:
      new_document_store = FAISSDocumentStore.load(index_path=indexFile, config_path=indexConfig)
      
      # Initialize Retriever
      retriever = DensePassageRetriever(document_store=new_document_store,
                                        query_embedding_model=queryModel,
                                        passage_embedding_model=passageModel,
                                        use_gpu=False)
      
      
      ###############################################
      ###    End of Global Section                ###
      ###############################################
      
      class clsRAGOpenAI:
          def __init__(self):
              self.basePath = cf.conf['DATA_PATH']
              self.fileName = cf.conf['FILE_NAME']
              self.Ind = cf.conf['DEBUG_IND']
              self.subdir = str(cf.conf['OUT_DIR'])
              self.base_url = cf.conf['BASE_URL']
              self.outputPath = cf.conf['OUTPUT_PATH']
              self.vectorDBPath = cf.conf['VECTORDB_PATH']
              self.openAIKey = cf.conf['OPEN_AI_KEY']
              self.temp = cf.conf['TEMP_VAL']
              self.modelName = cf.conf['MODEL_NAME']
              self.maxToken = cf.conf['MAX_TOKEN']
      
          def extractHash(self, text):
              try:
                  # Regular expression pattern to match 'Ref: {' followed by a number and then '}'
                  pattern = r"Ref: \{'(\d+)'\}"
                  match = re.search(pattern, text)
      
                  if match:
                      return match.group(1)
                  else:
                      return None
              except Exception as e:
                  x = str(e)
                  print('Error: ', x)
      
                  return None
      
          def removeSentencesWithNaN(self, text):
              try:
                  # Split text into sentences using regular expression
                  # Raw string avoids invalid-escape warnings for \w, \. and \s
                  sentences = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s', text)
                  # Filter out sentences containing 'nan'
                  filteredSentences = [sentence for sentence in sentences if 'nan' not in sentence]
                  # Rejoin the sentences
                  return ' '.join(filteredSentences)
              except Exception as e:
                  x = str(e)
                  print('Error: ', x)
      
                  return ''
      
          def retrieveDocumentsReader(self, question, top_k=9):
              return retriever.retrieve(question, top_k=top_k)
      
          def generateAnswerWithGPT3(self, retrieved_docs, question):
              try:
                  openai.api_key = self.openAIKey
                  temp = self.temp
                  modelName = self.modelName
                  maxToken = self.maxToken
      
                  documentsText = " ".join([doc.content for doc in retrieved_docs])
      
                  filteredDocs = self.removeSentencesWithNaN(documentsText)
                  hashValue = self.extractHash(filteredDocs)
      
                  print('RAG Docs:: ')
                  print(filteredDocs)
                  #prompt = f"Given the following documents: {documentsText}, answer the question accurately based on the above data with the supplied http urls: {question}"
      
                  # Set up a chat-style prompt with your data
                  # Send both the retrieved data and the user's question to the model
                  messages = [
                      {"role": "system", "content": "You are a helpful assistant, answer the question accurately based on the supplied data with the supplied http urls. Only relevant content needs to publish. Please do not provide the facts or the texts that results crossing the max_token limits."},
                      {"role": "user", "content": "Data: " + filteredDocs + "\n\nQuestion: " + str(question)}
                  ]
      
                  # Chat style invoking the latest model
                  response = openai.ChatCompletion.create(
                      model=modelName,
                      messages=messages,
                      temperature = temp,
                      max_tokens=maxToken
                  )
                  return hashValue, response.choices[0].message['content'].strip().replace('\n','\\n')
              except Exception as e:
                  x = str(e)
                  print('failed to get from OpenAI: ', x)
                  # Return a (hashValue, answer) pair so that callers can always unpack two values
                  return None, 'Not Available!'
      
          def ragAnswerWithHaystackAndGPT3(self, question):
              retrievedDocs = self.retrieveDocumentsReader(question)
              return self.generateAnswerWithGPT3(retrievedDocs, question)
      
          def getData(self, strVal):
              try:
                  print('*'*120)
                  print('Index Your Data for Retrieval:')
                  print('*'*120)
      
                  print('Response from New Docs: ')
                  print()
      
                  hashValue, answer = self.ragAnswerWithHaystackAndGPT3(strVal)
      
                  print('GPT3 Answer::')
                  print(answer)
                  print('Hash Value:')
                  print(str(hashValue))
      
                  print('*'*240)
                  print('End Of Use RAG to Generate Answers:')
                  print('*'*240)
      
                  return hashValue, answer
              except Exception as e:
                  x = str(e)
                  print('Error: ', x)
                  answer = x
                  hashValue = 1
      
                  return hashValue, answer
      

      Let us understand some of the important blocks –

      Function – ragAnswerWithHaystackAndGPT3():

      The ragAnswerWithHaystackAndGPT3 function retrieves relevant documents for a given question using the retrieveDocumentsReader method. It then generates an answer for the query using GPT-3 with the retrieved documents via the generateAnswerWithGPT3 method. The final response is returned.

      Function – generateAnswerWithGPT3():

      The generateAnswerWithGPT3 function, given a list of retrieved documents and a question, communicates with OpenAI’s GPT-3 to generate an answer. It first processes the documents, filtering and extracting a hash value. Using a chat-style format, it prompts GPT-3 with the processed documents and captures its response. If an error occurs, an error message is printed, and “Not Available!” is returned.
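
      The document processing & the chat-style prompt can be sketched without calling OpenAI. This is one possible way to assemble the messages so that both the retrieved text & the user’s question reach the model. The function build_messages, the prompt wording & the sample documents are my assumptions; the hash pattern is the one used in extractHash.

```python
import re
from typing import List, Optional, Tuple


def extract_hash(text: str) -> Optional[str]:
    """Find the first "Ref: {'<digits>'}" marker and return the digits."""
    match = re.search(r"Ref: \{'(\d+)'\}", text)
    return match.group(1) if match else None


def build_messages(docs: List[str], question: str) -> Tuple[Optional[str], list]:
    """Return (hash value, chat messages) for the retrieved documents."""
    context = " ".join(docs)
    messages = [
        {"role": "system",
         "content": "Answer the question using only the supplied documents."},
        {"role": "user",
         "content": f"Documents:\n{context}\n\nQuestion: {question}"},
    ]
    return extract_hash(context), messages


# Invented sample documents, only to show the shape of the output
docs = ["A bronze statue from the museum. Ref: {'4521'}", "Another note."]
hash_value, messages = build_messages(docs, "What is it made of?")
print(hash_value)
```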

      Function – retrieveDocumentsReader():

      The retrieveDocumentsReader function takes in a question and an optional parameter, top_k (defaulted to 9). It calls the retriever.retrieve method with the given parameters. The retrieval returns at most nine documents from the RAG engine, which are then fed to OpenAI.
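
      To show what top_k means in practice, here is a toy ranking over tiny hand-made vectors. Scoring by inner product is my assumption for illustration. The real DensePassageRetriever & the FAISS index handle the embeddings & the search internally, & the function top_k below is not part of Haystack.

```python
from typing import Dict, List, Tuple


def dot(a: List[float], b: List[float]) -> float:
    """Inner product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))


def top_k(query_vec: List[float], doc_vecs: Dict[str, List[float]],
          k: int = 9) -> List[Tuple[str, float]]:
    """Score every document against the query and keep the k best."""
    scored = [(doc_id, dot(query_vec, vec)) for doc_id, vec in doc_vecs.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]                      # never more than k results


# Made-up two-dimensional "embeddings"
doc_vecs = {"a": [1.0, 0.0], "b": [0.5, 0.5], "c": [0.0, 1.0]}
print(top_k([1.0, 0.0], doc_vecs, k=2))
```

      A smaller top_k sends less text to OpenAI & lowers the token cost, while a larger one gives the model more context to work with.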

      • App.js (This is the main React script that creates the interface & parses the data, apart from handling the authentication.)
      // App.js
      import React, { useState } from 'react';
      import axios from 'axios';
      import './App.css';
      
      const App = () => {
        const [isLoggedIn, setIsLoggedIn] = useState(false);
        const [username, setUsername] = useState('');
        const [password, setPassword] = useState('');
        const [message, setMessage] = useState('');
        const [chatLog, setChatLog] = useState([{ sender: 'MuBot', message: 'Welcome to MuBot! Please explore the world of History from our brilliant collections! Do you want to proceed to see the catalog?'}]);
      
        const handleLogin = async (e) => {
          e.preventDefault();
          try {
            const response = await axios.post('http://localhost:5000/login', { username, password });
            if (response.status === 200) {
              setIsLoggedIn(true);
            }
          } catch (error) {
            console.error('Login error:', error);
          }
        };
      
        // No parameter here: 'username' comes from the component state,
        // otherwise the click event would shadow it
        const sendMessage = async () => {
          if (message.trim() === '') return;
      
          // Create a new chat entry
          const newChatEntry = {
            sender: 'user',
            message: message.trim(),
          };
      
          // Clear the input field
          setMessage('');
      
          try {
            // Make API request to Python-based API; the server reads both 'username' and 'message'
            const response = await axios.post('http://localhost:5000/chat', { username, message: newChatEntry.message }); // Replace with your API endpoint URL
            const responseData = response.data;
      
            // Print the response to the console for debugging
            console.log('API Response:', responseData);
      
            // Parse the nested JSON from the 'message' attribute
            const jsonData = JSON.parse(responseData.message);
      
            // Check if the data contains 'departments'
            if (jsonData.departments) {
      
              // Extract the 'departments' attribute from the parsed data
              const departments = jsonData.departments;
      
              // Extract the department names and create a single string with line breaks
              const botResponseText = departments.reduce((acc, department) => {return acc + department.departmentId + ' ' + department.displayName + '\n';}, '');
      
              // Update the chat log with the bot's response
              setChatLog((prevChatLog) => [...prevChatLog, { sender: 'user', message: message }, { sender: 'bot', message: botResponseText },]);
            }
            else if (jsonData.records)
            {
              // Data structure 2: Artwork information
              const records = jsonData.records;
      
              // Prepare chat entries
              const chatEntries = [];
      
              // Iterate through records and extract text, image, and wiki information
              records.forEach((record) => {
                const textInfo = Object.entries(record).map(([key, value]) => {
                  if (key !== 'Image' && key !== 'Wiki') {
                    return `${key}: ${value}`;
                  }
                  return null;
                }).filter((info) => info !== null).join('\n');
      
                const imageLink = record.Image;
                //const wikiLinks = JSON.parse(record.Wiki.replace(/'/g, '"'));
                //const wikiLinks = record.Wiki;
                const wikiLinks = record.Wiki.split(',').map(link => link.trim());
      
                console.log('Wiki:', wikiLinks);
      
                // Check if there is a valid image link
                const hasValidImage = imageLink && imageLink !== '[]';
      
                const imageElement = hasValidImage ? (
                  <img src={imageLink} alt="Artwork" style={{ maxWidth: '100%' }} />
                ) : null;
      
                // Create JSX elements for rendering the wiki links (if available)
                const wikiElements = wikiLinks.map((link, index) => (
                  <div key={index}>
                    <a href={link} target="_blank" rel="noopener noreferrer">
                      Wiki Link {index + 1}
                    </a>
                  </div>
                ));
      
                if (textInfo) {
                  chatEntries.push({ sender: 'bot', message: textInfo });
                }
      
                if (imageElement) {
                  chatEntries.push({ sender: 'bot', message: imageElement });
                }
      
                if (wikiElements.length > 0) {
                  chatEntries.push({ sender: 'bot', message: wikiElements });
                }
              });
      
              // Update the chat log with the bot's response
              setChatLog((prevChatLog) => [...prevChatLog, { sender: 'user', message }, ...chatEntries, ]);
            }
      
          } catch (error) {
            console.error('Error sending message:', error);
          }
        };
      
        if (!isLoggedIn) {
          return (
            <div className="login-container">
              <h2>Welcome to the MuBot</h2>
              <form onSubmit={handleLogin} className="login-form">
                <input
                  type="text"
                  placeholder="Enter your name"
                  value={username}
                  onChange={(e) => setUsername(e.target.value)}
                  required
                />
                <input
                  type="password"
                  placeholder="Enter your password"
                  value={password}
                  onChange={(e) => setPassword(e.target.value)}
                  required
                />
                <button type="submit">Login</button>
              </form>
            </div>
          );
        }
      
        return (
          <div className="chat-container">
            <div className="chat-header">
              <h2>Hello, {username}</h2>
              <h3>Chat with MuBot</h3>
            </div>
            <div className="chat-log">
              {chatLog.map((chatEntry, index) => (
                <div
                  key={index}
                  className={`chat-entry ${chatEntry.sender === 'user' ? 'user' : 'bot'}`}
                >
                  <span className="user-name">{chatEntry.sender === 'user' ? username : 'MuBot'}</span>
                  <p className="chat-message">{chatEntry.message}</p>
                </div>
              ))}
            </div>
            <div className="chat-input">
              <input
                type="text"
                placeholder="Type your message..."
                value={message}
                onChange={(e) => setMessage(e.target.value)}
                onKeyPress={(e) => {
                  if (e.key === 'Enter') {
                    sendMessage();
                  }
                }}
              />
              <button onClick={sendMessage}>Send</button>
            </div>
          </div>
        );
      };
      
      export default App;
      

      Please find some of the important logic –

      Function – handleLogin():

      The handleLogin asynchronous function responds to an event by preventing its default action. It attempts to post a login request with a username and password to a local server endpoint. If the response is successful with a status of 200, it updates a state variable to indicate a successful login; otherwise, it logs any encountered errors.

      Function – sendMessage():

      The sendMessage asynchronous function is designed to handle the user’s chat interaction:

      1. If the message is empty (after trimming spaces), the function exits without further action.
      2. A chat entry object is created with the sender set as ‘user’ and the trimmed message.
      3. The input field’s message is cleared, and an API request is made to a local server endpoint with the chat message.
      4. If the API responds with a ‘departments’ attribute in its JSON, a bot response is crafted by iterating over department details.
      5. If the API responds with ‘records’ indicating artwork information, the bot crafts responses for each record, extracting text, images, and wiki links, and generating JSX elements for rendering them.
      6. After processing the API response, the chat log state is updated with the user’s original message and the bot’s responses.
      7. Errors, if encountered, are logged to the console.

      This function enables interactive chat with bot responses that vary based on the nature of the data received from the API.


      The directory structure, from the parent folder down to the important child folders, should look like this –


      So, finally, we’ve done it.

      I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.

      You will get the complete codebase in the following GitHub link.

      I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

      Till then, Happy Avenging! 🙂

      Enable OpenAI chatbot with the selected YouTube video content using LangChain, FAISS & YouTube data-API.

      Today, I’m very excited to demonstrate a new & effortless way to extract the transcripts from YouTube videos & then answer questions based on the topics selected by the users. In this post, the application first takes the user’s topic & question as input & then summarizes the relevant video content with the help of LangChain & an OpenAI-based model.

      In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.
      Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

      Demo

      Isn’t it very exciting? This will lead to a whole new ballgame, where one can get critical decision-making information from these human sources along with their traditional advanced analytical data.

      How will it help?

      Let’s say that, as per your historical data & analytics, the dashboard recommends prod-A, prod-B & prod-C as the top three products among potential top-performing brands. However, you are getting alerts from the TV news about prod-B due to some recent incidents. In that case, you don’t want to continue with the prod-B investment. Instead, you may pick a new product, say prod-Z, which may reduce the risk of your investment.


      What is LangChain?

      LangChain is a framework for developing applications powered by language models. We believe that the most powerful and differentiated applications will not only call out to a language model but will also be:

      1. Data-aware: connect a language model to other sources of data
      2. Agentic: allow a language model to interact with its environment

      The LangChain framework works around these principles.

      To know more about this, please click the following link.

      As you can see, this is one of the critical components in our solution, which will bind the OpenAI bot & it will feed the necessary data to provide the correct response.


      What is FAISS?

      Faiss is a library for efficient similarity search and clustering of dense vectors. It contains algorithms that search in sets of vectors of any size, up to ones that do not fit in RAM. It also has supporting code for evaluation and parameter tuning.

      Faiss is written in C++ with complete wrappers for Python. Some of the most useful algorithms are available on both CPU & GPU. Facebook AI Research develops it.

      To know more about this, please click the following link.
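
To make "similarity search over dense vectors" concrete, here is a minimal sketch of an exact, brute-force nearest-neighbour lookup in plain Python. It does not use Faiss itself; it only shows what an exhaustive L2 search computes conceptually. The function names and the toy 3-dimensional vectors are assumptions made for illustration.

```python
import math

def l2_distance(a, b):
    """Euclidean distance between two equal-length vectors."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

def nearest_neighbors(query, vectors, k=2):
    """Return the indices of the k stored vectors closest to the query (exact, brute force)."""
    ranked = sorted(range(len(vectors)), key=lambda i: l2_distance(query, vectors[i]))
    return ranked[:k]

# Toy 3-dimensional "embeddings"; real embeddings have far more dimensions
stored = [
    [0.0, 0.0, 1.0],
    [1.0, 0.0, 0.0],
    [0.9, 0.1, 0.0],
    [0.0, 1.0, 0.0],
]
top = nearest_neighbors([1.0, 0.0, 0.0], stored, k=2)
print(top)
```

A brute-force scan like this compares the query with every stored vector, which is fine for a handful of transcript chunks. Libraries such as Faiss exist because that approach becomes slow once the collection grows large.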


      FLOW OF EVENTS:

      Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.

      Here are the steps that will follow in sequence –

      • The application will first get the topic to search for on YouTube & find the top 5 videos using the YouTube data-API.
      • Once the above step returns the list of video URLs, LangChain will drive the application to extract the transcripts from the videos & then split them into smaller chunks to keep the costly OpenAI calls small. During this time, it will invoke FAISS to create the document DBs.
      • Finally, it will send the relevant chunks to OpenAI along with your supplied template, so that the final analysis uses only the small amount of data required for your query & gets the appropriate response at a lower cost.

      CODE:

      Why don’t we go through the code made accessible due to this new library for this particular use case?

      • clsConfigClient.py (This is the main calling Python script for the input parameters.)


      ################################################
      #### Written By: SATYAKI DE ####
      #### Written On: 15-May-2020 ####
      #### Modified On: 28-May-2023 ####
      #### ####
      #### Objective: This script is a config ####
      #### file, contains all the keys for ####
      #### personal OpenAI-based video content ####
      #### enable bot. ####
      #### ####
      ################################################
      import os
      import platform as pl

      class clsConfigClient(object):
          Curr_Path = os.path.dirname(os.path.realpath(__file__))

          os_det = pl.system()
          if os_det == "Windows":
              sep = '\\'
          else:
              sep = '/'

          conf = {
              'APP_ID': 1,
              'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
              'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
              'LOG_PATH': Curr_Path + sep + 'log' + sep,
              'DATA_PATH': Curr_Path + sep + 'data' + sep,
              'MODEL_PATH': Curr_Path + sep + 'model' + sep,
              'TEMP_PATH': Curr_Path + sep + 'temp' + sep,
              'MODEL_DIR': 'model',
              'APP_DESC_1': 'LangChain Demo!',
              'DEBUG_IND': 'N',
              'INIT_PATH': Curr_Path,
              'FILE_NAME': 'Output.csv',
              'MODEL_NAME': 'gpt-3.5-turbo',
              'OPEN_AI_KEY': "sk-kfrjfijdrkidjkfjd9474nbfjfkfjfhfhf84i84hnfhjdbv6Bgvv",
              'YOUTUBE_KEY': "AIjfjfUYGe64hHJ-LOFO5u-mkso9pPOJGFU",
              'TITLE': "LangChain Demo!",
              'TEMP_VAL': 0.2,
              'PATH' : Curr_Path,
              'MAX_CNT' : 5,
              'OUT_DIR': 'data'
          }

      Some of the key entries from the above scripts are as follows –

      'MODEL_NAME': 'gpt-3.5-turbo',
      'OPEN_AI_KEY': "sk-kfrjfijdrkidjkfjd9474nbfjfkfjfhfhf84i84hnfhjdbv6Bgvv",
      'YOUTUBE_KEY': "AIjfjfUYGe64hHJ-LOFO5u-mkso9pPOJGFU",
      'TEMP_VAL': 0.2,

      From the above code snippet, one can understand that we need API keys for both YouTube & OpenAI. They have separate costs & usage, which I’ll share later in the post. Also, notice that the temperature is set to 0.2 (the OpenAI API accepts values between 0 & 2). A low value like this means our AI bot will be more consistent in its responses. And our application will use the GPT-3.5-turbo model for its analytic response.
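
To see why a low temperature makes responses more consistent, here is a minimal sketch of the standard temperature-scaled softmax. It illustrates the general idea only and is not OpenAI's internal sampling code. The function name and the three example scores are assumptions, and the sketch requires a positive temperature even though the API itself accepts 0.

```python
import math

def softmax_with_temperature(logits, temperature):
    """Turn raw scores into probabilities; a lower temperature sharpens the distribution."""
    if temperature <= 0:
        raise ValueError("temperature must be positive in this sketch")
    scaled = [x / temperature for x in logits]
    peak = max(scaled)  # subtract the maximum for numerical stability
    exps = [math.exp(s - peak) for s in scaled]
    total = sum(exps)
    return [e / total for e in exps]

# Hypothetical scores for three candidate next tokens
logits = [2.0, 1.0, 0.5]
cool = softmax_with_temperature(logits, 0.2)  # the value used in the config
warm = softmax_with_temperature(logits, 1.0)
print([round(p, 3) for p in cool])
print([round(p, 3) for p in warm])
```

At 0.2, almost all of the probability mass lands on the highest-scoring candidate, so repeated runs tend to pick the same continuation. At 1.0, the alternatives keep a noticeable share.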

      • clsTemplate.py (Contains all the templates for OpenAI.)


      ################################################
      #### Written By: SATYAKI DE ####
      #### Written On: 27-May-2023 ####
      #### Modified On: 28-May-2023 ####
      #### ####
      #### Objective: This script is a config ####
      #### file, contains all the template for ####
      #### OpenAI prompts to get the correct ####
      #### response. ####
      #### ####
      ################################################
      # Template to use for the system message prompt
      templateVal_1 = """
      You are a helpful assistant that can answer questions about youtube videos
      based on the video's transcript: {docs}
      Only use the factual information from the transcript to answer the question.
      If you feel like you don't have enough information to answer the question, say "I don't know".
      Your answers should be verbose and detailed.
      """

      The above code is self-explanatory. Here, we keep the instructions that OpenAI must follow when it responds: answer only from the transcript & say "I don't know" when the transcript lacks the information.
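
The template carries a {docs} placeholder that is filled with transcript text at query time. The sketch below shows that substitution with plain Python string formatting rather than the LangChain prompt classes, assuming simple named placeholders. The helper name render_prompts and the sample strings are hypothetical.

```python
# Copy of the system template from clsTemplate.py; {docs} is its only placeholder
template_val_1 = """
You are a helpful assistant that can answer questions about youtube videos
based on the video's transcript: {docs}
Only use the factual information from the transcript to answer the question.
If you feel like you don't have enough information to answer the question, say "I don't know".
Your answers should be verbose and detailed.
"""

# Human prompt used by the class; {question} is its only placeholder
human_template = "Answer the following question: {question}"

def render_prompts(docs_text, question):
    """Return (system_message, human_message) with the placeholders substituted."""
    system_message = template_val_1.format(docs=docs_text)
    human_message = human_template.format(question=question)
    return system_message, human_message

system_msg, human_msg = render_prompts(
    "The host reviews three laptops.",   # stand-in for retrieved transcript chunks
    "Which laptops are reviewed?",
)
print(system_msg)
print(human_msg)
```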

      • clsVideoContentScrapper.py (Main class to extract the transcript from the YouTube videos & then answer the questions based on the topics selected by the users.)


      #####################################################
      #### Written By: SATYAKI DE ####
      #### Written On: 27-May-2023 ####
      #### Modified On 28-May-2023 ####
      #### ####
      #### Objective: This is the main calling ####
      #### python class that will invoke the ####
      #### LangChain of package to extract ####
      #### the transcript from the YouTube videos & ####
      #### then answer the questions based on the ####
      #### topics selected by the users. ####
      #### ####
      #####################################################
      from langchain.document_loaders import YoutubeLoader
      from langchain.text_splitter import RecursiveCharacterTextSplitter
      from langchain.embeddings.openai import OpenAIEmbeddings
      from langchain.vectorstores import FAISS
      from langchain.chat_models import ChatOpenAI
      from langchain.chains import LLMChain
      from langchain.prompts.chat import (
          ChatPromptTemplate,
          SystemMessagePromptTemplate,
          HumanMessagePromptTemplate,
      )
      from googleapiclient.discovery import build
      import clsTemplate as ct
      from clsConfigClient import clsConfigClient as cf
      import os

      ###############################################
      ### Global Section ###
      ###############################################
      open_ai_Key = cf.conf['OPEN_AI_KEY']
      os.environ["OPENAI_API_KEY"] = open_ai_Key
      embeddings = OpenAIEmbeddings(openai_api_key=open_ai_Key)
      YouTube_Key = cf.conf['YOUTUBE_KEY']
      youtube = build('youtube', 'v3', developerKey=YouTube_Key)

      # Disabling Warning
      def warn(*args, **kwargs):
          pass

      import warnings
      warnings.warn = warn
      ###############################################
      ### End of Global Section ###
      ###############################################

      class clsVideoContentScrapper:
          def __init__(self):
              self.model_name = cf.conf['MODEL_NAME']
              self.temp_val = cf.conf['TEMP_VAL']
              self.max_cnt = int(cf.conf['MAX_CNT'])

          def createDBFromYoutubeVideoUrl(self, video_url):
              try:
                  loader = YoutubeLoader.from_youtube_url(video_url)
                  transcript = loader.load()

                  text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
                  docs = text_splitter.split_documents(transcript)

                  db = FAISS.from_documents(docs, embeddings)
                  return db

              except Exception as e:
                  x = str(e)
                  print('Error: ', x)
                  return ''

          def getResponseFromQuery(self, db, query, k=4):
              try:
                  """
                  gpt-3.5-turbo can handle up to 4097 tokens. Setting the chunksize to 1000 and k to 4 maximizes
                  the number of tokens to analyze.
                  """

                  mod_name = self.model_name
                  temp_val = self.temp_val

                  docs = db.similarity_search(query, k=k)
                  docs_page_content = " ".join([d.page_content for d in docs])

                  chat = ChatOpenAI(model_name=mod_name, temperature=temp_val)

                  # Template to use for the system message prompt
                  template = ct.templateVal_1

                  system_message_prompt = SystemMessagePromptTemplate.from_template(template)

                  # Human question prompt
                  human_template = "Answer the following question: {question}"
                  human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)

                  chat_prompt = ChatPromptTemplate.from_messages(
                      [system_message_prompt, human_message_prompt]
                  )

                  chain = LLMChain(llm=chat, prompt=chat_prompt)

                  response = chain.run(question=query, docs=docs_page_content)
                  response = response.replace("\n", "")
                  return response, docs

              except Exception as e:
                  x = str(e)
                  print('Error: ', x)

                  return '', ''

          def topFiveURLFromYouTube(self, service, **kwargs):
              try:
                  video_urls = []
                  channel_list = []
                  results = service.search().list(**kwargs).execute()

                  for item in results['items']:
                      print("Title: ", item['snippet']['title'])
                      print("Description: ", item['snippet']['description'])
                      channel = item['snippet']['channelId']
                      print("Channel Id: ", channel)

                      # Fetch the channel name using the channel ID
                      channel_response = service.channels().list(part='snippet',id=item['snippet']['channelId']).execute()
                      channel_title = channel_response['items'][0]['snippet']['title']
                      print("Channel Title: ", channel_title)
                      channel_list.append(channel_title)

                      print("Video Id: ", item['id']['videoId'])
                      vidURL = "https://www.youtube.com/watch?v=" + item['id']['videoId']
                      print("Video URL: " + vidURL)
                      video_urls.append(vidURL)
                      print("\n")

                  return video_urls, channel_list

              except Exception as e:
                  video_urls = []
                  channel_list = []
                  x = str(e)
                  print('Error: ', x)

                  return video_urls, channel_list

          def extractContentInText(self, topic, query):
              try:
                  discussedTopic = []
                  strKeyText = ''
                  cnt = 0
                  max_cnt = self.max_cnt

                  urlList, channelList = self.topFiveURLFromYouTube(youtube, q=topic, part='id,snippet',maxResults=max_cnt,type='video')
                  print('Returned List: ')
                  print(urlList)
                  print()

                  for video_url in urlList:
                      print('Processing Video: ')
                      print(video_url)
                      db = self.createDBFromYoutubeVideoUrl(video_url)

                      response, docs = self.getResponseFromQuery(db, query)

                      if len(response) > 0:
                          strKeyText = 'As per the topic discussed in ' + channelList[cnt] + ', '
                          discussedTopic.append(strKeyText + response)

                      cnt += 1

                  return discussedTopic
              except Exception as e:
                  discussedTopic = []
                  x = str(e)
                  print('Error: ', x)

                  return discussedTopic

      Let us understand the key methods step by step in detail –

      def topFiveURLFromYouTube(self, service, **kwargs):
          try:
              video_urls = []
              channel_list = []
              results = service.search().list(**kwargs).execute()
      
              for item in results['items']:
                  print("Title: ", item['snippet']['title'])
                  print("Description: ", item['snippet']['description'])
                  channel = item['snippet']['channelId']
                  print("Channel Id: ", channel)
      
                  # Fetch the channel name using the channel ID
                  channel_response = service.channels().list(part='snippet',id=item['snippet']['channelId']).execute()
                  channel_title = channel_response['items'][0]['snippet']['title']
                  print("Channel Title: ", channel_title)
                  channel_list.append(channel_title)
      
                  print("Video Id: ", item['id']['videoId'])
                  vidURL = "https://www.youtube.com/watch?v=" + item['id']['videoId']
                  print("Video URL: " + vidURL)
                  video_urls.append(vidURL)
                  print("\n")
      
              return video_urls, channel_list
      
          except Exception as e:
              video_urls = []
              channel_list = []
              x = str(e)
              print('Error: ', x)
      
              return video_urls, channel_list

      The above code fetches the most relevant YouTube videos for the topic, builds one list of their URLs & another list of the matching channel names, & then returns both lists to the calling function.
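
The URL-building step can be tried without an API key by feeding it a hand-written dictionary shaped like the search response used above (items, then id.videoId and snippet). This is a minimal sketch: the helper name extract_video_urls and the sample data are assumptions, not real API output.

```python
def extract_video_urls(search_response):
    """Return watch URLs for every video item in a search().list() style response."""
    urls = []
    for item in search_response.get('items', []):
        video_id = item.get('id', {}).get('videoId')
        if video_id:  # skip results that carry no videoId (e.g. channel hits)
            urls.append("https://www.youtube.com/watch?v=" + video_id)
    return urls

# Hand-written stand-in for an API response (not real data)
sample_response = {
    'items': [
        {'id': {'videoId': 'abc123'}, 'snippet': {'title': 'First', 'channelId': 'ch-1'}},
        {'id': {'channelId': 'ch-2'}, 'snippet': {'title': 'A channel hit', 'channelId': 'ch-2'}},
        {'id': {'videoId': 'xyz789'}, 'snippet': {'title': 'Second', 'channelId': 'ch-1'}},
    ]
}
urls = extract_video_urls(sample_response)
print(urls)
```

Because the original call passes type='video', every result should already carry a videoId, so the guard is only defensive. The search snippet also appears to include a channelTitle field, which might make the extra channels().list call per video unnecessary. Please verify that against the API reference before relying on it.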

      def createDBFromYoutubeVideoUrl(self, video_url):
          try:
              loader = YoutubeLoader.from_youtube_url(video_url)
              transcript = loader.load()
      
              text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
              docs = text_splitter.split_documents(transcript)
      
              db = FAISS.from_documents(docs, embeddings)
              return db
      
          except Exception as e:
              x = str(e)
              print('Error: ', x)
              return ''

      The above method, createDBFromYoutubeVideoUrl, creates a searchable database of text documents from the transcript of a YouTube video. Here’s the explanation in simple English:

      1. The function createDBFromYoutubeVideoUrl is defined with one argument: video_url.
      2. The function uses a try-except block to handle any exceptions that may occur.
      3. Inside the try block, the following steps are performed:
      • First, it creates a YoutubeLoader object from the provided video_url. This object is responsible for fetching the transcript of the YouTube video specified by the URL.
      • The loader object then loads the transcript of the video, i.e., the text version of everything spoken in the video.
      • It then creates a RecursiveCharacterTextSplitter object with a chunk_size of 1000 and a chunk_overlap of 100. This object splits the transcript into smaller chunks (documents) of text for easier processing. Each chunk will be at most about 1000 characters long, and consecutive chunks will overlap by up to 100 characters.
      • The split_documents method of the text_splitter object splits the transcript into these smaller documents, which are stored in the docs variable.
      • The FAISS.from_documents method is then called with docs and embeddings as arguments to create a FAISS (Facebook AI Similarity Search) index. This index is a database used for efficient similarity search and clustering of high-dimensional vectors, which, in this case, are the embeddings of the documents. The FAISS index is stored in the db variable.
      • Finally, the db variable is returned, representing the database created from the video transcript.

      4. If an exception occurs during the execution of the try block, the code execution moves to the except block:

      • Here, it first converts the exception e to a string x.
      • Then it prints an error message.
      • Finally, it returns an empty string as an indication of the error.
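
To show what chunk_size and chunk_overlap mean, here is a simplified sketch that cuts a string into fixed-size character windows sharing an overlap. It is not the RecursiveCharacterTextSplitter algorithm, which additionally tries to split at natural separators such as paragraph and line breaks. The function name and the synthetic transcript are assumptions.

```python
def split_with_overlap(text, chunk_size=1000, chunk_overlap=100):
    """Cut text into fixed-size character windows that share chunk_overlap characters."""
    if chunk_size <= 0 or not 0 <= chunk_overlap < chunk_size:
        raise ValueError("need chunk_size > 0 and 0 <= chunk_overlap < chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reaches the end of the text
    return chunks

# Stand-in for a 2,500-character transcript
transcript = "".join(str(i % 10) for i in range(2500))
chunks = split_with_overlap(transcript)
print([len(c) for c in chunks])
```

The overlap means a sentence that straddles a chunk boundary is still fully visible in at least one chunk, at the cost of embedding a little duplicated text.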

      def getResponseFromQuery(self, db, query, k=4):
            try:
                """
                gpt-3.5-turbo can handle up to 4097 tokens. Setting the chunksize to 1000 and k to 4 maximizes
                the number of tokens to analyze.
                """
      
                mod_name = self.model_name
                temp_val = self.temp_val
      
                docs = db.similarity_search(query, k=k)
                docs_page_content = " ".join([d.page_content for d in docs])
      
                chat = ChatOpenAI(model_name=mod_name, temperature=temp_val)
      
                # Template to use for the system message prompt
                template = ct.templateVal_1
      
                system_message_prompt = SystemMessagePromptTemplate.from_template(template)
      
                # Human question prompt
                human_template = "Answer the following question: {question}"
                human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)
      
                chat_prompt = ChatPromptTemplate.from_messages(
                    [system_message_prompt, human_message_prompt]
                )
      
                chain = LLMChain(llm=chat, prompt=chat_prompt)
      
                response = chain.run(question=query, docs=docs_page_content)
                response = response.replace("\n", "")
                return response, docs
      
            except Exception as e:
                x = str(e)
                print('Error: ', x)
      
                return '', ''

      The function getResponseFromQuery searches the given database (db) for content relevant to a query and then generates a response using the configured language model (gpt-3.5-turbo here). The answer is based on the retrieved content and the particular question. Here is a simple English summary:

      1. The function getResponseFromQuery takes three parameters: db, query, and k. The k parameter is optional and defaults to 4 if not provided. db is the database to search, query is the question or prompt to analyze, and k is the number of similar items to return.
      2. The function initiates a try-except block for handling any errors that might occur.
      3. Inside the try block:
      • The function retrieves the model name and temperature value from the instance of the class this function is a part of.
      • The function then searches the db database for the k documents most similar to the query and saves these in docs.
      • It concatenates the content of the returned documents into a single string, docs_page_content.
      • It creates a ChatOpenAI object with the model name and temperature value.
      • It creates a system message prompt from the predefined template.
      • It creates a human message prompt, which wraps the query.
      • It combines these two prompts to form a chat prompt.
      • An LLMChain object is then created using the ChatOpenAI object and the chat prompt.
      • This LLMChain object is used to generate a response to the query using the content of the documents found in the database. The answer is then formatted by removing all newline characters.
      • Finally, the function returns this response along with the retrieved documents.
      4. If any error occurs during these operations, the function goes to the except block where:
      • The error message is printed.
      • The function returns two empty strings to indicate that an error occurred and that no response or documents could be produced.
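
The docstring in the method talks about a 4097-token limit, while the splitter's chunk_size counts characters by default. The sketch below joins the top k chunks the same way the method does and estimates their size in tokens. It assumes the common rule of thumb of roughly four characters per English token, which is only an approximation; exact counts need a tokenizer. The helper names and the synthetic chunks are hypothetical.

```python
def build_context(chunks, k=4):
    """Join the first k chunks (assumed to be already ranked by similarity) into one string."""
    return " ".join(chunks[:k])

def rough_token_estimate(text, chars_per_token=4):
    """Very rough token count; chars_per_token=4 is a rule of thumb, not an exact figure."""
    return -(-len(text) // chars_per_token)  # ceiling division

# Five synthetic 1000-character chunks standing in for similarity-search results
ranked_chunks = ["a" * 1000, "b" * 1000, "c" * 1000, "d" * 1000, "e" * 1000]
context = build_context(ranked_chunks, k=4)
estimated_tokens = rough_token_estimate(context)
print(len(context), estimated_tokens)
```

Under that assumption, four 1000-character chunks come to roughly a thousand tokens, which leaves comfortable room in the context window for the template, the question, and the model's answer.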

      def extractContentInText(self, topic, query):
          try:
              discussedTopic = []
              strKeyText = ''
              cnt = 0
              max_cnt = self.max_cnt
      
              urlList, channelList = self.topFiveURLFromYouTube(youtube, q=topic, part='id,snippet',maxResults=max_cnt,type='video')
              print('Returned List: ')
              print(urlList)
              print()
      
              for video_url in urlList:
                  print('Processing Video: ')
                  print(video_url)
                  db = self.createDBFromYoutubeVideoUrl(video_url)
      
                  response, docs = self.getResponseFromQuery(db, query)
      
                  if len(response) > 0:
                      strKeyText = 'As per the topic discussed in ' + channelList[cnt] + ', '
                      discussedTopic.append(strKeyText + response)
      
                  cnt += 1
      
              return discussedTopic
          except Exception as e:
              discussedTopic = []
              x = str(e)
              print('Error: ', x)
      
              return discussedTopic

      The function extractContentInText extracts relevant content from the transcripts of the top YouTube videos on a specific topic and generates responses to a given query. Here’s an explanation in simple English:

      1. The function extractContentInText is defined with topic and query as parameters.
      2. It begins with a try-except block to catch and handle any possible exceptions.
      3. In the try block:
      • It initializes several variables: an empty list discussedTopic to store the extracted information, an empty string strKeyText to hold the prefix for each answer, a counter cnt initialized at 0, and max_cnt, retrieved from the object itself, to specify the maximum number of YouTube videos to consider.
      • It calls the topFiveURLFromYouTube function (defined previously) to get the URLs of the top videos on the given topic from YouTube. It also retrieves the list of channel names associated with these videos.
      • It prints the returned list of URLs.
      • Then, it starts a loop over each URL in the urlList.
        • For each URL, it prints the URL, then creates a database from the transcript of the YouTube video using the function createDBFromYoutubeVideoUrl.
        • It then uses the getResponseFromQuery function to get a response to the query based on the content of the database.
        • If the length of the response is greater than 0 (meaning there is a response), it forms a string strKeyText that names the channel on which the topic was discussed and appends the answer to this string. This entire string is then added to the discussedTopic list.
        • It increments the counter cnt by one after each iteration.
      • Finally, after the loop, it returns the discussedTopic list, which now contains the relevant content extracted from the videos.
      4. If any error occurs during these operations, the function goes into the except block:
      • It first resets discussedTopic to an empty list.
      • Then it converts the exception e to a string and prints the error message.
      • Lastly, it returns the empty discussedTopic list, indicating that no content could be extracted due to the error.
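
The loop described above keeps a manual counter to line up each URL with its channel name. As a possible alternative, the sketch below pairs the two lists with zip and keeps only non-empty answers. The function collect_answers and the fake answer table are hypothetical stand-ins for the YouTube and OpenAI calls.

```python
def collect_answers(urls, channels, answer_for_url):
    """Pair each URL with its channel name and keep only the non-empty answers."""
    discussed = []
    for url, channel in zip(urls, channels):
        response = answer_for_url(url)
        if response:  # an empty string means no usable answer for this video
            discussed.append('As per the topic discussed in ' + channel + ', ' + response)
    return discussed

# Hypothetical stand-ins: canned answers keyed by URL instead of real API calls
fake_answers = {
    'url-1': 'the laptop is fast.',
    'url-2': '',
    'url-3': 'battery life is weak.',
}
result = collect_answers(
    ['url-1', 'url-2', 'url-3'],
    ['Channel A', 'Channel B', 'Channel C'],
    fake_answers.get,
)
print(result)
```

Passing the answering step in as a function also makes the loop easy to exercise without network access.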
      • testLangChain.py (Main Python script to extract the transcript from the YouTube videos & then answer the questions based on the topics selected by the users.)


      #####################################################
      #### Written By: SATYAKI DE ####
      #### Written On: 27-May-2023 ####
      #### Modified On 28-May-2023 ####
      #### ####
      #### Objective: This is the main calling ####
      #### python script that will invoke the ####
      #### clsVideoContentScrapper class to extract ####
      #### the transcript from the YouTube videos. ####
      #### ####
      #####################################################
      import clsL as cl
      from clsConfigClient import clsConfigClient as cf
      import datetime
      import textwrap
      import clsVideoContentScrapper as cvsc

      # Disabling Warning
      def warn(*args, **kwargs):
          pass

      import warnings
      warnings.warn = warn

      ######################################
      ### Get your global values ####
      ######################################
      debug_ind = 'Y'

      # Initiating Logging Instances
      clog = cl.clsL()
      data_path = cf.conf['DATA_PATH']
      data_file_name = cf.conf['FILE_NAME']
      cVCScrapper = cvsc.clsVideoContentScrapper()
      ######################################
      #### Global Flag ########
      ######################################

      def main():
          try:
              var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
              print('*'*120)
              print('Start Time: ' + str(var))
              print('*'*120)

              #query = "What are they saying about Microsoft?"
              print('Please share your topic!')
              inputTopic = input('User: ')
              print('Please ask your questions?')
              inputQry = input('User: ')
              print()

              retList = cVCScrapper.extractContentInText(inputTopic, inputQry)
              cnt = 0

              for discussedTopic in retList:
                  finText = str(cnt + 1) + ') ' + discussedTopic
                  print()
                  print(textwrap.fill(finText, width=150))

                  cnt += 1

              r1 = len(retList)

              if r1 > 0:
                  print()
                  print('Successfully Scrapped!')
              else:
                  print()
                  print('Failed to Scrappe!')

              print('*'*120)
              var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
              print('End Time: ' + str(var1))

          except Exception as e:
              x = str(e)
              print('Error: ', x)

      if __name__ == "__main__":
          main()

      Please find the key snippet –

      def main():
          try:
              var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
              print('*'*120)
              print('Start Time: ' + str(var))
              print('*'*120)
      
              #query = "What are they saying about Microsoft?"
              print('Please share your topic!')
              inputTopic = input('User: ')
              print('Please ask your questions?')
              inputQry = input('User: ')
              print()
      
              retList = cVCScrapper.extractContentInText(inputTopic, inputQry)
              cnt = 0
      
              for discussedTopic in retList:
                  finText = str(cnt + 1) + ') ' + discussedTopic
                  print()
                  print(textwrap.fill(finText, width=150))
      
                  cnt += 1
      
              r1 = len(retList)
      
              if r1 > 0:
                  print()
                  print('Successfully Scrapped!')
              else:
                  print()
                  print('Failed to Scrappe!')
      
              print('*'*120)
              var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
              print('End Time: ' + str(var1))
      
          except Exception as e:
              x = str(e)
              print('Error: ', x)
      
      if __name__ == "__main__":
          main()

      The above main application captures the topic from the user & then gives the user a chance to ask a specific question on that topic. It then invokes the main class to extract the transcripts from YouTube, feeds them as a source through LangChain & finally delivers the responses. If a video yields no response, that video is skipped; if there are no responses at all, the application reports a failure.
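
The numbering and wrapping of the answers can be tried in isolation. This is a small sketch using enumerate in place of the manual counter, together with the same textwrap.fill call. The helper name format_answers and the sample answers are assumptions.

```python
import textwrap

def format_answers(answers, width=150):
    """Number each answer from 1 and wrap it to the given width."""
    return [
        textwrap.fill(str(i) + ') ' + text, width=width)
        for i, text in enumerate(answers, start=1)
    ]

lines = format_answers(['first answer', 'second answer'])
for line in lines:
    print(line)
```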

      USAGE & COST FACTOR:

      Please find the OpenAI usage –

      Please find the YouTube API usage –


      You will get the complete codebase in the following GitHub link.

      Note: All the data & scenarios posted here are representational data & scenarios & available over the internet & for educational purposes only. Some of the images (except my photo) we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement & especially in the prediction quality. Sample video taken from Santrel Media & you would find the link over here.

      Documenting undocumented python scripts using Python-OpenAI

      Today, I will discuss another very impressive & innovative new AI, which is now operational in Python. We’ll document a dummy python script that has no comments by using OpenAI’s ChatGPT model. But before we start, why don’t we see the demo first?

      Demo

      Great! Let us understand how we can leverage this by writing a tiny snippet using this new AI model.

      Architecture:

      Let us understand the flow of events –

      The above diagram represents the newly released OpenAI ChatGPT. One supplies the code whose logic was never documented, for whatever reason. We need to provide these scripts (maybe in parts) as source code to be analyzed. The model then translates each snippet into English-like language & captures the logic as comments for that specific snippet.
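
As a rough illustration of the "maybe in parts" step, here is a minimal sketch that splits a script into chunks & wraps each chunk in a prompt. The helper names (split_script, build_prompt), the chunk size & the prompt wording are all assumptions made for this example; the call that sends each prompt to the model is deliberately left out.

```python
def split_script(source, max_lines=40):
    """Split a script into chunks of at most max_lines lines each.

    Hypothetical helper for illustration; the chunk size is an assumption.
    """
    lines = source.splitlines()
    return ["\n".join(lines[i:i + max_lines])
            for i in range(0, len(lines), max_lines)]


def build_prompt(chunk):
    """Wrap one chunk of code in an instruction asking for documentation."""
    return ("Explain in plain English what the following Python code does, "
            "written as comments a maintainer could paste above it:\n\n" + chunk)


# A tiny undocumented script, split two lines at a time
sample = "a = 1\nb = 2\nc = a + b\nprint(c)\nprint(a)"
parts = split_script(sample, max_lines=2)
prompts = [build_prompt(part) for part in parts]

print(len(prompts))
```

Splitting by a fixed number of lines is the simplest choice; splitting at function or class boundaries would usually give the model more coherent pieces to describe.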


      Python Packages:

      Following are the python packages that are necessary to develop this brilliant use case –

      pip install pandas
      pip install openai


      Predicting ball movement from live sports using Open-CV Python & Kalman filter

      Today, I’m going to discuss another Computer Vision installment. I’ll use OpenCV & a Kalman filter to predict the live ball movement in Cricket, one of the most popular sports in the Indian sub-continent, along with the UK & Australia. But before we start a deep dive, why don’t we first watch the demo?

      Demo

      Isn’t it exciting? Let’s explore it in detail.


      Architecture:

      Let us understand the flow of events –

      The above diagram shows that the application, which uses OpenCV, analyzes individual frames. It detects the cricket ball in each frame, tracks its movement from frame to frame & then predicts the path (pink line) based on the supplied data points.


      Python Packages:

      Following are the python packages that are necessary to develop this brilliant use case –

      pip install opencv-python
      pip install numpy
      pip install cvzone

      CODE:

      Let us now understand the code. For this use case, we will only discuss the key python scripts. The application needs a few more, but we have already discussed them in some of the earlier posts. Hence, we will skip them here.

      • clsPredictBodyLine.py (The main class that will handle the prediction of Cricket balls in the real-time video feed.)


      #####################################################
      #### Written By: SATYAKI DE ####
      #### Written On: 20-Nov-2022 ####
      #### Modified On 30-Nov-2022 ####
      #### ####
      #### Objective: This is the main calling ####
      #### python script that will invoke the ####
      #### clsPredictBodyLine class to initiate ####
      #### the prediction capability in real-time ####
      #### & display the result from a live sports. ####
      #####################################################
      import cv2
      import cvzone
      from cvzone.ColorModule import ColorFinder
      from clsKalmanFilter import clsKalmanFilter
      from clsConfigClient import clsConfigClient as cf
      import numpy as np
      import math
      import ssl
      import time

      # Bypassing SSL Authentication
      try:
          _create_unverified_https_context = ssl._create_unverified_context
      except AttributeError:
          # Legacy python that doesn't verify HTTPS certificates by default
          pass
      else:
          # Handle target environment that doesn't support HTTPS verification
          ssl._create_default_https_context = _create_unverified_https_context

      # Disabling Warning
      def warn(*args, **kwargs):
          pass

      import warnings
      warnings.warn = warn

      ###############################################
      ### Global Section ###
      ###############################################
      # Load Kalman filter to predict the trajectory
      kf = clsKalmanFilter()

      # Create the color ColorFinder
      myColorFinder = ColorFinder(False)

      posListX = []
      posListY = []
      xList = [item for item in range(0, 1300)]
      prediction=False
      ###############################################
      ### End of Global Section ###
      ###############################################

      class clsPredictBodyLine(object):
          def __init__(self):
              self.inputFile_1 = str(cf.conf['BASE_FILE'])
              self.inputFile_2 = str(cf.conf['BASE_IMAGE_FILE'])
              self.src_path = str(cf.conf['SRC_PATH'])
              self.hsvVals = cf.conf['HSV']
              self.pauseTime = cf.conf['PAUSE']
              self.pT1 = int(cf.conf['POINT_1'])
              self.pT2 = int(cf.conf['POINT_2'])
              self.pT3 = int(cf.conf['POINT_3'])
              self.pT4 = int(cf.conf['POINT_4'])

          def predStream(self, img, hsvVals, FrNo):
              # Use the flag from the global section, so that the last verdict
              # is still available once ten or more points have been collected
              global prediction

              try:
                  pT1 = self.pT1
                  pT2 = self.pT2
                  pT3 = self.pT3
                  pT4 = self.pT4

                  #Find the color ball
                  imgColor, mask = myColorFinder.update(img, hsvVals)

                  #Find location of the red_ball
                  imgContours, contours = cvzone.findContours(img, mask, minArea=500)

                  if contours:
                      posListX.append(contours[0]['center'][0])
                      posListY.append(contours[0]['center'][1])

                  if posListX:
                      # Find the Coefficients
                      A, B, C = np.polyfit(posListX, posListY, 2)

                      for i, (posX, posY) in enumerate(zip(posListX, posListY)):
                          pos = (posX, posY)
                          cv2.circle(imgContours, pos, 10, (0,255,0), cv2.FILLED)

                          # Using Kalman Filter Prediction
                          predicted = kf.predict(posX, posY)
                          cv2.circle(imgContours, (predicted[0], predicted[1]), 12, (255,0,255), cv2.FILLED)

                          ballDetectFlag = True
                          if ballDetectFlag:
                              print('Balls Detected!')

                          if i == 0:
                              cv2.line(imgContours, pos, pos, (0,255,0), 5)
                              cv2.line(imgContours, predicted, predicted, (255,0,255), 5)
                          else:
                              predictedM = kf.predict(posListX[i-1], posListY[i-1])

                              cv2.line(imgContours, pos, (posListX[i-1], posListY[i-1]), (0,255,0), 5)
                              cv2.line(imgContours, predicted, predictedM, (255,0,255), 5)

                      if len(posListX) < 10:

                          # Calculation for best place to ball
                          a1 = A
                          b1 = B
                          c1 = C - pT1

                          X1 = int((- b1 - math.sqrt(b1**2 - (4*a1*c1)))/(2*a1))
                          prediction1 = pT2 < X1 < pT3

                          a2 = A
                          b2 = B
                          c2 = C - pT4

                          X2 = int((- b2 - math.sqrt(b2**2 - (4*a2*c2)))/(2*a2))
                          prediction2 = pT2 < X2 < pT3

                          prediction = prediction1 | prediction2

                      if prediction:
                          print('Good Length Ball!')
                          sMsg = "Good Length Ball - (" + str(FrNo) + ")"
                          cvzone.putTextRect(imgContours, sMsg, (50,150), scale=5, thickness=5, colorR=(0,200,0), offset=20)
                      else:
                          print('Loose Ball!')
                          sMsg = "Loose Ball - (" + str(FrNo) + ")"
                          cvzone.putTextRect(imgContours, sMsg, (50,150), scale=5, thickness=5, colorR=(0,0,200), offset=20)

                  return imgContours

              except Exception as e:
                  x = str(e)
                  print('Error predStream:', x)

                  return img

          def processVideo(self, debugInd, var):
              try:
                  cnt = 0
                  lastRowFlag=True
                  breakFlag = False
                  pauseTime = self.pauseTime
                  src_path = self.src_path
                  inputFile_1 = self.inputFile_1
                  inputFile_2 = self.inputFile_2
                  hsvVals = self.hsvVals

                  FileName_1 = src_path + inputFile_1
                  FileName_2 = src_path + inputFile_2

                  # Initialize the video
                  cap = cv2.VideoCapture(FileName_1)

                  while True:
                      try:
                          if breakFlag:
                              break

                          # Grab the frames
                          success, img = cap.read()

                          time.sleep(pauseTime)

                          cnt+=1

                          print('*'*60)
                          print('Frame Number:', str(cnt))

                          if (cv2.waitKey(1) & 0xFF) == ord("q"):
                              break

                          if success:
                              imgContours = self.predStream(img, hsvVals, cnt)

                              if imgContours is None:
                                  imgContours = img

                              imgColor = cv2.resize(imgContours, (0,0), None, 0.7, 0.7)

                              # Display
                              cv2.imshow("ImageColor", imgColor)
                              print('*'*60)
                          else:
                              breakFlag=True
                      except Exception as e:
                          x = str(e)
                          print('Error Main:', x)

                  # Release the video source before closing the windows
                  cap.release()
                  cv2.destroyAllWindows()

                  return 0

              except Exception as e:
                  x = str(e)
                  print('Error:', x)
                  cv2.destroyAllWindows()

                  return 1

      Please find the key snippet from the above script –

      kf = clsKalmanFilter()

      The application is instantiating the modified Kalman filter.

      myColorFinder = ColorFinder(False)

      This command creates the color finder. The flag matters in debug mode, when you want to isolate the color of the object you want to track & build a proper mask for it. To debug this property, one needs to set the flag to True, & you will see the following screen. Click the next video to see the process of generating the accurate HSV values.

      In the end, you will get a similar entry to the below one –

      And you can see the entry that is available in the config for the following parameter –

      'HSV': {'hmin': 173, 'smin':177, 'vmin':57, 'hmax':178, 'smax':255, 'vmax':255},
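
To make the role of these six values concrete, here is a minimal sketch of the per-pixel test that an HSV mask performs. It is not how cvzone builds the mask; the helper name in_hsv_range & the two sample pixels are assumptions for illustration. It uses Python's standard colorsys module, rescaled to OpenCV's 8-bit HSV convention (H from 0 to 179, S & V from 0 to 255).

```python
import colorsys

# HSV entry copied from the config shown above
HSV_VALS = {'hmin': 173, 'smin': 177, 'vmin': 57,
            'hmax': 178, 'smax': 255, 'vmax': 255}


def in_hsv_range(bgr, hsv_vals):
    """Return True if a single BGR pixel falls inside the HSV range.

    Hypothetical helper for illustration only; in the application the mask
    for the whole frame comes from myColorFinder.update().
    """
    b, g, r = bgr
    h, s, v = colorsys.rgb_to_hsv(r / 255.0, g / 255.0, b / 255.0)

    # Rescale to OpenCV's 8-bit convention: H in [0, 179], S and V in [0, 255]
    h8 = int(round(h * 180)) % 180
    s8 = int(round(s * 255))
    v8 = int(round(v * 255))

    return (hsv_vals['hmin'] <= h8 <= hsv_vals['hmax']
            and hsv_vals['smin'] <= s8 <= hsv_vals['smax']
            and hsv_vals['vmin'] <= v8 <= hsv_vals['vmax'])


ball_pixel = (40, 20, 200)   # a reddish-pink pixel in BGR order (made up)
grass_pixel = (0, 255, 0)    # pure green in BGR order (made up)

print(in_hsv_range(ball_pixel, HSV_VALS))
print(in_hsv_range(grass_pixel, HSV_VALS))
```

A narrow hue window such as 173 to 178 keeps only pixels very close to the ball's color, which is why the values need to be tuned against the actual video.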

      The next important block is –

      def predStream(self, img, hsvVals, FrNo):
          try:
              pT1 = self.pT1
              pT2 = self.pT2
              pT3 = self.pT3
              pT4 = self.pT4

      The four points mentioned above help us determine the best region to pitch the ball: one that forces the batsman to play a shot, with a 90% chance of getting caught behind.


      The snippets below will apply the mask & identify the contour of the objects which the program intends to track. In this case, we are talking about the pink cricket ball.

      #Find the color ball
      imgColor, mask = myColorFinder.update(img, hsvVals)
      
      #Find location of the red_ball
      imgContours, contours = cvzone.findContours(img, mask, minArea=500)
      
      if contours:
          posListX.append(contours[0]['center'][0])
          posListY.append(contours[0]['center'][1])

      The next key snippets are as follows –

      if posListX:
          # Find the Coefficients
          A, B, C = np.polyfit(posListX, posListY, 2)
      
          for i, (posX, posY) in enumerate(zip(posListX, posListY)):
              pos = (posX, posY)
              cv2.circle(imgContours, pos, 10, (0,255,0), cv2.FILLED)
      
              # Using Kalman Filter Prediction
              predicted = kf.predict(posX, posY)
              cv2.circle(imgContours, (predicted[0], predicted[1]), 12, (255,0,255), cv2.FILLED)
      
              ballDetectFlag = True
              if ballDetectFlag:
                  print('Balls Detected!')
      
              if i == 0:
                  cv2.line(imgContours, pos, pos, (0,255,0), 5)
                  cv2.line(imgContours, predicted, predicted, (255,0,255), 5)
              else:
                  predictedM = kf.predict(posListX[i-1], posListY[i-1])
      
                  cv2.line(imgContours, pos, (posListX[i-1], posListY[i-1]), (0,255,0), 5)
                  cv2.line(imgContours, predicted, predictedM, (255,0,255), 5)

      The above lines track the detected & the predicted positions of the ball & plot both paths on top of the frame in real time: green for the detected centers & pink for the Kalman filter's predictions.
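
The clsKalmanFilter class is not shown in this post. As a rough idea of what a predict(x, y) call can do, here is a minimal pure-Python sketch of a constant-velocity Kalman filter that corrects its state with the measured center & then looks one frame ahead. The class names (Kalman1D, BallTracker), the noise values q & r, & the initial covariance are all assumptions for illustration; the modified filter used by the application may differ.

```python
class Kalman1D:
    """Constant-velocity Kalman filter for one coordinate.

    State is (position, velocity). Hypothetical sketch; this is not the
    clsKalmanFilter class used by the application.
    """

    def __init__(self, q=0.01, r=1.0, dt=1.0):
        self.p, self.v = 0.0, 0.0                 # state estimate
        self.P = [[1000.0, 0.0], [0.0, 1000.0]]   # covariance (large = unsure)
        self.q, self.r, self.dt = q, r, dt        # process noise, measurement noise, step

    def update(self, z):
        """Correct the state with a measured position z."""
        (a, b), (c, d) = self.P
        s = a + self.r                 # innovation covariance
        k0, k1 = a / s, c / s          # Kalman gain for position and velocity
        y = z - self.p                 # innovation (measurement minus estimate)
        self.p += k0 * y
        self.v += k1 * y
        # P = (I - K H) P with H = [1, 0]
        self.P = [[(1 - k0) * a, (1 - k0) * b],
                  [c - k1 * a, d - k1 * b]]

    def predict(self):
        """Advance the state by one step and return the predicted position."""
        dt = self.dt
        (a, b), (c, d) = self.P
        self.p += dt * self.v
        # P = F P F^T + Q with F = [[1, dt], [0, 1]] and Q = q * I
        self.P = [[a + dt * (b + c) + dt * dt * d + self.q, b + dt * d],
                  [c + dt * d, d + self.q]]
        return self.p


class BallTracker:
    """Tracks a 2-D point with one independent filter per axis."""

    def __init__(self):
        self.fx, self.fy = Kalman1D(), Kalman1D()

    def predict(self, x, y):
        # Correct with the measured center, then look one frame ahead
        self.fx.update(x)
        self.fy.update(y)
        return int(round(self.fx.predict())), int(round(self.fy.predict()))


tracker = BallTracker()
for t in range(20):
    # Made-up measurements: a ball moving along a straight line
    predicted = tracker.predict(10 * t, 5 * t)

print(predicted)
```

After a handful of frames the prediction settles close to where the next measurement will land, which is the behavior the pink line visualizes.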

      The next block is as follows –

      if len(posListX) < 10:
      
          # Calculation for best place to ball
          a1 = A
          b1 = B
          c1 = C - pT1
      
          X1 = int((- b1 - math.sqrt(b1**2 - (4*a1*c1)))/(2*a1))
          prediction1 = pT2 < X1 < pT3
      
          a2 = A
          b2 = B
          c2 = C - pT4
      
          X2 = int((- b2 - math.sqrt(b2**2 - (4*a2*c2)))/(2*a2))
          prediction2 = pT2 < X2 < pT3
      
          prediction = prediction1 | prediction2
      
      if prediction:
          print('Good Length Ball!')
          sMsg = "Good Length Ball - (" + str(FrNo) + ")"
          cvzone.putTextRect(imgContours, sMsg, (50,150), scale=5, thickness=5, colorR=(0,200,0), offset=20)
      else:
          print('Loose Ball!')
          sMsg = "Loose Ball - (" + str(FrNo) + ")"
          cvzone.putTextRect(imgContours, sMsg, (50,150), scale=5, thickness=5, colorR=(0,0,200), offset=20)
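
The block above solves A*x**2 + B*x + C = y for the x at which the fitted parabola reaches a given height (pT1 or pT4) & checks whether that x lies between pT2 & pT3. Here is a minimal sketch of the same calculation with two guards added, one for a zero leading coefficient & one for a negative discriminant. The helper names (crossing_x, is_good_length) & every number in the usage lines are assumptions for illustration.

```python
import math


def crossing_x(A, B, C, y_level):
    """Return the x where A*x**2 + B*x + C == y_level, or None if there is none.

    Uses the same root (the one with the minus sign) as the snippet above.
    Hypothetical helper, for illustration only.
    """
    a, b, c = A, B, C - y_level
    if a == 0:
        return None          # not a parabola; the formula would divide by zero
    disc = b ** 2 - 4 * a * c
    if disc < 0:
        return None          # the curve never reaches y_level
    return int((-b - math.sqrt(disc)) / (2 * a))


def is_good_length(A, B, C, pT1, pT2, pT3, pT4):
    """True if the curve crosses either height inside the (pT2, pT3) window."""
    hits = (crossing_x(A, B, C, pT1), crossing_x(A, B, C, pT4))
    return any(x is not None and pT2 < x < pT3 for x in hits)


# Made-up coefficients and points, only to exercise the helpers
print(crossing_x(1, 0, 0, 4))
print(is_good_length(1, 0, 0, pT1=4, pT2=-3, pT3=0, pT4=100))
print(is_good_length(1, 0, 0, pT1=4, pT2=5, pT3=9, pT4=100))
```

Returning None instead of raising keeps a frame with a poor curve fit from being treated as an error; the caller simply reports no crossing for that height.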

      • predictBodyLine.py (The main python script that will invoke the class to predict Cricket balls in the real-time video feed.)


      #####################################################
      #### Written By: SATYAKI DE ####
      #### Written On: 25-Nov-2022 ####
      #### Modified On 30-Nov-2022 ####
      #### ####
      #### Objective: This is the main calling ####
      #### python script that will invoke the ####
      #### clsPredictBodyLine class to initiate ####
      #### the predict capability in real-time ####
      #### from a cricket (Sports) streaming. ####
      #####################################################
      # We keep the setup code in a different class as shown below.
      import clsPredictBodyLine as pbdl
      from clsConfigClient import clsConfigClient as cf
      import datetime
      import logging

      def main():
          try:
              # Other useful variables
              debugInd = 'Y'
              var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
              var1 = datetime.datetime.now()
              print('Start Time: ', str(var))
              # End of useful variables

              # Initiating Log Class
              general_log_path = str(cf.conf['LOG_PATH'])

              # Enabling Logging Info
              logging.basicConfig(filename=general_log_path + 'predBodyLine.log', level=logging.INFO)

              print('Started predicting best bodyline deliveries from the Cricket Streaming!')

              # Instantiating the main class
              x1 = pbdl.clsPredictBodyLine()

              # Processing the video feed
              r1 = x1.processVideo(debugInd, var)

              if (r1 == 0):
                  print('Successfully predicted body-line deliveries!')
              else:
                  print('Failed to predict body-line deliveries!')

              var2 = datetime.datetime.now()

              c = var2 - var1
              minutes = c.total_seconds() / 60
              print('Total difference in minutes: ', str(minutes))

              # Report the end time (var2), not the start time
              print('End Time: ', str(var2))

          except Exception as e:
              x = str(e)
              print('Error: ', x)

      if __name__ == "__main__":
          main()

      Here is the final key snippet –

      # Instantiating the main class
      x1 = pbdl.clsPredictBodyLine()
      
      # Processing the video feed
      r1 = x1.processVideo(debugInd, var)
      
      if (r1 == 0):
          print('Successfully predicted body-line deliveries!')
      else:
          print('Failed to predict body-line deliveries!')

      The above lines will first instantiate the main class & then invoke it.

      You can find it here if you want to know more about the Kalman filter.

      So, finally, we’ve done it.


      FOLDER STRUCTURE:

      You will get the complete codebase in the following GitHub link.

      I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

      Till then, Happy Avenging! 🙂

      Note: All the data & scenarios posted here are representational data & scenarios & available over the internet & for educational purposes only. Some of the images (except my photo) we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement & especially in the prediction quality.