Real-time video summary assistance App – Part 2

Continuing from the previous post, I would like to carry on the discussion of how MCP (the Message-Chaining Protocol) is implemented among the agents. Before that, let me add the quick demo one more time to recap our objectives.

Let us recap the process flow –

Also, here is how the scripts are grouped, as described in the previous post –

Message-Chaining Protocol (MCP) Implementation:

    clsMCPMessage.py
    clsMCPBroker.py

YouTube Transcript Extraction:

    clsYouTubeVideoProcessor.py

Language Detection:

    clsLanguageDetector.py

Translation Services & Agents:

    clsTranslationAgent.py
    clsTranslationService.py

Documentation Agent:

    clsDocumentationAgent.py
    
Research Agent:

    clsResearchAgent.py

Great! Now, we’ll continue with the main discussion.


import re

from youtube_transcript_api import YouTubeTranscriptApi

def extract_youtube_id(youtube_url):
    """Extract YouTube video ID from URL"""
    youtube_id_match = re.search(r'(?:v=|\/)([0-9A-Za-z_-]{11}).*', youtube_url)
    if youtube_id_match:
        return youtube_id_match.group(1)
    return None

def get_youtube_transcript(youtube_url):
    """Get transcript from YouTube video"""
    video_id = extract_youtube_id(youtube_url)
    if not video_id:
        return {"error": "Invalid YouTube URL or ID"}
    
    try:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        
        # First try to get manual transcripts
        try:
            transcript = transcript_list.find_manually_created_transcript(["en"])
            transcript_data = transcript.fetch()
            print(f"Debug - Manual transcript format: {type(transcript_data)}")
            if transcript_data and len(transcript_data) > 0:
                print(f"Debug - First item type: {type(transcript_data[0])}")
                print(f"Debug - First item sample: {transcript_data[0]}")
            return {"text": transcript_data, "language": "en", "auto_generated": False}
        except Exception as e:
            print(f"Debug - No manual transcript: {str(e)}")
            # If no manual English transcript, try any available transcript
            try:
                available_transcripts = list(transcript_list)
                if available_transcripts:
                    transcript = available_transcripts[0]
                    print(f"Debug - Using transcript in language: {transcript.language_code}")
                    transcript_data = transcript.fetch()
                    print(f"Debug - Auto transcript format: {type(transcript_data)}")
                    if transcript_data and len(transcript_data) > 0:
                        print(f"Debug - First item type: {type(transcript_data[0])}")
                        print(f"Debug - First item sample: {transcript_data[0]}")
                    return {
                        "text": transcript_data, 
                        "language": transcript.language_code, 
                        "auto_generated": transcript.is_generated
                    }
                else:
                    return {"error": "No transcripts available for this video"}
            except Exception as e:
                return {"error": f"Error getting transcript: {str(e)}"}
    except Exception as e:
        return {"error": f"Error getting transcript list: {str(e)}"}

# ----------------------------------------------------------------------------------
# YouTube Video Processor
# ----------------------------------------------------------------------------------

class clsYouTubeVideoProcessor:
    """Process YouTube videos using the agent system"""
    
    def __init__(self, documentation_agent, translation_agent, research_agent):
        self.documentation_agent = documentation_agent
        self.translation_agent = translation_agent
        self.research_agent = research_agent
    
    def process_youtube_video(self, youtube_url):
        """Process a YouTube video"""
        print(f"Processing YouTube video: {youtube_url}")
        
        # Extract transcript
        transcript_result = get_youtube_transcript(youtube_url)
        
        if "error" in transcript_result:
            return {"error": transcript_result["error"]}
        
        # Start a new conversation
        conversation_id = self.documentation_agent.start_processing()
        
        # Process transcript segments
        transcript_data = transcript_result["text"]
        transcript_language = transcript_result["language"]
        
        print(f"Debug - Type of transcript_data: {type(transcript_data)}")
        
        # For each segment, detect language and translate if needed
        processed_segments = []
        
        try:
            # Make sure transcript_data is a list of dictionaries with text and start fields
            if isinstance(transcript_data, list):
                for idx, segment in enumerate(transcript_data):
                    print(f"Debug - Processing segment {idx}, type: {type(segment)}")
                    
                    # Extract text properly based on the type
                    if isinstance(segment, dict) and "text" in segment:
                        text = segment["text"]
                        start = segment.get("start", 0)
                    else:
                        # Try to access attributes for non-dict types
                        try:
                            text = segment.text
                            start = getattr(segment, "start", 0)
                        except AttributeError:
                            # If all else fails, convert to string
                            text = str(segment)
                            start = idx * 5  # Arbitrary timestamp
                    
                    print(f"Debug - Extracted text: {text[:30]}...")
                    
                    # Create a standardized segment
                    std_segment = {
                        "text": text,
                        "start": start
                    }
                    
                    # Process through translation agent
                    translation_result = self.translation_agent.process_text(text, conversation_id)
                    
                    # Use translated text for documentation. Set this before copying
                    # std_segment; otherwise the copy appended below never sees it.
                    if "final_text" in translation_result and translation_result["final_text"] != text:
                        std_segment["processed_text"] = translation_result["final_text"]
                    else:
                        std_segment["processed_text"] = text
                    
                    # Update segment with translation information
                    segment_with_translation = {
                        **std_segment,
                        "translation_info": translation_result
                    }
                    
                    processed_segments.append(segment_with_translation)
            else:
                # If transcript_data is not a list, treat it as a single text block
                print("Debug - Transcript is not a list, treating as single text")
                text = str(transcript_data)
                std_segment = {
                    "text": text,
                    "start": 0
                }
                
                translation_result = self.translation_agent.process_text(text, conversation_id)
                
                # As above: decide processed_text before building the combined record
                if "final_text" in translation_result and translation_result["final_text"] != text:
                    std_segment["processed_text"] = translation_result["final_text"]
                else:
                    std_segment["processed_text"] = text
                
                segment_with_translation = {
                    **std_segment,
                    "translation_info": translation_result
                }
                
                processed_segments.append(segment_with_translation)
                
        except Exception as e:
            print(f"Debug - Error processing transcript: {str(e)}")
            return {"error": f"Error processing transcript: {str(e)}"}
        
        # Process the transcript with the documentation agent
        documentation_result = self.documentation_agent.process_transcript(
            processed_segments,
            conversation_id
        )
        
        return {
            "youtube_url": youtube_url,
            "transcript_language": transcript_language,
            "processed_segments": processed_segments,
            "documentation": documentation_result,
            "conversation_id": conversation_id
        }

Let us understand this step-by-step:

Part 1: Getting the YouTube Transcript

def extract_youtube_id(youtube_url):
    ...

This pulls the 11-character video ID out of a YouTube link, such as a watch?v= URL or a youtu.be short link. If no ID is found, it returns None.
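To see what the pattern in extract_youtube_id actually accepts, it helps to run it against a few inputs. The sketch below keeps the same regular expression and adds one hypothetical extension (the helper name extract_youtube_id_or_bare and its bare-ID check are mine, not the author's), because the original returns None for a plain 11-character ID even though its error message mentions IDs.

```python
import re
from typing import Optional

# Same idea as extract_youtube_id: 11 ID characters right after "v=" or "/".
URL_ID_PATTERN = re.compile(r"(?:v=|/)([0-9A-Za-z_-]{11})")
# Hypothetical extension: a value that is exactly 11 ID characters is treated as a bare ID.
BARE_ID_PATTERN = re.compile(r"[0-9A-Za-z_-]{11}")


def extract_youtube_id_or_bare(value: str) -> Optional[str]:
    """Return the video ID from a URL or a bare ID, or None if nothing matches."""
    value = value.strip()
    if BARE_ID_PATTERN.fullmatch(value):
        return value
    match = URL_ID_PATTERN.search(value)
    return match.group(1) if match else None


for sample in [
    "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
    "https://youtu.be/dQw4w9WgXcQ",
    "dQw4w9WgXcQ",
    "https://example.com/about",
]:
    print(sample, "->", extract_youtube_id_or_bare(sample))
```

One limitation applies to both versions: the first "/" followed by 11 ID characters wins, so a channel URL such as /channel/UC... yields the first 11 characters of the channel ID rather than None.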

def get_youtube_transcript(youtube_url):
    ...
  • This gets the actual spoken content of the video.
  • It tries to get a manual English transcript first (created by humans).
  • If there is none, it falls back to the first transcript listed for the video, which may be auto-generated (created by YouTube’s speech recognition) or in another language.
  • If no transcript exists at all, it returns an error message: “No transcripts available for this video”.
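The selection order can be checked without calling YouTube at all. The sketch below models each listed transcript as a small hypothetical dataclass (TranscriptInfo is not part of youtube-transcript-api) and reproduces the same order as get_youtube_transcript: manual English first, then whatever the listing returns first, then nothing.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class TranscriptInfo:
    """Hypothetical stand-in for one entry returned by the transcript listing."""
    language_code: str
    is_generated: bool


def choose_transcript(available: List[TranscriptInfo], preferred: str = "en") -> Optional[TranscriptInfo]:
    """Mirror the fallback order used in get_youtube_transcript."""
    # 1. A human-made transcript in the preferred language wins.
    for transcript in available:
        if transcript.language_code == preferred and not transcript.is_generated:
            return transcript
    # 2. Otherwise take the first listed entry (any language, manual or generated).
    if available:
        return available[0]
    # 3. Nothing at all: the caller reports "No transcripts available for this video".
    return None


tracks = [TranscriptInfo("hi", False), TranscriptInfo("en", True)]
print(choose_transcript(tracks))
```

With these sample tracks the Hindi manual transcript is chosen over the English auto-generated one, which is exactly the situation the translation agent later has to handle.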

Part 2: Processing the Video with Agents

class clsYouTubeVideoProcessor:
    ...

This is like the control center that tells each intelligent agent what to do with the transcript. Here are the detailed steps:

1. Start the Process

def process_youtube_video(self, youtube_url):
    ...
  • The system starts with a YouTube video link.
  • It prints a message like: “Processing YouTube video: [link]”

2. Extract the Transcript

  • The system runs the get_youtube_transcript() function.
  • If it fails, it returns an error (e.g., invalid link or no subtitles available).

3. Start a “Conversation”

  • The documentation agent begins a new session, tracked by a unique conversation ID.
  • Think of this like opening a new folder in a shared team workspace to store everything related to this video.

4. Go Through Each Segment of the Transcript

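  • The spoken text usually arrives as small parts (segments), like subtitles.
  • For each part, the processor:
    • Extracts the text, whether the segment is a dictionary or an object with a text attribute (falling back to the raw string if neither works).
    • Reads the start time, i.e. when that part was spoken.
    • Sends the text to the translation agent, which detects the language and translates it if needed.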
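The code above accepts both dictionaries and objects with a text attribute, presumably because the entry shape depends on the youtube-transcript-api version. The sketch below isolates that normalisation in a hypothetical helper, normalize_segment, and uses SimpleNamespace to imitate an object-style entry; it follows the same three branches as process_youtube_video, including the invented 5-seconds-per-segment timestamp for the last-resort case.

```python
from types import SimpleNamespace
from typing import Any, Dict


def normalize_segment(segment: Any, idx: int) -> Dict[str, Any]:
    """Turn one transcript entry into {"text", "start"}, as process_youtube_video does."""
    if isinstance(segment, dict) and "text" in segment:
        return {"text": segment["text"], "start": segment.get("start", 0)}
    try:
        # Object-style entries expose .text (and usually .start) as attributes.
        return {"text": segment.text, "start": getattr(segment, "start", 0)}
    except AttributeError:
        # Last resort: stringify the entry and assume 5 seconds per segment.
        return {"text": str(segment), "start": idx * 5}


samples = [
    {"text": "Hello everyone", "start": 0.0, "duration": 2.1},
    SimpleNamespace(text="Welcome back", start=2.1, duration=1.8),
    "raw caption line",
]
for idx, item in enumerate(samples):
    print(normalize_segment(item, idx))
```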

5. Translate (if needed)

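  • If the translation agent returns a final text that differs from the original, that translated text becomes the segment’s processed text.
  • Otherwise, the original text is used. Either way, the original text and the full translation details stay attached to the segment.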
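Steps 4 and 5 together amount to a small merge. The one detail that matters is ordering: the processed text has to be decided before the segment is copied into the record that gets appended, or it never reaches the documentation agent. The sketch below shows that ordering with a hypothetical stub translator (fake_translate) in place of clsTranslationAgent.process_text; attach_translation is also a name of my own.

```python
from typing import Any, Callable, Dict


def attach_translation(std_segment: Dict[str, Any],
                       translate: Callable[[str], Dict[str, Any]]) -> Dict[str, Any]:
    """Translate one normalized segment and return a new record carrying processed_text."""
    text = std_segment["text"]
    translation_result = translate(text)
    # Same outcome as the if/else in process_youtube_video: final_text when present, else the original.
    processed_text = translation_result.get("final_text", text)
    # Build the record only after processed_text is known, so the appended copy includes it.
    return {**std_segment, "processed_text": processed_text, "translation_info": translation_result}


def fake_translate(text: str) -> Dict[str, Any]:
    """Hypothetical stub standing in for clsTranslationAgent.process_text."""
    lookup = {"नमस्ते दोस्तों": "Hello friends"}
    return {"original_text": text, "final_text": lookup.get(text, text)}


for segment in [{"text": "नमस्ते दोस्तों", "start": 0.0}, {"text": "Welcome back", "start": 2.1}]:
    print(attach_translation(segment, fake_translate))
```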

6. Prepare for Documentation

  • Once every segment has been through translation, the full list is passed to the documentation agent in a single call.
  • This agent then:
    • Analyzes each segment,
    • Identifies key points and topics,
    • Produces a structured, readable summary.

7. Return the Final Result

The system gives back a structured package with:

  • The video link
  • The original language
  • The transcript in parts (processed and translated)
  • A documentation summary
  • The conversation ID (for tracking or further updates)

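import time
import uuid
from typing import Optional

from langchain.agents import AgentExecutor
from langchain.agents.format_scratchpad.openai_tools import format_to_openai_tool_messages
from langchain.agents.output_parsers.openai_tools import OpenAIToolsAgentOutputParser
from langchain.memory import ConversationBufferMemory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI

# clsMCPBroker, clsMCPMessage, clsSendMessageTool and OPENAI_API_KEY come from this project's own modules.

class clsDocumentationAgent: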
    """Documentation Agent built with LangChain"""
    
    def __init__(self, agent_id: str, broker: clsMCPBroker):
        self.agent_id = agent_id
        self.broker = broker
        self.broker.register_agent(agent_id)
        
        # Initialize LangChain components
        self.llm = ChatOpenAI(
            model="gpt-4-0125-preview",
            temperature=0.1,
            api_key=OPENAI_API_KEY
        )
        
        # Create tools
        self.tools = [
            clsSendMessageTool(sender_id=self.agent_id, broker=self.broker)
        ]
        
        # Set up LLM with tools
        self.llm_with_tools = self.llm.bind(
            tools=[tool.tool_config for tool in self.tools]
        )
        
        # Setup memory
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True
        )
        
        # Create prompt
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a Documentation Agent for YouTube video transcripts. Your responsibilities include:
                1. Process YouTube video transcripts
                2. Identify key points, topics, and main ideas
                3. Organize content into a coherent and structured format
                4. Create concise summaries
                5. Request research information when necessary
                
                When you need additional context or research, send a request to the Research Agent.
                Always maintain a professional tone and ensure your documentation is clear and organized.
            """),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])
        
        # Create agent
        self.agent = (
            {
                "input": lambda x: x["input"],
                "chat_history": lambda x: self.memory.load_memory_variables({})["chat_history"],
                "agent_scratchpad": lambda x: format_to_openai_tool_messages(x["intermediate_steps"]),
            }
            | self.prompt
            | self.llm_with_tools
            | OpenAIToolsAgentOutputParser()
        )
        
        # Create agent executor
        self.agent_executor = AgentExecutor(
            agent=self.agent,
            tools=self.tools,
            verbose=True,
            memory=self.memory
        )
        
        # Video data
        self.current_conversation_id = None
        self.video_notes = {}
        self.key_points = []
        self.transcript_segments = []
        
    def start_processing(self) -> str:
        """Start processing a new video"""
        self.current_conversation_id = str(uuid.uuid4())
        self.video_notes = {}
        self.key_points = []
        self.transcript_segments = []
        
        return self.current_conversation_id
    
    def process_transcript(self, transcript_segments, conversation_id=None):
        """Process a YouTube transcript"""
        if not conversation_id:
            conversation_id = self.start_processing()
        self.current_conversation_id = conversation_id
        
        # Store transcript segments
        self.transcript_segments = transcript_segments
        
        # Process segments
        processed_segments = []
        for segment in transcript_segments:
            processed_result = self.process_segment(segment)
            processed_segments.append(processed_result)
        
        # Generate summary
        summary = self.generate_summary()
        
        return {
            "processed_segments": processed_segments,
            "summary": summary,
            "conversation_id": conversation_id
        }
    
    def process_segment(self, segment):
        """Process individual transcript segment"""
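        text = segment.get("text", "")
        # Prefer the translated text prepared by clsYouTubeVideoProcessor, if present
        processed_text = segment.get("processed_text", text)
        start = segment.get("start", 0)
        
        # Use LangChain agent to process the segment
        result = self.agent_executor.invoke({
            "input": f"Process this video transcript segment at timestamp {start}s: {processed_text}. If research is needed, send a request to the research_agent."
        })
        
        # Update video notes (keyed by timestamp; the original text is kept so
        # later translation_response messages can still be matched against it)
        timestamp = start
        self.video_notes[timestamp] = {
            "text": text,
            "analysis": result["output"]
        }
        
        return {
            "timestamp": timestamp,
            "text": text,
            "processed_text": processed_text,
            "analysis": result["output"]
        }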
    
    def handle_mcp_message(self, message: clsMCPMessage) -> Optional[clsMCPMessage]:
        """Handle an incoming MCP message"""
        if message.message_type == "research_response":
            # Process research information received from Research Agent
            research_info = message.content.get("text", "")
            
            result = self.agent_executor.invoke({
                "input": f"Incorporate this research information into video analysis: {research_info}"
            })
            
            # Send acknowledgment back to Research Agent
            response = clsMCPMessage(
                sender=self.agent_id,
                receiver=message.sender,
                message_type="acknowledgment",
                content={"text": "Research information incorporated into video analysis."},
                reply_to=message.id,
                conversation_id=message.conversation_id
            )
            
            self.broker.publish(response)
            return response
        
        elif message.message_type == "translation_response":
            # Process translation response from Translation Agent
            translation_result = message.content
            
            # Process the translated text
            if "final_text" in translation_result:
                text = translation_result["final_text"]
                original_text = translation_result.get("original_text", "")
                language_info = translation_result.get("language", {})
                
                result = self.agent_executor.invoke({
                    "input": f"Process this translated text: {text}\nOriginal language: {language_info.get('language', 'unknown')}\nOriginal text: {original_text}"
                })
                
                # Update notes with translation information
                for timestamp, note in self.video_notes.items():
                    if note["text"] == original_text:
                        note["translated_text"] = text
                        note["language"] = language_info
                        break
            
            return None
        
        return None
    
    def run(self):
        """Run the agent to listen for MCP messages"""
        print(f"Documentation Agent {self.agent_id} is running...")
        while True:
            message = self.broker.get_message(self.agent_id, timeout=1)
            if message:
                self.handle_mcp_message(message)
            time.sleep(0.1)
    
    def generate_summary(self) -> str:
        """Generate a summary of the video"""
        if not self.video_notes:
            return "No video data available to summarize."
        
        all_notes = "\n".join([f"{ts}: {note['text']}" for ts, note in self.video_notes.items()])
        
        result = self.agent_executor.invoke({
            "input": f"Generate a concise summary of this YouTube video, including key points and topics:\n{all_notes}"
        })
        
        return result["output"]

Let us understand the key methods in a step-by-step manner:

The Documentation Agent is like a smart assistant that reads a YouTube video’s transcript, takes notes, pulls out important ideas, and creates a summary, much like a professional note-taker helping educators, researchers, and content creators. It works with a team of other assistants, such as the Translation Agent and the Research Agent, and they all talk to each other through a messaging system.

1. Starting to Work on a New Video

def start_processing(self) -> str:
    ...

When a new video is being processed:

  • A new conversation ID (a random UUID) is created.
  • Old notes, key points, and transcript segments are cleared to start fresh.

2. Processing the Whole Transcript

def process_transcript(self, transcript_segments, conversation_id=None):
    ...

This is where the assistant:

  • Takes in the full transcript (what was said in the video), already split into segments by the video processor.
  • Sends each segment to the LangChain agent (GPT-4) for analysis.
  • Collects the results.
  • Finally, creates a summary of all the main ideas.

3. Processing One Transcript Segment at a Time

def process_segment(self, segment):
    ...

For each chunk of the video:

  • The assistant reads the text and timestamp.
  • It asks GPT-4 to analyze the segment and, if more context is needed, to send a request to the Research Agent.
  • It saves the analysis along with the original text, keyed by the timestamp.

4. Handling Incoming Messages from Other Agents

def handle_mcp_message(self, message: clsMCPMessage) -> Optional[clsMCPMessage]:
    ...

The assistant can also receive messages from teammates (other agents):

If the message is a research_response from the Research Agent:

  • It asks GPT-4 to incorporate the research into its video analysis; the exchange is kept in the agent’s conversation memory.
  • It replies with an acknowledgment message so the Research Agent knows the research arrived.

If the message is a translation_response from the Translation Agent:

  • It asks GPT-4 to process the translated text alongside the original text and its language.
  • It finds the note whose text matches the original and adds the translated text and language to it.

This is like a team of assistants emailing back and forth to make sure the notes are complete and accurate.
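The message handling above is a dispatch on message_type over a broker that queues messages per agent. clsMCPBroker and clsMCPMessage are not shown in this part, so the sketch below uses hypothetical, simplified stand-ins (Message, InMemoryBroker) built on Python’s queue.Queue, and it records research text in a plain dict instead of calling GPT-4. It only illustrates the publish, acknowledge, and poll pattern; the project’s real broker may work differently.

```python
import queue
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class Message:
    """Hypothetical, simplified stand-in for clsMCPMessage."""
    sender: str
    receiver: str
    message_type: str
    content: Dict[str, Any]
    conversation_id: Optional[str] = None
    reply_to: Optional[str] = None
    id: str = field(default_factory=lambda: str(uuid.uuid4()))


class InMemoryBroker:
    """Hypothetical stand-in for clsMCPBroker: one FIFO queue per registered agent."""

    def __init__(self) -> None:
        self.queues: Dict[str, queue.Queue] = {}

    def register_agent(self, agent_id: str) -> None:
        self.queues.setdefault(agent_id, queue.Queue())

    def publish(self, message: Message) -> None:
        self.queues[message.receiver].put(message)

    def get_message(self, agent_id: str, timeout: float = 1.0) -> Optional[Message]:
        try:
            return self.queues[agent_id].get(timeout=timeout)
        except queue.Empty:
            return None


def handle(message: Message, notes: Dict[str, Any]) -> Optional[Message]:
    """Route by message_type, in the spirit of clsDocumentationAgent.handle_mcp_message."""
    if message.message_type == "research_response":
        notes.setdefault("research", []).append(message.content.get("text", ""))
        return Message(sender=message.receiver, receiver=message.sender,
                       message_type="acknowledgment",
                       content={"text": "Research information incorporated into video analysis."},
                       conversation_id=message.conversation_id, reply_to=message.id)
    if message.message_type == "translation_response":
        notes.setdefault("translations", []).append(message.content.get("final_text", ""))
    return None


broker = InMemoryBroker()
for agent in ("documentation_agent", "research_agent"):
    broker.register_agent(agent)
broker.publish(Message("research_agent", "documentation_agent", "research_response",
                       {"text": "Background on the topic"}, conversation_id="c1"))

notes: Dict[str, Any] = {}
incoming = broker.get_message("documentation_agent", timeout=0.1)
reply = handle(incoming, notes) if incoming else None
if reply:
    broker.publish(reply)
ack = broker.get_message("research_agent", timeout=0.1)
print(ack)
```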

5. Summarizing the Whole Video

def generate_summary(self) -> str:
    ...

After going through all the transcript parts, the agent asks GPT-4 to create a concise summary covering:

  • Main ideas
  • Key talking points
  • Structure of the content

The aim is a clear, professional summary that can be reused in learning materials or documentation.
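generate_summary flattens the stored notes into a single prompt, one “timestamp: text” line per note. The sketch below rebuilds that string outside LangChain (build_summary_input is a hypothetical helper name) and also shows a side effect of keying video_notes by start time: two segments with the same timestamp collapse into one note, and only the later text survives.

```python
from typing import Any, Dict


def build_summary_input(video_notes: Dict[float, Dict[str, Any]]) -> str:
    """Build the text block generate_summary sends to the LLM."""
    all_notes = "\n".join(f"{ts}: {note['text']}" for ts, note in video_notes.items())
    return (
        "Generate a concise summary of this YouTube video, "
        f"including key points and topics:\n{all_notes}"
    )


video_notes: Dict[float, Dict[str, Any]] = {}
for start, text in [(0.0, "Intro to agents"), (4.2, "How MCP messages flow"), (4.2, "Duplicate start time")]:
    # process_segment stores notes keyed by start time, so equal timestamps overwrite.
    video_notes[start] = {"text": text, "analysis": "..."}

print(build_summary_input(video_notes))
```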


import time
from typing import Optional

from autogen import AssistantAgent, UserProxyAgent

# clsMCPBroker, clsMCPMessage and OPENAI_API_KEY come from this project's own modules.

class clsResearchAgent:
    """Research Agent built with AutoGen"""
    
    def __init__(self, agent_id: str, broker: clsMCPBroker):
        self.agent_id = agent_id
        self.broker = broker
        self.broker.register_agent(agent_id)
        
        # Configure AutoGen directly with API key
        if not OPENAI_API_KEY:
            print("Warning: OPENAI_API_KEY not set for ResearchAgent")
            
        # Create config list directly instead of loading from file
        config_list = [
            {
                "model": "gpt-4-0125-preview",
                "api_key": OPENAI_API_KEY
            }
        ]
        # Create AutoGen assistant for research
        self.assistant = AssistantAgent(
            name="research_assistant",
            system_message="""You are a Research Agent for YouTube videos. Your responsibilities include:
                1. Research topics mentioned in the video
                2. Find relevant information, facts, references, or context
                3. Provide concise, accurate information to support the documentation
                4. Focus on delivering high-quality, relevant information
                
                Respond directly to research requests with clear, factual information.
            """,
            llm_config={"config_list": config_list, "temperature": 0.1}
        )
        
        # Create user proxy to handle message passing
        self.user_proxy = UserProxyAgent(
            name="research_manager",
            human_input_mode="NEVER",
            # End the chat after the assistant's first answer; otherwise the proxy keeps
            # replying with default_auto_reply and the exchange runs for many rounds
            max_consecutive_auto_reply=0,
            code_execution_config={"work_dir": "coding", "use_docker": False},
            default_auto_reply="Working on the research request..."
        )
        
        # Current conversation tracking
        self.current_requests = {}
    
    def handle_mcp_message(self, message: clsMCPMessage) -> Optional[clsMCPMessage]:
        """Handle an incoming MCP message"""
        if message.message_type == "request":
            # Process research request from Documentation Agent
            request_text = message.content.get("text", "")
            
            # Use AutoGen to process the research request
            def research_task():
                self.user_proxy.initiate_chat(
                    self.assistant,
                    message=f"Research request for YouTube video content: {request_text}. Provide concise, factual information."
                )
                # Return last assistant message; chat_messages is keyed by the
                # partner agent object, not by its name
                return self.assistant.chat_messages[self.user_proxy][-1]["content"]
            
            # Execute research task
            research_result = research_task()
            
            # Send research results back to Documentation Agent
            response = clsMCPMessage(
                sender=self.agent_id,
                receiver=message.sender,
                message_type="research_response",
                content={"text": research_result},
                reply_to=message.id,
                conversation_id=message.conversation_id
            )
            
            self.broker.publish(response)
            return response
        
        return None
    
    def run(self):
        """Run the agent to listen for MCP messages"""
        print(f"Research Agent {self.agent_id} is running...")
        while True:
            message = self.broker.get_message(self.agent_id, timeout=1)
            if message:
                self.handle_mcp_message(message)
            time.sleep(0.1)
    

Let us understand the key method in detail.

1. Receiving and Responding to Research Requests

def handle_mcp_message(self, message: clsMCPMessage) -> Optional[clsMCPMessage]:
    ...

When the Research Agent gets a message of type “request”, it:

  1. Reads the message text to see what needs to be researched.
  2. Passes it to the AutoGen research assistant (GPT-4), which answers from the model’s own knowledge; this code gives it no web search or other tools.
  3. Sends the answer back to the sender (usually the Documentation Agent) as a research_response that carries the same conversation ID and a reply_to link to the original request.
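In this code the Research Agent initialises self.current_requests but never uses it. If you want an agent to keep track of which research requests are still open, the reply_to field already carries what is needed. The sketch below is a hypothetical tracker (PendingRequests and the trimmed-down Msg class are my own names, not part of the project) that matches each response to its request by id.

```python
import uuid
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class Msg:
    """Hypothetical, trimmed-down message with just the fields used for correlation."""
    message_type: str
    text: str
    reply_to: Optional[str] = None
    id: str = field(default_factory=lambda: str(uuid.uuid4()))


class PendingRequests:
    """Track outstanding research requests by id and match replies via reply_to."""

    def __init__(self) -> None:
        self._pending: Dict[str, str] = {}

    def sent(self, request: Msg) -> None:
        self._pending[request.id] = request.text

    def resolve(self, response: Msg) -> Optional[str]:
        """Return the original request text for a reply, or None if it matches nothing open."""
        if response.reply_to is None:
            return None
        return self._pending.pop(response.reply_to, None)

    def outstanding(self) -> int:
        return len(self._pending)


tracker = PendingRequests()
request = Msg("request", "Background on LangChain agent executors")
tracker.sent(request)
response = Msg("research_response", "An AgentExecutor runs the agent loop.", reply_to=request.id)
matched = tracker.resolve(response)
print(matched, tracker.outstanding())
```

Popping the entry on the first match means a duplicate reply resolves to None, which gives you a cheap way to ignore repeats.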

import time
import uuid
from typing import Optional

# clsMCPBroker, clsMCPMessage, clsLanguageDetector and clsTranslationService come from this project's own modules.

class clsTranslationAgent:
    """Agent for language detection and translation"""
    
    def __init__(self, agent_id: str, broker: clsMCPBroker):
        self.agent_id = agent_id
        self.broker = broker
        self.broker.register_agent(agent_id)
        
        # Initialize language detector
        self.language_detector = clsLanguageDetector()
        
        # Initialize translation service
        self.translation_service = clsTranslationService()
    
    def process_text(self, text, conversation_id=None):
        """Process text: detect language and translate if needed, handling mixed language content"""
        if not conversation_id:
            conversation_id = str(uuid.uuid4())
        
        # Detect language with support for mixed language content
        language_info = self.language_detector.detect(text)
        
        # Decide if translation is needed
        needs_translation = True
        
        # English (en, en-IN, en-US, ...) or undetectable content doesn't need translation
        language_code = language_info["language_code"]
        if language_code == "unknown" or language_code == "en" or language_code.startswith("en-"):
            needs_translation = False
        
        # For mixed language, check if it's primarily English
        if language_info.get("is_mixed", False) and language_info.get("languages", []):
            english_langs = [
                lang for lang in language_info.get("languages", [])
                if lang["language_code"] == "en" or lang["language_code"].startswith("en-")
            ]
            
            # If the first English entry is more than 60% confident, don't translate
            if english_langs and english_langs[0].get("confidence", 0) > 0.6:
                needs_translation = False
        
        if needs_translation:
            # Translate using the appropriate service based on language detection
            translation_result = self.translation_service.translate(text, language_info)
            
            return {
                "original_text": text,
                "language": language_info,
                "translation": translation_result,
                "final_text": translation_result.get("translated_text", text),
                "conversation_id": conversation_id
            }
        else:
            # Already English or unknown language, return as is
            return {
                "original_text": text,
                "language": language_info,
                "translation": {"provider": "none"},
                "final_text": text,
                "conversation_id": conversation_id
            }
    
    def handle_mcp_message(self, message: clsMCPMessage) -> Optional[clsMCPMessage]:
        """Handle an incoming MCP message"""
        if message.message_type == "translation_request":
            # Process translation request from Documentation Agent
            text = message.content.get("text", "")
            
            # Process the text
            result = self.process_text(text, message.conversation_id)
            
            # Send translation results back to requester
            response = clsMCPMessage(
                sender=self.agent_id,
                receiver=message.sender,
                message_type="translation_response",
                content=result,
                reply_to=message.id,
                conversation_id=message.conversation_id
            )
            
            self.broker.publish(response)
            return response
        
        return None
    
    def run(self):
        """Run the agent to listen for MCP messages"""
        print(f"Translation Agent {self.agent_id} is running...")
        while True:
            message = self.broker.get_message(self.agent_id, timeout=1)
            if message:
                self.handle_mcp_message(message)
            time.sleep(0.1)

Let us understand the key methods in a step-by-step manner:

1. Understanding and Translating Text

def process_text(self, text, conversation_id=None):
    ...

This is the core job of the agent. Here’s what it does with any piece of text:

Step 1: Detect the Language

  • It asks the language detector for the language of the input text.
  • The detector can report mixed-language content, which is common in casual speech and subtitles.

Step 2: Decide Whether to Translate

  • If the detected language is English, or the detector cannot tell (“unknown”), the text is not translated.
  • Mixed-language text is also left as-is when its English component is reported with more than 60% confidence.
  • Everything else is translated into English.

Step 3: Translate (if needed)

  • If translation is required, it hands the text and the language information to the translation service.
  • It then packages the original text, the detected language, the translation result, the final text, and the conversation ID.

Step 4: Return the Results

  • If no translation is needed, it returns the original text as the final text, with the translation provider recorded as “none”.
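The decision in Step 2 can be written as a small pure function, which makes it easy to check against sample detector output. The sketch assumes language_info has the keys process_text reads (language_code, is_mixed, and languages entries with language_code and confidence); the helper names and the rule that “en” and any “en-*” code count as English are my assumptions about which codes clsLanguageDetector returns.

```python
from typing import Any, Dict


def is_english_code(code: str) -> bool:
    """True for 'en' or any regional code such as 'en-IN' (assumed detector codes)."""
    return code == "en" or code.startswith("en-")


def needs_translation(language_info: Dict[str, Any], threshold: float = 0.6) -> bool:
    """Approximate the translate/skip decision in clsTranslationAgent.process_text."""
    code = language_info.get("language_code", "unknown")
    if code == "unknown" or is_english_code(code):
        return False
    if language_info.get("is_mixed", False):
        english = [lang for lang in language_info.get("languages", [])
                   if is_english_code(lang.get("language_code", ""))]
        # Mixed text whose first English entry is confident enough is left untranslated.
        if english and english[0].get("confidence", 0) > threshold:
            return False
    return True


mixed = {
    "language_code": "hi-IN",
    "is_mixed": True,
    "languages": [{"language_code": "en-IN", "confidence": 0.7},
                  {"language_code": "hi-IN", "confidence": 0.3}],
}
print(needs_translation(mixed))
```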

2. Receiving Messages and Responding

def handle_mcp_message(self, message: clsMCPMessage) -> Optional[clsMCPMessage]:
    ...

The agent listens for messages from other agents. When another agent sends a translation_request:

  • It takes the text from the message.
  • It runs the text through process_text (as explained above).
  • It sends the translated (or original) result back to the requesting agent as a translation_response, linked to the request through reply_to and the same conversation ID.

      import requests

      class clsTranslationService:
          """Translation service using multiple providers with support for mixed languages"""
          
          def __init__(self):
              # Initialize Sarvam AI client
              self.sarvam_api_key = SARVAM_API_KEY
              self.sarvam_url = "https://api.sarvam.ai/translate"
              
              # Initialize Google Cloud Translation client using simple HTTP requests
              self.google_api_key = GOOGLE_API_KEY
              self.google_translate_url = "https://translation.googleapis.com/language/translate/v2"
          
          def translate_with_sarvam(self, text, source_lang, target_lang="en-IN"):
              """Translate text using Sarvam AI (for Indian languages)"""
              if not self.sarvam_api_key:
                  return {"error": "Sarvam API key not set"}
              
              headers = {
                  "Content-Type": "application/json",
                  "api-subscription-key": self.sarvam_api_key
              }
              
              payload = {
                  "input": text,
                  "source_language_code": source_lang,
                  "target_language_code": target_lang,
                  "speaker_gender": "Female",
                  "mode": "formal",
                  "model": "mayura:v1"
              }
              
              try:
                  response = requests.post(self.sarvam_url, headers=headers, json=payload)
                  if response.status_code == 200:
                      return {"translated_text": response.json().get("translated_text", ""), "provider": "sarvam"}
                  else:
                      return {"error": f"Sarvam API error: {response.text}", "provider": "sarvam"}
              except Exception as e:
                  return {"error": f"Error calling Sarvam API: {str(e)}", "provider": "sarvam"}
          
          def translate_with_google(self, text, target_lang="en"):
              """Translate text using Google Cloud Translation API with direct HTTP request"""
              if not self.google_api_key:
                  return {"error": "Google API key not set"}
              
              try:
                  # Using the translation API v2 with API key
                  params = {
                      "key": self.google_api_key,
                      "q": text,
                      "target": target_lang
                  }
                  
                  response = requests.post(self.google_translate_url, params=params)
                  if response.status_code == 200:
                      data = response.json()
                      translation = data.get("data", {}).get("translations", [{}])[0]
                      return {
                          "translated_text": translation.get("translatedText", ""),
                          "detected_source_language": translation.get("detectedSourceLanguage", ""),
                          "provider": "google"
                      }
                  else:
                      return {"error": f"Google API error: {response.text}", "provider": "google"}
              except Exception as e:
                  return {"error": f"Error calling Google Translation API: {str(e)}", "provider": "google"}
          
          def translate(self, text, language_info):
              """Translate text to English based on language detection info"""
              # If already English or unknown language, return as is
              if language_info["language_code"] == "en-IN" or language_info["language_code"] == "unknown":
                  return {"translated_text": text, "provider": "none"}
              
              # Handle mixed language content
              if language_info.get("is_mixed", False) and language_info.get("languages", []):
                  # Strategy for mixed language: 
                  # 1. If one of the languages is English, don't translate the entire text, as it might distort English portions
                  # 2. If no English but contains Indian languages, use Sarvam as it handles code-mixing better
                  # 3. Otherwise, use Google Translate for the primary detected language
                  
                  has_english = False
                  has_indian = False
                  
                  for lang in language_info.get("languages", []):
                      if lang["language_code"] == "en-IN" or lang["language_code"].startswith("en-"):
                          has_english = True
                      if lang.get("is_indian", False):
                          has_indian = True
                  
                  if has_english:
                      # Contains English - use Google for full text as it handles code-mixing well
                      return self.translate_with_google(text)
                  elif has_indian:
                      # Contains Indian languages - use Sarvam
                      # Use the highest confidence Indian language as source
                      indian_langs = [lang for lang in language_info.get("languages", []) if lang.get("is_indian", False)]
                      if indian_langs:
                          # Sort by confidence
                          indian_langs.sort(key=lambda x: x.get("confidence", 0), reverse=True)
                          source_lang = indian_langs[0]["language_code"]
                          return self.translate_with_sarvam(text, source_lang)
                      else:
                          # Fallback to primary language
                          if language_info["is_indian"]:
                              return self.translate_with_sarvam(text, language_info["language_code"])
                          else:
                              return self.translate_with_google(text)
                  else:
                      # No English, no Indian languages - use Google for primary language
                      return self.translate_with_google(text)
              else:
                  # Not mixed language - use standard approach
                  if language_info["is_indian"]:
                      # Use Sarvam AI for Indian languages
                      return self.translate_with_sarvam(text, language_info["language_code"])
                  else:
                      # Use Google for other languages
                      return self.translate_with_google(text)

      This Translation Service is like a smart translator that knows how to:

      • Use the language-detection results to tell what language the text is written in,
      • Choose the best translation provider depending on the language (especially for Indian languages),
      • And then translate the text into English.

      It supports mixed-language content (such as Hindi-English in one sentence) and uses either Google Translate or Sarvam AI, a translation service designed for Indian languages.

      Now, let us understand the key methods in a step-by-step manner:

      1. Translating Using Google Translate

      def translate_with_google(...)
      

      This function uses Google Translate:

      • It sends the text, asks for English as the target language, and gets a translation back.
      • It also detects the source language automatically.
      • If successful, it returns the translated text and the detected original language.
      • If there’s an error, it returns a message saying what went wrong.

      Best For: Non-Indian languages (like Spanish, French, Chinese) and mixed-language content that includes English.
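Google's Translation API v2 nests its results under `data.translations`, and when no source language is sent each entry also carries `detectedSourceLanguage`. The helper below isolates the parsing step from translate_with_google so it can be checked offline; `parse_google_v2_response` is a hypothetical name and the sample payload in the test is made up. One small design difference: the `or [{}]` guard also survives an empty `translations` list, which `get("translations", [{}])[0]` would not.

```python
def parse_google_v2_response(data: dict) -> dict:
    """Pull the first translation out of a Translation API v2 JSON body."""
    # `or [{}]` covers both a missing key and an empty list
    translations = data.get("data", {}).get("translations") or [{}]
    first = translations[0]
    return {
        "translated_text": first.get("translatedText", ""),
        "detected_source_language": first.get("detectedSourceLanguage", ""),
        "provider": "google",
    }
```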

      2. Main Translation Logic

      def translate(self, text, language_info)
      

      This is the decision-maker. Here’s how it works:

      Case 1: No Translation Needed

      If the text is already in English or the language is unknown, it simply returns the original text.

      Case 2: Mixed Language (e.g., Hindi + English)

      If the text contains more than one language:

      • ✅ If one part is English → use Google Translate (it’s good with mixed languages).
      • ✅ If it includes Indian languages but no English → use Sarvam AI (better at handling Indian content).
      • ✅ If it’s neither English nor Indian → use Google Translate.

      For the Sarvam path, the service ranks the detected Indian languages by confidence and uses the most likely one as the source language.

      Case 3: Single Language

      If the text is only in one language:

      • ✅ If it’s an Indian language (like Bengali, Tamil, or Marathi), use Sarvam AI.
      • ✅ If it’s any other language, use Google Translate.
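The three cases can be condensed into a pure routing function that only decides which provider (and, for Sarvam, which source language) `translate()` would use, without calling any API. This is a sketch that mirrors the rules above; `choose_provider` is a hypothetical name, and it reads `is_indian` with `.get()` instead of indexing, so a missing key counts as non-Indian.

```python
def choose_provider(language_info: dict) -> tuple:
    """Return (provider, source_language) following the translate() rules above."""
    code = language_info.get("language_code", "unknown")
    # Case 1: already English or unknown, so no translation
    if code in ("en-IN", "unknown"):
        return ("none", None)

    languages = language_info.get("languages", [])
    # Case 2: mixed-language content
    if language_info.get("is_mixed", False) and languages:
        if any(lang["language_code"].startswith("en-") for lang in languages):
            return ("google", None)
        indian = [lang for lang in languages if lang.get("is_indian", False)]
        if indian:
            # Highest-confidence Indian language becomes the Sarvam source
            best = max(indian, key=lambda lang: lang.get("confidence", 0))
            return ("sarvam", best["language_code"])
        return ("google", None)

    # Case 3: single language
    if language_info.get("is_indian", False):
        return ("sarvam", code)
    return ("google", None)
```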

      So, we’ve done it.

      I’ve included the complete working solutions for you in the GitHub Link.

      We’ll cover the detailed performance testing, optimized configurations & many other useful details in our next post.

      Till then, Happy Avenging! 🙂

      Building & deploying a RAG architecture rapidly using Langflow & Python

      I’ve been looking for a way to deploy Python-based RAG solutions faster, ideally with a UI that speeds up delivery. And here comes the tool that does exactly what I needed – “LangFlow.”

      Before delving into the details, I strongly recommend taking a look at the demo. It’s a great way to get a comprehensive understanding of LangFlow and its capabilities in deploying RAG architecture rapidly.

      Demo

      This describes the entire architecture, so I’ll now share the architecture components I used to build the solution.

      To learn more about RAG architecture, please refer to the following link.

      First, we parse the data from the source website URL (in this case, my photography website, from which I extract the text of one of my blogs), embed it, and store the vector embeddings in a new collection inside a newly created Astra DB.

      As you can see from the above diagram, I configured this flow within 5 minutes, and it delivers the full functionality of a complete underlying Python application: it extracts chunks, converts them into embeddings, and finally stores them inside Astra DB.

      Now, let us understand the next phase: based on a question asked through the chatbot, I need to convert that question into a vector embedding & then run a similarity search against the Vector DB to bring back the relevant vectors, as shown below –
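LangFlow handles chunking through its text-splitter component, so no code is needed in the flow itself. The sketch below only illustrates the idea with a plain character window and an overlap, so neighboring chunks share some context; `chunk_text` is a hypothetical helper, and the sizes are arbitrary example values, not the ones used in this flow.

```python
def chunk_text(text: str, chunk_size: int = 200, overlap: int = 50) -> list:
    """Split text into fixed-size character windows that overlap by `overlap` characters."""
    if chunk_size <= 0 or not 0 <= overlap < chunk_size:
        raise ValueError("need chunk_size > 0 and 0 <= overlap < chunk_size")
    step = chunk_size - overlap  # how far each new window moves forward
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reaches the end of the text
    return chunks
```

For example, `chunk_text("abcdefghij", 4, 2)` yields `["abcd", "cdef", "efgh", "ghij"]`; each chunk would then be embedded and stored as one record.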

      You need to configure this entire flow by dragging the necessary widgets from the left-side panel as marked in the Blue-Box shown below –

      For this specific use case, we’ve created an Astra DB instance & then an empty vector collection. We also need to generate the API key (token) & make sure the right roles are assigned to it. After successfully creating the token, copy the endpoint, token & collection details & paste them into the corresponding fields of the Astra DB components inside LangFlow. Think of it as a framework where you provide all the necessary information to build & run the entire flow successfully.
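To keep the token out of the flow itself, the three values can be read from the environment and checked before they are pasted into (or passed to) the Astra DB component. A minimal sketch; the environment-variable names and the `load_astra_settings` helper are hypothetical choices, not something LangFlow requires.

```python
import os
from dataclasses import dataclass


@dataclass
class AstraSettings:
    api_endpoint: str
    token: str
    collection_name: str


def load_astra_settings(env=os.environ) -> AstraSettings:
    """Read the endpoint, token and collection name; fail fast if any is missing."""
    # Hypothetical variable names chosen for this sketch
    keys = {
        "api_endpoint": "ASTRA_DB_API_ENDPOINT",
        "token": "ASTRA_DB_APPLICATION_TOKEN",
        "collection_name": "ASTRA_DB_COLLECTION",
    }
    missing = [name for name in keys.values() if not env.get(name)]
    if missing:
        raise KeyError("missing settings: " + ", ".join(missing))
    return AstraSettings(**{field: env[name] for field, name in keys.items()})
```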

      Following are some of the important snapshots from the Astra-DB –

      Step – 1

      Step – 2

      Once you run the vector DB population, it converts the extracted text into vectors & inserts them, as shown in the following screenshot –

      You can see the sample vectors along with the text chunks inside the Astra DB data explorer as shown below –

      The critical components highlighted in the Blue-box are the ones we need to monitor for the vector embeddings.

      Now, here is how you can modify the existing Python code of any available widget or build your own widget using the custom widget.

      The first step is to click the code button highlighted in the Red-box as shown below –

      Clicking that button opens the detailed Python code behind the entire widget’s build & functionality. This is where you can add to, modify, or keep the code as it is, depending on your needs, as shown below –

      Once you build the entire solution, click the final compile button (shown in the red box), which compiles all the individual widgets. However, you can also click the compile button of each individual widget as soon as you build it, so you can pinpoint any potential problems at that very step.

      Let us look at the sample code of one widget. In this case, we will take the vector embedding insertion into Astra DB. Let us see the code –

      from typing import List, Optional, Union
      from langchain_astradb import AstraDBVectorStore
      from langchain_astradb.utils.astradb import SetupMode
      
      from langflow.custom import CustomComponent
      from langflow.field_typing import Embeddings, VectorStore
      from langflow.schema import Record
      from langchain_core.retrievers import BaseRetriever
      
      
      class AstraDBVectorStoreComponent(CustomComponent):
          display_name = "Astra DB"
          description = "Builds or loads an Astra DB Vector Store."
          icon = "AstraDB"
          field_order = ["token", "api_endpoint", "collection_name", "inputs", "embedding"]
      
          def build_config(self):
              return {
                  "inputs": {
                      "display_name": "Inputs",
                      "info": "Optional list of records to be processed and stored in the vector store.",
                  },
                  "embedding": {"display_name": "Embedding", "info": "Embedding to use"},
                  "collection_name": {
                      "display_name": "Collection Name",
                      "info": "The name of the collection within Astra DB where the vectors will be stored.",
                  },
                  "token": {
                      "display_name": "Token",
                      "info": "Authentication token for accessing Astra DB.",
                      "password": True,
                  },
                  "api_endpoint": {
                      "display_name": "API Endpoint",
                      "info": "API endpoint URL for the Astra DB service.",
                  },
                  "namespace": {
                      "display_name": "Namespace",
                      "info": "Optional namespace within Astra DB to use for the collection.",
                      "advanced": True,
                  },
                  "metric": {
                      "display_name": "Metric",
                      "info": "Optional distance metric for vector comparisons in the vector store.",
                      "advanced": True,
                  },
                  "batch_size": {
                      "display_name": "Batch Size",
                      "info": "Optional number of records to process in a single batch.",
                      "advanced": True,
                  },
                  "bulk_insert_batch_concurrency": {
                      "display_name": "Bulk Insert Batch Concurrency",
                      "info": "Optional concurrency level for bulk insert operations.",
                      "advanced": True,
                  },
                  "bulk_insert_overwrite_concurrency": {
                      "display_name": "Bulk Insert Overwrite Concurrency",
                      "info": "Optional concurrency level for bulk insert operations that overwrite existing records.",
                      "advanced": True,
                  },
                  "bulk_delete_concurrency": {
                      "display_name": "Bulk Delete Concurrency",
                      "info": "Optional concurrency level for bulk delete operations.",
                      "advanced": True,
                  },
                  "setup_mode": {
                      "display_name": "Setup Mode",
                      "info": "Configuration mode for setting up the vector store, with options like 'Sync', 'Async', or 'Off'.",
                      "options": ["Sync", "Async", "Off"],
                      "advanced": True,
                  },
                  "pre_delete_collection": {
                      "display_name": "Pre Delete Collection",
                      "info": "Boolean flag to determine whether to delete the collection before creating a new one.",
                      "advanced": True,
                  },
                  "metadata_indexing_include": {
                      "display_name": "Metadata Indexing Include",
                      "info": "Optional list of metadata fields to include in the indexing.",
                      "advanced": True,
                  },
                  "metadata_indexing_exclude": {
                      "display_name": "Metadata Indexing Exclude",
                      "info": "Optional list of metadata fields to exclude from the indexing.",
                      "advanced": True,
                  },
                  "collection_indexing_policy": {
                      "display_name": "Collection Indexing Policy",
                      "info": "Optional dictionary defining the indexing policy for the collection.",
                      "advanced": True,
                  },
              }
      
          def build(
              self,
              embedding: Embeddings,
              token: str,
              api_endpoint: str,
              collection_name: str,
              inputs: Optional[List[Record]] = None,
              namespace: Optional[str] = None,
              metric: Optional[str] = None,
              batch_size: Optional[int] = None,
              bulk_insert_batch_concurrency: Optional[int] = None,
              bulk_insert_overwrite_concurrency: Optional[int] = None,
              bulk_delete_concurrency: Optional[int] = None,
              setup_mode: str = "Sync",
              pre_delete_collection: bool = False,
              metadata_indexing_include: Optional[List[str]] = None,
              metadata_indexing_exclude: Optional[List[str]] = None,
              collection_indexing_policy: Optional[dict] = None,
          ) -> Union[VectorStore, BaseRetriever]:
              try:
                  setup_mode_value = SetupMode[setup_mode.upper()]
              except KeyError:
                  raise ValueError(f"Invalid setup mode: {setup_mode}")
              if inputs:
                  documents = [_input.to_lc_document() for _input in inputs]
      
                  vector_store = AstraDBVectorStore.from_documents(
                      documents=documents,
                      embedding=embedding,
                      collection_name=collection_name,
                      token=token,
                      api_endpoint=api_endpoint,
                      namespace=namespace,
                      metric=metric,
                      batch_size=batch_size,
                      bulk_insert_batch_concurrency=bulk_insert_batch_concurrency,
                      bulk_insert_overwrite_concurrency=bulk_insert_overwrite_concurrency,
                      bulk_delete_concurrency=bulk_delete_concurrency,
                      setup_mode=setup_mode_value,
                      pre_delete_collection=pre_delete_collection,
                      metadata_indexing_include=metadata_indexing_include,
                      metadata_indexing_exclude=metadata_indexing_exclude,
                      collection_indexing_policy=collection_indexing_policy,
                  )
              else:
                  vector_store = AstraDBVectorStore(
                      embedding=embedding,
                      collection_name=collection_name,
                      token=token,
                      api_endpoint=api_endpoint,
                      namespace=namespace,
                      metric=metric,
                      batch_size=batch_size,
                      bulk_insert_batch_concurrency=bulk_insert_batch_concurrency,
                      bulk_insert_overwrite_concurrency=bulk_insert_overwrite_concurrency,
                      bulk_delete_concurrency=bulk_delete_concurrency,
                      setup_mode=setup_mode_value,
                      pre_delete_collection=pre_delete_collection,
                      metadata_indexing_include=metadata_indexing_include,
                      metadata_indexing_exclude=metadata_indexing_exclude,
                      collection_indexing_policy=collection_indexing_policy,
                  )
      
              return vector_store
      

      Method: build_config:

      • This method defines the configuration options for the component.
      • Each configuration option includes a display_name and info, which provide details about the option.
      • Some options are marked as advanced, indicating they are optional and more complex.

      Method: build:

      • This method is used to create an instance of the Astra DB Vector Store.
      • It takes several parameters, including embedding, token, api_endpoint, collection_name, and various optional parameters.
      • It converts the setup_mode string to an enum value.
      • If inputs are provided, they are converted to a format suitable for storing in the vector store.
      • Depending on whether inputs are provided, either a new vector store is created from the documents, or the vector store is initialized against the collection with the given configurations, without inserting anything.
      • Finally, it returns the created vector store instance.
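The setup_mode conversion in build() works because the dropdown values "Sync", "Async" and "Off" become valid enum member names once upper-cased. The sketch below reproduces that lookup with a local stand-in Enum, so it runs without langchain_astradb installed; the stand-in only mirrors the member names the lookup requires, and `parse_setup_mode` is a hypothetical name. The `from None` suppresses the chained KeyError so the caller sees a single, clear ValueError.

```python
from enum import Enum


class SetupMode(Enum):
    # Stand-in for langchain_astradb's SetupMode, mirroring the member names
    SYNC = 1
    ASYNC = 2
    OFF = 3


def parse_setup_mode(setup_mode: str) -> SetupMode:
    """Same lookup as build(): 'Sync' -> SetupMode.SYNC, unknown strings -> ValueError."""
    try:
        return SetupMode[setup_mode.upper()]
    except KeyError:
        raise ValueError(f"Invalid setup mode: {setup_mode}") from None
```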

      And, here is the screenshot of the run –

      And, these are the last steps to run the integrated chatbot, as shown below –

      As you can see, the highlighted left side shows the reference text & chunks, and the right side shows the actual response.
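Behind that chatbot answer is the retrieval step described earlier: the question is embedded, and the stored chunks whose vectors are closest to it are returned. Astra DB performs this search on the server; the sketch below only illustrates the idea with cosine similarity over toy 2-dimensional vectors (real embeddings have hundreds of dimensions, and `top_k` is a hypothetical helper).

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(query_vector, stored, k=2):
    """stored: list of (chunk_text, vector) pairs; returns the k most similar chunks."""
    scored = [(cosine_similarity(query_vector, vec), chunk) for chunk, vec in stored]
    scored.sort(key=lambda pair: pair[0], reverse=True)  # best match first
    return [chunk for _, chunk in scored[:k]]
```

The returned chunks are what the chatbot shows as reference text and passes to the LLM as context.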


      So, we’ve done it. And here’s a fun fact: I built this entire workflow on my own within 35 minutes. 😛

      I’ll bring some more exciting topics in the coming days from the Python verse.

      To learn more about LangFlow, please click here.

      To learn about Astra DB, you need to click the following link.

      To learn about my blog & photography, you can click the following URL.

      Till then, Happy Avenging! 🙂

      Enabling & Exploring Stable Diffusion – Part 1

      This new solution evaluates the power of Stable Diffusion, creating solutions as we progress & refine our prompts from scratch using Stable Diffusion & Python. This post opens new opportunities for IT companies & business start-ups looking to deliver solutions with better performance than Stability AI’s paid API. This project is for advanced Python developers, Stable Diffusion & Data Science newbies, and AI evangelists.

      In a series of posts, I’ll explain and focus on the Stable Diffusion API and a custom solution using the Python-based SDK of Stable Diffusion.

      But, before that, let us view the video that it generates from the prompt by using the third-party API:

      Prompt to Video

      And, let us understand the prompt that we supplied to create the above video –

      Isn’t it exciting?

      However, I want to stress this point: the video generated by the Stability AI API only partially applied the animation effect. It animates the clouds but not the waves. Still, I must admit, the quality of the video is quite good.


      Let us understand the code and how we run the solution, and then we can try to understand its performance along with the other solutions later in the subsequent series.

      As you know, we’re exploring the code base that calls the third-party API; it executes a series of API calls that create a video from the prompt.

      Let us understand some of the important snippets –

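      import base64
      import os

      import requests

      # Assumption: the original script defines api_host and engine_id elsewhere.
      # api_host defaults to Stability AI's public REST host; engine_id is a placeholder,
      # so pick an engine that supports the requested 576x1024 size.
      api_host = os.getenv("API_HOST", "https://api.stability.ai")
      engine_id = os.getenv("STABILITY_ENGINE_ID", "stable-diffusion-v1-6")

      class clsStabilityAIAPI: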
          def __init__(self, STABLE_DIFF_API_KEY, OUT_DIR_PATH, FILE_NM, VID_FILE_NM):
              self.STABLE_DIFF_API_KEY = STABLE_DIFF_API_KEY
              self.OUT_DIR_PATH = OUT_DIR_PATH
              self.FILE_NM = FILE_NM
              self.VID_FILE_NM = VID_FILE_NM
      
          def delFile(self, fileName):
              try:
                  # Deleting the intermediate image
                  os.remove(fileName)
      
                  return 0 
              except Exception as e:
                  x = str(e)
                  print('Error: ', x)
      
                  return 1
      
          def generateText2Image(self, inputDescription):
              try:
                  STABLE_DIFF_API_KEY = self.STABLE_DIFF_API_KEY
                  fullFileName = self.OUT_DIR_PATH + self.FILE_NM
                  
                  if STABLE_DIFF_API_KEY is None:
                      raise Exception("Missing Stability API key.")
                  
                  response = requests.post(f"{api_host}/v1/generation/{engine_id}/text-to-image",
                                          headers={
                                              "Content-Type": "application/json",
                                              "Accept": "application/json",
                                              "Authorization": f"Bearer {STABLE_DIFF_API_KEY}"
                                              },
                                              json={
                                                  "text_prompts": [{"text": inputDescription}],
                                                  "cfg_scale": 7,
                                                  "height": 1024,
                                                  "width": 576,
                                                  "samples": 1,
                                                  "steps": 30,
                                                  },)
                  
                  if response.status_code != 200:
                      raise Exception("Non-200 response: " + str(response.text))
                  
                  data = response.json()
      
                  for i, image in enumerate(data["artifacts"]):
                      with open(fullFileName, "wb") as f:
                          f.write(base64.b64decode(image["base64"]))      
                  
                  return fullFileName
      
              except Exception as e:
                  x = str(e)
                  print('Error: ', x)
      
                  return 'N/A'
      
          def image2VideoPassOne(self, imgNameWithPath):
              try:
                  STABLE_DIFF_API_KEY = self.STABLE_DIFF_API_KEY
      
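                  # Open the image in a with-block so the file handle is always closed
                  with open(imgNameWithPath, "rb") as imgFile:
                      response = requests.post("https://api.stability.ai/v2beta/image-to-video",
                                               headers={"authorization": f"Bearer {STABLE_DIFF_API_KEY}"},
                                               files={"image": imgFile},
                                               data={"seed": 0, "cfg_scale": 1.8, "motion_bucket_id": 127},
                                               )
                  
                  print('First Pass Response:')
                  print(str(response.text))
                  
                  # Without this check an error body would silently produce genID = None
                  if response.status_code != 200:
                      raise Exception("Non-200 response: " + str(response.text))
                  
                  genID = response.json().get('id')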
      
                  return genID 
              except Exception as e:
                  x = str(e)
                  print('Error: ', x)
      
                  return 'N/A'
      
          def image2VideoPassTwo(self, genId):
              try:
                  generation_id = genId
                  STABLE_DIFF_API_KEY = self.STABLE_DIFF_API_KEY
                  fullVideoFileName = self.OUT_DIR_PATH + self.VID_FILE_NM
      
                  response = requests.request("GET", f"https://api.stability.ai/v2beta/image-to-video/result/{generation_id}",
                                              headers={
                                                  'accept': "video/*",  # Use 'application/json' to receive base64 encoded JSON
                                                  'authorization': f"Bearer {STABLE_DIFF_API_KEY}"
                                                  },) 
                  
                  print('Retrieve Status Code: ', str(response.status_code))
                  
                  if response.status_code == 202:
                      print("Generation in-progress, try again in 10 seconds.")
      
                      return 5
                  elif response.status_code == 200:
                      print("Generation complete!")
                      with open(fullVideoFileName, 'wb') as file:
                          file.write(response.content)
      
                      print("Successfully Retrieved the video file!")
      
                      return 0
                  else:
                      raise Exception(str(response.json()))
                  
              except Exception as e:
                  x = str(e)
                  print('Error: ', x)
      
                  return 1

      Now, let us understand the code –

      The __init__ method is called when an object of the class is created. It initializes four properties:

      • STABLE_DIFF_API_KEY: the API key for Stability AI services.
      • OUT_DIR_PATH: the folder path to save files.
      • FILE_NM: the name of the generated image file.
      • VID_FILE_NM: the name of the generated video file.

      The delFile method deletes a file specified by fileName.

      • If successful, it returns 0.
      • If an error occurs, it logs the error and returns 1.

      The generateText2Image method generates an image based on a text description:

      • Sends a request to the Stability AI text-to-image endpoint using the API key.
      • Saves the resulting image to a file.
      • Returns the file’s path on success or 'N/A' if an error occurs.
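The text-to-image response carries a list of artifacts, each with a base64-encoded image, which generateText2Image decodes and writes to disk. The loop in the class writes every artifact to the same file name, which is fine with "samples": 1 but would overwrite earlier images if more samples were requested. A minimal sketch that writes one indexed file per artifact; `save_artifacts` is a hypothetical helper, and the .png extension is an assumption about the returned image format.

```python
import base64
import os


def save_artifacts(data: dict, out_dir: str, base_name: str = "image") -> list:
    """Decode every base64 artifact in a text-to-image response and write one file per artifact."""
    paths = []
    for i, artifact in enumerate(data.get("artifacts", [])):
        # Indexed names, so multiple samples do not overwrite each other
        path = os.path.join(out_dir, f"{base_name}_{i}.png")
        with open(path, "wb") as f:
            f.write(base64.b64decode(artifact["base64"]))
        paths.append(path)
    return paths
```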

      The image2VideoPassOne method uploads an image to start the video generation (first phase):

      • Sends the image to Stability AI’s image-to-video endpoint.
      • Logs the response and extracts the id (generation ID) for the next phase.
      • Returns the id if successful or 'N/A' on failure.

      The image2VideoPassTwo method retrieves the generated video in the second phase using the genId:

      • Checks the video generation status from the Stability AI endpoint.
      • If complete, saves the video file and returns 0.
      • If still processing (HTTP 202), returns 5.
      • Logs and returns 1 for any errors.

      As you can see, the code is pretty simple to understand. The following lines of the main calling script (generateText2VideoAPI.py) handle unforeseen network issues, as well as the case where the video is not yet ready after our job submission –

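      import time

      waitTime = 10
      time.sleep(waitTime)
      
      # Failed-case retry. r1 (the clsStabilityAIAPI object), gID (the generation ID)
      # and maxRetryNo are defined earlier in generateText2VideoAPI.py.
      retries = 1
      success = False
      
      while not success:
          try:
              z = r1.image2VideoPassTwo(gID)
          except Exception as e:
              print('Error: ', str(e))
              z = 1  # Treat an unexpected exception as a failed attempt
      
          if z == 0:
              success = True
          else:
              # Linear back-off: 30, 60, 90 ... seconds between attempts
              wait = retries * 2 * 15
              print("Retry " + str(retries) + " failed! Waiting " + str(wait) + " seconds and retrying!")
              time.sleep(wait)
              retries += 1
      
              # Stop once the maximum number of retries is reached
              if retries >= maxRetryNo:
                  print("Maximum retries reached; the video is still not ready.")
                  break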
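The same retry idea can be packaged as a reusable helper, which also makes it testable without waiting. A minimal sketch, not part of the original script: `poll_until_ready` is a hypothetical name, it expects a callable that returns the status codes used by image2VideoPassTwo (0 done, 5 in progress, 1 error), and it keeps the linear back-off of `retries * 2 * 15` seconds. The sleep function is injectable so tests can record waits instead of sleeping.

```python
import time
from typing import Callable


def poll_until_ready(fetch: Callable[[], int], max_retries: int = 5,
                     base_wait: float = 30.0,
                     sleep: Callable[[float], None] = time.sleep) -> bool:
    """Call fetch() until it reports 0 (done); wait 30 s, 60 s, 90 s ... between tries."""
    for attempt in range(1, max_retries + 1):
        try:
            status = fetch()
        except Exception as exc:  # network hiccup: count it as a failed attempt
            print("Error:", exc)
            status = 1
        if status == 0:
            return True
        if attempt < max_retries:
            sleep(base_wait * attempt)  # linear back-off
    return False
```

In the calling script, this would be used as `poll_until_ready(lambda: r1.image2VideoPassTwo(gID), max_retries=maxRetryNo)`.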

      And, let us see what the run looks like –

      Let us understand the CPU utilization –

      As you can see, CPU utilization is minimal since most tasks are at the API end.


      So, we’ve done it. 🙂

      Please find the next series on this topic below:

      Enabling & Exploring Stable Diffusion – Part 2

      Enabling & Exploring Stable Diffusion – Part 3

      Please let me know your feedback after reviewing all the posts! 🙂

      Demystifying Modern Data Technologies: Insights from the Global PowerBI Summit

      In an engaging session at the Global PowerBI Summit, my co-host and I delved into the evolving landscape of data technologies. Our discussion aimed to illuminate the distinctions and applications of several pivotal technologies in the data sphere, ranging from Lakehouse vs. Storage Account to the nuanced differences between Fabric Pipeline and Data Pipeline and the critical comparisons of Notebooks vs. Databricks, including their performance metrics. Furthermore, we explored the realm of model experimentation and Azure ML, shedding light on their performance benchmarks.

      Lakehouse vs. Storage Account

      • Enhanced File Previews and Transformations: The Lakehouse paradigm revolutionizes how we preview and transform files into SQL tables, offering a seamless data manipulation experience.
      • Robust Data Governance: It introduces native indexing for data lineage, PII scans, and discovery, thus laying a solid foundation for data governance.
      • Optimized Performance for Reporting: With direct lake mode, Lakehouse significantly improves performance for Power BI Reporting, catering to the needs of data analysts and business intelligence professionals.
      • Functional Restrictions: Despite its strengths, Lakehouse falls short in providing a native file download feature, demands manual refresh for new file visibility, and has limited support for file formats outside of Delta and Parquet.
      • Lakehouse distinguishes itself by being user-friendly and efficient in data uploading, albeit with slower previews. Its distinction from a Storage Account lies in these unique functionalities and user experience.

      Fabric Pipeline vs. Data Pipeline:

      • Ease of Data Transformation: Fabric Pipeline offers a low-code/no-code approach through the Power Query Editor, which simplifies data transformation.
      • Advanced Monitoring Capabilities: Monitoring pipelines and tracing lineage make it easier to manage and integrate Fabric artifacts.
      • Artifact and Trigger Limitations: A notable drawback is that each pipeline artifact is isolated, and each pipeline is limited to a single scheduled trigger type.
      • Our analysis showed that both platforms share a user-friendly interface similar to Azure's, but navigating between pipelines in Fabric takes additional steps. Both execute quickly, with Azure slightly ahead thanks to its unified pipeline management.

      Notebooks vs. Databricks:

      • Comprehensive Support and Integration: Notebooks provide native support for a wide range of programming and visualization packages, along with a direct connection to Lakehouse.
      • Collaborative Features and Efficiency: The platform supports real-time co-editing and saves resources by stopping clusters when they are not in use.
      • Cluster and Resource Management: Clusters are managed externally, and the lack of a shared folder or user notebooks makes collaboration harder.
      • Our discussion highlighted that Notebooks offer a better user interface and more connectivity options, although Databricks has certain advantages in data processing speed.

      Our performance analysis showed that Fabric Notebooks handled large datasets and ran machine learning models more efficiently than Databricks, with Lakehouse standing out for its faster cluster start-up times and storage efficiency.

      Fabric vs. Azure ML (Model Experimentation):

      • Seamless Integration and Configuration: Fabric's integration with Lakehouse and its direct pipeline connections streamline the data science workflow.
      • Graphical Interface and Focus: Unlike Azure ML, with its user-friendly studio, Fabric lacks a graphical interface for experimentation. This reflects Fabric's focus on analytics and BI, whereas Azure ML offers comprehensive experimentation capabilities.
      • Our comparative performance review showed that Fabric was faster than Azure ML at both loading datasets and executing models.
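The recap reports results but not how the timings were collected. For anyone who wants to run a similar comparison on their own workloads, here is a hypothetical, platform-agnostic sketch (not the method used in the session): time each step once cold, to capture one-off costs such as the cluster start-up mentioned above, then several times warm, and report the median. The `benchmark` helper and the two CSV loaders are illustrative placeholders; you would substitute the real dataset-load or model-run calls of each platform.

```python
import csv
import io
import statistics
import time

def benchmark(step, repeats=5):
    """Time step() once cold, then `repeats` more times warm.

    The cold run captures one-off costs (e.g. start-up or cache filling);
    the warm median reflects steady-state speed.
    """
    if repeats < 1:
        raise ValueError("repeats must be at least 1")
    start = time.perf_counter()
    step()
    cold = time.perf_counter() - start
    warm = []
    for _ in range(repeats):
        start = time.perf_counter()
        step()
        warm.append(time.perf_counter() - start)
    return {"cold_s": cold, "warm_median_s": statistics.median(warm)}

# Hypothetical sample data and loaders; replace them with the real
# dataset-load or model-run calls of the platforms being compared.
CSV_TEXT = "id,value\n" + "\n".join(f"{i},{i * 0.5}" for i in range(50_000))

def load_with_csv_module():
    return list(csv.DictReader(io.StringIO(CSV_TEXT)))

def load_with_split():
    return [line.split(",") for line in CSV_TEXT.splitlines()[1:]]

if __name__ == "__main__":
    for name, step in [("csv.DictReader", load_with_csv_module),
                       ("str.split", load_with_split)]:
        result = benchmark(step)
        print(f"{name}: cold={result['cold_s']:.3f}s, "
              f"warm median={result['warm_median_s']:.3f}s")
```

Reporting the median of the warm runs keeps a single slow outlier from skewing the comparison, while the separate cold figure keeps start-up differences visible.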

      Our Global PowerBI Summit session aimed to demystify these modern data technologies and give attendees clear, actionable insights. The key takeaway is that understanding each technology's strengths and limitations helps data professionals make informed decisions in their projects. The way these technologies complement one another shows how quickly the data landscape is evolving and opens up new possibilities for innovation and efficiency in data management and analysis.

      These stats were collected during the product's early release. Since the product is continuously improving, we will need to revisit these results after some time.