The LLM Security Chronicles – Part 4

If Parts 1, 2, and 3 were the horror movie showing you all the ways things can go wrong, Part 3 is the training montage where humanity fights back. Spoiler alert: We’re not winning yet, but at least we’re no longer bringing knife emojis to a prompt injection fight.

Let’s start with some hard truths from 2025’s research –

• 90%+ of current defenses fail against adaptive attacks
• Static defenses are obsolete before deployment
• No single solution exists for prompt injection
• The attacker moves second and usually wins

But before you unplug your AI and go back to using carrier pigeons, there’s hope. The same research teaching us about vulnerabilities is also pointing toward solutions.

No single layer is perfect (hence the holes in the Swiss cheese), but multiple imperfect layers create robust defense.

import re
import torch
from transformers import AutoTokenizer, AutoModel
import numpy as np

class AdvancedInputValidator:
    def __init__(self, model_name='sentence-transformers/all-MiniLM-L6-v2'):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name)
        self.baseline_embeddings = self.load_baseline_embeddings()
        self.threat_patterns = self.compile_threat_patterns()
        
    def validateInput(self, user_input):
        """
        Multi-layer input validation
        """
        # Layer 1: Syntactic checks
        if not self.syntacticValidation(user_input):
            return False, "Failed syntactic validation"
        
        # Layer 2: Semantic analysis
        semantic_score = self.semanticAnalysis(user_input)
        if semantic_score > 0.8:  # High risk threshold
            return False, f"Semantic risk score: {semantic_score}"
        
        # Layer 3: Embedding similarity
        if self.isAdversarialEmbedding(user_input):
            return False, "Detected adversarial pattern in embedding"
        
        # Layer 4: Entropy analysis
        if self.entropyCheck(user_input) > 4.5:
            return False, "Unusual entropy detected"
        
        # Layer 5: Known attack patterns
        pattern_match = self.checkThreatPatterns(user_input)
        if pattern_match:
            return False, f"Matched threat pattern: {pattern_match}"
        
        return True, "Validation passed"
    
    def semanticAnalysis(self, text):
        """
        Analyzes semantic intent using embedding similarity
        """
        # Generate embedding for input
        inputs = self.tokenizer(text, return_tensors='pt', truncation=True)
        with torch.no_grad():
            embeddings = self.model(**inputs).last_hidden_state.mean(dim=1)
        
        # Compare against known malicious embeddings
        max_similarity = 0
        for malicious_emb in self.baseline_embeddings['malicious']:
            similarity = torch.cosine_similarity(embeddings, malicious_emb)
            max_similarity = max(max_similarity, similarity.item())
        
        return max_similarity
    
    def entropyCheck(self, text):
        """
        Calculates Shannon entropy to detect obfuscation
        """
        # Calculate character frequency
        freq = {}
        for char in text:
            freq[char] = freq.get(char, 0) + 1
        
        # Calculate entropy
        entropy = 0
        total = len(text)
        for count in freq.values():
            if count > 0:
                probability = count / total
                entropy -= probability * np.log2(probability)
        
        return entropy
    
    def compile_threat_patterns(self):
        """
        Compiles regex patterns for known threats
        """
        patterns = {
            'injection': r'(ignore|disregard|forget).{0,20}(previous|prior|above)',
            'extraction': r'(system|initial).{0,20}(prompt|instruction)',
            'jailbreak': r'(act as|pretend|roleplay).{0,20}(no limits|unrestricted)',
            'encoding': r'(base64|hex|rot13|decode)',
            'escalation': r'(debug|admin|sudo|root).{0,20}(mode|access)',
        }
        return {k: re.compile(v, re.IGNORECASE) for k, v in patterns.items()}

This code creates an advanced system that checks whether user input is safe before processing it. It uses multiple layers of validation, including basic syntax checks, meaning-based analysis with AI embeddings, similarity detection to known malicious examples, entropy measurements to spot obfuscated text, and pattern matching for common attack behaviors such as jailbreaks or prompt injections. If any layer finds a risk—high semantic similarity, unusual entropy, or a threat pattern—the input is rejected. If all checks pass, the system marks the input as safe.

class SecurePromptArchitecture:
    def __init__(self):
        self.system_prompt = self.load_immutable_system_prompt()
        self.contextWindowBudget = {
            'system': 0.3,  # 30% reserved for system
            'history': 0.2,  # 20% for conversation history
            'user': 0.4,    # 40% for user input
            'buffer': 0.1   # 10% safety buffer
        }
    
    def constructPrompt(self, user_input, conversation_history=None):
        """
        Builds secure prompt with proper isolation
        """
        # Calculate token budgets
        total_tokens = 4096  # Model's context window
        budgets = {k: int(v * total_tokens) 
                   for k, v in self.contextWindowBudget.items()}
        
        # Build prompt with clear boundaries
        prompt_parts = []
        
        # System section (immutable)
        prompt_parts.append(
            f"<|SYSTEM|>{self.systemPrompt[:budgets['system']]}<|/SYSTEM|>"
        )
        
        # History section (sanitized)
        if conversation_history:
            sanitized_history = self.sanitizeHistory(conversation_history)
            prompt_parts.append(
                f"<|HISTORY|>{sanitized_history[:budgets['history']]}<|/HISTORY|>"
            )
        
        # User section (contained)
        sanitized_input = self.sanitizeUserInput(user_input)
        prompt_parts.append(
            f"<|USER|>{sanitized_input[:budgets['user']]}<|/USER|>"
        )
        
        # Combine with clear delimiters
        final_prompt = "\n<|BOUNDARY|>\n".join(prompt_parts)
        
        return final_prompt
    
    def sanitizeUserInput(self, input_text):
        """
        Removes potentially harmful content while preserving intent
        """
        # Remove system-level commands
        sanitized = re.sub(r'<\|.*?\|>', '', input_text)
        
        # Escape special characters
        sanitized = sanitized.replace('\\', '\\\\')
        sanitized = sanitized.replace('"', '\\"')
        
        # Remove null bytes and control characters
        sanitized = ''.join(char for char in sanitized 
                          if ord(char) >= 32 or char == '\n')
        
        return sanitized

This code establishes a secure framework for creating and sending prompts to an AI model. It divides the model’s context window into fixed sections for system instructions, conversation history, user input, and a safety buffer. Each section is clearly separated with boundaries to prevent user input from altering system rules. Before adding anything, the system cleans both history and user text by removing harmful commands and unsafe characters. The final prompt ensures isolation, protects system instructions, and reduces the risk of prompt injection or manipulation.

import pickle
from sklearn.ensemble import IsolationForest
from collections import deque

class BehavioralMonitor:
    def __init__(self, window_size=100):
        self.behaviorHistory = deque(maxlen=window_size)
        self.anomalyDetector = IsolationForest(contamination=0.1)
        self.baselineBehaviors = self.load_baseline_behaviors()
        self.alertThreshold = 0.85
        
    def analyzeInteraction(self, user_id, prompt, response, metadata):
        """
        Performs comprehensive behavioral analysis
        """
        # Extract behavioral features
        features = self.extractFeatures(prompt, response, metadata)
        
        # Add to history
        self.behavior_history.append({
            'user_id': user_id,
            'timestamp': metadata['timestamp'],
            'features': features
        })
        
        # Check for anomalies
        anomaly_score = self.detectAnomaly(features)
        
        # Pattern detection
        patterns = self.detectPatterns()
        
        # Risk assessment
        risk_level = self.assessRisk(anomaly_score, patterns)
        
        return {
            'anomaly_score': anomaly_score,
            'patterns_detected': patterns,
            'risk_level': risk_level,
            'action_required': risk_level > self.alertThreshold
        }
    
    def extractFeatures(self, prompt, response, metadata):
        """
        Extracts behavioral features for analysis
        """
        features = {
            # Temporal features
            'time_of_day': metadata['timestamp'].hour,
            'day_of_week': metadata['timestamp'].weekday(),
            'request_frequency': self.calculateFrequency(metadata['user_id']),
            
            # Content features
            'prompt_length': len(prompt),
            'response_length': len(response),
            'prompt_complexity': self.calculateComplexity(prompt),
            'topic_consistency': self.calculateTopicConsistency(prompt),
            
            # Interaction features
            'question_type': self.classifyQuestionType(prompt),
            'sentiment_score': self.analyzeSentiment(prompt),
            'urgency_indicators': self.detectUrgency(prompt),
            
            # Security features
            'encoding_present': self.detectEncoding(prompt),
            'injection_keywords': self.countInjectionKeywords(prompt),
            'system_references': self.countSystemReferences(prompt),
        }
        
        return features
    
    def detectPatterns(self):
        """
        Identifies suspicious behavioral patterns
        """
        patterns = []
        
        # Check for velocity attacks
        if self.detectVelocityAttack():
            patterns.append('velocity_attack')
        
        # Check for reconnaissance patterns
        if self.detectReconnaissance():
            patterns.append('reconnaissance')
        
        # Check for escalation patterns
        if self.detectPrivilegeEscalation():
            patterns.append('privilege_escalation')
        
        return patterns
    
    def detectVelocityAttack(self):
        """
        Detects rapid-fire attack attempts
        """
        if len(self.behaviorHistory) < 10:
            return False
        
        recent = list(self.behaviorHistory)[-10:]
        time_diffs = []
        
        for i in range(1, len(recent)):
            diff = (recent[i]['timestamp'] - recent[i-1]['timestamp']).seconds
            time_diffs.append(diff)
        
        # Check if requests are too rapid
        avg_diff = np.mean(time_diffs)
        return avg_diff < 2  # Less than 2 seconds average

This code monitors user behavior when interacting with an AI system to detect unusual or risky activity. It collects features such as timing, prompt length, sentiment, complexity, and security-related keywords. An Isolation Forest model checks whether the behavior is normal or suspicious. It also looks for specific attack patterns, such as very rapid requests, probing for system details, or attempts to escalate privileges. The system then assigns a risk level, and if the risk is high, it signals that immediate action may be required.

class OutputSanitizer:
    def __init__(self):
        self.sensitive_patterns = self.load_sensitive_patterns()
        self.pii_detector = self.initialize_pii_detector()
        
    def sanitizeOutput(self, raw_output, context):
        """
        Multi-stage output sanitization pipeline
        """
        # Stage 1: Remove sensitive data
        output = self.removeSensitiveData(raw_output)
        
        # Stage 2: PII detection and masking
        output = self.maskPii(output)
        
        # Stage 3: URL and email sanitization
        output = self.sanitizeUrlsEmails(output)
        
        # Stage 4: Code injection prevention
        output = self.preventCodeInjection(output)
        
        # Stage 5: Context-aware filtering
        output = self.contextFilter(output, context)
        
        # Stage 6: Final validation
        if not self.finalValidation(output):
            return "[Output blocked due to security concerns]"
        
        return output
    
    def removeSensitiveData(self, text):
        """
        Removes potentially sensitive information
        """
        sensitive_patterns = [
            r'\b[A-Za-z0-9+/]{40}\b',  # API keys
            r'\b[0-9]{3}-[0-9]{2}-[0-9]{4}\b',  # SSN
            r'\b[0-9]{16}\b',  # Credit card numbers
            r'password\s*[:=]\s*\S+',  # Passwords
            r'BEGIN RSA PRIVATE KEY.*END RSA PRIVATE KEY',  # Private keys
        ]
        
        for pattern in sensitive_patterns:
            text = re.sub(pattern, '[REDACTED]', text, flags=re.DOTALL)
        
        return text
    
    def maskPii(self, text):
        """
        Masks personally identifiable information
        """
        # This would use a proper NER model in production
        pii_entities = self.piiDetector.detect(text)
        
        for entity in pii_entities:
            if entity['type'] in ['PERSON', 'EMAIL', 'PHONE', 'ADDRESS']:
                mask = f"[{entity['type']}]"
                text = text.replace(entity['text'], mask)
        
        return text
    
    def preventCodeInjection(self, text):
        """
        Prevents code injection in output
        """
        # Escape HTML/JavaScript
        text = text.replace('<', '<').replace('>', '>')
        text = re.sub(r'<script.*?</script>', '[SCRIPT REMOVED]', text, flags=re.DOTALL)
        
        # Remove potential SQL injection
        sql_keywords = ['DROP', 'DELETE', 'INSERT', 'UPDATE', 'EXEC', 'UNION']
        for keyword in sql_keywords:
            pattern = rf'\b{keyword}\b.*?(;|$)'
            text = re.sub(pattern, '[SQL REMOVED]', text, flags=re.IGNORECASE)
        
        return text

This code cleans and secures the AI’s output before it is shown to a user. It removes sensitive data such as API keys, credit card numbers, passwords, or private keys. It then detects and masks personal information, including names, emails, phone numbers, and addresses. The system also sanitizes URLs and emails, blocks possible code or script injections, and applies context-aware filters to prevent unsafe content. Finally, a validation step checks that the cleaned output meets safety rules. If any issues remain, the output is blocked for security reasons.

class HumanInTheLoop:
    def __init__(self):
        self.review_queue = []
        self.risk_thresholds = {
            'low': 0.3,
            'medium': 0.6,
            'high': 0.8,
            'critical': 0.95
        }
    
    def evaluateForReview(self, interaction):
        """
        Determines if human review is needed
        """
        risk_score = interaction['risk_score']
        
        # Always require human review for critical risks
        if risk_score >= self.risk_thresholds['critical']:
            return self.escalateToHuman(interaction, priority='URGENT')
        
        # Check specific triggers
        triggers = [
            'financial_transaction',
            'data_export',
            'system_modification',
            'user_data_access',
            'code_generation',
        ]
        
        for trigger in triggers:
            if trigger in interaction['categories']:
                return self.escalateToHuman(interaction, priority='HIGH')
        
        # Probabilistic review for medium risks
        if risk_score >= self.risk_thresholds['medium']:
            if random.random() < risk_score:
                return self.escalateToHuman(interaction, priority='NORMAL')
        
        return None
    
    def escalateToHuman(self, interaction, priority='NORMAL'):
        """
        Adds interaction to human review queue
        """
        review_item = {
            'id': str(uuid.uuid4()),
            'timestamp': datetime.utcnow(),
            'priority': priority,
            'interaction': interaction,
            'status': 'PENDING',
            'reviewer': None,
            'decision': None
        }
        
        self.review_queue.append(review_item)
        
        # Send notification based on priority
        if priority == 'URGENT':
            self.sendUrgentAlert(review_item)
        
        return review_item['id']

This code decides when an AI system should involve a human reviewer to ensure safety and accuracy. It evaluates each interaction’s risk score and automatically escalates high-risk or critical cases for human review. It also flags interactions involving sensitive actions, such as financial transactions, data access, or system changes. Medium-risk cases may be reviewed based on probability. When escalation is needed, the system creates a review task with a priority level, adds it to a queue, and sends alerts for urgent issues. This framework ensures human judgment is used whenever machine decisions may not be sufficient.


So, in this post, we’ve discussed some of the defensive mechanisms & we’ll deep dive more about this in the next & final post.

We’ll meet again in our next instalment. Till then, Happy Avenging! 🙂

Real-time video summary assistance App – Part 2

As a continuation of the previous post, I would like to continue my discussion about the implementation of MCP protocols among agents. But before that, I want to add the quick demo one more time to recap our objectives.

Let us recap the process flow –

Also, understand the groupings of scripts by each group as posted in the previous post –

Message-Chaining Protocol (MCP) Implementation:

    clsMCPMessage.py
    clsMCPBroker.py

YouTube Transcript Extraction:

    clsYouTubeVideoProcessor.py

Language Detection:

    clsLanguageDetector.py

Translation Services & Agents:

    clsTranslationAgent.py
    clsTranslationService.py

Documentation Agent:

    clsDocumentationAgent.py
    
Research Agent:

    clsDocumentationAgent.py

Great! Now, we’ll continue with the main discussion.


def extract_youtube_id(youtube_url):
    """Extract YouTube video ID from URL"""
    youtube_id_match = re.search(r'(?:v=|\/)([0-9A-Za-z_-]{11}).*', youtube_url)
    if youtube_id_match:
        return youtube_id_match.group(1)
    return None

def get_youtube_transcript(youtube_url):
    """Get transcript from YouTube video"""
    video_id = extract_youtube_id(youtube_url)
    if not video_id:
        return {"error": "Invalid YouTube URL or ID"}
    
    try:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        
        # First try to get manual transcripts
        try:
            transcript = transcript_list.find_manually_created_transcript(["en"])
            transcript_data = transcript.fetch()
            print(f"Debug - Manual transcript format: {type(transcript_data)}")
            if transcript_data and len(transcript_data) > 0:
                print(f"Debug - First item type: {type(transcript_data[0])}")
                print(f"Debug - First item sample: {transcript_data[0]}")
            return {"text": transcript_data, "language": "en", "auto_generated": False}
        except Exception as e:
            print(f"Debug - No manual transcript: {str(e)}")
            # If no manual English transcript, try any available transcript
            try:
                available_transcripts = list(transcript_list)
                if available_transcripts:
                    transcript = available_transcripts[0]
                    print(f"Debug - Using transcript in language: {transcript.language_code}")
                    transcript_data = transcript.fetch()
                    print(f"Debug - Auto transcript format: {type(transcript_data)}")
                    if transcript_data and len(transcript_data) > 0:
                        print(f"Debug - First item type: {type(transcript_data[0])}")
                        print(f"Debug - First item sample: {transcript_data[0]}")
                    return {
                        "text": transcript_data, 
                        "language": transcript.language_code, 
                        "auto_generated": transcript.is_generated
                    }
                else:
                    return {"error": "No transcripts available for this video"}
            except Exception as e:
                return {"error": f"Error getting transcript: {str(e)}"}
    except Exception as e:
        return {"error": f"Error getting transcript list: {str(e)}"}

# ----------------------------------------------------------------------------------
# YouTube Video Processor
# ----------------------------------------------------------------------------------

class clsYouTubeVideoProcessor:
    """Process YouTube videos using the agent system"""
    
    def __init__(self, documentation_agent, translation_agent, research_agent):
        self.documentation_agent = documentation_agent
        self.translation_agent = translation_agent
        self.research_agent = research_agent
    
    def process_youtube_video(self, youtube_url):
        """Process a YouTube video"""
        print(f"Processing YouTube video: {youtube_url}")
        
        # Extract transcript
        transcript_result = get_youtube_transcript(youtube_url)
        
        if "error" in transcript_result:
            return {"error": transcript_result["error"]}
        
        # Start a new conversation
        conversation_id = self.documentation_agent.start_processing()
        
        # Process transcript segments
        transcript_data = transcript_result["text"]
        transcript_language = transcript_result["language"]
        
        print(f"Debug - Type of transcript_data: {type(transcript_data)}")
        
        # For each segment, detect language and translate if needed
        processed_segments = []
        
        try:
            # Make sure transcript_data is a list of dictionaries with text and start fields
            if isinstance(transcript_data, list):
                for idx, segment in enumerate(transcript_data):
                    print(f"Debug - Processing segment {idx}, type: {type(segment)}")
                    
                    # Extract text properly based on the type
                    if isinstance(segment, dict) and "text" in segment:
                        text = segment["text"]
                        start = segment.get("start", 0)
                    else:
                        # Try to access attributes for non-dict types
                        try:
                            text = segment.text
                            start = getattr(segment, "start", 0)
                        except AttributeError:
                            # If all else fails, convert to string
                            text = str(segment)
                            start = idx * 5  # Arbitrary timestamp
                    
                    print(f"Debug - Extracted text: {text[:30]}...")
                    
                    # Create a standardized segment
                    std_segment = {
                        "text": text,
                        "start": start
                    }
                    
                    # Process through translation agent
                    translation_result = self.translation_agent.process_text(text, conversation_id)
                    
                    # Update segment with translation information
                    segment_with_translation = {
                        **std_segment,
                        "translation_info": translation_result
                    }
                    
                    # Use translated text for documentation
                    if "final_text" in translation_result and translation_result["final_text"] != text:
                        std_segment["processed_text"] = translation_result["final_text"]
                    else:
                        std_segment["processed_text"] = text
                    
                    processed_segments.append(segment_with_translation)
            else:
                # If transcript_data is not a list, treat it as a single text block
                print(f"Debug - Transcript is not a list, treating as single text")
                text = str(transcript_data)
                std_segment = {
                    "text": text,
                    "start": 0
                }
                
                translation_result = self.translation_agent.process_text(text, conversation_id)
                segment_with_translation = {
                    **std_segment,
                    "translation_info": translation_result
                }
                
                if "final_text" in translation_result and translation_result["final_text"] != text:
                    std_segment["processed_text"] = translation_result["final_text"]
                else:
                    std_segment["processed_text"] = text
                
                processed_segments.append(segment_with_translation)
                
        except Exception as e:
            print(f"Debug - Error processing transcript: {str(e)}")
            return {"error": f"Error processing transcript: {str(e)}"}
        
        # Process the transcript with the documentation agent
        documentation_result = self.documentation_agent.process_transcript(
            processed_segments,
            conversation_id
        )
        
        return {
            "youtube_url": youtube_url,
            "transcript_language": transcript_language,
            "processed_segments": processed_segments,
            "documentation": documentation_result,
            "conversation_id": conversation_id
        }

Let us understand this step-by-step:

Part 1: Getting the YouTube Transcript

def extract_youtube_id(youtube_url):
    ...

This extracts the unique video ID from any YouTube link. 

def get_youtube_transcript(youtube_url):
    ...
  • This gets the actual spoken content of the video.
  • It tries to get a manual transcript first (created by humans).
  • If not available, it falls back to an auto-generated version (created by YouTube’s AI).
  • If nothing is found, it gives back an error message like: “Transcript not available.”

Part 2: Processing the Video with Agents

class clsYouTubeVideoProcessor:
    ...

This is like the control center that tells each intelligent agent what to do with the transcript. Here are the detailed steps:

1. Start the Process

def process_youtube_video(self, youtube_url):
    ...
  • The system starts with a YouTube video link.
  • It prints a message like: “Processing YouTube video: [link]”

2. Extract the Transcript

  • The system runs the get_youtube_transcript() function.
  • If it fails, it returns an error (e.g., invalid link or no subtitles available).

3. Start a “Conversation”

  • The documentation agent begins a new session, tracked by a unique conversation ID.
  • Think of this like opening a new folder in a shared team workspace to store everything related to this video.

4. Go Through Each Segment of the Transcript

  • The spoken text is often broken into small parts (segments), like subtitles.
  • For each part:
    • It checks the text.
    • It finds out the time that part was spoken.
    • It sends it to the translation agent to clean up or translate the text.

5. Translate (if needed)

  • If the translation agent finds a better or translated version, it replaces the original.
  • Otherwise, it keeps the original.

6. Prepare for Documentation

  • After translation, the segment is passed to the documentation agent.
  • This agent might:
    • Summarize the content,
    • Highlight important terms,
    • Structure it into a readable format.

7. Return the Final Result

The system gives back a structured package with:

  • The video link
  • The original language
  • The transcript in parts (processed and translated)
  • A documentation summary
  • The conversation ID (for tracking or further updates)

class clsDocumentationAgent:
    """Documentation Agent built with LangChain"""
    
    def __init__(self, agent_id: str, broker: clsMCPBroker):
        self.agent_id = agent_id
        self.broker = broker
        self.broker.register_agent(agent_id)
        
        # Initialize LangChain components
        self.llm = ChatOpenAI(
            model="gpt-4-0125-preview",
            temperature=0.1,
            api_key=OPENAI_API_KEY
        )
        
        # Create tools
        self.tools = [
            clsSendMessageTool(sender_id=self.agent_id, broker=self.broker)
        ]
        
        # Set up LLM with tools
        self.llm_with_tools = self.llm.bind(
            tools=[tool.tool_config for tool in self.tools]
        )
        
        # Setup memory
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True
        )
        
        # Create prompt
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a Documentation Agent for YouTube video transcripts. Your responsibilities include:
                1. Process YouTube video transcripts
                2. Identify key points, topics, and main ideas
                3. Organize content into a coherent and structured format
                4. Create concise summaries
                5. Request research information when necessary
                
                When you need additional context or research, send a request to the Research Agent.
                Always maintain a professional tone and ensure your documentation is clear and organized.
            """),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])
        
        # Create agent
        self.agent = (
            {
                "input": lambda x: x["input"],
                "chat_history": lambda x: self.memory.load_memory_variables({})["chat_history"],
                "agent_scratchpad": lambda x: format_to_openai_tool_messages(x["intermediate_steps"]),
            }
            | self.prompt
            | self.llm_with_tools
            | OpenAIToolsAgentOutputParser()
        )
        
        # Create agent executor
        self.agent_executor = AgentExecutor(
            agent=self.agent,
            tools=self.tools,
            verbose=True,
            memory=self.memory
        )
        
        # Video data
        self.current_conversation_id = None
        self.video_notes = {}
        self.key_points = []
        self.transcript_segments = []
        
    def start_processing(self) -> str:
        """Start processing a new video"""
        self.current_conversation_id = str(uuid.uuid4())
        self.video_notes = {}
        self.key_points = []
        self.transcript_segments = []
        
        return self.current_conversation_id
    
    def process_transcript(self, transcript_segments, conversation_id=None):
        """Process a YouTube transcript"""
        if not conversation_id:
            conversation_id = self.start_processing()
        self.current_conversation_id = conversation_id
        
        # Store transcript segments
        self.transcript_segments = transcript_segments
        
        # Process segments
        processed_segments = []
        for segment in transcript_segments:
            processed_result = self.process_segment(segment)
            processed_segments.append(processed_result)
        
        # Generate summary
        summary = self.generate_summary()
        
        return {
            "processed_segments": processed_segments,
            "summary": summary,
            "conversation_id": conversation_id
        }
    
    def process_segment(self, segment):
        """Process individual transcript segment"""
        text = segment.get("text", "")
        start = segment.get("start", 0)
        
        # Use LangChain agent to process the segment
        result = self.agent_executor.invoke({
            "input": f"Process this video transcript segment at timestamp {start}s: {text}. If research is needed, send a request to the research_agent."
        })
        
        # Update video notes
        timestamp = start
        self.video_notes[timestamp] = {
            "text": text,
            "analysis": result["output"]
        }
        
        return {
            "timestamp": timestamp,
            "text": text,
            "analysis": result["output"]
        }
    
    def handle_mcp_message(self, message: clsMCPMessage) -> Optional[clsMCPMessage]:
        """Handle an incoming MCP message"""
        if message.message_type == "research_response":
            # Process research information received from Research Agent
            research_info = message.content.get("text", "")
            
            result = self.agent_executor.invoke({
                "input": f"Incorporate this research information into video analysis: {research_info}"
            })
            
            # Send acknowledgment back to Research Agent
            response = clsMCPMessage(
                sender=self.agent_id,
                receiver=message.sender,
                message_type="acknowledgment",
                content={"text": "Research information incorporated into video analysis."},
                reply_to=message.id,
                conversation_id=message.conversation_id
            )
            
            self.broker.publish(response)
            return response
        
        elif message.message_type == "translation_response":
            # Process translation response from Translation Agent
            translation_result = message.content
            
            # Process the translated text
            if "final_text" in translation_result:
                text = translation_result["final_text"]
                original_text = translation_result.get("original_text", "")
                language_info = translation_result.get("language", {})
                
                result = self.agent_executor.invoke({
                    "input": f"Process this translated text: {text}\nOriginal language: {language_info.get('language', 'unknown')}\nOriginal text: {original_text}"
                })
                
                # Update notes with translation information
                for timestamp, note in self.video_notes.items():
                    if note["text"] == original_text:
                        note["translated_text"] = text
                        note["language"] = language_info
                        break
            
            return None
        
        return None
    
    def run(self):
        """Run the agent to listen for MCP messages"""
        print(f"Documentation Agent {self.agent_id} is running...")
        while True:
            message = self.broker.get_message(self.agent_id, timeout=1)
            if message:
                self.handle_mcp_message(message)
            time.sleep(0.1)
    
    def generate_summary(self) -> str:
        """Generate a summary of the video"""
        if not self.video_notes:
            return "No video data available to summarize."
        
        all_notes = "\n".join([f"{ts}: {note['text']}" for ts, note in self.video_notes.items()])
        
        result = self.agent_executor.invoke({
            "input": f"Generate a concise summary of this YouTube video, including key points and topics:\n{all_notes}"
        })
        
        return result["output"]

Let us understand the key methods in a step-by-step manner:

The Documentation Agent is like a smart assistant that watches a YouTube video, takes notes, pulls out important ideas, and creates a summary — almost like a professional note-taker trained to help educators, researchers, and content creators. It works with a team of other assistants, like a Translator Agent and a Research Agent, and they all talk to each other through a messaging system.

1. Starting to Work on a New Video

    def start_processing(self) -> str
    

    When a new video is being processed:

    • A new project ID is created.
    • Old notes and transcripts are cleared to start fresh.

    2. Processing the Whole Transcript

    def process_transcript(...)
    

    This is where the assistant:

    • Takes in the full transcript (what was said in the video).
    • Breaks it into small parts (like subtitles).
    • Sends each part to the smart brain for analysis.
    • Collects the results.
    • Finally, a summary of all the main ideas is created.

    3. Processing One Transcript Segment at a Time

    def process_segment(self, segment)
    

    For each chunk of the video:

    • The assistant reads the text and timestamp.
    • It asks GPT-4 to analyze it and suggest important insights.
    • It saves that insight along with the original text and timestamp.

    4. Handling Incoming Messages from Other Agents

    def handle_mcp_message(self, message)
    

    The assistant can also receive messages from teammates (other agents):

    If the message is from the Research Agent:

    • It reads new information and adds it to its notes.
    • It replies with a thank-you message to say it got the research.

    If the message is from the Translation Agent:

    • It takes the translated version of a transcript.
    • Updates its notes to reflect the translated text and its language.

    This is like a team of assistants emailing back and forth to make sure the notes are complete and accurate.

    5. Summarizing the Whole Video

    def generate_summary(self)
    

    After going through all the transcript parts, the agent asks GPT-4 to create a short, clean summary — identifying:

    • Main ideas
    • Key talking points
    • Structure of the content

    The final result is clear, professional, and usable in learning materials or documentation.


    class clsResearchAgent:
        """Research Agent built with AutoGen"""
        
        def __init__(self, agent_id: str, broker: clsMCPBroker):
            self.agent_id = agent_id
            self.broker = broker
            self.broker.register_agent(agent_id)
            
            # Configure AutoGen directly with API key
            if not OPENAI_API_KEY:
                print("Warning: OPENAI_API_KEY not set for ResearchAgent")
                
            # Create config list directly instead of loading from file
            config_list = [
                {
                    "model": "gpt-4-0125-preview",
                    "api_key": OPENAI_API_KEY
                }
            ]
            # Create AutoGen assistant for research
            self.assistant = AssistantAgent(
                name="research_assistant",
                system_message="""You are a Research Agent for YouTube videos. Your responsibilities include:
                    1. Research topics mentioned in the video
                    2. Find relevant information, facts, references, or context
                    3. Provide concise, accurate information to support the documentation
                    4. Focus on delivering high-quality, relevant information
                    
                    Respond directly to research requests with clear, factual information.
                """,
                llm_config={"config_list": config_list, "temperature": 0.1}
            )
            
            # Create user proxy to handle message passing
            self.user_proxy = UserProxyAgent(
                name="research_manager",
                human_input_mode="NEVER",
                code_execution_config={"work_dir": "coding", "use_docker": False},
                default_auto_reply="Working on the research request..."
            )
            
            # Current conversation tracking
            self.current_requests = {}
        
        def handle_mcp_message(self, message: clsMCPMessage) -> Optional[clsMCPMessage]:
            """Handle an incoming MCP message"""
            if message.message_type == "request":
                # Process research request from Documentation Agent
                request_text = message.content.get("text", "")
                
                # Use AutoGen to process the research request
                def research_task():
                    self.user_proxy.initiate_chat(
                        self.assistant,
                        message=f"Research request for YouTube video content: {request_text}. Provide concise, factual information."
                    )
                    # Return last assistant message
                    return self.assistant.chat_messages[self.user_proxy.name][-1]["content"]
                
                # Execute research task
                research_result = research_task()
                
                # Send research results back to Documentation Agent
                response = clsMCPMessage(
                    sender=self.agent_id,
                    receiver=message.sender,
                    message_type="research_response",
                    content={"text": research_result},
                    reply_to=message.id,
                    conversation_id=message.conversation_id
                )
                
                self.broker.publish(response)
                return response
            
            return None
        
        def run(self):
            """Run the agent to listen for MCP messages"""
            print(f"Research Agent {self.agent_id} is running...")
            while True:
                message = self.broker.get_message(self.agent_id, timeout=1)
                if message:
                    self.handle_mcp_message(message)
                time.sleep(0.1)
    

    Let us understand the key methods in detail.

    1. Receiving and Responding to Research Requests

      def handle_mcp_message(self, message)
      

      When the Research Agent gets a message (like a question or request for info), it:

      1. Reads the message to see what needs to be researched.
      2. Asks GPT-4 to find helpful, accurate info about that topic.
      3. Sends the answer back to whoever asked the question (usually the Documentation Agent).

      class clsTranslationAgent:
          """Agent for language detection and translation"""
          
          def __init__(self, agent_id: str, broker: clsMCPBroker):
              self.agent_id = agent_id
              self.broker = broker
              self.broker.register_agent(agent_id)
              
              # Initialize language detector
              self.language_detector = clsLanguageDetector()
              
              # Initialize translation service
              self.translation_service = clsTranslationService()
          
          def process_text(self, text, conversation_id=None):
              """Process text: detect language and translate if needed, handling mixed language content"""
              if not conversation_id:
                  conversation_id = str(uuid.uuid4())
              
              # Detect language with support for mixed language content
              language_info = self.language_detector.detect(text)
              
              # Decide if translation is needed
              needs_translation = True
              
              # Pure English content doesn't need translation
              if language_info["language_code"] == "en-IN" or language_info["language_code"] == "unknown":
                  needs_translation = False
              
              # For mixed language, check if it's primarily English
              if language_info.get("is_mixed", False) and language_info.get("languages", []):
                  english_langs = [
                      lang for lang in language_info.get("languages", []) 
                      if lang["language_code"] == "en-IN" or lang["language_code"].startswith("en-")
                  ]
                  
                  # If the highest confidence language is English and > 60% confident, don't translate
                  if english_langs and english_langs[0].get("confidence", 0) > 0.6:
                      needs_translation = False
              
              if needs_translation:
                  # Translate using the appropriate service based on language detection
                  translation_result = self.translation_service.translate(text, language_info)
                  
                  return {
                      "original_text": text,
                      "language": language_info,
                      "translation": translation_result,
                      "final_text": translation_result.get("translated_text", text),
                      "conversation_id": conversation_id
                  }
              else:
                  # Already English or unknown language, return as is
                  return {
                      "original_text": text,
                      "language": language_info,
                      "translation": {"provider": "none"},
                      "final_text": text,
                      "conversation_id": conversation_id
                  }
          
          def handle_mcp_message(self, message: clsMCPMessage) -> Optional[clsMCPMessage]:
              """Handle an incoming MCP message"""
              if message.message_type == "translation_request":
                  # Process translation request from Documentation Agent
                  text = message.content.get("text", "")
                  
                  # Process the text
                  result = self.process_text(text, message.conversation_id)
                  
                  # Send translation results back to requester
                  response = clsMCPMessage(
                      sender=self.agent_id,
                      receiver=message.sender,
                      message_type="translation_response",
                      content=result,
                      reply_to=message.id,
                      conversation_id=message.conversation_id
                  )
                  
                  self.broker.publish(response)
                  return response
              
              return None
          
          def run(self):
              """Run the agent to listen for MCP messages"""
              print(f"Translation Agent {self.agent_id} is running...")
              while True:
                  message = self.broker.get_message(self.agent_id, timeout=1)
                  if message:
                      self.handle_mcp_message(message)
                  time.sleep(0.1)

      Let us understand the key methods in step-by-step manner:

      1. Understanding and Translating Text:

      def process_text(...)
      

      This is the core job of the agent. Here’s what it does with any piece of text:

      Step 1: Detect the Language

      • It tries to figure out the language of the input text.
      • It can handle cases where more than one language is mixed together, which is common in casual speech or subtitles.

      Step 2: Decide Whether to Translate

      • If the text is clearly in English, or it’s unclear what the language is, it decides not to translate.
      • If the text is mostly in another language or has less than 60% confidence in being English, it will translate it into English.

      Step 3: Translate (if needed)

      • If translation is required, it uses the translation service to do the job.
      • Then it packages all the information: the original text, detected language, the translated version, and a unique conversation ID.

      Step 4: Return the Results

      • If no translation is needed, it returns the original text and a note saying “no translation was applied.”

      2. Receiving Messages and Responding

      def handle_mcp_message(...)
      

      The agent listens for messages from other agents. When someone asks it to translate something:

      • It takes the text from the message.
      • Runs it through the process_text function (as explained above).
      • Sends the translated (or original) result to the person who asked.
      class clsTranslationService:
          """Translation service using multiple providers with support for mixed languages"""
          
          def __init__(self):
              # Initialize Sarvam AI client
              self.sarvam_api_key = SARVAM_API_KEY
              self.sarvam_url = "https://api.sarvam.ai/translate"
              
              # Initialize Google Cloud Translation client using simple HTTP requests
              self.google_api_key = GOOGLE_API_KEY
              self.google_translate_url = "https://translation.googleapis.com/language/translate/v2"
          
          def translate_with_sarvam(self, text, source_lang, target_lang="en-IN"):
              """Translate text using Sarvam AI (for Indian languages)"""
              if not self.sarvam_api_key:
                  return {"error": "Sarvam API key not set"}
              
              headers = {
                  "Content-Type": "application/json",
                  "api-subscription-key": self.sarvam_api_key
              }
              
              payload = {
                  "input": text,
                  "source_language_code": source_lang,
                  "target_language_code": target_lang,
                  "speaker_gender": "Female",
                  "mode": "formal",
                  "model": "mayura:v1"
              }
              
              try:
                  response = requests.post(self.sarvam_url, headers=headers, json=payload)
                  if response.status_code == 200:
                      return {"translated_text": response.json().get("translated_text", ""), "provider": "sarvam"}
                  else:
                      return {"error": f"Sarvam API error: {response.text}", "provider": "sarvam"}
              except Exception as e:
                  return {"error": f"Error calling Sarvam API: {str(e)}", "provider": "sarvam"}
          
          def translate_with_google(self, text, target_lang="en"):
              """Translate text using Google Cloud Translation API with direct HTTP request"""
              if not self.google_api_key:
                  return {"error": "Google API key not set"}
              
              try:
                  # Using the translation API v2 with API key
                  params = {
                      "key": self.google_api_key,
                      "q": text,
                      "target": target_lang
                  }
                  
                  response = requests.post(self.google_translate_url, params=params)
                  if response.status_code == 200:
                      data = response.json()
                      translation = data.get("data", {}).get("translations", [{}])[0]
                      return {
                          "translated_text": translation.get("translatedText", ""),
                          "detected_source_language": translation.get("detectedSourceLanguage", ""),
                          "provider": "google"
                      }
                  else:
                      return {"error": f"Google API error: {response.text}", "provider": "google"}
              except Exception as e:
                  return {"error": f"Error calling Google Translation API: {str(e)}", "provider": "google"}
          
          def translate(self, text, language_info):
              """Translate text to English based on language detection info"""
              # If already English or unknown language, return as is
              if language_info["language_code"] == "en-IN" or language_info["language_code"] == "unknown":
                  return {"translated_text": text, "provider": "none"}
              
              # Handle mixed language content
              if language_info.get("is_mixed", False) and language_info.get("languages", []):
                  # Strategy for mixed language: 
                  # 1. If one of the languages is English, don't translate the entire text, as it might distort English portions
                  # 2. If no English but contains Indian languages, use Sarvam as it handles code-mixing better
                  # 3. Otherwise, use Google Translate for the primary detected language
                  
                  has_english = False
                  has_indian = False
                  
                  for lang in language_info.get("languages", []):
                      if lang["language_code"] == "en-IN" or lang["language_code"].startswith("en-"):
                          has_english = True
                      if lang.get("is_indian", False):
                          has_indian = True
                  
                  if has_english:
                      # Contains English - use Google for full text as it handles code-mixing well
                      return self.translate_with_google(text)
                  elif has_indian:
                      # Contains Indian languages - use Sarvam
                      # Use the highest confidence Indian language as source
                      indian_langs = [lang for lang in language_info.get("languages", []) if lang.get("is_indian", False)]
                      if indian_langs:
                          # Sort by confidence
                          indian_langs.sort(key=lambda x: x.get("confidence", 0), reverse=True)
                          source_lang = indian_langs[0]["language_code"]
                          return self.translate_with_sarvam(text, source_lang)
                      else:
                          # Fallback to primary language
                          if language_info["is_indian"]:
                              return self.translate_with_sarvam(text, language_info["language_code"])
                          else:
                              return self.translate_with_google(text)
                  else:
                      # No English, no Indian languages - use Google for primary language
                      return self.translate_with_google(text)
              else:
                  # Not mixed language - use standard approach
                  if language_info["is_indian"]:
                      # Use Sarvam AI for Indian languages
                      return self.translate_with_sarvam(text, language_info["language_code"])
                  else:
                      # Use Google for other languages
                      return self.translate_with_google(text)

      This Translation Service is like a smart translator that knows how to:

      • Detect what language the text is written in,
      • Choose the best translation provider depending on the language (especially for Indian languages),
      • And then translate the text into English.

      It supports mixed-language content (such as Hindi-English in one sentence) and uses either Google Translate or Sarvam AI, a translation service designed for Indian languages.

      Now, let us understand the key methods in a step-by-step manner:

      1. Translating Using Google Translate

      def translate_with_google(...)
      

      This function uses Google Translate:

      • It sends the text, asks for English as the target language, and gets a translation back.
      • It also detects the source language automatically.
      • If successful, it returns the translated text and the detected original language.
      • If there’s an error, it returns a message saying what went wrong.

      Best For: Non-Indian languages (like Spanish, French, Chinese) and content that is not mixed with English.

      2. Main Translation Logic

      def translate(self, text, language_info)
      

      This is the decision-maker. Here’s how it works:

      Case 1: No Translation Needed

      If the text is already in English or the language is unknown, it simply returns the original text.

      Case 2: Mixed Language (e.g., Hindi + English)

      If the text contains more than one language:

      • ✅ If one part is English → use Google Translate (it’s good with mixed languages).
      • ✅ If it includes Indian languages only → use Sarvam AI (better at handling Indian content).
      • ✅ If it’s neither English nor Indian → use Google Translate.

      The service checks how confident it is about each language in the mix and chooses the most likely one to translate from.

      Case 3: Single Language

      If the text is only in one language:

      • ✅ If it’s an Indian language (like Bengali, Tamil, or Marathi), use Sarvam AI.
      • ✅ If it’s any other language, use Google Translate.

      So, we’ve done it.

      I’ve included the complete working solutions for you in the GitHub Link.

      We’ll cover the detailed performance testing, Optimized configurations & many other useful details in our next post.

      Till then, Happy Avenging! 🙂

      Real-time video summary assistance App – Part 1

      Today, we’ll discuss another topic in our two-part series. We will understand the importance of the MCP protocol for communicating between agents.

      This will be an in-depth highly technical as well as depicting using easy-to-understand visuals.

      But, before that, let us understand the demo first.

      Isn’t it exciting?


      Let us first understand in easy language about the MCP protocol.

      MCP (Multi-Agent Communication Protocol) is a custom message exchange system that facilitates structured and scalable communication among multiple AI agents operating within an application. These agents collaborate asynchronously or in real-time to complete complex tasks by sharing results, context, and commands through a common messaging layer.

      How MCP Protocol Helps:

      FeatureBenefit
      Agent-Oriented ArchitectureEach agent handles a focused task, improving modularity and scalability.
      Event-Driven Message PassingAgents communicate based on triggers, not polling—leading to faster and efficient responses.
      Structured Communication FormatAll messages follow a standard format (e.g., JSON) with metadata for sender, recipient, type, and payload.
      State PreservationAgents maintain context across messages using memory (e.g., ConversationBufferMemory) to ensure coherence.

      How It Works (Step-by-Step):

      • 📥 User uploads or streams a video.
      • 🧑‍💻 MCP Protocol triggers the Transcription Agent to start converting audio into text.
      • 🌐 Translation Agent receives this text (if a different language is needed).
      • 🧾 Summarization Agent receives the translated or original transcript and generates a concise summary.
      • 📚 Research Agent checks for references or terminology used in the video.
      • 📄 Documentation Agent compiles the output into a structured report.
      • 🔁 All communication between agents flows through MCP, ensuring consistent message delivery and coordination.

      Now, let us understand the solution that we intend to implement for our solutions:

      This app provides live summarization and contextual insights from videos such as webinars, interviews, or YouTube recordings using multiple cooperating AI agents. These agents may include:

      • Transcription Agent: Converts spoken words to text.
      • Translation Agent: Translates text to different languages (if needed).
      • Summarization Agent: Generates concise summaries.
      • Research Agent: Finds background or supplementary data related to the discussion.
      • Documentation Agent: Converts outputs into structured reports or learning materials.

      We need to understand one more thing before deep diving into the code. Part of your conversation may be mixed, like part Hindi & part English. So, in that case, it will break the sentences into chunks & then convert all of them into the same language. Hence, the following rules are applied while translating the sentences –


      Now, we will go through the basic frame of the system & try to understand how it fits all the principles that we discussed above for this particular solution mapped against the specific technology –

      1. Documentation Agent built with the LangChain framework
      2. Research Agent built with the AutoGen framework
      3. MCP Broker for seamless communication between agents

      Let us understand from the given picture the flow of the process that our app is trying to implement –


      Great! So, now, we’ll focus on some of the key Python scripts & go through their key features.

      But, before that, we share the group of scripts that belong to specific tasks.

      • clsMCPMessage.py
      • clsMCPBroker.py
      • clsYouTubeVideoProcessor.py
      • clsLanguageDetector.py
      • clsTranslationAgent.py
      • clsTranslationService.py
      • clsDocumentationAgent.py
      • clsResearchAgent.py

      Now, we’ll review some of the script in this post, along with the next post, as a continuation from this post.

      class clsMCPMessage(BaseModel):
          """Message format for MCP protocol"""
          id: str = Field(default_factory=lambda: str(uuid.uuid4()))
          timestamp: float = Field(default_factory=time.time)
          sender: str
          receiver: str
          message_type: str  # "request", "response", "notification"
          content: Dict[str, Any]
          reply_to: Optional[str] = None
          conversation_id: str
          metadata: Dict[str, Any] = {}
          
      class clsMCPBroker:
          """Message broker for MCP protocol communication between agents"""
          
          def __init__(self):
              self.message_queues: Dict[str, queue.Queue] = {}
              self.subscribers: Dict[str, List[str]] = {}
              self.conversation_history: Dict[str, List[clsMCPMessage]] = {}
          
          def register_agent(self, agent_id: str) -> None:
              """Register an agent with the broker"""
              if agent_id not in self.message_queues:
                  self.message_queues[agent_id] = queue.Queue()
                  self.subscribers[agent_id] = []
          
          def subscribe(self, subscriber_id: str, publisher_id: str) -> None:
              """Subscribe an agent to messages from another agent"""
              if publisher_id in self.subscribers:
                  if subscriber_id not in self.subscribers[publisher_id]:
                      self.subscribers[publisher_id].append(subscriber_id)
          
          def publish(self, message: clsMCPMessage) -> None:
              """Publish a message to its intended receiver"""
              # Store in conversation history
              if message.conversation_id not in self.conversation_history:
                  self.conversation_history[message.conversation_id] = []
              self.conversation_history[message.conversation_id].append(message)
              
              # Deliver to direct receiver
              if message.receiver in self.message_queues:
                  self.message_queues[message.receiver].put(message)
              
              # Deliver to subscribers of the sender
              for subscriber in self.subscribers.get(message.sender, []):
                  if subscriber != message.receiver:  # Avoid duplicates
                      self.message_queues[subscriber].put(message)
          
          def get_message(self, agent_id: str, timeout: Optional[float] = None) -> Optional[clsMCPMessage]:
              """Get a message for the specified agent"""
              try:
                  return self.message_queues[agent_id].get(timeout=timeout)
              except (queue.Empty, KeyError):
                  return None
          
          def get_conversation_history(self, conversation_id: str) -> List[clsMCPMessage]:
              """Get the history of a conversation"""
              return self.conversation_history.get(conversation_id, [])

      Imagine a system where different virtual agents (like robots or apps) need to talk to each other. To do that, they send messages back and forth—kind of like emails or text messages. This code is responsible for:

      • Making sure those messages are properly written (like filling out all parts of a form).
      • Making sure messages are delivered to the right people.
      • Keeping a record of conversations so you can go back and review what was said.

      This part (clsMCPMessage) is like a template or a form that every message needs to follow. Each message has:

      • ID: A unique number so every message is different (like a serial number).
      • Time Sent: When the message was created.
      • Sender & Receiver: Who sent the message and who is supposed to receive it.
      • Type of Message: Is it a request, a response, or just a notification?
      • Content: The actual information or question the message is about.
      • Reply To: If this message is answering another one, this tells which one.
      • Conversation ID: So we know which group of messages belongs to the same conversation.
      • Extra Info (Metadata): Any other small details that might help explain the message.

      This (clsMCPBroker) is the system (or “post office”) that makes sure messages get to where they’re supposed to go. Here’s what it does:

      1. Registering an Agent

      • Think of this like signing up a new user in the system.
      • Each agent gets their own personal mailbox (called a “message queue”) so others can send them messages.

      2. Subscribing to Another Agent

      • If Agent A wants to receive copies of messages from Agent B, they can “subscribe” to B.
      • This is like signing up for B’s newsletter—whenever B sends something, A gets a copy.

      3. Sending a Message

      • When someone sends a message:
        • It is saved into a conversation history (like keeping emails in your inbox).
        • It is delivered to the main person it was meant for.
        • And, if anyone subscribed to the sender, they get a copy too—unless they’re already the main receiver (to avoid sending duplicates).

      4. Receiving Messages

      • Each agent can check their personal mailbox to see if they got any new messages.
      • If there are no messages, they’ll either wait for some time or move on.

      5. Viewing Past Conversations

      • You can look up all messages that were part of a specific conversation.
      • This is helpful for remembering what was said earlier.

      In systems where many different smart tools or services need to work together and communicate, this kind of communication system makes sure everything is:

      • Organized
      • Delivered correctly
      • Easy to trace back when needed

      So, in this post, we’ll finish it here. We’ll cover the rest of the post in the next post.

      I’ll bring some more exciting topics in the coming days from the Python verse.

      Till then, Happy Avenging!  🙂

      Monitoring & evaluating the leading LLMs (both the established & new) by Python-based evaluator

      As we’re leaping more & more into the field of Generative AI, one of the frequent questions or challenges people are getting more & more is the performance & other evaluation factors. These factors will eventually bring the fruit of this technology; otherwise, you will end up in technical debt.

      This post will discuss the key snippets of the monitoring app based on the Python-based AI app. But before that, let us first view the demo.

      Isn’t it exciting?


      Let us deep dive into it. But, here is the flow this solution will follow.

      So, the current application will invoke the industry bigshots and some relatively unknown or new LLMs.

      In this case, we’ll evaluate Anthropic, Open AI, DeepSeek, and Bharat GPT’s various models. However, Bharat GPT is open source, so we’ll use the Huggingface library and execute it locally against my MacBook Pro M4 Max.

      The following are the KPIs we’re going to evaluate:

      Here are the lists of dependant python packages that is require to run this application –

      pip install certifi==2024.8.30
      pip install anthropic==0.42.0
      pip install huggingface-hub==0.27.0
      pip install nltk==3.9.1
      pip install numpy==2.2.1
      pip install moviepy==2.1.1
      pip install numpy==2.1.3
      pip install openai==1.59.3
      pip install pandas==2.2.3
      pip install pillow==11.1.0
      pip install pip==24.3.1
      pip install psutil==6.1.1
      pip install requests==2.32.3
      pip install rouge_score==0.1.2
      pip install scikit-learn==1.6.0
      pip install setuptools==70.2.0
      pip install tokenizers==0.21.0
      pip install torch==2.6.0.dev20250104
      pip install torchaudio==2.6.0.dev20250104
      pip install torchvision==0.22.0.dev20250104
      pip install tqdm==4.67.1
      pip install transformers==4.47.1
          @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
          def get_claude_response(self, prompt: str) -> str:
              response = self.anthropic_client.messages.create(
                  model=anthropic_model,
                  max_tokens=maxToken,
                  messages=[{"role": "user", "content": prompt}]
              )
              return response.content[0].text
      1. The Retry Mechanism
        • The @retry line means this function will automatically try again if it fails.
        • It will stop retrying after 3 attempts (stop_after_attempt(3)).
        • It will wait longer between retries, starting at 4 seconds and increasing up to 10 seconds (wait_exponential(multiplier=1, min=4, max=10)).
      2. The Function Purpose
        • The function takes a message, called prompt, as input (a string of text).
        • It uses a service (likely an AI system like Claude) to generate a response to this prompt.
      3. Sending the Message
        • Inside the function, the code self.anthropic_client.messages.create is the part that actually sends the prompt to the AI.
        • It specifies:Which AI model to use (e.g., anthropic_model).
        • The maximum length of the response (controlled by maxToken).
        • The input message for the AI has a “role” (user), as well as the content of the prompt.
      4. Getting the Response
        • Once the AI generates a response, it’s saved as response.
        • The code retrieves the first part of the response (response.content[0].text) and sends it back to whoever called the function.

      Similarly, it will work for Open AI as well.

          @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
          def get_deepseek_response(self, prompt: str) -> tuple:
              deepseek_api_key = self.deepseek_api_key
      
              headers = {
                  "Authorization": f"Bearer {deepseek_api_key}",
                  "Content-Type": "application/json"
                  }
              
              payload = {
                  "model": deepseek_model,  
                  "messages": [{"role": "user", "content": prompt}],
                  "max_tokens": maxToken
                  }
              
              response = requests.post(DEEPSEEK_API_URL, headers=headers, json=payload)
      
              if response.status_code == 200:
                  res = response.json()["choices"][0]["message"]["content"]
              else:
                  res = "API request failed with status code " + str(response.status_code) + ":" + str(response.text)
      
              return res
      1. Retry Mechanism:
        • The @retry line ensures the function will try again if it fails.
        • It will stop retrying after 3 attempts (stop_after_attempt(3)).
        • It waits between retries, starting at 4 seconds and increasing up to 10 seconds (wait_exponential(multiplier=1, min=4, max=10)).

      1. What the Function Does:
        • The function takes one input, prompt, which is the message or question you want to send to the AI.
        • It returns the AI’s response or an error message.

      1. Preparing to Communicate with the API:
        • API Key: It gets the API key for the DeepSeek service from self.deepseek_api_key.
        • Headers: These tell the API that the request will use the API key (for security) and that the data format is JSON (structured text).
        • Payload: This is the information sent to the AI. It includes:
          • Model: Specifies which version of the AI to use (deepseek_model).
          • Messages: The input message with the role “user” and your prompt.
          • Max Tokens: Defines the maximum size of the AI’s response (maxToken).

      1. Sending the Request:
        • It uses the requests.post() method to send the payload and headers to the DeepSeek API using the URL DEEPSEEK_API_URL.

      1. Processing the Response:
        • If the API responds successfully (status_code == 200):
          • It extracts the AI’s reply from the response data.
          • Specifically, it gets the first choice’s message content: response.json()["choices"][0]["message"]["content"].
        • If there’s an error:
          • It constructs an error message with the status code and detailed error text from the API.

      1. Returning the Result:
        • The function outputs either the AI’s response or the error message.
          @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
          def get_bharatgpt_response(self, prompt: str) -> tuple:
              try:
                  messages = [[{"role": "user", "content": prompt}]]
                  
                  response = pipe(messages, max_new_tokens=maxToken,)
      
                  # Extract 'content' field safely
                  res = next((entry.get("content", "")
                              for entry in response[0][0].get("generated_text", [])
                              if isinstance(entry, dict) and entry.get("role") == "assistant"
                              ),
                              None,
                              )
                  
                  return res
              except Exception as e:
                  x = str(e)
                  print('Error: ', x)
      
                  return ""
      1. Retry Mechanism:The @retry ensures the function will try again if it fails.
        • It will stop retrying after 3 attempts (stop_after_attempt(3)).
        • The waiting time between retries starts at 4 seconds and increases exponentially up to 10 seconds (wait_exponential(multiplier=1, min=4, max=10)).
      2. What the Function Does:The function takes one input, prompt, which is the message or question you want to send to BharatGPT.
        • It returns the AI’s response or an empty string if something goes wrong.
      3. Sending the Prompt:Messages Structure: The function wraps the user’s prompt in a format that the BharatGPT AI understands:
        • messages = [[{"role": "user", "content": prompt}]]
        • This tells the AI that the prompt is coming from the “user.”
      4. Pipe Function: It uses a pipe() method to send the messages to the AI system.
        • max_new_tokens=maxToken: Limits how long the AI’s response can be.
      5. Extracting the Response:The response from the AI is in a structured format. The code looks for the first piece of text where:
        • The role is “assistant” (meaning it’s the AI’s reply).
        • The text is in the “content” field.
        • The next() function safely extracts this “content” field or returns None if it can’t find it.
      6. Error Handling:If something goes wrong (e.g., the AI doesn’t respond or there’s a technical issue), the code:
        • Captures the error message in e.
        • Prints the error message: print('Error: ', x).
        • Returns an empty string ("") instead of crashing.
      7. Returning the Result:If everything works, the function gives you the AI’s response as plain text.
        • If there’s an error, it gives you an empty string, indicating no response was received.

          def get_model_response(self, model_name: str, prompt: str) -> ModelResponse:
              """Get response from specified model with metrics"""
              start_time = time.time()
              start_memory = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
      
              try:
                  if model_name == "claude-3":
                      response_content = self.get_claude_response(prompt)
                  elif model_name == "gpt4":
                      response_content = self.get_gpt4_response(prompt)
                  elif model_name == "deepseek-chat":
                      response_content = self.get_deepseek_response(prompt)
                  elif model_name == "bharat-gpt":
                      response_content = self.get_bharatgpt_response(prompt)
      
                  # Model-specific API calls 
                  token_count = len(self.bert_tokenizer.encode(response_content))
                  
                  end_memory = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
                  memory_usage = end_memory - start_memory
                  
                  return ModelResponse(
                      content=response_content,
                      response_time=time.time() - start_time,
                      token_count=token_count,
                      memory_usage=memory_usage
                  )
              except Exception as e:
                  logging.error(f"Error getting response from {model_name}: {str(e)}")
                  return ModelResponse(
                      content="",
                      response_time=0,
                      token_count=0,
                      memory_usage=0,
                      error=str(e)
                  )

      Start Tracking Time and Memory:

        • The function starts a timer (start_time) to measure how long it takes to get a response.
        • It also checks how much memory is being used at the beginning (start_memory).

        Choose the AI Model:

        • Based on the model_name provided, the function selects the appropriate method to get a response:
          • "claude-3" → Calls get_claude_response(prompt).
          • "gpt4" → Calls get_gpt4_response(prompt).
          • "deepseek-chat" → Calls get_deepseek_response(prompt).
          • "bharat-gpt" → Calls get_bharatgpt_response(prompt).

        Process the Response:

        • Once the response is received, the function calculates:
          • Token Count: The number of tokens (small chunks of text) in the response using a tokenizer.
          • Memory Usage: The difference between memory usage after the response (end_memory) and before it (start_memory).

        Return the Results:

        • The function bundles all the information into a ModelResponse object:
          • The AI’s reply (content).
          • How long the response took (response_time).
          • The number of tokens in the reply (token_count).
          • How much memory was used (memory_usage).

        Handle Errors:

        • If something goes wrong (e.g., the AI doesn’t respond), the function:
          • Logs the error message.
          • Returns an empty response with default values and the error message.
            def evaluate_text_quality(self, generated: str, reference: str) -> Dict[str, float]:
                """Evaluate text quality metrics"""
                # BERTScore
                gen_embedding = self.sentence_model.encode([generated])
                ref_embedding = self.sentence_model.encode([reference])
                bert_score = cosine_similarity(gen_embedding, ref_embedding)[0][0]
        
                # BLEU Score
                generated_tokens = word_tokenize(generated.lower())
                reference_tokens = word_tokenize(reference.lower())
                bleu = sentence_bleu([reference_tokens], generated_tokens)
        
                # METEOR Score
                meteor = meteor_score([reference_tokens], generated_tokens)
        
                return {
                    'bert_score': bert_score,
                    'bleu_score': bleu,
                    'meteor_score': meteor
                }

        Inputs:

        • generated: The text produced by the AI.
        • reference: The correct or expected version of the text.

        Calculating BERTScore:

        • Converts the generated and reference texts into numerical embeddings (mathematical representations) using a pre-trained model (self.sentence_model.encode).
        • Measures the similarity between the two embeddings using cosine similarity. This gives the bert_score, which ranges from -1 (completely different) to 1 (very similar).

        Calculating BLEU Score:

        • Breaks the generated and reference texts into individual words (tokens) using word_tokenize.
        • Converts both texts to lowercase for consistent comparison.
        • Calculates the BLEU Score (sentence_bleu), which checks how many words or phrases in the generated text overlap with the reference. BLEU values range from 0 (no match) to 1 (perfect match).

        Calculating METEOR Score:

        • Also uses the tokenized versions of generated and reference texts.
        • Calculates the METEOR Score (meteor_score), which considers exact matches, synonyms, and word order. Scores range from 0 (no match) to 1 (perfect match).

        Returning the Results:

        • Combines the three scores into a dictionary with the keys 'bert_score''bleu_score', and 'meteor_score'.

        Similarly, other functions are developed.

            def run_comprehensive_evaluation(self, evaluation_data: List[Dict]) -> pd.DataFrame:
                """Run comprehensive evaluation on all metrics"""
                results = []
                
                for item in evaluation_data:
                    prompt = item['prompt']
                    reference = item['reference']
                    task_criteria = item.get('task_criteria', {})
                    
                    for model_name in self.model_configs.keys():
                        # Get multiple responses to evaluate reliability
                        responses = [
                            self.get_model_response(model_name, prompt)
                            for _ in range(3)  # Get 3 responses for reliability testing
                        ]
                        
                        # Use the best response for other evaluations
                        best_response = max(responses, key=lambda x: len(x.content) if not x.error else 0)
                        
                        if best_response.error:
                            logging.error(f"Error in model {model_name}: {best_response.error}")
                            continue
                        
                        # Gather all metrics
                        metrics = {
                            'model': model_name,
                            'prompt': prompt,
                            'response': best_response.content,
                            **self.evaluate_text_quality(best_response.content, reference),
                            **self.evaluate_factual_accuracy(best_response.content, reference),
                            **self.evaluate_task_performance(best_response.content, task_criteria),
                            **self.evaluate_technical_performance(best_response),
                            **self.evaluate_reliability(responses),
                            **self.evaluate_safety(best_response.content)
                        }
                        
                        # Add business impact metrics using task performance
                        metrics.update(self.evaluate_business_impact(
                            best_response,
                            metrics['task_completion']
                        ))
                        
                        results.append(metrics)
                
                return pd.DataFrame(results)
        • Input:
          • evaluation_data: A list of test cases, where each case is a dictionary containing:
            • prompt: The question or input to the AI model.
            • reference: The ideal or expected answer.
            • task_criteria (optional): Additional rules or requirements for the task.
        • Initialize Results:
          • An empty list results is created to store the evaluation metrics for each model and test case.
        • Iterate Through Test Cases:
          • For each item in the evaluation_data:
            • Extract the promptreference, and task_criteria.
        • Evaluate Each Model:
          • Loop through all available AI models (self.model_configs.keys()).
          • Generate three responses for each model to test reliability.
        • Select the Best Response:
          • Out of the three responses, pick the one with the most content (best_response), ignoring responses with errors.
        • Handle Errors:
          • If a response has an error, log the issue and skip further evaluation for that model.
        • Evaluate Metrics:
          • Using the best_response, calculate a variety of metrics, including:
            • Text Quality: How similar the response is to the reference.
            • Factual Accuracy: Whether the response is factually correct.
            • Task Performance: How well it meets task-specific criteria.
            • Technical Performance: Evaluate time, memory, or other system-related metrics.
            • Reliability: Check consistency across multiple responses.
            • Safety: Ensure the response is safe and appropriate.
        • Evaluate Business Impact:
          • Add metrics for business impact (e.g., how well the task was completed, using task_completion as a key factor).
        • Store Results:
          • Add the calculated metrics for this model and prompt to the results list.
        • Return Results as a DataFrame:
          • Convert the results list into a structured table (a pandas DataFrame) for easy analysis and visualization.

        Great! So, now, we’ve explained the code.

        Let us understand the final outcome of this run & what we can conclude from that.

        1. BERT Score (Semantic Understanding):
          • GPT4 leads slightly at 0.8322 (83.22%)
          • Bharat-GPT close second at 0.8118 (81.18%)
          • Claude-3 at 0.8019 (80.19%)
          • DeepSeek-Chat at 0.7819 (78.19%) Think of this like a “comprehension score” – how well the models understand the context. All models show strong understanding, with only a 5% difference between best and worst.
        2. BLEU Score (Word-for-Word Accuracy):
          • Bharat-GPT leads at 0.0567 (5.67%)
          • Claude-3 at 0.0344 (3.44%)
          • GPT4 at 0.0306 (3.06%)
          • DeepSeek-Chat lowest at 0.0189 (1.89%) These low scores suggest models use different wording than references, which isn’t necessarily bad.
        3. METEOR Score (Meaning Preservation):
          • Bharat-GPT leads at 0.4684 (46.84%)
          • Claude-3 close second at 0.4507 (45.07%)
          • GPT4 at 0.2960 (29.60%)
          • DeepSeek-Chat at 0.2652 (26.52%) This shows how well models maintain meaning while using different words.
        4. Response Time (Speed):
          • Claude-3 fastest: 4.40 seconds
          • Bharat-GPT: 6.35 seconds
          • GPT4: 6.43 seconds
          • DeepSeek-Chat slowest: 8.52 seconds
        5. Safety and Reliability:
          • Error Rate: Perfect 0.0 for all models
          • Toxicity: All very safe (below 0.15%) 
            • Claude-3 safest at 0.0007GPT4 at 0.0008Bharat-GPT at 0.0012
            • DeepSeek-Chat at 0.0014
        6. Cost Efficiency:
          • Claude-3 most economical: $0.0019 per response
          • Bharat-GPT close: $0.0021
          • GPT4: $0.0038
          • DeepSeek-Chat highest: $0.0050

        Key Takeaways by Model:

        1. Claude-3: ✓ Fastest responses ✓ Most cost-effective ✓ Excellent meaning preservation ✓ Lowest toxicity
        2. Bharat-GPT: ✓ Best BLEU and METEOR scores ✓ Strong semantic understanding ✓ Cost-effective ✗ Moderate response time
        3. GPT4: ✓ Best semantic understanding ✓ Good safety metrics ✗ Higher cost ✗ Moderate response time
        4. DeepSeek-Chat: ✗ Generally lower performance ✗ Slowest responses ✗ Highest cost ✗ Slightly higher toxicity

        Reliability of These Statistics:

        Strong Points:

        • Comprehensive metric coverage
        • Consistent patterns across evaluations
        • Zero error rates show reliability
        • Clear differentiation between models

        Limitations:

        • BLEU scores are quite low across all models
        • Doesn’t measure creative or innovative responses
        • May not reflect specific use case performance
        • Single snapshot rather than long-term performance

        Final Observation:

        1. Best Overall Value: Claude-3
          • Fast, cost-effective, safe, good performance
        2. Best for Accuracy: Bharat-GPT
          • Highest meaning preservation and precision
        3. Best for Understanding: GPT4
          • Strongest semantic comprehension
        4. Consider Your Priorities: 
          • Speed → Choose Claude-3
          • Cost → Choose Claude-3 or Bharat-GPT
          • Accuracy → Choose Bharat-GPT
          • Understanding → Choose GPT4

        These statistics provide reliable comparative data but should be part of a broader decision-making process that includes your specific needs, budget, and use cases.


        For the Bharat GPT model, we’ve tested this locally on my MacBook Pro 4 Max. And, the configuration is as follows –

        I’ve tried the API version locally, & it provided a similar performance against the stats that we received by running locally. Unfortunately, they haven’t made the API version public yet.

        So, apart from the Anthropic & Open AI, I’ll watch this new LLM (Bharat GPT) for overall stats in the coming days.


        So, we’ve done it.

        You can find the detailed code at the GitHub link.

        I’ll bring some more exciting topics in the coming days from the Python verse.

        Till then, Happy Avenging! 🙂

        Enabling & Exploring Stable Defussion – Part 3

        Before we dive into the details of this post, let us provide the previous two links that precede it.

        Enabling & Exploring Stable Defussion – Part 1

        Enabling & Exploring Stable Defussion – Part 2

        For, reference, we’ll share the demo before deep dive into the actual follow-up analysis in the below section –


        Now, let us continue our discussions from where we left.

        class clsText2Image:
            def __init__(self, pipe, output_path, filename):
        
                self.pipe = pipe
                
                # More aggressive attention slicing
                self.pipe.enable_attention_slicing(slice_size=1)
        
                self.output_path = f"{output_path}{filename}"
                
                # Warm up the pipeline
                self._warmup()
            
            def _warmup(self):
                """Warm up the pipeline to optimize memory allocation"""
                with torch.no_grad():
                    _ = self.pipe("warmup", num_inference_steps=1, height=512, width=512)
                torch.mps.empty_cache()
                gc.collect()
            
            def generate(self, prompt, num_inference_steps=12, guidance_scale=3.0):
                try:
                    torch.mps.empty_cache()
                    gc.collect()
                    
                    with torch.autocast(device_type="mps"):
                        with torch.no_grad():
                            image = self.pipe(
                                prompt,
                                num_inference_steps=num_inference_steps,
                                guidance_scale=guidance_scale,
                                height=1024,
                                width=1024,
                            ).images[0]
                    
                    image.save(self.output_path)
                    return 0
                except Exception as e:
                    print(f'Error: {str(e)}')
                    return 1
                finally:
                    torch.mps.empty_cache()
                    gc.collect()
        
            def genImage(self, prompt):
                try:
        
                    # Initialize generator
                    x = self.generate(prompt)
        
                    if x == 0:
                        print('Successfully processed first pass!')
                    else:
                        print('Failed complete first pass!')
                        raise 
        
                    return 0
        
                except Exception as e:
                    print(f"\nAn unexpected error occurred: {str(e)}")
        
                    return 1

        This is the initialization method for the clsText2Image class:

        • Takes a pre-configured pipe (text-to-image pipeline), an output_path, and a filename.
        • Enables more aggressive memory optimization by setting “attention slicing.”
        • Prepares the full file path for saving generated images.
        • Calls a _warmup method to pre-load the pipeline and optimize memory allocation.

        This private method warms up the pipeline:

        • Sends a dummy “warmup” request with basic parameters to allocate memory efficiently.
        • Clears any cached memory (torch.mps.empty_cache()) and performs garbage collection (gc.collect()).
        • Ensures smoother operation for future image generation tasks.

        This method generates an image from a text prompt:

        • Clears memory cache and performs garbage collection before starting.
        • Uses the text-to-image pipeline (pipe) to generate an image:
          • Takes the prompt, number of inference steps, and guidance scale as input.
          • Outputs an image at 1024×1024 resolution.
        • Saves the generated image to the specified output path.
        • Returns 0 on success or 1 on failure.
        • Ensures cleanup by clearing memory and collecting garbage, even in case of errors.

        This method simplifies image generation:

        • Calls the generate method with the given prompt.
        • Prints a success message if the image is generated (0 return value).
        • On failure, logs the error and raises an exception.
        • Returns 0 on success or 1 on failure.
        class clsImage2Video:
            def __init__(self, pipeline):
                
                # Optimize model loading
                torch.mps.empty_cache()
                self.pipeline = pipeline
        
            def generate_frames(self, pipeline, init_image, prompt, duration_seconds=10):
                try:
                    torch.mps.empty_cache()
                    gc.collect()
        
                    base_frames = []
                    img = Image.open(init_image).convert("RGB").resize((1024, 1024))
                    
                    for _ in range(10):
                        result = pipeline(
                            prompt=prompt,
                            image=img,
                            strength=0.45,
                            guidance_scale=7.5,
                            num_inference_steps=25
                        ).images[0]
        
                        base_frames.append(np.array(result))
                        img = result
                        torch.mps.empty_cache()
        
                    frames = []
                    for i in range(len(base_frames)-1):
                        frame1, frame2 = base_frames[i], base_frames[i+1]
                        for t in np.linspace(0, 1, int(duration_seconds*24/10)):
                            frame = (1-t)*frame1 + t*frame2
                            frames.append(frame.astype(np.uint8))
                    
                    return frames
                except Exception as e:
                    frames = []
                    print(f'Error: {str(e)}')
        
                    return frames
                finally:
                    torch.mps.empty_cache()
                    gc.collect()
        
            # Main method
            def genVideo(self, prompt, inputImage, targetVideo, fps):
                try:
                    print("Starting animation generation...")
                    
                    init_image_path = inputImage
                    output_path = targetVideo
                    fps = fps
                    
                    frames = self.generate_frames(
                        pipeline=self.pipeline,
                        init_image=init_image_path,
                        prompt=prompt,
                        duration_seconds=20
                    )
                    
                    imageio.mimsave(output_path, frames, fps=30)
        
                    print("Animation completed successfully!")
        
                    return 0
                except Exception as e:
                    x = str(e)
                    print('Error: ', x)
        
                    return 1

        This initializes the clsImage2Video class:

        • Clears the GPU cache to optimize memory before loading.
        • Sets up the pipeline for generating frames, which uses an image-to-video transformation model.

        This function generates frames for a video:

        • Starts by clearing GPU memory and running garbage collection.
        • Loads the init_image, resizes it to 1024×1024 pixels, and converts it to RGB format.
        • Iteratively applies the pipeline to transform the image:
          • Uses the prompt and specified parameters like strengthguidance_scale, and num_inference_steps.
          • Stores the resulting frames in a list.
        • Interpolates between consecutive frames to create smooth transitions:
          • Uses linear blending for smooth animation across a specified duration and frame rate (24 fps for 10 segments).
        • Returns the final list of generated frames or an empty list if an error occurs.
        • Always clears memory after execution.

        This is the main function for creating a video from an image and text prompt:

        • Logs the start of the animation generation process.
        • Calls generate_frames() with the given pipelineinputImage, and prompt to create frames.
        • Saves the generated frames as a video using the imageio library, setting the specified frame rate (fps).
        • Logs a success message and returns 0 if the process is successful.
        • On error, logs the issue and returns 1.

        Now, let us understand the performance. But, before that let us explore the device on which we’ve performed these stress test that involves GPU & CPUs as well.

        And, here is the performance stats –

        From the above snapshot, we can clearly communicate that the GPU is 100% utilized. However, the CPU has shown a significant % of availability.

        As you can see, the first pass converts the input prompt to intermediate images within 1 min 30 sec. However, the second pass constitutes multiple hops (11 hops) on an avg 22 seconds. Overall, the application will finish in 5 minutes 36 seconds for a 10-second video clip.


        So, we’ve done it.

        You can find the detailed code at the GitHub link.

        I’ll bring some more exciting topics in the coming days from the Python verse.

        Till then, Happy Avenging! 🙂

        Enabling & Exploring Stable Defussion – Part 2

        As we’ve started explaining, the importance & usage of Stable Defussion in our previous post:

        Enabling & Exploring Stable Defussion – Part 1

        In today’s post, we’ll discuss another approach, where we built the custom Python-based SDK solution that consumes HuggingFace Library, which generates video out of the supplied prompt.

        But, before that, let us view the demo generated from a custom solution.

        Isn’t it exciting? Let us dive deep into the details.


        Let us understand basic flow of events for the custom solution –

        So, the application will interact with the python-sdk like “stable-diffusion-3.5-large” & “dreamshaper-xl-1-0”, which is available in HuggingFace. As part of the process, these libraries will load all the large models inside the local laptop that require some time depend upon the bandwidth of your internet.

        Before we even deep dive into the code, let us understand the flow of Python scripts as shown below:

        From the above diagram, we can understand that the main application will be triggered by “generateText2Video.py”. As you can see that “clsConfigClient.py” has all the necessary parameter information that will be supplied to all the scripts.

        “generateText2Video.py” will trigger the main class named “clsText2Video.py”, which then calls all the subsequent classes.

        Great! Since we now have better visibility of the script flow, let’s examine the key snippets individually.


        class clsText2Video:
            def __init__(self, model_id_1, model_id_2, output_path, filename, vidfilename, fps, force_cpu=False):
                self.model_id_1 = model_id_1
                self.model_id_2 = model_id_2
                self.output_path = output_path
                self.filename = filename
                self.vidfilename = vidfilename
                self.force_cpu = force_cpu
                self.fps = fps
        
                # Initialize in main process
                os.environ["TOKENIZERS_PARALLELISM"] = "true"
                self.r1 = cm.clsMaster(force_cpu)
                self.torch_type = self.r1.getTorchType()
                
                torch.mps.empty_cache()
                self.pipe = self.r1.getText2ImagePipe(self.model_id_1, self.torch_type)
                self.pipeline = self.r1.getImage2VideoPipe(self.model_id_2, self.torch_type)
        
                self.text2img = cti.clsText2Image(self.pipe, self.output_path, self.filename)
                self.img2vid = civ.clsImage2Video(self.pipeline)
        
            def getPrompt2Video(self, prompt):
                try:
                    input_image = self.output_path + self.filename
                    target_video = self.output_path + self.vidfilename
        
                    if self.text2img.genImage(prompt) == 0:
                        print('Pass 1: Text to intermediate images generated!')
                        
                        if self.img2vid.genVideo(prompt, input_image, target_video, self.fps) == 0:
                            print('Pass 2: Successfully generated!')
                            return 0
                    return 1
                except Exception as e:
                    print(f"\nAn unexpected error occurred: {str(e)}")
                    return 1

        Now, let us interpret:

        This is the initialization method for the class. It does the following:

        • Sets up configurations like model IDs, output paths, filenames, video filename, frames per second (fps), and whether to use the CPU (force_cpu).
        • Configures an environment variable for tokenizer parallelism.
        • Initializes helper classes (clsMaster) to manage system resources and retrieve appropriate PyTorch settings.
        • Creates two pipelines:
          • pipe: For converting text to images using the first model.
          • pipeline: For converting images to video using the second model.
        • Initializes text2img and img2vid objects:
          • text2img handles text-to-image conversions.
          • img2vid handles image-to-video conversions.

        This method generates a video from a text prompt in two steps:

        1. Text-to-Image Conversion:
          • Calls genImage(prompt) using the text2img object to create an intermediate image file.
          • If successful, it prints confirmation.
        2. Image-to-Video Conversion:
          • Uses the img2vid object to convert the intermediate image into a video file.
          • Includes the input image path, target video path, and frames per second (fps).
          • If successful, it prints confirmation.
        • If either step fails, the method returns 1.
        • Logs any unexpected errors and returns 1 in such cases.
        # Set device for Apple Silicon GPU
        def setup_gpu(force_cpu=False):
            if not force_cpu and torch.backends.mps.is_available() and torch.backends.mps.is_built():
                print('Running on Apple Silicon MPS GPU!')
                return torch.device("mps")
            return torch.device("cpu")
        
        ######################################
        ####         Global Flag      ########
        ######################################
        
        class clsMaster:
            def __init__(self, force_cpu=False):
                self.device = setup_gpu(force_cpu)
        
            def getTorchType(self):
                try:
                    # Check if MPS (Apple Silicon GPU) is available
                    if not torch.backends.mps.is_available():
                        torch_dtype = torch.float32
                        raise RuntimeError("MPS (Metal Performance Shaders) is not available on this system.")
                    else:
                        torch_dtype = torch.float16
                    
                    return torch_dtype
                except Exception as e:
                    torch_dtype = torch.float16
                    print(f'Error: {str(e)}')
        
                    return torch_dtype
        
            def getText2ImagePipe(self, model_id, torchType):
                try:
                    device = self.device
        
                    torch.mps.empty_cache()
                    self.pipe = StableDiffusion3Pipeline.from_pretrained(model_id, torch_dtype=torchType, use_safetensors=True, variant="fp16",).to(device)
        
                    return self.pipe
                except Exception as e:
                    x = str(e)
                    print('Error: ', x)
        
                    torch.mps.empty_cache()
                    self.pipe = StableDiffusion3Pipeline.from_pretrained(model_id, torch_dtype=torchType,).to(device)
        
                    return self.pipe
                
            def getImage2VideoPipe(self, model_id, torchType):
                try:
                    device = self.device
        
                    torch.mps.empty_cache()
                    self.pipeline = StableDiffusionXLImg2ImgPipeline.from_pretrained(model_id, torch_dtype=torchType, use_safetensors=True, use_fast=True).to(device)
        
                    return self.pipeline
                except Exception as e:
                    x = str(e)
                    print('Error: ', x)
        
                    torch.mps.empty_cache()
                    self.pipeline = StableDiffusionXLImg2ImgPipeline.from_pretrained(model_id, torch_dtype=torchType).to(device)
        
                    return self.pipeline

        Let us interpret:

        This function determines whether to use the Apple Silicon GPU (MPS) or the CPU:

        • If force_cpu is False and the MPS GPU is available, it sets the device to “mps” (Apple GPU) and prints a message.
        • Otherwise, it defaults to the CPU.

        This is the initializer for the clsMaster class:

        • It sets the device to either GPU or CPU using the setup_gpu function (mentioned above) based on the force_cpu flag.

        This method determines the PyTorch data type to use:

        • Checks if MPS GPU is available:
          • If available, uses torch.float16 for optimized performance.
          • If unavailable, defaults to torch.float32 and raises a warning.
        • Handles errors gracefully by defaulting to torch.float16 and printing the error.

        This method initializes a text-to-image pipeline:

        • Loads the Stable Diffusion model with the given model_id and torchType.
        • Configures it for MPS GPU or CPU, based on the device.
        • Clears the GPU cache before loading the model to optimize memory usage.
        • If an error occurs, attempts to reload the pipeline without safetensors.

        This method initializes an image-to-video pipeline:

        • Similar to getText2ImagePipe, it loads the Stable Diffusion XL Img2Img pipeline with the specified model_id and torchType.
        • Configures it for MPS GPU or CPU and clears the cache before loading.
        • On error, reloads the pipeline without additional optimization settings and prints the error.

        Let us continue this in the next post:

        Enabling & Exploring Stable Defussion – Part 3

        Till then, Happy Avenging! 🙂

        Building solutions using LLM AutoGen in Python – Part 3

        Before we dive into the details of this post, let us provide the previous two links that precede it.

        Building solutions using LLM AutoGen in Python – Part 1

        Building solutions using LLM AutoGen in Python – Part 2

        For, reference, we’ll share the demo before deep dive into the actual follow-up analysis in the below section –


        In this post, we will understand the initial code generated & then the revised code to compare them for a better understanding of the impact of revised prompts.

        But, before that let us broadly understand the communication types between the agents.

        • Agents InvolvedAgent1Agent2
        • Flow:
          • Agent1 sends a request directly to Agent2.
          • Agent2 processes the request and sends the response back to Agent1.
        • Use Case: Simple query-response interactions without intermediaries.
        • Agents InvolvedUserAgentMediatorSpecialistAgent1SpecialistAgent2
        • Flow:
          • UserAgent sends input to Mediator.
          • Mediator delegates tasks to SpecialistAgent1 and SpecialistAgent2.
          • Specialists process tasks and return results to Mediator.
          • Mediator consolidates results and sends them back to UserAgent.
        • Agents InvolvedBroadcasterAgentAAgentBAgentC
        • Flow:
          • Broadcaster sends a message to multiple agents simultaneously.
          • Agents that find the message relevant (AgentAAgentC) acknowledge or respond.
        • Use Case: System-wide notifications or alerts.
        • Agents InvolvedSupervisorWorker1Worker2
        • Flow:
          • Supervisor assigns tasks to Worker1 and Worker2.
          • Workers execute tasks and report progress back to Supervisor.
        • Use Case: Task delegation in structured organizations.
        • Agents InvolvedPublisherSubscriber1Topic
        • Flow:
          • Publisher publishes an event or message to a Topic.
          • Subscriber1, who is subscribed to the Topic, receives the event.
        • Use Case: Decoupled systems where publishers and subscribers do not need direct knowledge of each other.
        • Agents InvolvedTriggerEventReactiveAgentNextStep
        • Flow:
          • An event occurs (TriggerEvent).
          • ReactiveAgent detects the event and acts.
          • The action leads to the NextStep in the process.
        • Use Case: Systems that need to respond to asynchronous events or changes in the environment.

        Since, we now understand the basic communication types. Let us understand the AutoGen generated first code & the last code (That satisfies our need) –

        # filename: simple_snake.py (Generated by AutoGen)
        
        import pygame
        import time
        import random
         
        snake_speed = 15
         
        # Window color
        white = pygame.Color(255, 255, 255)
         
        # Snake color
        green = pygame.Color(0, 255, 0)
         
        snake_position = [100, 50]
         
        # defining first 4 blocks 
        # of snake body
        snake_body = [ [100, 50], 
                       [90, 50],
                       [80, 50],
                       [70, 50]
                    ]
        # fruit position
        fruit_position = [random.randrange(1, (1000//10)) * 10, 
                          random.randrange(1, (600//10)) * 10]
        fruit_spawn = True
         
        direction = 'RIGHT'
        change_to = direction
         
        score = 0
         
        # Initialising pygame
        pygame.init()
         
        # Initialise game window
        win = pygame.display.set_mode((1000, 600))
        pygame.display.set_caption("Snake game for kids")
         
        # FPS (frames per second) controller
        fps_controller = pygame.time.Clock()
         
          
        while True:
            # handling key events
            for event in pygame.event.get():
                if event.type == pygame.KEYDOWN:
                    if event.key == pygame.K_UP:
                        change_to = 'UP'
                    if event.key == pygame.K_DOWN:
                        change_to = 'DOWN'
                    if event.key == pygame.K_LEFT:
                        change_to = 'LEFT'
                    if event.key == pygame.K_RIGHT:
                        change_to = 'RIGHT'
        
            # If two keys pressed simultaneously
            # we don't want snake to move into two
            # directions simultaneously
            if change_to == 'UP' and direction != 'DOWN':
                direction = 'UP'
            if change_to == 'DOWN' and direction != 'UP':
                direction = 'DOWN'
            if change_to == 'LEFT' and direction != 'RIGHT':
                direction = 'LEFT'
            if change_to == 'RIGHT' and direction != 'LEFT':
                direction = 'RIGHT'
         
            # Moving the snake
            if direction == 'UP':
                snake_position[1] -= 10
            if direction == 'DOWN':
                snake_position[1] += 10
            if direction == 'LEFT':
                snake_position[0] -= 10
            if direction == 'RIGHT':
                snake_position[0] += 10
         
            # Snake body growing mechanism
            # if fruits and snakes collide then scores
            # will increase by 10
            snake_body.insert(0, list(snake_position))
            if snake_position[0] == fruit_position[0] and snake_position[1] == fruit_position[1]:
                score += 10
                fruit_spawn = False
            else:
                snake_body.pop()
                 
            if not fruit_spawn:
                fruit_position = [random.randrange(1, (1000//10)) * 10, 
                                  random.randrange(1, (600//10)) * 10]
                 
            fruit_spawn = True
            win.fill(white)
            
            for pos in snake_body:
                pygame.draw.rect(win, green, pygame.Rect(pos[0], pos[1], 10, 10))
            pygame.draw.rect(win, white, pygame.Rect(
            fruit_position[0], fruit_position[1], 10, 10))
         
            # Game Over conditions
            if snake_position[0] < 0 or snake_position[0] > 1000-10:
                break
            if snake_position[1] < 0 or snake_position[1] > 600-10:
                break
         
            # Touching the snake body
            for block in snake_body[1:]:
                if snake_position[0] == block[0] and snake_position[1] == block[1]:
                    break
            
            # refresh game screen
            pygame.display.update()
        
            # Frame Per Second /Refresh rate
            fps_controller.tick(snake_speed)
        
        # displaying final score after game over
        print(f"You scored {score} in the game.")

        Now, let us remember the prompt refinement that we’ve done (We’ve already posted that in our last post as well) –

        I want to place the snake within the boundary of the visual wall & it should bounce each wall & then hit the target. And, it should increase the size. And, every time when it hits the target it will increase the score by 10. And, the application will run for 2 minutes, which will be displayed on the screen. This will exit once 2 minute is over.
        # filename: simple_snake.py (Generated by AutoGen)
        
        import pygame
        import time
        import random
        import math
        
        pygame.init()
        
        white = (255, 255, 255)
        yellow = (255, 255, 102)
        green = (0, 255, 0)
        red = (255, 0, 0)
        black = (0, 0, 0)
        blue = (0, 0, 255)
        
        dis_width = 800
        dis_height = 600
        
        dis = pygame.display.set_mode((dis_width, dis_height))
        pygame.display.set_caption('Snake Game')
        
        clock = pygame.time.Clock()
        snake_block = 10
        snake_speed = 30
        font_style = pygame.font.SysFont(None, 50)
        score_font = pygame.font.SysFont(None, 35)
        
        def our_snake(snake_block, snake_List):
            for x in snake_List:
                pygame.draw.rect(dis, green, [x[0], x[1], snake_block, snake_block])
        
        def message(msg,color):
            mesg = font_style.render(msg, True, color)
            dis.blit(mesg, [dis_width / 3, dis_height / 3])
        
        def gameLoop():  # creating a function
            game_over = False
            game_close = False
        
            # snake starting coordinates
            x1 = dis_width / 2
            y1 = dis_height / 2
        
            # snake initial movement direction
            x1_change = 0
            y1_change = 0
        
            # initialize snake length and list of coordinates
            snake_List = []
            Length_of_snake = 1
        
            # random starting point for the food
            foodx = round(random.randrange(0, dis_width - snake_block) / 10.0) * 10.0
            foody = round(random.randrange(0, dis_height - snake_block) / 10.0) * 10.0
        
            # initialize score
            score = 0
        
            # store starting time
            start_time = time.time()
        
            while not game_over:
        
                # Remaining time
                elapsed_time = time.time() - start_time
                remaining_time = 120 - elapsed_time  # 2 minutes game
                if remaining_time <= 0:
                    game_over = True
        
                # event handling loop
                for event in pygame.event.get():
                    if event.type == pygame.QUIT:
                        game_over = True  # when closing window
                    if event.type == pygame.MOUSEBUTTONUP:
                        # get mouse click coordinates
                        pos = pygame.mouse.get_pos()
        
                        # calculate new direction vector from snake to click position
                        x1_change = pos[0] - x1
                        y1_change = pos[1] - y1
        
                        # normalize direction vector
                        norm = math.sqrt(x1_change ** 2 + y1_change ** 2)
                        if norm != 0:
                            x1_change /= norm
                            y1_change /= norm
        
                        # multiply direction vector by step size
                        x1_change *= snake_block
                        y1_change *= snake_block
        
                x1 += x1_change
                y1 += y1_change
                dis.fill(white)
                pygame.draw.rect(dis, red, [foodx, foody, snake_block, snake_block])
                pygame.draw.rect(dis, green, [x1, y1, snake_block, snake_block])
                snake_Head = []
                snake_Head.append(x1)
                snake_Head.append(y1)
                snake_List.append(snake_Head)
                if len(snake_List) > Length_of_snake:
                    del snake_List[0]
        
                our_snake(snake_block, snake_List)
        
                # Bounces the snake back if it hits the edge
                if x1 < 0 or x1 > dis_width:
                    x1_change *= -1
                if y1 < 0 or y1 > dis_height:
                    y1_change *= -1
        
                # Display score
                value = score_font.render("Your Score: " + str(score), True, black)
                dis.blit(value, [0, 0])
        
                # Display remaining time
                time_value = score_font.render("Remaining Time: " + str(int(remaining_time)), True, blue)
                dis.blit(time_value, [0, 30])
        
                pygame.display.update()
        
                # Increase score and length of snake when snake gets the food
                if abs(x1 - foodx) < snake_block and abs(y1 - foody) < snake_block:
                    foodx = round(random.randrange(0, dis_width - snake_block) / 10.0) * 10.0
                    foody = round(random.randrange(0, dis_height - snake_block) / 10.0) * 10.0
                    Length_of_snake += 1
                    score += 10
        
                # Snake movement speed
                clock.tick(snake_speed)
        
            pygame.quit()
            quit()
        
        gameLoop()
        

        Now, let us understand the difference here –

        The first program is a snake game controlled by arrow keys that end if the Snake hits a wall or itself. The second game uses mouse clicks for control, bounces off walls instead of ending, includes a 2-minute timer, and displays the remaining time.

        So, we’ve done it. 🙂

        You can find the detailed code in the following Github link.


        I’ll bring some more exciting topics in the coming days from the Python verse.

        Till then, Happy Avenging! 🙂

        Building the optimized Indic Language bot by using the Python-based Sarvam AI LLMs – Part 2

        As we discover in our previous post about the Sarvam AI basic capabilities & a glimpse of code review. Today, we’ll finish the rest of the part & some of the matrices comparing against other popular LLMs.

        Before that, you can refer to the previous post for a recap, which is available here.

        Also, we’re providing the demo here –


        Now, let us jump into the rest of the code –

        clsSarvamAI.py (This script will capture the audio input in Indic languages & then provide an LLM response in the form of audio in Indic languages. In this post, we’ll discuss part of the code. In the next part, we’ll be discussing the next important methods. Note that we’re only going to discuss a few important functions here.)

        def createWavFile(self, audio, output_filename="output.wav", target_sample_rate=16000):
              try:
                  # Get the raw audio data as bytes
                  audio_data = audio.get_raw_data()
        
                  # Get the original sample rate
                  original_sample_rate = audio.sample_rate
        
                  # Open the output file in write mode
                  with wave.open(output_filename, 'wb') as wf:
                      # Set parameters: nchannels, sampwidth, framerate, nframes, comptype, compname
                      wf.setnchannels(1)  # Assuming mono audio
                      wf.setsampwidth(2)  # 16-bit audio (int16)
                      wf.setframerate(original_sample_rate)
        
                      # Write audio data in chunks
                      chunk_size = 1024 * 10  # Chunk size (adjust based on memory constraints)
                      for i in range(0, len(audio_data), chunk_size):
                          wf.writeframes(audio_data[i:i+chunk_size])
        
                  # Log the current timestamp
                  var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
                  print('Audio Time: ', str(var))
        
                  return 0
        
              except Exception as e:
                  print('Error: <Wav File Creation>: ', str(e))
                  return 1

        Purpose:

        This method saves recorded audio data into a WAV file format.

        What it Does:

        • Takes raw audio data and converts it into bytes.
        • Gets the original sample rate of the audio.
        • Opens a new WAV file in write mode.
        • Sets the parameters for the audio file (like the number of channels, sample width, and frame rate).
        • Writes the audio data into the file in small chunks to manage memory usage.
        • Logs the current time to keep track of when the audio was saved.
        • Returns 0 on success or 1 if there was an error.

        The “createWavFile” method takes the recorded audio and saves it as a WAV file on your computer. It converts the audio into bytes and writes them into small file parts. If something goes wrong, it prints an error message.


        def chunkBengaliResponse(self, text, max_length=500):
              try:
                  chunks = []
                  current_chunk = ""
        
                  # Use regex to split on sentence-ending punctuation
                  sentences = re.split(r'(।|\?|!)', text)
        
                  for i in range(0, len(sentences), 2):
                      sentence = sentences[i] + (sentences[i+1] if i+1 < len(sentences) else '')
        
                      if len(current_chunk) + len(sentence) <= max_length:
                          current_chunk += sentence
                      else:
                          if current_chunk:
                              chunks.append(current_chunk.strip())
                          current_chunk = sentence
        
                  if current_chunk:
                      chunks.append(current_chunk.strip())
        
                  return chunks
              except Exception as e:
                  x = str(e)
                  print('Error: <<Chunking Bengali Response>>: ', x)
        
                  return ''

        Purpose:

        This method breaks down a large piece of text (in Bengali) into smaller, manageable chunks.

        What it Does:

        • Initializes an empty list to store the chunks of text.
        • It uses a regular expression to split the text based on punctuation marks like full stops (।), question marks (?), and exclamation points (!).
        • Iterates through the split sentences to form chunks that do not exceed a specified maximum length (max_length).
        • Adds each chunk to the list until the entire text is processed.
        • Returns the list of chunks or an empty string if an error occurs.

        The chunkBengaliResponse method takes a long Bengali text and splits it into smaller, easier-to-handle parts. It uses punctuation marks to determine where to split. If there’s a problem while splitting, it prints an error message.


        def playWav(self, audio_data):
              try:
                  # Create a wav file object from the audio data
                  WavFile = wave.open(io.BytesIO(audio_data), 'rb')
        
                  # Extract audio parameters
                  channels = WavFile.getnchannels()
                  sample_width = WavFile.getsampwidth()
                  framerate = WavFile.getframerate()
                  n_frames = WavFile.getnframes()
        
                  # Read the audio data
                  audio = WavFile.readframes(n_frames)
                  WavFile.close()
        
                  # Convert audio data to numpy array
                  dtype_map = {1: np.int8, 2: np.int16, 3: np.int32, 4: np.int32}
                  audio_np = np.frombuffer(audio, dtype=dtype_map[sample_width])
        
                  # Reshape audio if stereo
                  if channels == 2:
                      audio_np = audio_np.reshape(-1, 2)
        
                  # Play the audio
                  sd.play(audio_np, framerate)
                  sd.wait()
        
                  return 0
              except Exception as e:
                  x = str(e)
                  print('Error: <<Playing the Wav>>: ', x)
        
                  return 1

        Purpose:

        This method plays audio data stored in a WAV file format.

        What it Does:

        • Reads the audio data from a WAV file object.
        • Extracts parameters like the number of channels, sample width, and frame rate.
        • Converts the audio data into a format that the sound device can process.
        • If the audio is stereo (two channels), it reshapes the data for playback.
        • Plays the audio through the speakers.
        • Returns 0 on success or 1 if there was an error.

        The playWav method takes audio data from a WAV file and plays it through your computer’s speakers. It reads the data and converts it into a format your speakers can understand. If there’s an issue playing the audio, it prints an error message.


          def audioPlayerWorker(self, queue):
              try:
                  while True:
                      var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
                      print('Response Audio Time: ', str(var))
                      audio_bytes = queue.get()
                      if audio_bytes is None:
                          break
                      self.playWav(audio_bytes)
                      queue.task_done()
        
                  return 0
              except Exception as e:
                  x = str(e)
                  print('Error: <<Audio Player Worker>>: ', x)
        
                  return 1

        Purpose:

        This method continuously plays audio from a queue until there is no more audio to play.

        What it Does:

        • It enters an infinite loop to keep checking for audio data in the queue.
        • Retrieves audio data from the queue and plays it using the “playWav”-method.
        • Logs the current time each time an audio response is played.
        • It breaks the loop if it encounters a None value, indicating no more audio to play.
        • Returns 0 on success or 1 if there was an error.

        The audioPlayerWorker method keeps checking a queue for new audio to play. It plays each piece of audio as it comes in and stops when there’s no more audio. If there’s an error during playback, it prints an error message.


          async def processChunk(self, chText, url_3, headers):
              try:
                  sarvamAPIKey = self.sarvamAPIKey
                  model_1 = self.model_1
                  langCode_1 = self.langCode_1
                  speakerName = self.speakerName
        
                  print()
                  print('Chunk Response: ')
                  vText = chText.replace('*','').replace(':',' , ')
                  print(vText)
        
                  payload_3 = {
                      "inputs": [vText],
                      "target_language_code": langCode_1,
                      "speaker": speakerName,
                      "pitch": 0.15,
                      "pace": 0.95,
                      "loudness": 2.1,
                      "speech_sample_rate": 16000,
                      "enable_preprocessing": True,
                      "model": model_1
                  }
                  response_3 = requests.request("POST", url_3, json=payload_3, headers=headers)
                  audio_data = response_3.text
                  data = json.loads(audio_data)
                  byte_data = data['audios'][0]
                  audio_bytes = base64.b64decode(byte_data)
        
                  return audio_bytes
              except Exception as e:
                  x = str(e)
                  print('Error: <<Process Chunk>>: ', x)
                  audio_bytes = base64.b64decode('')
        
                  return audio_bytes

        Purpose:

        This asynchronous method processes a chunk of text to generate audio using an external API.

        What it Does:

        • Cleans up the text chunk by removing unwanted characters.
        • Prepares a payload with the cleaned text and other parameters required for text-to-speech conversion.
        • Sends a POST request to an external API to generate audio from the text.
        • Decodes the audio data received from the API (in base64 format) into raw audio bytes.
        • Returns the audio bytes or an empty byte string if there is an error.

        The processChunk method takes a text, sends it to an external service to be converted into speech, and returns the audio data. If something goes wrong, it prints an error message.


          async def processAudio(self, audio):
              try:
                  model_2 = self.model_2
                  model_3 = self.model_3
                  url_1 = self.url_1
                  url_2 = self.url_2
                  url_3 = self.url_3
                  sarvamAPIKey = self.sarvamAPIKey
                  audioFile = self.audioFile
                  WavFile = self.WavFile
                  langCode_1 = self.langCode_1
                  langCode_2 = self.langCode_2
                  speakerGender = self.speakerGender
        
                  headers = {
                      "api-subscription-key": sarvamAPIKey
                  }
        
                  audio_queue = Queue()
                  data = {
                      "model": model_2,
                      "prompt": templateVal_1
                  }
                  files = {
                      "file": (audioFile, open(WavFile, "rb"), "audio/wav")
                  }
        
                  response_1 = requests.post(url_1, headers=headers, data=data, files=files)
                  tempDert = json.loads(response_1.text)
                  regionalT = tempDert['transcript']
                  langCd = tempDert['language_code']
                  statusCd = response_1.status_code
                  payload_2 = {
                      "input": regionalT,
                      "source_language_code": langCode_2,
                      "target_language_code": langCode_1,
                      "speaker_gender": speakerGender,
                      "mode": "formal",
                      "model": model_3,
                      "enable_preprocessing": True
                  }
        
                  response_2 = requests.request("POST", url_2, json=payload_2, headers=headers)
                  regionalT_2 = response_2.text
                  data_ = json.loads(regionalT_2)
                  regionalText = data_['translated_text']
                  chunked_response = self.chunkBengaliResponse(regionalText)
        
                  audio_thread = Thread(target=self.audioPlayerWorker, args=(audio_queue,))
                  audio_thread.start()
        
                  for chText in chunked_response:
                      audio_bytes = await self.processChunk(chText, url_3, headers)
                      audio_queue.put(audio_bytes)
        
                  audio_queue.join()
                  audio_queue.put(None)
                  audio_thread.join()
        
                  var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
                  print('Retrieval Time: ', str(var))
        
                  return 0
        
              except Exception as e:
                  x = str(e)
                  print('Error: <<Processing Audio>>: ', x)
        
                  return 1

        Purpose:

        This asynchronous method handles the complete audio processing workflow, including speech recognition, translation, and audio playback.

        What it Does:

        • Initializes various configurations and headers required for processing.
        • Sends the recorded audio to an API to get the transcript and detected language.
        • Translates the transcript into another language using another API.
        • Splits the translated text into smaller chunks using the chunkBengaliResponse method.
        • Starts an audio playback thread to play each processed audio chunk.
        • Sends each text chunk to the processChunk method to convert to speech and adds the audio data to the queue for playback.
        • Waits for all audio chunks to be processed and played before finishing.
        • Logs the current time when the process is complete.
        • Returns 0 on success or 1 if there was an error.

        The “processAudio”-method takes recorded audio, recognizes what was said, translates it into another language, splits the translated text into parts, converts each part into speech, and plays it back. It uses different services to do this; if there’s a problem at any step, it prints an error message.

        And, here is the performance stats (Captured from Sarvam AI website) –


        So, finally, we’ve done it. You can view the complete code in this GitHub link.

        I’ll bring some more exciting topics in the coming days from the Python verse.

        Till then, Happy Avenging! 🙂

        Building the optimized Indic Language bot by using the Python-based Sarvam AI LLMs – Part 1

        In the rapidly evolving landscape of artificial intelligence, Sarvam AI has emerged as a pioneering force in developing language technologies for Indian languages. This article series aims to provide an in-depth look at Sarvam AI’s Indic APIs, exploring their features, performance, and potential impact on the Indian tech ecosystem.

        This LLM aims to bridge the language divide in India’s digital landscape by providing powerful, accessible AI tools for Indic languages.

        India has 22 official languages and hundreds of dialects, presenting a unique challenge for technology adoption and digital inclusion. Even though all the government work happens in both the official language along with English language.

        Developers can fine-tune the models for specific domains or use cases, improving accuracy for specialized applications.

        As of 2024, Sarvam AI’s Indic APIs support the following languages:

        • Hindi
        • Bengali
        • Tamil
        • Telugu
        • Marathi
        • Gujarati
        • Kannada
        • Malayalam
        • Punjabi
        • Odia

        Before delving into the details, I strongly recommend taking a look at the demo.

        Isn’t this exciting? Let us understand the flow of events in the following diagram –

        The application interacts with Sarvam AI’s API. After interpreting the initial audio inputs from the computer, it uses Sarvam AI’s API to get the answer based on the selected Indic language, Bengali.

        pip install SpeechRecognition==3.10.4
        pip install pydub==0.25.1
        pip install sounddevice==0.5.0
        pip install numpy==1.26.4
        pip install soundfile==0.12.1

        clsSarvamAI.py (This script will capture the audio input in Indic languages & then provide an LLM response in the form of audio in Indic languages. In this post, we’ll discuss part of the code. In the next part, we’ll be discussing the next important methods. Note that we’re only going to discuss a few important functions here.)

        def initializeMicrophone(self):
              try:
                  for index, name in enumerate(sr.Microphone.list_microphone_names()):
                      print(f"Microphone with name \"{name}\" found (device_index={index})")
                  return sr.Microphone()
              except Exception as e:
                  x = str(e)
                  print('Error: <<Initiating Microphone>>: ', x)
        
                  return ''
        
          def realTimeTranslation(self):
              try:
                  WavFile = self.WavFile
                  recognizer = sr.Recognizer()
                  try:
                      microphone = self.initializeMicrophone()
                  except Exception as e:
                      print(f"Error initializing microphone: {e}")
                      return
        
                  with microphone as source:
                      print("Adjusting for ambient noise. Please wait...")
                      recognizer.adjust_for_ambient_noise(source, duration=5)
                      print("Microphone initialized. Start speaking...")
        
                      try:
                          while True:
                              try:
                                  print("Listening...")
                                  audio = recognizer.listen(source, timeout=5, phrase_time_limit=5)
                                  print("Audio captured. Recognizing...")
        
                                  #var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
                                  #print('Before Audio Time: ', str(var))
        
                                  self.createWavFile(audio, WavFile)
        
                                  try:
                                      text = recognizer.recognize_google(audio, language="bn-BD")  # Bengali language code
                                      sentences = text.split('')  # Bengali full stop
        
                                      print('Sentences: ')
                                      print(sentences)
                                      print('*'*120)
        
                                      if not text:
                                          print("No speech detected. Please try again.")
                                          continue
        
                                      if str(text).lower() == 'টাটা':
                                          raise BreakOuterLoop("Based on User Choice!")
        
                                      asyncio.run(self.processAudio(audio))
        
                                  except sr.UnknownValueError:
                                      print("Google Speech Recognition could not understand audio")
                                  except sr.RequestError as e:
                                      print(f"Could not request results from Google Speech Recognition service; {e}")
        
                              except sr.WaitTimeoutError:
                                  print("No speech detected within the timeout period. Listening again...")
                              except BreakOuterLoop:
                                  raise
                              except Exception as e:
                                  print(f"An unexpected error occurred: {e}")
        
                              time.sleep(1)  # Short pause before next iteration
        
                      except BreakOuterLoop as e:
                          print(f"Exited : {e}")
        
                  # Removing the temporary audio file that was generated at the begining
                  os.remove(WavFile)
        
                  return 0
              except Exception as e:
                  x = str(e)
                  print('Error: <<Real-time Translation>>: ', x)
        
                  return 1

        Purpose:

        This method is responsible for setting up and initializing the microphone for audio input.

        What it Does:

        • It attempts to list all available microphones connected to the system.
        • It prints the microphone’s name and corresponding device index (a unique identifier) for each microphone.
        • If successful, it returns a microphone object (sr.Microphone()), which can be used later to capture audio.
        • If this process encounters an error (e.g., no microphones being found or an internal error), it catches the exception, prints an error message, and returns an empty string (“).

        The “initializeMicrophone” Method finds all microphones connected to the computer and prints their names. If it finds a microphone, it prepares to use it for recording. If something goes wrong, it tells you what went wrong and stops the process.

        Purpose:
        This Method uses the microphone to handle real-time speech translation from a user. It captures spoken audio, converts it into text, and processes it further.

        What it Does:

        • Initializes a recognizer object (sr.Recognizer()) for speech recognition.
        • Call initializeMicrophone to set up the microphone. If initialization fails, an error message is printed, and the process is stopped.
        • Once the microphone is set up successfully, it adjusts for ambient noise to enhance accuracy.
        • Enters a loop to continuously listen for audio input from the user:
          • It waits for the user to speak and captures the audio.
          • Converts the captured audio to text using Google’s Speech Recognition service, specifying Bengali as the language.
          • If text is successfully captured and recognized:
            • Splits the text into sentences using the Bengali full-stop character.
            • Prints the sentences.
            • It checks if the text is a specific word (“টাটা”), and if so, it raises an exception to stop the loop (indicating that the user wants to exit).
            • Otherwise, it processes the audio asynchronously with processAudio.
          • If no speech is detected or an error occurs, it prints the relevant message and continues listening.
        • If the user decides to exit or if an error occurs, it breaks out of the loop, deletes any temporary audio files created, and returns a status code (0 for success, 1 for failure).


        The “realTimeTranslation” method continuously listens to the microphone for the user to speak. It captures what is said and tries to understand it using Google’s service, specifically for the Bengali language. It then splits what was said into sentences and prints them out. If the user says “টাটা” (which means “goodbye” in Bengali), it stops listening and exits. If it cannot understand the user or if there is a problem, it will let the user know and try again. It will print an error and stop the process if something goes wrong.


        Let’s wait for the next part & enjoy this part.

        Building & deploying a RAG architecture rapidly using Langflow & Python

        I’ve been looking for a solution that can help deploy any RAG solution involving Python faster. It would be more effective if an available UI helped deliver the solution faster. And, here comes the solution that does exactly what I needed – “LangFlow.”

        Before delving into the details, I strongly recommend taking a look at the demo. It’s a great way to get a comprehensive understanding of LangFlow and its capabilities in deploying RAG architecture rapidly.

        Demo

        This describes the entire architecture; hence, I’ll share the architecture components I used to build the solution.

        To know more about RAG-Architecture, please refer to the following link.

        As we all know, we can parse the data from the source website URL (in this case, I’m referring to my photography website to extract the text of one of my blogs) and then embed it into the newly created Astra DB & new collection, where I will be storing the vector embeddings.

        As you can see from the above diagram, the flow that I configured within 5 minutes and the full functionality of writing a complete solution (underlying Python application) within no time that extracts chunks, converts them into embeddings, and finally stores them inside the Astra DB.

        Now, let us understand the next phase, where, based on the ask from a chatbot, I need to convert that question into Vector DB & then find the similarity search to bring the relevant vectors as shown below –

        You need to configure this entire flow by dragging the necessary widgets from the left-side panel as marked in the Blue-Box shown below –

        For this specific use case, we’ve created an instance of Astra DB & then created an empty vector collection. Also, we need to ensure that we generate the API-Key & and provide the right roles assigned with the token. After successfully creating the token, you need to copy the endpoint, token & collection details & paste them into the desired fields of the Astra-DB components inside the LangFlow. Think of it as a framework where one needs to provide all the necessary information to build & run the entire flow successfully.

        Following are some of the important snapshots from the Astra-DB –

        Step – 1

        Step – 2

        Once you run the vector DB population, this will insert extracted text & then convert it into vectors, which will show in the following screenshot –

        You can see the sample vectors along with the text chunks inside the Astra DB data explorer as shown below –

        Some of the critical components are highlighted in the Blue-box which is important for us to monitor the vector embeddings.

        Now, here is how you can modify the current Python code of any available widgets or build your own widget by using the custom widget.

        The first step is to click the code button highlighted in the Red-box as shown below –

        The next step is when you click that button, which will open the detailed Python code representing the entire widget build & its functionality. This button is the place where you can add, modify, or keep it as it is depending upon your need, which will shown below –

        Once one builds the entire solution, you must click the final compile button (shown in the red box), which will eventually compile all the individual widgets. However, you can build the compile button for the individual widgets as soon as you make the solution. So you can pinpoint any potential problems at that very step.

        Let us understand one sample code of a widget. In this case, we will take vector embedding insertion into the Astra DB. Let us see the code –

        from typing import List, Optional, Union
        from langchain_astradb import AstraDBVectorStore
        from langchain_astradb.utils.astradb import SetupMode
        
        from langflow.custom import CustomComponent
        from langflow.field_typing import Embeddings, VectorStore
        from langflow.schema import Record
        from langchain_core.retrievers import BaseRetriever
        
        
        class AstraDBVectorStoreComponent(CustomComponent):
            display_name = "Astra DB"
            description = "Builds or loads an Astra DB Vector Store."
            icon = "AstraDB"
            field_order = ["token", "api_endpoint", "collection_name", "inputs", "embedding"]
        
            def build_config(self):
                return {
                    "inputs": {
                        "display_name": "Inputs",
                        "info": "Optional list of records to be processed and stored in the vector store.",
                    },
                    "embedding": {"display_name": "Embedding", "info": "Embedding to use"},
                    "collection_name": {
                        "display_name": "Collection Name",
                        "info": "The name of the collection within Astra DB where the vectors will be stored.",
                    },
                    "token": {
                        "display_name": "Token",
                        "info": "Authentication token for accessing Astra DB.",
                        "password": True,
                    },
                    "api_endpoint": {
                        "display_name": "API Endpoint",
                        "info": "API endpoint URL for the Astra DB service.",
                    },
                    "namespace": {
                        "display_name": "Namespace",
                        "info": "Optional namespace within Astra DB to use for the collection.",
                        "advanced": True,
                    },
                    "metric": {
                        "display_name": "Metric",
                        "info": "Optional distance metric for vector comparisons in the vector store.",
                        "advanced": True,
                    },
                    "batch_size": {
                        "display_name": "Batch Size",
                        "info": "Optional number of records to process in a single batch.",
                        "advanced": True,
                    },
                    "bulk_insert_batch_concurrency": {
                        "display_name": "Bulk Insert Batch Concurrency",
                        "info": "Optional concurrency level for bulk insert operations.",
                        "advanced": True,
                    },
                    "bulk_insert_overwrite_concurrency": {
                        "display_name": "Bulk Insert Overwrite Concurrency",
                        "info": "Optional concurrency level for bulk insert operations that overwrite existing records.",
                        "advanced": True,
                    },
                    "bulk_delete_concurrency": {
                        "display_name": "Bulk Delete Concurrency",
                        "info": "Optional concurrency level for bulk delete operations.",
                        "advanced": True,
                    },
                    "setup_mode": {
                        "display_name": "Setup Mode",
                        "info": "Configuration mode for setting up the vector store, with options likeSync,Async, orOff”.",
                        "options": ["Sync", "Async", "Off"],
                        "advanced": True,
                    },
                    "pre_delete_collection": {
                        "display_name": "Pre Delete Collection",
                        "info": "Boolean flag to determine whether to delete the collection before creating a new one.",
                        "advanced": True,
                    },
                    "metadata_indexing_include": {
                        "display_name": "Metadata Indexing Include",
                        "info": "Optional list of metadata fields to include in the indexing.",
                        "advanced": True,
                    },
                    "metadata_indexing_exclude": {
                        "display_name": "Metadata Indexing Exclude",
                        "info": "Optional list of metadata fields to exclude from the indexing.",
                        "advanced": True,
                    },
                    "collection_indexing_policy": {
                        "display_name": "Collection Indexing Policy",
                        "info": "Optional dictionary defining the indexing policy for the collection.",
                        "advanced": True,
                    },
                }
        
            def build(
                self,
                embedding: Embeddings,
                token: str,
                api_endpoint: str,
                collection_name: str,
                inputs: Optional[List[Record]] = None,
                namespace: Optional[str] = None,
                metric: Optional[str] = None,
                batch_size: Optional[int] = None,
                bulk_insert_batch_concurrency: Optional[int] = None,
                bulk_insert_overwrite_concurrency: Optional[int] = None,
                bulk_delete_concurrency: Optional[int] = None,
                setup_mode: str = "Sync",
                pre_delete_collection: bool = False,
                metadata_indexing_include: Optional[List[str]] = None,
                metadata_indexing_exclude: Optional[List[str]] = None,
                collection_indexing_policy: Optional[dict] = None,
            ) -> Union[VectorStore, BaseRetriever]:
                try:
                    setup_mode_value = SetupMode[setup_mode.upper()]
                except KeyError:
                    raise ValueError(f"Invalid setup mode: {setup_mode}")
                if inputs:
                    documents = [_input.to_lc_document() for _input in inputs]
        
                    vector_store = AstraDBVectorStore.from_documents(
                        documents=documents,
                        embedding=embedding,
                        collection_name=collection_name,
                        token=token,
                        api_endpoint=api_endpoint,
                        namespace=namespace,
                        metric=metric,
                        batch_size=batch_size,
                        bulk_insert_batch_concurrency=bulk_insert_batch_concurrency,
                        bulk_insert_overwrite_concurrency=bulk_insert_overwrite_concurrency,
                        bulk_delete_concurrency=bulk_delete_concurrency,
                        setup_mode=setup_mode_value,
                        pre_delete_collection=pre_delete_collection,
                        metadata_indexing_include=metadata_indexing_include,
                        metadata_indexing_exclude=metadata_indexing_exclude,
                        collection_indexing_policy=collection_indexing_policy,
                    )
                else:
                    vector_store = AstraDBVectorStore(
                        embedding=embedding,
                        collection_name=collection_name,
                        token=token,
                        api_endpoint=api_endpoint,
                        namespace=namespace,
                        metric=metric,
                        batch_size=batch_size,
                        bulk_insert_batch_concurrency=bulk_insert_batch_concurrency,
                        bulk_insert_overwrite_concurrency=bulk_insert_overwrite_concurrency,
                        bulk_delete_concurrency=bulk_delete_concurrency,
                        setup_mode=setup_mode_value,
                        pre_delete_collection=pre_delete_collection,
                        metadata_indexing_include=metadata_indexing_include,
                        metadata_indexing_exclude=metadata_indexing_exclude,
                        collection_indexing_policy=collection_indexing_policy,
                    )
        
                return vector_store
        

        Method: build_config:

        • This method defines the configuration options for the component.
        • Each configuration option includes a display_name and info, which provides details about the option.
        • Some options are marked as advanced, indicating they are optional and more complex.

        Method: build:

        • This method is used to create an instance of the Astra DB Vector Store.
        • It takes several parameters, including embedding, token, api_endpoint, collection_name, and various optional parameters.
        • It converts the setup_mode string to an enum value.
        • If inputs are provided, they are converted to a format suitable for storing in the vector store.
        • Depending on whether inputs are provided, a new vector store from documents can be created, or an empty vector store can be initialized with the given configurations.
        • Finally, it returns the created vector store instance.

        And, here is the the screenshot of your run –

        And, this is the last steps to run the Integrated Chatbot as shown below –

        As one can see the left side highlighted shows the reference text & chunks & the right side actual response.


        So, we’ve done it. And, you know the fun fact. I did this entire workflow within 35 minutes alone. 😛

        I’ll bring some more exciting topics in the coming days from the Python verse.

        To learn more about LangFlow, please click here.

        To learn about Astra DB, you need to click the following link.

        To learn about my blog & photography, you can click the following url.

        Till then, Happy Avenging!  🙂