AGENTIC AI IN THE ENTERPRISE: STRATEGY, ARCHITECTURE, AND IMPLEMENTATION – PART 2

This is a continuation of my previous post, which can be found here.

Let us recap the key takeaways from our previous post –

Agentic AI refers to autonomous systems that pursue goals with minimal supervision by planning, reasoning about next steps, utilizing tools, and maintaining context across sessions. Core capabilities include goal-directed autonomy, interaction with tools and environments (e.g., APIs, databases, devices), multi-step planning and reasoning under uncertainty, persistence, and deliberate decision-making.

Architecturally, three modules coordinate intelligent behavior: Sensing (perception pipelines that acquire multimodal data, extract salient patterns, and recognize entities/events); Observation/Deliberation (objective setting, strategy formation, and option evaluation relative to resources and constraints); and Action (execution via software interfaces, communications, or physical actuation to deliver outcomes). These functions are enabled by machine learning, deep learning, computer vision, natural language processing, planning/decision-making, uncertainty reasoning, and simulation/modeling.

At enterprise scale, open standards align autonomy with governance: the Model Context Protocol (MCP) grants an agent secure, principled access to enterprise tools and data (vertical integration), while Agent-to-Agent (A2A) enables specialized agents to coordinate, delegate, and exchange information (horizontal collaboration). Together, MCP and A2A help organizations transition from isolated pilots to scalable programs, delivering end-to-end automation, faster integration, enhanced security and auditability, vendor-neutral interoperability, and adaptive problem-solving that responds to real-time context.

Great! Let’s dive into this topic now.

Enterprise AI with MCP refers to the application of the Model Context Protocol (MCP), an open standard, to enable AI systems to securely and consistently access external enterprise data and applications. 

Before MCP, enterprise AI integration was characterized by a “many-to-many” or “N x M” problem: companies had to build custom, fragile, and costly integrations between each AI model and every proprietary data source, which was not scalable. This approach left AI agents with incomplete, outdated, or siloed information, restricting their potential impact.
MCP addresses this by offering a standardized architecture for AI and data systems to communicate with each other.

The MCP framework uses a client-server architecture to enable communication between AI models and external tools and data sources. 

  • MCP Host: The AI-powered application or environment, such as an AI-enhanced IDE or a generative AI chatbot like Anthropic’s Claude or OpenAI’s ChatGPT, where the user interacts.
  • MCP Client: A component within the host application that manages the connection to MCP servers.
  • MCP Server: A lightweight service that wraps around an external system (e.g., a CRM, database, or API) and exposes its capabilities to the AI client in a standardized format, typically using JSON-RPC 2.0. 

An MCP server exposes three core capabilities to AI clients: 

  • Resources: Structured or unstructured data that an AI can access, such as files, documents, or database records.
  • Tools: The functionality to perform specific actions within an external system, like running a database query or sending an email (see the sketch after this list).
  • Prompts: Pre-defined text templates or workflows to help guide the AI’s actions.

Beyond these building blocks, adopting MCP delivers several enterprise benefits:

  • Standardized integration: Developers can build integrations against a single, open standard, which dramatically reduces the complexity and time required to deploy and scale AI initiatives.
  • Enhanced security and governance: MCP incorporates native support for security and compliance measures. It provides permission models, access control, and auditing capabilities to ensure AI systems only access data and tools within specified boundaries.
  • Real-time contextual awareness: By connecting AI agents to live enterprise data sources, MCP ensures they have access to the most current and relevant information, which reduces hallucinations and improves the accuracy of AI outputs.
  • Greater interoperability: MCP is model-agnostic & can be used with a variety of AI models (e.g., Anthropic’s Claude or OpenAI’s models) and across different cloud environments. This approach helps enterprises avoid vendor lock-in.
  • Accelerated development: The “build once, integrate everywhere” approach enables internal teams to focus on innovation instead of writing custom connectors for every system.
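
To make the Tools capability concrete, here is a minimal sketch of an MCP server that wraps a PostgreSQL “orders” table and exposes it as a single tool. It assumes the official MCP Python SDK (the mcp package with its FastMCP helper) and the psycopg2 driver; the tool name, connection string, and table schema are illustrative only, and a production server would add authentication, auditing, and error handling.

# db_mcp_server.py (illustrative sketch, not a production implementation)
import psycopg2
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("orders-db")

@mcp.tool()
def get_customer_orders(customer_name: str, sort_by: str = "order_date",
                        sort_order: str = "desc", limit: int = 1) -> list[dict]:
    """Return a customer's orders, newest first by default."""
    # Whitelist sortable columns/directions so identifiers never come from user input.
    sort_by = sort_by if sort_by in {"order_date", "total_amount"} else "order_date"
    sort_order = "DESC" if sort_order.lower() == "desc" else "ASC"
    with psycopg2.connect("dbname=sales user=mcp_reader") as conn:
        with conn.cursor() as cur:
            cur.execute(
                f"SELECT order_id, order_date, total_amount FROM orders "
                f"WHERE customer_name = %s ORDER BY {sort_by} {sort_order} LIMIT %s",
                (customer_name, limit),
            )
            rows = cur.fetchall()
    return [{"order_id": r[0], "order_date": str(r[1]), "total_amount": float(r[2])}
            for r in rows]

if __name__ == "__main__":
    mcp.run()  # defaults to the stdio transport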

Let us understand one sample case & the flow of activities.

A customer support agent uses an AI assistant to get information about a customer’s recent orders. The AI assistant utilizes an MCP-compliant client to communicate with an MCP server, which is connected to the company’s PostgreSQL database.

1. User request: The support agent asks the AI assistant, “What was the most recent order placed by Priyanka Chopra Jonas?”

2. AI model processes intent: The AI assistant, running on an MCP host, analyzes the natural language query. It recognizes that to answer this question, it needs to perform a database query. It then identifies the appropriate tool from the MCP server’s capabilities. 

3. Client initiates tool call: The AI assistant’s MCP client sends a JSON-RPC request to the MCP server connected to the PostgreSQL database. The request specifies the tool to be used, such as get_customer_orders, and includes the necessary parameters: 

{
  "jsonrpc": "2.0",
  "method": "db_tools.get_customer_orders",
  "params": {
    "customer_name": "Priyanka Chopra Jonas",
    "sort_by": "order_date",
    "sort_order": "desc",
    "limit": 1
  },
  "id": "12345"
}

4. Server handles the request: The MCP server receives the request and performs several key functions: 

  • Authentication and authorization: The server verifies that the AI client and the user have permission to query the database.
  • Query translation: The server translates the standardized MCP request into a specific SQL query for the PostgreSQL database.
  • Query execution: The server executes the generated SQL against the database, for example:

SELECT order_id, order_date, total_amount
FROM orders
WHERE customer_name = 'Priyanka Chopra Jonas'
ORDER BY order_date DESC
LIMIT 1;

5. Database returns data: The PostgreSQL database executes the query and returns the requested data to the MCP server. 

6. Server formats the response: The MCP server receives the raw database output and formats it into a standardized JSON response that the MCP client can understand.

{
  "jsonrpc": "2.0",
  "result": {
    "data": [
      {
        "order_id": "98765",
        "order_date": "2025-08-25",
        "total_amount": 11025.50
      }
    ]
  },
  "id": "12345"
}

7. Client returns data to the model: The MCP client receives the JSON response and passes it back to the AI assistant’s language model. 

8. AI model generates final response: The language model incorporates this real-time data into its response and presents it to the user in a natural, conversational format. 

“Priyanka Chopra Jonas’s most recent order was placed on August 25, 2025, with an order ID of 98765, for a total of $11,025.50.”
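
Putting the client-side portion of this flow (steps 3 through 7) together, here is a minimal sketch of how an MCP client could issue the same tool call programmatically, assuming the official MCP Python SDK. The SDK handles the JSON-RPC 2.0 framing shown in step 3, and the server script name refers to the illustrative sketch earlier in this post.

import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def fetch_latest_order(customer_name: str):
    # Launch the illustrative database MCP server as a stdio subprocess.
    server = StdioServerParameters(command="python", args=["db_mcp_server.py"])
    async with stdio_client(server) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # The SDK wraps this call in the JSON-RPC request shown in step 3.
            return await session.call_tool(
                "get_customer_orders",
                {"customer_name": customer_name, "sort_by": "order_date",
                 "sort_order": "desc", "limit": 1},
            )

if __name__ == "__main__":
    print(asyncio.run(fetch_latest_order("Priyanka Chopra Jonas")))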

Using the Model Context Protocol (MCP) for database access introduces a layer of abstraction that affects performance in several ways. While it adds some latency and processing overhead, strategic implementation can mitigate these effects. For AI applications, the benefits often outweigh the costs, particularly in terms of improved accuracy, security, and scalability.

The MCP architecture introduces extra communication steps between the AI agent and the database, each adding a small amount of latency. 

  • RPC overhead: The JSON-RPC call from the AI’s client to the MCP server adds a small processing and network delay. This is an out-of-process request, as opposed to a simple local function call.
  • JSON serialization: Request and response data must be serialized and deserialized into JSON format, which requires processing time.
  • Network transit: For remote MCP servers, the data must travel over the network, adding latency. However, for a local or on-premise setup, this is minimal. The physical location of the MCP server relative to the AI model and the database is a significant factor.

The performance impact scales with the complexity and volume of the AI agent’s interactions. 

  • High request volume: A single AI agent working on a complex task might issue dozens of parallel database queries. In high-traffic scenarios, managing numerous simultaneous connections can strain system resources and require robust infrastructure.
  • Excessive data retrieval: A significant performance risk is an AI agent retrieving a massive dataset in a single query. This process can consume a large number of tokens, fill the AI’s context window, and cause bottlenecks at the database and client levels.
  • Context window usage: Tool definitions and the results of tool calls consume space in the AI’s context window. If a large number of tools are in use, this can limit the AI’s “working memory,” resulting in slower and less effective reasoning. 

Caching is a crucial strategy for mitigating the performance overhead of MCP. 

  • In-memory caching: The MCP server can cache results from frequent or expensive database queries in memory (e.g., using Redis or Memcached). This approach enables repeat requests to be served almost instantly without requiring a database hit (see the sketch after this list).
  • Semantic caching: Advanced techniques can cache the results of previous queries and serve them for semantically similar future requests, reducing token consumption and improving speed for conversational applications. 
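
As a sketch of the in-memory option, the snippet below caches query results with a short TTL inside the MCP server process; in production this could equally be backed by Redis or Memcached. The query function passed in is assumed to be the server’s own database helper.

import time

_CACHE: dict = {}
_TTL_SECONDS = 60

def cached_query(run_query, *args):
    """Serve a cached result for (query, args) if still fresh, else hit the database."""
    key = (run_query.__name__, args)
    now = time.time()
    hit = _CACHE.get(key)
    if hit and now - hit[0] < _TTL_SECONDS:
        return hit[1]              # cache hit: no database round-trip
    result = run_query(*args)      # cache miss: run the real query
    _CACHE[key] = (now, result)
    return result

# Hypothetical usage inside a tool handler:
#   orders = cached_query(get_customer_orders, "Priyanka Chopra Jonas")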

Designing the MCP server and its database interactions for efficiency is critical. 

  • Optimized SQL: The MCP server should generate optimized SQL queries. Database indexes should be utilized effectively to expedite lookups and minimize load.
  • Pagination and filtering: To prevent a single query from overwhelming the system, the MCP server should implement pagination. The AI agent can be prompted to use filtering parameters to retrieve only the necessary data.
  • Connection pooling: This technique reuses existing database connections instead of opening a new one for each request, thereby reducing latency and database load (see the sketch after this list).
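
Here is a minimal sketch combining connection pooling and pagination for the MCP server’s database layer, using psycopg2’s built-in pool; the DSN, table name, and page size are illustrative assumptions.

from psycopg2 import pool

# Reuse a small pool of connections instead of reconnecting on every request.
db_pool = pool.SimpleConnectionPool(minconn=1, maxconn=10,
                                    dsn="dbname=sales user=mcp_reader")

def fetch_orders_page(customer_name: str, page: int = 0, page_size: int = 50):
    """Return one page of a customer's orders, newest first."""
    conn = db_pool.getconn()
    try:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT order_id, order_date, total_amount FROM orders "
                "WHERE customer_name = %s ORDER BY order_date DESC "
                "LIMIT %s OFFSET %s",
                (customer_name, page_size, page * page_size),
            )
            return cur.fetchall()
    finally:
        db_pool.putconn(conn)  # always return the connection to the pool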

For large-scale enterprise deployments, scaling is essential for maintaining performance. 

  • Multiple servers: The workload can be distributed across various MCP servers. One server could handle read requests, and another could handle writes.
  • Load balancing: A reverse proxy or other load-balancing solution can distribute incoming traffic across MCP server instances. Autoscaling can dynamically add or remove servers in response to demand.

For AI-driven tasks, a slight increase in latency for database access is often a worthwhile trade-off for significant gains. 

  • Improved accuracy: Accessing real-time, high-quality data through MCP leads to more accurate and relevant AI responses, reducing “hallucinations”.
  • Scalable ecosystem: The standardization of MCP reduces development overhead and allows for a more modular, scalable ecosystem, which saves significant engineering resources compared to building custom integrations.
  • Decoupled architecture: The MCP server decouples the AI model from the database, allowing each to be optimized and scaled independently. 

We’ll go ahead and conclude this post here & continue with a deeper dive in the next post.

Till then, Happy Avenging! 🙂

Real-time video summary assistance App – Part 2

As a continuation of the previous post, I would like to continue discussing the implementation of the MCP protocol among agents. But before that, I want to share the quick demo one more time to recap our objectives.

Let us recap the process flow –

Also, recall the groupings of scripts by functional area, as posted in the previous post –

Message-Chaining Protocol (MCP) Implementation:

    clsMCPMessage.py
    clsMCPBroker.py

YouTube Transcript Extraction:

    clsYouTubeVideoProcessor.py

Language Detection:

    clsLanguageDetector.py

Translation Services & Agents:

    clsTranslationAgent.py
    clsTranslationService.py

Documentation Agent:

    clsDocumentationAgent.py
    
Research Agent:

    clsResearchAgent.py

Great! Now, we’ll continue with the main discussion.


import re
from youtube_transcript_api import YouTubeTranscriptApi

def extract_youtube_id(youtube_url):
    """Extract YouTube video ID from URL"""
    youtube_id_match = re.search(r'(?:v=|\/)([0-9A-Za-z_-]{11}).*', youtube_url)
    if youtube_id_match:
        return youtube_id_match.group(1)
    return None

def get_youtube_transcript(youtube_url):
    """Get transcript from YouTube video"""
    video_id = extract_youtube_id(youtube_url)
    if not video_id:
        return {"error": "Invalid YouTube URL or ID"}
    
    try:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        
        # First try to get manual transcripts
        try:
            transcript = transcript_list.find_manually_created_transcript(["en"])
            transcript_data = transcript.fetch()
            print(f"Debug - Manual transcript format: {type(transcript_data)}")
            if transcript_data and len(transcript_data) > 0:
                print(f"Debug - First item type: {type(transcript_data[0])}")
                print(f"Debug - First item sample: {transcript_data[0]}")
            return {"text": transcript_data, "language": "en", "auto_generated": False}
        except Exception as e:
            print(f"Debug - No manual transcript: {str(e)}")
            # If no manual English transcript, try any available transcript
            try:
                available_transcripts = list(transcript_list)
                if available_transcripts:
                    transcript = available_transcripts[0]
                    print(f"Debug - Using transcript in language: {transcript.language_code}")
                    transcript_data = transcript.fetch()
                    print(f"Debug - Auto transcript format: {type(transcript_data)}")
                    if transcript_data and len(transcript_data) > 0:
                        print(f"Debug - First item type: {type(transcript_data[0])}")
                        print(f"Debug - First item sample: {transcript_data[0]}")
                    return {
                        "text": transcript_data, 
                        "language": transcript.language_code, 
                        "auto_generated": transcript.is_generated
                    }
                else:
                    return {"error": "No transcripts available for this video"}
            except Exception as e:
                return {"error": f"Error getting transcript: {str(e)}"}
    except Exception as e:
        return {"error": f"Error getting transcript list: {str(e)}"}

# ----------------------------------------------------------------------------------
# YouTube Video Processor
# ----------------------------------------------------------------------------------

class clsYouTubeVideoProcessor:
    """Process YouTube videos using the agent system"""
    
    def __init__(self, documentation_agent, translation_agent, research_agent):
        self.documentation_agent = documentation_agent
        self.translation_agent = translation_agent
        self.research_agent = research_agent
    
    def process_youtube_video(self, youtube_url):
        """Process a YouTube video"""
        print(f"Processing YouTube video: {youtube_url}")
        
        # Extract transcript
        transcript_result = get_youtube_transcript(youtube_url)
        
        if "error" in transcript_result:
            return {"error": transcript_result["error"]}
        
        # Start a new conversation
        conversation_id = self.documentation_agent.start_processing()
        
        # Process transcript segments
        transcript_data = transcript_result["text"]
        transcript_language = transcript_result["language"]
        
        print(f"Debug - Type of transcript_data: {type(transcript_data)}")
        
        # For each segment, detect language and translate if needed
        processed_segments = []
        
        try:
            # Make sure transcript_data is a list of dictionaries with text and start fields
            if isinstance(transcript_data, list):
                for idx, segment in enumerate(transcript_data):
                    print(f"Debug - Processing segment {idx}, type: {type(segment)}")
                    
                    # Extract text properly based on the type
                    if isinstance(segment, dict) and "text" in segment:
                        text = segment["text"]
                        start = segment.get("start", 0)
                    else:
                        # Try to access attributes for non-dict types
                        try:
                            text = segment.text
                            start = getattr(segment, "start", 0)
                        except AttributeError:
                            # If all else fails, convert to string
                            text = str(segment)
                            start = idx * 5  # Arbitrary timestamp
                    
                    print(f"Debug - Extracted text: {text[:30]}...")
                    
                    # Create a standardized segment
                    std_segment = {
                        "text": text,
                        "start": start
                    }
                    
                    # Process through translation agent
                    translation_result = self.translation_agent.process_text(text, conversation_id)
                    
                    # Update segment with translation information
                    segment_with_translation = {
                        **std_segment,
                        "translation_info": translation_result
                    }
                    
                    # Use translated text for documentation
                    if "final_text" in translation_result and translation_result["final_text"] != text:
                        std_segment["processed_text"] = translation_result["final_text"]
                    else:
                        std_segment["processed_text"] = text
                    
                    processed_segments.append(segment_with_translation)
            else:
                # If transcript_data is not a list, treat it as a single text block
                print(f"Debug - Transcript is not a list, treating as single text")
                text = str(transcript_data)
                std_segment = {
                    "text": text,
                    "start": 0
                }
                
                translation_result = self.translation_agent.process_text(text, conversation_id)
                segment_with_translation = {
                    **std_segment,
                    "translation_info": translation_result
                }
                
                if "final_text" in translation_result and translation_result["final_text"] != text:
                    std_segment["processed_text"] = translation_result["final_text"]
                else:
                    std_segment["processed_text"] = text
                
                processed_segments.append(segment_with_translation)
                
        except Exception as e:
            print(f"Debug - Error processing transcript: {str(e)}")
            return {"error": f"Error processing transcript: {str(e)}"}
        
        # Process the transcript with the documentation agent
        documentation_result = self.documentation_agent.process_transcript(
            processed_segments,
            conversation_id
        )
        
        return {
            "youtube_url": youtube_url,
            "transcript_language": transcript_language,
            "processed_segments": processed_segments,
            "documentation": documentation_result,
            "conversation_id": conversation_id
        }

Let us understand this step-by-step:

Part 1: Getting the YouTube Transcript

def extract_youtube_id(youtube_url):
    ...

This extracts the unique video ID from any YouTube link. 

def get_youtube_transcript(youtube_url):
    ...
  • This gets the actual spoken content of the video.
  • It tries to get a manual transcript first (created by humans).
  • If not available, it falls back to an auto-generated version (created by YouTube’s AI).
  • If nothing is found, it returns an error message such as: “No transcripts available for this video.”

Part 2: Processing the Video with Agents

class clsYouTubeVideoProcessor:
    ...

This is like the control center that tells each intelligent agent what to do with the transcript. Here are the detailed steps:

1. Start the Process

def process_youtube_video(self, youtube_url):
    ...
  • The system starts with a YouTube video link.
  • It prints a message like: “Processing YouTube video: [link]”

2. Extract the Transcript

  • The system runs the get_youtube_transcript() function.
  • If it fails, it returns an error (e.g., invalid link or no subtitles available).

3. Start a “Conversation”

  • The documentation agent begins a new session, tracked by a unique conversation ID.
  • Think of this like opening a new folder in a shared team workspace to store everything related to this video.

4. Go Through Each Segment of the Transcript

  • The spoken text is often broken into small parts (segments), like subtitles.
  • For each part:
    • It checks the text.
    • It finds out the time that part was spoken.
    • It sends it to the translation agent to clean up or translate the text.

5. Translate (if needed)

  • If the translation agent finds a better or translated version, it replaces the original.
  • Otherwise, it keeps the original.

6. Prepare for Documentation

  • After translation, the segment is passed to the documentation agent.
  • This agent might:
    • Summarize the content,
    • Highlight important terms,
    • Structure it into a readable format.

7. Return the Final Result

The system gives back a structured package with:

  • The video link
  • The original language
  • The transcript in parts (processed and translated)
  • A documentation summary
  • The conversation ID (for tracking or further updates)
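
Putting these seven steps together, here is a minimal wiring sketch of how the broker, agents, and processor from this series could be assembled and run end to end. It follows the constructors shown in this post; the module paths and the no-argument clsMCPBroker() constructor are assumptions, so adjust them to match the actual repository.

from clsMCPBroker import clsMCPBroker
from clsDocumentationAgent import clsDocumentationAgent
from clsTranslationAgent import clsTranslationAgent
from clsResearchAgent import clsResearchAgent
from clsYouTubeVideoProcessor import clsYouTubeVideoProcessor

# Shared message broker (constructor arguments assumed to be none).
broker = clsMCPBroker()

# Agents register themselves with the broker inside their constructors.
documentation_agent = clsDocumentationAgent("documentation_agent", broker)
translation_agent = clsTranslationAgent("translation_agent", broker)
research_agent = clsResearchAgent("research_agent", broker)

# The processor orchestrates transcript extraction, translation, and documentation.
processor = clsYouTubeVideoProcessor(documentation_agent, translation_agent, research_agent)
result = processor.process_youtube_video("https://www.youtube.com/watch?v=<video-id>")

if "error" in result:
    print(result["error"])
else:
    print(result["documentation"]["summary"])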

class clsDocumentationAgent:
    """Documentation Agent built with LangChain"""
    
    def __init__(self, agent_id: str, broker: clsMCPBroker):
        self.agent_id = agent_id
        self.broker = broker
        self.broker.register_agent(agent_id)
        
        # Initialize LangChain components
        self.llm = ChatOpenAI(
            model="gpt-4-0125-preview",
            temperature=0.1,
            api_key=OPENAI_API_KEY
        )
        
        # Create tools
        self.tools = [
            clsSendMessageTool(sender_id=self.agent_id, broker=self.broker)
        ]
        
        # Set up LLM with tools
        self.llm_with_tools = self.llm.bind(
            tools=[tool.tool_config for tool in self.tools]
        )
        
        # Setup memory
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True
        )
        
        # Create prompt
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a Documentation Agent for YouTube video transcripts. Your responsibilities include:
                1. Process YouTube video transcripts
                2. Identify key points, topics, and main ideas
                3. Organize content into a coherent and structured format
                4. Create concise summaries
                5. Request research information when necessary
                
                When you need additional context or research, send a request to the Research Agent.
                Always maintain a professional tone and ensure your documentation is clear and organized.
            """),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])
        
        # Create agent
        self.agent = (
            {
                "input": lambda x: x["input"],
                "chat_history": lambda x: self.memory.load_memory_variables({})["chat_history"],
                "agent_scratchpad": lambda x: format_to_openai_tool_messages(x["intermediate_steps"]),
            }
            | self.prompt
            | self.llm_with_tools
            | OpenAIToolsAgentOutputParser()
        )
        
        # Create agent executor
        self.agent_executor = AgentExecutor(
            agent=self.agent,
            tools=self.tools,
            verbose=True,
            memory=self.memory
        )
        
        # Video data
        self.current_conversation_id = None
        self.video_notes = {}
        self.key_points = []
        self.transcript_segments = []
        
    def start_processing(self) -> str:
        """Start processing a new video"""
        self.current_conversation_id = str(uuid.uuid4())
        self.video_notes = {}
        self.key_points = []
        self.transcript_segments = []
        
        return self.current_conversation_id
    
    def process_transcript(self, transcript_segments, conversation_id=None):
        """Process a YouTube transcript"""
        if not conversation_id:
            conversation_id = self.start_processing()
        self.current_conversation_id = conversation_id
        
        # Store transcript segments
        self.transcript_segments = transcript_segments
        
        # Process segments
        processed_segments = []
        for segment in transcript_segments:
            processed_result = self.process_segment(segment)
            processed_segments.append(processed_result)
        
        # Generate summary
        summary = self.generate_summary()
        
        return {
            "processed_segments": processed_segments,
            "summary": summary,
            "conversation_id": conversation_id
        }
    
    def process_segment(self, segment):
        """Process individual transcript segment"""
        text = segment.get("text", "")
        start = segment.get("start", 0)
        
        # Use LangChain agent to process the segment
        result = self.agent_executor.invoke({
            "input": f"Process this video transcript segment at timestamp {start}s: {text}. If research is needed, send a request to the research_agent."
        })
        
        # Update video notes
        timestamp = start
        self.video_notes[timestamp] = {
            "text": text,
            "analysis": result["output"]
        }
        
        return {
            "timestamp": timestamp,
            "text": text,
            "analysis": result["output"]
        }
    
    def handle_mcp_message(self, message: clsMCPMessage) -> Optional[clsMCPMessage]:
        """Handle an incoming MCP message"""
        if message.message_type == "research_response":
            # Process research information received from Research Agent
            research_info = message.content.get("text", "")
            
            result = self.agent_executor.invoke({
                "input": f"Incorporate this research information into video analysis: {research_info}"
            })
            
            # Send acknowledgment back to Research Agent
            response = clsMCPMessage(
                sender=self.agent_id,
                receiver=message.sender,
                message_type="acknowledgment",
                content={"text": "Research information incorporated into video analysis."},
                reply_to=message.id,
                conversation_id=message.conversation_id
            )
            
            self.broker.publish(response)
            return response
        
        elif message.message_type == "translation_response":
            # Process translation response from Translation Agent
            translation_result = message.content
            
            # Process the translated text
            if "final_text" in translation_result:
                text = translation_result["final_text"]
                original_text = translation_result.get("original_text", "")
                language_info = translation_result.get("language", {})
                
                result = self.agent_executor.invoke({
                    "input": f"Process this translated text: {text}\nOriginal language: {language_info.get('language', 'unknown')}\nOriginal text: {original_text}"
                })
                
                # Update notes with translation information
                for timestamp, note in self.video_notes.items():
                    if note["text"] == original_text:
                        note["translated_text"] = text
                        note["language"] = language_info
                        break
            
            return None
        
        return None
    
    def run(self):
        """Run the agent to listen for MCP messages"""
        print(f"Documentation Agent {self.agent_id} is running...")
        while True:
            message = self.broker.get_message(self.agent_id, timeout=1)
            if message:
                self.handle_mcp_message(message)
            time.sleep(0.1)
    
    def generate_summary(self) -> str:
        """Generate a summary of the video"""
        if not self.video_notes:
            return "No video data available to summarize."
        
        all_notes = "\n".join([f"{ts}: {note['text']}" for ts, note in self.video_notes.items()])
        
        result = self.agent_executor.invoke({
            "input": f"Generate a concise summary of this YouTube video, including key points and topics:\n{all_notes}"
        })
        
        return result["output"]

Let us understand the key methods in a step-by-step manner:

The Documentation Agent is like a smart assistant that watches a YouTube video, takes notes, pulls out important ideas, and creates a summary — almost like a professional note-taker trained to help educators, researchers, and content creators. It works with a team of other assistants, like a Translator Agent and a Research Agent, and they all talk to each other through a messaging system.

1. Starting to Work on a New Video

    def start_processing(self) -> str
    

    When a new video is being processed:

    • A new project ID is created.
    • Old notes and transcripts are cleared to start fresh.

    2. Processing the Whole Transcript

    def process_transcript(...)
    

    This is where the assistant:

    • Takes in the full transcript (what was said in the video).
    • Breaks it into small parts (like subtitles).
    • Sends each part to the smart brain for analysis.
    • Collects the results.
    • Finally, a summary of all the main ideas is created.

    3. Processing One Transcript Segment at a Time

    def process_segment(self, segment)
    

    For each chunk of the video:

    • The assistant reads the text and timestamp.
    • It asks GPT-4 to analyze it and suggest important insights.
    • It saves that insight along with the original text and timestamp.

    4. Handling Incoming Messages from Other Agents

    def handle_mcp_message(self, message)
    

    The assistant can also receive messages from teammates (other agents):

    If the message is from the Research Agent:

    • It reads new information and adds it to its notes.
    • It replies with a thank-you message to say it got the research.

    If the message is from the Translation Agent:

    • It takes the translated version of a transcript.
    • Updates its notes to reflect the translated text and its language.

    This is like a team of assistants emailing back and forth to make sure the notes are complete and accurate.

    5. Summarizing the Whole Video

    def generate_summary(self)
    

    After going through all the transcript parts, the agent asks GPT-4 to create a short, clean summary — identifying:

    • Main ideas
    • Key talking points
    • Structure of the content

    The final result is clear, professional, and usable in learning materials or documentation.


    class clsResearchAgent:
        """Research Agent built with AutoGen"""
        
        def __init__(self, agent_id: str, broker: clsMCPBroker):
            self.agent_id = agent_id
            self.broker = broker
            self.broker.register_agent(agent_id)
            
            # Configure AutoGen directly with API key
            if not OPENAI_API_KEY:
                print("Warning: OPENAI_API_KEY not set for ResearchAgent")
                
            # Create config list directly instead of loading from file
            config_list = [
                {
                    "model": "gpt-4-0125-preview",
                    "api_key": OPENAI_API_KEY
                }
            ]
            # Create AutoGen assistant for research
            self.assistant = AssistantAgent(
                name="research_assistant",
                system_message="""You are a Research Agent for YouTube videos. Your responsibilities include:
                    1. Research topics mentioned in the video
                    2. Find relevant information, facts, references, or context
                    3. Provide concise, accurate information to support the documentation
                    4. Focus on delivering high-quality, relevant information
                    
                    Respond directly to research requests with clear, factual information.
                """,
                llm_config={"config_list": config_list, "temperature": 0.1}
            )
            
            # Create user proxy to handle message passing
            self.user_proxy = UserProxyAgent(
                name="research_manager",
                human_input_mode="NEVER",
                code_execution_config={"work_dir": "coding", "use_docker": False},
                default_auto_reply="Working on the research request..."
            )
            
            # Current conversation tracking
            self.current_requests = {}
        
        def handle_mcp_message(self, message: clsMCPMessage) -> Optional[clsMCPMessage]:
            """Handle an incoming MCP message"""
            if message.message_type == "request":
                # Process research request from Documentation Agent
                request_text = message.content.get("text", "")
                
                # Use AutoGen to process the research request
                def research_task():
                    self.user_proxy.initiate_chat(
                        self.assistant,
                        message=f"Research request for YouTube video content: {request_text}. Provide concise, factual information."
                    )
                    # Return last assistant message
                    return self.assistant.chat_messages[self.user_proxy.name][-1]["content"]
                
                # Execute research task
                research_result = research_task()
                
                # Send research results back to Documentation Agent
                response = clsMCPMessage(
                    sender=self.agent_id,
                    receiver=message.sender,
                    message_type="research_response",
                    content={"text": research_result},
                    reply_to=message.id,
                    conversation_id=message.conversation_id
                )
                
                self.broker.publish(response)
                return response
            
            return None
        
        def run(self):
            """Run the agent to listen for MCP messages"""
            print(f"Research Agent {self.agent_id} is running...")
            while True:
                message = self.broker.get_message(self.agent_id, timeout=1)
                if message:
                    self.handle_mcp_message(message)
                time.sleep(0.1)
    

    Let us understand the key methods in detail.

    1. Receiving and Responding to Research Requests

      def handle_mcp_message(self, message)
      

      When the Research Agent gets a message (like a question or request for info), it:

      1. Reads the message to see what needs to be researched.
      2. Asks GPT-4 to find helpful, accurate info about that topic.
      3. Sends the answer back to whoever asked the question (usually the Documentation Agent).

      class clsTranslationAgent:
          """Agent for language detection and translation"""
          
          def __init__(self, agent_id: str, broker: clsMCPBroker):
              self.agent_id = agent_id
              self.broker = broker
              self.broker.register_agent(agent_id)
              
              # Initialize language detector
              self.language_detector = clsLanguageDetector()
              
              # Initialize translation service
              self.translation_service = clsTranslationService()
          
          def process_text(self, text, conversation_id=None):
              """Process text: detect language and translate if needed, handling mixed language content"""
              if not conversation_id:
                  conversation_id = str(uuid.uuid4())
              
              # Detect language with support for mixed language content
              language_info = self.language_detector.detect(text)
              
              # Decide if translation is needed
              needs_translation = True
              
              # Pure English content doesn't need translation
              if language_info["language_code"] == "en-IN" or language_info["language_code"] == "unknown":
                  needs_translation = False
              
              # For mixed language, check if it's primarily English
              if language_info.get("is_mixed", False) and language_info.get("languages", []):
                  english_langs = [
                      lang for lang in language_info.get("languages", []) 
                      if lang["language_code"] == "en-IN" or lang["language_code"].startswith("en-")
                  ]
                  
                  # If the highest confidence language is English and > 60% confident, don't translate
                  if english_langs and english_langs[0].get("confidence", 0) > 0.6:
                      needs_translation = False
              
              if needs_translation:
                  # Translate using the appropriate service based on language detection
                  translation_result = self.translation_service.translate(text, language_info)
                  
                  return {
                      "original_text": text,
                      "language": language_info,
                      "translation": translation_result,
                      "final_text": translation_result.get("translated_text", text),
                      "conversation_id": conversation_id
                  }
              else:
                  # Already English or unknown language, return as is
                  return {
                      "original_text": text,
                      "language": language_info,
                      "translation": {"provider": "none"},
                      "final_text": text,
                      "conversation_id": conversation_id
                  }
          
          def handle_mcp_message(self, message: clsMCPMessage) -> Optional[clsMCPMessage]:
              """Handle an incoming MCP message"""
              if message.message_type == "translation_request":
                  # Process translation request from Documentation Agent
                  text = message.content.get("text", "")
                  
                  # Process the text
                  result = self.process_text(text, message.conversation_id)
                  
                  # Send translation results back to requester
                  response = clsMCPMessage(
                      sender=self.agent_id,
                      receiver=message.sender,
                      message_type="translation_response",
                      content=result,
                      reply_to=message.id,
                      conversation_id=message.conversation_id
                  )
                  
                  self.broker.publish(response)
                  return response
              
              return None
          
          def run(self):
              """Run the agent to listen for MCP messages"""
              print(f"Translation Agent {self.agent_id} is running...")
              while True:
                  message = self.broker.get_message(self.agent_id, timeout=1)
                  if message:
                      self.handle_mcp_message(message)
                  time.sleep(0.1)

      Let us understand the key methods in a step-by-step manner:

      1. Understanding and Translating Text:

      def process_text(...)
      

      This is the core job of the agent. Here’s what it does with any piece of text:

      Step 1: Detect the Language

      • It tries to figure out the language of the input text.
      • It can handle cases where more than one language is mixed together, which is common in casual speech or subtitles.

      Step 2: Decide Whether to Translate

      • If the text is clearly in English, or it’s unclear what the language is, it decides not to translate.
      • If the text is mostly in another language or has less than 60% confidence in being English, it will translate it into English.

      Step 3: Translate (if needed)

      • If translation is required, it uses the translation service to do the job.
      • Then it packages all the information: the original text, detected language, the translated version, and a unique conversation ID.

      Step 4: Return the Results

      • If no translation is needed, it returns the original text and a note saying “no translation was applied.”

      2. Receiving Messages and Responding

      def handle_mcp_message(...)
      

      The agent listens for messages from other agents. When someone asks it to translate something:

      • It takes the text from the message.
      • Runs it through the process_text function (as explained above).
      • Sends the translated (or original) result to the person who asked.
      Now, let us look at the underlying Translation Service that this agent relies on –

      class clsTranslationService:
          """Translation service using multiple providers with support for mixed languages"""
          
          def __init__(self):
              # Initialize Sarvam AI client
              self.sarvam_api_key = SARVAM_API_KEY
              self.sarvam_url = "https://api.sarvam.ai/translate"
              
              # Initialize Google Cloud Translation client using simple HTTP requests
              self.google_api_key = GOOGLE_API_KEY
              self.google_translate_url = "https://translation.googleapis.com/language/translate/v2"
          
          def translate_with_sarvam(self, text, source_lang, target_lang="en-IN"):
              """Translate text using Sarvam AI (for Indian languages)"""
              if not self.sarvam_api_key:
                  return {"error": "Sarvam API key not set"}
              
              headers = {
                  "Content-Type": "application/json",
                  "api-subscription-key": self.sarvam_api_key
              }
              
              payload = {
                  "input": text,
                  "source_language_code": source_lang,
                  "target_language_code": target_lang,
                  "speaker_gender": "Female",
                  "mode": "formal",
                  "model": "mayura:v1"
              }
              
              try:
                  response = requests.post(self.sarvam_url, headers=headers, json=payload)
                  if response.status_code == 200:
                      return {"translated_text": response.json().get("translated_text", ""), "provider": "sarvam"}
                  else:
                      return {"error": f"Sarvam API error: {response.text}", "provider": "sarvam"}
              except Exception as e:
                  return {"error": f"Error calling Sarvam API: {str(e)}", "provider": "sarvam"}
          
          def translate_with_google(self, text, target_lang="en"):
              """Translate text using Google Cloud Translation API with direct HTTP request"""
              if not self.google_api_key:
                  return {"error": "Google API key not set"}
              
              try:
                  # Using the translation API v2 with API key
                  params = {
                      "key": self.google_api_key,
                      "q": text,
                      "target": target_lang
                  }
                  
                  response = requests.post(self.google_translate_url, params=params)
                  if response.status_code == 200:
                      data = response.json()
                      translation = data.get("data", {}).get("translations", [{}])[0]
                      return {
                          "translated_text": translation.get("translatedText", ""),
                          "detected_source_language": translation.get("detectedSourceLanguage", ""),
                          "provider": "google"
                      }
                  else:
                      return {"error": f"Google API error: {response.text}", "provider": "google"}
              except Exception as e:
                  return {"error": f"Error calling Google Translation API: {str(e)}", "provider": "google"}
          
          def translate(self, text, language_info):
              """Translate text to English based on language detection info"""
              # If already English or unknown language, return as is
              if language_info["language_code"] == "en-IN" or language_info["language_code"] == "unknown":
                  return {"translated_text": text, "provider": "none"}
              
              # Handle mixed language content
              if language_info.get("is_mixed", False) and language_info.get("languages", []):
                  # Strategy for mixed language: 
                  # 1. If one of the languages is English, don't translate the entire text, as it might distort English portions
                  # 2. If no English but contains Indian languages, use Sarvam as it handles code-mixing better
                  # 3. Otherwise, use Google Translate for the primary detected language
                  
                  has_english = False
                  has_indian = False
                  
                  for lang in language_info.get("languages", []):
                      if lang["language_code"] == "en-IN" or lang["language_code"].startswith("en-"):
                          has_english = True
                      if lang.get("is_indian", False):
                          has_indian = True
                  
                  if has_english:
                      # Contains English - use Google for full text as it handles code-mixing well
                      return self.translate_with_google(text)
                  elif has_indian:
                      # Contains Indian languages - use Sarvam
                      # Use the highest confidence Indian language as source
                      indian_langs = [lang for lang in language_info.get("languages", []) if lang.get("is_indian", False)]
                      if indian_langs:
                          # Sort by confidence
                          indian_langs.sort(key=lambda x: x.get("confidence", 0), reverse=True)
                          source_lang = indian_langs[0]["language_code"]
                          return self.translate_with_sarvam(text, source_lang)
                      else:
                          # Fallback to primary language
                          if language_info["is_indian"]:
                              return self.translate_with_sarvam(text, language_info["language_code"])
                          else:
                              return self.translate_with_google(text)
                  else:
                      # No English, no Indian languages - use Google for primary language
                      return self.translate_with_google(text)
              else:
                  # Not mixed language - use standard approach
                  if language_info["is_indian"]:
                      # Use Sarvam AI for Indian languages
                      return self.translate_with_sarvam(text, language_info["language_code"])
                  else:
                      # Use Google for other languages
                      return self.translate_with_google(text)

      This Translation Service is like a smart translator that knows how to:

      • Detect what language the text is written in,
      • Choose the best translation provider depending on the language (especially for Indian languages),
      • And then translate the text into English.

      It supports mixed-language content (such as Hindi-English in one sentence) and uses either Google Translate or Sarvam AI, a translation service designed for Indian languages.

      Now, let us understand the key methods in a step-by-step manner:

      1. Translating Using Google Translate

      def translate_with_google(...)
      

      This function uses Google Translate:

      • It sends the text, asks for English as the target language, and gets a translation back.
      • It also detects the source language automatically.
      • If successful, it returns the translated text and the detected original language.
      • If there’s an error, it returns a message saying what went wrong.

      Best For: Non-Indian languages (like Spanish, French, Chinese) and content that is not mixed with English.

      2. Main Translation Logic

      def translate(self, text, language_info)
      

      This is the decision-maker. Here’s how it works:

      Case 1: No Translation Needed

      If the text is already in English or the language is unknown, it simply returns the original text.

      Case 2: Mixed Language (e.g., Hindi + English)

      If the text contains more than one language:

      • ✅ If one part is English → use Google Translate (it’s good with mixed languages).
      • ✅ If it includes Indian languages only → use Sarvam AI (better at handling Indian content).
      • ✅ If it’s neither English nor Indian → use Google Translate.

      The service checks how confident it is about each language in the mix and chooses the most likely one to translate from.

      Case 3: Single Language

      If the text is only in one language:

      • ✅ If it’s an Indian language (like Bengali, Tamil, or Marathi), use Sarvam AI.
      • ✅ If it’s any other language, use Google Translate.

      So, we’ve done it.

      I’ve included the complete working solutions for you in the GitHub Link.

      We’ll cover detailed performance testing, optimized configurations & many other useful details in our next post.

      Till then, Happy Avenging! 🙂

      Building solutions using LLM AutoGen in Python – Part 3

      Before we dive into the details of this post, here are the links to the two previous posts that precede it.

      Building solutions using LLM AutoGen in Python – Part 1

      Building solutions using LLM AutoGen in Python – Part 2

      For reference, we’ll share the demo once more before we deep-dive into the actual follow-up analysis in the section below –


      In this post, we will examine the initially generated code and then the revised code, comparing them to better understand the impact of refined prompts.

      But before that, let us broadly understand the communication types between the agents.

      1. Direct (Point-to-Point) Communication
        • Agents Involved: Agent1, Agent2
        • Flow:
          • Agent1 sends a request directly to Agent2.
          • Agent2 processes the request and sends the response back to Agent1.
        • Use Case: Simple query-response interactions without intermediaries (see the AutoGen sketch after this list).

      2. Mediated (Orchestrated) Communication
        • Agents Involved: UserAgent, Mediator, SpecialistAgent1, SpecialistAgent2
        • Flow:
          • UserAgent sends input to the Mediator.
          • The Mediator delegates tasks to SpecialistAgent1 and SpecialistAgent2.
          • The Specialists process their tasks and return results to the Mediator.
          • The Mediator consolidates the results and sends them back to UserAgent.
        • Use Case: Orchestrating complex tasks that require multiple specialist agents.

      3. Broadcast Communication
        • Agents Involved: Broadcaster, AgentA, AgentB, AgentC
        • Flow:
          • The Broadcaster sends a message to multiple agents simultaneously.
          • The agents that find the message relevant (AgentA, AgentC) acknowledge or respond.
        • Use Case: System-wide notifications or alerts.

      4. Supervisor-Worker (Hierarchical) Communication
        • Agents Involved: Supervisor, Worker1, Worker2
        • Flow:
          • The Supervisor assigns tasks to Worker1 and Worker2.
          • The Workers execute their tasks and report progress back to the Supervisor.
        • Use Case: Task delegation in structured organizations.

      5. Publish-Subscribe Communication
        • Agents Involved: Publisher, Subscriber1, Topic
        • Flow:
          • The Publisher publishes an event or message to a Topic.
          • Subscriber1, who is subscribed to the Topic, receives the event.
        • Use Case: Decoupled systems where publishers and subscribers do not need direct knowledge of each other.

      6. Event-Driven Communication
        • Agents Involved: TriggerEvent, ReactiveAgent, NextStep
        • Flow:
          • An event occurs (TriggerEvent).
          • The ReactiveAgent detects the event and acts on it.
          • The action leads to the NextStep in the process.
        • Use Case: Systems that need to respond to asynchronous events or changes in the environment.
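
      As referenced in the first pattern above, here is a minimal sketch of direct (point-to-point) communication using AutoGen’s AssistantAgent and UserProxyAgent, the same classes used elsewhere in this series. The model name and environment variable are assumptions; adjust them to your own configuration.

      import os
      from autogen import AssistantAgent, UserProxyAgent

      config_list = [{"model": "gpt-4-0125-preview", "api_key": os.environ["OPENAI_API_KEY"]}]

      # Agent2 answers whatever request it receives.
      agent2 = AssistantAgent(
          name="Agent2",
          system_message="Answer the request you receive as concisely as possible.",
          llm_config={"config_list": config_list, "temperature": 0.1},
      )

      # Agent1 issues the request; no human input and no auto-reply loop.
      agent1 = UserProxyAgent(
          name="Agent1",
          human_input_mode="NEVER",
          max_consecutive_auto_reply=0,
          code_execution_config=False,
      )

      # Agent1 sends a request directly to Agent2; Agent2 responds back to Agent1.
      agent1.initiate_chat(agent2, message="List the basic rules of the classic Snake game.")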

      Since we now understand the basic communication types, let us examine the first code that AutoGen generated and the final code (the one that satisfies our need) –

      # filename: simple_snake.py (Generated by AutoGen)
      
      import pygame
      import time
      import random
       
      snake_speed = 15
       
      # Window color
      white = pygame.Color(255, 255, 255)
       
      # Snake color
      green = pygame.Color(0, 255, 0)
       
      snake_position = [100, 50]
       
      # defining first 4 blocks 
      # of snake body
      snake_body = [ [100, 50], 
                     [90, 50],
                     [80, 50],
                     [70, 50]
                  ]
      # fruit position
      fruit_position = [random.randrange(1, (1000//10)) * 10, 
                        random.randrange(1, (600//10)) * 10]
      fruit_spawn = True
       
      direction = 'RIGHT'
      change_to = direction
       
      score = 0
       
      # Initialising pygame
      pygame.init()
       
      # Initialise game window
      win = pygame.display.set_mode((1000, 600))
      pygame.display.set_caption("Snake game for kids")
       
      # FPS (frames per second) controller
      fps_controller = pygame.time.Clock()
       
        
      while True:
          # handling key events
          for event in pygame.event.get():
              if event.type == pygame.KEYDOWN:
                  if event.key == pygame.K_UP:
                      change_to = 'UP'
                  if event.key == pygame.K_DOWN:
                      change_to = 'DOWN'
                  if event.key == pygame.K_LEFT:
                      change_to = 'LEFT'
                  if event.key == pygame.K_RIGHT:
                      change_to = 'RIGHT'
      
          # If two keys pressed simultaneously
          # we don't want snake to move into two
          # directions simultaneously
          if change_to == 'UP' and direction != 'DOWN':
              direction = 'UP'
          if change_to == 'DOWN' and direction != 'UP':
              direction = 'DOWN'
          if change_to == 'LEFT' and direction != 'RIGHT':
              direction = 'LEFT'
          if change_to == 'RIGHT' and direction != 'LEFT':
              direction = 'RIGHT'
       
          # Moving the snake
          if direction == 'UP':
              snake_position[1] -= 10
          if direction == 'DOWN':
              snake_position[1] += 10
          if direction == 'LEFT':
              snake_position[0] -= 10
          if direction == 'RIGHT':
              snake_position[0] += 10
       
          # Snake body growing mechanism
          # if fruits and snakes collide then scores
          # will increase by 10
          snake_body.insert(0, list(snake_position))
          if snake_position[0] == fruit_position[0] and snake_position[1] == fruit_position[1]:
              score += 10
              fruit_spawn = False
          else:
              snake_body.pop()
               
          if not fruit_spawn:
              fruit_position = [random.randrange(1, (1000//10)) * 10, 
                                random.randrange(1, (600//10)) * 10]
               
          fruit_spawn = True
          win.fill(white)
          
          for pos in snake_body:
              pygame.draw.rect(win, green, pygame.Rect(pos[0], pos[1], 10, 10))
          # draw the fruit in a contrasting colour (white would be invisible on the white background)
          pygame.draw.rect(win, pygame.Color(255, 0, 0), pygame.Rect(
              fruit_position[0], fruit_position[1], 10, 10))
       
          # Game Over conditions
          if snake_position[0] < 0 or snake_position[0] > 1000-10:
              break
          if snake_position[1] < 0 or snake_position[1] > 600-10:
              break
       
          # Touching the snake body ends the game
          # (breaking only the inner for-loop would not stop the main loop)
          hit_self = False
          for block in snake_body[1:]:
              if snake_position[0] == block[0] and snake_position[1] == block[1]:
                  hit_self = True
                  break
          if hit_self:
              break
          
          # refresh game screen
          pygame.display.update()
      
          # Frame Per Second /Refresh rate
          fps_controller.tick(snake_speed)
      
      # displaying final score after game over
      print(f"You scored {score} in the game.")

      Now, let us remember the prompt refinement that we’ve done (We’ve already posted that in our last post as well) –

      I want to place the snake within the boundary of the visual wall & it should bounce each wall & then hit the target. And, it should increase the size. And, every time when it hits the target it will increase the score by 10. And, the application will run for 2 minutes, which will be displayed on the screen. This will exit once 2 minute is over.
      # filename: simple_snake.py (Generated by AutoGen)
      
      import pygame
      import time
      import random
      import math
      
      pygame.init()
      
      white = (255, 255, 255)
      yellow = (255, 255, 102)
      green = (0, 255, 0)
      red = (255, 0, 0)
      black = (0, 0, 0)
      blue = (0, 0, 255)
      
      dis_width = 800
      dis_height = 600
      
      dis = pygame.display.set_mode((dis_width, dis_height))
      pygame.display.set_caption('Snake Game')
      
      clock = pygame.time.Clock()
      snake_block = 10
      snake_speed = 30
      font_style = pygame.font.SysFont(None, 50)
      score_font = pygame.font.SysFont(None, 35)
      
      def our_snake(snake_block, snake_List):
          for x in snake_List:
              pygame.draw.rect(dis, green, [x[0], x[1], snake_block, snake_block])
      
      def message(msg,color):
          mesg = font_style.render(msg, True, color)
          dis.blit(mesg, [dis_width / 3, dis_height / 3])
      
      def gameLoop():  # creating a function
          game_over = False
          game_close = False
      
          # snake starting coordinates
          x1 = dis_width / 2
          y1 = dis_height / 2
      
          # snake initial movement direction
          x1_change = 0
          y1_change = 0
      
          # initialize snake length and list of coordinates
          snake_List = []
          Length_of_snake = 1
      
          # random starting point for the food
          foodx = round(random.randrange(0, dis_width - snake_block) / 10.0) * 10.0
          foody = round(random.randrange(0, dis_height - snake_block) / 10.0) * 10.0
      
          # initialize score
          score = 0
      
          # store starting time
          start_time = time.time()
      
          while not game_over:
      
              # Remaining time
              elapsed_time = time.time() - start_time
              remaining_time = 120 - elapsed_time  # 2 minutes game
              if remaining_time <= 0:
                  game_over = True
      
              # event handling loop
              for event in pygame.event.get():
                  if event.type == pygame.QUIT:
                      game_over = True  # when closing window
                  if event.type == pygame.MOUSEBUTTONUP:
                      # get mouse click coordinates
                      pos = pygame.mouse.get_pos()
      
                      # calculate new direction vector from snake to click position
                      x1_change = pos[0] - x1
                      y1_change = pos[1] - y1
      
                      # normalize direction vector
                      norm = math.sqrt(x1_change ** 2 + y1_change ** 2)
                      if norm != 0:
                          x1_change /= norm
                          y1_change /= norm
      
                      # multiply direction vector by step size
                      x1_change *= snake_block
                      y1_change *= snake_block
      
              x1 += x1_change
              y1 += y1_change
              dis.fill(white)
              pygame.draw.rect(dis, red, [foodx, foody, snake_block, snake_block])
              pygame.draw.rect(dis, green, [x1, y1, snake_block, snake_block])
              snake_Head = []
              snake_Head.append(x1)
              snake_Head.append(y1)
              snake_List.append(snake_Head)
              if len(snake_List) > Length_of_snake:
                  del snake_List[0]
      
              our_snake(snake_block, snake_List)
      
              # Bounces the snake back if it hits the edge
              if x1 < 0 or x1 > dis_width:
                  x1_change *= -1
              if y1 < 0 or y1 > dis_height:
                  y1_change *= -1
      
              # Display score
              value = score_font.render("Your Score: " + str(score), True, black)
              dis.blit(value, [0, 0])
      
              # Display remaining time
              time_value = score_font.render("Remaining Time: " + str(int(remaining_time)), True, blue)
              dis.blit(time_value, [0, 30])
      
              pygame.display.update()
      
              # Increase score and length of snake when snake gets the food
              if abs(x1 - foodx) < snake_block and abs(y1 - foody) < snake_block:
                  foodx = round(random.randrange(0, dis_width - snake_block) / 10.0) * 10.0
                  foody = round(random.randrange(0, dis_height - snake_block) / 10.0) * 10.0
                  Length_of_snake += 1
                  score += 10
      
              # Snake movement speed
              clock.tick(snake_speed)
      
          pygame.quit()
          quit()
      
      gameLoop()
      

      Now, let us understand the difference here –

      The first program is a snake game controlled by arrow keys that ends if the snake hits a wall or itself. The second game uses mouse clicks for control, bounces off walls instead of ending, includes a 2-minute timer, and displays the remaining time.

      So, we’ve done it. 🙂

      You can find the detailed code in the following Github link.


      I’ll bring some more exciting topics in the coming days from the Python verse.

      Till then, Happy Avenging! 🙂

      Building & deploying a RAG architecture rapidly using Langflow & Python

      I’ve been looking for something that can help deploy any Python-based RAG solution faster, ideally with a ready-made UI that speeds up delivery. And here comes the tool that does exactly what I needed – “LangFlow.”

      Before delving into the details, I strongly recommend taking a look at the demo. It’s a great way to get a comprehensive understanding of LangFlow and its capabilities in deploying RAG architecture rapidly.

      Demo

      The demo covers the entire architecture; hence, here I’ll share the architecture components I used to build the solution.

      To know more about RAG-Architecture, please refer to the following link.

      As we all know, we can parse the data from the source website URL (in this case, I’m referring to my photography website to extract the text of one of my blogs) and then embed it into the newly created Astra DB & new collection, where I will be storing the vector embeddings.

      As you can see from the above diagram, I configured this flow within 5 minutes, and it gave me the full functionality of a complete solution (the underlying Python application) in no time – one that extracts chunks, converts them into embeddings, and finally stores them inside the Astra DB.

      Now, let us understand the next phase, where, based on the question asked in the chatbot, I need to convert that question into an embedding & then run a similarity search against the vector DB to bring back the relevant vectors, as shown below –

      You need to configure this entire flow by dragging the necessary widgets from the left-side panel as marked in the Blue-Box shown below –

      For this specific use case, we’ve created an instance of Astra DB & then created an empty vector collection. We also need to generate the API key & assign the right roles to the token. After successfully creating the token, you need to copy the endpoint, token & collection details & paste them into the corresponding fields of the Astra DB components inside LangFlow. Think of it as a framework where one needs to provide all the necessary information to build & run the entire flow successfully.
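
      For reference, here is a minimal sketch of how those three values are consumed programmatically by the langchain_astradb package that the LangFlow widget wraps; the environment-variable names are my own assumption, and you would normally never hard-code the token:

      import os

      from langchain.embeddings.openai import OpenAIEmbeddings
      from langchain_astradb import AstraDBVectorStore

      # The three values copied from the Astra DB console (the variable names are assumptions)
      vector_store = AstraDBVectorStore(
          embedding=OpenAIEmbeddings(openai_api_key=os.environ["OPENAI_API_KEY"]),
          collection_name=os.environ["ASTRA_DB_COLLECTION"],       # the empty collection created above
          api_endpoint=os.environ["ASTRA_DB_API_ENDPOINT"],        # the database endpoint URL
          token=os.environ["ASTRA_DB_APPLICATION_TOKEN"],          # token with the right roles assigned
      )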

      Following are some of the important snapshots from the Astra-DB –

      Step – 1

      Step – 2

      Once you run the vector DB population, it will insert the extracted text & convert it into vectors, as shown in the following screenshot –

      You can see the sample vectors along with the text chunks inside the Astra DB data explorer as shown below –

      Some of the critical components, which are important for monitoring the vector embeddings, are highlighted in the blue box.

      Now, here is how you can modify the current Python code of any available widgets or build your own widget by using the custom widget.

      The first step is to click the code button highlighted in the Red-box as shown below –

      The next step is to click that button, which opens the detailed Python code representing the entire widget’s build & functionality. This is where you can add to it, modify it, or keep it as it is, depending on your need, as shown below –

      Once you build the entire solution, you must click the final compile button (shown in the red box), which eventually compiles all the individual widgets. However, you can also compile each widget individually as soon as you add it, so you can pinpoint any potential problems at that very step.

      Let us understand one sample code of a widget. In this case, we will take vector embedding insertion into the Astra DB. Let us see the code –

      from typing import List, Optional, Union
      from langchain_astradb import AstraDBVectorStore
      from langchain_astradb.utils.astradb import SetupMode
      
      from langflow.custom import CustomComponent
      from langflow.field_typing import Embeddings, VectorStore
      from langflow.schema import Record
      from langchain_core.retrievers import BaseRetriever
      
      
      class AstraDBVectorStoreComponent(CustomComponent):
          display_name = "Astra DB"
          description = "Builds or loads an Astra DB Vector Store."
          icon = "AstraDB"
          field_order = ["token", "api_endpoint", "collection_name", "inputs", "embedding"]
      
          def build_config(self):
              return {
                  "inputs": {
                      "display_name": "Inputs",
                      "info": "Optional list of records to be processed and stored in the vector store.",
                  },
                  "embedding": {"display_name": "Embedding", "info": "Embedding to use"},
                  "collection_name": {
                      "display_name": "Collection Name",
                      "info": "The name of the collection within Astra DB where the vectors will be stored.",
                  },
                  "token": {
                      "display_name": "Token",
                      "info": "Authentication token for accessing Astra DB.",
                      "password": True,
                  },
                  "api_endpoint": {
                      "display_name": "API Endpoint",
                      "info": "API endpoint URL for the Astra DB service.",
                  },
                  "namespace": {
                      "display_name": "Namespace",
                      "info": "Optional namespace within Astra DB to use for the collection.",
                      "advanced": True,
                  },
                  "metric": {
                      "display_name": "Metric",
                      "info": "Optional distance metric for vector comparisons in the vector store.",
                      "advanced": True,
                  },
                  "batch_size": {
                      "display_name": "Batch Size",
                      "info": "Optional number of records to process in a single batch.",
                      "advanced": True,
                  },
                  "bulk_insert_batch_concurrency": {
                      "display_name": "Bulk Insert Batch Concurrency",
                      "info": "Optional concurrency level for bulk insert operations.",
                      "advanced": True,
                  },
                  "bulk_insert_overwrite_concurrency": {
                      "display_name": "Bulk Insert Overwrite Concurrency",
                      "info": "Optional concurrency level for bulk insert operations that overwrite existing records.",
                      "advanced": True,
                  },
                  "bulk_delete_concurrency": {
                      "display_name": "Bulk Delete Concurrency",
                      "info": "Optional concurrency level for bulk delete operations.",
                      "advanced": True,
                  },
                  "setup_mode": {
                      "display_name": "Setup Mode",
                      "info": "Configuration mode for setting up the vector store, with options like 'Sync', 'Async', or 'Off'.",
                      "options": ["Sync", "Async", "Off"],
                      "advanced": True,
                  },
                  "pre_delete_collection": {
                      "display_name": "Pre Delete Collection",
                      "info": "Boolean flag to determine whether to delete the collection before creating a new one.",
                      "advanced": True,
                  },
                  "metadata_indexing_include": {
                      "display_name": "Metadata Indexing Include",
                      "info": "Optional list of metadata fields to include in the indexing.",
                      "advanced": True,
                  },
                  "metadata_indexing_exclude": {
                      "display_name": "Metadata Indexing Exclude",
                      "info": "Optional list of metadata fields to exclude from the indexing.",
                      "advanced": True,
                  },
                  "collection_indexing_policy": {
                      "display_name": "Collection Indexing Policy",
                      "info": "Optional dictionary defining the indexing policy for the collection.",
                      "advanced": True,
                  },
              }
      
          def build(
              self,
              embedding: Embeddings,
              token: str,
              api_endpoint: str,
              collection_name: str,
              inputs: Optional[List[Record]] = None,
              namespace: Optional[str] = None,
              metric: Optional[str] = None,
              batch_size: Optional[int] = None,
              bulk_insert_batch_concurrency: Optional[int] = None,
              bulk_insert_overwrite_concurrency: Optional[int] = None,
              bulk_delete_concurrency: Optional[int] = None,
              setup_mode: str = "Sync",
              pre_delete_collection: bool = False,
              metadata_indexing_include: Optional[List[str]] = None,
              metadata_indexing_exclude: Optional[List[str]] = None,
              collection_indexing_policy: Optional[dict] = None,
          ) -> Union[VectorStore, BaseRetriever]:
              try:
                  setup_mode_value = SetupMode[setup_mode.upper()]
              except KeyError:
                  raise ValueError(f"Invalid setup mode: {setup_mode}")
              if inputs:
                  documents = [_input.to_lc_document() for _input in inputs]
      
                  vector_store = AstraDBVectorStore.from_documents(
                      documents=documents,
                      embedding=embedding,
                      collection_name=collection_name,
                      token=token,
                      api_endpoint=api_endpoint,
                      namespace=namespace,
                      metric=metric,
                      batch_size=batch_size,
                      bulk_insert_batch_concurrency=bulk_insert_batch_concurrency,
                      bulk_insert_overwrite_concurrency=bulk_insert_overwrite_concurrency,
                      bulk_delete_concurrency=bulk_delete_concurrency,
                      setup_mode=setup_mode_value,
                      pre_delete_collection=pre_delete_collection,
                      metadata_indexing_include=metadata_indexing_include,
                      metadata_indexing_exclude=metadata_indexing_exclude,
                      collection_indexing_policy=collection_indexing_policy,
                  )
              else:
                  vector_store = AstraDBVectorStore(
                      embedding=embedding,
                      collection_name=collection_name,
                      token=token,
                      api_endpoint=api_endpoint,
                      namespace=namespace,
                      metric=metric,
                      batch_size=batch_size,
                      bulk_insert_batch_concurrency=bulk_insert_batch_concurrency,
                      bulk_insert_overwrite_concurrency=bulk_insert_overwrite_concurrency,
                      bulk_delete_concurrency=bulk_delete_concurrency,
                      setup_mode=setup_mode_value,
                      pre_delete_collection=pre_delete_collection,
                      metadata_indexing_include=metadata_indexing_include,
                      metadata_indexing_exclude=metadata_indexing_exclude,
                      collection_indexing_policy=collection_indexing_policy,
                  )
      
              return vector_store
      

      Method: build_config:

      • This method defines the configuration options for the component.
      • Each configuration option includes a display_name and info, which provides details about the option.
      • Some options are marked as advanced, indicating they are optional and more complex.

      Method: build:

      • This method is used to create an instance of the Astra DB Vector Store.
      • It takes several parameters, including embedding, token, api_endpoint, collection_name, and various optional parameters.
      • It converts the setup_mode string to an enum value.
      • If inputs are provided, they are converted to a format suitable for storing in the vector store.
      • Depending on whether inputs are provided, a new vector store from documents can be created, or an empty vector store can be initialized with the given configurations.
      • Finally, it returns the created vector store instance.
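
      Once the build method hands back the vector store, a downstream widget typically runs a similarity search against it. Here is a minimal sketch, assuming a vector_store built as above; the query text & the value of k are purely illustrative:

      # A minimal sketch, assuming `vector_store` holds the store returned by build()
      results = vector_store.similarity_search(
          "What camera settings were used in the blog?",  # the chatbot question (illustrative)
          k=4,                                            # number of nearest chunks to retrieve
      )

      for doc in results:
          print(doc.page_content[:120])  # preview the retrieved chunks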

      And, here is the screenshot of your run –

      And, here is the last step, where we run the integrated chatbot as shown below –

      As one can see, the highlighted left side shows the reference text & chunks, & the right side shows the actual response.


      So, we’ve done it. And here is the fun fact – I built this entire workflow in just 35 minutes, all by myself. 😛

      I’ll bring some more exciting topics in the coming days from the Python verse.

      To learn more about LangFlow, please click here.

      To learn about Astra DB, you need to click the following link.

      To learn about my blog & photography, you can click the following url.

      Till then, Happy Avenging!  🙂

      Text2SQL Data Extractor (T2SDE) using Python & Open AI LLM

      Today, I will share a new post that contextualizes the source files, reads the data into a pandas data frame, dynamically creates the SQL & executes it, and then fetches the data from the sources based on the dynamically generated query. This project is for advanced Python developers and data-science newbies.

      In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.

      Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

      Demo

      Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.


      The application dynamically captures the metadata from the source data. It blends that metadata into an enhanced prompt, which it passes to the Flask server. The Flask server is where all the context limits are enforced.

      Once the application receives the correct generated SQL, it will then apply the SQL using the SQLAlchemy package to get the desired results.

      The following are the important packages that are essential to this project –

      pip install openai==1.6.1
      pip install pandas==2.1.4
      pip install Flask==3.0.0
      pip install SQLAlchemy==2.0.23

      We’ll have both the server and the main application. Today, we’ll be going in reverse mode. We first discuss the main script & then explain all the other class scripts.

      • 1_invokeSQLServer.py (This is the main calling Python script to invoke the OpenAI-Server.)

      Please find some of the key snippets from this discussion –

      @app.route('/message', methods=['POST'])
      def message():
          input_text = request.json.get('input_text', None)
          session_id = request.json.get('session_id', None)
      
          print('*' * 240)
          print('User Input:')
          print(str(input_text))
          print('*' * 240)
      
          # Retrieve conversation history from the session or database
          conversation_history = session.get(session_id, [])
      
          # Add the new message to the conversation history
          conversation_history.append(input_text)
      
          # Call OpenAI API with the updated conversation
          response = client.with_options(max_retries=0).chat.completions.create(
              messages=[
                  {
                      "role": "user",
                      "content": input_text,
                  }
              ],
              model=cf.conf['MODEL_NAME'],
          )
      
          # Extract the content from the first choice's message
          chat_response = response.choices[0].message.content
          print('*' * 240)
          print('Response::')
          print(chat_response)
          print('*' * 240)
      
          conversation_history.append(chat_response)
      
          # Store the updated conversation history in the session or database
          session[session_id] = conversation_history
      
          return chat_response

      This code defines a web application route that handles POST requests sent to the /message endpoint:

      1. Route Declaration: The @app.route('/message', methods=['POST']) part specifies that the function message() is executed when the server receives a POST request at the /message URL.
      2. Function Definition: Inside the message() function:
        • It retrieves two pieces of data from the request’s JSON body: input_text (the user’s input message) and session_id (a unique identifier for the user’s session).
        • It prints the user’s input message, surrounded by lines of asterisks for emphasis.
      3. Conversation History Management:
        • The code retrieves the conversation history associated with the given session_id. This history is a list of messages.
        • It then adds the new user message (input_text) to this conversation history.
      4. OpenAI API Call:
        • The function makes a call to the OpenAI API, passing the user’s message. It specifies not to retry the request if it fails (max_retries=0).
        • The model used for the OpenAI API call is taken from some configurations (cf.conf['MODEL_NAME']).
      5. Processing API Response:
        • The response from the OpenAI API is processed to extract the content of the chat response.
        • This chat response is printed.
      6. Updating Conversation History:
        • The chat response is added to the conversation history.
        • The updated conversation history is then stored back in the session or database, associated with the session_id.
      7. Returning the Response: Finally, the function returns the chat response.
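
      To make the flow concrete, here is a minimal client-side sketch that exercises the /message endpoint; the host, port & session id are assumptions for local testing:

      import requests

      resp = requests.post(
          "http://127.0.0.1:5000/message",      # assumes the Flask server runs locally on port 5000
          json={
              "input_text": "Show me the top 5 customers by total order value.",
              "session_id": "demo-session-001",
          },
      )

      print(resp.text)  # the generated SQL returned by the OpenAI-backed server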

      • clsDynamicSQLProcess.py (This Python class generates the SQL & then calls the Flask server to invoke the OpenAI-Server.)

      Now, let us understand a few important pieces of the snippet –

      def text2SQLBegin(self, DBFileNameList, fileDBPath, srcQueryPrompt, joinCond, debugInd='N'):
      
              question = srcQueryPrompt
              create_table_statement = ''
              jStr = ''
      
              print('DBFileNameList::', DBFileNameList)
              print('prevSessionDBFileNameList::', self.prevSessionDBFileNameList)
      
              if set(self.prevSessionDBFileNameList) == set(DBFileNameList):
                  self.flag = 'Y'
              else:
                  self.flag = 'N'
      
              if self.flag == 'N':
      
                  for i in DBFileNameList:
                      DBFileName = i
      
                      FullDBname = fileDBPath + DBFileName
                      print('File: ', str(FullDBname))
      
                      tabName, _ = DBFileName.split('.')
      
                      # Reading the source data
                      df = pd.read_csv(FullDBname)
      
                      # Convert all string columns to lowercase
                      df = df.apply(lambda x: x.str.lower() if x.dtype == "object" else x)
      
                      # Convert DataFrame to SQL table
                      df.to_sql(tabName, con=engine, index=False)
      
                      # Create a MetaData object and reflect the existing database
                      metadata = MetaData()
                      metadata.reflect(bind=engine)
      
                      # Access the 'users' table from the reflected metadata
                      table = metadata.tables[tabName]
      
                      # Generate the CREATE TABLE statement
                      create_table_statement = create_table_statement + str(CreateTable(table)) + '; \n'
      
                      tabName = ''
      
                  for joinS in joinCond:
                      jStr = jStr + joinS + '\n'
      
                  self.prevSessionDBFileNameList = DBFileNameList
                  self.prev_create_table_statement = create_table_statement
      
                  masterSessionDBFileNameList = self.prevSessionDBFileNameList
                  mast_create_table_statement = self.prev_create_table_statement
      
              else:
                  masterSessionDBFileNameList = self.prevSessionDBFileNameList
                  mast_create_table_statement = self.prev_create_table_statement
      
              inputPrompt = (templateVal_1 + mast_create_table_statement + jStr + templateVal_2).format(question=question)
      
              if debugInd == 'Y':
                  print('INPUT PROMPT::')
                  print(inputPrompt)
      
              print('*' * 240)
              print('Find the Generated SQL:')
              print()
      
              DBFileNameList = []
              create_table_statement = ''
      
              return inputPrompt
      1. Function Overview: The text2SQLBegin function processes a list of database file names (DBFileNameList), a file path (fileDBPath), a query prompt (srcQueryPrompt), join conditions (joinCond), and a debug indicator (debugInd) to generate SQL commands.
      2. Initial Setup: It starts by initializing variables for the question, the SQL table creation statement, and a string for join conditions.
      3. Debug Prints: The function prints the current and previous session database file names for debugging purposes.
      4. Flag Setting: A flag is set to ‘Y’ if the current session’s database file names match the previous session’s; otherwise, it’s set to ‘N’.
      5. Processing New Session Data: If the flag is ‘N’, indicating new session data:
        • For each database file, it reads the data, converts string columns to lowercase, and creates a corresponding SQL table in a database using the pandas library.
        • Metadata is generated for each table and a CREATE TABLE SQL statement is created.
      6. Join Conditions and Statement Aggregation: Join conditions are concatenated, and previous session information is updated with the current session’s data.
      7. Handling Repeated Sessions: If the session data is repeated (flag is ‘Y’), it uses the previous session’s SQL table creation statements and database file names.
      8. Final Input Prompt Creation: It constructs the final input prompt by combining template values with the create table statement, join conditions, and the original question.
      9. Debug Printing: If debug mode is enabled, it prints the final input prompt.
      10. Conclusion: The function clears the DBFileNameList and create_table_statement variables, and returns the constructed input prompt.
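
      To make step 8 concrete, here is a simplified, illustrative version of how the final prompt gets assembled; the template wording & the table definition below are my own assumptions (the real templateVal_1 / templateVal_2 are defined elsewhere in the project):

      # Illustrative templates (assumptions), mirroring how text2SQLBegin stitches the prompt together
      templateVal_1 = "You are a SQL generator. Given the following table definitions:\n"
      templateVal_2 = "\nReturn only a valid SQL query (no explanation) that answers: {question}"

      create_table_statement = "CREATE TABLE orders (order_id INTEGER, customer TEXT, amount FLOAT); \n"
      jStr = "-- no join conditions for a single table\n"
      question = "What is the total order amount per customer?"

      inputPrompt = (templateVal_1 + create_table_statement + jStr + templateVal_2).format(question=question)
      print(inputPrompt)

      Next, the text2SQLEnd method sends this assembled prompt over to the Flask server –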
        def text2SQLEnd(self, srcContext, debugInd='N'):
            url = self.url
      
            payload = json.dumps({"input_text": srcContext,"session_id": ""})
            headers = {'Content-Type': 'application/json', 'Cookie': cf.conf['HEADER_TOKEN']}
      
            response = requests.request("POST", url, headers=headers, data=payload)
      
            return response.text

      The text2SQLEnd function sends an HTTP POST request to a specified URL and returns the response. It takes two parameters: srcContext which contains the input text, and an optional debugInd for debugging purposes. The function constructs the request payload by converting the input text and an empty session ID to JSON format. It sets the request headers, including a content type of ‘application/json’ and a token from the configuration file. The function then sends the POST request using the requests library and returns the text content of the response.

        def sql2Data(self, srcSQL):
            # Executing the query on top of your data
            resultSQL = pd.read_sql_query(srcSQL, con=engine)
      
            return resultSQL

      The sql2Data function is designed to execute a SQL query on a database and return the result. It takes a single parameter, srcSQL, which contains the SQL query to be executed. The function uses the pandas library to run the provided SQL query (srcSQL) against a database connection (engine). It then returns the result of this query, which is typically a DataFrame object containing the data retrieved from the database.
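
      Here is a self-contained sketch of the same pattern using an in-memory SQLite engine; the table & query are illustrative:

      import pandas as pd
      from sqlalchemy import create_engine

      engine = create_engine("sqlite://")   # in-memory database for the sketch

      # Expose a small frame as a SQL table, the same way text2SQLBegin registers each CSV
      pd.DataFrame({"customer": ["a", "b", "a"], "amount": [10.0, 25.5, 7.25]}).to_sql(
          "orders", con=engine, index=False
      )

      # Execute the (generated) SQL exactly the way sql2Data does
      resultSQL = pd.read_sql_query(
          "SELECT customer, SUM(amount) AS total FROM orders GROUP BY customer", con=engine
      )
      print(resultSQL)

      With these helpers in place, the genData method below orchestrates the whole flow –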

      def genData(self, srcQueryPrompt, fileDBPath, DBFileNameList, joinCond, debugInd='N'):
          try:
              authorName = self.authorName
              website = self.website
              var = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
      
              print('*' * 240)
              print('SQL Start Time: ' + str(var))
              print('*' * 240)
      
              print('*' * 240)
              print()
      
              if debugInd == 'Y':
                  print('Author Name: ', authorName)
                  print('For more information, please visit the following Website: ', website)
                  print()
      
                  print('*' * 240)
              print('Your Data for Retrieval:')
              print('*' * 240)
      
              if debugInd == 'Y':
      
                  print()
                  print('Converted File to Dataframe Sample:')
                  print()
      
              else:
                  print()
      
              context = self.text2SQLBegin(DBFileNameList, fileDBPath, srcQueryPrompt, joinCond, debugInd)
              srcSQL = self.text2SQLEnd(context, debugInd)
      
              print(srcSQL)
              print('*' * 240)
              print()
              resDF = self.sql2Data(srcSQL)
      
              print('*' * 240)
              print('SQL End Time: ' + str(datetime.now().strftime("%Y-%m-%d_%H-%M-%S")))  # fresh timestamp; var holds the start time
              print('*' * 240)
      
              return resDF
      
          except Exception as e:
              x = str(e)
              print('Error: ', x)
      
              df = pd.DataFrame()
      
              return df
      1. Initialization and Debug Information: The function begins by initializing variables like authorName, website, and a timestamp (var). It then prints the start time of the SQL process. If the debug indicator (debugInd) is ‘Y’, it prints additional information like the author’s name and website.
      2. Generating SQL Context and Query: The function calls text2SQLBegin with various parameters (file paths, database file names, query prompt, join conditions, and the debug indicator) to generate an SQL context. Then it calls text2SQLEnd with this context and the debug indicator to generate the actual SQL query.
      3. Executing the SQL Query: It prints the generated SQL query for visibility, especially in debug mode. The query is then executed by calling sql2Data, which returns the result as a data frame (resDF).
      4. Finalization and Error Handling: After executing the query, it prints the SQL end time. In case of any exceptions during the process, it catches the error, prints it, and returns an empty DataFrame.
      5. Return Value: The function returns the DataFrame (resDF) containing the results of the executed SQL query. If an error occurs, it returns an empty DataFrame instead.
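
      Putting it together, a hypothetical end-to-end call would look like this; the module alias, constructor, file names & join condition are placeholders that simply follow the structure described above:

      import clsDynamicSQLProcess as dsp

      t2s = dsp.clsDynamicSQLProcess()      # assumes a no-argument constructor; adjust to your copy of the class
      resDF = t2s.genData(
          srcQueryPrompt="List the top 5 customers by total order value.",
          fileDBPath="data/",                                           # illustrative path
          DBFileNameList=["orders.csv", "customers.csv"],               # illustrative source files
          joinCond=["orders.customer_id = customers.customer_id"],      # illustrative join condition
          debugInd="Y",
      )
      print(resDF)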

      Let us explore the directory structure; starting from the parent down to some of the important child folders, it should look like this –

      Let us understand the important screenshots of this entire process –


      So, finally, we’ve done it.

      You will get the complete codebase in the following GitHub link.

      I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

      Till then, Happy Avenging! 🙂

      RAG implementation of LLMs by using Python, Haystack & React (Part – 1)

      Today, I will share a new post in a multi-part series about creating an end-to-end LLM application that feeds source data through a RAG implementation. I’ll also use the OpenAI Python SDK and Haystack embeddings in this case.

      In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.

      Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

      Demo

      Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.

      As you can see, to enable this large & complex solution, we must first establish the capabilities to build applications powered by LLMs, Transformer models, vector search, and more. You can use state-of-the-art NLP models to perform question-answering, answer generation, and semantic document search, or build tools capable of complex decision-making and query resolution. Hence, steps 1 & 2 showcase the data embedding & the creation of that informed repository. We’ll be discussing that in our second part.

      Once you have the informed repository, the system can interact with the end-users. As part of the query (shown in step 3), the prompt & the question are shared with the process engine, which then works to reduce the volume & fetch the relevant context from our informed repository, returning the tuned context as part of the response (shown in steps 4, 5 & 6).

      Then, this tuned context is shared with OpenAI for a better response, summary & concluding remarks that are user-friendly & easier for end-users to understand (shown in steps 8 & 9).

      The following are the important packages that are essential to this project –

      pip install farm-haystack==1.19.0
      pip install Flask==2.2.5
      pip install Flask-Cors==4.0.0
      pip install Flask-JWT-Extended==4.5.2
      pip install Flask-Session==0.5.0
      pip install openai==0.27.8
      pip install pandas==2.0.3
      pip install tensorflow==2.11.1

      We have the front-end built using React & the back-end APIs built with Python-Flask, along with OpenAI, to create this experience.

      Today, we’ll be going in reverse mode. We first discuss the main script & then explain all the other class scripts.

      • flaskServer.py (This is the main calling Python script to invoke the RAG-Server.)
      #########################################################
      #### Written By: SATYAKI DE                          ####
      #### Written On: 27-Jun-2023                         ####
      #### Modified On 28-Jun-2023                         ####
      ####                                                 ####
      #### Objective: This is the main calling             ####
      #### python script that will invoke the              ####
      #### Flask-based RAG server that powers the          ####
      #### chatbot experience described in this post.      ####
      ####                                                 ####
      #########################################################
      
      from flask import Flask, jsonify, request, session
      from flask_cors import CORS
      from werkzeug.security import check_password_hash, generate_password_hash
      from flask_jwt_extended import JWTManager, jwt_required, create_access_token
      import pandas as pd
      from clsConfigClient import clsConfigClient as cf
      import clsL as log
      import clsContentScrapper as csc
      import clsRAGOpenAI as crao
      import csv
      from datetime import timedelta
      import os
      import re
      import json
      
      ########################################################
      ################    Global Area   ######################
      ########################################################
      #Initiating Logging Instances
      clog = log.clsL()
      
      admin_key = cf.conf['ADMIN_KEY']
      secret_key = cf.conf['SECRET_KEY']
      session_path = cf.conf['SESSION_PATH']
      sessionFile = cf.conf['SESSION_CACHE_FILE']
      
      app = Flask(__name__)
      CORS(app)  # This will enable CORS for all routes
      app.config['JWT_SECRET_KEY'] = admin_key  # Change this!
      app.secret_key = secret_key
      
      jwt = JWTManager(app)
      
      users = cf.conf['USER_NM']
      passwd = cf.conf['USER_PWD']
      
      cCScrapper = csc.clsContentScrapper()
      cr = crao.clsRAGOpenAI()
      
      # Disabling Warning
      def warn(*args, **kwargs):
          pass
      
      import warnings
      warnings.warn = warn
      
      # Define the aggregation functions
      def join_unique(series):
          unique_vals = series.drop_duplicates().astype(str)
          return ', '.join(filter(lambda x: x != 'nan', unique_vals))
      
      # Building the preaggregate cache
      def groupImageWiki():
          try:
              base_path = cf.conf['OUTPUT_PATH']
              inputFile = cf.conf['CLEANED_FILE']
              outputFile = cf.conf['CLEANED_FILE_SHORT']
              subdir = cf.conf['SUBDIR_OUT']
              Ind = cf.conf['DEBUG_IND']
      
              inputCleanedFileLookUp = base_path + inputFile
      
              #Opening the file in dataframe
              df = pd.read_csv(inputCleanedFileLookUp)
              hash_values = df['Total_Hash'].unique()
      
              dFin = df[['primaryImage','Wiki_URL','Total_Hash']]
      
              # Ensure columns are strings and not NaN
              # Convert columns to string and replace 'nan' with an empty string
              dFin['primaryImage'] = dFin['primaryImage'].astype(str).replace('nan', '')
              dFin['Wiki_URL'] = dFin['Wiki_URL'].astype(str).replace('nan', '')
      
              dFin = dFin.drop_duplicates()  # drop_duplicates() returns a new frame; keep the result
      
              # Group by 'Total_Hash' and aggregate
              dfAgg = dFin.groupby('Total_Hash').agg({'primaryImage': join_unique,'Wiki_URL': join_unique}).reset_index()
      
              return dfAgg
      
          except Exception as e:
              x = str(e)
              print('Error: ', x)
      
              df = pd.DataFrame()
      
              return df
      
      resDf = groupImageWiki()
      
      ########################################################
      ################  End  Global Area  ####################
      ########################################################
      
      def extractRemoveUrls(hash_value):
          image_urls = ''
          wiki_urls = ''
          # Parse the inner message JSON string
          try:
      
              resDf['Total_Hash'] = resDf['Total_Hash'].astype(int)
              filtered_df = resDf[resDf['Total_Hash'] == int(hash_value)]
      
              if not filtered_df.empty:
                  image_urls = filtered_df['primaryImage'].values[0]
                  wiki_urls = filtered_df['Wiki_URL'].values[0]
      
              return image_urls, wiki_urls
      
          except Exception as e:
              x = str(e)
              print('extractRemoveUrls Error: ', x)
              return image_urls, wiki_urls
      
      def isIncomplete(line):
          """Check if a line appears to be incomplete."""
      
          # Check if the line ends with certain patterns indicating it might be incomplete.
          incomplete_patterns = [': [Link](', ': Approximately ', ': ']
          return any(line.endswith(pattern) for pattern in incomplete_patterns)
      
      def filterData(data):
          """Return only the complete lines from the data."""
      
          lines = data.split('\n')
          complete_lines = [line for line in lines if not isIncomplete(line)]
      
          return '\n'.join(complete_lines)
      
      def updateCounter(sessionFile):
          try:
              counter = 0
      
              # Check if the CSV file exists
              if os.path.exists(sessionFile):
                  with open(sessionFile, 'r') as f:
                      reader = csv.reader(f)
                      for row in reader:
                          # Assuming the counter is the first value in the CSV
                          counter = int(row[0])
      
              # Increment counter
              counter += 1
      
              # Write counter back to CSV
              with open(sessionFile, 'w', newline='') as f:
                  writer = csv.writer(f)
                  writer.writerow([counter])
      
              return counter
          except Exception as e:
              x = str(e)
              print('Error: ', x)
      
              return 1
      
      def getPreviousResult():
          try:
              fullFileName = session_path + sessionFile
              newCounterValue = updateCounter(fullFileName)
      
              return newCounterValue
          except Exception as e:
              x = str(e)
              print('Error: ', x)
      
              return 1
      
      @app.route('/login', methods=['POST'])
      def login():
          username = request.json.get('username', None)
          password = request.json.get('password', None)
      
          print('User Name: ', str(username))
          print('Password: ', str(password))
      
          #if username not in users or not check_password_hash(users.get(username), password):
          if ((username not in users) or (password not in passwd)):
              return jsonify({'login': False}), 401
      
          access_token = create_access_token(identity=username)
          return jsonify(access_token=access_token)
      
      @app.route('/chat', methods=['POST'])
      def get_chat():
          try:
              #session["key"] = "1D98KI"
              #session_id = session.sid
              #print('Session Id: ', str(session_id))
      
              cnt = getPreviousResult()
              print('Running Session Count: ', str(cnt))
      
              username = request.json.get('username', None)
              message = request.json.get('message', None)
      
              print('User: ', str(username))
              print('Content: ', str(message))
      
              if cnt == 1:
                  retList = cCScrapper.extractCatalog()
              else:
                  hashValue, cleanedData = cr.getData(str(message))
                  print('Main Hash Value:', str(hashValue))
      
                  imageUrls, wikiUrls = extractRemoveUrls(hashValue)
                  print('Image URLs: ', str(imageUrls))
                  print('Wiki URLs: ', str(wikiUrls))
                  print('Clean Text:')
                  print(str(cleanedData))
                  retList = '{"records":[{"Id":"' + str(cleanedData) + '", "Image":"' + str(imageUrls) + '", "Wiki": "' + str(wikiUrls) + '"}]}'
      
              response = {
                  'message': retList
              }
      
              print('JSON: ', str(response))
              return jsonify(response)
      
          except Exception as e:
              x = str(e)
      
              response = {
                  'message': 'Error: ' + x
              }
              return jsonify(response)
      
      @app.route('/api/data', methods=['GET'])
      @jwt_required()
      def get_data():
          response = {
              'message': 'Hello from Flask!'
          }
          return jsonify(response)
      
      if __name__ == '__main__':
          app.run(debug=True)
      

      Let us understand some of the important sections of the above script –

      Function – login():

      The login function retrieves a ‘username’ and ‘password’ from a JSON request and prints them. It checks if the provided credentials are missing from users or password lists, returning a failure JSON response if so. It creates and returns an access token in a JSON response if valid.

      Function – get_chat():

      The get_chat function retrieves the running session count and the user input from a JSON request. Based on the session count, it either extracts the catalog data or processes the user’s message through the RAG framework, which finally receives the refined response from OpenAI; it then extracts the hash value, image URLs, and wiki URLs. If an error arises, the function captures and returns the error as a JSON message.

      Function – updateCounter():

      The updateCounter function checks if a given CSV file exists and retrieves its counter value. It then increments the counter and writes it back to the CSV. If any errors occur, an error message is printed, and the function returns a value of 1.

      Function – extractRemoveUrls():

      The extractRemoveUrls function attempts to filter a data frame, resDf, based on a provided hash value to extract image and wiki URLs. If the data frame contains matching entries, it retrieves the corresponding URLs. Any errors encountered are printed, but the function always returns the image and wiki URLs, even if they are empty.
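
      Before moving on to the supporting classes, here is a minimal client sketch that exercises the /login, /chat & /api/data routes; the host, port & credentials are assumptions for local testing:

      import requests

      BASE = "http://127.0.0.1:5000"        # assumes the Flask server runs locally on its default port

      # 1. Authenticate & collect the JWT access token
      login_resp = requests.post(BASE + "/login", json={"username": "demo_user", "password": "demo_pass"})
      token = login_resp.json().get("access_token")

      # 2. Ask a question; note that the very first call of a session returns the catalog instead
      chat_resp = requests.post(BASE + "/chat", json={"username": "demo_user", "message": "Tell me about Van Gogh."})
      print(chat_resp.json()["message"])

      # 3. The protected sample route requires the Bearer token issued at login
      data_resp = requests.get(BASE + "/api/data", headers={"Authorization": "Bearer " + str(token)})
      print(data_resp.json())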

      • clsContentScrapper.py (This is the main class that brings the default options for the users if they agree with the initial prompt by the bot.)
      #####################################################
      #### Written By: SATYAKI DE                      ####
      #### Written On: 27-May-2023                     ####
      #### Modified On 28-May-2023                     ####
      ####                                             ####
      #### Objective: This is the main calling         ####
      #### python class that will invoke the           ####
      #### LangChain of package to extract             ####
      #### the transcript from the YouTube videos &    ####
      #### then answer the questions based on the      ####
      #### topics selected by the users.               ####
      ####                                             ####
      #####################################################
      
      from langchain.document_loaders import YoutubeLoader
      from langchain.text_splitter import RecursiveCharacterTextSplitter
      from langchain.embeddings.openai import OpenAIEmbeddings
      from langchain.vectorstores import FAISS
      from langchain.chat_models import ChatOpenAI
      from langchain.chains import LLMChain
      
      from langchain.prompts.chat import (
          ChatPromptTemplate,
          SystemMessagePromptTemplate,
          HumanMessagePromptTemplate,
      )
      
      from googleapiclient.discovery import build
      
      import clsTemplate as ct
      from clsConfigClient import clsConfigClient as cf
      
      import os
      
      from flask import jsonify
      import requests
      
      ###############################################
      ###           Global Section                ###
      ###############################################
      open_ai_Key = cf.conf['OPEN_AI_KEY']
      os.environ["OPENAI_API_KEY"] = open_ai_Key
      embeddings = OpenAIEmbeddings(openai_api_key=open_ai_Key)
      
      YouTube_Key = cf.conf['YOUTUBE_KEY']
      youtube = build('youtube', 'v3', developerKey=YouTube_Key)
      
      # Disabling Warning
      def warn(*args, **kwargs):
          pass
      
      import warnings
      warnings.warn = warn
      
      ###############################################
      ###    End of Global Section                ###
      ###############################################
      
      class clsContentScrapper:
          def __init__(self):
              self.model_name = cf.conf['MODEL_NAME']
              self.temp_val = cf.conf['TEMP_VAL']
              self.max_cnt = int(cf.conf['MAX_CNT'])
              self.url = cf.conf['BASE_URL']
              self.header_token = cf.conf['HEADER_TOKEN']
      
          def extractCatalog(self):
              try:
                  base_url = self.url
                  header_token = self.header_token
      
                  url = base_url + '/departments'
      
                  print('Full URL: ', str(url))
      
                  payload={}
                  headers = {'Cookie': header_token}
      
                  response = requests.request("GET", url, headers=headers, data=payload)
      
                  x = response.text
      
                  return x
              except Exception as e:
                  discussedTopic = []
                  x = str(e)
                  print('Error: ', x)
      
                  return x
      

      Let us understand the core part that we require from this class.

      Function – extractCatalog():

      The extractCatalog function uses specific headers to make a GET request to a constructed URL. The URL is derived by appending ‘/departments’ to a base_url, and a header token is used in the request headers. If successful, it returns the text of the response; if there’s an exception, it prints the error and returns the error message.

      • clsRAGOpenAI.py (This is the main class that brings the RAG-enabled context that is fed to OpenAI for fine-tuned response with less cost.)
      #########################################################
      #### Written By: SATYAKI DE                          ####
      #### Written On: 27-Jun-2023                         ####
      #### Modified On 28-Jun-2023                         ####
      ####                                                 ####
      #### Objective: This is the main RAG class           ####
      #### python class that retrieves the relevant        ####
      #### context from the FAISS vector store &           ####
      #### feeds it to OpenAI for a tuned response.        ####
      ####                                                 ####
      #########################################################
      
      from haystack.document_stores.faiss import FAISSDocumentStore
      from haystack.nodes import DensePassageRetriever
      import openai
      
      from clsConfigClient import clsConfigClient as cf
      import clsL as log
      
      # Disabling Warning
      def warn(*args, **kwargs):
          pass
      
      import warnings
      warnings.warn = warn
      
      import os
      import re
      ###############################################
      ###           Global Section                ###
      ###############################################
      Ind = cf.conf['DEBUG_IND']
      queryModel = cf.conf['QUERY_MODEL']
      passageModel = cf.conf['PASSAGE_MODEL']
      
      #Initiating Logging Instances
      clog = log.clsL()
      
      os.environ["TOKENIZERS_PARALLELISM"] = "false"
      
      vectorDBFileName = cf.conf['VECTORDB_FILE_NM']
      
      indexFile = "vectorDB/" + str(vectorDBFileName) + '.faiss'
      indexConfig = "vectorDB/" + str(vectorDBFileName) + ".json"
      
      print('File: ', str(indexFile))
      print('Config: ', str(indexConfig))
      
      # Also, provide `config_path` parameter if you set it when calling the `save()` method:
      new_document_store = FAISSDocumentStore.load(index_path=indexFile, config_path=indexConfig)
      
      # Initialize Retriever
      retriever = DensePassageRetriever(document_store=new_document_store,
                                        query_embedding_model=queryModel,
                                        passage_embedding_model=passageModel,
                                        use_gpu=False)
      
      
      ###############################################
      ###    End of Global Section                ###
      ###############################################
      
      class clsRAGOpenAI:
          def __init__(self):
              self.basePath = cf.conf['DATA_PATH']
              self.fileName = cf.conf['FILE_NAME']
              self.Ind = cf.conf['DEBUG_IND']
              self.subdir = str(cf.conf['OUT_DIR'])
              self.base_url = cf.conf['BASE_URL']
              self.outputPath = cf.conf['OUTPUT_PATH']
              self.vectorDBPath = cf.conf['VECTORDB_PATH']
              self.openAIKey = cf.conf['OPEN_AI_KEY']
              self.temp = cf.conf['TEMP_VAL']
              self.modelName = cf.conf['MODEL_NAME']
              self.maxToken = cf.conf['MAX_TOKEN']
      
          def extractHash(self, text):
              try:
                  # Regular expression pattern to match 'Ref: {' followed by a number and then '}'
                  pattern = r"Ref: \{'(\d+)'\}"
                  match = re.search(pattern, text)
      
                  if match:
                      return match.group(1)
                  else:
                      return None
              except Exception as e:
                  x = str(e)
                  print('Error: ', x)
      
                  return None
      
          def removeSentencesWithNaN(self, text):
              try:
                  # Split text into sentences using regular expression
                  sentences = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s', text)
                  # Filter out sentences containing 'nan'
                  filteredSentences = [sentence for sentence in sentences if 'nan' not in sentence]
                  # Rejoin the sentences
                  return ' '.join(filteredSentences)
              except Exception as e:
                  x = str(e)
                  print('Error: ', x)
      
                  return ''
      
          def retrieveDocumentsReader(self, question, top_k=9):
              return retriever.retrieve(question, top_k=top_k)
      
          def generateAnswerWithGPT3(self, retrieved_docs, question):
              try:
                  openai.api_key = self.openAIKey
                  temp = self.temp
                  modelName = self.modelName
                  maxToken = self.maxToken
      
                  documentsText = " ".join([doc.content for doc in retrieved_docs])
      
                  filteredDocs = self.removeSentencesWithNaN(documentsText)
                  hashValue = self.extractHash(filteredDocs)
      
                  print('RAG Docs:: ')
                  print(filteredDocs)
                  #prompt = f"Given the following documents: {documentsText}, answer the question accurately based on the above data with the supplied http urls: {question}"
      
                  # Set up a chat-style prompt with your data & the user's question
                  messages = [
                      {"role": "system", "content": "You are a helpful assistant. Answer the question accurately based only on the supplied documents & include any relevant http urls. Keep the answer concise enough to fit within the max_tokens limit."},
                      {"role": "user", "content": f"Documents: {filteredDocs}\n\nQuestion: {question}"}
                  ]
      
                  # Chat style invoking the latest model
                  response = openai.ChatCompletion.create(
                      model=modelName,
                      messages=messages,
                      temperature = temp,
                      max_tokens=maxToken
                  )
                  return hashValue, response.choices[0].message['content'].strip().replace('\n','\\n')
              except Exception as e:
                  x = str(e)
                  print('failed to get from OpenAI: ', x)
                  return None, 'Not Available!'
      
          def ragAnswerWithHaystackAndGPT3(self, question):
              retrievedDocs = self.retrieveDocumentsReader(question)
              return self.generateAnswerWithGPT3(retrievedDocs, question)
      
          def getData(self, strVal):
              try:
                  print('*'*120)
                  print('Index Your Data for Retrieval:')
                  print('*'*120)
      
                  print('Response from New Docs: ')
                  print()
      
                  hashValue, answer = self.ragAnswerWithHaystackAndGPT3(strVal)
      
                  print('GPT3 Answer::')
                  print(answer)
                  print('Hash Value:')
                  print(str(hashValue))
      
                  print('*'*240)
                  print('End Of Use RAG to Generate Answers:')
                  print('*'*240)
      
                  return hashValue, answer
              except Exception as e:
                  x = str(e)
                  print('Error: ', x)
                  answer = x
                  hashValue = 1
      
                  return hashValue, answer
      

      Let us understand some of the important blocks –

      Function – ragAnswerWithHaystackAndGPT3():

      The ragAnswerWithHaystackAndGPT3 function retrieves relevant documents for a given question using the retrieveDocumentsReader method. It then generates an answer for the query using GPT-3 with the retrieved documents via the generateAnswerWithGPT3 method. The final response is returned.

      Function – generateAnswerWithGPT3():

      The generateAnswerWithGPT3 function, given a list of retrieved documents and a question, communicates with OpenAI’s GPT-3 to generate an answer. It first processes the documents, filtering and extracting a hash value. Using a chat-style format, it prompts GPT-3 with the processed documents and captures its response. If an error occurs, an error message is printed, and “Not Available!” is returned.
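
      Note that the listing above talks to OpenAI through the legacy openai.ChatCompletion interface. If you are on the newer openai (1.x) Python SDK, the same call looks slightly different. The following is only a minimal sketch under that assumption – the model name, temperature & token limit would still come from the config file, & the helper name generateAnswerWithGPT3V1 is purely illustrative –

      # Hypothetical variant of generateAnswerWithGPT3() for the openai>=1.0 SDK.
      # Not part of the original codebase; shown only as a migration sketch.
      from openai import OpenAI

      def generateAnswerWithGPT3V1(filteredDocs, question, apiKey, modelName, temp, maxToken):
          # The 1.x SDK uses an explicit client object instead of the module-level api_key.
          client = OpenAI(api_key=apiKey)

          response = client.chat.completions.create(
              model=modelName,
              messages=[
                  {"role": "system", "content": "Answer the question accurately based only on the supplied documents."},
                  {"role": "user", "content": f"Documents: {filteredDocs}\n\nQuestion: {question}"}
              ],
              temperature=temp,
              max_tokens=maxToken
          )

          # The response object exposes attributes rather than dict-style access.
          return response.choices[0].message.content.strip()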

      Function – retrieveDocumentsReader():

      The retrieveDocumentsReader function takes in a question and an optional parameter, top_k (defaulted to 9). It calls the retriever.retrieve method with the given parameters. The retrieval returns at most nine documents from the RAG engine, which are then fed to OpenAI.
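
      If you want to sanity-check the retriever in isolation, a tiny snippet like the one below helps. The question string is hypothetical; content & score are the standard attributes of the Haystack Document objects returned by the DensePassageRetriever –

      # Illustrative usage only – the question string is hypothetical.
      ragClient = clsRAGOpenAI()
      docs = ragClient.retrieveDocumentsReader("Which departments are available?", top_k=3)

      for doc in docs:
          # Each Haystack Document carries the passage text & its retrieval score.
          print(round(doc.score, 4), doc.content[:80])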

      • App.js (This is the main React script that creates the interface, handles the authentication, & parses the response data.)
      // App.js
      import React, { useState } from 'react';
      import axios from 'axios';
      import './App.css';
      
      const App = () => {
        const [isLoggedIn, setIsLoggedIn] = useState(false);
        const [username, setUsername] = useState('');
        const [password, setPassword] = useState('');
        const [message, setMessage] = useState('');
        const [chatLog, setChatLog] = useState([{ sender: 'MuBot', message: 'Welcome to MuBot! Please explore the world of History from our brilliant collections! Do you want to proceed to see the catalog?'}]);
      
        const handleLogin = async (e) => {
          e.preventDefault();
          try {
            const response = await axios.post('http://localhost:5000/login', { username, password });
            if (response.status === 200) {
              setIsLoggedIn(true);
            }
          } catch (error) {
            console.error('Login error:', error);
          }
        };
      
        const sendMessage = async () => {
          if (message.trim() === '') return;
      
          // Create a new chat entry
          const newChatEntry = {
            sender: 'user',
            message: message.trim(),
          };
      
          // Clear the input field
          setMessage('');
      
          try {
            // Make API request to Python-based API
            const response = await axios.post('http://localhost:5000/chat', { message: newChatEntry.message }); // Replace with your API endpoint URL
            const responseData = response.data;
      
            // Print the response to the console for debugging
            console.log('API Response:', responseData);
      
            // Parse the nested JSON from the 'message' attribute
            const jsonData = JSON.parse(responseData.message);
      
            // Check if the data contains 'departments'
            if (jsonData.departments) {
      
              // Extract the 'departments' attribute from the parsed data
              const departments = jsonData.departments;
      
              // Extract the department names and create a single string with line breaks
              const botResponseText = departments.reduce((acc, department) => {return acc + department.departmentId + ' ' + department.displayName + '\n';}, '');
      
              // Update the chat log with the bot's response
              setChatLog((prevChatLog) => [...prevChatLog, { sender: 'user', message: message }, { sender: 'bot', message: botResponseText },]);
            }
            else if (jsonData.records)
            {
              // Data structure 2: Artwork information
              const records = jsonData.records;
      
              // Prepare chat entries
              const chatEntries = [];
      
              // Iterate through records and extract text, image, and wiki information
              records.forEach((record) => {
                const textInfo = Object.entries(record).map(([key, value]) => {
                  if (key !== 'Image' && key !== 'Wiki') {
                    return `${key}: ${value}`;
                  }
                  return null;
                }).filter((info) => info !== null).join('\n');
      
                const imageLink = record.Image;
                //const wikiLinks = JSON.parse(record.Wiki.replace(/'/g, '"'));
                //const wikiLinks = record.Wiki;
                const wikiLinks = record.Wiki.split(',').map(link => link.trim());
      
                console.log('Wiki:', wikiLinks);
      
                // Check if there is a valid image link
                const hasValidImage = imageLink && imageLink !== '[]';
      
                const imageElement = hasValidImage ? (
                  <img src={imageLink} alt="Artwork" style={{ maxWidth: '100%' }} />
                ) : null;
      
                // Create JSX elements for rendering the wiki links (if available)
                const wikiElements = wikiLinks.map((link, index) => (
                  <div key={index}>
                    <a href={link} target="_blank" rel="noopener noreferrer">
                      Wiki Link {index + 1}
                    </a>
                  </div>
                ));
      
                if (textInfo) {
                  chatEntries.push({ sender: 'bot', message: textInfo });
                }
      
                if (imageElement) {
                  chatEntries.push({ sender: 'bot', message: imageElement });
                }
      
                if (wikiElements.length > 0) {
                  chatEntries.push({ sender: 'bot', message: wikiElements });
                }
              });
      
              // Update the chat log with the bot's response
              setChatLog((prevChatLog) => [...prevChatLog, { sender: 'user', message }, ...chatEntries, ]);
            }
      
          } catch (error) {
            console.error('Error sending message:', error);
          }
        };
      
        if (!isLoggedIn) {
          return (
            <div className="login-container">
              <h2>Welcome to the MuBot</h2>
              <form onSubmit={handleLogin} className="login-form">
                <input
                  type="text"
                  placeholder="Enter your name"
                  value={username}
                  onChange={(e) => setUsername(e.target.value)}
                  required
                />
                <input
                  type="password"
                  placeholder="Enter your password"
                  value={password}
                  onChange={(e) => setPassword(e.target.value)}
                  required
                />
                <button type="submit">Login</button>
              </form>
            </div>
          );
        }
      
        return (
          <div className="chat-container">
            <div className="chat-header">
              <h2>Hello, {username}</h2>
              <h3>Chat with MuBot</h3>
            </div>
            <div className="chat-log">
              {chatLog.map((chatEntry, index) => (
                <div
                  key={index}
                  className={`chat-entry ${chatEntry.sender === 'user' ? 'user' : 'bot'}`}
                >
                  <span className="user-name">{chatEntry.sender === 'user' ? username : 'MuBot'}</span>
                  <p className="chat-message">{chatEntry.message}</p>
                </div>
              ))}
            </div>
            <div className="chat-input">
              <input
                type="text"
                placeholder="Type your message..."
                value={message}
                onChange={(e) => setMessage(e.target.value)}
                onKeyPress={(e) => {
                  if (e.key === 'Enter') {
                    sendMessage();
                  }
                }}
              />
              <button onClick={sendMessage}>Send</button>
            </div>
          </div>
        );
      };
      
      export default App;
      

      Please find some of the important logic –

      Function – handleLogin():

      The handleLogin asynchronous function responds to an event by preventing its default action. It attempts to post a login request with a username and password to a local server endpoint. If the response is successful with a status of 200, it updates a state variable to indicate a successful login; otherwise, it logs any encountered errors.
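
      For completeness, here is a minimal sketch of what the Python side of that /login call could look like. The actual backend script is not shown in this section, so treat the route, port & credential check below purely as assumptions –

      # Hypothetical Flask counterpart of the /login call – not the original backend script.
      from flask import Flask, jsonify, request
      from flask_cors import CORS

      app = Flask(__name__)
      CORS(app)  # the React app runs on a different port, so cross-origin calls must be allowed

      @app.route('/login', methods=['POST'])
      def login():
          payload = request.get_json(force=True)
          username = payload.get('username', '')
          password = payload.get('password', '')

          # Placeholder check – replace with your real credential validation.
          if username and password:
              return jsonify({'status': 'ok', 'user': username}), 200

          return jsonify({'status': 'denied'}), 401

      if __name__ == '__main__':
          app.run(port=5000)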

      Function – sendMessage():

      The sendMessage asynchronous function is designed to handle the user’s chat interaction:

      1. If the message is empty (after trimming spaces), the function exits without further action.
      2. A chat entry object is created with the sender set as ‘user’ and the trimmed message.
      3. The input field’s message is cleared, and an API request is made to a local server endpoint with the chat message.
      4. If the API responds with a ‘departments’ attribute in its JSON, a bot response is crafted by iterating over department details.
      5. If the API responds with ‘records’ indicating artwork information, the bot crafts responses for each record, extracting text, images, and wiki links, and generating JSX elements for rendering them.
      6. After processing the API response, the chat log state is updated with the user’s original message and the bot’s responses.
      7. Errors, if encountered, are logged to the console.

      This function enables interactive chat with bot responses that vary based on the nature of the data received from the API.
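
      To make the two branches above concrete, here are the response shapes the parser expects from the /chat endpoint. The attribute names (message, departments, departmentId, displayName, records, Image, Wiki) come straight from the React code; the other record fields are free-form, & all of the values below are invented for illustration –

      # Illustrative payloads only – the values are invented, the shapes mirror the parser above.
      departmentsReply = {
          "message": '{"departments": [{"departmentId": 1, "displayName": "European Paintings"}]}'
      }

      recordsReply = {
          "message": '{"records": [{"Title": "Sample Artwork", '
                     '"Image": "https://example.com/art.jpg", '
                     '"Wiki": "https://en.wikipedia.org/wiki/Sample_Artwork"}]}'
      }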


      Let us explore the directory structure, starting from the parent folder down to some of the important child folders, which should look like this –


      So, finally, we’ve done it.

      I know that this post is relatively longer than my earlier posts. But I think you can get all the details once you go through it.

      You will get the complete codebase in the following GitHub link.

      I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

      Till then, Happy Avenging! 🙂

      Neural prophet – The enhanced version of Facebook’s forecasting API

      Hi Team,

      Today, I’ll be explaining an enhancement to one of my previous posts. I’ve already shared the fascinating Prophet API, which Facebook developed; with it, one can quickly get more accurate predictions with significantly fewer data points. (If you want to know more about that post, please click on the following link.)

      However, there is another enhancement on top of that API, which is even more accurate. One needs to know, though, when to consider using it. So, today, we’ll be talking about the NeuralProphet API.

      But, before we start digging deep, why don’t we view the demo first?

      Demo

      Let’s visit a diagram. That way, you can understand where you can use it. Also, I’ll be sharing some of the links from the original site for better information mining.

      Source: Neural Prophet (Official Site)

      As one can see, this API tries to bridge the gap between the different groups & enables efficient time-series computation.

      WHERE TO USE:

      Let’s visit another diagram from the same source.

      Source: Neural Prophet (Official Site)

      So, I hope these two diagrams give you a clear picture & set your expectations closer to ground reality.


      ARCHITECTURE:

      Let us explore the architecture –

      Architecture Diagram

      As one can see, the application processes IoT data & builds up a historical data volume, out of which the model gradually predicts correct outcomes with higher confidence.

      For more information on this API, please visit the following link.


      CODE:

      Let’s explore the essential scripts here.

      1. clsConfig.py (Configuration file for the entire application.)


      ################################################
      #### Written By: SATYAKI DE ####
      #### Written On: 15-May-2020 ####
      #### Modified On: 28-Dec-2021 ####
      #### ####
      #### Objective: This script is a config ####
      #### file, contains all the keys for ####
      #### Machine-Learning & streaming dashboard.####
      #### ####
      ################################################
      import os
      import platform as pl
      import pandas as p
      class clsConfig(object):
          Curr_Path = os.path.dirname(os.path.realpath(__file__))

          os_det = pl.system()
          if os_det == "Windows":
              sep = '\\'
          else:
              sep = '/'

          conf = {
              'APP_ID': 1,
              'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
              'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
              'LOG_PATH': Curr_Path + sep + 'log' + sep,
              'REPORT_PATH': Curr_Path + sep + 'report',
              'FILE_NAME': Curr_Path + sep + 'Data' + sep + 'thermostatIoT.csv',
              'SRC_PATH': Curr_Path + sep + 'data' + sep,
              'APP_DESC_1': 'Old Video Enhancement!',
              'DEBUG_IND': 'N',
              'INIT_PATH': Curr_Path,
              'SUBDIR': 'data',
              'SEP': sep,
              'testRatio': 0.2,
              'valRatio': 0.2,
              'epochsVal': 8,
              'sleepTime': 3,
              'sleepTime1': 6,
              'factorVal': 0.2,
              'learningRateVal': 0.001,
              'event1': {
                  'event': 'SummerEnd',
                  'ds': p.to_datetime([
                      '2010-04-01', '2011-04-01', '2012-04-01',
                      '2013-04-01', '2014-04-01', '2015-04-01',
                      '2016-04-01', '2017-04-01', '2018-04-01',
                      '2019-04-01', '2020-04-01', '2021-04-01',
                  ]),
              },
              'event2': {
                  'event': 'LongWeekend',
                  'ds': p.to_datetime([
                      '2010-12-01', '2011-12-01', '2012-12-01',
                      '2013-12-01', '2014-12-01', '2015-12-01',
                      '2016-12-01', '2017-12-01', '2018-12-01',
                      '2019-12-01', '2020-12-01', '2021-12-01',
                  ]),
              }
          }


      The only key snippet here is the pair of nested JSON-style entries carrying pandas datetime values, as shown in the following lines –

      'event1': {
          'event': 'SummerEnd',
          'ds': p.to_datetime([
              '2010-04-01', '2011-04-01', '2012-04-01',
              '2013-04-01', '2014-04-01', '2015-04-01',
              '2016-04-01', '2017-04-01', '2018-04-01',
              '2019-04-01', '2020-04-01', '2021-04-01',
          ]),},
      'event2': {
          'event': 'LongWeekend',
          'ds': p.to_datetime([
              '2010-12-01', '2011-12-01', '2012-12-01',
              '2013-12-01', '2014-12-01', '2015-12-01',
              '2016-12-01', '2017-12-01', '2018-12-01',
              '2019-12-01', '2020-12-01', '2021-12-01',
          ]),}

      As one can see, our application is equipped with these events so that it can predict our use case better.

      2. clsPredictIonIoT.py (Main class file, which will invoke neural-prophet forecast for the entire application.)


      ################################################
      #### Written By: SATYAKI DE ####
      #### Written On: 19-Feb-2022 ####
      #### Modified On 21-Feb-2022 ####
      #### ####
      #### Objective: This python script will ####
      #### perform the neural-prophet forecast ####
      #### based on the historical input received ####
      #### from IoT device. ####
      ################################################
      # We keep the setup code in a different class as shown below.
      from clsConfig import clsConfig as cf
      import psutil
      import os
      import pandas as p
      import json
      import datetime
      from neuralprophet import NeuralProphet, set_log_level
      from neuralprophet import set_random_seed
      from neuralprophet.benchmark import Dataset, NeuralProphetModel, SimpleExperiment, CrossValidationExperiment
      import time
      import clsL as cl
      import matplotlib.pyplot as plt
      ###############################################
      ### Global Section ###
      ###############################################
      # Initiating Log class
      l = cl.clsL()
      set_random_seed(10)
      set_log_level("ERROR", "INFO")
      ###############################################
      ### End of Global Section ###
      ###############################################
      class clsPredictIonIoT:
          def __init__(self):
              self.sleepTime = int(cf.conf['sleepTime'])
              self.event1 = cf.conf['event1']
              self.event2 = cf.conf['event2']

          def forecastSeries(self, inputDf):
              try:
                  sleepTime = self.sleepTime
                  event1 = self.event1
                  event2 = self.event2

                  df = inputDf

                  print('IoTData: ')
                  print(df)

                  ## user specified events
                  # history events
                  SummerEnd = p.DataFrame(event1)
                  LongWeekend = p.DataFrame(event2)

                  dfEvents = p.concat((SummerEnd, LongWeekend))

                  # NeuralProphet Object
                  # Adding events
                  m = NeuralProphet(loss_func="MSE")

                  # set the model to expect these events
                  m = m.add_events(["SummerEnd", "LongWeekend"])

                  # create the data df with events
                  historyDf = m.create_df_with_events(df, dfEvents)

                  # fit the model
                  metrics = m.fit(historyDf, freq="D")

                  # forecast with events known ahead
                  futureDf = m.make_future_dataframe(df=historyDf, events_df=dfEvents, periods=365, n_historic_predictions=len(df))
                  forecastDf = m.predict(df=futureDf)

                  events = forecastDf[(forecastDf['event_SummerEnd'].abs() + forecastDf['event_LongWeekend'].abs()) > 0]
                  events.tail()

                  ## plotting forecasts
                  fig = m.plot(forecastDf)

                  ## plotting components
                  figComp = m.plot_components(forecastDf)

                  ## plotting parameters
                  figParam = m.plot_parameters()

                  #################################
                  #### Train & Test Evaluation ####
                  #################################
                  m = NeuralProphet(seasonality_mode="multiplicative", learning_rate=0.1)

                  dfTrain, dfTest = m.split_df(df=df, freq="MS", valid_p=0.2)

                  metricsTrain = m.fit(df=dfTrain, freq="MS")
                  metricsTest = m.test(df=dfTest)

                  print('metricsTest:: ')
                  print(metricsTest)

                  # Predict Into Future
                  metricsTrain2 = m.fit(df=df, freq="MS")
                  futureDf = m.make_future_dataframe(df, periods=24, n_historic_predictions=48)
                  forecastDf = m.predict(futureDf)

                  fig = m.plot(forecastDf)

                  # Visualize training
                  m = NeuralProphet(seasonality_mode="multiplicative", learning_rate=0.1)
                  dfTrain, dfTest = m.split_df(df=df, freq="MS", valid_p=0.2)
                  metrics = m.fit(df=dfTrain, freq="MS", validation_df=dfTest, plot_live_loss=True)

                  print('Tail of Metrics: ')
                  print(metrics.tail(1))

                  ######################################
                  #### Time-series Cross-Validation ####
                  ######################################
                  METRICS = ['SmoothL1Loss', 'MAE', 'RMSE']
                  params = {"seasonality_mode": "multiplicative", "learning_rate": 0.1}

                  folds = NeuralProphet(**params).crossvalidation_split_df(df, freq="MS", k=5, fold_pct=0.20, fold_overlap_pct=0.5)

                  metricsTrain = p.DataFrame(columns=METRICS)
                  metricsTest = p.DataFrame(columns=METRICS)

                  for dfTrain, dfTest in folds:
                      m = NeuralProphet(**params)
                      train = m.fit(df=dfTrain, freq="MS")
                      test = m.test(df=dfTest)
                      metricsTrain = metricsTrain.append(train[METRICS].iloc[-1])
                      metricsTest = metricsTest.append(test[METRICS].iloc[-1])

                  print('Stats: ')
                  dfStats = metricsTest.describe().loc[["mean", "std", "min", "max"]]
                  print(dfStats)

                  ###################################
                  #### Using Benchmark Framework ####
                  ###################################
                  print('Starting extracting result set for Benchmark:')

                  ts = Dataset(df=df, name="thermoStatsCPUUsage", freq="MS")
                  params = {"seasonality_mode": "multiplicative"}

                  exp = SimpleExperiment(
                      model_class=NeuralProphetModel,
                      params=params,
                      data=ts,
                      metrics=["MASE", "RMSE"],
                      test_percentage=25,
                  )
                  resultTrain, resultTest = exp.run()

                  print('Test result for Benchmark:: ')
                  print(resultTest)
                  print('Finished extracting result test for Benchmark!')

                  ###################################
                  #### Cross Validate Experiment ####
                  ###################################
                  print('Starting extracting result set for Cross-Validation:')

                  ts = Dataset(df=df, name="thermoStatsCPUUsage", freq="MS")
                  params = {"seasonality_mode": "multiplicative"}

                  exp_cv = CrossValidationExperiment(
                      model_class=NeuralProphetModel,
                      params=params,
                      data=ts,
                      metrics=["MASE", "RMSE"],
                      test_percentage=10,
                      num_folds=3,
                      fold_overlap_pct=0,
                  )
                  resultTrain, resultTest = exp_cv.run()

                  print('resultTest for Cross Validation:: ')
                  print(resultTest)
                  print('Finished extracting result test for Cross-Validation!')

                  #####################################################
                  #### 3-Phase Train, Test & Validation Experiment ####
                  #####################################################
                  print('Starting 3-phase Train, Test & Validation Experiment!')

                  m = NeuralProphet(seasonality_mode="multiplicative", learning_rate=0.1)

                  # create a test holdout set:
                  dfTrainVal, dfTest = m.split_df(df=df, freq="MS", valid_p=0.2)

                  # create a validation holdout set:
                  dfTrain, dfVal = m.split_df(df=dfTrainVal, freq="MS", valid_p=0.2)

                  # fit a model on training data and evaluate on validation set.
                  metricsTrain1 = m.fit(df=dfTrain, freq="MS")
                  metrics_val = m.test(df=dfVal)

                  # refit model on training and validation data and evaluate on test set.
                  metricsTrain2 = m.fit(df=dfTrainVal, freq="MS")
                  metricsTest = m.test(df=dfTest)

                  metricsTrain1["split"] = "train1"
                  metricsTrain2["split"] = "train2"
                  metrics_val["split"] = "validate"
                  metricsTest["split"] = "test"
                  metrics_stat = metricsTrain1.tail(1).append([metricsTrain2.tail(1), metrics_val, metricsTest]).drop(columns=['RegLoss'])

                  print('Metrics Stat:: ')
                  print(metrics_stat)

                  # Train, Cross-Validate and Cross-Test evaluation
                  METRICS = ['SmoothL1Loss', 'MAE', 'RMSE']
                  params = {"seasonality_mode": "multiplicative", "learning_rate": 0.1}

                  crossVal, crossTest = NeuralProphet(**params).double_crossvalidation_split_df(df, freq="MS", k=5, valid_pct=0.10, test_pct=0.10)

                  metricsTrain1 = p.DataFrame(columns=METRICS)
                  metrics_val = p.DataFrame(columns=METRICS)

                  for dfTrain1, dfVal in crossVal:
                      m = NeuralProphet(**params)
                      train1 = m.fit(df=dfTrain1, freq="MS")
                      val = m.test(df=dfVal)
                      metricsTrain1 = metricsTrain1.append(train1[METRICS].iloc[-1])
                      metrics_val = metrics_val.append(val[METRICS].iloc[-1])

                  metricsTrain2 = p.DataFrame(columns=METRICS)
                  metricsTest = p.DataFrame(columns=METRICS)

                  for dfTrain2, dfTest in crossTest:
                      m = NeuralProphet(**params)
                      train2 = m.fit(df=dfTrain2, freq="MS")
                      test = m.test(df=dfTest)
                      metricsTrain2 = metricsTrain2.append(train2[METRICS].iloc[-1])
                      metricsTest = metricsTest.append(test[METRICS].iloc[-1])

                  mtrain2 = metricsTrain2.describe().loc[["mean", "std"]]
                  print('Train 2 Stats:: ')
                  print(mtrain2)

                  mval = metrics_val.describe().loc[["mean", "std"]]
                  print('Validation Stats:: ')
                  print(mval)

                  mtest = metricsTest.describe().loc[["mean", "std"]]
                  print('Test Stats:: ')
                  print(mtest)

                  return 0
              except Exception as e:
                  x = str(e)
                  print('Error: ', x)

                  return 1

      Some of the key snippets that I will discuss here are as follows –

      ## user specified events
      # history events
      SummerEnd = p.DataFrame(event1)
      LongWeekend = p.DataFrame(event2)
      
      dfEvents = p.concat((SummerEnd, LongWeekend))
      
      # NeuralProphet Object
      # Adding events
      m = NeuralProphet(loss_func="MSE")
      
      # set the model to expect these events
      m = m.add_events(["SummerEnd", "LongWeekend"])
      
      # create the data df with events
      historyDf = m.create_df_with_events(df, dfEvents)

      Creating & adding events into your model will allow it to predict based on the milestones.

      # fit the model
      metrics = m.fit(historyDf, freq="D")
      
      # forecast with events known ahead
      futureDf = m.make_future_dataframe(df=historyDf, events_df=dfEvents, periods=365, n_historic_predictions=len(df))
      forecastDf = m.predict(df=futureDf)
      
      events = forecastDf[(forecastDf['event_SummerEnd'].abs() + forecastDf['event_LongWeekend'].abs()) > 0]
      events.tail()
      
      ## plotting forecasts
      fig = m.plot(forecastDf)
      
      ## plotting components
      figComp = m.plot_components(forecastDf)
      
      ## plotting parameters
      figParam = m.plot_parameters()

      Based on the daily/monthly collected data, our algorithm tries to plot the data points & predict a future trend, which will look like this –

      Future Data Points

      From the above diagram, we can conclude that the CPU usage trend has been growing day by day since the beginning. However, there are some events where we can see a momentary drop in demand due to the climate & holidays. During those times, people are either not using the devices or not at home.

      Apart from that, I’ve demonstrated the use of a benchmark framework, splitting the data into Train, Test & Validation sets, & capturing the RMSE values. I would request you to go through it & post any questions you may have.

      You can witness the train & validation datasets & visualize them in the standard manner, which will look something like –

      Demo

      3. readingIoT.py (Main invoking script.)


      ###############################################
      #### Written By: SATYAKI DE ####
      #### Written On: 21-Feb-2022 ####
      #### Modified On 21-Feb-2022 ####
      #### ####
      #### Objective: This python script will ####
      #### invoke the main class to use the ####
      #### stored historical IoT data stored & ####
      #### then transform, cleanse, predict & ####
      #### analyze the data points into more ####
      #### meaningful decision-making insights. ####
      ###############################################
      # We keep the setup code in a different class as shown below.
      from clsConfig import clsConfig as cf
      import datetime
      import logging
      import pandas as p
      import clsPredictIonIoT as cpt
      ###############################################
      ### Global Section ###
      ###############################################
      sep = str(cf.conf['SEP'])
      Curr_Path = str(cf.conf['INIT_PATH'])
      fileName = str(cf.conf['FILE_NAME'])
      ###############################################
      ### End of Global Section ###
      ###############################################
      def main():
          try:
              # Other useful variables
              debugInd = 'Y'
              var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
              var1 = datetime.datetime.now()

              # Initiating Prediction class
              x1 = cpt.clsPredictIonIoT()

              print('Start Time: ', str(var))
              # End of useful variables

              # Initiating Log Class
              general_log_path = str(cf.conf['LOG_PATH'])

              # Enabling Logging Info
              logging.basicConfig(filename=general_log_path + 'IoT_NeuralProphet.log', level=logging.INFO)

              # Reading the source IoT data
              iotData = p.read_csv(fileName)
              df = iotData.rename(columns={'MonthlyDate': 'ds', 'AvgIoTCPUUsage': 'y'})[['ds', 'y']]

              r1 = x1.forecastSeries(df)

              if (r1 == 0):
                  print('Successfully IoT forecast predicted!')
              else:
                  print('Failed to predict IoT forecast!')

              var2 = datetime.datetime.now()
              c = var2 - var1
              minutes = c.total_seconds() / 60

              print('Total Run Time in minutes: ', str(minutes))
              print('End Time: ', str(var2))
          except Exception as e:
              x = str(e)
              print('Error: ', x)

      if __name__ == "__main__":
          main()


      Here are some of the key snippets –

      # Reading the source IoT data
      iotData = p.read_csv(fileName)
      df = iotData.rename(columns={'MonthlyDate': 'ds', 'AvgIoTCPUUsage': 'y'})[['ds', 'y']]
      
      r1 = x1.forecastSeries(df)
      
      if (r1 == 0):
          print('Successfully IoT forecast predicted!')
      else:
          print('Failed to predict IoT forecast!')
      
      var2 = datetime.datetime.now()

      In the above lines, the main calling application invokes the neural-forecasting class & passes the pandas dataframe containing the IoT device’s historical data to train the model.
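
      For clarity, here is a tiny, self-contained sketch of that column mapping. The sample rows below are made up; the real thermostatIoT.csv ships with the repository. NeuralProphet only cares about the two renamed columns, ds & y –

      # Hypothetical sample of the IoT CSV – only the column names mirror the real file.
      import pandas as p
      from io import StringIO

      sampleCsv = StringIO(
          "MonthlyDate,AvgIoTCPUUsage\n"
          "2021-01-01,42.7\n"
          "2021-02-01,44.1\n"
      )

      iotData = p.read_csv(sampleCsv)

      # NeuralProphet expects exactly these two columns: ds (date) & y (value).
      df = iotData.rename(columns={'MonthlyDate': 'ds', 'AvgIoTCPUUsage': 'y'})[['ds', 'y']]
      print(df)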

      For your information, here is the outcome of the run, when you invoke the main calling script –

      Demo – Continue

      FOLDER STRUCTURE:

      Please find the folder structure as shown –

      Directory Structure

      So, we’ve done it.

      You will get the complete codebase in the following GitHub link.

      I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

      Till then, Happy Avenging! 😀

      Note: All the data & scenarios posted here are representational, available over the internet, & meant for educational purposes only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement, especially in the prediction quality.

      Python-based Dash framework visualizing real-time COVID-19 trends.

      Hi Team,

      We’ll enhance our last post on COVID-19 prediction & capture the results in a real-time dashboard, where the visual display points update as soon as the source data changes. In short, this is a genuinely real-time visual dashboard displaying all the graphs & trends that respond to changes in the third-party API source data.

      However, I would like to share the run before we dig deep into this.

      Demo Run

      Architecture:

      Let us understand the architecture for this solution –

      Streaming Architecture

      From the above diagram, one can see that we’re maintaining a similar approach compared to our last initiative. However, we’ve used a different framework to display the data live.

      To achieve this, we’ve used a compelling Python-based framework called Dash. Other than that, we’ve used the Ably, Plotly & Prophet APIs.
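
      Before we get into the scripts, here is a minimal sketch of how Dash refreshes a figure on a timer, which is the core idea behind the live dashboard. This is not the dashboard script of this post – the component ids, the refresh interval & the idea of re-reading the local cache file are all assumptions for illustration –

      # A minimal Dash live-refresh sketch (assumes Dash 2.x) – not the actual dashboard script.
      from dash import Dash, dcc, html
      from dash.dependencies import Input, Output
      import plotly.graph_objects as go
      import pandas as p

      app = Dash(__name__)

      app.layout = html.Div([
          dcc.Graph(id='covidTrend'),
          # Fire the callback every 5 seconds so the chart follows the source data.
          dcc.Interval(id='refreshTimer', interval=5000, n_intervals=0),
      ])

      @app.callback(Output('covidTrend', 'figure'), Input('refreshTimer', 'n_intervals'))
      def refreshChart(_):
          # In this post the data arrives via the Ably consumer class; here we simply
          # re-read the local cache file (FinData) written by clsStreamConsume.py.
          df = p.read_csv('Cache.csv')
          fig = go.Figure()
          for country in [c for c in df.columns if c not in ('Year_Mon', 'Status')]:
              fig.add_trace(go.Scatter(x=df['Year_Mon'], y=df[country], name=country))
          return fig

      if __name__ == '__main__':
          app.run_server(debug=True)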

      If you need to know more about our last post, please visit this link.


      Package Installation:

      Let us understand the packages that are required for this task.

      Step – 1:

      Installing Packages

      Step – 2:

      Installing Packages – Continue

      Step – 3:

      Installing Packages – Continue

      Step – 4:

      Installing Packages – Final

      And, here is the command to install those packages –

      pip install pandas
      pip install plotly
      pip install prophet
      pip install dash
      pip install ably

      Code:

      Since this is an extension of our previous post, we’re not going to discuss the scripts that we’ve already covered there. Instead, we will talk about the enhanced scripts & the new scripts that are required for this use case.

      1. clsConfig.py ( This native Python script contains the configuration entries. )


      ################################################
      #### Written By: SATYAKI DE ####
      #### Written On: 15-May-2020 ####
      #### Modified On: 09-Sep-2021 ####
      #### ####
      #### Objective: This script is a config ####
      #### file, contains all the keys for ####
      #### Machine-Learning & streaming dashboard.####
      #### ####
      ################################################
      import os
      import platform as pl
      class clsConfig(object):
          Curr_Path = os.path.dirname(os.path.realpath(__file__))

          os_det = pl.system()
          if os_det == "Windows":
              sep = '\\'
          else:
              sep = '/'

          conf = {
              'APP_ID': 1,
              'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
              'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
              'LOG_PATH': Curr_Path + sep + 'log' + sep,
              'REPORT_PATH': Curr_Path + sep + 'report',
              'FILE_NAME': Curr_Path + sep + 'data' + sep + 'TradeIn.csv',
              'SRC_PATH': Curr_Path + sep + 'data' + sep,
              'APP_DESC_1': 'Dash Integration with Ably!',
              'DEBUG_IND': 'N',
              'INIT_PATH': Curr_Path,
              'SUBDIR': 'data',
              'ABLY_ID': 'XXX2LL.93kdkiU2:Kdsldoeie737484E',
              "URL": "https://corona-api.com/countries/",
              "appType": "application/json",
              "conType": "keep-alive",
              "limRec": 10,
              "CACHE": "no-cache",
              "MAX_RETRY": 3,
              "coList": "DE, IN, US, CA, GB, ID, BR",
              "FNC": "NewConfirmed",
              "TMS": "ReportedDate",
              "FND": "NewDeaths",
              "FinData": "Cache.csv"
          }


      A few of the new entries that are essential to this task are ABLY_ID & FinData.

      2. clsPublishStream.py ( This script will publish the data transformed for Covid-19 predictions from the third-party sources. )


      ###############################################################
      #### ####
      #### Written By: Satyaki De ####
      #### Written Date: 26-Jul-2021 ####
      #### Modified Date: 08-Sep-2021 ####
      #### ####
      #### Objective: This script will publish real-time ####
      #### streaming data coming out from a hosted API ####
      #### sources using another popular third-party service ####
      #### named Ably. Ably mimics pubsub Streaming concept, ####
      #### which might be extremely useful for any start-ups. ####
      #### ####
      ###############################################################
      from ably import AblyRest
      import logging
      import json
      from random import seed
      from random import random
      import json
      import math
      import random
      from clsConfig import clsConfig as cf
      # Global Section
      logger = logging.getLogger('ably')
      logger.addHandler(logging.StreamHandler())
      ably_id = str(cf.conf['ABLY_ID'])
      ably = AblyRest(ably_id)
      channel = ably.channels.get('sd_channel')
      # End Of Global Section
      class clsPublishStream:
          def __init__(self):
              self.fnc = cf.conf['FNC']

          def pushEvents(self, srcDF, debugInd, varVa, flg):
              try:
                  # JSON data
                  # This is the default data for all the identified category
                  # we've prepared. You can extract this dynamically. Or, By
                  # default you can set their base trade details.
                  json_data = [{'Year_Mon': '201911', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg},
                               {'Year_Mon': '201912', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg}]
                  jdata = json.dumps(json_data)

                  # Publish a message to the sd_channel channel
                  channel.publish('event', jdata)

                  # Capturing the inbound dataframe
                  iDF = srcDF

                  # Adding new selected points
                  covid_dict = iDF.to_dict('records')
                  jdata_fin = json.dumps(covid_dict)

                  # Publish rest of the messages to the sd_channel channel
                  channel.publish('event', jdata_fin)

                  jdata_fin = ''

                  return 0
              except Exception as e:
                  x = str(e)
                  print(x)
                  logging.info(x)

                  return 1

      We’ve already discussed this script. The only new line that appears here is –

      json_data = [{'Year_Mon': '201911', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg},
                  {'Year_Mon': '201912', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg}]

      This statement is more like a dummy feed, which creates the basic structure of your graph.

      3. clsStreamConsume.py ( This script will consume the stream from the Ably queue using the configuration entries. )


      ##############################################
      #### Written By: SATYAKI DE ####
      #### Written On: 26-Jul-2021 ####
      #### Modified On 08-Sep-2021 ####
      #### ####
      #### Objective: Consuming Streaming data ####
      #### from Ably channels published by the ####
      #### callPredictCovidAnalysisRealtime.py ####
      #### ####
      ##############################################
      import json
      from clsConfig import clsConfig as cf
      import requests
      import logging
      import time
      import pandas as p
      import clsL as cl
      from ably import AblyRest
      # Initiating Log class
      l = cl.clsL()
      class clsStreamConsume:
          def __init__(self):
              self.ably_id = str(cf.conf['ABLY_ID'])
              self.fileName = str(cf.conf['FinData'])

          def conStream(self, varVa, debugInd):
              try:
                  ably_id = self.ably_id
                  fileName = self.fileName

                  var = varVa
                  debug_ind = debugInd

                  # Fetching the data
                  client = AblyRest(ably_id)
                  channel = client.channels.get('sd_channel')
                  message_page = channel.history()

                  # Counter Value
                  cnt = 0

                  # Declaring Global Data-Frame
                  df_conv = p.DataFrame()

                  for i in message_page.items:
                      print('Last Msg: {}'.format(i.data))
                      json_data = json.loads(i.data)

                      # Converting JSON to Dataframe
                      df = p.json_normalize(json_data)
                      df.columns = df.columns.map(lambda x: x.split(".")[-1])

                      if cnt == 0:
                          df_conv = df
                      else:
                          d_frames = [df_conv, df]
                          df_conv = p.concat(d_frames)

                      cnt += 1

                  # Resetting the Index Value
                  df_conv.reset_index(drop=True, inplace=True)

                  # This will check whether the current load is happening
                  # or not. Based on that, it will capture the old events
                  # from cache.
                  if df_conv.empty:
                      df_conv = p.read_csv(fileName, index_col=0)
                  else:
                      l.logr(fileName, debug_ind, df_conv, 'log')

                  return df_conv
              except Exception as e:
                  x = str(e)
                  print(x)
                  logging.info(x)

                  # This will handle the error scenario as well.
                  # Based on that, it will capture the old events
                  # from cache.
                  try:
                      df_conv = p.read_csv(fileName, index_col=0)
                  except:
                      df_conv = p.DataFrame()

                  return df_conv

      We’ve already discussed this script in one of my earlier posts, which you will get here.

      So, I’m not going to discuss all the steps in detail.

      The only added part was to introduce some temporary local caching mechanism.

      if df_conv.empty:
          df_conv = p.read_csv(fileName, index_col=0)
      else:
          l.logr(fileName, debug_ind, df_conv, 'log')

      4. callPredictCovidAnalysisRealtime.py ( Main calling script to fetch the COVID-19 data from the third-party source & then publish it to the Ably message queue after transforming the data & adding the prediction using Facebook’s prophet API. )


      ##############################################
      #### Written By: SATYAKI DE ####
      #### Written On: 26-Jul-2021 ####
      #### Modified On 26-Jul-2021 ####
      #### ####
      #### Objective: Calling multiple API's ####
      #### that including Prophet-API developed ####
      #### by Facebook for future prediction of ####
      #### Covid-19 situations in upcoming days ####
      #### for world's major hotspots. ####
      ##############################################
      import json
      import clsCovidAPI as ca
      from clsConfig import clsConfig as cf
      import datetime
      import logging
      import clsL as cl
      import math as m
      import clsPublishStream as cps
      import clsForecast as f
      from prophet import Prophet
      from prophet.plot import plot_plotly, plot_components_plotly
      import matplotlib.pyplot as plt
      import pandas as p
      import datetime as dt
      import time
      # Disbling Warning
      def warn(*args, **kwargs):
      pass
      import warnings
      warnings.warn = warn
      # Initiating Log class
      l = cl.clsL()
      # Helper Function that removes underscores
      def countryDet(inputCD):
      try:
      countryCD = inputCD
      if str(countryCD) == 'DE':
      cntCD = 'Germany'
      elif str(countryCD) == 'BR':
      cntCD = 'Brazil'
      elif str(countryCD) == 'GB':
      cntCD = 'UnitedKingdom'
      elif str(countryCD) == 'US':
      cntCD = 'UnitedStates'
      elif str(countryCD) == 'IN':
      cntCD = 'India'
      elif str(countryCD) == 'CA':
      cntCD = 'Canada'
      elif str(countryCD) == 'ID':
      cntCD = 'Indonesia'
      else:
      cntCD = 'N/A'
      return cntCD
      except:
      cntCD = 'N/A'
      return cntCD
      def lookupCountry(row):
      try:
      strCD = str(row['CountryCode'])
      retVal = countryDet(strCD)
      return retVal
      except:
      retVal = 'N/A'
      return retVal
      def adjustTrend(row):
      try:
      flTrend = float(row['trend'])
      flTrendUpr = float(row['trend_upper'])
      flTrendLwr = float(row['trend_lower'])
      retVal = m.trunc((flTrend + flTrendUpr + flTrendLwr)/3)
      if retVal < 0:
      retVal = 0
      return retVal
      except:
      retVal = 0
      return retVal
      def ceilTrend(row, colName):
      try:
      flTrend = str(row[colName])
      if flTrend.find('.'):
      if float(flTrend) > 0:
      retVal = m.trunc(float(flTrend)) + 1
      else:
      retVal = m.trunc(float(flTrend))
      else:
      retVal = float(flTrend)
      if retVal < 0:
      retVal = 0
      return retVal
      except:
      retVal = 0
      return retVal
      def plot_picture(inputDF, debug_ind, var, countryCD, stat):
      try:
      iDF = inputDF
      # Lowercase the column names
      iDF.columns = [c.lower() for c in iDF.columns]
      # Determine which is Y axis
      y_col = [c for c in iDF.columns if c.startswith('y')][0]
      # Determine which is X axis
      x_col = [c for c in iDF.columns if c.startswith('ds')][0]
      # Data Conversion
      iDF['y'] = iDF[y_col].astype('float')
      iDF['ds'] = iDF[x_col].astype('datetime64[ns]')
      # Forecast calculations
      # Decreasing the changepoint_prior_scale to 0.001 to make the trend less flexible
      m = Prophet(n_changepoints=20, yearly_seasonality=True, changepoint_prior_scale=0.001)
      #m = Prophet(n_changepoints=20, yearly_seasonality=True, changepoint_prior_scale=0.04525)
      #m = Prophet(n_changepoints=['2021-09-10'])
      m.fit(iDF)
      forecastDF = m.make_future_dataframe(periods=365)
      forecastDF = m.predict(forecastDF)
      l.logr('15.forecastDF_' + var + '_' + countryCD + '.csv', debug_ind, forecastDF, 'log')
      df_M = forecastDF[['ds', 'trend', 'trend_lower', 'trend_upper']]
      l.logr('16.df_M_' + var + '_' + countryCD + '.csv', debug_ind, df_M, 'log')
      # Getting Full Country Name
      cntCD = countryDet(countryCD)
      # Draw forecast results
      df_M['Country'] = cntCD
      l.logr('17.df_M_C_' + var + '_' + countryCD + '.csv', debug_ind, df_M, 'log')
      df_M['AdjustTrend'] = df_M.apply(lambda row: adjustTrend(row), axis=1)
      l.logr('20.df_M_AdjustTrend_' + var + '_' + countryCD + '.csv', debug_ind, df_M, 'log')
      return df_M
      except Exception as e:
      x = str(e)
      print(x)
      df = p.DataFrame()
      return df
      def countrySpecificDF(counryDF, val):
      try:
      countryName = val
      df = counryDF
      df_lkpFile = df[(df['CountryCode'] == val)]
      return df_lkpFile
      except:
      df = p.DataFrame()
      return df
      def toNum(row, colName):
      try:
      flTrend = str(row[colName])
      flTr, subpart = flTrend.split(' ')
      retVal = int(flTr.replace('-',''))
      return retVal
      except:
      retVal = 0
      return retVal
      def extractPredictedDF(OrigDF, MergePredictedDF, colName):
      try:
      iDF_1 = OrigDF
      iDF_2 = MergePredictedDF
      dt_format = '%Y-%m-%d'
      iDF_1_max_group = iDF_1.groupby(["Country"] , as_index=False)["ReportedDate"].max()
      iDF_2['ReportedDate'] = iDF_2.apply(lambda row: toNum(row, 'ds'), axis=1)
      col_one_list = iDF_1_max_group['Country'].tolist()
      col_two_list = iDF_1_max_group['ReportedDate'].tolist()
      print('col_one_list: ', str(col_one_list))
      print('col_two_list: ', str(col_two_list))
      cnt_1_x = 1
      cnt_1_y = 1
      cnt_x = 0
      df_M = p.DataFrame()
      for i in col_one_list:
      str_countryVal = str(i)
      cnt_1_y = 1
      for j in col_two_list:
      intReportDate = int(str(j).strip().replace('-',''))
      if cnt_1_x == cnt_1_y:
      print('str_countryVal: ', str(str_countryVal))
      print('intReportDate: ', str(intReportDate))
      iDF_2_M = iDF_2[(iDF_2['Country'] == str_countryVal) & (iDF_2['ReportedDate'] > intReportDate)]
      # Merging with the previous Country Code data
      if cnt_x == 0:
      df_M = iDF_2_M
      else:
      d_frames = [df_M, iDF_2_M]
      df_M = p.concat(d_frames)
      cnt_x += 1
      cnt_1_y += 1
      cnt_1_x += 1
      df_M.drop(columns=['ReportedDate'], axis=1, inplace=True)
      df_M.rename(columns={'ds':'ReportedDate'}, inplace=True)
      df_M.rename(columns={'AdjustTrend':colName}, inplace=True)
      return df_M
      except:
      df = p.DataFrame()
      return df
      def toPivot(inDF, colName):
      try:
      iDF = inDF
      iDF_Piv = iDF.pivot_table(colName, ['ReportedDate'], 'Country')
      iDF_Piv.reset_index( drop=False, inplace=True )
      list1 = ['ReportedDate']
      iDF_Arr = iDF['Country'].unique()
      list2 = iDF_Arr.tolist()
      listV = list1 + list2
      iDF_Piv.reindex([listV], axis=1)
      return iDF_Piv
      except Exception as e:
      x = str(e)
      print(x)
      df = p.DataFrame()
      return df
      def toAgg(inDF, var, debugInd, flg):
      try:
      iDF = inDF
      colName = "ReportedDate"
      list1 = list(iDF.columns.values)
      list1.remove(colName)
      list1 = ["Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]
      iDF['Year_Mon'] = iDF[colName].apply(lambda x:x.strftime('%Y%m'))
      iDF.drop(columns=[colName], axis=1, inplace=True)
      ColNameGrp = "Year_Mon"
      print('List1 Aggregate:: ', str(list1))
      print('ColNameGrp :: ', str(ColNameGrp))
      iDF_T = iDF[["Year_Mon", "Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]]
      iDF_T.fillna(0, inplace = True)
      print('iDF_T:: ')
      print(iDF_T)
      iDF_1_max_group = iDF_T.groupby(ColNameGrp, as_index=False)[list1].sum()
      iDF_1_max_group['Status'] = flg
      return iDF_1_max_group
      except Exception as e:
      x = str(e)
      print(x)
      df = p.DataFrame()
      return df
      def publishEvents(inDF1, inDF2, inDF3, inDF4, var, debugInd):
      try:
      # Original Covid Data from API
      iDF1 = inDF1
      iDF2 = inDF2
      NC = 'NewConfirmed'
      ND = 'NewDeaths'
      iDF1_PV = toPivot(iDF1, NC)
      iDF1_PV['ReportedDate'] = p.to_datetime(iDF1_PV['ReportedDate'])
      l.logr('57.iDF1_PV_' + var + '.csv', debugInd, iDF1_PV, 'log')
      iDF2_PV = toPivot(iDF2, ND)
      iDF2_PV['ReportedDate'] = p.to_datetime(iDF2_PV['ReportedDate'])
      l.logr('58.iDF2_PV_' + var + '.csv', debugInd, iDF2_PV, 'log')
      # Predicted Covid Data from Facebook API
      iDF3 = inDF3
      iDF4 = inDF4
      iDF3_PV = toPivot(iDF3, NC)
      l.logr('59.iDF3_PV_' + var + '.csv', debugInd, iDF3_PV, 'log')
      iDF4_PV = toPivot(iDF4, ND)
      l.logr('60.iDF4_PV_' + var + '.csv', debugInd, iDF4_PV, 'log')
      # Now aggregating data based on year-month only
      iDF1_Agg = toAgg(iDF1_PV, var, debugInd, NC)
      l.logr('61.iDF1_Agg_' + var + '.csv', debugInd, iDF1_Agg, 'log')
      iDF2_Agg = toAgg(iDF2_PV, var, debugInd, ND)
      l.logr('62.iDF2_Agg_' + var + '.csv', debugInd, iDF2_Agg, 'log')
      iDF3_Agg = toAgg(iDF3_PV, var, debugInd, NC)
      l.logr('63.iDF3_Agg_' + var + '.csv', debugInd, iDF3_Agg, 'log')
      iDF4_Agg = toAgg(iDF4_PV, var, debugInd, ND)
      l.logr('64.iDF4_Agg_' + var + '.csv', debugInd, iDF4_Agg, 'log')
      # Initiating Ably class to push events
      x1 = cps.clsPublishStream()
      # Pushing both the Historical Confirmed Cases
      retVal_1 = x1.pushEvents(iDF1_Agg, debugInd, var, NC)
      if retVal_1 == 0:
      print('Successfully historical event pushed!')
      else:
      print('Failed to push historical events!')
      # Pushing both the Historical Death Cases
      retVal_3 = x1.pushEvents(iDF2_Agg, debugInd, var, ND)
      if retVal_3 == 0:
      print('Successfully historical event pushed!')
      else:
      print('Failed to push historical events!')
      time.sleep(5)
      # Pushing both the New Confirmed Cases
      retVal_2 = x1.pushEvents(iDF3_Agg, debugInd, var, NC)
      if retVal_2 == 0:
      print('Successfully predicted event pushed!')
      else:
      print('Failed to push predicted events!')
      # Pushing both the New Death Cases
      retVal_4 = x1.pushEvents(iDF4_Agg, debugInd, var, ND)
      if retVal_4 == 0:
      print('Successfully predicted event pushed!')
      else:
      print('Failed to push predicted events!')
      return 0
      except Exception as e:
      x = str(e)
      print(x)
      return 1
      def main():
      try:
      var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
      print('*' *60)
      DInd = 'Y'
      NC = 'New Confirmed'
      ND = 'New Dead'
      SM = 'data process Successful!'
      FM = 'data process Failure!'
      print("Calling the custom Package for large file splitting..")
      print('Start Time: ' + str(var1))
      countryList = str(cf.conf['coList']).split(',')
      # Initiating Log Class
      general_log_path = str(cf.conf['LOG_PATH'])
      # Enabling Logging Info
      logging.basicConfig(filename=general_log_path + 'CovidAPI.log', level=logging.INFO)
      # Create the instance of the Covid API Class
      x1 = ca.clsCovidAPI()
      # Let's pass this to our map section
        retDF = x1.searchQry(var1, DInd)
        retVal = int(retDF.shape[0])
        if retVal > 0:
            print('Successfully Covid Data Extracted from the API-source.')
        else:
            print('Something wrong with your API-source!')
        # Extracting Skeleton Data
        df = retDF[['data.code', 'date', 'deaths', 'confirmed', 'recovered', 'new_confirmed', 'new_recovered', 'new_deaths', 'active']]
        df.columns = ['CountryCode', 'ReportedDate', 'TotalReportedDead', 'TotalConfirmedCase', 'TotalRecovered', 'NewConfirmed', 'NewRecovered', 'NewDeaths', 'ActiveCaases']
        df = df.dropna()
        print('Returned Skeleton Data Frame: ')
        print(df)
        l.logr('5.df_' + var1 + '.csv', DInd, df, 'log')
        # Due to a source data issue, the application averages the
        # counts by date because of multiple entries per day
        g_df = df.groupby(["CountryCode", "ReportedDate"], as_index=False)[["TotalReportedDead", "TotalConfirmedCase", "TotalRecovered", "NewConfirmed", "NewRecovered", "NewDeaths", "ActiveCaases"]].mean()
        g_df['TotalReportedDead_M'] = g_df.apply(lambda row: ceilTrend(row, 'TotalReportedDead'), axis=1)
        g_df['TotalConfirmedCase_M'] = g_df.apply(lambda row: ceilTrend(row, 'TotalConfirmedCase'), axis=1)
        g_df['TotalRecovered_M'] = g_df.apply(lambda row: ceilTrend(row, 'TotalRecovered'), axis=1)
        g_df['NewConfirmed_M'] = g_df.apply(lambda row: ceilTrend(row, 'NewConfirmed'), axis=1)
        g_df['NewRecovered_M'] = g_df.apply(lambda row: ceilTrend(row, 'NewRecovered'), axis=1)
        g_df['NewDeaths_M'] = g_df.apply(lambda row: ceilTrend(row, 'NewDeaths'), axis=1)
        g_df['ActiveCaases_M'] = g_df.apply(lambda row: ceilTrend(row, 'ActiveCaases'), axis=1)
        # Dropping old columns
        g_df.drop(columns=['TotalReportedDead', 'TotalConfirmedCase', 'TotalRecovered', 'NewConfirmed', 'NewRecovered', 'NewDeaths', 'ActiveCaases'], axis=1, inplace=True)
        # Renaming the new columns to old columns
        g_df.rename(columns={'TotalReportedDead_M':'TotalReportedDead'}, inplace=True)
        g_df.rename(columns={'TotalConfirmedCase_M':'TotalConfirmedCase'}, inplace=True)
        g_df.rename(columns={'TotalRecovered_M':'TotalRecovered'}, inplace=True)
        g_df.rename(columns={'NewConfirmed_M':'NewConfirmed'}, inplace=True)
        g_df.rename(columns={'NewRecovered_M':'NewRecovered'}, inplace=True)
        g_df.rename(columns={'NewDeaths_M':'NewDeaths'}, inplace=True)
        g_df.rename(columns={'ActiveCaases_M':'ActiveCaases'}, inplace=True)
        l.logr('5.g_df_' + var1 + '.csv', DInd, g_df, 'log')
        # Working with forecast
        # Create the instance of the Forecast API Class
        x2 = f.clsForecast()
        # Fetching each country name & then get the details
        cnt = 6
        cnt_x = 0
        cnt_y = 0
        df_M_Confirmed = p.DataFrame()
        df_M_Deaths = p.DataFrame()
        for i in countryList:
            try:
                cntryIndiv = i.strip()
                cntryFullName = countryDet(cntryIndiv)
                print('Country Processing: ' + str(cntryFullName))
                # Creating dataframe for each country
                dfCountry = countrySpecificDF(g_df, cntryIndiv)
                l.logr(str(cnt) + '.df_' + cntryIndiv + '_' + var1 + '.csv', DInd, dfCountry, 'log')
                # Let's pass this to our map section
                retDFGenNC = x2.forecastNewConfirmed(dfCountry, DInd, var1)
                statVal = str(NC)
                a1 = plot_picture(retDFGenNC, DInd, var1, cntryIndiv, statVal)
                # Merging with the previous Country Code data
                if cnt_x == 0:
                    df_M_Confirmed = a1
                else:
                    d_frames = [df_M_Confirmed, a1]
                    df_M_Confirmed = p.concat(d_frames)
                cnt_x += 1
                retDFGenNC_D = x2.forecastNewDead(dfCountry, DInd, var1)
                statVal = str(ND)
                a2 = plot_picture(retDFGenNC_D, DInd, var1, cntryIndiv, statVal)
                # Merging with the previous Country Code data
                if cnt_y == 0:
                    df_M_Deaths = a2
                else:
                    d_frames = [df_M_Deaths, a2]
                    df_M_Deaths = p.concat(d_frames)
                cnt_y += 1
                # Printing Proper message
                if (a1 + a2) == 0:
                    oprMsg = cntryFullName + ' ' + SM
                    print(oprMsg)
                else:
                    oprMsg = cntryFullName + ' ' + FM
                    print(oprMsg)
                # Resetting the dataframe value for the next iteration
                dfCountry = p.DataFrame()
                cntryIndiv = ''
                oprMsg = ''
                cntryFullName = ''
                a1 = 0
                a2 = 0
                statVal = ''
                cnt += 1
            except Exception as e:
                x = str(e)
                print(x)
        l.logr('49.df_M_Confirmed_' + var1 + '.csv', DInd, df_M_Confirmed, 'log')
        l.logr('50.df_M_Deaths_' + var1 + '.csv', DInd, df_M_Deaths, 'log')
        # Removing unwanted columns
        df_M_Confirmed.drop(columns=['trend', 'trend_lower', 'trend_upper'], axis=1, inplace=True)
        df_M_Deaths.drop(columns=['trend', 'trend_lower', 'trend_upper'], axis=1, inplace=True)
        l.logr('51.df_M_Confirmed_' + var1 + '.csv', DInd, df_M_Confirmed, 'log')
        l.logr('52.df_M_Deaths_' + var1 + '.csv', DInd, df_M_Deaths, 'log')
        # Creating original dataframe from the source API
        df_M_Confirmed_Orig = g_df[['CountryCode', 'ReportedDate', 'NewConfirmed']].copy()
        df_M_Deaths_Orig = g_df[['CountryCode', 'ReportedDate', 'NewDeaths']].copy()
        # Transforming Country Code
        df_M_Confirmed_Orig['Country'] = df_M_Confirmed_Orig.apply(lambda row: lookupCountry(row), axis=1)
        df_M_Deaths_Orig['Country'] = df_M_Deaths_Orig.apply(lambda row: lookupCountry(row), axis=1)
        # Dropping unwanted column
        df_M_Confirmed_Orig.drop(columns=['CountryCode'], axis=1, inplace=True)
        df_M_Deaths_Orig.drop(columns=['CountryCode'], axis=1, inplace=True)
        # Reordering columns
        df_M_Confirmed_Orig = df_M_Confirmed_Orig.reindex(['ReportedDate', 'Country', 'NewConfirmed'], axis=1)
        df_M_Deaths_Orig = df_M_Deaths_Orig.reindex(['ReportedDate', 'Country', 'NewDeaths'], axis=1)
        l.logr('53.df_M_Confirmed_Orig_' + var1 + '.csv', DInd, df_M_Confirmed_Orig, 'log')
        l.logr('54.df_M_Deaths_Orig_' + var1 + '.csv', DInd, df_M_Deaths_Orig, 'log')
        # Filter out only the predicted data
        filterDF_1 = extractPredictedDF(df_M_Confirmed_Orig, df_M_Confirmed, 'NewConfirmed')
        l.logr('55.filterDF_1_' + var1 + '.csv', DInd, filterDF_1, 'log')
        filterDF_2 = extractPredictedDF(df_M_Deaths_Orig, df_M_Deaths, 'NewDeaths')
        l.logr('56.filterDF_2_' + var1 + '.csv', DInd, filterDF_2, 'log')
        # Calling the final publish events
        retVa = publishEvents(df_M_Confirmed_Orig, df_M_Deaths_Orig, filterDF_1, filterDF_2, var1, DInd)
        if retVa == 0:
            print('Successfully stream processed!')
        else:
            print('Failed to process stream!')
        var2 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var2))
        print('*' * 60)
    except Exception as e:
        x = str(e)
        print(x)

if __name__ == "__main__":
    main()

      Let us understand the enhancement part of this script –

      We’ve taken out the plotly part as we will use a separate dashboard script to visualize the data trend.

However, we need to understand the data initially consumed from the API & how we transform it, as that will be helpful for the visualization.

The initially captured data should look like this after extracting only the relevant elements from the API response.

      Initial Data from API

As you can see, based on the country & the reported date, our application captures attributes like Total-Reported-Dead, Total-Recovered, New-Deaths, New-Confirmed & so on.

From this list, we've taken two attributes for our use cases: New-Deaths & New-Confirmed. Also, we're predicting the Future-New-Deaths & Future-New-Confirmed based on the historical data using Facebook's Prophet API.
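
To make the forecasting step a little more concrete, here is a minimal, hedged sketch of how a Prophet-based forecast of the NewConfirmed series might look. The helper name, the column names & the 90-day horizon are illustrative assumptions; the actual logic lives inside the forecast class (clsForecast) that the main script invokes.

# A minimal sketch (not the actual clsForecast code) of a Prophet forecast
# for one country's NewConfirmed series.
# Assumptions: dfCountry carries 'ReportedDate' & 'NewConfirmed' columns.
from prophet import Prophet   # older releases expose this as fbprophet

def sketchForecastNewConfirmed(dfCountry, periods=90):
    # Prophet expects exactly two columns: ds (date) & y (value)
    trainDF = dfCountry.rename(columns={'ReportedDate': 'ds', 'NewConfirmed': 'y'})[['ds', 'y']]

    m = Prophet()
    m.fit(trainDF)

    # Extend the timeline & predict the future values
    future = m.make_future_dataframe(periods=periods)
    forecast = m.predict(future)

    # yhat holds the prediction; the trend columns get dropped later in the main script
    return forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper', 'trend', 'trend_lower', 'trend_upper']]

This also explains the drop(columns=['trend', 'trend_lower', 'trend_upper']) step in the main script – only the predicted values are carried forward.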

And, we'll be transposing them, extracting the countries & placing them as columns for a better representation.

      Transposed Data

      Hence, here is the code that we should be exploring –

      def toPivot(inDF, colName):
          try:
              iDF = inDF
      
              iDF_Piv = iDF.pivot_table(colName, ['ReportedDate'], 'Country')
              iDF_Piv.reset_index( drop=False, inplace=True )
      
              list1 = ['ReportedDate']
      
              iDF_Arr = iDF['Country'].unique()
              list2 = iDF_Arr.tolist()
      
              listV = list1 + list2
      
        iDF_Piv = iDF_Piv.reindex(listV, axis=1)
      
              return iDF_Piv
          except Exception as e:
              x = str(e)
              print(x)
      
              df = p.DataFrame()
      
              return df

Now, using the pivot_table function, we're transposing the row values into columns. Later, we realign the column headings to match our desired format.

However, the data is still at the level of individual daily dates. We want to eliminate that by removing the day part & then aggregating by month, as shown below –

      Aggregated Data

      And, here is the code for that –

      def toAgg(inDF, var, debugInd, flg):
          try:
              iDF = inDF
              colName = "ReportedDate"
      
              list1 = list(iDF.columns.values)
              list1.remove(colName)
      
              list1 = ["Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]
      
              iDF['Year_Mon'] = iDF[colName].apply(lambda x:x.strftime('%Y%m'))
              iDF.drop(columns=[colName], axis=1, inplace=True)
      
              ColNameGrp = "Year_Mon"
              print('List1 Aggregate:: ', str(list1))
              print('ColNameGrp :: ', str(ColNameGrp))
      
              iDF_T = iDF[["Year_Mon", "Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]]
              iDF_T.fillna(0, inplace = True)
              print('iDF_T:: ')
              print(iDF_T)
      
              iDF_1_max_group = iDF_T.groupby(ColNameGrp, as_index=False)[list1].sum()
              iDF_1_max_group['Status'] = flg
      
              return iDF_1_max_group
          except Exception as e:
              x = str(e)
              print(x)
      
              df = p.DataFrame()
      
              return df

From the above snippet, we can conclude that the application strips out the day part & then aggregates the counts based on the Year_Mon attribute.
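
To see how these two helpers fit together, here is a hypothetical chained call (the frame, date & debug-flag variable names are assumptions borrowed from the main script; the real invocation lives in the stream-processing script):

# Hypothetical chaining of the two helpers above:
# 1) pivot per-country rows into columns, 2) aggregate them by Year_Mon.
pivotNC = toPivot(df_M_Confirmed_Orig, 'NewConfirmed')
aggNC = toAgg(pivotNC, var1, DInd, 'NewConfirmed')   # the Status column will carry 'NewConfirmed'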

The following snippet will push the final transformed data to the Ably queue –

      x1 = cps.clsPublishStream()
      
      # Pushing both the Historical Confirmed Cases
      retVal_1 = x1.pushEvents(iDF1_Agg, debugInd, var, NC)
      
      if retVal_1 == 0:
          print('Successfully historical event pushed!')
      else:
          print('Failed to push historical events!')
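
The publisher class (clsPublishStream) is not reproduced here, but the following is a minimal, hedged sketch of what such a pushEvents method might do with the Ably REST client. The channel name, the event name & the synchronous 1.1.x-style client usage are assumptions; the real class may differ.

# A minimal sketch (assuming the synchronous ably-python 1.1.x REST client) of how
# the aggregated monthly data could be pushed to an Ably channel.
from ably import AblyRest

def sketchPushEvents(inDF, channelName='covid-trend', eventName='monthly-agg'):
    try:
        # The API key would normally come from clsConfig (placeholder shown here)
        client = AblyRest('YOUR_ABLY_API_KEY')
        channel = client.channels.get(channelName)   # hypothetical channel name

        # Serialise the aggregated dataframe & publish it as a single event
        payload = inDF.to_json(orient='records')
        channel.publish(eventName, payload)

        return 0
    except Exception as e:
        print(str(e))
        return 1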

      5. dashboard_realtime.py ( Main calling script to consume the data from Ably queue & then visualize the trend. )


##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 08-Sep-2021              ####
#### Modified On 08-Sep-2021              ####
####                                      ####
#### Objective: This is the main script   ####
#### to invoke dashboard after consuming  ####
#### streaming real-time predicted data   ####
#### using Facebook API & Ably message Q. ####
####                                      ####
#### This script will show the trend      ####
#### comparison between major democracies ####
#### of the world.                        ####
####                                      ####
##############################################
      import datetime
      import dash
      from dash import dcc
      from dash import html
      import plotly
      from dash.dependencies import Input, Output
      from ably import AblyRest
      from clsConfig import clsConfig as cf
      import pandas as p
      # Main Class to consume streaming
      import clsStreamConsume as ca
      import numpy as np
      # Create the instance of the Covid API Class
      x1 = ca.clsStreamConsume()
external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash.Dash(__name__, external_stylesheets=external_stylesheets)

app.layout = html.Div(
    html.Div([
        html.H1("Covid-19 Trend Dashboard",
                className='text-center text-primary mb-4'),
        html.H5(children='''
            Dash: Covid-19 Trend - (Present Vs Future)
        '''),
        html.P("Covid-19: New Confirmed Cases:",
               style={"textDecoration": "underline"}),
        dcc.Graph(id='live-update-graph-1'),
        html.P("Covid-19: New Death Cases:",
               style={"textDecoration": "underline"}),
        dcc.Graph(id='live-update-graph-2'),
        dcc.Interval(
            id='interval-component',
            interval=5*1000, # in milliseconds
            n_intervals=0
        )
    ], className="row", style={'marginBottom': 10, 'marginTop': 10})
)

def to_OptimizeString(row):
    try:
        x_str = str(row['Year_Mon'])

        dt_format = '%Y%m%d'
        finStr = x_str + '01'

        strReportDate = datetime.datetime.strptime(finStr, dt_format)

        return strReportDate
    except Exception as e:
        x = str(e)
        print(x)

        dt_format = '%Y%m%d'
        var = '20990101'

        strReportDate = datetime.datetime.strptime(var, dt_format)

        return strReportDate

def fetchEvent(var1, DInd):
    try:
        # Let's pass this to our map section
        iDF_M = x1.conStream(var1, DInd)

        # Converting Year_Mon to dates
        iDF_M['Year_Mon_Mod'] = iDF_M.apply(lambda row: to_OptimizeString(row), axis=1)

        # Dropping old columns
        iDF_M.drop(columns=['Year_Mon'], axis=1, inplace=True)

        # Renaming new column to old column
        iDF_M.rename(columns={'Year_Mon_Mod':'Year_Mon'}, inplace=True)

        return iDF_M
    except Exception as e:
        x = str(e)
        print(x)

        iDF_M = p.DataFrame()

        return iDF_M

# Multiple components can update every time the interval gets fired.
@app.callback(Output('live-update-graph-1', 'figure'),
              Input('interval-component', 'n_intervals'))
def update_graph_live(n):
    try:
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*' * 60)
        DInd = 'Y'

        # Let's pass this to our map section
        retDF = fetchEvent(var1, DInd)

        # Create the graph with subplots
        #fig = plotly.tools.make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.3, horizontal_spacing=0.2)
        fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.3, horizontal_spacing=0.2)

        # Routing data to dedicated DataFrame
        retDFNC = retDF.loc[(retDF['Status'] == 'NewConfirmed')]

        # Adding different charts into one dashboard
        # First Use Case - New Confirmed
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Brazil,'type':'scatter','name':'Brazil'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Canada,'type':'scatter','name':'Canada'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Germany,'type':'scatter','name':'Germany'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.India,'type':'scatter','name':'India'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Indonesia,'type':'scatter','name':'Indonesia'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedKingdom,'type':'scatter','name':'United Kingdom'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedStates,'type':'scatter','name':'United States'},1,1)

        return fig
    except Exception as e:
        x = str(e)
        print(x)

        # Create the graph with subplots
        fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2)
        fig['layout']['margin'] = {
            'l': 30, 'r': 10, 'b': 30, 't': 10
        }
        fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'}

        return fig

# Multiple components can update every time the interval gets fired.
@app.callback(Output('live-update-graph-2', 'figure'),
              Input('interval-component', 'n_intervals'))
def update_graph_live(n):
    try:
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*' * 60)
        DInd = 'Y'

        # Let's pass this to our map section
        retDF = fetchEvent(var1, DInd)

        # Create the graph with subplots
        #fig = plotly.tools.make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.3, horizontal_spacing=0.2)
        fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.3, horizontal_spacing=0.2)

        # Routing data to dedicated DataFrame
        retDFND = retDF.loc[(retDF['Status'] == 'NewDeaths')]

        # Adding different charts into one dashboard
        # Second Use Case - New Deaths
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Brazil,'type':'bar','name':'Brazil'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Canada,'type':'bar','name':'Canada'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Germany,'type':'bar','name':'Germany'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.India,'type':'bar','name':'India'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Indonesia,'type':'bar','name':'Indonesia'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.UnitedKingdom,'type':'bar','name':'United Kingdom'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.UnitedStates,'type':'bar','name':'United States'},1,1)

        return fig
    except Exception as e:
        x = str(e)
        print(x)

        # Create the graph with subplots
        fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2)
        fig['layout']['margin'] = {
            'l': 30, 'r': 10, 'b': 30, 't': 10
        }
        fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'}

        return fig

if __name__ == '__main__':
    app.run_server(debug=True)

Let us explore the critical snippets, as this is a brand-new script –

      external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']
      
      app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
      
      app.layout = html.Div(
          html.Div([
              html.H1("Covid-19 Trend Dashboard",
                              className='text-center text-primary mb-4'),
              html.H5(children='''
                  Dash: Covid-19 Trend - (Present Vs Future)
              '''),
              html.P("Covid-19: New Confirmed Cases:",
                     style={"textDecoration": "underline"}),
              dcc.Graph(id='live-update-graph-1'),
              html.P("Covid-19: New Death Cases:",
                     style={"textDecoration": "underline"}),
              dcc.Graph(id='live-update-graph-2'),
              dcc.Interval(
                  id='interval-component',
                  interval=5*1000, # in milliseconds
                  n_intervals=0
              )
          ], className="row", style={'marginBottom': 10, 'marginTop': 10})
      )

      You need to understand the basics of HTML as this framework works seamlessly with it. To know more about the supported HTML, one needs to visit the following link.

      def to_OptimizeString(row):
          try:
              x_str = str(row['Year_Mon'])
      
              dt_format = '%Y%m%d'
              finStr = x_str + '01'
      
              strReportDate = datetime.datetime.strptime(finStr, dt_format)
      
              return strReportDate
      
          except Exception as e:
              x = str(e)
              print(x)
      
              dt_format = '%Y%m%d'
              var = '20990101'
      
        strReportDate = datetime.datetime.strptime(var, dt_format)
      
              return strReportDate

      The application is converting Year-Month combinations from string to date for better projection.

Also, we've implemented a dashboard that refreshes every five seconds (interval=5*1000 milliseconds).

      def fetchEvent(var1, DInd):
          try:
              # Let's pass this to our map section
              iDF_M = x1.conStream(var1, DInd)
      
              # Converting Year_Mon to dates
              iDF_M['Year_Mon_Mod']= iDF_M.apply(lambda row: to_OptimizeString(row), axis=1)
      
              # Dropping old columns
              iDF_M.drop(columns=['Year_Mon'], axis=1, inplace=True)
      
              #Renaming new column to old column
              iDF_M.rename(columns={'Year_Mon_Mod':'Year_Mon'}, inplace=True)
      
              return iDF_M
      
          except Exception as e:
              x = str(e)
              print(x)
      
              iDF_M = p.DataFrame()
      
              return iDF_M

      The application will consume all the events from the Ably Queue using the above snippet.
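
conStream itself lives in clsStreamConsume, which is not shown here. For context, here is a hedged sketch of how such a consumer might rebuild a dataframe from the Ably channel history. The channel name, the payload format & the synchronous 1.1.x-style client usage are assumptions; the real class may differ.

# A minimal sketch (assuming the synchronous ably-python 1.1.x REST client) of how
# the dashboard's consumer might read the published messages back from the channel.
import json
from ably import AblyRest
import pandas as p

def sketchConStream(channelName='covid-trend'):
    try:
        client = AblyRest('YOUR_ABLY_API_KEY')        # placeholder key
        channel = client.channels.get(channelName)    # hypothetical channel name

        # Fetch the most recent messages published to the channel
        history = channel.history()

        records = []
        for msg in history.items:
            # Each message carries the JSON payload published by the producer
            records.extend(json.loads(msg.data))

        return p.DataFrame(records)
    except Exception as e:
        print(str(e))
        return p.DataFrame()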

      @app.callback(Output('live-update-graph-1', 'figure'),
                    Input('interval-component', 'n_intervals'))
      def update_graph_live(n):

We've implemented the callback mechanism to fetch the latest data from the queue, update the graph accordingly & finally return the refreshed figure to the component that invoked it.

      # Routing data to dedicated DataFrame
      retDFNC = retDF.loc[(retDF['Status'] == 'NewConfirmed')]

Based on the Status flag, we route the data into the target dataframe, from which the application feeds the charts.

      fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Brazil,'type':'scatter','name':'Brazil'},1,1)
      fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Canada,'type':'scatter','name':'Canada'},1,1)
      fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Germany,'type':'scatter','name':'Germany'},1,1)
      fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.India,'type':'scatter','name':'India'},1,1)
      fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Indonesia,'type':'scatter','name':'Indonesia'},1,1)
      fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedKingdom,'type':'scatter','name':'United Kingdom'},1,1)
      fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedStates,'type':'scatter','name':'United States'},1,1)

Each country's KPI series is fetched & mapped onto its corresponding axis to project the graph with visual details.

The same approach applies to the other graph as well.


      Run:

      Let us run the application –

      Run – Beginning
      Run – Finishing Stage

      Dashboard:

      Dashboard Job Run
      Dashboard Visualization

      So, we’ve done it.

You will get the complete codebase from the following GitHub link.

I'll bring some more exciting topics from the Python verse in the coming days.

      Till then, Happy Avenging! 😀


Note: All the data & scenarios posted here are representational, available over the internet & intended for educational purposes only.

One more thing you need to understand is that this prediction is based on limited data points. The actual event may happen differently. Ideally, countries take a cue from this kind of analysis & initiate appropriate measures to avoid the high curve. And, that is one of the main objectives of time-series analysis.

There is always room for improvement in this kind of model & the solution associated with it. I've shown the basic way to achieve it for educational purposes only.

      Converting text to voice in Python

      Hi Guys!

Today, we'll be discussing a new post on converting text into voice using a third-party API. This is particularly useful in many cases where you want more realistic communication.

There are many such providers where you can get an almost realistic voice for both male & female speakers. However, most of them are subscription-based. So, you have to be very careful about your budget & how you proceed.

For testing purposes, I'll be using the VoiceRSS text-to-speech API (via RapidAPI) to simulate this.

      Let’s look out the architecture of this process –

      FlowS

As you can see, the user initiates the application & provides some input in the form of plain text. Once the data is submitted, the app sends it to the third-party API. The third-party API verifies the authentication & checks all the associated parameters before it starts generating the audio response. After that, it sends back the payload, which is received by the calling Python application. Here, it is decoded into an audio file & finally played on the invoking computer.

      This third-party API has lots of limitations. However, they are giving you the platform to test your concept.

As of now, they support the following languages – English, Chinese, Catalan, French, Finnish, Dutch, Danish, German, Italian, Japanese, Korean, Polish, Norwegian, Portuguese, Russian, Spanish & Swedish.

      In our case, we’ll be checking with English.

To work with this, you need to have the following modules available in Python –

• playsound
• requests
• base64 (ships with the standard library, so no separate installation is needed)

      Let’s see the directory structure –

      1. Directory

      Again, we are not going to discuss any script, which we’ve already discussed here.

      Hence, we’re skipping clsL.py here.

      1. clsConfig.py (This script contains all the parameters of the server.)

      ##############################################
      #### Written By: SATYAKI DE               ####
      #### Written On: 12-Oct-2019              ####
      ####                                      ####
      #### Objective: This script is a config   ####
      #### file, contains all the keys for      ####
#### the text-to-voice API. Application   ####
#### will use these keys to invoke the    ####
#### third-party conversion service.      ####
      ##############################################
      
      import os
      import platform as pl
      
      class clsConfig(object):
          Curr_Path = os.path.dirname(os.path.realpath(__file__))
      
          os_det = pl.system()
          if os_det == "Windows":
              sep = '\\'
          else:
              sep = '/'
      
          config = {
              'APP_ID': 1,
              'url': "https://voicerss-text-to-speech.p.rapidapi.com/",
              'host': "voicerss-text-to-speech.p.rapidapi.com",
              'api_key': "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
              'targetFile': "Bot_decode.mp3",
              'pitch_speed': "-6",
              'bot_language': "en-us",
              'audio_type': "mp3",
              'audio_freq': "22khz_8bit_stereo",
              'query_string_api': "hhhhhhhhhhhhhhhhhhhhhhhhhhhh",
              'b64_encoding': True,
              'APP_DESC_1': 'Text to voice conversion.',
              'DEBUG_IND': 'N',
              'INIT_PATH': Curr_Path,
              'LOG_PATH': Curr_Path + sep + 'log' + sep
          }

For security reasons, sensitive information is masked with dummy values.

      ‘api_key’: “xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx”,

      ‘query_string_api’: “hhhhhhhhhhhhhhhhhhhhhhhhhhhh”,

These two pieces of information are private to each subscriber. Hence, I've removed them & replaced them with dummy values.

You have to fill these in with your own subscription information.

      2. clsText2Voice.py (This script will convert the text data into an audio file using a GET API request from the third-party API & then play that using the web media player.)

      ###############################################
      #### Written By: SATYAKI DE                ####
      #### Written On: 27-Oct-2019               ####
      #### Modified On 27-Oct-2019               ####
      ####                                       ####
      #### Objective: Main class converting      ####
      #### text to voice using third-party API.  ####
      ###############################################
      
      from playsound import playsound
      import requests
      import base64
      from clsConfig import clsConfig as cf
      
      class clsText2Voice:
          def __init__(self):
              self.url = cf.config['url']
              self.api_key = cf.config['api_key']
              self.targetFile = cf.config['targetFile']
              self.pitch_speed = cf.config['pitch_speed']
              self.bot_language = cf.config['bot_language']
              self.audio_type = cf.config['audio_type']
              self.audio_freq = cf.config['audio_freq']
              self.b64_encoding = cf.config['b64_encoding']
              self.query_string_api = cf.config['query_string_api']
              self.host = cf.config['host']
      
          def getAudio(self, srcString):
              try:
                  url = self.url
                  api_key = self.api_key
                  tarFile = self.targetFile
                  pitch_speed = self.pitch_speed
                  bot_language = self.bot_language
                  audio_type = self.audio_type
                  audio_freq = self.audio_freq
                  b64_encoding = self.b64_encoding
                  query_string_api = self.query_string_api
                  host = self.host
      
                  querystring = {
                      "r": pitch_speed,
                      "c": audio_type,
                      "f": audio_freq,
                      "src": srcString,
                      "hl": bot_language,
                      "key": query_string_api,
                      "b64": b64_encoding
                  }
      
                  headers = {
                      'x-rapidapi-host': host,
                      'x-rapidapi-key': api_key
                  }
      
                  response = requests.request("GET", url, headers=headers, params=querystring)
      
                  # Converting to MP3
                  targetFile = tarFile
                  mp3File_64_decode = base64.decodebytes(bytes(response.text, encoding="utf-8"))
                  mp3File_result = open(targetFile, 'wb')
      
                  # create a writable mp3File and write the decoding result
                  mp3File_result.write(mp3File_64_decode)
                  mp3File_result.close()
      
                  playsound(targetFile)
      
                  return 0
              except Exception as e:
                  x = str(e)
                  print('Error: ', x)
      
                  return 1

A few crucial lines from the above script –

      querystring = {
          "r": pitch_speed,
          "c": audio_type,
          "f": audio_freq,
          "src": srcString,
          "hl": bot_language,
          "key": query_string_api,
          "b64": b64_encoding
      }

You can configure the characteristics of the voice by adjusting these parameters. The text content arrives at srcString. So, whatever the user types will be captured here & used to form the query payload accordingly.
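
For instance, a hypothetical variation of the same payload for a slower, British-English voice might look like this (r controls the speech rate & hl the language, following the VoiceRSS convention; treat the exact values as assumptions):

# Hypothetical variation: slower speech rate & British English
querystring_gb = {
    "r": "-8",                       # slower rate than the configured default (assumption)
    "c": "mp3",
    "f": "22khz_8bit_stereo",
    "src": "Welcome to the blog.",
    "hl": "en-gb",                   # British English language code
    "key": query_string_api,
    "b64": True
}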

      response = requests.request("GET", url, headers=headers, params=querystring)

In this case, you will receive the audio in the form of base64-encoded text. Hence, you need to convert it back into a sound file using the following lines –

      # Converting to MP3
      targetFile = tarFile
      mp3File_64_decode = base64.decodebytes(bytes(response.text, encoding="utf-8"))
      mp3File_result = open(targetFile, 'wb')
      
      # create a writable mp3File and write the decoding result
      mp3File_result.write(mp3File_64_decode)
      mp3File_result.close()

As you can see, we've extracted response.text & then decoded it into a bytes object to form the mp3 sound file at the receiving end.

      Once we have our mp3 file ready, the following line simply plays the audio record.

      playsound(targetFile)

      Thus you can hear the actual voice.

3. callText2Voice.py (This is the main script that will invoke the text-to-voice API & then play back the audio once it gets the response from the third-party API.)

      ###############################################
      #### Written By: SATYAKI DE                ####
      #### Written On: 27-Oct-2019               ####
      #### Modified On 27-Oct-2019               ####
      ####                                       ####
#### Objective: Main calling script to    ####
#### invoke the text-to-voice conversion. ####
      ###############################################
      
      from clsConfig import clsConfig as cf
      import clsL as cl
      import logging
      import datetime
      import clsText2Voice as ct
      
# Disabling Warning
      def warn(*args, **kwargs):
          pass
      
      import warnings
      warnings.warn = warn
      
      var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
      
      def main():
          try:
              ret_2 = ''
              debug_ind = 'Y'
      
              general_log_path = str(cf.config['LOG_PATH'])
      
              # Enabling Logging Info
              logging.basicConfig(filename=general_log_path + 'consolidatedTwitter.log', level=logging.INFO)
      
              # Initiating Log Class
              l = cl.clsL()
      
              # Moving previous day log files to archive directory
              log_dir = cf.config['LOG_PATH']
      
              tmpR0 = "*" * 157
      
              logging.info(tmpR0)
              tmpR9 = 'Start Time: ' + str(var)
              logging.info(tmpR9)
              logging.info(tmpR0)
      
              print("Log Directory::", log_dir)
              tmpR1 = 'Log Directory::' + log_dir
              logging.info(tmpR1)
      
              # Query using parameters
              rawQry = str(input('Enter your string:'))
      
              x1 = ct.clsText2Voice()
              ret_2 = x1.getAudio(rawQry)
      
              if ret_2 == 0:
                  print("Successfully converted from text to voice!")
                  logging.info("Successfully converted from text to voice!")
                  print("*" * 157)
                  logging.info(tmpR0)
              else:
                  print("Successfuly converted!")
                  logging.info("Successfuly converted!")
                  print("*" * 157)
                  logging.info(tmpR0)
      
              print("*" * 157)
              logging.info(tmpR0)
      
              tmpR10 = 'End Time: ' + str(var)
              logging.info(tmpR10)
              logging.info(tmpR0)
      
          except ValueError:
              print("No relevant data to proceed!")
              logging.info("No relevant data to proceed!")
      
          except Exception as e:
              print("Top level Error: args:{0}, message{1}".format(e.args, e.message))
      
      if __name__ == "__main__":
          main()

      Essential lines from the above script –

      # Query using parameters
      rawQry = str(input('Enter your string:'))
      
      x1 = ct.clsText2Voice()
      ret_2 = x1.getAudio(rawQry)

As you can see, the user passes the text content, which is handed over to our class; the class then plays the audio rendering of that text.

      Let’s see how it runs –

      Input Text: Welcome to Satyaki De’s blog. This site mainly deals with the Python, SQL from different DBs & many useful areas from the leading cloud providers.

And, here is how the run command looks under Windows OS –

      2. Windows_Run

      And, please find the sample voice that it generates –

      So, We’ve done it! 😀

      Let us know your comment on this.

      So, we’ll come out with another exciting post in the coming days!

      N.B.: This is demonstrated for RnD/study purposes. All the data posted here are representational data & available over the internet.

Building an Azure Databricks cluster, installing desired packages & running a demo (Time stone from Python Verse)

Today, I'll be showing how to prepare a cluster in Azure Databricks from the command prompt & will demonstrate processing a sample csv file using PySpark. This can be useful, especially when you want to customize your environment & need to install specific packages inside the clusters with more options.

This is not like my earlier posts, where my primary attention was on the Python side. At the end of this post, I'll showcase a PySpark script & how we can execute it inside Azure Databricks.

      Let’s roll the dice!

      Step -1:

Type Azure Databricks in the search box inside the Azure portal.

      0. Azure Search

As shown in the red box, you have to click these options. It will take you to the new Databricks sign-in page.

      Step -2:

The next step is clicking the "Add" button. For the first time, the application will ask you to create a storage account associated with this Databricks workspace.

      1. Create Storage

      After creation, the screen should look like this –

      2.5. Azure-Data-Bricks Options

Now, click the Azure command line & choose bash as your work environment –

      2. After Creation

For security reasons, I've masked the details.

      After successful creation, this page should look like this –

      3. Azure Databricks

Once you click Launch Workspace, it will take you to the next page –

      4. Detailed Bricks

As you can see, there are no notebooks or Python scripts under the Recents tab.

      Step -3:

      Let’s verify it from the command line shell environment.

      5. Python-Env

As you can see, the default Python version in Databricks is 3.5.2.

      Step -4:

Now, we'll prepare an environment by creating a local directory in the cloud shell.

      The directory that we’ll be creating is – “rndBricks.”

      6. Creating Directory

      Step -5:

      Let’s create the virtual environment here –

Using the "virtualenv" utility, we'll create the virtual environment & it should look like this –

      7. Creating Python-VM

As you can see, this creates the first Python virtual environment along with pip & wheel, which are essential for your Python environment.

After creating the virtual environment, you need to install the Databricks CLI, as shown in the next screenshot –

      8. Installing Databricks CLI in Python-VM

      Before you create the cluster, you need to first generate the token, which will be used for the cluster –

      9.1. Generating Token

As shown in the above screen, the "red" marked area is our primary interest. The "green" box represents the account image that you need to click; then you have to click "User Settings", marked in blue. Once you click that, you can see the "purple" area, where you need to click the Generate New Token button if you are doing this for the first time.

Now, we'll use this newly generated token to configure Databricks as follows –

      9.2. Configuring with Token

Make sure you mention the correct region, i.e. westus2/westus or any region as per your geography & convenience.

Once that is done, you can check the cluster list with the following command (in case you have already created any clusters in your subscription) –

      10. Checking Clusters List

Since we're building it from scratch, there is no cluster information showing here.

      Step -6:

      Let’s create the clusters –

      11. Creating-Clusters-From-Command

Please find the command that you will be using below –

databricks clusters create --json '{ "autoscale": {"min_workers": 2, "max_workers": 8}, "cluster_name": "pyRnd", "spark_version": "5.3.x-scala2.11", "spark_conf": {}, "node_type_id": "Standard_DS3_v2", "driver_node_type_id": "Standard_DS3_v2", "ssh_public_keys": [], "custom_tags": {}, "spark_env_vars": {"PYSPARK_PYTHON": "/databricks/python3/bin/python3"}, "autotermination_minutes": 20, "enable_elastic_disk": true, "cluster_source": "UI", "init_scripts": [] }'

      As you can see, you need to pass the information in JSON format. For your better understanding, please find the JSON in a proper format –

      11.5. JSON

      And, the raw version –

      {
        "autoscale": {
          "min_workers": 2,
          "max_workers": 8
        },
        "cluster_name": "pyRnd",
        "spark_version": "5.3.x-scala2.11",
        "spark_conf": {},
        "node_type_id": "Standard_DS3_v2",
        "driver_node_type_id": "Standard_DS3_v2",
        "ssh_public_keys": [],
        "custom_tags": {},
        "spark_env_vars": {
          "PYSPARK_PYTHON": "/databricks/python3/bin/python3"
        },
        "autotermination_minutes": 20,
        "enable_elastic_disk": true,
        "cluster_source": "UI",
        "init_scripts": []
      }

Initially, the cluster status in the GUI will show as follows –

      12. Cluster-Status-In-Progress

      After a few minutes, this will show the running state –

      13. Cluster-Running Status

Let's check the detailed configuration once the cluster is created –

      14. Initial Cluster Details

      Step -7:

We need to check the Libraries section. This is important, as we might need to install many dependent Python packages to run our application on Azure Databricks. The initial Libraries tab will look like this –

      15. Libraries

You can install libraries into an existing cluster either through the GUI or through the shell command prompt. Let's explore the GUI option first.

      GUI Option:

First, click the Libraries tab under your newly created cluster, as shown in the above picture. Then you need to click the "Install New" button. This will pop up the following window –

      16. Installing Libraries

As you can see, you have many options, including possibilities for your Python application (marked in red).

      Case 1 (Installing PyPi packages):

      19. Installing through GUI

Note: You can either mention a specific version or simply provide the package name.

      Case 2 (Installing Wheel packages):

      16.5. Installing Wheel Libraries

      As you can see, from the upload options, you can upload your local libraries & then click the install button to install the same.

CLI Option:

Here is another way: you can install your Python libraries using the command line, as shown in the screenshots below –

      17. Running & Installing Libraries - Alternate Options

A few things to notice: the first command shows the currently running cluster list; the second command updates your pip packages; & the third command installs your desired PyPI package.

      Please find the raw commands –

      databricks clusters list

      pip install -U pip

databricks libraries install --cluster-id "XXXX-XXXXX-leech896" --pypi-package "pandas" --pypi-repo "https://pypi.org/project/pandas/"

      After installing, the GUI page under the libraries section will look like this –

      18. Installed Libraries

      Note that, for any failed case, you can check the log in this way –

      20. Installation-In-progress

If you click on the marked red area, it will pop up the detailed error information, as follows –

      19.5. Error Details

      So, we’re done with our initial set-up.

      Let’s upload one sample file into this environment & try to parse the data.

      Step -8:

      You can upload your sample file as follows –

      23.1. First Step

First, click "Data" & then click "Add Data", marked in the red box.

You can import this entire csv as a table, as shown in the next screenshot –

      23.2. Uploading Data Files

Also, you can create a local directory here based on your requirements, as explained below –

      24. Creating Local Directory For Process

      Step -9:

      Let’s run the code.

      Please find the following snippet in PySpark for our test –

1. DBFromFile.py (This script will run inside Databricks, parse the uploaded csv & expose it as a SQL-like table for our task.)

      ###########################################
      #### Written By: SATYAKI DE        ########
      #### Written On: 10-Feb-2019       ########
      ####                               ########
      #### Objective: Pyspark File to    ########
      #### parse the uploaded csv file.  ########
      ###########################################
      
      # File location and type
      file_location = "/FileStore/tables/src_file/customer_addr_20180112.csv"
      file_type = "csv"
      
      # CSV options
      infer_schema = "false"
      first_row_is_header = "true"
      delimiter = ","
      
      # The applied options are for CSV files. For other file types, these will be ignored.
      df = spark.read.format(file_type) \
        .option("inferSchema", infer_schema) \
        .option("header", first_row_is_header) \
        .option("sep", delimiter) \
        .load(file_location)
      
      display(df)
      
      # Create a view or table
      
      temp_table_name = "customer_addr_20180112_csv"
      
      df.createOrReplaceTempView(temp_table_name)
      
      %sql
      
      /* Query the created temp table in a SQL cell */
      
      select * from `customer_addr_20180112_csv`

From the above sample snippet, one can see that the application parses the source data by providing all the parsing options & then registers that csv as a temporary table for SQL.
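
If you prefer to stay in Python rather than switching to a %sql cell, the same temporary view can be queried with spark.sql; a small, equivalent sketch is shown below (the view name comes from the snippet above).

# Python equivalent of the %sql cell above - query the registered temp view
result_df = spark.sql("select * from `customer_addr_20180112_csv`")
display(result_df)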

      Let’s check step by step execution.

      25. Working With Uploaded File

      So, until this step, you can see that the application has successfully parsed the csv data.

      And, finally, you can view the data –

      25.1. Second Option

As the highlighted blue box shows, the application is using this csv file as a table. So, you have many options to analyze the information flexibly if you are familiar with SQL.

After your job runs, make sure you terminate your cluster. Otherwise, you'll receive a large & expensive usage bill, which you don't want!

      So, finally, we’ve done it.

Let me know what you think.

      Till then, Happy Avenging! 😀

Note: All the data posted here are representational, available over the internet & for educational purposes only.