Real-time video summary assistance App – Part 2

As a continuation of the previous post, I would like to continue discussing the implementation of the MCP (Message-Chaining Protocol) among agents. But before that, let me share the quick demo one more time to recap our objectives.

Let us recap the process flow –

Also, recall how the scripts are grouped, as posted in the previous post –

Message-Chaining Protocol (MCP) Implementation:

    clsMCPMessage.py
    clsMCPBroker.py

YouTube Transcript Extraction:

    clsYouTubeVideoProcessor.py

Language Detection:

    clsLanguageDetector.py

Translation Services & Agents:

    clsTranslationAgent.py
    clsTranslationService.py

Documentation Agent:

    clsDocumentationAgent.py
    
Research Agent:

    clsResearchAgent.py

Great! Now, we’ll continue with the main discussion.


import re
from youtube_transcript_api import YouTubeTranscriptApi

def extract_youtube_id(youtube_url):
    """Extract YouTube video ID from URL"""
    youtube_id_match = re.search(r'(?:v=|\/)([0-9A-Za-z_-]{11}).*', youtube_url)
    if youtube_id_match:
        return youtube_id_match.group(1)
    return None

def get_youtube_transcript(youtube_url):
    """Get transcript from YouTube video"""
    video_id = extract_youtube_id(youtube_url)
    if not video_id:
        return {"error": "Invalid YouTube URL or ID"}
    
    try:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        
        # First try to get manual transcripts
        try:
            transcript = transcript_list.find_manually_created_transcript(["en"])
            transcript_data = transcript.fetch()
            print(f"Debug - Manual transcript format: {type(transcript_data)}")
            if transcript_data and len(transcript_data) > 0:
                print(f"Debug - First item type: {type(transcript_data[0])}")
                print(f"Debug - First item sample: {transcript_data[0]}")
            return {"text": transcript_data, "language": "en", "auto_generated": False}
        except Exception as e:
            print(f"Debug - No manual transcript: {str(e)}")
            # If no manual English transcript, try any available transcript
            try:
                available_transcripts = list(transcript_list)
                if available_transcripts:
                    transcript = available_transcripts[0]
                    print(f"Debug - Using transcript in language: {transcript.language_code}")
                    transcript_data = transcript.fetch()
                    print(f"Debug - Auto transcript format: {type(transcript_data)}")
                    if transcript_data and len(transcript_data) > 0:
                        print(f"Debug - First item type: {type(transcript_data[0])}")
                        print(f"Debug - First item sample: {transcript_data[0]}")
                    return {
                        "text": transcript_data, 
                        "language": transcript.language_code, 
                        "auto_generated": transcript.is_generated
                    }
                else:
                    return {"error": "No transcripts available for this video"}
            except Exception as e:
                return {"error": f"Error getting transcript: {str(e)}"}
    except Exception as e:
        return {"error": f"Error getting transcript list: {str(e)}"}

# ----------------------------------------------------------------------------------
# YouTube Video Processor
# ----------------------------------------------------------------------------------

class clsYouTubeVideoProcessor:
    """Process YouTube videos using the agent system"""
    
    def __init__(self, documentation_agent, translation_agent, research_agent):
        self.documentation_agent = documentation_agent
        self.translation_agent = translation_agent
        self.research_agent = research_agent
    
    def process_youtube_video(self, youtube_url):
        """Process a YouTube video"""
        print(f"Processing YouTube video: {youtube_url}")
        
        # Extract transcript
        transcript_result = get_youtube_transcript(youtube_url)
        
        if "error" in transcript_result:
            return {"error": transcript_result["error"]}
        
        # Start a new conversation
        conversation_id = self.documentation_agent.start_processing()
        
        # Process transcript segments
        transcript_data = transcript_result["text"]
        transcript_language = transcript_result["language"]
        
        print(f"Debug - Type of transcript_data: {type(transcript_data)}")
        
        # For each segment, detect language and translate if needed
        processed_segments = []
        
        try:
            # Make sure transcript_data is a list of dictionaries with text and start fields
            if isinstance(transcript_data, list):
                for idx, segment in enumerate(transcript_data):
                    print(f"Debug - Processing segment {idx}, type: {type(segment)}")
                    
                    # Extract text properly based on the type
                    if isinstance(segment, dict) and "text" in segment:
                        text = segment["text"]
                        start = segment.get("start", 0)
                    else:
                        # Try to access attributes for non-dict types
                        try:
                            text = segment.text
                            start = getattr(segment, "start", 0)
                        except AttributeError:
                            # If all else fails, convert to string
                            text = str(segment)
                            start = idx * 5  # Arbitrary timestamp
                    
                    print(f"Debug - Extracted text: {text[:30]}...")
                    
                    # Create a standardized segment
                    std_segment = {
                        "text": text,
                        "start": start
                    }
                    
                    # Process through translation agent
                    translation_result = self.translation_agent.process_text(text, conversation_id)
                    
                    # Update segment with translation information
                    segment_with_translation = {
                        **std_segment,
                        "translation_info": translation_result
                    }
                    
                    # Use translated text for documentation
                    if "final_text" in translation_result and translation_result["final_text"] != text:
                        std_segment["processed_text"] = translation_result["final_text"]
                    else:
                        std_segment["processed_text"] = text
                    
                    processed_segments.append(segment_with_translation)
            else:
                # If transcript_data is not a list, treat it as a single text block
                print(f"Debug - Transcript is not a list, treating as single text")
                text = str(transcript_data)
                std_segment = {
                    "text": text,
                    "start": 0
                }
                
                translation_result = self.translation_agent.process_text(text, conversation_id)
                segment_with_translation = {
                    **std_segment,
                    "translation_info": translation_result
                }
                
                if "final_text" in translation_result and translation_result["final_text"] != text:
                    std_segment["processed_text"] = translation_result["final_text"]
                else:
                    std_segment["processed_text"] = text
                
                processed_segments.append(segment_with_translation)
                
        except Exception as e:
            print(f"Debug - Error processing transcript: {str(e)}")
            return {"error": f"Error processing transcript: {str(e)}"}
        
        # Process the transcript with the documentation agent
        documentation_result = self.documentation_agent.process_transcript(
            processed_segments,
            conversation_id
        )
        
        return {
            "youtube_url": youtube_url,
            "transcript_language": transcript_language,
            "processed_segments": processed_segments,
            "documentation": documentation_result,
            "conversation_id": conversation_id
        }

Let us understand this step-by-step:

Part 1: Getting the YouTube Transcript

def extract_youtube_id(youtube_url):
    ...

This extracts the unique video ID from any YouTube link. 
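
For example, both common URL forms yield the same 11-character ID (the ID below is just an illustration):

# Assumes extract_youtube_id from the block above is in scope
print(extract_youtube_id("https://www.youtube.com/watch?v=dQw4w9WgXcQ"))  # dQw4w9WgXcQ
print(extract_youtube_id("https://youtu.be/dQw4w9WgXcQ"))                # dQw4w9WgXcQ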

def get_youtube_transcript(youtube_url):
    ...
  • This gets the actual spoken content of the video.
  • It tries to get a manual transcript first (created by humans).
  • If not available, it falls back to an auto-generated version (created by YouTube’s AI).
  • If nothing is found, it returns an error message such as: “No transcripts available for this video.”

Part 2: Processing the Video with Agents

class clsYouTubeVideoProcessor:
    ...

This is like the control center that tells each intelligent agent what to do with the transcript. Here are the detailed steps:

1. Start the Process

def process_youtube_video(self, youtube_url):
    ...
  • The system starts with a YouTube video link.
  • It prints a message like: “Processing YouTube video: [link]”

2. Extract the Transcript

  • The system runs the get_youtube_transcript() function.
  • If it fails, it returns an error (e.g., invalid link or no subtitles available).

3. Start a “Conversation”

  • The documentation agent begins a new session, tracked by a unique conversation ID.
  • Think of this like opening a new folder in a shared team workspace to store everything related to this video.

4. Go Through Each Segment of the Transcript

  • The spoken text is often broken into small parts (segments), like subtitles.
  • For each part:
    • It checks the text.
    • It finds out the time that part was spoken.
    • It sends it to the translation agent to clean up or translate the text.

5. Translate (if needed)

  • If the translation agent finds a better or translated version, it replaces the original.
  • Otherwise, it keeps the original.

6. Prepare for Documentation

  • After translation, the segment is passed to the documentation agent.
  • This agent might:
    • Summarize the content,
    • Highlight important terms,
    • Structure it into a readable format.

7. Return the Final Result

The system gives back a structured package with:

  • The video link
  • The original language
  • The transcript in parts (processed and translated)
  • A documentation summary
  • The conversation ID (for tracking or further updates)
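
To see how these pieces fit together, here is a minimal usage sketch (assuming the three agents have been constructed as shown later in this post):

# Minimal sketch; doc_agent, translation_agent, and research_agent are assumed instances
processor = clsYouTubeVideoProcessor(doc_agent, translation_agent, research_agent)
result = processor.process_youtube_video("https://www.youtube.com/watch?v=dQw4w9WgXcQ")
if "error" not in result:
    print(result["transcript_language"])
    print(len(result["processed_segments"]), "segments processed")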

class clsDocumentationAgent:
    """Documentation Agent built with LangChain"""
    
    def __init__(self, agent_id: str, broker: clsMCPBroker):
        self.agent_id = agent_id
        self.broker = broker
        self.broker.register_agent(agent_id)
        
        # Initialize LangChain components
        self.llm = ChatOpenAI(
            model="gpt-4-0125-preview",
            temperature=0.1,
            api_key=OPENAI_API_KEY
        )
        
        # Create tools
        self.tools = [
            clsSendMessageTool(sender_id=self.agent_id, broker=self.broker)
        ]
        
        # Set up LLM with tools
        self.llm_with_tools = self.llm.bind(
            tools=[tool.tool_config for tool in self.tools]
        )
        
        # Setup memory
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True
        )
        
        # Create prompt
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a Documentation Agent for YouTube video transcripts. Your responsibilities include:
                1. Process YouTube video transcripts
                2. Identify key points, topics, and main ideas
                3. Organize content into a coherent and structured format
                4. Create concise summaries
                5. Request research information when necessary
                
                When you need additional context or research, send a request to the Research Agent.
                Always maintain a professional tone and ensure your documentation is clear and organized.
            """),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])
        
        # Create agent
        self.agent = (
            {
                "input": lambda x: x["input"],
                "chat_history": lambda x: self.memory.load_memory_variables({})["chat_history"],
                "agent_scratchpad": lambda x: format_to_openai_tool_messages(x["intermediate_steps"]),
            }
            | self.prompt
            | self.llm_with_tools
            | OpenAIToolsAgentOutputParser()
        )
        
        # Create agent executor
        self.agent_executor = AgentExecutor(
            agent=self.agent,
            tools=self.tools,
            verbose=True,
            memory=self.memory
        )
        
        # Video data
        self.current_conversation_id = None
        self.video_notes = {}
        self.key_points = []
        self.transcript_segments = []
        
    def start_processing(self) -> str:
        """Start processing a new video"""
        self.current_conversation_id = str(uuid.uuid4())
        self.video_notes = {}
        self.key_points = []
        self.transcript_segments = []
        
        return self.current_conversation_id
    
    def process_transcript(self, transcript_segments, conversation_id=None):
        """Process a YouTube transcript"""
        if not conversation_id:
            conversation_id = self.start_processing()
        self.current_conversation_id = conversation_id
        
        # Store transcript segments
        self.transcript_segments = transcript_segments
        
        # Process segments
        processed_segments = []
        for segment in transcript_segments:
            processed_result = self.process_segment(segment)
            processed_segments.append(processed_result)
        
        # Generate summary
        summary = self.generate_summary()
        
        return {
            "processed_segments": processed_segments,
            "summary": summary,
            "conversation_id": conversation_id
        }
    
    def process_segment(self, segment):
        """Process individual transcript segment"""
        text = segment.get("text", "")
        start = segment.get("start", 0)
        
        # Use LangChain agent to process the segment
        result = self.agent_executor.invoke({
            "input": f"Process this video transcript segment at timestamp {start}s: {text}. If research is needed, send a request to the research_agent."
        })
        
        # Update video notes
        timestamp = start
        self.video_notes[timestamp] = {
            "text": text,
            "analysis": result["output"]
        }
        
        return {
            "timestamp": timestamp,
            "text": text,
            "analysis": result["output"]
        }
    
    def handle_mcp_message(self, message: clsMCPMessage) -> Optional[clsMCPMessage]:
        """Handle an incoming MCP message"""
        if message.message_type == "research_response":
            # Process research information received from Research Agent
            research_info = message.content.get("text", "")
            
            result = self.agent_executor.invoke({
                "input": f"Incorporate this research information into video analysis: {research_info}"
            })
            
            # Send acknowledgment back to Research Agent
            response = clsMCPMessage(
                sender=self.agent_id,
                receiver=message.sender,
                message_type="acknowledgment",
                content={"text": "Research information incorporated into video analysis."},
                reply_to=message.id,
                conversation_id=message.conversation_id
            )
            
            self.broker.publish(response)
            return response
        
        elif message.message_type == "translation_response":
            # Process translation response from Translation Agent
            translation_result = message.content
            
            # Process the translated text
            if "final_text" in translation_result:
                text = translation_result["final_text"]
                original_text = translation_result.get("original_text", "")
                language_info = translation_result.get("language", {})
                
                result = self.agent_executor.invoke({
                    "input": f"Process this translated text: {text}\nOriginal language: {language_info.get('language', 'unknown')}\nOriginal text: {original_text}"
                })
                
                # Update notes with translation information
                for timestamp, note in self.video_notes.items():
                    if note["text"] == original_text:
                        note["translated_text"] = text
                        note["language"] = language_info
                        break
            
            return None
        
        return None
    
    def run(self):
        """Run the agent to listen for MCP messages"""
        print(f"Documentation Agent {self.agent_id} is running...")
        while True:
            message = self.broker.get_message(self.agent_id, timeout=1)
            if message:
                self.handle_mcp_message(message)
            time.sleep(0.1)
    
    def generate_summary(self) -> str:
        """Generate a summary of the video"""
        if not self.video_notes:
            return "No video data available to summarize."
        
        all_notes = "\n".join([f"{ts}: {note['text']}" for ts, note in self.video_notes.items()])
        
        result = self.agent_executor.invoke({
            "input": f"Generate a concise summary of this YouTube video, including key points and topics:\n{all_notes}"
        })
        
        return result["output"]

Let us understand the key methods in a step-by-step manner:

The Documentation Agent is like a smart assistant that watches a YouTube video, takes notes, pulls out important ideas, and creates a summary — almost like a professional note-taker trained to help educators, researchers, and content creators. It works with a team of other assistants, like a Translator Agent and a Research Agent, and they all talk to each other through a messaging system.

1. Starting to Work on a New Video

    def start_processing(self) -> str
    

    When a new video is being processed:

    • A new project ID is created.
    • Old notes and transcripts are cleared to start fresh.

2. Processing the Whole Transcript

    def process_transcript(...)
    

    This is where the assistant:

    • Takes in the full transcript (what was said in the video).
    • Breaks it into small parts (like subtitles).
    • Sends each part to the smart brain for analysis.
    • Collects the results.
    • Finally, a summary of all the main ideas is created.

3. Processing One Transcript Segment at a Time

    def process_segment(self, segment)
    

    For each chunk of the video:

    • The assistant reads the text and timestamp.
    • It asks GPT-4 to analyze it and suggest important insights.
    • It saves that insight along with the original text and timestamp.

4. Handling Incoming Messages from Other Agents

    def handle_mcp_message(self, message)
    

    The assistant can also receive messages from teammates (other agents):

    If the message is from the Research Agent:

    • It reads new information and adds it to its notes.
    • It replies with a thank-you message to say it got the research.

    If the message is from the Translation Agent:

    • It takes the translated version of a transcript.
    • Updates its notes to reflect the translated text and its language.

    This is like a team of assistants emailing back and forth to make sure the notes are complete and accurate.

5. Summarizing the Whole Video

    def generate_summary(self)
    

    After going through all the transcript parts, the agent asks GPT-4 to create a short, clean summary — identifying:

    • Main ideas
    • Key talking points
    • Structure of the content

    The final result is clear, professional, and usable in learning materials or documentation.
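
    Here is a minimal sketch of driving the Documentation Agent directly (assuming clsMCPBroker from the MCP scripts and a valid OPENAI_API_KEY):

    # Illustrative only; clsMCPBroker is defined in the MCP scripts (no-arg constructor assumed)
    broker = clsMCPBroker()
    doc_agent = clsDocumentationAgent("documentation_agent", broker)

    conversation_id = doc_agent.start_processing()
    result = doc_agent.process_transcript(
        [{"text": "Welcome to this tutorial on vector databases.", "start": 0.0}],
        conversation_id
    )
    print(result["summary"])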


    class clsResearchAgent:
        """Research Agent built with AutoGen"""
        
        def __init__(self, agent_id: str, broker: clsMCPBroker):
            self.agent_id = agent_id
            self.broker = broker
            self.broker.register_agent(agent_id)
            
            # Configure AutoGen directly with API key
            if not OPENAI_API_KEY:
                print("Warning: OPENAI_API_KEY not set for ResearchAgent")
                
            # Create config list directly instead of loading from file
            config_list = [
                {
                    "model": "gpt-4-0125-preview",
                    "api_key": OPENAI_API_KEY
                }
            ]
            # Create AutoGen assistant for research
            self.assistant = AssistantAgent(
                name="research_assistant",
                system_message="""You are a Research Agent for YouTube videos. Your responsibilities include:
                    1. Research topics mentioned in the video
                    2. Find relevant information, facts, references, or context
                    3. Provide concise, accurate information to support the documentation
                    4. Focus on delivering high-quality, relevant information
                    
                    Respond directly to research requests with clear, factual information.
                """,
                llm_config={"config_list": config_list, "temperature": 0.1}
            )
            
            # Create user proxy to handle message passing
            self.user_proxy = UserProxyAgent(
                name="research_manager",
                human_input_mode="NEVER",
                code_execution_config={"work_dir": "coding", "use_docker": False},
                default_auto_reply="Working on the research request..."
            )
            
            # Current conversation tracking
            self.current_requests = {}
        
        def handle_mcp_message(self, message: clsMCPMessage) -> Optional[clsMCPMessage]:
            """Handle an incoming MCP message"""
            if message.message_type == "request":
                # Process research request from Documentation Agent
                request_text = message.content.get("text", "")
                
                # Use AutoGen to process the research request
                def research_task():
                    self.user_proxy.initiate_chat(
                        self.assistant,
                        message=f"Research request for YouTube video content: {request_text}. Provide concise, factual information."
                    )
                    # Return last assistant message
                    return self.assistant.chat_messages[self.user_proxy][-1]["content"]  # chat_messages is keyed by the Agent object
                
                # Execute research task
                research_result = research_task()
                
                # Send research results back to Documentation Agent
                response = clsMCPMessage(
                    sender=self.agent_id,
                    receiver=message.sender,
                    message_type="research_response",
                    content={"text": research_result},
                    reply_to=message.id,
                    conversation_id=message.conversation_id
                )
                
                self.broker.publish(response)
                return response
            
            return None
        
        def run(self):
            """Run the agent to listen for MCP messages"""
            print(f"Research Agent {self.agent_id} is running...")
            while True:
                message = self.broker.get_message(self.agent_id, timeout=1)
                if message:
                    self.handle_mcp_message(message)
                time.sleep(0.1)
    

    Let us understand the key methods in detail.

    1. Receiving and Responding to Research Requests

      def handle_mcp_message(self, message)
      

      When the Research Agent gets a message (like a question or request for info), it:

      1. Reads the message to see what needs to be researched.
      2. Asks GPT-4 to find helpful, accurate info about that topic.
      3. Sends the answer back to whoever asked the question (usually the Documentation Agent).
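
      For illustration, a research request can be pushed to this agent as an MCP message (a sketch; the field names mirror those used in handle_mcp_message above):

      # Hypothetical request; broker and clsMCPMessage come from the MCP scripts
      broker = clsMCPBroker()
      research_agent = clsResearchAgent("research_agent", broker)
      request = clsMCPMessage(
          sender="documentation_agent",
          receiver="research_agent",
          message_type="request",
          content={"text": "Provide background on retrieval-augmented generation."},
          conversation_id="demo-conversation-001"
      )
      response = research_agent.handle_mcp_message(request)
      print(response.content["text"])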

      class clsTranslationAgent:
          """Agent for language detection and translation"""
          
          def __init__(self, agent_id: str, broker: clsMCPBroker):
              self.agent_id = agent_id
              self.broker = broker
              self.broker.register_agent(agent_id)
              
              # Initialize language detector
              self.language_detector = clsLanguageDetector()
              
              # Initialize translation service
              self.translation_service = clsTranslationService()
          
          def process_text(self, text, conversation_id=None):
              """Process text: detect language and translate if needed, handling mixed language content"""
              if not conversation_id:
                  conversation_id = str(uuid.uuid4())
              
              # Detect language with support for mixed language content
              language_info = self.language_detector.detect(text)
              
              # Decide if translation is needed
              needs_translation = True
              
              # Pure English content doesn't need translation
              if language_info["language_code"] == "en-IN" or language_info["language_code"] == "unknown":
                  needs_translation = False
              
              # For mixed language, check if it's primarily English
              if language_info.get("is_mixed", False) and language_info.get("languages", []):
                  english_langs = [
                      lang for lang in language_info.get("languages", []) 
                      if lang["language_code"] == "en-IN" or lang["language_code"].startswith("en-")
                  ]
                  
                  # If the highest confidence language is English and > 60% confident, don't translate
                  if english_langs and english_langs[0].get("confidence", 0) > 0.6:
                      needs_translation = False
              
              if needs_translation:
                  # Translate using the appropriate service based on language detection
                  translation_result = self.translation_service.translate(text, language_info)
                  
                  return {
                      "original_text": text,
                      "language": language_info,
                      "translation": translation_result,
                      "final_text": translation_result.get("translated_text", text),
                      "conversation_id": conversation_id
                  }
              else:
                  # Already English or unknown language, return as is
                  return {
                      "original_text": text,
                      "language": language_info,
                      "translation": {"provider": "none"},
                      "final_text": text,
                      "conversation_id": conversation_id
                  }
          
          def handle_mcp_message(self, message: clsMCPMessage) -> Optional[clsMCPMessage]:
              """Handle an incoming MCP message"""
              if message.message_type == "translation_request":
                  # Process translation request from Documentation Agent
                  text = message.content.get("text", "")
                  
                  # Process the text
                  result = self.process_text(text, message.conversation_id)
                  
                  # Send translation results back to requester
                  response = clsMCPMessage(
                      sender=self.agent_id,
                      receiver=message.sender,
                      message_type="translation_response",
                      content=result,
                      reply_to=message.id,
                      conversation_id=message.conversation_id
                  )
                  
                  self.broker.publish(response)
                  return response
              
              return None
          
          def run(self):
              """Run the agent to listen for MCP messages"""
              print(f"Translation Agent {self.agent_id} is running...")
              while True:
                  message = self.broker.get_message(self.agent_id, timeout=1)
                  if message:
                      self.handle_mcp_message(message)
                  time.sleep(0.1)

      Let us understand the key methods in a step-by-step manner:

      1. Understanding and Translating Text:

      def process_text(...)
      

      This is the core job of the agent. Here’s what it does with any piece of text:

      Step 1: Detect the Language

      • It tries to figure out the language of the input text.
      • It can handle cases where more than one language is mixed together, which is common in casual speech or subtitles.

      Step 2: Decide Whether to Translate

      • If the text is clearly in English, or it’s unclear what the language is, it decides not to translate.
      • If the text is mostly in another language or has less than 60% confidence in being English, it will translate it into English.

      Step 3: Translate (if needed)

      • If translation is required, it uses the translation service to do the job.
      • Then it packages all the information: the original text, detected language, the translated version, and a unique conversation ID.

      Step 4: Return the Results

      • If no translation is needed, it returns the original text and a note saying “no translation was applied.”

      2. Receiving Messages and Responding

      def handle_mcp_message(...)
      

      The agent listens for messages from other agents. When someone asks it to translate something:

      • It takes the text from the message.
      • Runs it through the process_text function (as explained above).
      • Sends the translated (or original) result to the person who asked.
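
      As a quick illustration, process_text can be exercised on its own (a sketch; the output keys follow the code above, and the French sample is made up):

      # Illustrative only; broker comes from the MCP scripts
      broker = clsMCPBroker()
      translation_agent = clsTranslationAgent("translation_agent", broker)
      result = translation_agent.process_text("Bonjour tout le monde")
      print(result["final_text"])                   # English text if translation was applied
      print(result["translation"].get("provider"))  # e.g., "google", "sarvam", or "none"
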
      class clsTranslationService:
          """Translation service using multiple providers with support for mixed languages"""
          
          def __init__(self):
              # Initialize Sarvam AI client
              self.sarvam_api_key = SARVAM_API_KEY
              self.sarvam_url = "https://api.sarvam.ai/translate"
              
              # Initialize Google Cloud Translation client using simple HTTP requests
              self.google_api_key = GOOGLE_API_KEY
              self.google_translate_url = "https://translation.googleapis.com/language/translate/v2"
          
          def translate_with_sarvam(self, text, source_lang, target_lang="en-IN"):
              """Translate text using Sarvam AI (for Indian languages)"""
              if not self.sarvam_api_key:
                  return {"error": "Sarvam API key not set"}
              
              headers = {
                  "Content-Type": "application/json",
                  "api-subscription-key": self.sarvam_api_key
              }
              
              payload = {
                  "input": text,
                  "source_language_code": source_lang,
                  "target_language_code": target_lang,
                  "speaker_gender": "Female",
                  "mode": "formal",
                  "model": "mayura:v1"
              }
              
              try:
                  response = requests.post(self.sarvam_url, headers=headers, json=payload)
                  if response.status_code == 200:
                      return {"translated_text": response.json().get("translated_text", ""), "provider": "sarvam"}
                  else:
                      return {"error": f"Sarvam API error: {response.text}", "provider": "sarvam"}
              except Exception as e:
                  return {"error": f"Error calling Sarvam API: {str(e)}", "provider": "sarvam"}
          
          def translate_with_google(self, text, target_lang="en"):
              """Translate text using Google Cloud Translation API with direct HTTP request"""
              if not self.google_api_key:
                  return {"error": "Google API key not set"}
              
              try:
                  # Using the translation API v2 with API key
                  params = {
                      "key": self.google_api_key,
                      "q": text,
                      "target": target_lang
                  }
                  
                  response = requests.post(self.google_translate_url, params=params)
                  if response.status_code == 200:
                      data = response.json()
                      translation = data.get("data", {}).get("translations", [{}])[0]
                      return {
                          "translated_text": translation.get("translatedText", ""),
                          "detected_source_language": translation.get("detectedSourceLanguage", ""),
                          "provider": "google"
                      }
                  else:
                      return {"error": f"Google API error: {response.text}", "provider": "google"}
              except Exception as e:
                  return {"error": f"Error calling Google Translation API: {str(e)}", "provider": "google"}
          
          def translate(self, text, language_info):
              """Translate text to English based on language detection info"""
              # If already English or unknown language, return as is
              if language_info["language_code"] == "en-IN" or language_info["language_code"] == "unknown":
                  return {"translated_text": text, "provider": "none"}
              
              # Handle mixed language content
              if language_info.get("is_mixed", False) and language_info.get("languages", []):
                  # Strategy for mixed language:
                  # 1. If one of the languages is English, use Google Translate on the full text, as it handles code-mixing well
                  # 2. If no English but it contains Indian languages, use Sarvam, as it handles Indian code-mixing better
                  # 3. Otherwise, use Google Translate for the primary detected language
                  
                  has_english = False
                  has_indian = False
                  
                  for lang in language_info.get("languages", []):
                      if lang["language_code"] == "en-IN" or lang["language_code"].startswith("en-"):
                          has_english = True
                      if lang.get("is_indian", False):
                          has_indian = True
                  
                  if has_english:
                      # Contains English - use Google for full text as it handles code-mixing well
                      return self.translate_with_google(text)
                  elif has_indian:
                      # Contains Indian languages - use Sarvam
                      # Use the highest confidence Indian language as source
                      indian_langs = [lang for lang in language_info.get("languages", []) if lang.get("is_indian", False)]
                      if indian_langs:
                          # Sort by confidence
                          indian_langs.sort(key=lambda x: x.get("confidence", 0), reverse=True)
                          source_lang = indian_langs[0]["language_code"]
                          return self.translate_with_sarvam(text, source_lang)
                      else:
                          # Fallback to primary language
                          if language_info["is_indian"]:
                              return self.translate_with_sarvam(text, language_info["language_code"])
                          else:
                              return self.translate_with_google(text)
                  else:
                      # No English, no Indian languages - use Google for primary language
                      return self.translate_with_google(text)
              else:
                  # Not mixed language - use standard approach
                  if language_info["is_indian"]:
                      # Use Sarvam AI for Indian languages
                      return self.translate_with_sarvam(text, language_info["language_code"])
                  else:
                      # Use Google for other languages
                      return self.translate_with_google(text)

      This Translation Service is like a smart translator that knows how to:

      • Detect what language the text is written in,
      • Choose the best translation provider depending on the language (especially for Indian languages),
      • And then translate the text into English.

      It supports mixed-language content (such as Hindi-English in one sentence) and uses either Google Translate or Sarvam AI, a translation service designed for Indian languages.

      Now, let us understand the key methods in a step-by-step manner:

      1. Translating Using Google Translate

      def translate_with_google(...)
      

      This function uses Google Translate:

      • It sends the text, asks for English as the target language, and gets a translation back.
      • It also detects the source language automatically.
      • If successful, it returns the translated text and the detected original language.
      • If there’s an error, it returns a message saying what went wrong.

      Best For: Non-Indian languages (like Spanish, French, Chinese) and content that is not mixed with English.

      2. Main Translation Logic

      def translate(self, text, language_info)
      

      This is the decision-maker. Here’s how it works:

      Case 1: No Translation Needed

      If the text is already in English or the language is unknown, it simply returns the original text.

      Case 2: Mixed Language (e.g., Hindi + English)

      If the text contains more than one language:

      • ✅ If one part is English → use Google Translate (it’s good with mixed languages).
      • ✅ If it includes Indian languages only → use Sarvam AI (better at handling Indian content).
      • ✅ If it’s neither English nor Indian → use Google Translate.

      The service checks how confident it is about each language in the mix and chooses the most likely one to translate from.

      Case 3: Single Language

      If the text is only in one language:

      • ✅ If it’s an Indian language (like Bengali, Tamil, or Marathi), use Sarvam AI.
      • ✅ If it’s any other language, use Google Translate.
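
      A small routing sketch makes this concrete (the language_info shape follows how translate() reads it above; the Bengali sample is illustrative):

      # Single Indian language → expected to route to Sarvam AI
      service = clsTranslationService()
      language_info = {"language_code": "bn-IN", "is_indian": True, "is_mixed": False}
      result = service.translate("আমি ভালো আছি", language_info)
      print(result.get("provider"))         # "sarvam" (or an error entry if the API key is unset)
      print(result.get("translated_text"))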

      So, we’ve done it.

      I’ve included the complete working solutions for you in the GitHub Link.

      We’ll cover the detailed performance testing, Optimized configurations & many other useful details in our next post.

      Till then, Happy Avenging! 🙂

      Monitoring & evaluating the leading LLMs (both established & new) with a Python-based evaluator

      As we leap further into the field of Generative AI, one of the most frequent questions or challenges people face concerns performance & other evaluation factors. Getting these factors right will eventually bring the fruits of this technology; otherwise, you will end up in technical debt.

      This post will discuss the key snippets of the Python-based monitoring app. But before that, let us first view the demo.

      Isn’t it exciting?


      Let us take a deep dive into it. But first, here is the flow this solution will follow.

      So, the current application will invoke both the industry heavyweights and some relatively new or lesser-known LLMs.

      In this case, we’ll evaluate various models from Anthropic, OpenAI, DeepSeek, and BharatGPT. However, BharatGPT is open source, so we’ll use the Hugging Face library and execute it locally on my MacBook Pro M4 Max.

      The following are the KPIs we’re going to evaluate:

      Here is the list of dependent Python packages required to run this application –

      pip install certifi==2024.8.30
      pip install anthropic==0.42.0
      pip install huggingface-hub==0.27.0
      pip install nltk==3.9.1
      pip install numpy==2.2.1
      pip install moviepy==2.1.1
      pip install openai==1.59.3
      pip install pandas==2.2.3
      pip install pillow==11.1.0
      pip install pip==24.3.1
      pip install psutil==6.1.1
      pip install requests==2.32.3
      pip install rouge_score==0.1.2
      pip install scikit-learn==1.6.0
      pip install setuptools==70.2.0
      pip install tokenizers==0.21.0
      pip install torch==2.6.0.dev20250104
      pip install torchaudio==2.6.0.dev20250104
      pip install torchvision==0.22.0.dev20250104
      pip install tqdm==4.67.1
      pip install transformers==4.47.1
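
      With the environment ready, let us walk through the key snippets, starting with the Claude invocation. One note: the @retry decorator used in the snippets below assumes tenacity is imported (from tenacity import retry, stop_after_attempt, wait_exponential).
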
          @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
          def get_claude_response(self, prompt: str) -> str:
              response = self.anthropic_client.messages.create(
                  model=anthropic_model,
                  max_tokens=maxToken,
                  messages=[{"role": "user", "content": prompt}]
              )
              return response.content[0].text
      1. The Retry Mechanism
        • The @retry line means this function will automatically try again if it fails.
        • It will stop retrying after 3 attempts (stop_after_attempt(3)).
        • It will wait longer between retries, starting at 4 seconds and increasing up to 10 seconds (wait_exponential(multiplier=1, min=4, max=10)).
      2. The Function Purpose
        • The function takes a message, called prompt, as input (a string of text).
        • It uses a service (likely an AI system like Claude) to generate a response to this prompt.
      3. Sending the Message
        • Inside the function, the code self.anthropic_client.messages.create is the part that actually sends the prompt to the AI.
        • It specifies which AI model to use (e.g., anthropic_model).
        • It sets the maximum length of the response (controlled by maxToken).
        • The input message for the AI has a “role” (user), as well as the content of the prompt.
      4. Getting the Response
        • Once the AI generates a response, it’s saved as response.
        • The code retrieves the first part of the response (response.content[0].text) and sends it back to whoever called the function.

      Similarly, it will work for OpenAI as well.
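
      get_gpt4_response is referenced later but not shown in the excerpt; a plausible counterpart with the OpenAI Python SDK might look like this (a sketch; self.openai_client and openai_model are assumed names, not taken from the post):

          @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
          def get_gpt4_response(self, prompt: str) -> str:
              # Sketch only: openai_client and openai_model are assumed names
              response = self.openai_client.chat.completions.create(
                  model=openai_model,
                  max_tokens=maxToken,
                  messages=[{"role": "user", "content": prompt}]
              )
              return response.choices[0].message.content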

          @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
          def get_deepseek_response(self, prompt: str) -> str:
              deepseek_api_key = self.deepseek_api_key
      
              headers = {
                  "Authorization": f"Bearer {deepseek_api_key}",
                  "Content-Type": "application/json"
                  }
              
              payload = {
                  "model": deepseek_model,  
                  "messages": [{"role": "user", "content": prompt}],
                  "max_tokens": maxToken
                  }
              
              response = requests.post(DEEPSEEK_API_URL, headers=headers, json=payload)
      
              if response.status_code == 200:
                  res = response.json()["choices"][0]["message"]["content"]
              else:
                  res = "API request failed with status code " + str(response.status_code) + ":" + str(response.text)
      
              return res
      1. Retry Mechanism:
        • The @retry line ensures the function will try again if it fails.
        • It will stop retrying after 3 attempts (stop_after_attempt(3)).
        • It waits between retries, starting at 4 seconds and increasing up to 10 seconds (wait_exponential(multiplier=1, min=4, max=10)).

      2. What the Function Does:
        • The function takes one input, prompt, which is the message or question you want to send to the AI.
        • It returns the AI’s response or an error message.

      3. Preparing to Communicate with the API:
        • API Key: It gets the API key for the DeepSeek service from self.deepseek_api_key.
        • Headers: These tell the API that the request will use the API key (for security) and that the data format is JSON (structured text).
        • Payload: This is the information sent to the AI. It includes:
          • Model: Specifies which version of the AI to use (deepseek_model).
          • Messages: The input message with the role “user” and your prompt.
          • Max Tokens: Defines the maximum size of the AI’s response (maxToken).

      4. Sending the Request:
        • It uses the requests.post() method to send the payload and headers to the DeepSeek API using the URL DEEPSEEK_API_URL.

      5. Processing the Response:
        • If the API responds successfully (status_code == 200):
          • It extracts the AI’s reply from the response data.
          • Specifically, it gets the first choice’s message content: response.json()["choices"][0]["message"]["content"].
        • If there’s an error:
          • It constructs an error message with the status code and detailed error text from the API.

      6. Returning the Result:
        • The function outputs either the AI’s response or the error message.
          @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
          def get_bharatgpt_response(self, prompt: str) -> str:
              try:
                  messages = [[{"role": "user", "content": prompt}]]
                  
                  response = pipe(messages, max_new_tokens=maxToken,)
      
                  # Extract 'content' field safely
                  res = next((entry.get("content", "")
                              for entry in response[0][0].get("generated_text", [])
                              if isinstance(entry, dict) and entry.get("role") == "assistant"
                              ),
                              None,
                              )
                  
                  return res
              except Exception as e:
                  x = str(e)
                  print('Error: ', x)
      
                  return ""
      1. Retry Mechanism:
        • The @retry ensures the function will try again if it fails.
        • It will stop retrying after 3 attempts (stop_after_attempt(3)).
        • The waiting time between retries starts at 4 seconds and increases exponentially up to 10 seconds (wait_exponential(multiplier=1, min=4, max=10)).
      2. What the Function Does:
        • The function takes one input, prompt, which is the message or question you want to send to BharatGPT.
        • It returns the AI’s response or an empty string if something goes wrong.
      3. Sending the Prompt:
        • Messages Structure: The function wraps the user’s prompt in a format that the BharatGPT AI understands: messages = [[{"role": "user", "content": prompt}]]
        • This tells the AI that the prompt is coming from the “user.”
      4. Pipe Function:
        • It uses a pipe() method to send the messages to the AI system.
        • max_new_tokens=maxToken: Limits how long the AI’s response can be.
      5. Extracting the Response:
        • The response from the AI is in a structured format. The code looks for the first piece of text where:
          • The role is “assistant” (meaning it’s the AI’s reply).
          • The text is in the “content” field.
        • The next() function safely extracts this “content” field or returns None if it can’t find it.
      6. Error Handling:
        • If something goes wrong (e.g., the AI doesn’t respond or there’s a technical issue), the code:
          • Captures the error message in e.
          • Prints the error message: print('Error: ', x).
          • Returns an empty string ("") instead of crashing.
      7. Returning the Result:
        • If everything works, the function gives you the AI’s response as plain text.
        • If there’s an error, it gives you an empty string, indicating no response was received.
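
      The pipe() used above is a Hugging Face transformers text-generation pipeline. A plausible setup might look like this (a sketch; the model identifier is a placeholder, not confirmed by the post):

          # Hypothetical pipeline setup; replace the model id with the actual BharatGPT checkpoint
          from transformers import pipeline
          pipe = pipeline("text-generation", model="CoRover/BharatGPT-3B-Indic")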

          def get_model_response(self, model_name: str, prompt: str) -> ModelResponse:
              """Get response from specified model with metrics"""
              start_time = time.time()
              start_memory = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
      
              try:
                  if model_name == "claude-3":
                      response_content = self.get_claude_response(prompt)
                  elif model_name == "gpt4":
                      response_content = self.get_gpt4_response(prompt)
                  elif model_name == "deepseek-chat":
                      response_content = self.get_deepseek_response(prompt)
                  elif model_name == "bharat-gpt":
                      response_content = self.get_bharatgpt_response(prompt)
      
                  # Model-specific API calls 
                  token_count = len(self.bert_tokenizer.encode(response_content))
                  
                  end_memory = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
                  memory_usage = end_memory - start_memory
                  
                  return ModelResponse(
                      content=response_content,
                      response_time=time.time() - start_time,
                      token_count=token_count,
                      memory_usage=memory_usage
                  )
              except Exception as e:
                  logging.error(f"Error getting response from {model_name}: {str(e)}")
                  return ModelResponse(
                      content="",
                      response_time=0,
                      token_count=0,
                      memory_usage=0,
                      error=str(e)
                  )

        Start Tracking Time and Memory:

        • The function starts a timer (start_time) to measure how long it takes to get a response.
        • It also checks how much memory is being used at the beginning (start_memory).

        Choose the AI Model:

        • Based on the model_name provided, the function selects the appropriate method to get a response:
          • "claude-3" → Calls get_claude_response(prompt).
          • "gpt4" → Calls get_gpt4_response(prompt).
          • "deepseek-chat" → Calls get_deepseek_response(prompt).
          • "bharat-gpt" → Calls get_bharatgpt_response(prompt).

        Process the Response:

        • Once the response is received, the function calculates:
          • Token Count: The number of tokens (small chunks of text) in the response using a tokenizer.
          • Memory Usage: The difference between memory usage after the response (end_memory) and before it (start_memory).

        Return the Results:

        • The function bundles all the information into a ModelResponse object:
          • The AI’s reply (content).
          • How long the response took (response_time).
          • The number of tokens in the reply (token_count).
          • How much memory was used (memory_usage).

        Handle Errors:

        • If something goes wrong (e.g., the AI doesn’t respond), the function:
          • Logs the error message.
          • Returns an empty response with default values and the error message.
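
        ModelResponse itself is not shown in the excerpt; a plausible shape, inferred from the fields used in get_model_response above, is:

          # Inferred from usage above; not taken verbatim from the post
          from dataclasses import dataclass
          from typing import Optional

          @dataclass
          class ModelResponse:
              content: str
              response_time: float
              token_count: int
              memory_usage: float
              error: Optional[str] = None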
            def evaluate_text_quality(self, generated: str, reference: str) -> Dict[str, float]:
                """Evaluate text quality metrics"""
                # BERTScore
                gen_embedding = self.sentence_model.encode([generated])
                ref_embedding = self.sentence_model.encode([reference])
                bert_score = cosine_similarity(gen_embedding, ref_embedding)[0][0]
        
                # BLEU Score
                generated_tokens = word_tokenize(generated.lower())
                reference_tokens = word_tokenize(reference.lower())
                bleu = sentence_bleu([reference_tokens], generated_tokens)
        
                # METEOR Score
                meteor = meteor_score([reference_tokens], generated_tokens)
        
                return {
                    'bert_score': bert_score,
                    'bleu_score': bleu,
                    'meteor_score': meteor
                }

        Inputs:

        • generated: The text produced by the AI.
        • reference: The correct or expected version of the text.

        Calculating BERTScore:

        • Converts the generated and reference texts into numerical embeddings (mathematical representations) using a pre-trained model (self.sentence_model.encode).
        • Measures the similarity between the two embeddings using cosine similarity. This gives the bert_score, which ranges from -1 (completely different) to 1 (very similar).

        Calculating BLEU Score:

        • Breaks the generated and reference texts into individual words (tokens) using word_tokenize.
        • Converts both texts to lowercase for consistent comparison.
        • Calculates the BLEU Score (sentence_bleu), which checks how many words or phrases in the generated text overlap with the reference. BLEU values range from 0 (no match) to 1 (perfect match).

        Calculating METEOR Score:

        • Also uses the tokenized versions of generated and reference texts.
        • Calculates the METEOR Score (meteor_score), which considers exact matches, synonyms, and word order. Scores range from 0 (no match) to 1 (perfect match).

        Returning the Results:

        • Combines the three scores into a dictionary with the keys 'bert_score', 'bleu_score', and 'meteor_score'.
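
        A quick, illustrative call (evaluator is an assumed instance of the evaluation class; the printed numbers are made up, and the NLTK ‘punkt’ and ‘wordnet’ resources must be downloaded):

            scores = evaluator.evaluate_text_quality(
                generated="The cat sat on the mat.",
                reference="A cat was sitting on the mat."
            )
            print(scores)  # e.g., {'bert_score': 0.87, 'bleu_score': 0.11, 'meteor_score': 0.52}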

        Similarly, other functions are developed.

            def run_comprehensive_evaluation(self, evaluation_data: List[Dict]) -> pd.DataFrame:
                """Run comprehensive evaluation on all metrics"""
                results = []
                
                for item in evaluation_data:
                    prompt = item['prompt']
                    reference = item['reference']
                    task_criteria = item.get('task_criteria', {})
                    
                    for model_name in self.model_configs.keys():
                        # Get multiple responses to evaluate reliability
                        responses = [
                            self.get_model_response(model_name, prompt)
                            for _ in range(3)  # Get 3 responses for reliability testing
                        ]
                        
                        # Use the best response for other evaluations
                        best_response = max(responses, key=lambda x: len(x.content) if not x.error else 0)
                        
                        if best_response.error:
                            logging.error(f"Error in model {model_name}: {best_response.error}")
                            continue
                        
                        # Gather all metrics
                        metrics = {
                            'model': model_name,
                            'prompt': prompt,
                            'response': best_response.content,
                            **self.evaluate_text_quality(best_response.content, reference),
                            **self.evaluate_factual_accuracy(best_response.content, reference),
                            **self.evaluate_task_performance(best_response.content, task_criteria),
                            **self.evaluate_technical_performance(best_response),
                            **self.evaluate_reliability(responses),
                            **self.evaluate_safety(best_response.content)
                        }
                        
                        # Add business impact metrics using task performance
                        metrics.update(self.evaluate_business_impact(
                            best_response,
                            metrics['task_completion']
                        ))
                        
                        results.append(metrics)
                
                return pd.DataFrame(results)
        • Input:
          • evaluation_data: A list of test cases, where each case is a dictionary containing:
            • prompt: The question or input to the AI model.
            • reference: The ideal or expected answer.
            • task_criteria (optional): Additional rules or requirements for the task.
        • Initialize Results:
          • An empty list results is created to store the evaluation metrics for each model and test case.
        • Iterate Through Test Cases:
          • For each item in the evaluation_data:
            • Extract the prompt, reference, and task_criteria.
        • Evaluate Each Model:
          • Loop through all available AI models (self.model_configs.keys()).
          • Generate three responses for each model to test reliability.
        • Select the Best Response:
          • Out of the three responses, pick the one with the most content (best_response), ignoring responses with errors.
        • Handle Errors:
          • If a response has an error, log the issue and skip further evaluation for that model.
        • Evaluate Metrics:
          • Using the best_response, calculate a variety of metrics, including:
            • Text Quality: How similar the response is to the reference.
            • Factual Accuracy: Whether the response is factually correct.
            • Task Performance: How well it meets task-specific criteria.
            • Technical Performance: Response time, memory usage, or other system-related metrics.
            • Reliability: Check consistency across multiple responses.
            • Safety: Ensure the response is safe and appropriate.
        • Evaluate Business Impact:
          • Add metrics for business impact (e.g., how well the task was completed, using task_completion as a key factor).
        • Store Results:
          • Add the calculated metrics for this model and prompt to the results list.
        • Return Results as a DataFrame:
          • Convert the results list into a structured table (a pandas DataFrame) for easy analysis and visualization.
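
        As a quick illustration, here is a hedged usage sketch of this method; the evaluator class name and sample data below are assumptions for demonstration –

        # Hypothetical evaluator instance exposing run_comprehensive_evaluation
        evaluator = clsModelEvaluator()

        evaluation_data = [
            {
                'prompt': 'Name the four largest moons of Jupiter.',
                'reference': 'The four largest moons of Jupiter are Io, Europa, Ganymede, and Callisto.',
                'task_criteria': {'max_sentences': 2}
            }
        ]

        df = evaluator.run_comprehensive_evaluation(evaluation_data)
        print(df[['model', 'bert_score', 'bleu_score', 'meteor_score']].head())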

        Great! So, now, we’ve explained the code.

        Let us understand the final outcome of this run & what we can conclude from that.

        1. BERT Score (Semantic Understanding):
          • GPT4 leads slightly at 0.8322 (83.22%)
          • Bharat-GPT close second at 0.8118 (81.18%)
          • Claude-3 at 0.8019 (80.19%)
          • DeepSeek-Chat at 0.7819 (78.19%)
          Think of this like a “comprehension score” – how well the models understand the context. All models show strong understanding, with only a 5% difference between best and worst.
        2. BLEU Score (Word-for-Word Accuracy):
          • Bharat-GPT leads at 0.0567 (5.67%)
          • Claude-3 at 0.0344 (3.44%)
          • GPT4 at 0.0306 (3.06%)
          • DeepSeek-Chat lowest at 0.0189 (1.89%)
          These low scores suggest models use different wording than references, which isn’t necessarily bad.
        3. METEOR Score (Meaning Preservation):
          • Bharat-GPT leads at 0.4684 (46.84%)
          • Claude-3 close second at 0.4507 (45.07%)
          • GPT4 at 0.2960 (29.60%)
          • DeepSeek-Chat at 0.2652 (26.52%)
          This shows how well models maintain meaning while using different words.
        4. Response Time (Speed):
          • Claude-3 fastest: 4.40 seconds
          • Bharat-GPT: 6.35 seconds
          • GPT4: 6.43 seconds
          • DeepSeek-Chat slowest: 8.52 seconds
        5. Safety and Reliability:
          • Error Rate: Perfect 0.0 for all models
          • Toxicity: All very safe (below 0.15%) 
            • Claude-3 safest at 0.0007
            • GPT4 at 0.0008
            • Bharat-GPT at 0.0012
            • DeepSeek-Chat at 0.0014
        6. Cost Efficiency:
          • Claude-3 most economical: $0.0019 per response
          • Bharat-GPT close: $0.0021
          • GPT4: $0.0038
          • DeepSeek-Chat highest: $0.0050

        Key Takeaways by Model:

        1. Claude-3: ✓ Fastest responses ✓ Most cost-effective ✓ Excellent meaning preservation ✓ Lowest toxicity
        2. Bharat-GPT: ✓ Best BLEU and METEOR scores ✓ Strong semantic understanding ✓ Cost-effective ✗ Moderate response time
        3. GPT4: ✓ Best semantic understanding ✓ Good safety metrics ✗ Higher cost ✗ Moderate response time
        4. DeepSeek-Chat: ✗ Generally lower performance ✗ Slowest responses ✗ Highest cost ✗ Slightly higher toxicity

        Reliability of These Statistics:

        Strong Points:

        • Comprehensive metric coverage
        • Consistent patterns across evaluations
        • Zero error rates show reliability
        • Clear differentiation between models

        Limitations:

        • BLEU scores are quite low across all models
        • Doesn’t measure creative or innovative responses
        • May not reflect specific use case performance
        • Single snapshot rather than long-term performance

        Final Observation:

        1. Best Overall Value: Claude-3
          • Fast, cost-effective, safe, good performance
        2. Best for Accuracy: Bharat-GPT
          • Highest meaning preservation and precision
        3. Best for Understanding: GPT4
          • Strongest semantic comprehension
        4. Consider Your Priorities: 
          • Speed → Choose Claude-3
          • Cost → Choose Claude-3 or Bharat-GPT
          • Accuracy → Choose Bharat-GPT
          • Understanding → Choose GPT4

        These statistics provide reliable comparative data but should be part of a broader decision-making process that includes your specific needs, budget, and use cases.


        For the Bharat GPT model, we’ve tested this locally on my MacBook Pro (M4 Max), with the following configuration –

        I’ve also tried the API version, and it delivered performance similar to the stats we received by running the model locally. Unfortunately, they haven’t made the API version public yet.

        So, apart from Anthropic & OpenAI, I’ll watch this new LLM (Bharat GPT) for overall stats in the coming days.


        So, we’ve done it.

        You can find the detailed code at the GitHub link.

        I’ll bring some more exciting topics in the coming days from the Python verse.

        Till then, Happy Avenging! 🙂

        Building a real-time Gen AI Improvement Matrices (GAIIM) using Python, UpTrain, Open AI & React

        How does RAG work better for various enterprise-level Gen AI use cases? What needs to be in place to make the LLM work more efficiently & to check and validate its responses, covering bias, hallucination & many more issues?

        This is my post (after a slight gap), which will capture and discuss some of the burning issues that many AI architects are trying to explore. In this post, I’ve considered a newly formed AI start-up from India, which developed an open-source framework that can easily evaluate all the challenges one faces with LLMs & easily integrate with your existing models for a better understanding, including their limitations. You will get plenty of insights about it.

        But, before we dig deep, why not see the demo first –

        Isn’t it exciting? Let’s deep dive into the flow of events.


        Let’s explore the broad-level architecture/flow –

        Let us understand the steps of the above architecture. First, our Python application needs to trigger and enable the API, which will interact with OpenAI and UpTrain AI to fetch all the LLM KPIs based on the input from the React app named “Evaluation.”

        Once the response is received from UpTrain AI, the Python application organizes the results in a more readable manner, without changing the core details coming out of their APIs, & then shares that back with the React interface.

        Let’s examine the React app’s sample inputs to better understand what will be passed to the Python-based API solution, which acts as a wrapper that calls multiple UpTrain APIs, accumulates them into one response by parsing & reorganizing the data with the help of OpenAI, & shares that back.

        Highlighted in RED are some of the critical inputs you need to provide to get most of the KPIs. And, here are the sample text inputs for your reference –

        Q. Enter input question.
        A. What are the four largest moons of Jupiter?
        Q. Enter the context document.
        A. Jupiter, the largest planet in our solar system, boasts a fascinating array of moons. Among these, the four largest are collectively known as the Galilean moons, named after the renowned astronomer Galileo Galilei, who first observed them in 1610. These four moons, Io, Europa, Ganymede, and Callisto, hold significant scientific interest due to their unique characteristics and diverse geological features.
        Q. Enter LLM response.
        A. The four largest moons of Jupiter, known as the Galilean moons, are Io, Europa, Ganymede, and Marshmello.
        Q. Enter the persona response.
        A. strict and methodical teacher
        Q. Enter the guideline.
        A. Response shouldn’t contain any specific numbers
        Q. Enter the ground truth.
        A. The Jupiter is the largest & gaseous planet in the solar system.
        Q. Choose the evaluation method.
        A. llm

        Once you fill in, the app should look like the below screenshot –


        Let us understand the sample packages that are required for this task.

        pip install Flask==3.0.3
        pip install Flask-Cors==4.0.0
        pip install numpy==1.26.4
        pip install openai==1.17.0
        pip install pandas==2.2.2
        pip install uptrain==0.6.13

        Note that we’re not going to discuss the entire script here, only the relevant parts. However, you can get the complete scripts in the GitHub repository.

        def askFeluda(context, question):
            try:
                # Combine the context and the question into a single prompt.
                prompt_text = f"{context}\n\n Question: {question}\n Answer:"
        
                # Retrieve conversation history from the session or database
                conversation_history = []
        
                # Add the new message to the conversation history
                conversation_history.append(prompt_text)
        
                # Call OpenAI API with the updated conversation
                response = client.with_options(max_retries=0).chat.completions.create(
                    messages=[
                        {
                            "role": "user",
                            "content": prompt_text,
                        }
                    ],
                    model=cf.conf['MODEL_NAME'],
                    max_tokens=150,  # You can adjust this based on how long you expect the response to be
                    temperature=0.3,  # Adjust for creativity. Lower values make responses more focused and deterministic
                    top_p=1,
                    frequency_penalty=0,
                    presence_penalty=0
                )
        
                # Extract the content from the first choice's message
                chat_response = response.choices[0].message.content
        
                # Print the generated response text
                return chat_response.strip()
            except Exception as e:
                return f"An error occurred: {str(e)}"

        This function asks the supplied question against the given context, or it supplies the UpTrain results so the JSON can be summarized into more easily readable plain text. For our test, we’ve used “gpt-3.5-turbo”.
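
        As a quick, hedged usage sketch (reusing the sample inputs listed earlier) –

        context = ("Jupiter, the largest planet in our solar system, boasts a fascinating "
                   "array of moons. Among these, the four largest are collectively known "
                   "as the Galilean moons, named after the renowned astronomer Galileo Galilei.")
        question = "What are the four largest moons of Jupiter?"

        # Returns the model's answer, or an error string on failure
        print(askFeluda(context, question))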

        def evalContextRelevance(question, context, resFeluda, personaResponse):
            try:
                data = [{
                    'question': question,
                    'context': context,
                    'response': resFeluda
                }]
        
                results = eval_llm.evaluate(
                    data=data,
                    checks=[Evals.CONTEXT_RELEVANCE, Evals.FACTUAL_ACCURACY, Evals.RESPONSE_COMPLETENESS, Evals.RESPONSE_RELEVANCE, CritiqueTone(llm_persona=personaResponse), Evals.CRITIQUE_LANGUAGE, Evals.VALID_RESPONSE, Evals.RESPONSE_CONCISENESS]
                )
        
                return results
            except Exception as e:
                x = str(e)
        
                return x

        The above method initiates the evaluation from UpTrain to get all the stats, which will be helpful for assessing your LLM response. In this post, we’ve captured the following KPIs –

        - Context Relevance Explanation
        - Factual Accuracy Explanation
        - Guideline Adherence Explanation
        - Response Completeness Explanation
        - Response Fluency Explanation
        - Response Relevance Explanation
        - Response Tonality Explanation

        # Function to extract and print all the keys and their values
        def extractPrintedData(data):
            # Initialize defaults so missing keys in the UpTrain response
            # don't raise a NameError when building the results dictionary
            s_1_key_val = s_2_key_val = s_3_key_val = s_4_key_val = None
            s_5_key_val = s_6_key_val = s_7_key_val = s_8_key_val = None
            s_1_val = s_2_val = s_3_val = s_4_val = s_5_val = s_6_val = s_8_val = None

            for entry in data:
                print("Parsed Data:")
                for key, value in entry.items():
                    if key == 'score_context_relevance':
                        s_1_key_val = value
                    elif key == 'explanation_context_relevance':
                        cleaned_value = preprocessParseData(value)
                        print(f"{key}: {cleaned_value}\n")
                        s_1_val = cleaned_value
                    elif key == 'score_factual_accuracy':
                        s_2_key_val = value
                    elif key == 'explanation_factual_accuracy':
                        cleaned_value = preprocessParseData(value)
                        print(f"{key}: {cleaned_value}\n")
                        s_2_val = cleaned_value
                    elif key == 'score_response_completeness':
                        s_3_key_val = value
                    elif key == 'explanation_response_completeness':
                        cleaned_value = preprocessParseData(value)
                        print(f"{key}: {cleaned_value}\n")
                        s_3_val = cleaned_value
                    elif key == 'score_response_relevance':
                        s_4_key_val = value
                    elif key == 'explanation_response_relevance':
                        cleaned_value = preprocessParseData(value)
                        print(f"{key}: {cleaned_value}\n")
                        s_4_val = cleaned_value
                    elif key == 'score_critique_tone':
                        s_5_key_val = value
                    elif key == 'explanation_critique_tone':
                        cleaned_value = preprocessParseData(value)
                        print(f"{key}: {cleaned_value}\n")
                        s_5_val = cleaned_value
                    elif key == 'score_fluency':
                        s_6_key_val = value
                    elif key == 'explanation_fluency':
                        cleaned_value = preprocessParseData(value)
                        print(f"{key}: {cleaned_value}\n")
                        s_6_val = cleaned_value
                    elif key == 'score_valid_response':
                        s_7_key_val = value
                    elif key == 'score_response_conciseness':
                        s_8_key_val = value
                    elif key == 'explanation_response_conciseness':
                        print('Raw Value: ', value)
                        cleaned_value = preprocessParseData(value)
                        print(f"{key}: {cleaned_value}\n")
                        s_8_val = cleaned_value
        
            print('$'*200)
        
            results = {
                "Factual_Accuracy_Score": s_2_key_val,
                "Factual_Accuracy_Explanation": s_2_val,
                "Context_Relevance_Score": s_1_key_val,
                "Context_Relevance_Explanation": s_1_val,
                "Response_Completeness_Score": s_3_key_val,
                "Response_Completeness_Explanation": s_3_val,
                "Response_Relevance_Score": s_4_key_val,
                "Response_Relevance_Explanation": s_4_val,
                "Response_Fluency_Score": s_6_key_val,
                "Response_Fluency_Explanation": s_6_val,
                "Response_Tonality_Score": s_5_key_val,
                "Response_Tonality_Explanation": s_5_val,
                "Guideline_Adherence_Score": s_8_key_val,
                "Guideline_Adherence_Explanation": s_8_val,
                "Response_Match_Score": s_7_key_val
                # Add other evaluations similarly
            }
        
            return results

        The above method parses the initial data from UpTrain before sending it to OpenAI for a better summary, without changing any text returned by it.

        @app.route('/evaluate', methods=['POST'])
        def evaluate():
            data = request.json
        
            if not data:
                return jsonify({'error': 'No data provided'}), 400
        
            # Extracting input data for processing (just an example of logging received data)
            question = data.get('question', '')
            context = data.get('context', '')
            llmResponse = ''
            personaResponse = data.get('personaResponse', '')
            guideline = data.get('guideline', '')
            groundTruth = data.get('groundTruth', '')
            evaluationMethod = data.get('evaluationMethod', '')
        
            print('question:')
            print(question)
        
            llmResponse = askFeluda(context, question)
            print('='*200)
            print('Response from Feluda::')
            print(llmResponse)
            print('='*200)
        
            # Getting Context LLM
            cLLM = evalContextRelevance(question, context, llmResponse, personaResponse)
        
            print('&'*200)
            print('cLLM:')
            print(cLLM)
            print(type(cLLM))
            print('&'*200)
        
            results = extractPrintedData(cLLM)
        
            print('JSON::')
            print(results)
        
            resJson = jsonify(results)
        
            return resJson

        The above function is the main method: it first receives all the input parameters from the React app, then invokes the functions one by one to get the LLM response and the LLM performance stats, & finally summarizes them before sending the result back to the React app.
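
        For completeness, here is a hedged client-side sketch for invoking this endpoint, assuming the Flask app runs locally on port 5000 –

        import requests

        payload = {
            "question": "What are the four largest moons of Jupiter?",
            "context": "Jupiter, the largest planet in our solar system, boasts a fascinating array of moons...",
            "personaResponse": "strict and methodical teacher",
            "guideline": "Response shouldn't contain any specific numbers",
            "groundTruth": "The Jupiter is the largest & gaseous planet in the solar system.",
            "evaluationMethod": "llm"
        }

        resp = requests.post("http://localhost:5000/evaluate", json=payload)
        print(resp.json().get("Factual_Accuracy_Score"))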

        For any other scripts, please refer to the above-mentioned GitHub link.


        Let us see some of the screenshots of the test run –


        So, we’ve done it.

        I’ll bring some more exciting topics in the coming days from the Python verse.

        Till then, Happy Avenging! 🙂

        Enabling & Exploring Stable Diffusion – Part 1

        This new solution will evaluate the power of Stable Diffusion, creating solutions as we progress & refine our prompts from scratch using Stable Diffusion & Python. This post opens new opportunities for IT companies & business start-ups looking to deliver solutions with better performance compared to the paid version of Stability AI’s API. This project is for advanced Python developers, data-science newbies exploring Stable Diffusion, & AI evangelists.

        In a series of posts, I’ll explain and focus on the Stable Diffusion API and a custom solution using the Python-based SDK of Stable Diffusion.

        But, before that, let us view the video that it generates from the prompt by using the third-party API:

        Prompt to Video

        And, let us understand the prompt that we supplied to create the above video –

        Isn’t it exciting?

        However, I want to stress this point: the video generated by the Stable Diffusion (Stability AI) API was only able to partially apply the animation effect. The animation applies to the clouds, but not to the waves. Still, I must admit, the quality of the video is quite good.


        Let us understand the code and how we run the solution, and then we can try to understand its performance along with the other solutions later in the subsequent series.

        As you know, we’re exploring the code base of the third-party API, which will actually execute a series of API calls that create a video out of the prompt.

        Let us understand some of the important snippets –

        class clsStabilityAIAPI:
            def __init__(self, STABLE_DIFF_API_KEY, OUT_DIR_PATH, FILE_NM, VID_FILE_NM):
                self.STABLE_DIFF_API_KEY = STABLE_DIFF_API_KEY
                self.OUT_DIR_PATH = OUT_DIR_PATH
                self.FILE_NM = FILE_NM
                self.VID_FILE_NM = VID_FILE_NM
        
            def delFile(self, fileName):
                try:
                    # Deleting the intermediate image
                    os.remove(fileName)
        
                    return 0 
                except Exception as e:
                    x = str(e)
                    print('Error: ', x)
        
                    return 1
        
            def generateText2Image(self, inputDescription):
                try:
                    STABLE_DIFF_API_KEY = self.STABLE_DIFF_API_KEY
                    fullFileName = self.OUT_DIR_PATH + self.FILE_NM
                    
                    if STABLE_DIFF_API_KEY is None:
                        raise Exception("Missing Stability API key.")
                    
                    response = requests.post(f"{api_host}/v1/generation/{engine_id}/text-to-image",
                                            headers={
                                                "Content-Type": "application/json",
                                                "Accept": "application/json",
                                                "Authorization": f"Bearer {STABLE_DIFF_API_KEY}"
                                                },
                                                json={
                                                    "text_prompts": [{"text": inputDescription}],
                                                    "cfg_scale": 7,
                                                    "height": 1024,
                                                    "width": 576,
                                                    "samples": 1,
                                                    "steps": 30,
                                                    },)
                    
                    if response.status_code != 200:
                        raise Exception("Non-200 response: " + str(response.text))
                    
                    data = response.json()
        
                    for i, image in enumerate(data["artifacts"]):
                        with open(fullFileName, "wb") as f:
                            f.write(base64.b64decode(image["base64"]))      
                    
                    return fullFileName
        
                except Exception as e:
                    x = str(e)
                    print('Error: ', x)
        
                    return 'N/A'
        
            def image2VideoPassOne(self, imgNameWithPath):
                try:
                    STABLE_DIFF_API_KEY = self.STABLE_DIFF_API_KEY
        
                    response = requests.post(f"https://api.stability.ai/v2beta/image-to-video",
                                            headers={"authorization": f"Bearer {STABLE_DIFF_API_KEY}"},
                                            files={"image": open(imgNameWithPath, "rb")},
                                            data={"seed": 0,"cfg_scale": 1.8,"motion_bucket_id": 127},
                                            )
                    
                    print('First Pass Response:')
                    print(str(response.text))
                    
                    genID = response.json().get('id')
        
                    return genID 
                except Exception as e:
                    x = str(e)
                    print('Error: ', x)
        
                    return 'N/A'
        
            def image2VideoPassTwo(self, genId):
                try:
                    generation_id = genId
                    STABLE_DIFF_API_KEY = self.STABLE_DIFF_API_KEY
                    fullVideoFileName = self.OUT_DIR_PATH + self.VID_FILE_NM
        
                    response = requests.request("GET", f"https://api.stability.ai/v2beta/image-to-video/result/{generation_id}",
                                                headers={
                                                    'accept': "video/*",  # Use 'application/json' to receive base64 encoded JSON
                                                    'authorization': f"Bearer {STABLE_DIFF_API_KEY}"
                                                    },) 
                    
                    print('Retrieve Status Code: ', str(response.status_code))
                    
                    if response.status_code == 202:
                        print("Generation in-progress, try again in 10 seconds.")
        
                        return 5
                    elif response.status_code == 200:
                        print("Generation complete!")
                        with open(fullVideoFileName, 'wb') as file:
                            file.write(response.content)
        
                        print("Successfully Retrieved the video file!")
        
                        return 0
                    else:
                        raise Exception(str(response.json()))
                    
                except Exception as e:
                    x = str(e)
                    print('Error: ', x)
        
                    return 1

        Now, let us understand the code –

        The __init__ method is called when an object of the class is created. It initializes four properties:

        • STABLE_DIFF_API_KEY: the API key for Stability AI services.
        • OUT_DIR_PATH: the folder path to save files.
        • FILE_NM: the name of the generated image file.
        • VID_FILE_NM: the name of the generated video file.

        The delFile method deletes a file specified by fileName.

        • If successful, it returns 0.
        • If an error occurs, it logs the error and returns 1.

        The generateText2Image method generates an image based on a text description:

        • Sends a request to the Stability AI text-to-image endpoint using the API key.
        • Saves the resulting image to a file.
        • Returns the file’s path on success or 'N/A' if an error occurs.

        The image2VideoPassOne method uploads an image to create a video in its first phase:

        • Sends the image to Stability AI’s image-to-video endpoint.
        • Logs the response and extracts the id (generation ID) for the next phase.
        • Returns the id if successful or 'N/A' on failure.

        The image2VideoPassTwo method retrieves the video created in the second phase using the genId:

        • Checks the video generation status from the Stability AI endpoint.
        • If complete, saves the video file and returns 0.
        • If still processing, returns 5.
        • Logs and returns 1 for any errors.
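
        To see how these pieces fit together, here is a minimal, hedged usage sketch; the API key, paths, and prompt are placeholders you would supply from your own configuration –

        # Placeholder credentials and paths (replace with your own)
        r1 = clsStabilityAIAPI(
            STABLE_DIFF_API_KEY='YOUR_STABILITY_API_KEY',
            OUT_DIR_PATH='./output/',
            FILE_NM='scene.png',
            VID_FILE_NM='scene.mp4',
        )

        imgPath = r1.generateText2Image('A calm beach with moving clouds at sunset')
        if imgPath != 'N/A':
            gID = r1.image2VideoPassOne(imgPath)   # submit the image-to-video job
            status = r1.image2VideoPassTwo(gID)    # 0 = done, 5 = still processing, 1 = error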

        As you can see, the code is pretty simple to understand, & we’ve taken all the necessary actions in case of any unforeseen network issues, or even if the video is not ready after our job submission, in the following lines of the main calling script (generateText2VideoAPI.py) –

        waitTime = 10
        time.sleep(waitTime)
        
        # Failed case retry
        retries = 1
        success = False
        z = -1  # default status before the first attempt
        
        try:
            while not success:
                try:
                    z = r1.image2VideoPassTwo(gID)
                except Exception as e:
                    z = -1
        
                if z == 0:
                    success = True
                else:
                    wait = retries * 2 * 15
                    str_R1 = "retries Fail! Waiting " + str(wait) + " seconds and retrying!"
        
                    print(str_R1)
        
                    time.sleep(wait)
                    retries += 1
        
                # Checking maximum retries
                if retries >= maxRetryNo:
                    raise Exception("Maximum retries exceeded!")
        except Exception as e:
            print('Error: ', str(e))

        And let us see what the run looks like –

        Let us understand the CPU utilization –

        As you can see, CPU utilization is minimal since most tasks are at the API end.


        So, we’ve done it. 🙂

        Please find the next series on this topic below:

        Enabling & Exploring Stable Diffusion – Part 2

        Enabling & Exploring Stable Diffusion – Part 3

        Please let me know your feedback after reviewing all the posts! 🙂

        Building solutions using LLM AutoGen in Python – Part 1

        Today, I’ll be publishing a series of posts on LLM agents and how they can help you improve your delivery capabilities for various tasks.

        Also, we’re providing the demo here –

        Isn’t it exciting?


        The application will interact with the AutoGen agents, using the underlying OpenAI APIs to follow the instructions, generate the steps, and then follow that path to generate the desired code. Finally, it will execute the generated scripts if the first outcome of the demo satisfies the user.


        Let us understand some of the key snippets –

        # Create the assistant agent
        assistant = autogen.AssistantAgent(
            name="AI_Assistant",
            llm_config={
                "config_list": config_list,
            }
        )

        Purpose: This line creates an AI assistant agent named “AI_Assistant”.

        Function: It uses a language model configuration provided in config_list to define how the assistant behaves.

        Role: The assistant serves as the primary agent who will coordinate with other agents to solve problems.

        user_proxy = autogen.UserProxyAgent(
            name="Admin",
            system_message=templateVal_1,
            human_input_mode="TERMINATE",
            max_consecutive_auto_reply=10,
            is_termination_msg=lambda x: x.get("content", "").rstrip().endswith("TERMINATE"),
            code_execution_config={
                "work_dir": WORK_DIR,
                "use_docker": False,
            },
        )

        Purpose: This code creates a user proxy agent named “Admin”.

        Function:

        • System Message: Uses templateVal_1 as its initial message to set the context.
        • Human Input Mode: Set to "TERMINATE", meaning it will keep interacting until a termination condition is met.
        • Auto-Reply Limit: Can automatically reply up to 10 times without human intervention.
        • Termination Condition: A message is considered a termination message if it ends with the word “TERMINATE”.
        • Code Execution: Configured to execute code in the directory specified by WORK_DIR without using Docker.

        Role: Acts as an intermediary between the user and the assistant, handling interactions and managing the conversation flow.

        engineer = autogen.AssistantAgent(
            name="Engineer",
            llm_config={
                "config_list": config_list,
            },
            system_message=templateVal_2,
        )

        Purpose: Creates an assistant agent named “Engineer”.

        Function: Uses templateVal_2 as its system message to define its expertise in engineering matters.

        Role: Specializes in technical and engineering aspects of the problem.

        game_designer = autogen.AssistantAgent(
            name="GameDesigner",
            llm_config={
                "config_list": config_list,
            },
            system_message=templateVal_3,
        )

        Purpose: Creates an assistant agent named “GameDesigner”.

        Function: Uses templateVal_3 to set its focus on game design.

        Role: Provides insights and solutions related to game design aspects.

        planner = autogen.AssistantAgent(
            name="Planer",
            llm_config={
                "config_list": config_list,
            },
            system_message=templateVal_4,
        )

        Purpose: Creates an assistant agent named “Planer” (likely intended to be “Planner”).

        Function: Uses templateVal_4 to define its role in planning.

        Role: Responsible for organizing and planning tasks to solve the problem.

        critic = autogen.AssistantAgent(
            name="Critic",
            llm_config={
                "config_list": config_list,
            },
            system_message=templateVal_5,
        )

        Purpose: Creates an assistant agent named “Critic”.

        Function: Uses templateVal_5 to set its function as a critic.

        Role: Provides feedback, critiques solutions, and helps improve the overall response.

        logging.basicConfig(level=logging.ERROR)
        logger = logging.getLogger(__name__)

        Purpose: Configures the logging system.

        Function: Sets the logging level to only capture error messages to avoid cluttering the output.

        Role: Helps in debugging by capturing and displaying error messages.

        def buildAndPlay(self, inputPrompt):
            try:
                user_proxy.initiate_chat(
                    assistant,
                    message=f"We need to solve the following problem: {inputPrompt}. "
                            "Please coordinate with the admin, engineer, game_designer, planner and critic to provide a comprehensive solution. "
                )
        
                return 0
            except Exception as e:
                x = str(e)
                print('Error: <<Real-time Translation>>: ', x)
        
                return 1

        Purpose: Defines a method to initiate the problem-solving process.

        Function:

        • Parameters: Takes inputPrompt, which is the problem to be solved.
        • Action:
          • Calls user_proxy.initiate_chat() to start a conversation between the user proxy agent and the assistant agent.
          • Sends a message requesting coordination among all agents to provide a comprehensive solution to the problem.
        • Error Handling: If an exception occurs, it prints an error message and returns 1.

        Role: Initiates collaboration among all agents to solve the provided problem.

        • Agents Setup: Multiple agents with specialized roles are created.
        • Initiating Conversation: The buildAndPlay method starts a conversation, asking agents to collaborate.
        • Problem Solving: Agents communicate and coordinate to provide a comprehensive solution to the input problem.
        • Error Handling: The system captures and logs any errors that occur during execution.
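
        As a side note, AutoGen can also wire these specialized agents together explicitly through a group chat. This is not part of the original script, but a hedged sketch of that pattern looks like this –

        # A GroupChat lets all agents take turns in one conversation
        groupchat = autogen.GroupChat(
            agents=[user_proxy, engineer, game_designer, planner, critic],
            messages=[],
            max_round=12,
        )
        manager = autogen.GroupChatManager(
            groupchat=groupchat,
            llm_config={"config_list": config_list},
        )

        # The user proxy kicks off the multi-agent collaboration
        user_proxy.initiate_chat(manager, message="Design a simple Snake game in Python.")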


        We’ll continue to discuss this topic in the upcoming post.

        I’ll bring some more exciting topics in the coming days from the Python verse.

        Till then, Happy Avenging! 🙂

        Navigating the Future of Work: Insights from the Argyle AI Summit

        At the recent Argyle AI Summit, a prestigious event in the AI industry, I had the honor of participating as a speaker alongside esteemed professionals like Misha Leybovich from Google Labs. The summit, coordinated by Sylvia Das Chagas, a former senior AI conversation designer at CVS Health, provided an enlightening platform to discuss the evolving role of AI in talent management. Our session focused on the theme “Driving Talent with AI,” addressing some of the most pressing questions in the field, with relevant use cases shared in detail to support these discussion threads.

        To view the actual page, please click the following link.

        One of the critical topics we explored was AI’s impact on talent management in the upcoming year. AI’s influence in hiring and retention is becoming increasingly significant. For example, AI-powered tools can now analyze vast amounts of data to identify the best candidates for a role, going beyond traditional resume screening. In retention, AI is instrumental in identifying patterns that indicate an employee’s likelihood to leave, enabling proactive measures.

        A burning question in AI is how leaders address fears that AI might replace manual jobs. We discussed the importance of leaders framing AI as a complement to human skills rather than a replacement. AI enhances employee capabilities by automating mundane tasks, allowing employees to focus on more creative and strategic work.

        Regarding new AI tools that organizations should watch out for, the conversation highlighted tools that enhance remote collaboration and workplace inclusivity. Tools like virtual meeting assistants that can transcribe, translate, and summarize meetings in real time are becoming invaluable in today’s global work environment.

        AI’s role in boosting employee motivation and productivity was another focal point. We discussed how AI-driven career development programs can offer personalized learning paths, helping employees grow and stay motivated.

        Incorporating multiple languages in tools like ChatGPT was highlighted as a critical step towards inclusivity. This expansion allows a broader range of employees to interact with AI tools in their native language, fostering a more inclusive workplace environment.

        Lastly, we tackled the challenge of addressing employees’ reluctance to change. Emphasizing the importance of transparent communication and education about AI’s benefits was identified as key. Organizations can alleviate fears and encourage a more accepting attitude towards AI by involving employees in the AI implementation process and providing training.

        The Argyle AI Summit offered a compelling glimpse into the future of AI in talent management. The session provided valuable insights for leaders looking to harness AI’s potential to enhance talent management strategies by discussing real-world examples and strategies. To gain more in-depth knowledge and perspectives shared during this summit, I encourage interested parties to visit the recorded session link for a more comprehensive understanding.

        Or, you can directly view it from here –


        I would greatly appreciate your feedback on the insights shared during the summit. Your thoughts and perspectives are invaluable as we continue to explore and navigate the evolving landscape of AI in the workplace.

        Validating source data against RAG-response using Open AI, GloVe, FAISS using Python

        Today, I’ll be presenting another exciting architectural capability in the world of LLMs, where you need to answer one crucial question: how valid is the response generated by these LLMs against your data? This validation is critical when discussing business growth & the need to take the right action at the right time.

        Why not view the demo before going through it?

        Demo

        Isn’t it exciting? Great! Let us understand this in detail.

        The first dotted box (extreme left) covers data ingestion from different sources, including third-party PDFs. It is expected that organizations should have ready-to-digest data sources. Examples: Data Lake, Data Mart, One Lake, or any other equivalent platforms. Those PDFs will provide additional insights beyond the conventional advanced analytics.

        You need to have some kind of OCR solution that will extract all the relevant information in the form of text from the documents. 

        The next important part is how you define the chunking & embedding of data chunks into the Vector DB. Chunking & indexing strategies, along with the overlapping chain, play a crucial role in tying the segregated pieces of context into a single context that will be fed as the source to your preferred LLMs.

        This system employs a vector similarity search to browse through unstructured information and concurrently accesses the database to retrieve the context, ensuring that the responses are not only comprehensive but also anchored in validated knowledge.

        This approach is particularly vital for addressing multi-hop questions, where a single query can be broken down into multiple sub-questions and may require information from numerous documents to generate an accurate answer.


        pip install openai==0.27.8
        pip install pandas==2.0.3
        pip install tensorflow==2.11.1
        pip install faiss-cpu==1.7.4
        pip install gensim==4.3.2

        Let us understand the key class & snippets.

        • clsFeedVectorDB.py (This is the main class that will invoke the FAISS framework to contextualize the docs inside the vector DB, along with the source file names, so the answer from Gen AI can be validated using the GloVe.6B embedding models.)

        Let us understand some of the key snippets from the above script (Full scripts will be available in the GitHub Repo) –

        # Sample function to convert text to a vector
        def text2Vector(self, text):
            # Split the text into lowercase words, keeping only those in the model's vocabulary
            words = [word for word in text.lower().split() if word in self.model]
        
            # If no words in the model, return a zero vector
            if not words:
                return np.zeros(self.model.vector_size)
        
            # Compute the average of the word vectors
            vector = np.mean([self.model[word] for word in words], axis=0)
            return vector.reshape(1, -1)

        This code is for a function called “text2Vector” that takes some text as input and converts it into a numerical vector. Let me break it down step by step:

        • It starts by taking some text as input, and this text is expected to be a sentence or a piece of text.
        • The text is then split into individual words, and each word is converted to lowercase.
        • It checks if each word is present in a pre-trained language model (here, a GloVe word-embedding model). If a word is not in the model, it’s ignored.
        • If none of the words from the input text are found in the model, the function returns a vector filled with zeros. This vector has the same size as the word vectors in the model.
        • If there are words from the input text in the model, the function calculates the average vector of these words. It does this by taking the word vectors for each word found in the model and computing their mean (average). This results in a single vector that represents the input text.
        • Finally, the function reshapes this vector into a 2D array with one row and as many columns as there are elements in the vector. The reason for this reshaping is often related to compatibility with other parts of the code or libraries used in the project.

        So, in simple terms, this function takes a piece of text, looks up the word vectors for the words in that text, and calculates the average of those vectors to create a single numerical representation of the text. If none of the words are found in the model, it returns a vector of zeros.
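
        For context, here is a hedged sketch of how the underlying GloVe model might be loaded with gensim (the file name assumes the 100-dimensional glove.6B download) –

        from gensim.models import KeyedVectors

        # GloVe files lack the word2vec header line, hence no_header=True (gensim >= 4.0)
        model = KeyedVectors.load_word2vec_format('glove.6B.100d.txt',
                                                  binary=False, no_header=True)

        print(model.vector_size)   # 100
        print('museum' in model)   # True if the word is in the vocabulary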

            def genData(self):
                try:
                    basePath = self.basePath
                    modelFileName = self.modelFileName
                    vectorDBPath = self.vectorDBPath
                    vectorDBFileName = self.vectorDBFileName
        
                    # Create a FAISS index
                    dimension = int(cf.conf['NO_OF_MODEL_DIM'])  # Assuming 100-dimensional vectors 
                    index = faiss.IndexFlatL2(dimension)
        
                    print('*' * 240)
                    print('Vector Index Your Data for Retrieval:')
                    print('*' * 240)
        
                    FullVectorDBname = vectorDBPath + vectorDBFileName
                    indexFile = str(vectorDBPath) + str(vectorDBFileName) + '.index'
        
                    print('File: ', str(indexFile))
        
                    data = {}
                    # List all files in the specified directory
                    files = os.listdir(basePath)
        
                    # Filter out files that are not text files
                    text_files = [file for file in files if file.endswith('.txt')]
        
                    # Read each text file
                    for file in text_files:
                        file_path = os.path.join(basePath, file)
                        print('*' * 240)
                        print('Processing File:')
                        print(str(file_path))
                        try:
                            # Attempt to open with utf-8 encoding
                            with open(file_path, 'r', encoding='utf-8') as file:
                                for line_number, line in enumerate(file, start=1):
                                    # Assume each line is a separate document
                                    vector = self.text2Vector(line)
                                    vector = vector.reshape(-1)
                                    index_id = index.ntotal
        
                                    index.add(np.array([vector]))  # Adding the vector to the index
                                    data[index_id] = {'text': line, 'line_number': line_number, 'file_name': file_path}  # Storing the line and file name
                        except UnicodeDecodeError:
                            # If utf-8 fails, try a different encoding
                            try:
                                with open(file_path, 'r', encoding='ISO-8859-1') as file:
                                    for line_number, line in enumerate(file, start=1):
                                        # Assume each line is a separate document
                                        vector = self.text2Vector(line)
                                        vector = vector.reshape(-1)
                                        index_id = index.ntotal
                                        index.add(np.array([vector]))  # Adding the vector to the index
                                        data[index_id] = {'text': line, 'line_number': line_number, 'file_name': file_path}  # Storing the line and file name
                            except Exception as e:
                                print(f"Could not read file {file}: {e}")
                                continue
        
                        print('*' * 240)
        
                    # Save the data dictionary using pickle
                    dataCache = vectorDBPath + modelFileName
                    with open(dataCache, 'wb') as f:
                        pickle.dump(data, f)
        
                    # Save the index and data for later use
                    faiss.write_index(index, indexFile)
        
                    print('*' * 240)
        
                    return 0
        
                except Exception as e:
                    x = str(e)
                    print('Error: ', x)
        
                    return 1

        This code defines a function called genData, and its purpose is to prepare and store data for later retrieval using a FAISS index. Let’s break down what it does step by step:

        • It starts by assigning several variables, such as basePath, modelFileName, vectorDBPath, and vectorDBFileName. These variables likely contain file paths and configuration settings.
        • It creates a FAISS index with a specified dimension (assuming 100-dimensional vectors in this case) using faiss.IndexFlatL2. FAISS is a library for efficient similarity search and clustering of high-dimensional data.
        • It prints the file name and lines where the index will be stored. It initializes an empty dictionary called data to store information about the processed text data.
        • It lists all the files in a directory specified by basePath. It filters out only the files that have a “.txt” extension as text files.
        • It then reads each of these text files one by one. For each file:
        1. It attempts to open the file with UTF-8 encoding.
          • It reads the file line by line.
          • For each line, it calls a function text2Vector to convert the text into a numerical vector representation. This vector is added to the FAISS index.
          • It also stores some information about the line, such as the line number and the file name, in the data dictionary.
          • If there is an issue with UTF-8 encoding, it tries to open the file with a different encoding, “ISO-8859-1”. The same process of reading and storing data continues.
        • If there are any exceptions (errors) during this process, it prints an error message but continues processing other files.
        • Once all the files are processed, it saves the data dictionary using the pickle library to a file specified by dataCache.
        • It also saves the FAISS index to a file specified by indexFile.
        • Finally, it returns 0 if the process completes successfully or 1 if there was an error during execution.

        In summary, this function reads text files, converts their contents into numerical vectors, and builds a FAISS index for efficient similarity search. It also saves the processed data and the index for later use. If there are any issues during the process, it prints error messages but continues processing other files.
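
        Once genData has run, querying the saved artifacts is straightforward. A hedged retrieval sketch follows; the file names below mimic the vectorDBPath/vectorDBFileName convention and are assumptions, and obj stands for a hypothetical instance of the feeder class –

        import faiss
        import pickle

        index = faiss.read_index('vectorDB/knowledge.index')   # assumed index file
        with open('vectorDB/knowledge.model', 'rb') as f:      # assumed pickle cache
            data = pickle.load(f)

        # Reuse the text2Vector helper shown earlier; FAISS expects float32 input
        queryVector = obj.text2Vector('paintings from the museum').astype('float32')
        distances, indices = index.search(queryVector, 3)

        for i in indices[0]:
            print(data[i]['file_name'], data[i]['line_number'], data[i]['text'][:80])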

        • clsRAGOpenAI.py (This is the main class that will drive the RAG flow, which will get the contexts with references, including source files, line numbers, and source texts. This will help the customer validate the source against the OpenAI response to understand & control data bias & other potential critical issues.)

        Let us understand some of the key snippets from the above script (Full scripts will be available in the GitHub Repo) –

        def ragAnswerWithHaystackAndGPT3(self, queryVector, k, question):
            modelName = self.modelName
            maxToken = self.maxToken
            temp = self.temp
        
            # Assuming getTopKContexts is a method that returns the top K contexts
            contexts = self.getTopKContexts(queryVector, k)
            messages = []
        
            # Add contexts as system messages
            for file_name, line_number, text in contexts:
                messages.append({"role": "system", "content": f"Document: {file_name} \nLine Number: {line_number} \nContent: {text}"})
        
            prompt = self.generateOpenaiPrompt(queryVector, k)
            prompt = prompt + "Question: " + str(question) + ". \n Answer based on the above documents."
        
            # Add user question
            messages.append({"role": "user", "content": prompt})
        
            # Create chat completion
            completion = client.chat.completions.create(
            model=modelName,
            messages=messages,
            temperature = temp,
            max_tokens = maxToken
            )
        
            # Assuming the last message in the response is the answer
            last_response = completion.choices[0].message.content
            source_references = ['FileName: ' + str(context[0]) + ' - Line Numbers: ' + str(context[1]) + ' - Source Text (Reference): ' + str(context[2]) for context in contexts]
        
            return last_response, source_references

        This code defines a function called ragAnswerWithHaystackAndGPT3. Its purpose is to use a combination of the Haystack search method and OpenAI’s GPT-3 model to generate an answer to a user’s question. Let’s break down what it does step by step:

        • It starts by assigning several variables, such as modelName, maxToken, and temp. These variables likely contain model-specific information and settings for GPT-3.
        • It calls a method getTopKContexts to retrieve the top K contexts (which are likely documents or pieces of text) related to the user’s query. These contexts are stored in the contexts variable.
        • It initializes an empty list called messages to store messages that will be used in the conversation with the GPT-3 model.
        • It iterates through each context and adds them as system messages to the messages list. These system messages provide information about the documents or sources being used in the conversation.
        • It creates a prompt that combines the query, retrieved contexts, and the user’s question. This prompt is then added as a user message to the messages list. It effectively sets up the conversation for GPT-3, where the user’s question is followed by context.
        • It makes a request to the GPT-3 model using the client.chat.completions.create method, passing in the model name, the constructed messages, and other settings such as temperature and maximum tokens.
        • After receiving a response from GPT-3, it assumes that the last message in the response contains the answer generated by the model.
        • It also constructs source_references, which is a list of references to the documents or sources used in generating the answer. This information includes the file name, line numbers, and source text for each context.
        • Finally, it returns the generated answer (last_response) and the source references to the caller.

        In summary, this function takes a user’s query, retrieves relevant contexts or documents, sets up a conversation with GPT-3 that includes the query and contexts, and then uses GPT-3 to generate an answer. It also provides references to the sources used in generating the answer.

            def getTopKContexts(self, queryVector, k):
                try:
                    distances, indices = index.search(queryVector, k)
                    resDict = [(data[i]['file_name'], data[i]['line_number'], data[i]['text']) for i in indices[0]]
                    return resDict
                except Exception as e:
                    x = str(e)
                    print('Error: ', x)
        
                    return x

        This code defines a function called getTopKContexts. Its purpose is to retrieve the top K relevant contexts or pieces of information from a pre-built index based on a query vector. Here’s a breakdown of what it does:

        1. It takes two parameters as input: queryVector, which is a numerical vector representing a query, and k, which specifies how many relevant contexts to retrieve.
        2. Inside a try-except block, it attempts the following steps:
          • It uses the index.search method to find the top K closest contexts to the given queryVector. This method returns two arrays: distances (measuring how similar the contexts are to the query) and indices (indicating the positions of the closest contexts in the data).
          • It creates a list called resDict, which contains tuples for each of the top K contexts. Each tuple contains three pieces of information: the file name (file_name), the line number (line_number), and the text content (text) of the context. These details are extracted from a data dictionary.
        3. If the process completes successfully, it returns the list of top K contexts (resDict) to the caller.
        4. If there’s an exception (an error) during this process, it captures the error message as a string (x), prints the error message, and then returns the error message itself.

        In summary, this function takes a query vector and finds the K most relevant contexts or pieces of information based on their similarity to the query. It returns these contexts as a list of tuples containing file names, line numbers, and text content. If there’s an error, it prints an error message and returns the error message string.

        def generateOpenaiPrompt(self, queryVector, k):
            contexts = self.getTopKContexts(queryVector, k)
            template = ct.templateVal_1
            prompt = template
            for file_name, line_number, text in contexts:
                prompt += f"Document: {file_name}\n Line Number: {line_number} \n Content: {text}\n\n"
            return prompt

        This code defines a function called generateOpenaiPrompt. Its purpose is to create a prompt or a piece of text that combines a template with information from the top K relevant contexts retrieved earlier. Let’s break down what it does:

        1. It starts by calling the getTopKContexts function to obtain the top K relevant contexts based on a given queryVector.
        2. It initializes a variable called template with a predefined template value (likely defined elsewhere in the code).
        3. It sets the prompt variable to the initial template.
        4. Then, it enters a loop where it iterates through each of the relevant contexts retrieved earlier (contexts are typically documents or text snippets).
        5. For each context, it appends information to the prompt. Specifically, it adds lines to the prompt that include:
          • The document’s file name (Document: [file_name]).
          • The line number within the document (Line Number: [line_number]).
          • The content of the context itself (Content: [text]).
        6. It adds some extra spacing (newlines) between each context to ensure readability.
        7. Finally, it returns the complete prompt, which is a combination of the template and the information from the relevant contexts.

        In summary, this function takes a query vector, retrieves relevant contexts, and creates a prompt by combining a template with information from these contexts. This prompt can then be used as input for an AI model or system, likely for generating responses or answers based on the provided context.
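For illustration, assuming ct.templateVal_1 holds a short instruction header, the assembled prompt might look like this (the values are hypothetical):

<instruction text from ct.templateVal_1>
Document: history_notes.txt
 Line Number: 42 
 Content: The artifact dates back to the 18th century.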

        Let us understand the directory structure of this entire application –


        To learn more about this package, please visit the following GitHub link.

        So, finally, we’ve done it. I know this post is relatively shorter than my earlier ones. But I think you can pick up a good hack to improve some of your long-running jobs by applying this trick.

        I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

        Till then, Happy Avenging! 🙂

        RAG implementation of LLMs by using Python, Haystack & React (Part – 2)

        Today, we’ll share the second installment of the RAG implementation. If you are new here, please visit the previous post for full context.

        In this post, we’ll be discussing the Haystack framework more. Again, before discussing the main context, I want to present the demo here.

        Demo

        Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process, where today, we’ll pay our primary attention.

        As you can see today, we’ll discuss the red dotted line, which contextualizes the source data into the Vector DBs.

        Let us understand the flow of events here –

        1. The main Python application will consume the nested JSON by invoking the museum API in multiple threads.
        2. The application will clean the nested data & extract the relevant attributes after flattening the JSON (see the flattening sketch after this list).
        3. It will create the unstructured text-based context, which is later fed to the Vector DB framework.
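As a minimal sketch of step 2, pandas can flatten a nested museum-API record so the relevant attributes can be selected (the record below is a made-up miniature of the real payload):

import pandas as pd

record = {
    'objectID': 1,
    'artistDisplayName': 'Claude Monet',
    'constituents': [{'role': 'Artist', 'name': 'Claude Monet'}]
}

# json_normalize explodes the nested list & carries the parent attributes along
flat = pd.json_normalize(record, record_path='constituents',
                         meta=['objectID', 'artistDisplayName'])
print(flat[['objectID', 'name']])

The following are the important packages that are essential to this project –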

        pip install farm-haystack==1.19.0
        pip install Flask==2.2.5
        pip install Flask-Cors==4.0.0
        pip install Flask-JWT-Extended==4.5.2
        pip install Flask-Session==0.5.0
        pip install openai==0.27.8
        pip install pandas==2.0.3
        pip install tensorflow==2.11.1

        We’re using the Metropolitan Museum API to feed the data to our Vector DB. For more information, please visit the following link. It is free to use &, moreover, we’re using it purely for educational purposes.
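For instance, the department catalog can be pulled with a single GET call against the Met’s public collection API (no API key is required):

import requests

base_url = 'https://collectionapi.metmuseum.org/public/collection/v1'
response = requests.get(base_url + '/departments')
print(response.json())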


        We’ll discuss the tokenization part highlighted in a red dotted line from the above picture.

        We’ll discuss the scripts in the diagram as part of the flow mentioned above.

        • clsExtractJSON.py (This is the main class that will extract the content from the museum API using parallel calls.)
        def genData(self):
            try:
                base_url = self.base_url
                header_token = self.header_token
                basePath = self.basePath
                outputPath = self.outputPath
                mergedFile = self.mergedFile
                subdir = self.subdir
                Ind = self.Ind
                var_1 = datetime.now().strftime("%H.%M.%S")
        
        
                devVal = list()
                objVal = list()
        
                # Main Details
                headers = {'Cookie':header_token}
                payload={}
        
                url = base_url + '/departments'
        
                date_ranges = self.generateFirstDayOfLastTenYears()
        
                # Getting all the departments
                try:
                    print('Department URL:')
                    print(str(url))
        
                    response = requests.request("GET", url, headers=headers, data=payload)
                    parsed_data = json.loads(response.text)
        
                    print('Department JSON:')
                    print(str(parsed_data))
        
                    # Extract the "departmentId" values into a Python list
                    for dept_det in parsed_data['departments']:
                        for info in dept_det:
                            if info == 'departmentId':
                                devVal.append(dept_det[info])
        
                except Exception as e:
                    x = str(e)
                    print('Error: ', x)
                    devVal = list()
        
                # List to hold thread objects
                threads = []
        
                # Calling the Data using threads
                for dep in devVal:
                    t = threading.Thread(target=self.getDataThread, args=(dep, base_url, headers, payload, date_ranges, objVal, subdir, Ind,))
                    threads.append(t)
                    t.start()
        
                # Wait for all threads to complete
                for t in threads:
                    t.join()
        
                res = self.mergeCsvFilesInDirectory(basePath, outputPath, mergedFile)
        
                if res == 0:
                    print('Successful!')
                else:
                    print('Failure!')
        
                return 0
        
            except Exception as e:
                x = str(e)
                print('Error: ', x)
        
                return 1

        The above code translates into the following steps –

        1. The above method first calls the generateFirstDayOfLastTenYears() method to prepare the date ranges that populate records for every department, after getting all the unique departments by calling a separate API.
        2. Then, it calls the getDataThread() method in multiple threads to fetch all the relevant APIs simultaneously, reducing the overall wait time & creating individual smaller files.
        3. Finally, the application invokes the mergeCsvFilesInDirectory() method to merge all the chunk files into one extensive historical data file.
        def generateFirstDayOfLastTenYears(self):
            yearRange = self.yearRange
            date_format = "%Y-%m-%d"
            current_year = datetime.now().year
        
            date_ranges = []
            for year in range(current_year - yearRange, current_year + 1):
                first_day_of_year_full = datetime(year, 1, 1)
                first_day_of_year = first_day_of_year_full.strftime(date_format)
                date_ranges.append(first_day_of_year)
        
            return date_ranges

        This method generates the first day of each year for the last ten years, including the current year. For example, with yearRange set to 10 & run in 2023, it returns ['2013-01-01', '2014-01-01', …, '2023-01-01'].

        def getDataThread(self, dep, base_url, headers, payload, date_ranges, objVal, subdir, Ind):
            try:
                cnt = 0
                cnt_x = 1
                var_1 = datetime.now().strftime("%H.%M.%S")
        
                for x_start_date in date_ranges:
                    try:
                        urlM = base_url + '/objects?metadataDate=' + str(x_start_date) + '&departmentIds=' + str(dep)
        
                        print('Nested URL:')
                        print(str(urlM))
        
                        response_obj = requests.request("GET", urlM, headers=headers, data=payload)
                        objectDets = json.loads(response_obj.text)
        
                        # Guard against a null objectIDs payload & only walk the IDs
                        # returned for this date range (objVal keeps the running list)
                        newIds = objectDets['objectIDs'] or []
                        objVal.extend(newIds)
        
                        for objId in newIds:
                            urlS = base_url + '/objects/' + str(objId)
        
                            print('Final URL:')
                            print(str(urlS))
        
                            response_det = requests.request("GET", urlS, headers=headers, data=payload)
                            objDetJSON = response_det.text
        
                            retDB = self.createData(objDetJSON)
                            retDB['departmentId'] = str(dep)
        
                            if cnt == 0:
                                df_M = retDB
                            else:
                                d_frames = [df_M, retDB]
                                df_M = pd.concat(d_frames)
        
                            if cnt == 1000:
                                cnt = 0
                                clog.logr('df_M_' + var_1 + '_' + str(cnt_x) + '_' + str(dep) +'.csv', Ind, df_M, subdir)
                                cnt_x += 1
                                df_M = pd.DataFrame()
        
                            cnt += 1
        
                    except Exception as e:
                        x = str(e)
                        print('Error X:', x)
                return 0
        
            except Exception as e:
                x = str(e)
                print('Error: ', x)
        
                return 1

        The above method will invoke the individual API call to fetch the relevant artifact information.
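The chunk-&-flush pattern that the method uses to keep memory in check can be isolated as follows (a minimal sketch; in the real code, the write goes through the clog.logr logging helper):

import pandas as pd

CHUNK_SIZE = 1000            # flush to disk every 1,000 records, as above
buffer, part = [], 1

def flush(rows, part_no):
    # Stand-in for clog.logr(); writes one chunk file per flush
    pd.DataFrame(rows).to_csv(f'df_M_part_{part_no}.csv', index=False)

for i in range(2500):        # stand-in for the per-object API loop
    buffer.append({'objectID': i})
    if len(buffer) == CHUNK_SIZE:
        flush(buffer, part)
        buffer, part = [], part + 1

if buffer:                   # do not lose the trailing partial chunk
    flush(buffer, part)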

        def mergeCsvFilesInDirectory(self, directory_path, output_path, output_file):
            try:
                csv_files = [file for file in os.listdir(directory_path) if file.endswith('.csv')]
                data_frames = []
        
                for file in csv_files:
                    encodings_to_try = ['utf-8', 'utf-8-sig', 'latin-1', 'cp1252']
                    for encoding in encodings_to_try:
                        try:
                            FullFileName = directory_path + file
                            print('File Name: ', FullFileName)
                            df = pd.read_csv(FullFileName, encoding=encoding)
                            data_frames.append(df)
                            break  # Stop trying other encodings if the reading is successful
                        except UnicodeDecodeError:
                            continue
        
                if not data_frames:
                    raise Exception("Unable to read CSV files. Check encoding or file format.")
        
                merged_df = pd.concat(data_frames, ignore_index=True)
        
                merged_full_name = os.path.join(output_path, output_file)
                merged_df.to_csv(merged_full_name, index=False)
        
                for file in csv_files:
                    os.remove(os.path.join(directory_path, file))
        
                return 0
        
            except Exception as e:
                x = str(e)
                print('Error: ', x)
                return 1

        The above method will merge all the small files into a single, more extensive historical data file containing roughly ten years of data (the first day of each of the last ten years, to be precise).

        For the complete code, please visit the GitHub.

        • 1_ReadMuseumJSON.py (This is the main script that will invoke the class that extracts the content from the museum API using parallel calls.)
        #########################################################
        #### Written By: SATYAKI DE                          ####
        #### Written On: 27-Jun-2023                         ####
        #### Modified On 28-Jun-2023                         ####
        ####                                                 ####
        #### Objective: This is the main calling             ####
        #### python script that will invoke the              ####
        #### clsExtractJSON class to extract the museum      ####
        #### API contents using parallel calls.              ####
        ####                                                 ####
        #########################################################
        import datetime
        from clsConfigClient import clsConfigClient as cf
        
        import clsExtractJSON as cej
        
        ########################################################
        ################    Global Area   ######################
        ########################################################
        
        cJSON = cej.clsExtractJSON()
        
        basePath = cf.conf['DATA_PATH']
        outputPath = cf.conf['OUTPUT_PATH']
        mergedFile = cf.conf['MERGED_FILE']
        
        ########################################################
        ################  End Of Global Area   #################
        ########################################################
        
        # Disabling Warning
        def warn(*args, **kwargs):
            pass
        
        import warnings
        warnings.warn = warn
        
        def main():
            try:
                var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
                print('*'*120)
                print('Start Time: ' + str(var))
                print('*'*120)
        
                r1 = cJSON.genData()
        
                if r1 == 0:
                    print()
            print('Successfully Scraped!')
                else:
                    print()
            print('Failed to Scrape!')
        
                print('*'*120)
                var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
                print('End Time: ' + str(var1))
        
            except Exception as e:
                x = str(e)
                print('Error: ', x)
        
        if __name__ == '__main__':
            main()
        

        The above script instantiates the class & then calls its genData() method.

        • clsCreateList.py (This is the main class that will extract the relevant attributes from the historical files & then create the right input text that builds the documents to be contextualized into the Vector DB framework.)
        def createRec(self):
            try:
                basePath = self.basePath
                fileName = self.fileName
                Ind = self.Ind
                subdir = self.subdir
                base_url = self.base_url
                outputPath = self.outputPath
                mergedFile = self.mergedFile
                cleanedFile = self.cleanedFile
        
                FullFileName = outputPath + mergedFile
        
                df = pd.read_csv(FullFileName)
                df2 = df[listCol]
                dfFin = df2.drop_duplicates().reset_index(drop=True)
        
                dfFin['artist_URL'] = dfFin['artistWikidata_URL'].combine_first(dfFin['artistULAN_URL'])
                dfFin['object_URL'] = dfFin['objectURL'].combine_first(dfFin['objectWikidata_URL'])
                dfFin['Wiki_URL'] = dfFin['Wikidata_URL'].combine_first(dfFin['AAT_URL']).combine_first(dfFin['URL']).combine_first(dfFin['object_URL'])
        
                # Dropping the old Dtype Columns
                dfFin.drop(['artistWikidata_URL'], axis=1, inplace=True)
                dfFin.drop(['artistULAN_URL'], axis=1, inplace=True)
                dfFin.drop(['objectURL'], axis=1, inplace=True)
                dfFin.drop(['objectWikidata_URL'], axis=1, inplace=True)
                dfFin.drop(['AAT_URL'], axis=1, inplace=True)
                dfFin.drop(['Wikidata_URL'], axis=1, inplace=True)
                dfFin.drop(['URL'], axis=1, inplace=True)
        
                # Save the filtered DataFrame to a new CSV file
                #clog.logr(cleanedFile, Ind, dfFin, subdir)
                res = self.addHash(dfFin)
        
                if res == 0:
                    print('Added Hash!')
                else:
                    print('Failed to add hash!')
        
                # Generate the text for each row in the dataframe
                for _, row in dfFin.iterrows():
                    x = self.genPrompt(row)
                    self.addDocument(x, cleanedFile)
        
                return documents
        
            except Exception as e:
                x = str(e)
                print('Record Error: ', x)
        
                return documents

        The above code reads the data from the extensive historical file created in the earlier steps, cleans it by removing any duplicate records &, finally, creates three consolidated URL columns for the artist, object & wiki.

        Also, this application will replace each hyperlink with a specific hash value before the text is fed into the vector DB, as vector DBs do not cope well with raw URLs. Hence, we store the URLs in a separate file keyed by the associated hash value & later fetch them via a lookup from the OpenAI response, as sketched below.
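A minimal sketch of the hashing idea follows; the actual addHash() implementation may differ, and the column and file names here are assumptions:

import hashlib
import pandas as pd

def addHash(df):
    # Derive a stable integer hash per row from its URL, then keep the
    # hash -> URL mapping in a side file for the later lookup step
    df['Total_Hash'] = df['Wiki_URL'].astype(str).apply(
        lambda u: int(hashlib.md5(u.encode()).hexdigest()[:8], 16))
    df[['Total_Hash', 'Wiki_URL']].to_csv('url_lookup.csv', index=False)
    return 0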

        Then, this application will generate prompts dynamically & finally create the documents for the later vector DB consumption steps by invoking the addDocument() method.

        For more details, please visit the GitHub link.

        • 1_1_testCreateRec.py (This is the test script that will call the above class.)
        #########################################################
        #### Written By: SATYAKI DE                          ####
        #### Written On: 27-Jun-2023                         ####
        #### Modified On 28-Jun-2023                         ####
        ####                                                 ####
        #### Objective: This is the main calling             ####
        #### python script that will test the                ####
        #### clsCreateList class, which builds the           ####
        #### documents for the vector DB.                    ####
        ####                                                 ####
        #########################################################
        
        from clsConfigClient import clsConfigClient as cf
        import clsL as log
        import clsCreateList as ccl
        
        from datetime import datetime, timedelta
        
        # Disabling Warning
        def warn(*args, **kwargs):
            pass
        
        import warnings
        warnings.warn = warn
        
        ###############################################
        ###           Global Section                ###
        ###############################################
        
        #Initiating Logging Instances
        clog = log.clsL()
        cl = ccl.clsCreateList()
        
        var = datetime.now().strftime(".%H.%M.%S")
        
        documents = []
        
        ###############################################
        ###    End of Global Section                ###
        ###############################################
        def main():
            try:
                var = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
                print('*'*120)
                print('Start Time: ' + str(var))
                print('*'*120)
        
                print('*'*240)
                print('Creating Index store:: ')
                print('*'*240)
        
                documents = cl.createRec()
        
                print('Inserted Sample Records: ')
                print(str(documents))
                print('\n')
        
                r1 = len(documents)
        
                if r1 > 0:
                    print()
                    print('Successfully Indexed sample records!')
                else:
                    print()
                    print('Failed to index sample records!')
        
                print('*'*120)
                var1 = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
                print('End Time: ' + str(var1))
        
            except Exception as e:
                x = str(e)
                print('Error: ', x)
        
        if __name__ == '__main__':
            main()
        

        The above script instantiates the main class & invokes the createRec() method to build the documents that will later be tokenized into the vector DB.

        This test script will be used to exercise the clsCreateList class above. However, the class itself will be used inside another class.

        • clsFeedVectorDB.py (This is the main class that will feed the documents into the vector DB.)
        #########################################################
        #### Written By: SATYAKI DE                          ####
        #### Written On: 27-Jun-2023                         ####
        #### Modified On 28-Sep-2023                         ####
        ####                                                 ####
        #### Objective: This is the main calling             ####
        #### python script that will invoke the              ####
        #### haystack framework to contextualize the docs   ####
        #### inside the vector DB.                           ####
        ####                                                 ####
        #########################################################
        
        from haystack.document_stores.faiss import FAISSDocumentStore
        from haystack.nodes import DensePassageRetriever
        import openai
        import pandas as pd
        import os
        import clsCreateList as ccl
        
        from clsConfigClient import clsConfigClient as cf
        import clsL as log
        
        from datetime import datetime, timedelta
        
        # Disabling Warning
        def warn(*args, **kwargs):
            pass
        
        import warnings
        warnings.warn = warn
        
        ###############################################
        ###           Global Section                ###
        ###############################################
        
        Ind = cf.conf['DEBUG_IND']
        openAIKey = cf.conf['OPEN_AI_KEY']
        
        os.environ["TOKENIZERS_PARALLELISM"] = "false"
        
        #Initiating Logging Instances
        clog = log.clsL()
        cl = ccl.clsCreateList()
        
        var = datetime.now().strftime(".%H.%M.%S")
        
        # Encode your data to create embeddings
        documents = []
        
        var_1 = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var_1))
        print('*'*120)
        
        print('*'*240)
        print('Creating Index store:: ')
        print('*'*240)
        
        documents = cl.createRec()
        
        print('Inserted Sample Records: ')
        print(documents[:5])
        print('\n')
        print('Type:')
        print(type(documents))
        
        r1 = len(documents)
        
        if r1 > 0:
            print()
            print('Successfully Indexed records!')
        else:
            print()
            print('Failed to index records!')
        
        print('*'*120)
        var_2 = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var_2))
        
        # Passing OpenAI API Key
        openai.api_key = openAIKey
        
        ###############################################
        ###    End of Global Section                ###
        ###############################################
        
        class clsFeedVectorDB:
            def __init__(self):
                self.basePath = cf.conf['DATA_PATH']
                self.modelFileName = cf.conf['CACHE_FILE']
                self.vectorDBPath = cf.conf['VECTORDB_PATH']
                self.vectorDBFileName = cf.conf['VECTORDB_FILE_NM']
                self.queryModel = cf.conf['QUERY_MODEL']
                self.passageModel = cf.conf['PASSAGE_MODEL']
        
            def retrieveDocuments(self, question, retriever, top_k=3):
                return retriever.retrieve(question, top_k=top_k)
        
            def generateAnswerWithGPT3(self, retrievedDocs, question):
                documents_text = " ".join([doc.content for doc in retrievedDocs])
                prompt = f"Given the following documents: {documents_text}, answer the question: {question}"
        
                response = openai.Completion.create(
                    model="text-davinci-003",
                    prompt=prompt,
                    max_tokens=150
                )
                return response.choices[0].text.strip()
        
            def ragAnswerWithHaystackAndGPT3(self, question, retriever):
                retrievedDocs = self.retrieveDocuments(question, retriever)
                return self.generateAnswerWithGPT3(retrievedDocs, question)
        
            def genData(self, strVal):
                try:
                    basePath = self.basePath
                    modelFileName = self.modelFileName
                    vectorDBPath = self.vectorDBPath
                    vectorDBFileName = self.vectorDBFileName
                    queryModel = self.queryModel
                    passageModel = self.passageModel
        
                    print('*'*120)
                    print('Index Your Data for Retrieval:')
                    print('*'*120)
        
                    FullFileName = basePath + modelFileName
                    FullVectorDBname = vectorDBPath + vectorDBFileName
        
                    sqlite_path = "sqlite:///" + FullVectorDBname + '.db'
                    print('Vector DB Path: ', str(sqlite_path))
        
                    indexFile = "vectorDB/" + str(vectorDBFileName) + '.faiss'
                    indexConfig = "vectorDB/" + str(vectorDBFileName) + ".json"
        
                    print('File: ', str(indexFile))
                    print('Config: ', str(indexConfig))
        
                    # Initialize DocumentStore
                    document_store = FAISSDocumentStore(sql_url=sqlite_path)
        
                    libName = "vectorDB/" + str(vectorDBFileName) + '.faiss'
        
                    document_store.write_documents(documents)
        
                    # Initialize Retriever
                    retriever = DensePassageRetriever(document_store=document_store,
                                                      query_embedding_model=queryModel,
                                                      passage_embedding_model=passageModel,
                                                      use_gpu=False)
        
                    document_store.update_embeddings(retriever=retriever)
        
                    document_store.save(index_path=libName, config_path="vectorDB/" + str(vectorDBFileName) + ".json")
        
                    print('*'*120)
                    print('Testing with RAG & OpenAI...')
                    print('*'*120)
        
                    answer = self.ragAnswerWithHaystackAndGPT3(strVal, retriever)
        
                    print('*'*120)
                    print('Testing Answer:: ')
                    print(answer)
                    print('*'*120)
        
                    return 0
        
                except Exception as e:
                    x = str(e)
                    print('Error: ', x)
        
                    return 1
        

        In the above script, the following essential steps take place –

        1. First, the application calls the clsCreateList class to collect all the documents inside a list.
        2. Then it stores the data inside the vector DB & creates & saves the FAISS index, which will later be reused (if you remember, we used this as the model in our previous post).
        3. Finally, it tests with some sample use cases by providing the proper context to OpenAI & confirming the response (see the smoke test below).
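A hypothetical smoke test of the class above (the question string is made up; note that the heavy lifting happens at import time, when the documents are built & indexed):

cfdb = clsFeedVectorDB()
ret = cfdb.genData('Show me the highlights of the European Paintings department')
print('Return Code:', ret)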

        Here is a short clip of how the RAG models contextualize with the source data.

        RAG-Model Contextualization

        So, finally, we’ve done it.

        I know this post is relatively longer than my earlier ones. But I think you will get all the details once you go through it.

        You will get the complete codebase in the following GitHub link.

        I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

        Till then, Happy Avenging! 🙂

        RAG implementation of LLMs by using Python, Haystack & React (Part – 1)

        Today, I will share a new post in a multi-part series about creating an end-to-end LLM application that feeds source data through a RAG implementation. I’ll also use the OpenAI Python SDK and Haystack embeddings in this case.

        In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.

        Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

        Demo

        Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.

        As you can see, to enable this large & complex solution, we must first establish the capabilities to build applications powered by LLMs, Transformer models, vector search, and more. You can use state-of-the-art NLP models to perform question-answering, answer generation, semantic document search, or build tools capable of complex decision-making and query resolution. Hence, steps no. 1 & 2 showcased the data embedding & creating that informed repository. We’ll be discussing that in our second part.

        Once you have the informed repository, the system can interact with the end-users. As part of the query (shown in step 3), the prompt & the question are shared with the process engine, which then works to reduce the volume & fetch the relevant context from our informed repository, returning the tuned context as part of the response (shown in steps 4, 5 & 6).

        Then, this tuned context is shared with OpenAI for a better response, a summary & concluding remarks that are user-friendly & easier for end-users to understand (shown in steps 8 & 9).

        The following are the important packages that are essential to this project –

        pip install farm-haystack==1.19.0
        pip install Flask==2.2.5
        pip install Flask-Cors==4.0.0
        pip install Flask-JWT-Extended==4.5.2
        pip install Flask-Session==0.5.0
        pip install openai==0.27.8
        pip install pandas==2.0.3
        pip install tensorflow==2.11.1

        We’ve built the front-end using React & the back-end APIs with Python Flask, along with OpenAI, to create this experience.

        Today, we’ll be going in reverse mode. We first discuss the main script & then explain all the other class scripts.

        • flaskServer.py (This is the main calling Python script to invoke the RAG-Server.)
        #########################################################
        #### Written By: SATYAKI DE                          ####
        #### Written On: 27-Jun-2023                         ####
        #### Modified On 28-Jun-2023                         ####
        ####                                                 ####
        #### Objective: This is the main calling             ####
        #### python script that will create & host the       ####
        #### Flask server exposing the RAG-enabled           ####
        #### chat & authentication APIs.                     ####
        ####                                                 ####
        #########################################################
        
        from flask import Flask, jsonify, request, session
        from flask_cors import CORS
        from werkzeug.security import check_password_hash, generate_password_hash
        from flask_jwt_extended import JWTManager, jwt_required, create_access_token
        import pandas as pd
        from clsConfigClient import clsConfigClient as cf
        import clsL as log
        import clsContentScrapper as csc
        import clsRAGOpenAI as crao
        import csv
        from datetime import timedelta
        import os
        import re
        import json
        
        ########################################################
        ################    Global Area   ######################
        ########################################################
        #Initiating Logging Instances
        clog = log.clsL()
        
        admin_key = cf.conf['ADMIN_KEY']
        secret_key = cf.conf['SECRET_KEY']
        session_path = cf.conf['SESSION_PATH']
        sessionFile = cf.conf['SESSION_CACHE_FILE']
        
        app = Flask(__name__)
        CORS(app)  # This will enable CORS for all routes
        app.config['JWT_SECRET_KEY'] = admin_key  # Change this!
        app.secret_key = secret_key
        
        jwt = JWTManager(app)
        
        users = cf.conf['USER_NM']
        passwd = cf.conf['USER_PWD']
        
        cCScrapper = csc.clsContentScrapper()
        cr = crao.clsRAGOpenAI()
        
        # Disabling Warning
        def warn(*args, **kwargs):
            pass
        
        import warnings
        warnings.warn = warn
        
        # Define the aggregation functions
        def join_unique(series):
            unique_vals = series.drop_duplicates().astype(str)
            return ', '.join(filter(lambda x: x != 'nan', unique_vals))
        
        # Building the preaggregate cache
        def groupImageWiki():
            try:
                base_path = cf.conf['OUTPUT_PATH']
                inputFile = cf.conf['CLEANED_FILE']
                outputFile = cf.conf['CLEANED_FILE_SHORT']
                subdir = cf.conf['SUBDIR_OUT']
                Ind = cf.conf['DEBUG_IND']
        
                inputCleanedFileLookUp = base_path + inputFile
        
                #Opening the file in dataframe
                df = pd.read_csv(inputCleanedFileLookUp)
                hash_values = df['Total_Hash'].unique()
        
                dFin = df[['primaryImage','Wiki_URL','Total_Hash']].copy()  # copy to avoid chained-assignment warnings
        
                # Ensure columns are strings and not NaN
                # Convert columns to string and replace 'nan' with an empty string
                dFin['primaryImage'] = dFin['primaryImage'].astype(str).replace('nan', '')
                dFin['Wiki_URL'] = dFin['Wiki_URL'].astype(str).replace('nan', '')
        
                dFin = dFin.drop_duplicates()  # assign back; drop_duplicates is not in-place
        
                # Group by 'Total_Hash' and aggregate
                dfAgg = dFin.groupby('Total_Hash').agg({'primaryImage': join_unique,'Wiki_URL': join_unique}).reset_index()
        
                return dfAgg
        
            except Exception as e:
                x = str(e)
                print('Error: ', x)
        
                df = pd.DataFrame()
        
                return df
        
        resDf = groupImageWiki()
        
        ########################################################
        ################  End  Global Area  ####################
        ########################################################
        
        def extractRemoveUrls(hash_value):
            image_urls = ''
            wiki_urls = ''
            # Parse the inner message JSON string
            try:
        
                resDf['Total_Hash'] = resDf['Total_Hash'].astype(int)
                filtered_df = resDf[resDf['Total_Hash'] == int(hash_value)]
        
                if not filtered_df.empty:
                    image_urls = filtered_df['primaryImage'].values[0]
                    wiki_urls = filtered_df['Wiki_URL'].values[0]
        
                return image_urls, wiki_urls
        
            except Exception as e:
                x = str(e)
                print('extractRemoveUrls Error: ', x)
                return image_urls, wiki_urls
        
        def isIncomplete(line):
            """Check if a line appears to be incomplete."""
        
            # Check if the line ends with certain patterns indicating it might be incomplete.
            incomplete_patterns = [': [Link](', ': Approximately ', ': ']
            return any(line.endswith(pattern) for pattern in incomplete_patterns)
        
        def filterData(data):
            """Return only the complete lines from the data."""
        
            lines = data.split('\n')
            complete_lines = [line for line in lines if not isIncomplete(line)]
        
            return '\n'.join(complete_lines)
        
        def updateCounter(sessionFile):
            try:
                counter = 0
        
                # Check if the CSV file exists
                if os.path.exists(sessionFile):
                    with open(sessionFile, 'r') as f:
                        reader = csv.reader(f)
                        for row in reader:
                            # Assuming the counter is the first value in the CSV
                            counter = int(row[0])
        
                # Increment counter
                counter += 1
        
                # Write counter back to CSV
                with open(sessionFile, 'w', newline='') as f:
                    writer = csv.writer(f)
                    writer.writerow([counter])
        
                return counter
            except Exception as e:
                x = str(e)
                print('Error: ', x)
        
                return 1
        
        def getPreviousResult():
            try:
                fullFileName = session_path + sessionFile
                newCounterValue = updateCounter(fullFileName)
        
                return newCounterValue
            except Exception as e:
                x = str(e)
                print('Error: ', x)
        
                return 1
        
        @app.route('/login', methods=['POST'])
        def login():
            username = request.json.get('username', None)
            password = request.json.get('password', None)
        
            print('User Name: ', str(username))
            print('Password: ', str(password))
        
            #if username not in users or not check_password_hash(users.get(username), password):
            if ((username not in users) or (password not in passwd)):
                return jsonify({'login': False}), 401
        
            access_token = create_access_token(identity=username)
            return jsonify(access_token=access_token)
        
        @app.route('/chat', methods=['POST'])
        def get_chat():
            try:
                #session["key"] = "1D98KI"
                #session_id = session.sid
                #print('Session Id: ', str(session_id))
        
                cnt = getPreviousResult()
                print('Running Session Count: ', str(cnt))
        
                username = request.json.get('username', None)
                message = request.json.get('message', None)
        
                print('User: ', str(username))
                print('Content: ', str(message))
        
                if cnt == 1:
                    retList = cCScrapper.extractCatalog()
                else:
                    hashValue, cleanedData = cr.getData(str(message))
                    print('Main Hash Value:', str(hashValue))
        
                    imageUrls, wikiUrls = extractRemoveUrls(hashValue)
                    print('Image URLs: ', str(imageUrls))
                    print('Wiki URLs: ', str(wikiUrls))
                    print('Clean Text:')
                    print(str(cleanedData))
                    retList = '{"records":[{"Id":"' + str(cleanedData) + '", "Image":"' + str(imageUrls) + '", "Wiki": "' + str(wikiUrls) + '"}]}'
        
                response = {
                    'message': retList
                }
        
                print('JSON: ', str(response))
                return jsonify(response)
        
            except Exception as e:
                x = str(e)
        
                response = {
                    'message': 'Error: ' + x
                }
                return jsonify(response)
        
        @app.route('/api/data', methods=['GET'])
        @jwt_required()
        def get_data():
            response = {
                'message': 'Hello from Flask!'
            }
            return jsonify(response)
        
        if __name__ == '__main__':
            app.run(debug=True)
        

        Let us understand some of the important sections of the above script –

        Function – login():

        The login function retrieves a ‘username’ and ‘password’ from a JSON request and prints them. It checks if the provided credentials are missing from users or password lists, returning a failure JSON response if so. It creates and returns an access token in a JSON response if valid.

        Function – get_chat():

        The get_chat function retrieves the running session count and user input from a JSON request. Based on the session count, it extracts catalog data or processes the user’s message from the RAG framework that finally receives the refined response from the OpenAI, extracting hash values, image URLs, and wiki URLs. If an error arises, the function captures and returns the error as a JSON message.
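For reference, a hypothetical client session against these two routes would look like this (the credentials are made up; they come from clsConfigClient in practice):

import requests

login = requests.post('http://localhost:5000/login',
                      json={'username': 'satyaki', 'password': 'secret'})
token = login.json().get('access_token')

chat = requests.post('http://localhost:5000/chat',
                     json={'username': 'satyaki', 'message': 'Show me the catalog'})
print(chat.json()['message'])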

        Function – updateCounter():

        The updateCounter function checks if a given CSV file exists and retrieves its counter value. It then increments the counter and writes it back to the CSV. If any errors occur, an error message is printed, and the function returns a value of 1.

        Function – extractRemoveUrls():

        The extractRemoveUrls function attempts to filter a data frame, resDf, based on a provided hash value to extract image and wiki URLs. If the data frame contains matching entries, it retrieves the corresponding URLs. Any errors encountered are printed, but the function always returns the image and wiki URLs, even if they are empty.

        • clsContentScrapper.py (This is the main class that brings the default options for the users if they agree with the initial prompt by the bot.)
        #####################################################
        #### Written By: SATYAKI DE                      ####
        #### Written On: 27-May-2023                     ####
        #### Modified On 28-May-2023                     ####
        ####                                             ####
        #### Objective: This is the main calling         ####
        #### python class that will invoke the           ####
        #### museum API to fetch the initial             ####
        #### catalog options offered to the              ####
        #### users by the bot.                           ####
        ####                                             ####
        #####################################################
        
        from langchain.document_loaders import YoutubeLoader
        from langchain.text_splitter import RecursiveCharacterTextSplitter
        from langchain.embeddings.openai import OpenAIEmbeddings
        from langchain.vectorstores import FAISS
        from langchain.chat_models import ChatOpenAI
        from langchain.chains import LLMChain
        
        from langchain.prompts.chat import (
            ChatPromptTemplate,
            SystemMessagePromptTemplate,
            HumanMessagePromptTemplate,
        )
        
        from googleapiclient.discovery import build
        
        import clsTemplate as ct
        from clsConfigClient import clsConfigClient as cf
        
        import os
        
        from flask import jsonify
        import requests
        
        ###############################################
        ###           Global Section                ###
        ###############################################
        open_ai_Key = cf.conf['OPEN_AI_KEY']
        os.environ["OPENAI_API_KEY"] = open_ai_Key
        embeddings = OpenAIEmbeddings(openai_api_key=open_ai_Key)
        
        YouTube_Key = cf.conf['YOUTUBE_KEY']
        youtube = build('youtube', 'v3', developerKey=YouTube_Key)
        
        # Disabling Warning
        def warn(*args, **kwargs):
            pass
        
        import warnings
        warnings.warn = warn
        
        ###############################################
        ###    End of Global Section                ###
        ###############################################
        
        class clsContentScrapper:
            def __init__(self):
                self.model_name = cf.conf['MODEL_NAME']
                self.temp_val = cf.conf['TEMP_VAL']
                self.max_cnt = int(cf.conf['MAX_CNT'])
                self.url = cf.conf['BASE_URL']
                self.header_token = cf.conf['HEADER_TOKEN']
        
            def extractCatalog(self):
                try:
                    base_url = self.url
                    header_token = self.header_token
        
                    url = base_url + '/departments'
        
                    print('Full URL: ', str(url))
        
                    payload={}
                    headers = {'Cookie': header_token}
        
                    response = requests.request("GET", url, headers=headers, data=payload)
        
                    x = response.text
        
                    return x
                except Exception as e:
                    discussedTopic = []
                    x = str(e)
                    print('Error: ', x)
        
                    return x
        

        Let us understand the core part required from this class.

        Function – extractCatalog():

        The extractCatalog function uses specific headers to make a GET request to a constructed URL. The URL is derived by appending ‘/departments’ to a base_url, and a header token is used in the request headers. If successful, it returns the text of the response; if there’s an exception, it prints the error and returns the error message.
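For reference, the /departments response is shaped roughly like this, which is why the React code later can read departmentId and displayName directly (entries truncated):

{
  "departments": [
    {"departmentId": 1, "displayName": "American Decorative Arts"},
    {"departmentId": 3, "displayName": "Ancient Near Eastern Art"}
  ]
}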

        • clsRAGOpenAI.py (This is the main class that brings the RAG-enabled context that is fed to OpenAI for fine-tuned response with less cost.)
        #########################################################
        #### Written By: SATYAKI DE                          ####
        #### Written On: 27-Jun-2023                         ####
        #### Modified On 28-Jun-2023                         ####
        ####                                                 ####
        #### Objective: This is the main calling             ####
        #### python class that will fetch the RAG            ####
        #### context from the vector DB & invoke             ####
        #### OpenAI for the fine-tuned response.             ####
        ####                                                 ####
        #########################################################
        
        from haystack.document_stores.faiss import FAISSDocumentStore
        from haystack.nodes import DensePassageRetriever
        import openai
        
        from clsConfigClient import clsConfigClient as cf
        import clsL as log
        
        # Disabling Warning
        def warn(*args, **kwargs):
            pass
        
        import warnings
        warnings.warn = warn
        
        import os
        import re
        ###############################################
        ###           Global Section                ###
        ###############################################
        Ind = cf.conf['DEBUG_IND']
        queryModel = cf.conf['QUERY_MODEL']
        passageModel = cf.conf['PASSAGE_MODEL']
        
        #Initiating Logging Instances
        clog = log.clsL()
        
        os.environ["TOKENIZERS_PARALLELISM"] = "false"
        
        vectorDBFileName = cf.conf['VECTORDB_FILE_NM']
        
        indexFile = "vectorDB/" + str(vectorDBFileName) + '.faiss'
        indexConfig = "vectorDB/" + str(vectorDBFileName) + ".json"
        
        print('File: ', str(indexFile))
        print('Config: ', str(indexConfig))
        
        # Also, provide `config_path` parameter if you set it when calling the `save()` method:
        new_document_store = FAISSDocumentStore.load(index_path=indexFile, config_path=indexConfig)
        
        # Initialize Retriever
        retriever = DensePassageRetriever(document_store=new_document_store,
                                          query_embedding_model=queryModel,
                                          passage_embedding_model=passageModel,
                                          use_gpu=False)
        
        
        ###############################################
        ###    End of Global Section                ###
        ###############################################
        
        class clsRAGOpenAI:
            def __init__(self):
                self.basePath = cf.conf['DATA_PATH']
                self.fileName = cf.conf['FILE_NAME']
                self.Ind = cf.conf['DEBUG_IND']
                self.subdir = str(cf.conf['OUT_DIR'])
                self.base_url = cf.conf['BASE_URL']
                self.outputPath = cf.conf['OUTPUT_PATH']
                self.vectorDBPath = cf.conf['VECTORDB_PATH']
                self.openAIKey = cf.conf['OPEN_AI_KEY']
                self.temp = cf.conf['TEMP_VAL']
                self.modelName = cf.conf['MODEL_NAME']
                self.maxToken = cf.conf['MAX_TOKEN']
        
            def extractHash(self, text):
                try:
                    # Regular expression pattern to match 'Ref: {' followed by a number and then '}'
                    pattern = r"Ref: \{'(\d+)'\}"
                    match = re.search(pattern, text)
        
                    if match:
                        return match.group(1)
                    else:
                        return None
                except Exception as e:
                    x = str(e)
                    print('Error: ', x)
        
                    return None
        
            def removeSentencesWithNaN(self, text):
                try:
                    # Split text into sentences using regular expression
                    sentences = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s', text)
                    # Filter out sentences containing 'nan'
                    filteredSentences = [sentence for sentence in sentences if 'nan' not in sentence]
                    # Rejoin the sentences
                    return ' '.join(filteredSentences)
                except Exception as e:
                    x = str(e)
                    print('Error: ', x)
        
                    return ''
        
            def retrieveDocumentsReader(self, question, top_k=9):
                return retriever.retrieve(question, top_k=top_k)
        
            def generateAnswerWithGPT3(self, retrieved_docs, question):
                try:
                    openai.api_key = self.openAIKey
                    temp = self.temp
                    modelName = self.modelName
                    maxToken = self.maxToken
        
                    documentsText = " ".join([doc.content for doc in retrieved_docs])
        
                    filteredDocs = self.removeSentencesWithNaN(documentsText)
                    hashValue = self.extractHash(filteredDocs)
        
                    print('RAG Docs:: ')
                    print(filteredDocs)
                    #prompt = f"Given the following documents: {documentsText}, answer the question accurately based on the above data with the supplied http urls: {question}"
        
                    # Set up a chat-style prompt with your data
                    messages = [
                        {"role": "system", "content": "You are a helpful assistant, answer the question accurately based on the above data with the supplied http urls. Only relevant content needs to publish. Please do not provide the facts or the texts that results crossing the max_token limits."},
                        {"role": "user", "content": filteredDocs}
                    ]
        
                    # Chat style invoking the latest model
                    response = openai.ChatCompletion.create(
                        model=modelName,
                        messages=messages,
                        temperature = temp,
                        max_tokens=maxToken
                    )
                    return hashValue, response.choices[0].message['content'].strip().replace('\n','\\n')
                except Exception as e:
                    x = str(e)
                    print('failed to get from OpenAI: ', x)
                    # Return a tuple so the caller's unpacking does not break
                    return None, 'Not Available!'
        
            def ragAnswerWithHaystackAndGPT3(self, question):
                retrievedDocs = self.retrieveDocumentsReader(question)
                return self.generateAnswerWithGPT3(retrievedDocs, question)
        
            def getData(self, strVal):
                try:
                    print('*'*120)
                    print('Index Your Data for Retrieval:')
                    print('*'*120)
        
                    print('Response from New Docs: ')
                    print()
        
                    hashValue, answer = self.ragAnswerWithHaystackAndGPT3(strVal)
        
                    print('GPT3 Answer::')
                    print(answer)
                    print('Hash Value:')
                    print(str(hashValue))
        
                    print('*'*240)
                    print('End Of Use RAG to Generate Answers:')
                    print('*'*240)
        
                    return hashValue, answer
                except Exception as e:
                    x = str(e)
                    print('Error: ', x)
                    answer = x
                    hashValue = 1
        
                    return hashValue, answer
        

        Let us understand some of the important block –

        Function – ragAnswerWithHaystackAndGPT3():

        The ragAnswerWithHaystackAndGPT3 function retrieves relevant documents for a given question using the retrieveDocumentsReader method. It then generates an answer for the query using GPT-3 with the retrieved documents via the generateAnswerWithGPT3 method. The final response is returned.

        Function – generateAnswerWithGPT3():

        The generateAnswerWithGPT3 function, given a list of retrieved documents and a question, communicates with OpenAI’s GPT-3 to generate an answer. It first processes the documents, filtering and extracting a hash value. Using a chat-style format, it prompts GPT-3 with the processed documents and captures its response. If an error occurs, an error message is printed, and “Not Available!” is returned.

        Function – retrieveDocumentsReader():

        The retrieveDocumentsReader function takes in a question and an optional parameter, top_k (defaulted to 9). It calls the retriever.retrieve method with the given parameters. The retrieval returns at most nine matching documents from the RAG engine, which are then fed to OpenAI.
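Putting the three methods together, a hypothetical end-to-end call through the class (mirroring what flaskServer.py does) looks like this:

cr = clsRAGOpenAI()
hashValue, answer = cr.getData('Tell me about the European Paintings collection')
print('Hash:', hashValue)
print('Answer:', answer)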

        • App.js (This is the main React script that creates the interface, handles the authentication & parses the data.)
        // App.js
        import React, { useState } from 'react';
        import axios from 'axios';
        import './App.css';
        
        const App = () => {
          const [isLoggedIn, setIsLoggedIn] = useState(false);
          const [username, setUsername] = useState('');
          const [password, setPassword] = useState('');
          const [message, setMessage] = useState('');
          const [chatLog, setChatLog] = useState([{ sender: 'MuBot', message: 'Welcome to MuBot! Please explore the world of History from our brilliant collections! Do you want to proceed to see the catalog?'}]);
        
          const handleLogin = async (e) => {
            e.preventDefault();
            try {
              const response = await axios.post('http://localhost:5000/login', { username, password });
              if (response.status === 200) {
                setIsLoggedIn(true);
              }
            } catch (error) {
              console.error('Login error:', error);
            }
          };
        
          const sendMessage = async (username) => {
            if (message.trim() === '') return;
        
            // Create a new chat entry
            const newChatEntry = {
              sender: 'user',
              message: message.trim(),
            };
        
            // Clear the input field
            setMessage('');
        
            try {
              // Make API request to Python-based API
              const response = await axios.post('http://localhost:5000/chat', { message: newChatEntry.message }); // Replace with your API endpoint URL
              const responseData = response.data;
        
              // Print the response to the console for debugging
              console.log('API Response:', responseData);
        
              // Parse the nested JSON from the 'message' attribute
              const jsonData = JSON.parse(responseData.message);
        
              // Check if the data contains 'departments'
              if (jsonData.departments) {
        
                // Extract the 'departments' attribute from the parsed data
                const departments = jsonData.departments;
        
                // Extract the department names and create a single string with line breaks
                const botResponseText = departments.reduce((acc, department) => {return acc + department.departmentId + ' ' + department.displayName + '\n';}, '');
        
                // Update the chat log with the bot's response
                setChatLog((prevChatLog) => [...prevChatLog, { sender: 'user', message: message }, { sender: 'bot', message: botResponseText },]);
              }
              else if (jsonData.records)
              {
                // Data structure 2: Artwork information
                const records = jsonData.records;
        
                // Prepare chat entries
                const chatEntries = [];
        
                // Iterate through records and extract text, image, and wiki information
                records.forEach((record) => {
                  const textInfo = Object.entries(record).map(([key, value]) => {
                    if (key !== 'Image' && key !== 'Wiki') {
                      return `${key}: ${value}`;
                    }
                    return null;
                  }).filter((info) => info !== null).join('\n');
        
                  const imageLink = record.Image;
                  //const wikiLinks = JSON.parse(record.Wiki.replace(/'/g, '"'));
                  //const wikiLinks = record.Wiki;
                  const wikiLinks = record.Wiki.split(',').map(link => link.trim());
        
                  console.log('Wiki:', wikiLinks);
        
                  // Check if there is a valid image link
                  const hasValidImage = imageLink && imageLink !== '[]';
        
                  const imageElement = hasValidImage ? (
                    <img src={imageLink} alt="Artwork" style={{ maxWidth: '100%' }} />
                  ) : null;
        
                  // Create JSX elements for rendering the wiki links (if available)
                  const wikiElements = wikiLinks.map((link, index) => (
                    <div key={index}>
                      <a href={link} target="_blank" rel="noopener noreferrer">
                        Wiki Link {index + 1}
                      </a>
                    </div>
                  ));
        
                  if (textInfo) {
                    chatEntries.push({ sender: 'bot', message: textInfo });
                  }
        
                  if (imageElement) {
                    chatEntries.push({ sender: 'bot', message: imageElement });
                  }
        
                  if (wikiElements.length > 0) {
                    chatEntries.push({ sender: 'bot', message: wikiElements });
                  }
                });
        
                // Update the chat log with the bot's response
                setChatLog((prevChatLog) => [...prevChatLog, { sender: 'user', message }, ...chatEntries, ]);
              }
        
            } catch (error) {
              console.error('Error sending message:', error);
            }
          };
        
          if (!isLoggedIn) {
            return (
              <div className="login-container">
                <h2>Welcome to the MuBot</h2>
                <form onSubmit={handleLogin} className="login-form">
                  <input
                    type="text"
                    placeholder="Enter your name"
                    value={username}
                    onChange={(e) => setUsername(e.target.value)}
                    required
                  />
                  <input
                    type="password"
                    placeholder="Enter your password"
                    value={password}
                    onChange={(e) => setPassword(e.target.value)}
                    required
                  />
                  <button type="submit">Login</button>
                </form>
              </div>
            );
          }
        
          return (
            <div className="chat-container">
              <div className="chat-header">
                <h2>Hello, {username}</h2>
                <h3>Chat with MuBot</h3>
              </div>
              <div className="chat-log">
                {chatLog.map((chatEntry, index) => (
                  <div
                    key={index}
                    className={`chat-entry ${chatEntry.sender === 'user' ? 'user' : 'bot'}`}
                  >
                    <span className="user-name">{chatEntry.sender === 'user' ? username : 'MuBot'}</span>
                    <p className="chat-message">{chatEntry.message}</p>
                  </div>
                ))}
              </div>
              <div className="chat-input">
                <input
                  type="text"
                  placeholder="Type your message..."
                  value={message}
                  onChange={(e) => setMessage(e.target.value)}
                  onKeyPress={(e) => {
                    if (e.key === 'Enter') {
                      sendMessage();
                    }
                  }}
                />
                <button onClick={sendMessage}>Send</button>
              </div>
            </div>
          );
        };
        
        export default App;
        

        Please find some of the important logic –

        Function – handleLogin():

        The handleLogin asynchronous function responds to an event by preventing its default action. It attempts to post a login request with a username and password to a local server endpoint. If the response is successful with a status of 200, it updates a state variable to indicate a successful login; otherwise, it logs any encountered errors.

        Function – sendMessage():

        The sendMessage asynchronous function is designed to handle the user’s chat interaction:

        1. If the message is empty (after trimming spaces), the function exits without further action.
        2. A chat entry object is created with the sender set as ‘user’ and the trimmed message.
        3. The input field’s message is cleared, and an API request is made to a local server endpoint with the chat message.
        4. If the API responds with a ‘departments’ attribute in its JSON, a bot response is crafted by iterating over department details.
        5. If the API responds with ‘records’ indicating artwork information, the bot crafts responses for each record, extracting text, images, and wiki links, and generating JSX elements for rendering them.
        6. After processing the API response, the chat log state is updated with the user’s original message and the bot’s responses.
        7. Errors, if encountered, are logged to the console.

        This function enables interactive chat with bot responses that vary based on the nature of the data received from the API.
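To make the two branches concrete, here are illustrative payload shapes (hypothetical values, shown as Python dictionaries for readability) that the /chat endpoint would return inside the message attribute; only the attribute names mirror what the parsing code above actually reads.

# Hypothetical payloads; the keys match what App.js reads.
departments_payload = {
    "departments": [
        {"departmentId": 1, "displayName": "European Paintings"},
        {"departmentId": 2, "displayName": "Egyptian Art"},
    ]
}

records_payload = {
    "records": [
        {
            "Title": "The Starry Night",
            "Artist": "Vincent van Gogh",
            "Image": "https://example.com/starry-night.jpg",
            "Wiki": "https://en.wikipedia.org/wiki/The_Starry_Night",
        }
    ]
}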


Let us explore the directory structure, starting from the parent folder down to some of the important child folders –


        So, finally, we’ve done it.

I know that this post is relatively longer than my earlier posts. But I think you can get all the details once you go through it.

        You will get the complete codebase in the following GitHub link.

        I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

        Till then, Happy Avenging! 🙂

        Enabling OpenAI-based NLP engine with SIRI (MacBook/iPad/iPhone) through a proxy-driven restricted API using Python.

Today, I’m very excited to demonstrate an effortless & new way to integrate SIRI with a controlled OpenAI instance exposed through a proxy API. So, why is this important? It gives you options to control your ChatGPT environment as per your principles, and then you can put a load-balancer (if you want) behind it & expose everything through a proxy.

        In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.


Before I explain the process, why not view the demo first & then discuss it?

        Demo

Isn’t it fascinating? This approach opens up a whole new ballgame, where you can equip SIRI with an entirely new world of knowledge as per your requirements & expose it in a controlled way.

        FLOW OF EVENTS:

        Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.

As you can see, Apple Shortcuts triggers the request through its voice app, which translates the question to text & then invokes the ngrok proxy API. That, in turn, triggers the controlled custom API built using Flask & Python, which finally calls the OpenAI API.


        CODE:

Why don’t we go through the code for this particular use case?

• clsConfigClient.py (This is the main configuration script that contains all the input parameters.)


        ################################################
        #### Written By: SATYAKI DE ####
        #### Written On: 15-May-2020 ####
        #### Modified On: 27-Jun-2023 ####
        #### ####
        #### Objective: This script is a config ####
        #### file, contains all the keys for ####
        #### personal OpenAI-based MAC-shortcuts ####
        #### enable bot. ####
        #### ####
        ################################################
import os
import platform as pl

class clsConfigClient(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'DATA_PATH': Curr_Path + sep + 'data' + sep,
        'MODEL_PATH': Curr_Path + sep + 'model' + sep,
        'TEMP_PATH': Curr_Path + sep + 'temp' + sep,
        'MODEL_DIR': 'model',
        'APP_DESC_1': 'LangChain Demo!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'FILE_NAME': 'Output.csv',
        'MODEL_NAME': 'gpt-3.5-turbo',
        'OPEN_AI_KEY': "sk-Jdhfdyruru9383474HHFJFJFJO6jrlxPKbv6Bgvv",
        'TITLE': "LangChain Demo!",
        'TEMP_VAL': 0.2,
        'PATH': Curr_Path,
        'MAX_TOKEN': 60,
        'OUT_DIR': 'data'
    }

        Some of the important entries from the above snippet are as follows –

                'MODEL_NAME': 'gpt-3.5-turbo',
                'OPEN_AI_KEY': "sk-Jdhfdyruru9383474HHFJFJFJO6jrlxPKbv6Bgvv",
                'TEMP_VAL': 0.2,

TEMP_VAL helps you control how deterministic the response is. It varies between 0 and 1; lower values produce more focused, repeatable answers, while higher values allow more creative variation.
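If you want to see the effect yourself, a quick throwaway experiment (assuming the config above is importable and the key is valid; the sample question is illustrative) could compare a few temperature settings side by side:

import openai
from clsConfigClient import clsConfigClient as cf

openai.api_key = cf.conf['OPEN_AI_KEY']

# Lower temperature -> more deterministic answers; higher -> more varied.
for temp in (0.0, cf.conf['TEMP_VAL'], 0.9):
    resp = openai.ChatCompletion.create(
        model=cf.conf['MODEL_NAME'],
        temperature=temp,
        max_tokens=cf.conf['MAX_TOKEN'],
        messages=[{"role": "user", "content": "Name one famous museum."}],
    )
    print(temp, '->', resp['choices'][0]['message']['content'])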

• clsJarvis.py (This is the main Python class that invokes the Flask framework to expose the OpenAI API with more control.)


        #####################################################
        #### Written By: SATYAKI DE ####
        #### Written On: 27-Jun-2023 ####
        #### Modified On 28-Jun-2023 ####
        #### ####
        #### Objective: This is the main calling ####
        #### python class that will invoke the ####
        #### Flask framework to expose the OpenAI ####
        #### API with more control & encapsulate the ####
        #### server IPs with proxy layers. ####
        #### ####
        #####################################################
import openai
from flask import request, jsonify
from clsConfigClient import clsConfigClient as cf
import os
import clsTemplate as ct

###############################################
###           Global Section                ###
###############################################
open_ai_Key = cf.conf['OPEN_AI_KEY']
openai.api_key = open_ai_Key

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn
###############################################
###        End of Global Section            ###
###############################################

class clsJarvis:
    def __init__(self):
        self.model_name = cf.conf['MODEL_NAME']
        self.max_token = cf.conf['MAX_TOKEN']
        self.temp_val = cf.conf['TEMP_VAL']

    def extractContentInText(self, query):
        try:
            model_name = self.model_name
            max_token = self.max_token
            temp_val = self.temp_val

            template = ct.templateVal_1

            response = openai.ChatCompletion.create(model=model_name, temperature=temp_val, messages=[{"role": "system", "content": template}, {"role": "user", "content": query}])
            inputJson = {"text": response['choices'][0]['message']['content']}

            return jsonify(inputJson)
        except Exception as e:
            discussedTopic = []
            x = str(e)
            print('Error: ', x)
            template = ct.templateVal_2

            inputJson = {"text": template}

            return jsonify(inputJson)


        The key snippets from the above script are as follows –

        def extractContentInText(self, query):
            try:
                model_name = self.model_name
                max_token = self.max_token
                temp_val = self.temp_val
        
                template = ct.templateVal_1
        
                response = openai.ChatCompletion.create(model=model_name, temperature=temp_val, messages=[{"role": "system", "content": template},{"role": "user", "content": query}])
                inputJson = {"text": response['choices'][0]['message']['content']}
        
                return jsonify(inputJson)
            except Exception as e:
                discussedTopic = []
                x = str(e)
                print('Error: ', x)
                template = ct.templateVal_2
        
                inputJson = {"text": template}
        
                return jsonify(inputJson)

        The provided Python code snippet defines a method extractContentInText, which interacts with OpenAI’s API to generate a response from OpenAI’s chat model to a user’s query. Here’s a summary of what it does:

        1. It fetches some predefined model configurations (model_name, max_token, temp_val). These are class attributes defined elsewhere.
        2. It sets a system message template (initial instruction for the AI model) using ct.templateVal_1. The ct object isn’t defined within this snippet but is likely another predefined object or module in the more extensive program.
3. It then calls openai.ChatCompletion.create() to send messages to the AI model and generate a response. The messages include an initial system instruction and the user’s query.
4. The model’s response is extracted and formatted into a JSON object inputJson, where the ‘text’ field holds the AI’s response.
5. This JSON object is then returned as a JSON response via jsonify().

        If an error occurs at any stage of this process (caught in the except block), it prints the error, sets a fallback message template using ct.templateVal_2, formats this into a JSON object, and returns it as a JSON response.

Note: The max_token variable is fetched but not used within the function; it might be a remnant of previous code or meant for further development. The code also assumes a predefined ct object and the jsonify() helper from Flask for formatting Python dictionaries into JSON.
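Because jsonify() needs an active Flask application context, a standalone smoke test of the class has to push one explicitly. A minimal sketch, assuming the modules above are on the path and a valid OPEN_AI_KEY is configured (the test prompt is illustrative):

from flask import Flask
import clsJarvis as jv

app = Flask(__name__)
cJarvis = jv.clsJarvis()

# jsonify() only works inside an application/request context,
# so wrap the call for a standalone smoke test.
with app.test_request_context():
    res = cJarvis.extractContentInText("What is the capital of France?")
    print(res.get_json())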

        • testJarvis.py (This is the main calling Python script.)


        #########################################################
        #### Written By: SATYAKI DE ####
        #### Written On: 27-Jun-2023 ####
        #### Modified On 28-Jun-2023 ####
        #### ####
        #### Objective: This is the main calling ####
        #### python script that will invoke the ####
        #### shortcut application created inside MAC ####
#### environment including MacBook, iPad or iPhone. ####
        #### ####
        #########################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import clsJarvis as jv
import datetime
from flask import Flask, request, jsonify

app = Flask(__name__)

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

######################################
### Get your global values ####
######################################
debug_ind = 'Y'

# Initiating Logging Instances
clog = cl.clsL()
cJarvis = jv.clsJarvis()

######################################
#### Global Flag ########
######################################

@app.route('/openai', methods=['POST'])
def openai_call():
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        data = request.get_json()
        print('Data::')
        print(data)
        prompt = data.get('prompt', '')

        print('Prompt::')
        print(prompt)

        res = cJarvis.extractContentInText(str(prompt))

        return res

        print('*'*120)
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))
    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    app.run(host='0.0.0.0')


        Please find the key snippets –

        @app.route('/openai', methods=['POST'])
        def openai_call():
            try:
                var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
                print('*'*120)
                print('Start Time: ' + str(var))
                print('*'*120)
        
                data = request.get_json()
                print('Data::')
                print(data)
                prompt = data.get('prompt', '')
        
                print('Prompt::')
                print(prompt)
        
                res = cJarvis.extractContentInText(str(prompt))
        
                return res
        
                print('*'*120)
                var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
                print('End Time: ' + str(var1))
        
            except Exception as e:
                x = str(e)
                print('Error: ', x)

        The provided Python code defines a route in a Flask web server that listens for POST requests at the ‘/openai’ endpoint. Here’s what it does in detail:

        1. It records and prints the current time, marking the start of the request handling.
        2. It retrieves the incoming data from the POST request as JSON with the request.get_json().
3. It then extracts the ‘prompt’ from the JSON data, defaulting to an empty string if no ‘prompt’ is provided in the request.
4. The prompt is passed as an argument to the extractContentInText() method of the cJarvis object. This method uses OpenAI’s API to generate a response from the model for the given prompt (as discussed above). The result of this method call is stored in the variable res.
5. The res variable (the model’s response) is returned as the answer to the client’s POST request.
6. It prints the current time again, marking the end of the request handling. (However, this part of the code will never be executed, as it is placed after a return statement.)
        7. If an error occurs during this process, it catches the exception, converts it to a string, and prints the error message.

The cJarvis object used in the cJarvis.extractContentInText(str(prompt)) call is not defined within this snippet; it is the global clsJarvis instance created earlier in the script. The extractContentInText method is the one discussed above.
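Before wiring in Siri, the endpoint can be smoke-tested locally. A minimal client sketch, assuming testJarvis.py is already running on the default Flask port (the sample prompt is illustrative):

import requests

# The /openai route expects a JSON body with a 'prompt' key.
payload = {"prompt": "Suggest three must-see exhibits in a history museum."}
resp = requests.post("http://localhost:5000/openai", json=payload)

print(resp.status_code)
print(resp.json()["text"])  # clsJarvis wraps the answer in {"text": ...}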

        Apple Shortcuts:

        Now, let us understand the steps in Apple Shortcuts.

        You can now set up a Siri Shortcut to call the URL provided by ngrok:

        1. Open the Shortcuts app on your iPhone.
        2. Tap the ‘+’ to create a new Shortcut.
        3. Add an action, search for “URL,” and select the URL action. Enter your ngrok URL here, with the /openai endpoint.
        4. Add another action, search for “Get Contents of URL.” This step will send a POST request to the URL from the previous activity. Set the method to POST and add a request body with type ‘JSON,’ containing a key ‘prompt’ and a value being the input you want to send to your OpenAI model.
        5. Optionally, you can add another action, “Show Result” or “Speak Text” to see/hear the result returned from your server.
        6. Save your Shortcut and give it a name.

        You should now be able to activate Siri and say the name of your Shortcut to have it send a request to your server, which will then send a prompt to the OpenAI API and return the response.

Let us understand the “Get Contents of URL” step with some easy Postman screenshots –

As you can see, the newly exposed proxy API receives an input named prompt, which is passed from “Dictate Text.”


        So, finally, we’ve done it.

I know that this post is relatively longer than my earlier posts. But I think you can get all the details once you go through it.

        You will get the complete codebase in the following GitHub link.

        I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

        Till then, Happy Avenging! 🙂

Note: All the data & scenarios posted here are representational, available over the internet & for educational purposes only. Some of the images (except my photo) we’ve used are available over the net; we don’t claim ownership of these images. There is always room for improvement, especially in the prediction quality.