This site mainly deals with various use cases demonstrated using Python, Data Science, Cloud basics, SQL Server, Oracle, Teradata along with SQL & their implementation. Expecting your active participation & time. This blog can also be accessed from your TP, tablet & mobile. Please provide your feedback.
This is a continuation of my previous post, which can be found here. This will be our last post of this series.
Let us recap the key takeaways from our previous post –
Two cloud patterns show how MCP standardizes safe AI-to-system work. Azure “agent factory”: You ask in Teams; Azure AI Foundry dispatches a specialist agent (HR/Sales). The agent calls a specific MCP server (Functions/Logic Apps) for CRM, SharePoint, or SQL via API Management. Entra ID enforces access; Azure Monitor audits. AWS “composable serverless agents”: In Bedrock, domain agents (Financial/IT Ops) invoke Lambda-based MCP tools for DynamoDB, S3, or CloudWatch through API Gateway with IAM and optional VPC. In both, agents never hold credentials; tools map one-to-one to systems, improving security, clarity, scalability, and compliance.
In this post, we’ll discuss the GCP “unified workbench” pattern.
Unified Workbench Pattern (GCP):
The GCP “unified workbench” pattern prioritizes a unified, data-centric platform for AI development, integrating seamlessly with Vertex AI and Google’s expertise in AI and data analytics. This approach is well-suited for AI-first companies and data-intensive organizations that want to build agents that leverage cutting-edge research tools.
Let’s explore the following diagram based on this –
Imagine Mia, a clinical operations lead, opens a simple app and asks: “Which clinics had the longest wait times this week? Give me a quick summary I can share.”
The app quietly sends Mia’s request to Vertex AI Agent Builder—think of it as the switchboard operator.
Vertex AI picks the Data Analysis agent (the “specialist” for questions like Mia’s).
That agent doesn’t go rummaging through databases. Instead, it uses a safe, preapproved tool—an MCP Server—to query BigQuery, where the data lives.
The tool fetches results and returns them to Mia—no passwords in the open, no risky shortcuts—just the answer, fast and safely.
Now meet Ravi, a developer who asks: “Show me the latest app metrics and confirm yesterday’s patch didn’t break the login table.”
The app routes Ravi’s request to Vertex AI.
Vertex AI chooses the Developer agent.
That agent calls a different tool—an MCP Server designed for Cloud SQL—to check the login table and run a safe query.
Results come back with guardrails intact. If the agent ever needs files, there’s also a Cloud Storage tool ready to fetch or store documents.
Let us understand how the underlying flow of activities took place –
User Interface:
Entry point: Vertex AI console or a custom app.
Sends a single request; no direct credentials or system access exposed to the user.
Orchestration: Vertex AI Agent Builder (MCP Host)
Routes the request to the most suitable agent:
Agent A (Data Analysis) for analytics/BI-style questions.
Agent B (Developer) for application/data-ops tasks.
Tooling via MCP Servers on Cloud Run
Each MCP Server is a purpose-built adapter with least-privilege access to exactly one service:
Server1 → BigQuery (analytics/warehouse) — used by Agent A in this diagram.
Server2 → Cloud Storage (GCS) (files/objects) — available when file I/O is needed.
Server3 → Cloud SQL (relational DB) — used by Agent B in this diagram.
Agents never hold database credentials; they request actions from the right tool.
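The routing and least-privilege idea above can be sketched in a few lines of Python. This is only an illustration under my own assumptions: the names `McpServer`, `SERVERS`, `AGENT_TOOLS`, and `resolve_tool` are hypothetical, and the choice to let both agents reach Cloud Storage is mine, not something the diagram prescribes. In a real deployment, IAM would enforce these rules rather than a dictionary.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class McpServer:
    """One adapter per backing service (illustrative names only)."""
    name: str
    service: str


# One MCP server per service, mirroring Server1..Server3 above.
SERVERS = {
    "bigquery": McpServer("server1", "BigQuery"),
    "gcs": McpServer("server2", "Cloud Storage"),
    "cloudsql": McpServer("server3", "Cloud SQL"),
}

# Assumed allow-list: each agent may reach only the tools it needs.
AGENT_TOOLS = {
    "data_analysis": {"bigquery", "gcs"},
    "developer": {"cloudsql", "gcs"},
}


def resolve_tool(agent: str, tool: str) -> McpServer:
    """Return the MCP server for a tool, or refuse if the agent is not allowed."""
    if tool not in AGENT_TOOLS.get(agent, set()):
        raise PermissionError(f"{agent} may not call {tool}")
    return SERVERS[tool]


print(resolve_tool("data_analysis", "bigquery").service)
```

Adding a new tool in this sketch means one new entry in `SERVERS` and one change to the allow-list; the agents and the UI stay untouched.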
Enterprise Systems
BigQuery, Cloud Storage, and Cloud SQL are the systems of record that the tools interact with.
Security, Networking, and Observability
GCP IAM: AuthN/AuthZ for Vertex AI and each MCP Server (fine-grained roles, least privilege).
GCP VPC: Private network paths for all Cloud Run MCP Servers (isolation, egress control).
Cloud Monitoring: Metrics, logs, and alerts across agents and tools (auditability, SLOs).
Return Path
Results flow back from the service → MCP Server → Agent → Vertex AI → UI.
Policies and logs track who requested what, when, and how.
Why does this design work?
One entry point for questions.
Clear accountability: specialists (agents) act within guardrails.
Built-in safety (IAM/VPC) and visibility (Monitoring) for trust.
Separation of concerns: agents decide what to do; tools (MCP Servers) decide how to do it.
Scalable: add a new tool (e.g., Pub/Sub or Vertex AI Feature Store) without changing the UI or agents.
Auditable & maintainable: each tool maps to one service with explicit IAM and VPC controls.
So, we’ve concluded the series with the above post. I hope you like it.
I’ll bring some more exciting topics in the coming days from the new advanced world of technology.
Till then, Happy Avenging! 🙂
Note: All the data & scenarios posted here are representative of data & scenarios available on the internet for educational purposes only. There is always room for improvement in this kind of model & the solution associated with it. I’ve shown the basic ways to achieve the same for educational purposes only.
This is a continuation of my previous post, which can be found here.
Let us recap the key takeaways from our previous post –
Enterprise AI, utilizing the Model Context Protocol (MCP), leverages an open standard that enables AI systems to securely and consistently access enterprise data and tools. MCP replaces brittle “N×M” integrations between models and systems with a standardized client–server pattern: an MCP host (e.g., IDE or chatbot) runs an MCP client that communicates with lightweight MCP servers, which wrap external systems via JSON-RPC. Servers expose three assets—Resources (data), Tools (actions), and Prompts (templates)—behind permissions, access control, and auditability. This design enables real-time context, reduces hallucinations, supports model- and cloud-agnostic interoperability, and accelerates “build once, integrate everywhere” deployment. A typical flow (e.g., retrieving a customer’s latest order) encompasses intent parsing, authorized tool invocation, query translation/execution, and the return of a normalized JSON result to the model for natural-language delivery. Performance introduces modest overhead (RPC hops, JSON (de)serialization, network transit) and scale considerations (request volume, significant results, context-window pressure). Mitigations include in-memory/semantic caching, optimized SQL with indexing, pagination, and filtering, connection pooling, and horizontal scaling with load balancing. In practice, small latency costs are often outweighed by the benefits of higher accuracy, stronger governance, and a decoupled, scalable architecture.
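To make the recap concrete, here is a minimal sketch of what a tool invocation could look like on the wire, plus the pagination mitigation mentioned above. The envelope follows JSON-RPC 2.0, and `tools/call` with `name` and `arguments` is how I understand the MCP specification to express a tool call. The tool name `get_latest_order` and its `customer_id` argument are hypothetical, and the helper names are my own.

```python
import json


def build_tool_call(request_id: int, tool_name: str, arguments: dict) -> str:
    """Serialize a JSON-RPC 2.0 request asking an MCP server to run a tool."""
    request = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": tool_name, "arguments": arguments},
    }
    return json.dumps(request)


def paginate(rows: list, page: int, page_size: int) -> list:
    """Return one zero-indexed page of rows, limiting what reaches the model's context."""
    start = page * page_size
    return rows[start:start + page_size]


# Hypothetical tool and argument, for illustration only.
print(build_tool_call(1, "get_latest_order", {"customer_id": "C-42"}))
print(paginate(list(range(10)), page=1, page_size=3))
```

Returning a page at a time is one simple way to ease the context-window pressure described above.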
How does MCP compare with other AI integration approaches?
Compared to other approaches, the Model Context Protocol (MCP) offers a uniquely standardized and secure framework for AI-tool integration, shifting from brittle, custom-coded connections to a universal plug-and-play model. It is not a replacement for underlying systems, such as APIs or databases, but instead acts as an intelligent, secure abstraction layer designed explicitly for AI agents.
MCP vs. Custom API integrations:
This approach was the traditional method for AI integration before standards like MCP emerged.
Custom API integrations (traditional): Each AI application requires a custom-built connector for every external system it needs to access, leading to an N x M integration problem (the number of connectors grows as the product of the number of models and the number of systems). This approach is resource-intensive, challenging to maintain, and prone to breaking when underlying APIs change.
MCP: The standardized protocol eliminates the N x M problem by creating a universal interface. Tool creators build a single MCP server for their system, and any MCP-compatible AI agent can instantly access it. This process decouples the AI model from the underlying implementation details, drastically reducing integration and maintenance costs.
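A quick piece of arithmetic shows the difference. The sketch below assumes the simplest case: every model needs every system, and with MCP each model gets one client while each system gets one server. The function names are mine.

```python
def custom_connectors(models: int, systems: int) -> int:
    """Point-to-point integration: one connector per (model, system) pair."""
    return models * systems


def mcp_components(models: int, systems: int) -> int:
    """MCP: one client per model plus one server per system."""
    return models + systems


for n_models, n_systems in [(2, 5), (5, 20), (10, 50)]:
    print(
        f"{n_models} models x {n_systems} systems: "
        f"{custom_connectors(n_models, n_systems)} custom connectors vs "
        f"{mcp_components(n_models, n_systems)} MCP components"
    )
```

With 5 models and 20 systems, that is 100 connectors against 25 components, and the gap widens as either side grows.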
For more detailed information, please refer to the following link.
MCP vs. Retrieval-Augmented Generation (RAG):
RAG is a technique that retrieves static documents to augment an LLM’s knowledge, while MCP focuses on live interactions. They are complementary, not competing.
RAG:
Focus: Retrieving and summarizing static, unstructured data, such as documents, manuals, or knowledge bases.
Best for: Providing background knowledge and general information, as in a policy lookup tool or customer service bot.
Data type: Unstructured, static knowledge.
MCP:
Focus: Accessing and acting on real-time, structured, and dynamic data from databases, APIs, and business systems.
Best for: Agentic use cases involving real-world actions, like pulling live sales reports from a CRM or creating a ticket in a project management tool.
Data type: Structured, real-time, and dynamic data.
MCP vs. LLM plugins and extensions:
Before MCP, platforms like OpenAI offered proprietary plugin systems to extend LLM capabilities.
LLM plugins:
Proprietary: Tied to a specific AI vendor (e.g., OpenAI).
Limited: Rely on the vendor’s API function-calling mechanism, which focuses on call formatting but not standardized execution.
Centralized: Managed by the AI vendor, creating a risk of vendor lock-in.
MCP:
Open standard: Based on a public, interoperable protocol (JSON-RPC 2.0), making it model-agnostic and usable across different platforms.
Infrastructure layer: Provides a standardized infrastructure for agents to discover and use any compliant tool, regardless of the underlying LLM.
Decentralized: Promotes a flexible ecosystem and reduces the risk of vendor lock-in.
How has enterprise AI with MCP opened up specific architecture patterns for Azure, AWS & GCP?
Microsoft Azure:
The “agent factory” pattern: Azure focuses on providing managed services for building and orchestrating AI agents, tightly integrated with its enterprise security and governance features. The MCP architecture is a core component of the Azure AI Foundry, serving as a secure, managed “agent factory.”
Azure architecture pattern with MCP:
AI orchestration layer: The Azure AI Agent Service, within Azure AI Foundry, acts as the central host and orchestrator. It provides the control plane for creating, deploying, and managing multiple specialized agents, and it natively supports the MCP standard.
AI model layer: Agents in the Foundry can be powered by various models, including those from Azure OpenAI Service, commercial models from partners, or open-source models.
MCP server and tool layer: MCP servers are deployed using serverless functions, such as Azure Functions or Azure Logic Apps, to wrap existing enterprise systems. These servers expose tools for interacting with enterprise data sources like SharePoint, Azure AI Search, and Azure Blob Storage.
Data and security layer: Data is secured using Microsoft Entra ID (formerly Azure AD) for authentication and access control, with robust security policies enforced via Azure API Management. Access to data sources, such as databases and storage, is managed securely through private networks and Managed Identity.
Amazon Web Services (AWS):
The “composable serverless agent” pattern: AWS emphasizes a modular, composable, and serverless approach, leveraging its extensive portfolio of services to build sophisticated, flexible, and scalable AI solutions. The MCP architecture here aligns with the principle of creating lightweight, event-driven services that AI agents can orchestrate.
AWS architecture pattern with MCP:
AI orchestration layer: Amazon Bedrock Agents, or custom agent frameworks deployed via AWS Fargate or Lambda, act as the MCP hosts. Bedrock Agents provide built-in orchestration, while custom agents offer greater flexibility and customization options.
AI model layer: The models are sourced from Amazon Bedrock, which provides a wide selection of foundation models.
MCP server and tool layer: MCP servers are deployed as serverless AWS Lambda functions. AWS offers pre-built MCP servers for many of its services, including the AWS Serverless MCP Server for managing serverless applications and the AWS Lambda Tool MCP Server for invoking existing Lambda functions as tools.
Data and security layer: Access is tightly controlled using AWS Identity and Access Management (IAM) roles and policies, with fine-grained permissions for each MCP server. Private data sources like databases (Amazon DynamoDB) and storage (Amazon S3) are accessed securely within a Virtual Private Cloud (VPC).
Google Cloud Platform (GCP):
The “unified workbench” pattern: GCP focuses on providing a unified, open, and data-centric platform for AI development. The MCP architecture on GCP integrates natively with the Vertex AI platform, treating MCP servers as first-class tools that can be dynamically discovered and used within a single workbench.
GCP architecture pattern with MCP:
AI orchestration layer: The Vertex AI Agent Builder serves as the central environment for building and managing conversational AI and other agents. It orchestrates workflows and manages tool invocation for agents.
AI model layer: Agents use foundation models available through the Vertex AI Model Garden or the Gemini API.
MCP server and tool layer: MCP servers are deployed as containerized microservices on Cloud Run or managed by services like App Engine. These servers contain tools that interact with GCP services, such as BigQuery, Cloud Storage, and Cloud SQL. GCP offers pre-built MCP server implementations, such as the GCP MCP Toolbox, for integration with its databases.
Data and security layer: Vertex AI Vector Search and other data sources are encapsulated within the MCP server tools to provide contextual information. Access to these services is managed by Identity and Access Management (IAM) and secured through virtual private clouds. The MCP server can leverage Vertex AI Context Caching for improved performance.
Note that each pattern refers only to the native technologies of its respective cloud. Hence, better-suited technologies can be used in place of the tools mentioned here. This is a concept-level comparison rather than an industry-specific implementation approach.
We’ll conclude this post here & continue with a deeper dive in the next post.
Till then, Happy Avenging! 🙂
As a continuation of the previous post, I would like to continue my discussion about the implementation of MCP protocols among agents. But before that, I want to add the quick demo one more time to recap our objectives.
Let us recap the process flow –
Also, recall how the scripts were grouped in the previous post –
Great! Now, we’ll continue with the main discussion.
CODE:
clsYouTubeVideoProcessor.py (This class processes the transcripts from YouTube. It may also translate them into English for non-native speakers.)
import re

from youtube_transcript_api import YouTubeTranscriptApi


def extract_youtube_id(youtube_url):
    """Extract YouTube video ID from URL"""
    youtube_id_match = re.search(r'(?:v=|\/)([0-9A-Za-z_-]{11}).*', youtube_url)
    if youtube_id_match:
        return youtube_id_match.group(1)
    return None


def get_youtube_transcript(youtube_url):
    """Get transcript from YouTube video"""
    video_id = extract_youtube_id(youtube_url)
    if not video_id:
        return {"error": "Invalid YouTube URL or ID"}

    try:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)

        # First try to get manual transcripts
        try:
            transcript = transcript_list.find_manually_created_transcript(["en"])
            transcript_data = transcript.fetch()
            print(f"Debug - Manual transcript format: {type(transcript_data)}")
            if transcript_data and len(transcript_data) > 0:
                print(f"Debug - First item type: {type(transcript_data[0])}")
                print(f"Debug - First item sample: {transcript_data[0]}")
            return {"text": transcript_data, "language": "en", "auto_generated": False}
        except Exception as e:
            print(f"Debug - No manual transcript: {str(e)}")

        # If no manual English transcript, try any available transcript
        try:
            available_transcripts = list(transcript_list)
            if available_transcripts:
                transcript = available_transcripts[0]
                print(f"Debug - Using transcript in language: {transcript.language_code}")
                transcript_data = transcript.fetch()
                print(f"Debug - Auto transcript format: {type(transcript_data)}")
                if transcript_data and len(transcript_data) > 0:
                    print(f"Debug - First item type: {type(transcript_data[0])}")
                    print(f"Debug - First item sample: {transcript_data[0]}")
                return {
                    "text": transcript_data,
                    "language": transcript.language_code,
                    "auto_generated": transcript.is_generated,
                }
            else:
                return {"error": "No transcripts available for this video"}
        except Exception as e:
            return {"error": f"Error getting transcript: {str(e)}"}

    except Exception as e:
        return {"error": f"Error getting transcript list: {str(e)}"}


# ----------------------------------------------------------------------------------
# YouTube Video Processor
# ----------------------------------------------------------------------------------
class clsYouTubeVideoProcessor:
    """Process YouTube videos using the agent system"""

    def __init__(self, documentation_agent, translation_agent, research_agent):
        self.documentation_agent = documentation_agent
        self.translation_agent = translation_agent
        self.research_agent = research_agent

    def process_youtube_video(self, youtube_url):
        """Process a YouTube video"""
        print(f"Processing YouTube video: {youtube_url}")

        # Extract transcript
        transcript_result = get_youtube_transcript(youtube_url)
        if "error" in transcript_result:
            return {"error": transcript_result["error"]}

        # Start a new conversation
        conversation_id = self.documentation_agent.start_processing()

        # Process transcript segments
        transcript_data = transcript_result["text"]
        transcript_language = transcript_result["language"]
        print(f"Debug - Type of transcript_data: {type(transcript_data)}")

        # For each segment, detect language and translate if needed
        processed_segments = []
        try:
            # Make sure transcript_data is a list of dictionaries with text and start fields
            if isinstance(transcript_data, list):
                for idx, segment in enumerate(transcript_data):
                    print(f"Debug - Processing segment {idx}, type: {type(segment)}")

                    # Extract text properly based on the type
                    if isinstance(segment, dict) and "text" in segment:
                        text = segment["text"]
                        start = segment.get("start", 0)
                    else:
                        # Try to access attributes for non-dict types
                        try:
                            text = segment.text
                            start = getattr(segment, "start", 0)
                        except AttributeError:
                            # If all else fails, convert to string
                            text = str(segment)
                            start = idx * 5  # Arbitrary timestamp

                    print(f"Debug - Extracted text: {text[:30]}...")

                    # Create a standardized segment
                    std_segment = {"text": text, "start": start}

                    # Process through translation agent
                    translation_result = self.translation_agent.process_text(text, conversation_id)

                    # Use translated text for documentation
                    if "final_text" in translation_result and translation_result["final_text"] != text:
                        std_segment["processed_text"] = translation_result["final_text"]
                    else:
                        std_segment["processed_text"] = text

                    # Update segment with translation information. The copy is made
                    # after processed_text is set, so the stored segment includes it.
                    segment_with_translation = {**std_segment, "translation_info": translation_result}
                    processed_segments.append(segment_with_translation)
            else:
                # If transcript_data is not a list, treat it as a single text block
                print("Debug - Transcript is not a list, treating as single text")
                text = str(transcript_data)
                std_segment = {"text": text, "start": 0}

                translation_result = self.translation_agent.process_text(text, conversation_id)

                if "final_text" in translation_result and translation_result["final_text"] != text:
                    std_segment["processed_text"] = translation_result["final_text"]
                else:
                    std_segment["processed_text"] = text

                # Copy after processed_text is set (same reasoning as above)
                segment_with_translation = {**std_segment, "translation_info": translation_result}
                processed_segments.append(segment_with_translation)
        except Exception as e:
            print(f"Debug - Error processing transcript: {str(e)}")
            return {"error": f"Error processing transcript: {str(e)}"}

        # Process the transcript with the documentation agent
        documentation_result = self.documentation_agent.process_transcript(
            processed_segments, conversation_id
        )

        return {
            "youtube_url": youtube_url,
            "transcript_language": transcript_language,
            "processed_segments": processed_segments,
            "documentation": documentation_result,
            "conversation_id": conversation_id,
        }
Let us understand this step-by-step:
Part 1: Getting the YouTube Transcript
def extract_youtube_id(youtube_url): ...
This extracts the unique video ID from any YouTube link.
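A small, self-contained check of that regular expression may help. The function body is the one from the class file above; the sample URLs are my own. Note that the pattern looks for 11 ID characters after `v=` or a slash, so it does not validate the domain, and a bare ID with no URL around it returns None.

```python
import re


def extract_youtube_id(youtube_url):
    """Extract the 11-character video ID from a YouTube URL, or return None."""
    match = re.search(r'(?:v=|\/)([0-9A-Za-z_-]{11}).*', youtube_url)
    return match.group(1) if match else None


samples = [
    "https://www.youtube.com/watch?v=dQw4w9WgXcQ",  # standard watch URL
    "https://youtu.be/dQw4w9WgXcQ",                 # short link
    "https://www.youtube.com/embed/dQw4w9WgXcQ",    # embed URL
    "dQw4w9WgXcQ",                                  # bare ID: no 'v=' or '/', so None
]
for url in samples:
    print(url, "->", extract_youtube_id(url))
```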
def get_youtube_transcript(youtube_url): ...
This gets the actual spoken content of the video.
It first tries to get a manually created English transcript (written by humans).
If none exists, it falls back to the first transcript available in any language, which may be auto-generated (created by YouTube’s AI).
If nothing is found, it gives back an error message such as “No transcripts available for this video.”
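The fallback order can be tried without calling YouTube at all. The sketch below uses a stand-in class, `FakeTranscript`, and a helper, `pick_transcript`; both are hypothetical and only mimic the two attributes the real code reads (`language_code` and `is_generated`).

```python
class FakeTranscript:
    """Stand-in for a transcript entry; not part of any real library."""

    def __init__(self, language_code, is_generated):
        self.language_code = language_code
        self.is_generated = is_generated


def pick_transcript(transcripts, preferred=("en",)):
    """Prefer a human-made transcript in a preferred language, else the first one available."""
    for transcript in transcripts:
        if not transcript.is_generated and transcript.language_code in preferred:
            return transcript
    # Fallback: whatever comes first, manual or auto-generated, in any language.
    return transcripts[0] if transcripts else None


available = [FakeTranscript("hi", True), FakeTranscript("en", False)]
chosen = pick_transcript(available)
print(chosen.language_code, chosen.is_generated)
```

When the fallback is taken, the `language` and `auto_generated` fields of the result tell the downstream agents what they actually received.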
Part 2: Processing the Video with Agents
class clsYouTubeVideoProcessor: ...
This is like the control center that tells each intelligent agent what to do with the transcript. Here are the detailed steps:
1. Start the Process
def process_youtube_video(self, youtube_url): ...
The system starts with a YouTube video link.
It prints a message like: “Processing YouTube video: [link]”
2. Extract the Transcript
The system runs the get_youtube_transcript() function.
If it fails, it returns an error (e.g., invalid link or no subtitles available).
3. Start a “Conversation”
The documentation agent begins a new session, tracked by a unique conversation ID.
Think of this like opening a new folder in a shared team workspace to store everything related to this video.
4. Go Through Each Segment of the Transcript
The spoken text is often broken into small parts (segments), like subtitles.
For each part:
It checks the text.
It finds out the time that part was spoken.
It sends it to the translation agent to clean up or translate the text.
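The "checks the text" and "finds out the time" steps boil down to normalizing whatever shape a segment arrives in. Here is a minimal sketch of that logic pulled out into one function; the name `normalize_segment` is mine, and the 5-second spacing for plain strings is the same arbitrary placeholder the class uses.

```python
from types import SimpleNamespace


def normalize_segment(segment, idx):
    """Return {'text', 'start'} for a dict, an object with attributes, or anything else."""
    if isinstance(segment, dict) and "text" in segment:
        return {"text": segment["text"], "start": segment.get("start", 0)}
    try:
        return {"text": segment.text, "start": getattr(segment, "start", 0)}
    except AttributeError:
        # Last resort: stringify and invent a timestamp from the position.
        return {"text": str(segment), "start": idx * 5}


print(normalize_segment({"text": "hello", "start": 1.2}, 0))
print(normalize_segment(SimpleNamespace(text="namaste", start=4.0), 1))
print(normalize_segment("plain string", 2))
```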
5. Translate (if needed)
If the translation agent finds a better or translated version, it replaces the original.
Otherwise, it keeps the original.
6. Prepare for Documentation
After translation, the segment is passed to the documentation agent.
This agent might:
Summarize the content,
Highlight important terms,
Structure it into a readable format.
7. Return the Final Result
The system gives back a structured package with:
The video link
The original language
The transcript in parts (processed and translated)
A documentation summary
The conversation ID (for tracking or further updates)
clsDocumentationAgent.py (This is the main class that implements the documentation agent.)
import time
import uuid
from typing import Optional

# LangChain imports (assumed 0.1.x-style paths; adjust to your installed version)
from langchain.agents import AgentExecutor
from langchain.agents.format_scratchpad.openai_tools import format_to_openai_tool_messages
from langchain.agents.output_parsers.openai_tools import OpenAIToolsAgentOutputParser
from langchain.memory import ConversationBufferMemory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI

# clsMCPBroker, clsMCPMessage, clsSendMessageTool and OPENAI_API_KEY come from
# this project's own modules, discussed earlier in the series.


class clsDocumentationAgent:
    """Documentation Agent built with LangChain"""

    def __init__(self, agent_id: str, broker: clsMCPBroker):
        self.agent_id = agent_id
        self.broker = broker
        self.broker.register_agent(agent_id)

        # Initialize LangChain components
        self.llm = ChatOpenAI(
            model="gpt-4-0125-preview",
            temperature=0.1,
            api_key=OPENAI_API_KEY
        )

        # Create tools
        self.tools = [
            clsSendMessageTool(sender_id=self.agent_id, broker=self.broker)
        ]

        # Set up LLM with tools
        self.llm_with_tools = self.llm.bind(
            tools=[tool.tool_config for tool in self.tools]
        )

        # Setup memory
        self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)

        # Create prompt
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a Documentation Agent for YouTube video transcripts. Your responsibilities include:
                1. Process YouTube video transcripts
                2. Identify key points, topics, and main ideas
                3. Organize content into a coherent and structured format
                4. Create concise summaries
                5. Request research information when necessary

                When you need additional context or research, send a request to the Research Agent.
                Always maintain a professional tone and ensure your documentation is clear and organized."""),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])

        # Create agent
        self.agent = (
            {
                "input": lambda x: x["input"],
                "chat_history": lambda x: self.memory.load_memory_variables({})["chat_history"],
                "agent_scratchpad": lambda x: format_to_openai_tool_messages(x["intermediate_steps"]),
            }
            | self.prompt
            | self.llm_with_tools
            | OpenAIToolsAgentOutputParser()
        )

        # Create agent executor
        self.agent_executor = AgentExecutor(
            agent=self.agent,
            tools=self.tools,
            verbose=True,
            memory=self.memory
        )

        # Video data
        self.current_conversation_id = None
        self.video_notes = {}
        self.key_points = []
        self.transcript_segments = []

    def start_processing(self) -> str:
        """Start processing a new video"""
        self.current_conversation_id = str(uuid.uuid4())
        self.video_notes = {}
        self.key_points = []
        self.transcript_segments = []
        return self.current_conversation_id

    def process_transcript(self, transcript_segments, conversation_id=None):
        """Process a YouTube transcript"""
        if not conversation_id:
            conversation_id = self.start_processing()
        self.current_conversation_id = conversation_id

        # Store transcript segments
        self.transcript_segments = transcript_segments

        # Process segments
        processed_segments = []
        for segment in transcript_segments:
            processed_result = self.process_segment(segment)
            processed_segments.append(processed_result)

        # Generate summary
        summary = self.generate_summary()

        return {
            "processed_segments": processed_segments,
            "summary": summary,
            "conversation_id": conversation_id
        }

    def process_segment(self, segment):
        """Process individual transcript segment"""
        text = segment.get("text", "")
        start = segment.get("start", 0)

        # Use LangChain agent to process the segment
        result = self.agent_executor.invoke({
            "input": (
                f"Process this video transcript segment at timestamp {start}s: {text}. "
                "If research is needed, send a request to the research_agent."
            )
        })

        # Update video notes
        timestamp = start
        self.video_notes[timestamp] = {
            "text": text,
            "analysis": result["output"]
        }

        return {
            "timestamp": timestamp,
            "text": text,
            "analysis": result["output"]
        }

    def handle_mcp_message(self, message: clsMCPMessage) -> Optional[clsMCPMessage]:
        """Handle an incoming MCP message"""
        if message.message_type == "research_response":
            # Process research information received from Research Agent
            research_info = message.content.get("text", "")

            result = self.agent_executor.invoke({
                "input": f"Incorporate this research information into video analysis: {research_info}"
            })

            # Send acknowledgment back to Research Agent
            response = clsMCPMessage(
                sender=self.agent_id,
                receiver=message.sender,
                message_type="acknowledgment",
                content={"text": "Research information incorporated into video analysis."},
                reply_to=message.id,
                conversation_id=message.conversation_id
            )

            self.broker.publish(response)
            return response

        elif message.message_type == "translation_response":
            # Process translation response from Translation Agent
            translation_result = message.content

            # Process the translated text
            if "final_text" in translation_result:
                text = translation_result["final_text"]
                original_text = translation_result.get("original_text", "")
                language_info = translation_result.get("language", {})

                result = self.agent_executor.invoke({
                    "input": f"Process this translated text: {text}\nOriginal language: {language_info.get('language','unknown')}\nOriginal text: {original_text}"
                })

                # Update notes with translation information
                for timestamp, note in self.video_notes.items():
                    if note["text"] == original_text:
                        note["translated_text"] = text
                        note["language"] = language_info
                        break

            return None

        return None

    def run(self):
        """Run the agent to listen for MCP messages"""
        print(f"Documentation Agent {self.agent_id} is running...")
        while True:
            message = self.broker.get_message(self.agent_id, timeout=1)
            if message:
                self.handle_mcp_message(message)
            time.sleep(0.1)

    def generate_summary(self) -> str:
        """Generate a summary of the video"""
        if not self.video_notes:
            return "No video data available to summarize."

        all_notes = "\n".join([f"{ts}: {note['text']}" for ts, note in self.video_notes.items()])

        result = self.agent_executor.invoke({
            "input": f"Generate a concise summary of this YouTube video, including key points and topics:\n{all_notes}"
        })

        return result["output"]
Let us understand the key methods in a step-by-step manner:
The Documentation Agent is like a smart assistant that watches a YouTube video, takes notes, pulls out important ideas, and creates a summary — almost like a professional note-taker trained to help educators, researchers, and content creators. It works with a team of other assistants, like a Translator Agent and a Research Agent, and they all talk to each other through a messaging system.
1. Starting to Work on a New Video
def start_processing(self) -> str
When a new video is being processed:
A new conversation ID is created.
Old notes and transcripts are cleared to start fresh.
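Stripped of the LangChain setup, the "start fresh" step is just a new UUID plus a state reset. The sketch below keeps only that part; the class name `NoteKeeper` is hypothetical and stands in for the agent.

```python
import uuid


class NoteKeeper:
    """Holds per-video state; a simplified stand-in for the agent's bookkeeping."""

    def __init__(self):
        self.current_conversation_id = None
        self.video_notes = {}
        self.transcript_segments = []

    def start_processing(self) -> str:
        """Issue a fresh conversation ID and drop anything left from the last video."""
        self.current_conversation_id = str(uuid.uuid4())
        self.video_notes = {}
        self.transcript_segments = []
        return self.current_conversation_id


keeper = NoteKeeper()
first_id = keeper.start_processing()
keeper.video_notes[0] = {"text": "intro"}
second_id = keeper.start_processing()
print(first_id != second_id, keeper.video_notes)
```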
2. Processing the Whole Transcript
def process_transcript(...)
This is where the assistant:
Takes in the full transcript (what was said in the video).
Walks through it one small part at a time (like subtitles).
Sends each part to the smart brain for analysis.
Collects the results.
Finally, a summary of all the main ideas is created.
3. Processing One Transcript Segment at a Time
def process_segment(self, segment)
For each chunk of the video:
The assistant reads the text and timestamp.
It asks GPT-4 to analyze it and suggest important insights.
It saves that insight along with the original text and timestamp.
4. Handling Incoming Messages from Other Agents
def handle_mcp_message(self, message)
The assistant can also receive messages from teammates (other agents):
If the message is from the Research Agent:
It reads new information and adds it to its notes.
It replies with a thank-you message to say it got the research.
If the message is from the Translation Agent:
It takes the translated version of a transcript.
Updates its notes to reflect the translated text and its language.
This is like a team of assistants emailing back and forth to make sure the notes are complete and accurate.
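The "emailing back and forth" relies on a broker with one inbox per agent. The project's real `clsMCPBroker` and `clsMCPMessage` are not shown in this post, so the sketch below is only my guess at the smallest thing that would support the `register_agent`, `publish`, and `get_message` calls used above. `Message` and `Broker` are hypothetical names.

```python
import queue
import uuid
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Message:
    """Assumed message shape, based on the fields the agents read and write."""
    sender: str
    receiver: str
    message_type: str
    content: dict
    reply_to: Optional[str] = None
    conversation_id: Optional[str] = None
    id: str = field(default_factory=lambda: str(uuid.uuid4()))


class Broker:
    """In-memory broker: one FIFO queue per registered agent."""

    def __init__(self):
        self._queues = {}

    def register_agent(self, agent_id: str) -> None:
        self._queues.setdefault(agent_id, queue.Queue())

    def publish(self, message: Message) -> None:
        self._queues[message.receiver].put(message)

    def get_message(self, agent_id: str, timeout: float = 1) -> Optional[Message]:
        """Wait up to `timeout` seconds for a message; return None if the inbox stays empty."""
        try:
            return self._queues[agent_id].get(timeout=timeout)
        except queue.Empty:
            return None


broker = Broker()
broker.register_agent("documentation_agent")
broker.register_agent("research_agent")
broker.publish(Message("documentation_agent", "research_agent", "request", {"text": "What is MCP?"}))
print(broker.get_message("research_agent", timeout=0.1).content)
```

Returning None on an empty inbox is what lets each agent's `run()` loop keep polling without raising.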
5. Summarizing the Whole Video
def generate_summary(self)
After going through all the transcript parts, the agent asks GPT-4 to create a short, clean summary — identifying:
Main ideas
Key talking points
Structure of the content
The final result is clear, professional, and usable in learning materials or documentation.
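Before the model is called, the notes are flattened into one prompt. The sketch below isolates that step so it can be run without an API key. The function name `build_summary_prompt` is mine, while the prompt wording and the empty-notes message are taken from the class above.

```python
def build_summary_prompt(video_notes: dict) -> str:
    """Join 'timestamp: text' lines into the summary request sent to the model."""
    if not video_notes:
        return "No video data available to summarize."
    all_notes = "\n".join(f"{ts}: {note['text']}" for ts, note in video_notes.items())
    return (
        "Generate a concise summary of this YouTube video, "
        f"including key points and topics:\n{all_notes}"
    )


notes = {
    0: {"text": "Welcome to the session.", "analysis": "Intro"},
    12.5: {"text": "MCP standardizes tool access.", "analysis": "Core idea"},
}
print(build_summary_prompt(notes))
```

Since every segment's text goes into a single prompt, very long videos may need chunking or a summary-of-summaries step, which this sketch does not attempt.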
clsResearchAgent.py (This is the main class that implements the research agent.)
import time
from typing import Optional

from autogen import AssistantAgent, UserProxyAgent

# clsMCPBroker, clsMCPMessage and OPENAI_API_KEY come from this project's own
# modules, discussed earlier in the series.


class clsResearchAgent:
    """Research Agent built with AutoGen"""

    def __init__(self, agent_id: str, broker: clsMCPBroker):
        self.agent_id = agent_id
        self.broker = broker
        self.broker.register_agent(agent_id)

        # Configure AutoGen directly with API key
        if not OPENAI_API_KEY:
            print("Warning: OPENAI_API_KEY not set for ResearchAgent")

        # Create config list directly instead of loading from file
        config_list = [
            {
                "model": "gpt-4-0125-preview",
                "api_key": OPENAI_API_KEY
            }
        ]

        # Create AutoGen assistant for research
        self.assistant = AssistantAgent(
            name="research_assistant",
            system_message="""You are a Research Agent for YouTube videos. Your responsibilities include:
                1. Research topics mentioned in the video
                2. Find relevant information, facts, references, or context
                3. Provide concise, accurate information to support the documentation
                4. Focus on delivering high-quality, relevant information

                Respond directly to research requests with clear, factual information.""",
            llm_config={"config_list": config_list, "temperature": 0.1}
        )

        # Create user proxy to handle message passing
        self.user_proxy = UserProxyAgent(
            name="research_manager",
            human_input_mode="NEVER",
            code_execution_config={"work_dir": "coding", "use_docker": False},
            default_auto_reply="Working on the research request..."
        )

        # Current conversation tracking
        self.current_requests = {}

    def handle_mcp_message(self, message: clsMCPMessage) -> Optional[clsMCPMessage]:
        """Handle an incoming MCP message"""
        if message.message_type == "request":
            # Process research request from Documentation Agent
            request_text = message.content.get("text", "")

            # Use AutoGen to process the research request
            def research_task():
                self.user_proxy.initiate_chat(
                    self.assistant,
                    message=(
                        f"Research request for YouTube video content: {request_text}. "
                        "Provide concise, factual information."
                    )
                )
                # Return last assistant message. AutoGen keys chat_messages by the
                # agent object, so index with the proxy itself rather than its name.
                return self.assistant.chat_messages[self.user_proxy][-1]["content"]

            # Execute research task
            research_result = research_task()

            # Send research results back to Documentation Agent
            response = clsMCPMessage(
                sender=self.agent_id,
                receiver=message.sender,
                message_type="research_response",
                content={"text": research_result},
                reply_to=message.id,
                conversation_id=message.conversation_id
            )

            self.broker.publish(response)
            return response

        return None

    def run(self):
        """Run the agent to listen for MCP messages"""
        print(f"Research Agent {self.agent_id} is running...")
        while True:
            message = self.broker.get_message(self.agent_id, timeout=1)
            if message:
                self.handle_mcp_message(message)
            time.sleep(0.1)
Let us understand the key methods in detail.
1. Receiving and Responding to Research Requests
def handle_mcp_message(self, message)
When the Research Agent gets a message (like a question or request for info), it:
Reads the message to see what needs to be researched.
Asks GPT-4 to find helpful, accurate info about that topic.
Sends the answer back to whoever asked the question (usually the Documentation Agent).
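The three steps above can be sketched without AutoGen at all. This is only a minimal illustration, not the actual agent: Message is a standard-library stand-in for clsMCPMessage, and handle_request and fake_llm are hypothetical names, with fake_llm taking the place of the GPT-4 call.

```python
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional


@dataclass
class Message:
    """Hypothetical stand-in for clsMCPMessage (standard library only)."""
    sender: str
    receiver: str
    message_type: str
    content: Dict[str, Any]
    conversation_id: str
    reply_to: Optional[str] = None
    id: str = field(default_factory=lambda: str(uuid.uuid4()))


def handle_request(agent_id: str, message: Message,
                   answer_fn: Callable[[str], str]) -> Optional[Message]:
    """Answer a "request" message; ignore every other message type."""
    if message.message_type != "request":
        return None
    # Step 1: read what needs to be researched.
    request_text = message.content.get("text", "")
    # Step 2: answer_fn stands in for the AutoGen/GPT-4 call of the real agent.
    research_result = answer_fn(request_text)
    # Step 3: address the reply to whoever sent the request.
    return Message(
        sender=agent_id,
        receiver=message.sender,
        message_type="research_response",
        content={"text": research_result},
        conversation_id=message.conversation_id,
        reply_to=message.id,  # links the reply to the original request
    )


def fake_llm(text: str) -> str:
    """Hypothetical placeholder that returns a canned answer."""
    return f"Notes on: {text}"


request = Message(sender="doc_agent", receiver="research_agent",
                  message_type="request",
                  content={"text": "vector databases"},
                  conversation_id="conv-1")
reply = handle_request("research_agent", request, fake_llm)
print(reply.receiver, reply.message_type, reply.content["text"])
```

The reply_to field is what lets the requesting agent match a response to the question it asked, even when several requests are in flight within the same conversation.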
clsTranslationAgent.py (This is the main class that represents the translation agent)
class clsTranslationAgent:
    """Agent for language detection and translation"""

    def __init__(self, agent_id: str, broker: clsMCPBroker):
        self.agent_id = agent_id
        self.broker = broker
        self.broker.register_agent(agent_id)

        # Initialize language detector
        self.language_detector = clsLanguageDetector()

        # Initialize translation service
        self.translation_service = clsTranslationService()

    def process_text(self, text, conversation_id=None):
        """Process text: detect language and translate if needed, handling mixed language content"""
        if not conversation_id:
            conversation_id = str(uuid.uuid4())

        # Detect language with support for mixed language content
        language_info = self.language_detector.detect(text)

        # Decide if translation is needed
        needs_translation = True

        # Pure English content doesn't need translation
        if language_info["language_code"] == "en-IN" or language_info["language_code"] == "unknown":
            needs_translation = False

        # For mixed language, check if it's primarily English
        if language_info.get("is_mixed", False) and language_info.get("languages", []):
            english_langs = [
                lang for lang in language_info.get("languages", [])
                if lang["language_code"] == "en-IN" or lang["language_code"].startswith("en-")
            ]

            # If the highest confidence language is English and > 60% confident, don't translate
            if english_langs and english_langs[0].get("confidence", 0) > 0.6:
                needs_translation = False

        if needs_translation:
            # Translate using the appropriate service based on language detection
            translation_result = self.translation_service.translate(text, language_info)

            return {
                "original_text": text,
                "language": language_info,
                "translation": translation_result,
                "final_text": translation_result.get("translated_text", text),
                "conversation_id": conversation_id
            }
        else:
            # Already English or unknown language, return as is
            return {
                "original_text": text,
                "language": language_info,
                "translation": {"provider": "none"},
                "final_text": text,
                "conversation_id": conversation_id
            }

    def handle_mcp_message(self, message: clsMCPMessage) -> Optional[clsMCPMessage]:
        """Handle an incoming MCP message"""
        if message.message_type == "translation_request":
            # Process translation request from Documentation Agent
            text = message.content.get("text", "")

            # Process the text
            result = self.process_text(text, message.conversation_id)

            # Send translation results back to requester
            response = clsMCPMessage(
                sender=self.agent_id,
                receiver=message.sender,
                message_type="translation_response",
                content=result,
                reply_to=message.id,
                conversation_id=message.conversation_id
            )

            self.broker.publish(response)
            return response

        return None

    def run(self):
        """Run the agent to listen for MCP messages"""
        print(f"Translation Agent {self.agent_id} is running...")
        while True:
            message = self.broker.get_message(self.agent_id, timeout=1)
            if message:
                self.handle_mcp_message(message)
            time.sleep(0.1)
Let us understand the key methods in a step-by-step manner:
1. Understanding and Translating Text:
def process_text(...)
This is the core job of the agent. Here’s what it does with any piece of text:
Step 1: Detect the Language
It tries to figure out the language of the input text.
It can handle cases where more than one language is mixed together, which is common in casual speech or subtitles.
Step 2: Decide Whether to Translate
If the text is clearly in English, or it’s unclear what the language is, it decides not to translate.
If the text is mostly in another language, or mixed text is detected as English with 60% confidence or less, it translates the text into English.
Step 3: Translate (if needed)
If translation is required, it uses the translation service to do the job.
Then it packages all the information: the original text, detected language, the translated version, and a unique conversation ID.
Step 4: Return the Results
If no translation is needed, it returns the original text unchanged, with the translation provider marked as "none" to show that no translation was applied.
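The decision in Step 2 is the part most worth seeing in isolation. Here is a minimal sketch of it, assuming the detector returns a dictionary with the keys used in the class above (language_code, is_mixed, languages, confidence). The function name needs_translation, the threshold parameter, and the sample inputs are illustrative only.

```python
from typing import Any, Dict


def needs_translation(language_info: Dict[str, Any], threshold: float = 0.6) -> bool:
    """Return True when the text should be translated into English."""
    code = language_info.get("language_code", "unknown")

    # Pure English or an undetected language is returned as is.
    if code in ("en-IN", "unknown"):
        return False

    # Mixed content: skip translation when English is detected confidently.
    if language_info.get("is_mixed", False):
        english = [
            lang for lang in language_info.get("languages", [])
            if lang["language_code"].startswith("en-")
        ]
        if english and english[0].get("confidence", 0) > threshold:
            return False

    return True


# Illustrative detection results (hand-written, not real detector output).
pure_english = {"language_code": "en-IN"}
pure_hindi = {"language_code": "hi-IN", "is_mixed": False}
mostly_english = {
    "language_code": "hi-IN",
    "is_mixed": True,
    "languages": [
        {"language_code": "en-IN", "confidence": 0.7},
        {"language_code": "hi-IN", "confidence": 0.3},
    ],
}
mostly_hindi = {
    "language_code": "hi-IN",
    "is_mixed": True,
    "languages": [
        {"language_code": "hi-IN", "confidence": 0.7},
        {"language_code": "en-IN", "confidence": 0.3},
    ],
}

for sample in (pure_english, pure_hindi, mostly_english, mostly_hindi):
    print(sample["language_code"], needs_translation(sample))
```

Note that the check uses the first English entry in the list, so it relies on the detector reporting languages in a sensible order.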
2. Receiving Messages and Responding
def handle_mcp_message(...)
The agent listens for messages from other agents. When someone asks it to translate something:
It takes the text from the message.
Runs it through the process_text function (as explained above).
Sends the translated (or original) result back to the agent that asked.
clsTranslationService.py (This class does the actual translation work for the agent)
class clsTranslationService:
    """Translation service using multiple providers with support for mixed languages"""

    def __init__(self):
        # Initialize Sarvam AI client
        self.sarvam_api_key = SARVAM_API_KEY
        self.sarvam_url = "https://api.sarvam.ai/translate"

        # Initialize Google Cloud Translation client using simple HTTP requests
        self.google_api_key = GOOGLE_API_KEY
        self.google_translate_url = "https://translation.googleapis.com/language/translate/v2"

    def translate_with_sarvam(self, text, source_lang, target_lang="en-IN"):
        """Translate text using Sarvam AI (for Indian languages)"""
        if not self.sarvam_api_key:
            return {"error": "Sarvam API key not set"}

        headers = {
            "Content-Type": "application/json",
            "api-subscription-key": self.sarvam_api_key
        }

        payload = {
            "input": text,
            "source_language_code": source_lang,
            "target_language_code": target_lang,
            "speaker_gender": "Female",
            "mode": "formal",
            "model": "mayura:v1"
        }

        try:
            response = requests.post(self.sarvam_url, headers=headers, json=payload)
            if response.status_code == 200:
                return {"translated_text": response.json().get("translated_text", ""), "provider": "sarvam"}
            else:
                return {"error": f"Sarvam API error: {response.text}", "provider": "sarvam"}
        except Exception as e:
            return {"error": f"Error calling Sarvam API: {str(e)}", "provider": "sarvam"}

    def translate_with_google(self, text, target_lang="en"):
        """Translate text using Google Cloud Translation API with direct HTTP request"""
        if not self.google_api_key:
            return {"error": "Google API key not set"}

        try:
            # Using the translation API v2 with API key
            params = {
                "key": self.google_api_key,
                "q": text,
                "target": target_lang
            }

            response = requests.post(self.google_translate_url, params=params)
            if response.status_code == 200:
                data = response.json()
                translation = data.get("data", {}).get("translations", [{}])[0]
                return {
                    "translated_text": translation.get("translatedText", ""),
                    "detected_source_language": translation.get("detectedSourceLanguage", ""),
                    "provider": "google"
                }
            else:
                return {"error": f"Google API error: {response.text}", "provider": "google"}
        except Exception as e:
            return {"error": f"Error calling Google Translation API: {str(e)}", "provider": "google"}

    def translate(self, text, language_info):
        """Translate text to English based on language detection info"""
        # If already English or unknown language, return as is
        if language_info["language_code"] == "en-IN" or language_info["language_code"] == "unknown":
            return {"translated_text": text, "provider": "none"}

        # Handle mixed language content
        if language_info.get("is_mixed", False) and language_info.get("languages", []):
            # Strategy for mixed language:
            # 1. If one of the languages is English, send the full text to Google Translate
            # 2. If no English but contains Indian languages, use Sarvam as it handles code-mixing better
            # 3. Otherwise, use Google Translate for the primary detected language

            has_english = False
            has_indian = False

            for lang in language_info.get("languages", []):
                if lang["language_code"] == "en-IN" or lang["language_code"].startswith("en-"):
                    has_english = True
                if lang.get("is_indian", False):
                    has_indian = True

            if has_english:
                # Contains English - use Google for full text as it handles code-mixing well
                return self.translate_with_google(text)
            elif has_indian:
                # Contains Indian languages - use Sarvam
                # Use the highest confidence Indian language as source
                indian_langs = [lang for lang in language_info.get("languages", []) if lang.get("is_indian", False)]
                if indian_langs:
                    # Sort by confidence
                    indian_langs.sort(key=lambda x: x.get("confidence", 0), reverse=True)
                    source_lang = indian_langs[0]["language_code"]
                    return self.translate_with_sarvam(text, source_lang)
                else:
                    # Fallback to primary language
                    if language_info["is_indian"]:
                        return self.translate_with_sarvam(text, language_info["language_code"])
                    else:
                        return self.translate_with_google(text)
            else:
                # No English, no Indian languages - use Google for primary language
                return self.translate_with_google(text)
        else:
            # Not mixed language - use standard approach
            if language_info["is_indian"]:
                # Use Sarvam AI for Indian languages
                return self.translate_with_sarvam(text, language_info["language_code"])
            else:
                # Use Google for other languages
                return self.translate_with_google(text)
This Translation Service is like a smart translator that knows how to:
Use the language detected for the text,
Choose the best translation provider depending on the language (especially for Indian languages),
And then translate the text into English.
It supports mixed-language content (such as Hindi-English in one sentence) and uses either Google Translate or Sarvam AI, a translation service designed for Indian languages.
Now, let us understand the key methods in a step-by-step manner:
1. Translating Using Google Translate
def translate_with_google(...)
This function uses Google Translate:
It sends the text, asks for English as the target language, and gets a translation back.
It also detects the source language automatically.
If successful, it returns the translated text and the detected original language.
If there’s an error, it returns a message saying what went wrong.
Best For: Non-Indian languages (like Spanish, French, Chinese) and mixed content that includes English.
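To make the return values concrete, here is a small sketch of the response handling on its own, with no network call. It assumes the JSON shape that the method above reads (data, then translations, then translatedText and detectedSourceLanguage). The function name parse_google_response and the sample payload are hypothetical and written by hand for illustration.

```python
from typing import Any, Dict


def parse_google_response(status_code: int, data: Dict[str, Any],
                          raw_text: str = "") -> Dict[str, str]:
    """Turn a Translation API v2 style response into the service's result dict."""
    if status_code != 200:
        return {"error": f"Google API error: {raw_text}", "provider": "google"}

    translations = data.get("data", {}).get("translations", [])
    # Guard against an empty list as well as a missing key.
    first = translations[0] if translations else {}
    return {
        "translated_text": first.get("translatedText", ""),
        "detected_source_language": first.get("detectedSourceLanguage", ""),
        "provider": "google",
    }


# Hand-written sample payload, not a captured API response.
sample = {
    "data": {
        "translations": [
            {"translatedText": "Good morning", "detectedSourceLanguage": "es"}
        ]
    }
}

print(parse_google_response(200, sample))
print(parse_google_response(403, {}, "forbidden"))
```

One small difference from the method above is deliberate: the sketch also tolerates an empty translations list, which would otherwise raise an IndexError.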
2. Main Translation Logic
def translate(self, text, language_info)
This is the decision-maker. Here’s how it works:
Case 1: No Translation Needed
If the text is already in English or the language is unknown, it simply returns the original text.
Case 2: Mixed Language (e.g., Hindi + English)
If the text contains more than one language:
✅ If one part is English → use Google Translate (it’s good with mixed languages).
✅ If it includes Indian languages only → use Sarvam AI (better at handling Indian content).
✅ If it’s neither English nor Indian → use Google Translate.
For the Sarvam AI route, the service picks the Indian language with the highest detection confidence in the mix and uses it as the source language.
Case 3: Single Language
If the text is only in one language:
✅ If it’s an Indian language (like Bengali, Tamil, or Marathi), use Sarvam AI.
✅ If it’s any other language, use Google Translate.
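The three cases above boil down to a small routing function. The sketch below returns only the provider name and the source language instead of calling any API, so it can be run and tested offline. The name choose_provider and the sample language codes are assumptions made for illustration; the dictionary keys follow the ones used in the translate method.

```python
from typing import Any, Dict, Optional, Tuple


def choose_provider(language_info: Dict[str, Any]) -> Tuple[str, Optional[str]]:
    """Return (provider, source_language) for a detection result."""
    code = language_info.get("language_code", "unknown")

    # Case 1: already English or unknown, nothing to translate.
    if code in ("en-IN", "unknown"):
        return ("none", None)

    languages = language_info.get("languages", [])

    # Case 2: mixed-language content.
    if language_info.get("is_mixed", False) and languages:
        if any(lang["language_code"].startswith("en-") for lang in languages):
            return ("google", None)
        indian = [lang for lang in languages if lang.get("is_indian", False)]
        if indian:
            # The most confident Indian language becomes the source language.
            best = max(indian, key=lambda lang: lang.get("confidence", 0))
            return ("sarvam", best["language_code"])
        return ("google", None)

    # Case 3: single language.
    if language_info.get("is_indian", False):
        return ("sarvam", code)
    return ("google", None)


# Illustrative inputs; the codes are examples, not real detector output.
bengali = {"language_code": "bn-IN", "is_indian": True, "is_mixed": False}
spanish = {"language_code": "es", "is_indian": False, "is_mixed": False}
hindi_tamil = {
    "language_code": "hi-IN",
    "is_indian": True,
    "is_mixed": True,
    "languages": [
        {"language_code": "ta-IN", "is_indian": True, "confidence": 0.4},
        {"language_code": "hi-IN", "is_indian": True, "confidence": 0.5},
    ],
}

for sample in (bengali, spanish, hindi_tamil):
    print(sample["language_code"], choose_provider(sample))
```

Keeping the routing decision separate from the HTTP calls like this makes the rules easy to unit test without API keys.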
So, we’ve done it.
I’ve included the complete working solution for you in the GitHub link.
We’ll cover detailed performance testing, optimized configurations & many other useful details in our next post.
Till then, Happy Avenging! 🙂
Note: All the data & scenarios posted here are representational, available over the internet & intended for educational purposes only. There is always room for improvement in this kind of model & the solution associated with it. I’ve shown the basic ways to achieve the same for educational purposes only.
Today, we’ll discuss another topic in our two-part series. We will understand the importance of the MCP protocol for communicating between agents.
This will be an in-depth, highly technical post, illustrated with easy-to-understand visuals.
But, before that, let us understand the demo first.
Isn’t it exciting?
MCP Protocol:
Let us first understand the MCP protocol in simple terms.
MCP (Multi-Agent Communication Protocol) is a custom message exchange system that facilitates structured and scalable communication among multiple AI agents operating within an application. These agents collaborate asynchronously or in real-time to complete complex tasks by sharing results, context, and commands through a common messaging layer.
How MCP Protocol Helps:
Feature | Benefit
Agent-Oriented Architecture | Each agent handles a focused task, improving modularity and scalability.
Event-Driven Message Passing | Agents communicate based on triggers rather than polling, leading to faster and more efficient responses.
Structured Communication Format | All messages follow a standard format (e.g., JSON) with metadata for sender, recipient, type, and payload.
State Preservation | Agents maintain context across messages using memory (e.g., ConversationBufferMemory) to ensure coherence.
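To make the "structured communication format" concrete, here is a minimal sketch of one message as a JSON envelope. The function name build_envelope and the exact field names are assumptions chosen for illustration; the real message class may carry more fields.

```python
import json
import time
import uuid
from typing import Any, Dict


def build_envelope(sender: str, receiver: str, message_type: str,
                   payload: Dict[str, Any], conversation_id: str) -> Dict[str, Any]:
    """Wrap a payload with the metadata every agent needs to route it."""
    return {
        "id": str(uuid.uuid4()),          # unique per message
        "timestamp": time.time(),         # creation time in seconds
        "sender": sender,
        "receiver": receiver,
        "message_type": message_type,     # e.g. "request" or "response"
        "content": payload,
        "conversation_id": conversation_id,
    }


envelope = build_envelope(
    sender="documentation_agent",
    receiver="translation_agent",
    message_type="request",
    payload={"text": "Namaste, welcome to the webinar"},
    conversation_id="conv-42",
)

# Serialize for transport, then restore on the receiving side.
wire = json.dumps(envelope)
restored = json.loads(wire)
print(restored["sender"], "->", restored["receiver"], restored["content"]["text"])
```

Because the metadata sits outside the payload, a broker can route and log a message without knowing anything about what the content means.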
How It Works (Step-by-Step):
📥 User uploads or streams a video.
🧑‍💻 MCP Protocol triggers the Transcription Agent to start converting audio into text.
🌐 Translation Agent receives this text (if a different language is needed).
🧾 Summarization Agent receives the translated or original transcript and generates a concise summary.
📚 Research Agent checks for references or terminology used in the video.
📄 Documentation Agent compiles the output into a structured report.
🔁 All communication between agents flows through MCP, ensuring consistent message delivery and coordination.
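The hand-off order in the steps above can be sketched with plain functions. This is a simplified illustration under clear assumptions: every agent here is a hypothetical stub that only labels its output, and the stubs call each other directly instead of exchanging messages through MCP.

```python
from typing import Callable, Dict, List

State = Dict[str, str]


def transcription_agent(state: State) -> State:
    # Stub: a real agent would convert the audio track into text.
    return {**state, "text": f"[transcript of {state['video']}]"}


def translation_agent(state: State) -> State:
    # Only translate when the transcript is not already in English.
    if state.get("language", "en") == "en":
        return state
    return {**state, "text": f"[English translation of {state['text']}]"}


def summarization_agent(state: State) -> State:
    return {**state, "summary": f"[summary of {state['text']}]"}


def research_agent(state: State) -> State:
    return {**state, "research": f"[background for {state['summary']}]"}


def documentation_agent(state: State) -> State:
    return {**state, "report": f"{state['summary']} + {state['research']}"}


PIPELINE: List[Callable[[State], State]] = [
    transcription_agent,
    translation_agent,
    summarization_agent,
    research_agent,
    documentation_agent,
]


def run_pipeline(video: str, language: str) -> State:
    """Pass the shared state through every agent in order."""
    state: State = {"video": video, "language": language}
    for agent in PIPELINE:
        state = agent(state)
    return state


result = run_pipeline("demo.mp4", "hi")
print(result["report"])
```

In the actual app, each arrow in this chain is a message published through the broker, which is what allows agents built on different frameworks to take part in the same flow.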
Now, let us understand the solution that we intend to implement:
This app provides live summarization and contextual insights from videos such as webinars, interviews, or YouTube recordings using multiple cooperating AI agents. These agents may include:
Transcription Agent: Converts spoken words to text.
Translation Agent: Translates text to different languages (if needed).
Summarization Agent: Generates concise summaries.
Research Agent: Finds background or supplementary data related to the discussion.
Documentation Agent: Converts outputs into structured reports or learning materials.
We need to understand one more thing before diving deep into the code. Part of your conversation may be mixed, like part Hindi & part English. In that case, the app breaks the sentences into chunks & then converts all of them into the same language. Hence, the following rules are applied while translating the sentences –
Now, we will go through the basic frame of the system & see how it maps the principles that we discussed above to the specific technologies used in this solution –
Documentation Agent built with the LangChain framework
Research Agent built with the AutoGen framework
MCP Broker for seamless communication between agents
Process Flow:
Let us understand from the given picture the flow of the process that our app is trying to implement –
Great! So, now, we’ll focus on some of the key Python scripts & go through their key features.
But, before that, let us group the scripts by the task they belong to.
Multi-Agent Communication Protocol (MCP) Implementation:
clsMCPMessage.py
clsMCPBroker.py
YouTube Transcript Extraction:
clsYouTubeVideoProcessor.py
Language Detection:
clsLanguageDetector.py
Translation Services & Agents:
clsTranslationAgent.py
clsTranslationService.py
Documentation Agent:
clsDocumentationAgent.py
Research Agent:
clsResearchAgent.py
Now, we’ll review some of the scripts in this post & continue with the rest in the next post.
CODE:
clsMCPMessage.py (This is one of the key scripts that enables the implementation of the MCP protocol)
import queue
import time
import uuid
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field


class clsMCPMessage(BaseModel):
    """Message format for MCP protocol"""
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: float = Field(default_factory=time.time)
    sender: str
    receiver: str
    message_type: str  # "request", "response", "notification"
    content: Dict[str, Any]
    reply_to: Optional[str] = None
    conversation_id: str
    metadata: Dict[str, Any] = {}


class clsMCPBroker:
    """Message broker for MCP protocol communication between agents"""

    def __init__(self):
        self.message_queues: Dict[str, queue.Queue] = {}
        self.subscribers: Dict[str, List[str]] = {}
        self.conversation_history: Dict[str, List[clsMCPMessage]] = {}

    def register_agent(self, agent_id: str) -> None:
        """Register an agent with the broker"""
        if agent_id not in self.message_queues:
            self.message_queues[agent_id] = queue.Queue()
            self.subscribers[agent_id] = []

    def subscribe(self, subscriber_id: str, publisher_id: str) -> None:
        """Subscribe an agent to messages from another agent"""
        if publisher_id in self.subscribers:
            if subscriber_id not in self.subscribers[publisher_id]:
                self.subscribers[publisher_id].append(subscriber_id)

    def publish(self, message: clsMCPMessage) -> None:
        """Publish a message to its intended receiver"""
        # Store in conversation history
        if message.conversation_id not in self.conversation_history:
            self.conversation_history[message.conversation_id] = []
        self.conversation_history[message.conversation_id].append(message)

        # Deliver to direct receiver
        if message.receiver in self.message_queues:
            self.message_queues[message.receiver].put(message)

        # Deliver to subscribers of the sender
        for subscriber in self.subscribers.get(message.sender, []):
            if subscriber != message.receiver:  # Avoid duplicates
                self.message_queues[subscriber].put(message)

    def get_message(self, agent_id: str, timeout: Optional[float] = None) -> Optional[clsMCPMessage]:
        """Get a message for the specified agent"""
        try:
            return self.message_queues[agent_id].get(timeout=timeout)
        except (queue.Empty, KeyError):
            return None

    def get_conversation_history(self, conversation_id: str) -> List[clsMCPMessage]:
        """Get the history of a conversation"""
        return self.conversation_history.get(conversation_id, [])
Imagine a system where different virtual agents (like robots or apps) need to talk to each other. To do that, they send messages back and forth—kind of like emails or text messages. This code is responsible for:
Making sure those messages are properly written (like filling out all parts of a form).
Making sure messages are delivered to the right people.
Keeping a record of conversations so you can go back and review what was said.
This part (clsMCPMessage) is like a template or a form that every message needs to follow. Each message has:
ID: A unique identifier so every message is different (like a serial number).
Time Sent: When the message was created.
Sender & Receiver: Who sent the message and who is supposed to receive it.
Type of Message: Is it a request, a response, or just a notification?
Content: The actual information or question the message is about.
Reply To: If this message is answering another one, this tells which one.
Conversation ID: So we know which group of messages belongs to the same conversation.
Extra Info (Metadata): Any other small details that might help explain the message.
This (clsMCPBroker) is the system (or “post office”) that makes sure messages get to where they’re supposed to go. Here’s what it does:
1. Registering an Agent
Think of this like signing up a new user in the system.
Each agent gets their own personal mailbox (called a “message queue”) so others can send them messages.
2. Subscribing to Another Agent
If Agent A wants to receive copies of messages from Agent B, they can “subscribe” to B.
This is like signing up for B’s newsletter—whenever B sends something, A gets a copy.
3. Sending a Message
When someone sends a message:
It is saved into a conversation history (like keeping emails in your inbox).
It is delivered to the main person it was meant for.
And, if anyone subscribed to the sender, they get a copy too—unless they’re already the main receiver (to avoid sending duplicates).
4. Receiving Messages
Each agent can check their personal mailbox to see if they got any new messages.
If there are no messages, they’ll either wait for some time or move on.
5. Viewing Past Conversations
You can look up all messages that were part of a specific conversation.
This is helpful for remembering what was said earlier.
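The five behaviours above can be tried out with a stripped-down broker. The sketch below uses only the standard library. MiniBroker and Msg are hypothetical, simplified stand-ins for clsMCPBroker and clsMCPMessage, and the agent names are made up for the demo.

```python
import queue
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Msg:
    """Simplified stand-in for clsMCPMessage."""
    sender: str
    receiver: str
    text: str
    conversation_id: str


class MiniBroker:
    """Simplified stand-in for clsMCPBroker."""

    def __init__(self) -> None:
        self.queues: Dict[str, "queue.Queue[Msg]"] = {}
        self.subscribers: Dict[str, List[str]] = {}
        self.history: Dict[str, List[Msg]] = {}

    def register_agent(self, agent_id: str) -> None:
        # Each agent gets its own mailbox.
        if agent_id not in self.queues:
            self.queues[agent_id] = queue.Queue()
            self.subscribers[agent_id] = []

    def subscribe(self, subscriber_id: str, publisher_id: str) -> None:
        if publisher_id in self.subscribers and subscriber_id not in self.subscribers[publisher_id]:
            self.subscribers[publisher_id].append(subscriber_id)

    def publish(self, msg: Msg) -> None:
        # Record the message, deliver it, then copy it to subscribers.
        self.history.setdefault(msg.conversation_id, []).append(msg)
        if msg.receiver in self.queues:
            self.queues[msg.receiver].put(msg)
        for subscriber in self.subscribers.get(msg.sender, []):
            if subscriber != msg.receiver:  # avoid a duplicate delivery
                self.queues[subscriber].put(msg)

    def get_message(self, agent_id: str, timeout: Optional[float] = None) -> Optional[Msg]:
        try:
            return self.queues[agent_id].get(timeout=timeout)
        except (queue.Empty, KeyError):
            return None


broker = MiniBroker()
for name in ("doc_agent", "research_agent", "audit_agent"):
    broker.register_agent(name)
broker.subscribe("audit_agent", "doc_agent")  # audit gets copies of doc's messages

broker.publish(Msg("doc_agent", "research_agent", "What is MCP?", "conv-1"))

direct = broker.get_message("research_agent", timeout=0.01)
copy = broker.get_message("audit_agent", timeout=0.01)
nothing = broker.get_message("doc_agent", timeout=0.01)
print(direct.text, copy.text, nothing, len(broker.history["conv-1"]))
```

One design detail to keep in mind: with the default timeout of None, queue.Queue.get blocks until a message arrives, so an agent loop should pass a timeout if it needs to stay responsive.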
In systems where many different smart tools or services need to work together and communicate, this kind of communication system makes sure everything is:
Organized
Delivered correctly
Easy to trace back when needed
So, we’ll finish this post here. We’ll cover the rest in the next post.
I’ll bring some more exciting topics in the coming days from the Python verse.
Till then, Happy Avenging! 🙂