RAG implementation of LLMs by using Python, Haystack & React (Part – 1)

Today, I'm starting a new multi-part series on building an end-to-end LLM application that grounds its answers in source data through a RAG (retrieval-augmented generation) implementation. I'll use the OpenAI Python SDK and Haystack embeddings for this.

In this post, I’ve subscribed to OpenAI directly & I’m not using OpenAI through Azure. However, I’ll explore that in the future as well.

Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

Demo

Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.

As you can see, to enable this large & complex solution, we must first establish the capabilities to build applications powered by LLMs, Transformer models, vector search, and more. You can use state-of-the-art NLP models to perform question-answering, answer generation, and semantic document search, or to build tools capable of complex decision-making and query resolution. Steps 1 & 2 in the diagram cover embedding the data & creating the informed repository. We’ll discuss them in the second part.

Once you have the informed repository, the system can interact with the end-users. As part of the query (shown in step 3), the prompt & the question are shared with the process engine, which then retrieves only the relevant context from our informed repository, reducing the volume of data, & returns that tuned context as the response (shown in steps 4, 5 & 6).

Then, this tuned context is shared with OpenAI to produce a better response, with a summary & concluding remarks that are user-friendly & easier for end-users to understand (shown in steps 8 & 9).
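To make the flow above concrete, here is a minimal sketch in plain Python. It is not the project's code: the keyword-overlap `retrieve` function, the `build_messages` helper, and the sample repository are all hypothetical stand-ins for the Haystack retriever and the OpenAI call discussed later. It only shows the idea of trimming the repository down to the relevant context before anything is sent to the LLM.

```python
# Toy stand-in for the query flow: retrieve context, trim it, then build the LLM request.
# Everything here is hypothetical; the real project uses Haystack and OpenAI instead.

def retrieve(question, repository, top_k=2):
    """Rank documents by how many question words they share and keep the best top_k."""
    q_words = set(question.lower().split())
    scored = [(len(q_words & set(doc.lower().split())), doc) for doc in repository]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [doc for score, doc in scored[:top_k] if score > 0]

def build_messages(question, context_docs):
    """Package the reduced context and the question as chat-style messages."""
    context = "\n".join(context_docs)
    return [
        {"role": "system", "content": "Answer only from the supplied context."},
        {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"},
    ]

repository = [
    "The vase is a ceramic piece from ancient Greece.",
    "The painting shows a harbor at sunset.",
    "This bronze helmet was used in ancient Greece.",
]
question = "Which items come from ancient Greece?"
docs = retrieve(question, repository)
messages = build_messages(question, docs)
print(len(docs), "documents sent instead of", len(repository))
```

Only the documents that share words with the question reach the prompt, which is the cost-saving point of the design: the LLM sees a small, relevant slice rather than the whole repository.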

The following are the important packages that are essential to this project –

pip install farm-haystack==1.19.0
pip install Flask==2.2.5
pip install Flask-Cors==4.0.0
pip install Flask-JWT-Extended==4.5.2
pip install Flask-Session==0.5.0
pip install openai==0.27.8
pip install pandas==2.0.3
pip install tensorflow==2.11.1

We have a front-end built with React, back-end APIs built with Python Flask, and OpenAI to create this experience.

Today, we’ll go in reverse order. We’ll first discuss the main script & then explain all the other class scripts.

  • flaskServer.py (This is the main calling Python script to invoke the RAG-Server.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Jun-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### shortcut application created inside MAC         ####
#### environment including MacBook, IPad or IPhone.  ####
####                                                 ####
#########################################################

from flask import Flask, jsonify, request, session
from flask_cors import CORS
from werkzeug.security import check_password_hash, generate_password_hash
from flask_jwt_extended import JWTManager, jwt_required, create_access_token
import pandas as pd
from clsConfigClient import clsConfigClient as cf
import clsL as log
import clsContentScrapper as csc
import clsRAGOpenAI as crao
import csv
from datetime import timedelta
import os
import re
import json

########################################################
################    Global Area   ######################
########################################################
#Initiating Logging Instances
clog = log.clsL()

admin_key = cf.conf['ADMIN_KEY']
secret_key = cf.conf['SECRET_KEY']
session_path = cf.conf['SESSION_PATH']
sessionFile = cf.conf['SESSION_CACHE_FILE']

app = Flask(__name__)
CORS(app)  # This will enable CORS for all routes
app.config['JWT_SECRET_KEY'] = admin_key  # Change this!
app.secret_key = secret_key

jwt = JWTManager(app)

users = cf.conf['USER_NM']
passwd = cf.conf['USER_PWD']

cCScrapper = csc.clsContentScrapper()
cr = crao.clsRAGOpenAI()

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Define the aggregation functions
def join_unique(series):
    unique_vals = series.drop_duplicates().astype(str)
    return ', '.join(filter(lambda x: x != 'nan', unique_vals))

# Building the preaggregate cache
def groupImageWiki():
    try:
        base_path = cf.conf['OUTPUT_PATH']
        inputFile = cf.conf['CLEANED_FILE']
        outputFile = cf.conf['CLEANED_FILE_SHORT']
        subdir = cf.conf['SUBDIR_OUT']
        Ind = cf.conf['DEBUG_IND']

        inputCleanedFileLookUp = base_path + inputFile

        #Opening the file in dataframe
        df = pd.read_csv(inputCleanedFileLookUp)
        hash_values = df['Total_Hash'].unique()

        # Work on a copy so the column assignments below do not touch a view of df
        dFin = df[['primaryImage','Wiki_URL','Total_Hash']].copy()

        # Ensure columns are strings and not NaN
        # Convert columns to string and replace 'nan' with an empty string
        dFin['primaryImage'] = dFin['primaryImage'].astype(str).replace('nan', '')
        dFin['Wiki_URL'] = dFin['Wiki_URL'].astype(str).replace('nan', '')

        # drop_duplicates() returns a new frame, so keep the result
        dFin = dFin.drop_duplicates()

        # Group by 'Total_Hash' and aggregate
        dfAgg = dFin.groupby('Total_Hash').agg({'primaryImage': join_unique,'Wiki_URL': join_unique}).reset_index()

        return dfAgg

    except Exception as e:
        x = str(e)
        print('Error: ', x)

        df = pd.DataFrame()

        return df

resDf = groupImageWiki()

########################################################
################  End  Global Area  ####################
########################################################

def extractRemoveUrls(hash_value):
    image_urls = ''
    wiki_urls = ''
    # Parse the inner message JSON string
    try:

        resDf['Total_Hash'] = resDf['Total_Hash'].astype(int)
        filtered_df = resDf[resDf['Total_Hash'] == int(hash_value)]

        if not filtered_df.empty:
            image_urls = filtered_df['primaryImage'].values[0]
            wiki_urls = filtered_df['Wiki_URL'].values[0]

        return image_urls, wiki_urls

    except Exception as e:
        x = str(e)
        print('extractRemoveUrls Error: ', x)
        return image_urls, wiki_urls

def isIncomplete(line):
    """Check if a line appears to be incomplete."""

    # Check if the line ends with certain patterns indicating it might be incomplete.
    incomplete_patterns = [': [Link](', ': Approximately ', ': ']
    return any(line.endswith(pattern) for pattern in incomplete_patterns)

def filterData(data):
    """Return only the complete lines from the data."""

    lines = data.split('\n')
    complete_lines = [line for line in lines if not isIncomplete(line)]

    return '\n'.join(complete_lines)

def updateCounter(sessionFile):
    try:
        counter = 0

        # Check if the CSV file exists
        if os.path.exists(sessionFile):
            with open(sessionFile, 'r') as f:
                reader = csv.reader(f)
                for row in reader:
                    # Assuming the counter is the first value in the CSV
                    counter = int(row[0])

        # Increment counter
        counter += 1

        # Write counter back to CSV
        with open(sessionFile, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow([counter])

        return counter
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

def getPreviousResult():
    try:
        fullFileName = session_path + sessionFile
        newCounterValue = updateCounter(fullFileName)

        return newCounterValue
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

@app.route('/login', methods=['POST'])
def login():
    username = request.json.get('username', None)
    password = request.json.get('password', None)

    # Log only the user name; the password should never be written to the console
    print('User Name: ', str(username))

    #if username not in users or not check_password_hash(users.get(username), password):
    if ((username not in users) or (password not in passwd)):
        return jsonify({'login': False}), 401

    access_token = create_access_token(identity=username)
    return jsonify(access_token=access_token)

@app.route('/chat', methods=['POST'])
def get_chat():
    try:
        #session["key"] = "1D98KI"
        #session_id = session.sid
        #print('Session Id: ', str(session_id))

        cnt = getPreviousResult()
        print('Running Session Count: ', str(cnt))

        username = request.json.get('username', None)
        message = request.json.get('message', None)

        print('User: ', str(username))
        print('Content: ', str(message))

        if cnt == 1:
            retList = cCScrapper.extractCatalog()
        else:
            hashValue, cleanedData = cr.getData(str(message))
            print('Main Hash Value:', str(hashValue))

            imageUrls, wikiUrls = extractRemoveUrls(hashValue)
            print('Image URLs: ', str(imageUrls))
            print('Wiki URLs: ', str(wikiUrls))
            print('Clean Text:')
            print(str(cleanedData))
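            # Serialize with json.dumps so quotes in the answer cannot break the payload.
            # getData() returns newlines as a literal '\n', so restore them before dumping.
            record = {'Id': str(cleanedData).replace('\\n', '\n'), 'Image': str(imageUrls), 'Wiki': str(wikiUrls)}
            retList = json.dumps({'records': [record]})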

        response = {
            'message': retList
        }

        print('JSON: ', str(response))
        return jsonify(response)

    except Exception as e:
        x = str(e)

        response = {
            'message': 'Error: ' + x
        }
        return jsonify(response)

@app.route('/api/data', methods=['GET'])
@jwt_required()
def get_data():
    response = {
        'message': 'Hello from Flask!'
    }
    return jsonify(response)

if __name__ == '__main__':
    app.run(debug=True)

Let us understand some of the important sections of the above script –

Function – login():

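The login function retrieves a ‘username’ and ‘password’ from a JSON request. It checks whether the username is present in the configured users and whether the password is present in the configured passwords; if either check fails, it returns a failure JSON response with a 401 status. Note that the two checks are independent, so the password is not verified against that specific user. If both checks pass, it creates and returns an access token in a JSON response.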
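The commented-out line in login() hints at a hash-based check per user. As a hedged illustration of that idea, here is a small standard-library sketch that ties each password to its own username. The `user_store` layout, the helper names, and the sample users are assumptions made for this example; they are not part of the project, which reads USER_NM and USER_PWD from its config.

```python
import hashlib
import hmac
import os

def hash_password(password, salt=None):
    """Return (salt, digest) for a password using PBKDF2-HMAC-SHA256."""
    salt = salt or os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, 100_000)
    return salt, digest

def credentials_match(user_store, username, password):
    """True only when the password belongs to this specific username."""
    entry = user_store.get(username)
    if entry is None or password is None:
        return False
    salt, expected = entry
    _, candidate = hash_password(password, salt)
    # compare_digest avoids leaking information through timing differences
    return hmac.compare_digest(candidate, expected)

# Hypothetical user store: username -> (salt, digest)
user_store = {"alice": hash_password("s3cret"), "bob": hash_password("hunter2")}

print(credentials_match(user_store, "alice", "s3cret"))   # alice with her own password
print(credentials_match(user_store, "alice", "hunter2"))  # alice with bob's password
```

With this shape, a valid password only works for the account it was set for, and no plain-text password needs to live in the configuration.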

Function – get_chat():

The get_chat function retrieves the running session count and the user input from a JSON request. On the first call (count 1), it returns the catalog data; otherwise, it passes the user’s message to the RAG framework, which returns a hash value and the refined response from OpenAI, and then looks up the image URLs and wiki URLs for that hash. If an error arises, the function captures and returns the error as a JSON message.
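The records payload that get_chat returns is nested JSON text inside the ‘message’ attribute. As a small sketch (the helper name `build_records_payload` and the sample values are made up for illustration), building that text with json.dumps keeps it valid even when the answer contains quotes or line breaks:

```python
import json

def build_records_payload(answer, image_urls, wiki_urls):
    """Return the nested JSON text with the same keys the front-end expects."""
    record = {"Id": str(answer), "Image": str(image_urls), "Wiki": str(wiki_urls)}
    return json.dumps({"records": [record]})

# An answer with quotes and a line break, which hand-built JSON strings struggle with
payload = build_records_payload(
    'He said "hello"\nSecond line',
    "http://example.com/a.jpg",
    "http://example.com/wiki",
)
parsed = json.loads(payload)
print(parsed["records"][0]["Id"])
```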

Function – updateCounter():

The updateCounter function checks if a given CSV file exists and retrieves its counter value. It then increments the counter and writes it back to the CSV. If any errors occur, an error message is printed, and the function returns a value of 1.
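Here is a compact sketch of the same file-backed counter, run against a temporary folder so it can be tried safely. The function name `update_counter` and the file name are assumptions for this example only.

```python
import csv
import os
import tempfile

def update_counter(path):
    """Read the last counter from the CSV (if any), add one, and write it back."""
    counter = 0
    if os.path.exists(path):
        with open(path, newline="") as f:
            for row in csv.reader(f):
                if row:  # skip blank lines
                    counter = int(row[0])
    counter += 1
    with open(path, "w", newline="") as f:
        csv.writer(f).writerow([counter])
    return counter

with tempfile.TemporaryDirectory() as tmp:
    session_file = os.path.join(tmp, "session.csv")
    first = update_counter(session_file)   # no file yet
    second = update_counter(session_file)  # file now holds the previous count
    print(first, second)
```

Because the count lives in a file, it survives a server restart. In get_chat, that means the catalog branch (count 1) runs only when the session file is missing or unreadable.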

Function – extractRemoveUrls():

The extractRemoveUrls function attempts to filter a data frame, resDf, based on a provided hash value to extract image and wiki URLs. If the data frame contains matching entries, it retrieves the corresponding URLs. Any errors encountered are printed, but the function always returns the image and wiki URLs, even if they are empty.
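To show what the pre-aggregated cache and the lookup do together, here is a dependency-free sketch of the same idea. The project itself uses pandas; the helper names and the sample rows below are invented for illustration.

```python
def join_unique(values):
    """Join distinct, non-empty values in first-seen order, skipping 'nan'."""
    seen = []
    for value in values:
        text = "" if value is None else str(value)
        if text and text != "nan" and text not in seen:
            seen.append(text)
    return ", ".join(seen)

def build_lookup(rows):
    """Group rows by Total_Hash and aggregate image and wiki URLs per hash."""
    grouped = {}
    for row in rows:
        bucket = grouped.setdefault(int(row["Total_Hash"]), {"primaryImage": [], "Wiki_URL": []})
        bucket["primaryImage"].append(row["primaryImage"])
        bucket["Wiki_URL"].append(row["Wiki_URL"])
    return {h: (join_unique(b["primaryImage"]), join_unique(b["Wiki_URL"])) for h, b in grouped.items()}

def extract_urls(lookup, hash_value):
    """Return (image_urls, wiki_urls); empty strings when the hash is missing or invalid."""
    try:
        return lookup.get(int(hash_value), ("", ""))
    except (TypeError, ValueError):
        return "", ""

rows = [
    {"Total_Hash": 101, "primaryImage": "http://example.com/a.jpg", "Wiki_URL": "http://example.com/wiki/A"},
    {"Total_Hash": 101, "primaryImage": "http://example.com/a.jpg", "Wiki_URL": "http://example.com/wiki/B"},
    {"Total_Hash": 202, "primaryImage": float("nan"), "Wiki_URL": "http://example.com/wiki/C"},
]
lookup = build_lookup(rows)
print(extract_urls(lookup, "101"))
```

The comma-separated wiki string produced here is the same shape the React code later splits on ','.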

  • clsContentScrapper.py (This is the main class that brings the default options for the users if they agree with the initial prompt by the bot.)
#####################################################
#### Written By: SATYAKI DE                      ####
#### Written On: 27-May-2023                     ####
#### Modified On 28-May-2023                     ####
####                                             ####
#### Objective: This is the main calling         ####
#### python class that will invoke the           ####
#### LangChain of package to extract             ####
#### the transcript from the YouTube videos &    ####
#### then answer the questions based on the      ####
#### topics selected by the users.               ####
####                                             ####
#####################################################

from langchain.document_loaders import YoutubeLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.chat_models import ChatOpenAI
from langchain.chains import LLMChain

from langchain.prompts.chat import (
    ChatPromptTemplate,
    SystemMessagePromptTemplate,
    HumanMessagePromptTemplate,
)

from googleapiclient.discovery import build

import clsTemplate as ct
from clsConfigClient import clsConfigClient as cf

import os

from flask import jsonify
import requests

###############################################
###           Global Section                ###
###############################################
open_ai_Key = cf.conf['OPEN_AI_KEY']
os.environ["OPENAI_API_KEY"] = open_ai_Key
embeddings = OpenAIEmbeddings(openai_api_key=open_ai_Key)

YouTube_Key = cf.conf['YOUTUBE_KEY']
youtube = build('youtube', 'v3', developerKey=YouTube_Key)

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

###############################################
###    End of Global Section                ###
###############################################

class clsContentScrapper:
    def __init__(self):
        self.model_name = cf.conf['MODEL_NAME']
        self.temp_val = cf.conf['TEMP_VAL']
        self.max_cnt = int(cf.conf['MAX_CNT'])
        self.url = cf.conf['BASE_URL']
        self.header_token = cf.conf['HEADER_TOKEN']

    def extractCatalog(self):
        try:
            base_url = self.url
            header_token = self.header_token

            url = base_url + '/departments'

            print('Full URL: ', str(url))

            payload={}
            headers = {'Cookie': header_token}

            response = requests.request("GET", url, headers=headers, data=payload)

            x = response.text

            return x
        except Exception as e:
            discussedTopic = []
            x = str(e)
            print('Error: ', x)

            return x

Let us understand the core part required from this class.

Function – extractCatalog():

The extractCatalog function uses specific headers to make a GET request to a constructed URL. The URL is derived by appending ‘/departments’ to a base_url, and a header token is used in the request headers. If successful, it returns the text of the response; if there’s an exception, it prints the error and returns the error message.

  • clsRAGOpenAI.py (This is the main class that brings the RAG-enabled context that is fed to OpenAI for fine-tuned response with less cost.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Jun-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### shortcut application created inside MAC         ####
#### environment including MacBook, IPad or IPhone.  ####
####                                                 ####
#########################################################

from haystack.document_stores.faiss import FAISSDocumentStore
from haystack.nodes import DensePassageRetriever
import openai

from clsConfigClient import clsConfigClient as cf
import clsL as log

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

import os
import re
###############################################
###           Global Section                ###
###############################################
Ind = cf.conf['DEBUG_IND']
queryModel = cf.conf['QUERY_MODEL']
passageModel = cf.conf['PASSAGE_MODEL']

#Initiating Logging Instances
clog = log.clsL()

os.environ["TOKENIZERS_PARALLELISM"] = "false"

vectorDBFileName = cf.conf['VECTORDB_FILE_NM']

indexFile = "vectorDB/" + str(vectorDBFileName) + '.faiss'
indexConfig = "vectorDB/" + str(vectorDBFileName) + ".json"

print('File: ', str(indexFile))
print('Config: ', str(indexConfig))

# Also, provide `config_path` parameter if you set it when calling the `save()` method:
new_document_store = FAISSDocumentStore.load(index_path=indexFile, config_path=indexConfig)

# Initialize Retriever
retriever = DensePassageRetriever(document_store=new_document_store,
                                  query_embedding_model=queryModel,
                                  passage_embedding_model=passageModel,
                                  use_gpu=False)


###############################################
###    End of Global Section                ###
###############################################

class clsRAGOpenAI:
    def __init__(self):
        self.basePath = cf.conf['DATA_PATH']
        self.fileName = cf.conf['FILE_NAME']
        self.Ind = cf.conf['DEBUG_IND']
        self.subdir = str(cf.conf['OUT_DIR'])
        self.base_url = cf.conf['BASE_URL']
        self.outputPath = cf.conf['OUTPUT_PATH']
        self.vectorDBPath = cf.conf['VECTORDB_PATH']
        self.openAIKey = cf.conf['OPEN_AI_KEY']
        self.temp = cf.conf['TEMP_VAL']
        self.modelName = cf.conf['MODEL_NAME']
        self.maxToken = cf.conf['MAX_TOKEN']

    def extractHash(self, text):
        try:
            # Regular expression pattern to match 'Ref: {' followed by a number and then '}'
            pattern = r"Ref: \{'(\d+)'\}"
            match = re.search(pattern, text)

            if match:
                return match.group(1)
            else:
                return None
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return None

    def removeSentencesWithNaN(self, text):
        try:
            # Split text into sentences using regular expression
            sentences = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s', text)
            # Filter out sentences containing 'nan'
            filteredSentences = [sentence for sentence in sentences if 'nan' not in sentence]
            # Rejoin the sentences
            return ' '.join(filteredSentences)
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return ''

    def retrieveDocumentsReader(self, question, top_k=9):
        return retriever.retrieve(question, top_k=top_k)

    def generateAnswerWithGPT3(self, retrieved_docs, question):
        try:
            openai.api_key = self.openAIKey
            temp = self.temp
            modelName = self.modelName
            maxToken = self.maxToken

            documentsText = " ".join([doc.content for doc in retrieved_docs])

            filteredDocs = self.removeSentencesWithNaN(documentsText)
            hashValue = self.extractHash(filteredDocs)

            print('RAG Docs:: ')
            print(filteredDocs)
            #prompt = f"Given the following documents: {documentsText}, answer the question accurately based on the above data with the supplied http urls: {question}"

            # Set up a chat-style prompt with your data
            messages = [
                {"role": "system", "content": "You are a helpful assistant, answer the question accurately based on the above data with the supplied http urls. Only relevant content needs to publish. Please do not provide the facts or the texts that results crossing the max_token limits."},
                {"role": "user", "content": filteredDocs}
            ]

            # Chat style invoking the latest model
            response = openai.ChatCompletion.create(
                model=modelName,
                messages=messages,
                temperature = temp,
                max_tokens=maxToken
            )
            return hashValue, response.choices[0].message['content'].strip().replace('\n','\\n')
        except Exception as e:
            x = str(e)
            print('failed to get from OpenAI: ', x)
            # Callers unpack two values (hashValue, answer), so return a pair here too
            return None, 'Not Available!'

    def ragAnswerWithHaystackAndGPT3(self, question):
        retrievedDocs = self.retrieveDocumentsReader(question)
        return self.generateAnswerWithGPT3(retrievedDocs, question)

    def getData(self, strVal):
        try:
            print('*'*120)
            print('Index Your Data for Retrieval:')
            print('*'*120)

            print('Response from New Docs: ')
            print()

            hashValue, answer = self.ragAnswerWithHaystackAndGPT3(strVal)

            print('GPT3 Answer::')
            print(answer)
            print('Hash Value:')
            print(str(hashValue))

            print('*'*240)
            print('End Of Use RAG to Generate Answers:')
            print('*'*240)

            return hashValue, answer
        except Exception as e:
            x = str(e)
            print('Error: ', x)
            answer = x
            hashValue = 1

            return hashValue, answer

Let us understand some of the important blocks –

Function – ragAnswerWithHaystackAndGPT3():

The ragAnswerWithHaystackAndGPT3 function retrieves relevant documents for a given question using the retrieveDocumentsReader method. It then generates an answer for the query using GPT-3 with the retrieved documents via the generateAnswerWithGPT3 method. The final response is returned.

Function – generateAnswerWithGPT3():

The generateAnswerWithGPT3 function, given a list of retrieved documents and a question, communicates with OpenAI’s GPT-3 to generate an answer. It first processes the documents, filtering and extracting a hash value. Using a chat-style format, it prompts GPT-3 with the processed documents and captures its response. If an error occurs, an error message is printed, and “Not Available!” is returned.
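The two text helpers used before the OpenAI call can be tried on their own. The sketch below reuses the same regular expressions as the class, as standalone functions; the sample text is invented for illustration.

```python
import re

def extract_hash(text):
    """Return the digits inside Ref: {'...'} or None when no reference is present."""
    match = re.search(r"Ref: \{'(\d+)'\}", text)
    return match.group(1) if match else None

def remove_sentences_with_nan(text):
    """Split into sentences and drop any sentence containing the substring 'nan'."""
    sentences = re.split(r"(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s", text)
    return " ".join(s for s in sentences if "nan" not in s)

sample = "Title: Bronze helmet. Artist: nan. Ref: {'123456'}. Period: Classical."
cleaned = remove_sentences_with_nan(sample)
print(cleaned)
print(extract_hash(cleaned))
```

One caveat worth knowing: the filter is a plain substring test, so a sentence containing a word such as "financial" is dropped as well.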

Function – retrieveDocumentsReader():

The retrieveDocumentsReader function takes in a question and an optional parameter, top_k (defaulted to 9). It calls the retriever.retrieve method with the given parameters. The retrieval returns at most nine documents from the RAG engine, which are then fed to OpenAI.
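As a rough picture of what a top_k cut means in dense retrieval, here is a toy sketch. It assumes made-up two-dimensional embeddings and uses cosine similarity as a stand-in scoring function; the real retriever computes its embeddings with the configured query and passage models and handles the scoring internally.

```python
import math

def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def retrieve_top_k(query_vec, passages, top_k=9):
    """Return at most top_k passages, best match first."""
    ranked = sorted(passages, key=lambda p: cosine(query_vec, p["embedding"]), reverse=True)
    return ranked[:top_k]

# Hypothetical passages with invented embeddings
passages = [
    {"content": "doc A", "embedding": [1.0, 0.0]},
    {"content": "doc B", "embedding": [0.7, 0.7]},
    {"content": "doc C", "embedding": [0.0, 1.0]},
]
top = retrieve_top_k([1.0, 0.1], passages, top_k=2)
print([p["content"] for p in top])
```

A larger top_k gives the LLM more context to work with but also more tokens to pay for, so it is a value worth tuning.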

  • App.js (This is the main react script, that will create the interface & parse the data apart from the authentication)
// App.js
import React, { useState } from 'react';
import axios from 'axios';
import './App.css';

const App = () => {
  const [isLoggedIn, setIsLoggedIn] = useState(false);
  const [username, setUsername] = useState('');
  const [password, setPassword] = useState('');
  const [message, setMessage] = useState('');
  const [chatLog, setChatLog] = useState([{ sender: 'MuBot', message: 'Welcome to MuBot! Please explore the world of History from our brilliant collections! Do you want to proceed to see the catalog?'}]);

  const handleLogin = async (e) => {
    e.preventDefault();
    try {
      const response = await axios.post('http://localhost:5000/login', { username, password });
      if (response.status === 200) {
        setIsLoggedIn(true);
      }
    } catch (error) {
      console.error('Login error:', error);
    }
  };

  const sendMessage = async (username) => {
    if (message.trim() === '') return;

    // Create a new chat entry
    const newChatEntry = {
      sender: 'user',
      message: message.trim(),
    };

    // Clear the input field
    setMessage('');

    try {
      // Make API request to Python-based API
      const response = await axios.post('http://localhost:5000/chat', { message: newChatEntry.message }); // Replace with your API endpoint URL
      const responseData = response.data;

      // Print the response to the console for debugging
      console.log('API Response:', responseData);

      // Parse the nested JSON from the 'message' attribute
      const jsonData = JSON.parse(responseData.message);

      // Check if the data contains 'departments'
      if (jsonData.departments) {

        // Extract the 'departments' attribute from the parsed data
        const departments = jsonData.departments;

        // Extract the department names and create a single string with line breaks
        const botResponseText = departments.reduce((acc, department) => {return acc + department.departmentId + ' ' + department.displayName + '\n';}, '');

        // Update the chat log with the bot's response
        setChatLog((prevChatLog) => [...prevChatLog, { sender: 'user', message: message }, { sender: 'bot', message: botResponseText },]);
      }
      else if (jsonData.records)
      {
        // Data structure 2: Artwork information
        const records = jsonData.records;

        // Prepare chat entries
        const chatEntries = [];

        // Iterate through records and extract text, image, and wiki information
        records.forEach((record) => {
          const textInfo = Object.entries(record).map(([key, value]) => {
            if (key !== 'Image' && key !== 'Wiki') {
              return `${key}: ${value}`;
            }
            return null;
          }).filter((info) => info !== null).join('\n');

          const imageLink = record.Image;
          //const wikiLinks = JSON.parse(record.Wiki.replace(/'/g, '"'));
          //const wikiLinks = record.Wiki;
          const wikiLinks = record.Wiki.split(',').map(link => link.trim());

          console.log('Wiki:', wikiLinks);

          // Check if there is a valid image link
          const hasValidImage = imageLink && imageLink !== '[]';

          const imageElement = hasValidImage ? (
            <img src={imageLink} alt="Artwork" style={{ maxWidth: '100%' }} />
          ) : null;

          // Create JSX elements for rendering the wiki links (if available)
          const wikiElements = wikiLinks.map((link, index) => (
            <div key={index}>
              <a href={link} target="_blank" rel="noopener noreferrer">
                Wiki Link {index + 1}
              </a>
            </div>
          ));

          if (textInfo) {
            chatEntries.push({ sender: 'bot', message: textInfo });
          }

          if (imageElement) {
            chatEntries.push({ sender: 'bot', message: imageElement });
          }

          if (wikiElements.length > 0) {
            chatEntries.push({ sender: 'bot', message: wikiElements });
          }
        });

        // Update the chat log with the bot's response
        setChatLog((prevChatLog) => [...prevChatLog, { sender: 'user', message }, ...chatEntries, ]);
      }

    } catch (error) {
      console.error('Error sending message:', error);
    }
  };

  if (!isLoggedIn) {
    return (
      <div className="login-container">
        <h2>Welcome to the MuBot</h2>
        <form onSubmit={handleLogin} className="login-form">
          <input
            type="text"
            placeholder="Enter your name"
            value={username}
            onChange={(e) => setUsername(e.target.value)}
            required
          />
          <input
            type="password"
            placeholder="Enter your password"
            value={password}
            onChange={(e) => setPassword(e.target.value)}
            required
          />
          <button type="submit">Login</button>
        </form>
      </div>
    );
  }

  return (
    <div className="chat-container">
      <div className="chat-header">
        <h2>Hello, {username}</h2>
        <h3>Chat with MuBot</h3>
      </div>
      <div className="chat-log">
        {chatLog.map((chatEntry, index) => (
          <div
            key={index}
            className={`chat-entry ${chatEntry.sender === 'user' ? 'user' : 'bot'}`}
          >
            <span className="user-name">{chatEntry.sender === 'user' ? username : 'MuBot'}</span>
            <p className="chat-message">{chatEntry.message}</p>
          </div>
        ))}
      </div>
      <div className="chat-input">
        <input
          type="text"
          placeholder="Type your message..."
          value={message}
          onChange={(e) => setMessage(e.target.value)}
          onKeyPress={(e) => {
            if (e.key === 'Enter') {
              sendMessage();
            }
          }}
        />
        <button onClick={sendMessage}>Send</button>
      </div>
    </div>
  );
};

export default App;

Please find some of the important logic –

Function – handleLogin():

The handleLogin asynchronous function responds to an event by preventing its default action. It attempts to post a login request with a username and password to a local server endpoint. If the response is successful with a status of 200, it updates a state variable to indicate a successful login; otherwise, it logs any encountered errors.

Function – sendMessage():

The sendMessage asynchronous function is designed to handle the user’s chat interaction:

  1. If the message is empty (after trimming spaces), the function exits without further action.
  2. A chat entry object is created with the sender set as ‘user’ and the trimmed message.
  3. The input field’s message is cleared, and an API request is made to a local server endpoint with the chat message.
  4. If the API responds with a ‘departments’ attribute in its JSON, a bot response is crafted by iterating over department details.
  5. If the API responds with ‘records’ indicating artwork information, the bot crafts responses for each record, extracting text, images, and wiki links, and generating JSX elements for rendering them.
  6. After processing the API response, the chat log state is updated with the user’s original message and the bot’s responses.
  7. Errors, if encountered, are logged to the console.

This function enables interactive chat with bot responses that vary based on the nature of the data received from the API.
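When testing the back-end without the browser, it can help to mirror this branching in Python. The sketch below is an assumption-laden imitation of the client logic, not a port of App.js: the function name `to_chat_entries` and the entry keys ‘image’ and ‘links’ are invented, and empty wiki links are skipped here for simplicity.

```python
import json

def to_chat_entries(api_response):
    """Turn the /chat response into a list of bot chat entries."""
    data = json.loads(api_response["message"])  # 'message' holds nested JSON text
    entries = []

    if data.get("departments"):
        # Catalog response: one line per department
        text = "".join(f"{d['departmentId']} {d['displayName']}\n" for d in data["departments"])
        entries.append({"sender": "bot", "message": text})
    elif data.get("records"):
        # Artwork response: text first, then image, then wiki links
        for record in data["records"]:
            text = "\n".join(f"{k}: {v}" for k, v in record.items() if k not in ("Image", "Wiki"))
            if text:
                entries.append({"sender": "bot", "message": text})
            image = record.get("Image")
            if image and image != "[]":
                entries.append({"sender": "bot", "image": image})
            links = [link.strip() for link in record.get("Wiki", "").split(",") if link.strip()]
            if links:
                entries.append({"sender": "bot", "links": links})
    return entries

sample = {"message": json.dumps({"records": [
    {"Id": "A bronze helmet.", "Image": "", "Wiki": "http://example.com/a, http://example.com/b"}
]})}
entries = to_chat_entries(sample)
print(entries)
```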


Let us explore the directory structure. Starting from the parent down to some of the important child folders, it should look like this –


So, finally, we’ve done it.

I know that this post is relatively longer than my earlier posts. But I think you can get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Python-based dash framework visualizing real-time covid-19 trend.

Hi Team,

We’ll enhance our last post on Covid-19 prediction & try to capture the results in a real-time dashboard, where the values in the visual display points change as soon as the source data changes. In short, this is a genuinely real-time visual dashboard that displays all the graphs & trends as the third-party API source data changes.

However, I would like to share the run before we dig deep into this.

Demo Run

Architecture:

Let us understand the architecture for this solution –

Streaming Architecture

From the above diagram, one can see that we’re maintaining a similar approach compared to our last initiative. However, we’ve used a different framework to display the data live.

To achieve this, we’ve used a compelling python-based framework called Dash. Other than that, we’ve used Ably, Plotly & Prophet API.

If you need to know more about our last post, please visit this link.


Package Installation:

Let us understand the sample packages required for this task.

Step – 1:

Installing Packages

Step – 2:

Installing Packages – Continue

Step – 3:

Installing Packages – Continue

Step – 4:

Installing Packages – Final

And, here is the command to install those packages –

pip install pandas
pip install plotly
pip install prophet
pip install dash
pip install ably

Code:

Since this is an extension of our previous post, we won’t discuss the scripts already covered there. Instead, we will talk about the enhanced scripts & the new scripts required for this use case.

1. clsConfig.py ( This native Python script contains the configuration entries. )


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 09-Sep-2021 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Machine-Learning & streaming dashboard.####
#### ####
################################################
import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    # Pick the path separator based on the operating system
    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'FILE_NAME': Curr_Path + sep + 'data' + sep + 'TradeIn.csv',
        'SRC_PATH': Curr_Path + sep + 'data' + sep,
        'APP_DESC_1': 'Dash Integration with Ably!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'SUBDIR' : 'data',
        'ABLY_ID': 'XXX2LL.93kdkiU2:Kdsldoeie737484E',
        "URL":"https://corona-api.com/countries/",
        "appType":"application/json",
        "conType":"keep-alive",
        "limRec": 10,
        "CACHE":"no-cache",
        "MAX_RETRY": 3,
        "coList": "DE, IN, US, CA, GB, ID, BR",
        "FNC": "NewConfirmed",
        "TMS": "ReportedDate",
        "FND": "NewDeaths",
        "FinData": "Cache.csv"
    }

The new entries that are essential to this task are ABLY_ID & FinData.
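
Since ABLY_ID is a credential, one option is to read it from the environment instead of hard-coding it. The following is only a minimal sketch of that idea; the environment variable name ABLY_API_KEY & the helper get_ably_id are hypothetical & not part of the original codebase.

```python
import os

def get_ably_id(default="XXX2LL.93kdkiU2:Kdsldoeie737484E"):
    # ABLY_API_KEY is a hypothetical variable name chosen for this sketch.
    # When it is not set, fall back to the placeholder used in clsConfig.
    return os.environ.get("ABLY_API_KEY", default)

# Only the two new entries are shown here; the rest of conf stays unchanged.
conf = {
    'ABLY_ID': get_ably_id(),
    'FinData': 'Cache.csv',
}
```

With this approach, the key never has to be committed along with the script.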

2. clsPublishStream.py ( This script will publish the data transformed for Covid-19 predictions from the third-party sources. )


###############################################################
#### ####
#### Written By: Satyaki De ####
#### Written Date: 26-Jul-2021 ####
#### Modified Date: 08-Sep-2021 ####
#### ####
#### Objective: This script will publish real-time ####
#### streaming data coming out from a hosted API ####
#### sources using another popular third-party service ####
#### named Ably. Ably mimics pubsub Streaming concept, ####
#### which might be extremely useful for any start-ups. ####
#### ####
###############################################################
from ably import AblyRest
import logging
import json
from clsConfig import clsConfig as cf

# Global Section
logger = logging.getLogger('ably')
logger.addHandler(logging.StreamHandler())

ably_id = str(cf.conf['ABLY_ID'])
ably = AblyRest(ably_id)
channel = ably.channels.get('sd_channel')
# End Of Global Section

class clsPublishStream:
    def __init__(self):
        self.fnc = cf.conf['FNC']

    def pushEvents(self, srcDF, debugInd, varVa, flg):
        try:
            # JSON data
            # This is the default data for all the identified category
            # we've prepared. You can extract this dynamically. Or, By
            # default you can set their base trade details.
            json_data = [{'Year_Mon': '201911', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg},
                         {'Year_Mon': '201912', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg}]

            jdata = json.dumps(json_data)

            # Publish a message to the sd_channel channel
            channel.publish('event', jdata)

            # Capturing the inbound dataframe
            iDF = srcDF

            # Adding new selected points
            covid_dict = iDF.to_dict('records')
            jdata_fin = json.dumps(covid_dict)

            # Publish rest of the messages to the sd_channel channel
            channel.publish('event', jdata_fin)

            jdata_fin = ''

            return 0
        except Exception as e:
            x = str(e)
            print(x)
            logging.info(x)

            return 1

We’ve already discussed this script. The only new line that appears here is –

json_data = [{'Year_Mon': '201911', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg},
            {'Year_Mon': '201912', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg}]

This statement is more like a dummy feed, which creates the basic structure of your graph.
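
The comment in the script mentions that this default data can also be extracted dynamically. Here is a small sketch of how that might look; the helper build_skeleton & the COUNTRIES list are names assumed for this illustration & do not exist in the original script.

```python
import json

# Country columns used throughout this post
COUNTRIES = ['Brazil', 'Canada', 'Germany', 'India',
             'Indonesia', 'UnitedKingdom', 'UnitedStates']

def build_skeleton(months, status, countries=COUNTRIES):
    """Return one zero-valued record per Year_Mon value."""
    records = []
    for year_mon in months:
        rec = {'Year_Mon': year_mon}
        rec.update({c: 0.0 for c in countries})
        rec['Status'] = status
        records.append(rec)
    return records

skeleton = build_skeleton(['201911', '201912'], 'NewConfirmed')

# This string is what would be handed over to channel.publish('event', ...)
payload = json.dumps(skeleton)
```

The result has the same shape as the hard-coded json_data, so the rest of pushEvents would not need to change.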

3. clsStreamConsume.py ( This script will consume the stream from the Ably queue. )


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Jul-2021 ####
#### Modified On 08-Sep-2021 ####
#### ####
#### Objective: Consuming Streaming data ####
#### from Ably channels published by the ####
#### callPredictCovidAnalysisRealtime.py ####
#### ####
##############################################
import json
from clsConfig import clsConfig as cf
import requests
import logging
import time
import pandas as p
import clsL as cl
from ably import AblyRest

# Initiating Log class
l = cl.clsL()

class clsStreamConsume:
    def __init__(self):
        self.ably_id = str(cf.conf['ABLY_ID'])
        self.fileName = str(cf.conf['FinData'])

    def conStream(self, varVa, debugInd):
        try:
            ably_id = self.ably_id
            fileName = self.fileName
            var = varVa
            debug_ind = debugInd

            # Fetching the data
            client = AblyRest(ably_id)
            channel = client.channels.get('sd_channel')
            message_page = channel.history()

            # Counter Value
            cnt = 0

            # Declaring Global Data-Frame
            df_conv = p.DataFrame()

            for i in message_page.items:
                print('Last Msg: {}'.format(i.data))
                json_data = json.loads(i.data)

                # Converting JSON to Dataframe
                df = p.json_normalize(json_data)

                # Keep the last part of each column name, so that
                # both flat & nested keys are handled
                df.columns = df.columns.map(lambda x: x.split(".")[-1])

                if cnt == 0:
                    df_conv = df
                else:
                    d_frames = [df_conv, df]
                    df_conv = p.concat(d_frames)

                cnt += 1

            # Resetting the Index Value
            df_conv.reset_index(drop=True, inplace=True)

            # This will check whether the current load is happening
            # or not. Based on that, it will capture the old events
            # from cache.
            if df_conv.empty:
                df_conv = p.read_csv(fileName)
            else:
                l.logr(fileName, debug_ind, df_conv, 'log')

            return df_conv
        except Exception as e:
            x = str(e)
            print(x)
            logging.info(x)

            # This will handle the error scenario as well.
            # Based on that, it will capture the old events
            # from cache.
            try:
                df_conv = p.read_csv(self.fileName)

                return df_conv
            except Exception:
                df = p.DataFrame()

                return df

We’ve already discussed this script in one of my earlier posts, which you will get here.

So, I’m not going to discuss all the steps in detail.

The only added part was to introduce some temporary local caching mechanism.

if df_conv.empty:
    df_conv = p.read_csv(fileName)
else:
    l.logr(fileName, debug_ind, df_conv, 'log')
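
To see the caching idea in isolation, here is a minimal sketch. It assumes that l.logr (from clsL, not shown in this post) writes the dataframe to a CSV file; the plain to_csv call below stands in for it, & the function name with_cache is hypothetical.

```python
import os
import tempfile
import pandas as pd

def with_cache(df_new, cache_file):
    """Refresh the cache with fresh data; fall back to the cache when empty."""
    if df_new.empty:
        # No fresh events: serve the last saved copy if there is one
        if os.path.exists(cache_file):
            return pd.read_csv(cache_file)
        return pd.DataFrame()

    # Fresh events: save them for the next empty fetch
    df_new.to_csv(cache_file, index=False)
    return df_new

cache_file = os.path.join(tempfile.mkdtemp(), 'Cache.csv')
fresh = pd.DataFrame({'Year_Mon': [202108], 'India': [10.0], 'Status': ['NewConfirmed']})

first = with_cache(fresh, cache_file)            # writes the cache
second = with_cache(pd.DataFrame(), cache_file)  # served from the cache
```

This way, the dashboard can keep showing the last known trend even when a fetch returns nothing.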

4. callPredictCovidAnalysisRealtime.py ( Main calling script to fetch the COVID-19 data from the third-party source & then publish it to the Ably message queue after transforming the data & adding the prediction using Facebook’s prophet API. )


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Jul-2021 ####
#### Modified On 26-Jul-2021 ####
#### ####
#### Objective: Calling multiple API's ####
#### that including Prophet-API developed ####
#### by Facebook for future prediction of ####
#### Covid-19 situations in upcoming days ####
#### for world's major hotspots. ####
##############################################
import json
import clsCovidAPI as ca
from clsConfig import clsConfig as cf
import datetime
import logging
import clsL as cl
import math as m
import clsPublishStream as cps
import clsForecast as f
from prophet import Prophet
from prophet.plot import plot_plotly, plot_components_plotly
import matplotlib.pyplot as plt
import pandas as p
import datetime as dt
import time

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Initiating Log class
l = cl.clsL()

# Helper Function that maps a country code to the country name
def countryDet(inputCD):
    try:
        countryCD = inputCD

        if str(countryCD) == 'DE':
            cntCD = 'Germany'
        elif str(countryCD) == 'BR':
            cntCD = 'Brazil'
        elif str(countryCD) == 'GB':
            cntCD = 'UnitedKingdom'
        elif str(countryCD) == 'US':
            cntCD = 'UnitedStates'
        elif str(countryCD) == 'IN':
            cntCD = 'India'
        elif str(countryCD) == 'CA':
            cntCD = 'Canada'
        elif str(countryCD) == 'ID':
            cntCD = 'Indonesia'
        else:
            cntCD = 'N/A'

        return cntCD
    except:
        cntCD = 'N/A'

        return cntCD

def lookupCountry(row):
    try:
        strCD = str(row['CountryCode'])
        retVal = countryDet(strCD)

        return retVal
    except:
        retVal = 'N/A'

        return retVal

def adjustTrend(row):
    try:
        flTrend = float(row['trend'])
        flTrendUpr = float(row['trend_upper'])
        flTrendLwr = float(row['trend_lower'])

        # Average of the trend & its bounds, never below zero
        retVal = m.trunc((flTrend + flTrendUpr + flTrendLwr)/3)

        if retVal < 0:
            retVal = 0

        return retVal
    except:
        retVal = 0

        return retVal

def ceilTrend(row, colName):
    try:
        flTrend = str(row[colName])

        if '.' in flTrend:
            if float(flTrend) > 0:
                retVal = m.trunc(float(flTrend)) + 1
            else:
                retVal = m.trunc(float(flTrend))
        else:
            retVal = float(flTrend)

        if retVal < 0:
            retVal = 0

        return retVal
    except:
        retVal = 0

        return retVal

def plot_picture(inputDF, debug_ind, var, countryCD, stat):
    try:
        iDF = inputDF

        # Lowercase the column names
        iDF.columns = [c.lower() for c in iDF.columns]

        # Determine which is Y axis
        y_col = [c for c in iDF.columns if c.startswith('y')][0]

        # Determine which is X axis
        x_col = [c for c in iDF.columns if c.startswith('ds')][0]

        # Data Conversion
        iDF['y'] = iDF[y_col].astype('float')
        iDF['ds'] = iDF[x_col].astype('datetime64[ns]')

        # Forecast calculations
        # Decreasing the changepoint_prior_scale to 0.001 to make the trend less flexible
        # (named "model" so that it does not shadow the math alias "m")
        model = Prophet(n_changepoints=20, yearly_seasonality=True, changepoint_prior_scale=0.001)
        #model = Prophet(n_changepoints=20, yearly_seasonality=True, changepoint_prior_scale=0.04525)
        #model = Prophet(n_changepoints=['2021-09-10'])
        model.fit(iDF)

        forecastDF = model.make_future_dataframe(periods=365)
        forecastDF = model.predict(forecastDF)

        l.logr('15.forecastDF_' + var + '_' + countryCD + '.csv', debug_ind, forecastDF, 'log')

        # Working on a copy, as new columns are added below
        df_M = forecastDF[['ds', 'trend', 'trend_lower', 'trend_upper']].copy()

        l.logr('16.df_M_' + var + '_' + countryCD + '.csv', debug_ind, df_M, 'log')

        # Getting Full Country Name
        cntCD = countryDet(countryCD)

        # Draw forecast results
        df_M['Country'] = cntCD

        l.logr('17.df_M_C_' + var + '_' + countryCD + '.csv', debug_ind, df_M, 'log')

        df_M['AdjustTrend'] = df_M.apply(lambda row: adjustTrend(row), axis=1)

        l.logr('20.df_M_AdjustTrend_' + var + '_' + countryCD + '.csv', debug_ind, df_M, 'log')

        return df_M
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

def countrySpecificDF(countryDF, val):
    try:
        countryName = val
        df = countryDF

        df_lkpFile = df[(df['CountryCode'] == val)]

        return df_lkpFile
    except:
        df = p.DataFrame()

        return df

def toNum(row, colName):
    try:
        flTrend = str(row[colName])
        flTr, subpart = flTrend.split(' ')
        retVal = int(flTr.replace('-',''))

        return retVal
    except:
        retVal = 0

        return retVal

def extractPredictedDF(OrigDF, MergePredictedDF, colName):
    try:
        iDF_1 = OrigDF
        iDF_2 = MergePredictedDF
        dt_format = '%Y-%m-%d'

        iDF_1_max_group = iDF_1.groupby(["Country"] , as_index=False)["ReportedDate"].max()
        iDF_2['ReportedDate'] = iDF_2.apply(lambda row: toNum(row, 'ds'), axis=1)

        col_one_list = iDF_1_max_group['Country'].tolist()
        col_two_list = iDF_1_max_group['ReportedDate'].tolist()

        print('col_one_list: ', str(col_one_list))
        print('col_two_list: ', str(col_two_list))

        cnt_1_x = 1
        cnt_1_y = 1
        cnt_x = 0

        df_M = p.DataFrame()

        for i in col_one_list:
            str_countryVal = str(i)
            cnt_1_y = 1

            for j in col_two_list:
                intReportDate = int(str(j).strip().replace('-',''))

                # Only pair each country with its own last reported date
                if cnt_1_x == cnt_1_y:
                    print('str_countryVal: ', str(str_countryVal))
                    print('intReportDate: ', str(intReportDate))

                    iDF_2_M = iDF_2[(iDF_2['Country'] == str_countryVal) & (iDF_2['ReportedDate'] > intReportDate)]

                    # Merging with the previous Country Code data
                    if cnt_x == 0:
                        df_M = iDF_2_M
                    else:
                        d_frames = [df_M, iDF_2_M]
                        df_M = p.concat(d_frames)

                    cnt_x += 1

                cnt_1_y += 1

            cnt_1_x += 1

        df_M.drop(columns=['ReportedDate'], axis=1, inplace=True)
        df_M.rename(columns={'ds':'ReportedDate'}, inplace=True)
        df_M.rename(columns={'AdjustTrend':colName}, inplace=True)

        return df_M
    except:
        df = p.DataFrame()

        return df

def toPivot(inDF, colName):
    try:
        iDF = inDF

        iDF_Piv = iDF.pivot_table(colName, ['ReportedDate'], 'Country')
        iDF_Piv.reset_index( drop=False, inplace=True )

        list1 = ['ReportedDate']

        iDF_Arr = iDF['Country'].unique()
        list2 = iDF_Arr.tolist()

        listV = list1 + list2

        # reindex returns a new dataframe, so the result is assigned back
        iDF_Piv = iDF_Piv.reindex(listV, axis=1)

        return iDF_Piv
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

def toAgg(inDF, var, debugInd, flg):
    try:
        iDF = inDF
        colName = "ReportedDate"

        list1 = list(iDF.columns.values)
        list1.remove(colName)

        list1 = ["Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]

        iDF['Year_Mon'] = iDF[colName].apply(lambda x:x.strftime('%Y%m'))
        iDF.drop(columns=[colName], axis=1, inplace=True)

        ColNameGrp = "Year_Mon"
        print('List1 Aggregate:: ', str(list1))
        print('ColNameGrp :: ', str(ColNameGrp))

        iDF_T = iDF[["Year_Mon", "Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]]
        iDF_T.fillna(0, inplace = True)
        print('iDF_T:: ')
        print(iDF_T)

        iDF_1_max_group = iDF_T.groupby(ColNameGrp, as_index=False)[list1].sum()
        iDF_1_max_group['Status'] = flg

        return iDF_1_max_group
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

def publishEvents(inDF1, inDF2, inDF3, inDF4, var, debugInd):
    try:
        # Original Covid Data from API
        iDF1 = inDF1
        iDF2 = inDF2

        NC = 'NewConfirmed'
        ND = 'NewDeaths'

        iDF1_PV = toPivot(iDF1, NC)
        iDF1_PV['ReportedDate'] = p.to_datetime(iDF1_PV['ReportedDate'])
        l.logr('57.iDF1_PV_' + var + '.csv', debugInd, iDF1_PV, 'log')

        iDF2_PV = toPivot(iDF2, ND)
        iDF2_PV['ReportedDate'] = p.to_datetime(iDF2_PV['ReportedDate'])
        l.logr('58.iDF2_PV_' + var + '.csv', debugInd, iDF2_PV, 'log')

        # Predicted Covid Data from Facebook API
        iDF3 = inDF3
        iDF4 = inDF4

        iDF3_PV = toPivot(iDF3, NC)
        l.logr('59.iDF3_PV_' + var + '.csv', debugInd, iDF3_PV, 'log')

        iDF4_PV = toPivot(iDF4, ND)
        l.logr('60.iDF4_PV_' + var + '.csv', debugInd, iDF4_PV, 'log')

        # Now aggregating data based on year-month only
        iDF1_Agg = toAgg(iDF1_PV, var, debugInd, NC)
        l.logr('61.iDF1_Agg_' + var + '.csv', debugInd, iDF1_Agg, 'log')

        iDF2_Agg = toAgg(iDF2_PV, var, debugInd, ND)
        l.logr('62.iDF2_Agg_' + var + '.csv', debugInd, iDF2_Agg, 'log')

        iDF3_Agg = toAgg(iDF3_PV, var, debugInd, NC)
        l.logr('63.iDF3_Agg_' + var + '.csv', debugInd, iDF3_Agg, 'log')

        iDF4_Agg = toAgg(iDF4_PV, var, debugInd, ND)
        l.logr('64.iDF4_Agg_' + var + '.csv', debugInd, iDF4_Agg, 'log')

        # Initiating Ably class to push events
        x1 = cps.clsPublishStream()

        # Pushing both the Historical Confirmed Cases
        retVal_1 = x1.pushEvents(iDF1_Agg, debugInd, var, NC)

        if retVal_1 == 0:
            print('Successfully historical event pushed!')
        else:
            print('Failed to push historical events!')

        # Pushing both the Historical Death Cases
        retVal_3 = x1.pushEvents(iDF2_Agg, debugInd, var, ND)

        if retVal_3 == 0:
            print('Successfully historical event pushed!')
        else:
            print('Failed to push historical events!')

        time.sleep(5)

        # Pushing both the New Confirmed Cases
        retVal_2 = x1.pushEvents(iDF3_Agg, debugInd, var, NC)

        if retVal_2 == 0:
            print('Successfully predicted event pushed!')
        else:
            print('Failed to push predicted events!')

        # Pushing both the New Death Cases
        retVal_4 = x1.pushEvents(iDF4_Agg, debugInd, var, ND)

        if retVal_4 == 0:
            print('Successfully predicted event pushed!')
        else:
            print('Failed to push predicted events!')

        return 0
    except Exception as e:
        x = str(e)
        print(x)

        return 1

def main():
    try:
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*' *60)
        DInd = 'Y'
        NC = 'New Confirmed'
        ND = 'New Dead'
        SM = 'data process Successful!'
        FM = 'data process Failure!'

        print("Calling the custom Package for large file splitting..")
        print('Start Time: ' + str(var1))

        countryList = str(cf.conf['coList']).split(',')

        # Initiating Log Class
        general_log_path = str(cf.conf['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'CovidAPI.log', level=logging.INFO)

        # Create the instance of the Covid API Class
        x1 = ca.clsCovidAPI()

        # Let's pass this to our map section
        retDF = x1.searchQry(var1, DInd)

        retVal = int(retDF.shape[0])

        if retVal > 0:
            print('Successfully Covid Data Extracted from the API-source.')
        else:
            print('Something wrong with your API-source!')

        # Extracting Skeleton Data
        df = retDF[['data.code', 'date', 'deaths', 'confirmed', 'recovered', 'new_confirmed', 'new_recovered', 'new_deaths', 'active']]
        df.columns = ['CountryCode', 'ReportedDate', 'TotalReportedDead', 'TotalConfirmedCase', 'TotalRecovered', 'NewConfirmed', 'NewRecovered', 'NewDeaths', 'ActiveCaases']

        # Note: dropna() returns a new dataframe; as the result is not
        # assigned back, df itself remains unchanged here.
        df.dropna()

        print('Returned Skeleton Data Frame: ')
        print(df)

        l.logr('5.df_' + var1 + '.csv', DInd, df, 'log')

        # Due to source data issue, application will perform of
        # avg of counts based on dates due to multiple entries
        g_df = df.groupby(["CountryCode", "ReportedDate"] , as_index=False)[["TotalReportedDead","TotalConfirmedCase","TotalRecovered","NewConfirmed","NewRecovered","NewDeaths","ActiveCaases"]].mean()

        g_df['TotalReportedDead_M'] = g_df.apply(lambda row: ceilTrend(row, 'TotalReportedDead'), axis=1)
        g_df['TotalConfirmedCase_M'] = g_df.apply(lambda row: ceilTrend(row, 'TotalConfirmedCase'), axis=1)
        g_df['TotalRecovered_M'] = g_df.apply(lambda row: ceilTrend(row, 'TotalRecovered'), axis=1)
        g_df['NewConfirmed_M'] = g_df.apply(lambda row: ceilTrend(row, 'NewConfirmed'), axis=1)
        g_df['NewRecovered_M'] = g_df.apply(lambda row: ceilTrend(row, 'NewRecovered'), axis=1)
        g_df['NewDeaths_M'] = g_df.apply(lambda row: ceilTrend(row, 'NewDeaths'), axis=1)
        g_df['ActiveCaases_M'] = g_df.apply(lambda row: ceilTrend(row, 'ActiveCaases'), axis=1)

        # Dropping old columns
        g_df.drop(columns=['TotalReportedDead', 'TotalConfirmedCase', 'TotalRecovered', 'NewConfirmed', 'NewRecovered', 'NewDeaths', 'ActiveCaases'], axis=1, inplace=True)

        # Renaming the new columns to old columns
        g_df.rename(columns={'TotalReportedDead_M':'TotalReportedDead'}, inplace=True)
        g_df.rename(columns={'TotalConfirmedCase_M':'TotalConfirmedCase'}, inplace=True)
        g_df.rename(columns={'TotalRecovered_M':'TotalRecovered'}, inplace=True)
        g_df.rename(columns={'NewConfirmed_M':'NewConfirmed'}, inplace=True)
        g_df.rename(columns={'NewRecovered_M':'NewRecovered'}, inplace=True)
        g_df.rename(columns={'NewDeaths_M':'NewDeaths'}, inplace=True)
        g_df.rename(columns={'ActiveCaases_M':'ActiveCaases'}, inplace=True)

        l.logr('5.g_df_' + var1 + '.csv', DInd, g_df, 'log')

        # Working with forecast
        # Create the instance of the Forecast API Class
        x2 = f.clsForecast()

        # Fetching each country name & then get the details
        cnt = 6
        cnt_x = 0
        cnt_y = 0

        df_M_Confirmed = p.DataFrame()
        df_M_Deaths = p.DataFrame()

        for i in countryList:
            try:
                cntryIndiv = i.strip()
                cntryFullName = countryDet(cntryIndiv)

                print('Country Processing: ' + str(cntryFullName))

                # Creating dataframe for each country
                dfCountry = countrySpecificDF(g_df, cntryIndiv)
                l.logr(str(cnt) + '.df_' + cntryIndiv + '_' + var1 + '.csv', DInd, dfCountry, 'log')

                # Let's pass this to our map section
                retDFGenNC = x2.forecastNewConfirmed(dfCountry, DInd, var1)

                statVal = str(NC)

                a1 = plot_picture(retDFGenNC, DInd, var1, cntryIndiv, statVal)

                # Merging with the previous Country Code data
                if cnt_x == 0:
                    df_M_Confirmed = a1
                else:
                    d_frames = [df_M_Confirmed, a1]
                    df_M_Confirmed = p.concat(d_frames)

                cnt_x += 1

                retDFGenNC_D = x2.forecastNewDead(dfCountry, DInd, var1)

                statVal = str(ND)

                a2 = plot_picture(retDFGenNC_D, DInd, var1, cntryIndiv, statVal)

                # Merging with the previous Country Code data
                if cnt_y == 0:
                    df_M_Deaths = a2
                else:
                    d_frames = [df_M_Deaths, a2]
                    df_M_Deaths = p.concat(d_frames)

                cnt_y += 1

                # Printing Proper message
                # plot_picture returns an empty dataframe on failure
                if not a1.empty and not a2.empty:
                    oprMsg = cntryFullName + ' ' + SM
                    print(oprMsg)
                else:
                    oprMsg = cntryFullName + ' ' + FM
                    print(oprMsg)

                # Resetting the dataframe value for the next iteration
                dfCountry = p.DataFrame()
                cntryIndiv = ''
                oprMsg = ''
                cntryFullName = ''
                a1 = 0
                a2 = 0
                statVal = ''

                cnt += 1
            except Exception as e:
                x = str(e)
                print(x)

        l.logr('49.df_M_Confirmed_' + var1 + '.csv', DInd, df_M_Confirmed, 'log')
        l.logr('50.df_M_Deaths_' + var1 + '.csv', DInd, df_M_Deaths, 'log')

        # Removing unwanted columns
        df_M_Confirmed.drop(columns=['trend', 'trend_lower', 'trend_upper'], axis=1, inplace=True)
        df_M_Deaths.drop(columns=['trend', 'trend_lower', 'trend_upper'], axis=1, inplace=True)

        l.logr('51.df_M_Confirmed_' + var1 + '.csv', DInd, df_M_Confirmed, 'log')
        l.logr('52.df_M_Deaths_' + var1 + '.csv', DInd, df_M_Deaths, 'log')

        # Creating original dataframe from the source API
        # (copies, as new columns are added below)
        df_M_Confirmed_Orig = g_df[['CountryCode', 'ReportedDate','NewConfirmed']].copy()
        df_M_Deaths_Orig = g_df[['CountryCode', 'ReportedDate','NewDeaths']].copy()

        # Transforming Country Code
        df_M_Confirmed_Orig['Country'] = df_M_Confirmed_Orig.apply(lambda row: lookupCountry(row), axis=1)
        df_M_Deaths_Orig['Country'] = df_M_Deaths_Orig.apply(lambda row: lookupCountry(row), axis=1)

        # Dropping unwanted column
        df_M_Confirmed_Orig.drop(columns=['CountryCode'], axis=1, inplace=True)
        df_M_Deaths_Orig.drop(columns=['CountryCode'], axis=1, inplace=True)

        # Reordering columns
        df_M_Confirmed_Orig = df_M_Confirmed_Orig.reindex(['ReportedDate','Country','NewConfirmed'], axis=1)
        df_M_Deaths_Orig = df_M_Deaths_Orig.reindex(['ReportedDate','Country','NewDeaths'], axis=1)

        l.logr('53.df_M_Confirmed_Orig_' + var1 + '.csv', DInd, df_M_Confirmed_Orig, 'log')
        l.logr('54.df_M_Deaths_Orig_' + var1 + '.csv', DInd, df_M_Deaths_Orig, 'log')

        # Filter out only the predicted data
        filterDF_1 = extractPredictedDF(df_M_Confirmed_Orig, df_M_Confirmed, 'NewConfirmed')
        l.logr('55.filterDF_1_' + var1 + '.csv', DInd, filterDF_1, 'log')

        # The death forecast is filtered against the death data
        filterDF_2 = extractPredictedDF(df_M_Deaths_Orig, df_M_Deaths, 'NewDeaths')
        l.logr('56.filterDF_2_' + var1 + '.csv', DInd, filterDF_2, 'log')

        # Calling the final publish events
        retVa = publishEvents(df_M_Confirmed_Orig, df_M_Deaths_Orig, filterDF_1, filterDF_2, var1, DInd)

        if retVa == 0:
            print('Successfully stream processed!')
        else:
            print('Failed to process stream!')

        var2 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var2))
        print('*' *60)
    except Exception as e:
        x = str(e)
        print(x)

if __name__ == "__main__":
    main()

Let us understand the enhancement part of this script –

We’ve taken out the plotly part as we will use a separate dashboard script to visualize the data trend.

However, we need to understand the initial consumed data from API & how we transform the data, which will be helpful for visualization.

The initial captured data should look like this after extracting only the relevant elements from the API response.

Initial Data from API

As you can see, based on the country & reported date, our application consumes attributes like Total-Reported-Death, Total-Recovered, New-Death, New-Confirmed & so on.

From this list, we’ve taken two attributes for our use cases & they are New-Death & New-Confirmed. Also, we’re predicting the Future-New-Death & Future-New-Confirmed based on the historical data using Facebook’s prophet API.

And, we will transpose them, extracting the countries & placing them as columns for better representation.

Transposed Data

Hence, here is the code that we should be exploring –

def toPivot(inDF, colName):
    try:
        iDF = inDF

        iDF_Piv = iDF.pivot_table(colName, ['ReportedDate'], 'Country')
        iDF_Piv.reset_index( drop=False, inplace=True )

        list1 = ['ReportedDate']

        iDF_Arr = iDF['Country'].unique()
        list2 = iDF_Arr.tolist()

        listV = list1 + list2

        iDF_Piv = iDF_Piv.reindex(listV, axis=1)

        return iDF_Piv
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

Now, using the pivot_table function, we’re transposing the row values into the columns. And, later we’ve realigned the column heading as per our desired format.
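
To make the effect of pivot_table easier to see, here is a tiny, self-contained sketch. The four sample rows below are made up purely for illustration & are not real Covid-19 figures.

```python
import pandas as pd

# Made-up daily records in the same long format as the API data
daily = pd.DataFrame({
    'ReportedDate': ['2021-09-01', '2021-09-01', '2021-09-02', '2021-09-02'],
    'Country': ['India', 'Brazil', 'India', 'Brazil'],
    'NewConfirmed': [100, 50, 120, 60],
})

# One row per date, one column per country
wide = daily.pivot_table(values='NewConfirmed', index=['ReportedDate'], columns='Country')
wide = wide.reset_index(drop=False)

# Date first, then the countries in their order of first appearance
ordered = ['ReportedDate'] + daily['Country'].unique().tolist()
wide = wide.reindex(ordered, axis=1)
```

Note that pivot_table averages duplicate date & country pairs by default, & reindex returns a new dataframe, so its result has to be assigned back.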

However, we still have the data as per individual daily dates in this case. We want to eliminate that by removing the daypart & then aggregate them by month as shown below –

Aggregated Data

And, here is the code for that –

def toAgg(inDF, var, debugInd, flg):
    try:
        iDF = inDF
        colName = "ReportedDate"

        list1 = list(iDF.columns.values)
        list1.remove(colName)

        list1 = ["Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]

        iDF['Year_Mon'] = iDF[colName].apply(lambda x:x.strftime('%Y%m'))
        iDF.drop(columns=[colName], axis=1, inplace=True)

        ColNameGrp = "Year_Mon"
        print('List1 Aggregate:: ', str(list1))
        print('ColNameGrp :: ', str(ColNameGrp))

        iDF_T = iDF[["Year_Mon", "Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]]
        iDF_T.fillna(0, inplace = True)
        print('iDF_T:: ')
        print(iDF_T)

        iDF_1_max_group = iDF_T.groupby(ColNameGrp, as_index=False)[list1].sum()
        iDF_1_max_group['Status'] = flg

        return iDF_1_max_group
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

From the above snippet, we can conclude that the application takes out the daypart & then aggregates the data based on the Year_Mon attribute.
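
The same month-level aggregation can be tried on a small sample. This is only a sketch with made-up numbers & two countries instead of seven.

```python
import pandas as pd

# Made-up pivoted data: one row per day, one column per country
wide = pd.DataFrame({
    'ReportedDate': pd.to_datetime(['2021-08-30', '2021-08-31', '2021-09-01']),
    'India': [10.0, 20.0, 5.0],
    'Brazil': [1.0, None, 3.0],
})

# Drop the daypart by keeping only year & month
wide['Year_Mon'] = wide['ReportedDate'].dt.strftime('%Y%m')

# Treat missing values as zero & sum everything per month
monthly = (wide.drop(columns=['ReportedDate'])
               .fillna(0)
               .groupby('Year_Mon', as_index=False)[['India', 'Brazil']]
               .sum())
monthly['Status'] = 'NewConfirmed'
```

Here, the two August rows collapse into a single 202108 row, while September stays as a separate 202109 row.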

The following snippet will push the final transformed data to Ably queue –

x1 = cps.clsPublishStream()

# Pushing both the Historical Confirmed Cases
retVal_1 = x1.pushEvents(iDF1_Agg, debugInd, var, NC)

if retVal_1 == 0:
    print('Successfully historical event pushed!')
else:
    print('Failed to push historical events!')

5. dashboard_realtime.py ( Main calling script to consume the data from Ably queue & then visualize the trend. )


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 08-Sep-2021 ####
#### Modified On 08-Sep-2021 ####
#### ####
#### Objective: This is the main script ####
#### to invoke dashboard after consuming ####
#### streaming real-time predicted data ####
#### using Facebook API & Ably message Q. ####
#### ####
#### This script will show the trend ####
#### comparison between major democracies ####
#### of the world. ####
#### ####
##############################################
import datetime
import dash
from dash import dcc
from dash import html
import plotly
from dash.dependencies import Input, Output
from ably import AblyRest
from clsConfig import clsConfig as cf
import pandas as p

# Main Class to consume streaming
import clsStreamConsume as ca
import numpy as np

# Create the instance of the stream consumer class
x1 = ca.clsStreamConsume()

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash.Dash(__name__, external_stylesheets=external_stylesheets)

app.layout = html.Div(
    html.Div([
        html.H1("Covid-19 Trend Dashboard",
                className='text-center text-primary mb-4'),
        html.H5(children='''
            Dash: Covid-19 Trend - (Present Vs Future)
        '''),
        html.P("Covid-19: New Confirmed Cases:",
               style={"textDecoration": "underline"}),
        dcc.Graph(id='live-update-graph-1'),
        html.P("Covid-19: New Death Cases:",
               style={"textDecoration": "underline"}),
        dcc.Graph(id='live-update-graph-2'),
        dcc.Interval(
            id='interval-component',
            interval=5*1000, # in milliseconds
            n_intervals=0
        )
    ], className="row", style={'marginBottom': 10, 'marginTop': 10})
)

def to_OptimizeString(row):
    try:
        x_str = str(row['Year_Mon'])

        dt_format = '%Y%m%d'
        finStr = x_str + '01'

        strReportDate = datetime.datetime.strptime(finStr, dt_format)

        return strReportDate
    except Exception as e:
        x = str(e)
        print(x)

        dt_format = '%Y%m%d'
        var = '20990101'

        strReportDate = datetime.datetime.strptime(var, dt_format)

        return strReportDate

def fetchEvent(var1, DInd):
    try:
        # Let's pass this to our map section
        iDF_M = x1.conStream(var1, DInd)

        # Converting Year_Mon to dates
        iDF_M['Year_Mon_Mod']= iDF_M.apply(lambda row: to_OptimizeString(row), axis=1)

        # Dropping old columns
        iDF_M.drop(columns=['Year_Mon'], axis=1, inplace=True)

        #Renaming new column to old column
        iDF_M.rename(columns={'Year_Mon_Mod':'Year_Mon'}, inplace=True)

        return iDF_M
    except Exception as e:
        x = str(e)
        print(x)

        iDF_M = p.DataFrame()

        return iDF_M

# Multiple components can update everytime interval gets fired.
@app.callback(Output('live-update-graph-1', 'figure'),
              Input('interval-component', 'n_intervals'))
def update_graph_live(n):
    try:
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*' *60)
        DInd = 'Y'

        # Let's pass this to our map section
        retDF = fetchEvent(var1, DInd)

        # Create the graph with subplots
        #fig = plotly.tools.make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.3, horizontal_spacing=0.2)
        fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.3, horizontal_spacing=0.2)

        # Routing data to dedicated DataFrame
        retDFNC = retDF.loc[(retDF['Status'] == 'NewConfirmed')]

        # Adding different chart into one dashboard
        # First Use Case - New Confirmed
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Brazil,'type':'scatter','name':'Brazil'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Canada,'type':'scatter','name':'Canada'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Germany,'type':'scatter','name':'Germany'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.India,'type':'scatter','name':'India'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Indonesia,'type':'scatter','name':'Indonesia'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedKingdom,'type':'scatter','name':'United Kingdom'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedStates,'type':'scatter','name':'United States'},1,1)

        return fig
    except Exception as e:
        x = str(e)
        print(x)

        # Create the graph with subplots
        fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2)
        fig['layout']['margin'] = {
            'l': 30, 'r': 10, 'b': 30, 't': 10
        }
        fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'}

        return fig

# Multiple components can update everytime interval gets fired.
@app.callback(Output('live-update-graph-2', 'figure'),
              Input('interval-component', 'n_intervals'))
def update_graph_live_deaths(n):
    try:
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*' *60)
        DInd = 'Y'

        # Let's pass this to our map section
        retDF = fetchEvent(var1, DInd)

        # Create the graph with subplots
        #fig = plotly.tools.make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.3, horizontal_spacing=0.2)
        fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.3, horizontal_spacing=0.2)

        # Routing data to dedicated DataFrame
        retDFND = retDF.loc[(retDF['Status'] == 'NewDeaths')]

        # Adding different chart into one dashboard
        # Second Use Case - New Deaths
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Brazil,'type':'bar','name':'Brazil'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Canada,'type':'bar','name':'Canada'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Germany,'type':'bar','name':'Germany'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.India,'type':'bar','name':'India'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Indonesia,'type':'bar','name':'Indonesia'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.UnitedKingdom,'type':'bar','name':'United Kingdom'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.UnitedStates,'type':'bar','name':'United States'},1,1)

        return fig
    except Exception as e:
        x = str(e)
        print(x)

        # Create the graph with subplots
        fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2)
        fig['layout']['margin'] = {
            'l': 30, 'r': 10, 'b': 30, 't': 10
        }
        fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'}

        return fig

if __name__ == '__main__':
    app.run_server(debug=True)

Let us explore the critical snippet as this is a brand new script –

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash.Dash(__name__, external_stylesheets=external_stylesheets)

app.layout = html.Div(
    html.Div([
        html.H1("Covid-19 Trend Dashboard",
                        className='text-center text-primary mb-4'),
        html.H5(children='''
            Dash: Covid-19 Trend - (Present Vs Future)
        '''),
        html.P("Covid-19: New Confirmed Cases:",
               style={"textDecoration": "underline"}),
        dcc.Graph(id='live-update-graph-1'),
        html.P("Covid-19: New Death Cases:",
               style={"textDecoration": "underline"}),
        dcc.Graph(id='live-update-graph-2'),
        dcc.Interval(
            id='interval-component',
            interval=5*1000, # in milliseconds
            n_intervals=0
        )
    ], className="row", style={'marginBottom': 10, 'marginTop': 10})
)

You need to understand the basics of HTML as this framework works seamlessly with it. To know more about the supported HTML, one needs to visit the following link.

def to_OptimizeString(row):
    try:
        x_str = str(row['Year_Mon'])

        dt_format = '%Y%m%d'
        finStr = x_str + '01'

        strReportDate = datetime.datetime.strptime(finStr, dt_format)

        return strReportDate

    except Exception as e:
        x = str(e)
        print(x)

        dt_format = '%Y%m%d'
        var = '20990101'

        strReportDate = datetime.datetime.strptime(var, dt_format)

        return strReportDate

The application is converting Year-Month combinations from string to date for better projection.
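
As a minimal sketch of that conversion, assuming Year_Mon arrives as a six-character YYYYMM value: the helper name year_mon_to_date and the FALLBACK_DATE constant are hypothetical, and the sentinel date simply mirrors the one used in the snippet above.

```python
import datetime

# Same sentinel date the snippet above falls back to
FALLBACK_DATE = datetime.datetime(2099, 1, 1)

def year_mon_to_date(year_mon):
    """Convert a 'YYYYMM' value into the first day of that month."""
    try:
        return datetime.datetime.strptime(str(year_mon) + '01', '%Y%m%d')
    except ValueError:
        # Malformed input falls back to the sentinel date
        return FALLBACK_DATE

print(year_mon_to_date('202004'))
print(year_mon_to_date('bad-value'))
```

Catching only ValueError here is a design choice for the sketch; the original catches every exception.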

Also, we’ve implemented a dashboard that will refresh every five seconds (the interval is set to 5*1000 milliseconds).

def fetchEvent(var1, DInd):
    try:
        # Let's pass this to our map section
        iDF_M = x1.conStream(var1, DInd)

        # Converting Year_Mon to dates
        iDF_M['Year_Mon_Mod']= iDF_M.apply(lambda row: to_OptimizeString(row), axis=1)

        # Dropping old columns
        iDF_M.drop(columns=['Year_Mon'], axis=1, inplace=True)

        #Renaming new column to old column
        iDF_M.rename(columns={'Year_Mon_Mod':'Year_Mon'}, inplace=True)

        return iDF_M

    except Exception as e:
        x = str(e)
        print(x)

        iDF_M = p.DataFrame()

        return iDF_M

The application will consume all the events from the Ably Queue using the above snippet.

@app.callback(Output('live-update-graph-1', 'figure'),
              Input('interval-component', 'n_intervals'))
def update_graph_live(n):

We’ve implemented the callback mechanism to get the latest data from the Queue & update the graph accordingly. The callback then returns the updated figure to Dash, which renders it in the target graph component.

# Routing data to dedicated DataFrame
retDFNC = retDF.loc[(retDF['Status'] == 'NewConfirmed')]

Based on the flag, we’re pushing the data into our target dataframe, from where the application will consume the data into the charts.

fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Brazil,'type':'scatter','name':'Brazil'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Canada,'type':'scatter','name':'Canada'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Germany,'type':'scatter','name':'Germany'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.India,'type':'scatter','name':'India'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Indonesia,'type':'scatter','name':'Indonesia'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedKingdom,'type':'scatter','name':'United Kingdom'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedStates,'type':'scatter','name':'United States'},1,1)

Each country’s KPI values are fetched & mapped onto the corresponding axes to plot the graph.

The same approach goes for the other graph as well.
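
Since both graphs follow the same pattern, the routing & trace-building steps can be sketched with one small helper. This is only an illustration under stated assumptions: it uses plain lists of dictionaries instead of a DataFrame, the name build_traces is hypothetical, and the sample rows are made up. The trace dictionaries have the same shape as the ones passed to fig.append_trace above.

```python
def build_traces(records, status, countries, chart_type):
    """Return one Plotly-style trace dict per country for the given status."""
    # Route only the rows that carry the requested Status flag
    rows = [r for r in records if r['Status'] == status]
    x_values = [r['Year_Mon'] for r in rows]
    return [
        {'x': x_values, 'y': [r[c] for r in rows], 'type': chart_type, 'name': c}
        for c in countries
    ]

# Made-up sample rows, for illustration only
sample = [
    {'Status': 'NewConfirmed', 'Year_Mon': '2020-03-01', 'Brazil': 10, 'India': 20},
    {'Status': 'NewDeaths', 'Year_Mon': '2020-03-01', 'Brazil': 1, 'India': 2},
    {'Status': 'NewConfirmed', 'Year_Mon': '2020-04-01', 'Brazil': 30, 'India': 40},
]

traces = build_traces(sample, 'NewConfirmed', ['Brazil', 'India'], 'scatter')
print(traces[0])
```

Calling the helper with 'NewDeaths' & 'bar' would produce the traces for the second graph, so the seven near-identical lines per graph collapse into one loop.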


Run:

Let us run the application –

Run – Beginning
Run – Finishing Stage

Dashboard:

Dashboard Job Run
Dashboard Visualization

So, we’ve done it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 😀


Note: All the data & scenarios posted here are representational, available over the internet & for educational purposes only.

One more thing you need to understand is that this prediction is based on limited data points. The actual event may happen differently. Ideally, countries take a cue from this kind of analysis & initiate appropriate measures to avoid the high curve. And that is one of the main objectives of time series analysis.

There is always room for improvement in this kind of model & the solution associated with it. I’ve shown the basic ways to achieve the same for educational purposes only.

Analyzing Language using IBM Watson using Python

Hi Guys,

Today, I’ll be discussing the following topic – “How to analyze text using IBM Watson through Python.”

IBM has made significant progress in visual image analysis & text analysis through its IBM Watson cloud platform. In this post, we’ll explore natural language analysis only.

To access the IBM API, we first need to create an IBM Cloud account from this site.

Let us quickly go through the steps to create the IBM Language Understanding service. Click Catalog at the top of the page, as shown in the picture below –

6. Creating an Instance for Watson

After that, click the AI option on the left-hand panel, marked in RED.

Click Watson Studio & then choose the plan. In our case, we’ll select the “Lite” option, as IBM provides this plan so that developers can explore its cloud for free.

7. Choosing AI
8. Choosing Plan

Clicking the create option will lead to a blank page of Watson Studio as shown below –

9. Choosing Watson Studio

Now, we need to click the Get Started button to launch it. This will lead to the Create Project page, where we can create a project using the following steps –

10. Create Project Initial Screen

Now, clicking “Create a project” will lead you to the next screen –

11. Create Project - Continue

You can either create an empty project or create one from a sample file. In this case, we’ll select the first option, which will lead us to the page below –

12. Creating a Project

Then, click the “Create” option, which will lead you to the next screen –

13. Adding to project

Now, you need to click “Add to Project.” This will give you a list of services that you can explore or use. If you want to create your own natural language classifier, you can do that as follows –

14. Adding Natural Language Components from IBM Cloud

Once you click it, you need to select the associated service –

15. Adding Associte Service - Sound

Here, you need to click the hyperlink, which leads to the next screen –

16. Choosing Associate Service - Sound

You need to check the price for both the Visual & Natural Language Classifier. They are pretty expensive. The visual classifier has a Lite plan. However, its output is limited.

Clicking “Create” will lead to the next screen –

18. Selecting Region - Sound

After successful creation, you will be redirected to the following page –

19. Landing Page - Sound

Now, we’ll be adding “Natural Language Understanding” for our test –

29. Choosing Natural Language Understanding

This will prompt the next screen –

7. Choosing AI - Natural Language Understanding

Once it is successful, you will see the service registered as shown below –

3. Watson Services - Sound

If you click the service marked in RED, it will lead you to another page, where you will get the API Key & Url. You need both of these in the Python application to access this API, as shown below –

4. Watson API Details - Sound

Now, we’re ready with the necessary cloud set-up. After this, we need to install the Python package for IBM Cloud as shown below –

1. Installing_Packages

We’ve noticed that IBM has recently launched an upgraded package. Hence, we installed that one as well. I would recommend installing this second package directly instead of the first one shown above –

2. Installing Latest IBM_Watson Package

Now, we’re done with our set-up.

Let’s see the directory structure –

31. Directory Structure

We’ll be discussing only the main calling script & class script. We’ll post the parameter script without discussing it. And, we won’t discuss clsL.py, as we’ve already covered that in our previous post.

1. clsConfig.py (This script contains all the parameter details.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 04-Apr-2020              ####
####                                      ####
#### Objective: This script is a config   ####
#### file, contains all the keys for      ####
#### IBM Cloud API.   Application will    ####
#### process these information & perform  ####
#### various analysis on IBM Watson cloud.####
##############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'APP_ID': 1,
        'SERVICE_URL': "https://api.eu-gb.natural-language-understanding.watson.cloud.ibm.com/instances/xxxxxxxxxxxxxxXXXXXXXXXXxxxxxxxxxxxxxxxx",
        'API_KEY': "Xxxxxxxxxxxxxkdkdfifd984djddkkdkdkdsSSdkdkdd",
        'API_TYPE': "application/json",
        'CACHE': "no-cache",
        'CON': "keep-alive",
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'SRC_PATH': Curr_Path + sep + 'Src_File' + sep,
        'APP_DESC_1': 'IBM Watson Language Understand!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path
    }

Note that you will be placing your API_KEY & URL here, as shown in the configuration file.
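
If you would rather not keep the key inside a script, one common alternative is to read it from environment variables. The following is a minimal sketch of that idea, not part of the original project: the function name load_watson_credentials and the variable names WATSON_API_KEY & WATSON_SERVICE_URL are hypothetical, and the values used in the usage line are dummies.

```python
import os

def load_watson_credentials(env=None):
    """Read the API key & service URL from environment variables.

    WATSON_API_KEY and WATSON_SERVICE_URL are hypothetical names
    chosen for this sketch.
    """
    env = os.environ if env is None else env
    required = ('WATSON_API_KEY', 'WATSON_SERVICE_URL')
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise KeyError('Missing environment variable(s): ' + ', '.join(missing))
    return {'API_KEY': env['WATSON_API_KEY'],
            'SERVICE_URL': env['WATSON_SERVICE_URL']}

# Usage with an explicit mapping, so the sketch runs without
# touching the real environment
creds = load_watson_credentials({
    'WATSON_API_KEY': 'dummy-key',
    'WATSON_SERVICE_URL': 'https://example.invalid/instances/dummy',
})
print(sorted(creds))
```

The returned dictionary uses the same API_KEY & SERVICE_URL names as the config above, so it could be merged into it.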

2. clsIBMWatson.py (This is the main script, which will invoke the IBM Watson API based on the input from the user & return 0 if successful.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 04-Apr-2020              ####
#### Modified On 04-Apr-2020              ####
####                                      ####
#### Objective: Main scripts to invoke    ####
#### IBM Watson Language Understand API.  ####
##############################################

import logging
from clsConfig import clsConfig as cf
import clsL as cl
import json
from ibm_watson import NaturalLanguageUnderstandingV1
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
from ibm_watson.natural_language_understanding_v1 import Features, EntitiesOptions, KeywordsOptions, SentimentOptions, CategoriesOptions, ConceptsOptions
from ibm_watson import ApiException

class clsIBMWatson:
    def __init__(self):
        self.api_key =  cf.config['API_KEY']
        self.service_url = cf.config['SERVICE_URL']

    def calculateExpressionFromUrl(self, inputUrl, inputVersion):
        try:
            api_key = self.api_key
            service_url = self.service_url
            print('-' * 60)
            print('Beginning of the IBM Watson for Input Url.')
            print('-' * 60)

            authenticator = IAMAuthenticator(api_key)

            # Authentication via service credentials provided in our config files
            service = NaturalLanguageUnderstandingV1(version=inputVersion, authenticator=authenticator)
            service.set_service_url(service_url)

            response = service.analyze(
                url=inputUrl,
                features=Features(entities=EntitiesOptions(),
                                  sentiment=SentimentOptions(),
                                  concepts=ConceptsOptions())).get_result()

            print(json.dumps(response, indent=2))

            return 0

        except ApiException as ex:
            print('-' * 60)
            print("Method failed for Url with status code " + str(ex.code) + ": " + ex.message)
            print('-' * 60)

            return 1

    def calculateExpressionFromText(self, inputText, inputVersion):
        try:
            api_key = self.api_key
            service_url = self.service_url
            print('-' * 60)
            print('Beginning of the IBM Watson for Input Text.')
            print('-' * 60)

            authenticator = IAMAuthenticator(api_key)

            # Authentication via service credentials provided in our config files
            service = NaturalLanguageUnderstandingV1(version=inputVersion, authenticator=authenticator)
            service.set_service_url(service_url)

            response = service.analyze(
                text=inputText,
                features=Features(entities=EntitiesOptions(),
                                  sentiment=SentimentOptions(),
                                  concepts=ConceptsOptions())).get_result()

            print(json.dumps(response, indent=2))

            return 0

        except ApiException as ex:
            print('-' * 60)
            print("Method failed for Text with status code " + str(ex.code) + ": " + ex.message)
            print('-' * 60)

            return 1

Some of the key lines from the above snippet –

authenticator = IAMAuthenticator(api_key)

# Authentication via service credentials provided in our config files
service = NaturalLanguageUnderstandingV1(version=inputVersion, authenticator=authenticator)
service.set_service_url(service_url)

By providing the API Key & Url, the application is initiating the service for Watson.

response = service.analyze(
    url=inputUrl,
    features=Features(entities=EntitiesOptions(),
                      sentiment=SentimentOptions(),
                      concepts=ConceptsOptions())).get_result()

Based on your type of input, it will bring the features of entities, sentiment & concepts here. Apart from that, you can additionally check the following features as well – Keywords & Categories.
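
The script above only prints the raw JSON. As a rough sketch of how you might pull a few headline values out of it, the helper below assumes the result dictionary carries sentiment, entities & concepts keys matching the requested features. The name summarize_response is hypothetical, and the sample response is made up for illustration; it is not real Watson output.

```python
def summarize_response(response):
    """Pull a few headline values out of an analyze() result dict."""
    document_sentiment = response.get('sentiment', {}).get('document', {})
    return {
        'sentiment_label': document_sentiment.get('label'),
        'sentiment_score': document_sentiment.get('score'),
        'entities': [(e.get('type'), e.get('text'))
                     for e in response.get('entities', [])],
        'concepts': [c.get('text') for c in response.get('concepts', [])],
    }

# Made-up response, shaped the way this sketch assumes
sample_response = {
    'sentiment': {'document': {'score': 0.42, 'label': 'positive'}},
    'entities': [{'type': 'Company', 'text': 'IBM', 'relevance': 0.9}],
    'concepts': [{'text': 'Cloud computing', 'relevance': 0.8}],
}

summary = summarize_response(sample_response)
print(summary)
```

Using .get() throughout means a feature that was not requested simply shows up as None or an empty list instead of raising an error.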

3. callIBMWatsonAPI.py (This is the first calling script. Based on user choice, it will receive input either as Url or as the plain text & then analyze it.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 04-Apr-2020              ####
#### Modified On 04-Apr-2020              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from clsConfig import clsConfig as cf
import clsL as cl
import logging
import datetime
import clsIBMWatson as cw

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Lookup functions from
# Azure cloud SQL DB

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def main():
    try:
        ret_1 = 0
        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'IBMWatson_NaturalLanguageAnalysis.log', level=logging.INFO)

        # Initiating Log Class
        l = cl.clsL()

        # Moving previous day log files to archive directory
        log_dir = cf.config['LOG_PATH']
        curr_ver =datetime.datetime.now().strftime("%Y-%m-%d")

        tmpR0 = "*" * 157

        logging.info(tmpR0)
        tmpR9 = 'Start Time: ' + str(var)
        logging.info(tmpR9)
        logging.info(tmpR0)

        print("Log Directory::", log_dir)
        tmpR1 = 'Log Directory::' + log_dir
        logging.info(tmpR1)

        print('Welcome to IBM Watson Language Understanding Calling Program: ')
        print('-' * 60)
        print('Please Press 1 for Understand the language from Url.')
        print('Please Press 2 for Understand the language from your input-text.')
        input_choice = int(input('Please provide your choice:'))

        # Create the instance of the IBM Watson Class
        x2 = cw.clsIBMWatson()

        # Let's pass this to our map section
        if input_choice == 1:
            textUrl = str(input('Please provide the complete input url:'))
            ret_1 = x2.calculateExpressionFromUrl(textUrl, curr_ver)
        elif input_choice == 2:
            inputText = str(input('Please provide the input text:'))
            ret_1 = x2.calculateExpressionFromText(inputText, curr_ver)
        else:
            print('Invalid options!')
            ret_1 = 1

        if ret_1 == 0:
            print('Successful IBM Watson Language Understanding Generated!')
        else:
            print('Failed to generate IBM Watson Language Understanding!')

        print("-" * 60)
        print()

        print('Finding Analysis points..')
        print("*" * 157)
        logging.info('Finding Analysis points..')
        logging.info(tmpR0)


        tmpR10 = 'End Time: ' + datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        logging.info(tmpR10)
        logging.info(tmpR0)

    except ValueError as e:
        print(str(e))
        print("Invalid option!")
        logging.info("Invalid option!")

    except Exception as e:
        print("Top level Error: args:{0}, message:{1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()

This script is pretty straightforward: it first creates an instance of the main class & then, based on the user input, calls the respective method.
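
The choice-handling part can also be written as a small lookup table. This is only a sketch of that idea: run_choice & the stand-in handlers are hypothetical, and in the real script each handler would wrap one of the clsIBMWatson methods.

```python
def run_choice(choice_text, handlers):
    """Validate the menu choice & call the matching handler.

    Returns the handler's return code, or 1 when the choice is invalid.
    """
    try:
        choice = int(choice_text)
    except ValueError:
        return 1
    handler = handlers.get(choice)
    if handler is None:
        return 1
    return handler()

# Stand-in handlers that always succeed
handlers = {
    1: lambda: 0,  # e.g. analyze a URL
    2: lambda: 0,  # e.g. analyze plain text
}

print(run_choice('1', handlers))
print(run_choice('9', handlers))
print(run_choice('abc', handlers))
```

With this shape, an unknown option & a non-numeric input both come back as a failure code, so the success message is never printed for them.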

As of now, IBM Watson can work on a list of languages, which are available here.

If you want to start from scratch, please refer to the following link.

Please find the screenshot of our application run –

Case 1 (With Url): 

21. Win_Run_1_Url
23. Win_Run_3_Url

Case 2 (With Plain text):

25. Win_Run_1_InputText
26. Win_Run_2_InputText
27. Win_Run_3_InputText

Now, don’t forget to delete all the services from your IBM Cloud.

32. Delete Service

As you can see, you need to delete the services one by one from the service list, as shown in the figure.

So, we’ve done it.

To explore my photography, you can visit the following link.

I’ll be posting another new post in the coming days. Till then, Happy Avenging! 😀

Note: All the data posted here are representational, available over the internet & for educational purposes only.

Creating a Cross-platform GUI based application using native Python using PyQt5

Hi Guys!

Today, we’ll be discussing one more graphical package in Python, known as PyQt. To design the GUI faster, we’ll be exploring another tool called Qt Designer, which is available for multiple OS platforms.

Please find Qt Designer here.

This is similar to any other GUI based IDE like Microsoft Visual Studio, where you can quickly generate your GUI template.

The majority of internet posts talk about using the PyQt5 or PyQt4 packages. But, when it comes to using the .ui file inside your Python code, they either demonstrate fundamental options without any events, or they convert the .ui file into a .py file & then use that. This is not very useful for developers who are trying it for the first time. Hence, my main goal is to use the .ui file inside my Python script as it is, use all the components out of it & assign various working events.

In this post, we’ll discuss only one script & then we’ll showcase the output in the form of a video (no audio). You can verify the output for both MAC & Windows.

Before we start, let us check the directory structure between Windows & MAC –

2. MAC & Win Directory Structure

Let us explore how the GUI should look –

3. GUI Design

So, as you can see, this tool is like any other GUI-based tool: you can create almost anything by simple drag & drop.

Before we start discussing our code, here is the sample basicAdv.ui file for your reference.

You need to install the following framework –

pip install PyQt5

1. GUIPyQt5.py (This script contains all the GUI details & it will invoke the instance along with the logic.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 12-Mar-2020              ####
#### Modified On 12-Mar-2020              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from PyQt5 import QtWidgets, uic, QtGui, QtCore
from PyQt5.QtWidgets import *
import sys

class Ui(QtWidgets.QMainWindow):
    def __init__(self):
        # Instantiating the main class
        super(Ui, self).__init__()

        # Loading the Graphical Design without
        # converting it to any kind of Python code
        uic.loadUi('basicAdv.ui', self)

        # Adding all the essential buttons
        self.prtBtn = self.findChild(QtWidgets.QPushButton, 'prtBtn') # Find the button
        self.prtBtn.clicked.connect(self.printButtonClick) # Remember to pass the definition/method, not the return value!

        self.clrBtn = self.findChild(QtWidgets.QPushButton, 'clrBtn')  # Find the button
        self.clrBtn.clicked.connect(self.clearButtonClick)  # Remember to pass the definition/method, not the return value!

        self.addBtn = self.findChild(QtWidgets.QPushButton, 'addBtn')  # Find the button
        self.addBtn.clicked.connect(self.addItem)  # Remember to pass the definition/method, not the return value!

        self.selectImgBtn = self.findChild(QtWidgets.QPushButton, 'selectImgBtn')  # Find the button
        self.selectImgBtn.clicked.connect(self.setImage)  # Remember to pass the definition/method, not the return value!

        self.cnfBtn = self.findChild(QtWidgets.QPushButton, 'cnfBtn')  # Find the button
        self.cnfBtn.clicked.connect(self.showDialog)  # Remember to pass the definition/method, not the return value!

        # Adding other static input/output elements
        self.input = self.findChild(QtWidgets.QLineEdit, 'input')
        self.qlabel = self.findChild(QtWidgets.QLabel, 'qlabel')
        self.lineEdit = self.findChild(QtWidgets.QLineEdit, 'lineEdit')
        self.listWidget = self.findChild(QtWidgets.QListWidget, 'listWidget')
        self.imageLbl = self.findChild(QtWidgets.QLabel, 'imageLbl')

        # Adding Combobox
        self.combo = self.findChild(QtWidgets.QComboBox, 'sComboBox')  # Find the ComboBox

        # Adding static element to it
        self.combo.addItem("Sourav Ganguly")
        self.combo.addItem("Kapil Dev")
        self.combo.addItem("Sunil Gavaskar")
        self.combo.addItem("M. S. Dhoni")

        # Click Event
        self.combo.activated[str].connect(self.onChanged)  # Remember to pass the definition/method, not the return value!

        # Adding list Box
        self.listwidget2 = self.findChild(QtWidgets.QListWidget, 'listwidget2')  # Find the List

        # Adding static element to it
        self.listwidget2.insertItem(0, "Aamir Khan")
        self.listwidget2.insertItem(1, "Shahruk Khan")
        self.listwidget2.insertItem(2, "Salman Khan")
        self.listwidget2.insertItem(3, "Hrittik Roshon")
        self.listwidget2.insertItem(4, "Amitabh Bachhan")

        # Click Event
        self.listwidget2.clicked.connect(self.showIndividualElement)

        # Adding Group Box
        self.groupBox = self.findChild(QtWidgets.QGroupBox, 'groupBox')  # Find the GroupBox
        self.groupBox.setCheckable(True)

        # Adding Individual Radio Button
        self.rdButton1 = self.findChild(QtWidgets.QRadioButton, 'rdButton1')  # Find the button
        self.rdButton1.setChecked(True)
        self.rdButton1.toggled.connect(lambda: self.printRadioButtonClick(self.rdButton1))  # Remember to pass the definition/method, not the return value!

        self.rdButton2 = self.findChild(QtWidgets.QRadioButton, 'rdButton2')  # Find the button
        self.rdButton2.toggled.connect(lambda: self.printRadioButtonClick(self.rdButton2))  # Remember to pass the definition/method, not the return value!

        self.rdButton3 = self.findChild(QtWidgets.QRadioButton, 'rdButton3')  # Find the button
        self.rdButton3.toggled.connect(lambda: self.printRadioButtonClick(self.rdButton3))  # Remember to pass the definition/method, not the return value!

        self.rdButton4 = self.findChild(QtWidgets.QRadioButton, 'rdButton4')  # Find the button
        self.rdButton4.toggled.connect(lambda: self.printRadioButtonClick(self.rdButton4))  # Remember to pass the definition/method, not the return value!

        self.show()

    def printRadioButtonClick(self, radioOption):

        if radioOption.text() == 'China':
            if radioOption.isChecked() == True:
                print(radioOption.text() + ' is selected')
            else:
                print(radioOption.text() + ' is deselected')

        if radioOption.text() == 'India':
            if radioOption.isChecked() == True:
                print(radioOption.text() + ' is selected')
            else:
                print(radioOption.text() + ' is deselected')

        if radioOption.text() == 'Japan':
            if radioOption.isChecked() == True:
                print(radioOption.text() + ' is selected')
            else:
                print(radioOption.text() + ' is deselected')

        if radioOption.text() == 'France':
            if radioOption.isChecked() == True:
                print(radioOption.text() + ' is selected')
            else:
                print(radioOption.text() + ' is deselected')

    def printButtonClick(self):
        # This is executed when the button is pressed
        print('Input text:' + self.input.text())

    def clearButtonClick(self):
        # This is executed when the button is pressed
        self.input.clear()

    def onChanged(self, text):
        self.qlabel.setText(text)
        self.qlabel.adjustSize()
        self.lineEdit.clear()  # Clear the text

    def addItem(self):
        value = self.lineEdit.text() # Get the value of the lineEdit
        self.lineEdit.clear() # Clear the text
        self.listWidget.addItem(value) # Add the value we got to the list

    def setImage(self):
        fileName, _ = QtWidgets.QFileDialog.getOpenFileName(None, "Select Image", "", "Image Files (*.png *.jpg *.jpeg *.bmp);;All Files (*)") # Ask for file
        if fileName: # If the user gives a file
            pixmap = QtGui.QPixmap(fileName) # Setup pixmap with the provided image
            pixmap = pixmap.scaled(self.imageLbl.width(), self.imageLbl.height(), QtCore.Qt.KeepAspectRatio) # Scale pixmap
            self.imageLbl.setPixmap(pixmap) # Set the pixmap onto the label
            self.imageLbl.setAlignment(QtCore.Qt.AlignCenter) # Align the label to center

    def showDialog(self):
        msgBox = QMessageBox()
        msgBox.setIcon(QMessageBox.Information)
        msgBox.setText("Message box pop up window")
        msgBox.setWindowTitle("MessageBox Example")
        msgBox.setStandardButtons(QMessageBox.Ok | QMessageBox.Cancel)
        msgBox.buttonClicked.connect(self.msgButtonClick)

        returnValue = msgBox.exec()
        if returnValue == QMessageBox.Ok:
            print('OK clicked')

    def msgButtonClick(self, i):
        print("Button clicked is:", i.text())

    def showIndividualElement(self, qmodelindex):
        item = self.listwidget2.currentItem()
        print(item.text())

if __name__ == "__main__":

    import sys
    app = QtWidgets.QApplication(sys.argv)
    window = Ui()
    window.show()
    sys.exit(app.exec_())

Let us explore a few key lines from this script. The rest are almost identical.

# Loading the Graphical Design without
# converting it to any kind of Python code
uic.loadUi('basicAdv.ui', self)

Loading the GUI created using Qt Designer into the Python environment.

# Adding all the essential buttons
self.prtBtn = self.findChild(QtWidgets.QPushButton, 'prtBtn') # Find the button
self.prtBtn.clicked.connect(self.printButtonClick) # Remember to pass the definition/method, not the return value!

In this case, we’re dynamically binding the component from the GUI by using the findChild method & then, on the next line, connecting its clicked signal to the appropriate handler. In this case, it is – self.printButtonClick.

The printButtonClick, as mentioned earlier, is a method that contains the following snippet –

def printButtonClick(self):
    # This is executed when the button is pressed
    print('Input text:' + self.input.text())

As you can see, this event will capture the text from the input textbox & print it on our terminal.
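
The comment “pass the definition/method, not the return value” is worth a quick illustration. The toy class below is not how PyQt5 implements signals; MiniSignal is a hypothetical stand-in that only shows why connect() needs the callable itself, so that it can be invoked later when the event fires.

```python
class MiniSignal:
    """Toy stand-in for a Qt signal; not PyQt5's implementation."""

    def __init__(self):
        self._slots = []

    def connect(self, slot):
        # Store the callable itself so it can be invoked later
        self._slots.append(slot)

    def emit(self, *args):
        # Call every connected slot with the emitted arguments
        for slot in self._slots:
            slot(*args)

captured = []

def print_button_click():
    captured.append('Input text: hello')

clicked = MiniSignal()
clicked.connect(print_button_click)  # pass the function, no parentheses
clicked.emit()                       # simulates the user clicking the button
print(captured)
```

Writing clicked.connect(print_button_click()) instead would run the function immediately & hand its return value (None) to connect, which is the mistake the comment warns about.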

Here is the snippet for the widgets that serve only as input/output & generally don’t have an event of their own. But, we still need to bind them with our Python application.

# Adding other static input/output elements
self.input = self.findChild(QtWidgets.QLineEdit, 'input')
self.qlabel = self.findChild(QtWidgets.QLabel, 'qlabel')
self.lineEdit = self.findChild(QtWidgets.QLineEdit, 'lineEdit')
self.listWidget = self.findChild(QtWidgets.QListWidget, 'listWidget')

This application has a list box & hence, we’ve added some static values while loading the application, as can be seen here –

# Adding list Box
self.listwidget2 = self.findChild(QtWidgets.QListWidget, 'listwidget2')  # Find the List

# Adding static element to it
self.listwidget2.insertItem(0, "Aamir Khan")
self.listwidget2.insertItem(1, "Shahruk Khan")
self.listwidget2.insertItem(2, "Salman Khan")
self.listwidget2.insertItem(3, "Hrittik Roshon")
self.listwidget2.insertItem(4, "Amitabh Bachhan")

Once the user selects a specific value from this list, the app will execute the following event as shown below –

# Click Event
self.listwidget2.clicked.connect(self.showIndividualElement)

Again, to explore the method, you need to view the given logic –

def showIndividualElement(self, qmodelindex):
    item = self.listwidget2.currentItem()
    print(item.text())

Group Box, along with the radio buttons, works slightly differently from the list above.

For each radio button, we’ll have a dedicated text value that represents a different country in this context.

And, our application will bind all the radio buttons, which then use one standard method for all four options, as shown below –

# Adding Individual Radio Button
self.rdButton1 = self.findChild(QtWidgets.QRadioButton, 'rdButton1')  # Find the button
self.rdButton1.setChecked(True)
self.rdButton1.toggled.connect(lambda: self.printRadioButtonClick(self.rdButton1))  # Remember to pass the definition/method, not the return value!

self.rdButton2 = self.findChild(QtWidgets.QRadioButton, 'rdButton2')  # Find the button
self.rdButton2.toggled.connect(lambda: self.printRadioButtonClick(self.rdButton2))  # Remember to pass the definition/method, not the return value!

self.rdButton3 = self.findChild(QtWidgets.QRadioButton, 'rdButton3')  # Find the button
self.rdButton3.toggled.connect(lambda: self.printRadioButtonClick(self.rdButton3))  # Remember to pass the definition/method, not the return value!

self.rdButton4 = self.findChild(QtWidgets.QRadioButton, 'rdButton4')  # Find the button
self.rdButton4.toggled.connect(lambda: self.printRadioButtonClick(self.rdButton4))  # Remember to pass the definition/method, not the return value!

Also, note that, by default, rdButton1 is set to True, i.e., it will be selected when the form loads initially.

Let’s explore the printRadioButtonClick event.

def printRadioButtonClick(self, radioOption):

    if radioOption.text() == 'China':
        if radioOption.isChecked() == True:
            print(radioOption.text() + ' is selected')
        else:
            print(radioOption.text() + ' is deselected')

    if radioOption.text() == 'India':
        if radioOption.isChecked() == True:
            print(radioOption.text() + ' is selected')
        else:
            print(radioOption.text() + ' is deselected')

    if radioOption.text() == 'Japan':
        if radioOption.isChecked() == True:
            print(radioOption.text() + ' is selected')
        else:
            print(radioOption.text() + ' is deselected')

    if radioOption.text() == 'France':
        if radioOption.isChecked() == True:
            print(radioOption.text() + ' is selected')
        else:
            print(radioOption.text() + ' is deselected')

This handler receives the radio button whose state has changed & reads its text. It then matches that text against the conditions above & prints whether the option was selected or deselected.
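
Since all four branches print the same message, the handler can be reduced to a single check. Here is a minimal sketch of that idea, assuming only that the widget exposes text() and isChecked() the way QRadioButton does. FakeRadio is a hypothetical stand-in so that the logic can run without Qt, and unlike the original handler, this version responds to any label rather than only the four countries.

```python
class FakeRadio:
    """Hypothetical stand-in for QRadioButton (only text() and isChecked())."""

    def __init__(self, text, checked):
        self._text = text
        self._checked = checked

    def text(self):
        return self._text

    def isChecked(self):
        return self._checked


def describe_toggle(radio_option):
    """Return the message the handler would print for a radio button."""
    state = 'selected' if radio_option.isChecked() else 'deselected'
    return radio_option.text() + ' is ' + state


print(describe_toggle(FakeRadio('India', True)))
print(describe_toggle(FakeRadio('China', False)))
```

Inside the real form class, the same one-line state check could replace the four repeated if blocks.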

Finally, the Image process is slightly different.

Initially, our application will load the component from the .ui file & bind them with the Python environment –

self.imageLbl = self.findChild(QtWidgets.QLabel, 'imageLbl')

The image is loaded only when the user clicks the button, which triggers the following set of actions –

self.selectImgBtn = self.findChild(QtWidgets.QPushButton, 'selectImgBtn')  # Find the button
self.selectImgBtn.clicked.connect(self.setImage)  # Remember to pass the definition/method, not the return value!

Let’s explore the setImage method –

def setImage(self):
    fileName, _ = QtWidgets.QFileDialog.getOpenFileName(None, "Select Image", "", "Image Files (*.png *.jpg *.jpeg *.bmp);;All Files (*)") # Ask for file
    if fileName: # If the user gives a file
        pixmap = QtGui.QPixmap(fileName) # Setup pixmap with the provided image
        pixmap = pixmap.scaled(self.imageLbl.width(), self.imageLbl.height(), QtCore.Qt.KeepAspectRatio) # Scale pixmap
        self.imageLbl.setPixmap(pixmap) # Set the pixmap onto the label
        self.imageLbl.setAlignment(QtCore.Qt.AlignCenter) # Align the label to center

This will open the native file-selection dialog of the respective O/S, so that the user can choose an image.
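
The filter string passed to getOpenFileName follows a fixed pattern: a label, the wildcard patterns in parentheses, and ";;" between alternative filters. Building it from a list of extensions makes it harder to mistype a pattern. This is only a sketch; build_filter and join_filters are hypothetical helper names, not part of PyQt.

```python
def build_filter(label, extensions):
    """Build one Qt name filter, e.g. 'Image Files (*.png *.jpg)'."""
    patterns = ' '.join('*.' + ext for ext in extensions)
    return '{0} ({1})'.format(label, patterns)


def join_filters(filters):
    """Qt separates alternative filters with two semicolons."""
    return ';;'.join(filters)


image_filter = build_filter('Image Files', ['png', 'jpg', 'jpeg', 'bmp'])
print(join_filters([image_filter, 'All Files (*)']))
```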

Last but not least is the use of MsgBox, which can be extremely useful in GUI-based programming.

This msgbox doesn’t exist in the form. Instead, we create it on the click event of the “Confirm Button” as shown below –

self.cnfBtn = self.findChild(QtWidgets.QPushButton, 'cnfBtn')  # Find the button
self.cnfBtn.clicked.connect(self.showDialog)  # Remember to pass the definition/method, not the return value!

This will prompt the showDialog method to trigger –

def showDialog(self):
    msgBox = QMessageBox()
    msgBox.setIcon(QMessageBox.Information)
    msgBox.setText("Message box pop up window")
    msgBox.setWindowTitle("MessageBox Example")
    msgBox.setStandardButtons(QMessageBox.Ok | QMessageBox.Cancel)
    msgBox.buttonClicked.connect(self.msgButtonClick)

    returnValue = msgBox.exec()
    if returnValue == QMessageBox.Ok:
        print('OK clicked')

And, based on the option you choose (“OK”/“Cancel”), it will print the final captured message in your console.

Let’s explore the videos of output from Windows O/S –

Let’s explore the video output from MAC VM –

For more information on this package – please check the following link.

So, as you can see, we’ve finally achieved it. We’ve demonstrated a cross-platform GUI application using native Python, and we didn’t even need to convert the .ui design file into a Python script.

Please share your feedback.

I’ll be posting another new post in the coming days. Till then, Happy Avenging! 😀

Note: All the data posted here are representational data & available over the internet & for educational purpose only.

Creating a native mobile application using beeware framework using native Python

Hi Guys,

Today, I’ll explain a relatively new GUI framework with which you can create native mobile applications across all the different platforms. Note that this framework is still in the preview phase on many fronts. And you also can contribute here in many ways.

Let’s jump into making a chat application using this. It is relatively easy to build.

You need to install the following framework –

pip install beeware==0.3.0.dev3

As I mentioned, this package is still in the preview stage, so you will need to wait a little longer before it is ready for production use.

However, the following diagram presented at PyCon explains it all –

8. All Umbrella

Let’s jump into our objective.

To create the project, you need to run the following command –

briefcase new

This will prompt you for several options that you need to fill in, as shown in the next couple of slides –

1. Project_Init_Win
2. Project_Init_Win

Note that I created this one in the Windows environment. You need to provide all these details before the app is created.

This will create a template of code as shown below –

"""
Chat with my friends
"""
import toga
from toga.style import Pack
from toga.style.pack import COLUMN, ROW


class SDApp(toga.App):

    def startup(self):
        """
        Construct and show the Toga application.

        Usually, you would add your application to a main content box.
        We then create a main window (with a name matching the app), and
        show the main window.
        """
        main_box = toga.Box()

        self.main_window = toga.MainWindow(title=self.formal_name)
        self.main_window.content = main_box
        self.main_window.show()


def main():
    return SDApp()

And, you can run this template to see the default template output by using the following command –

briefcase dev

3. Run_Basic_In_Windows

Now, I want to take this & add some lines of code to create a chat-based application on MAC & see how it behaves.

But, before that, the directory structure will look like this –

9. Dir_Structure

As you can see, SDChat is my application name, and the directories shown above are created based on that name.

And, inside the final SDChat directory, the following files are created –

__init__.py

__main__.py

app.py

Let’s take this & modify it for MAC. To do that, we need to change the app.py script & lay out all the essential GUI ingredients.

Note that I’m not going to discuss the custom bot that I created; I’ll only be referring to it.

1. app.py (This script contains all the GUI details & it will invoke the custom bot.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 17-Feb-2020              ####
#### Modified On 17-Feb-2020              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################
"""
This is a Chat Application with custom made bot
"""
import toga
from toga.style import Pack
from toga.style.pack import COLUMN, ROW

# My Custom Built Bot
from SDChatbots import clsTalk2Bot as e

#-- New
class SDChat(toga.App):

    def startup(self):
        """
        Construct and show the Toga application.

        Usually, you would add your application to a main content box.
        We then create a main window (with a name matching the app), and
        show the main window.
        """

        self.chat = toga.DetailedList(data=[], style=Pack(flex=1))

        self.chat.data.append(icon = toga.Icon('resources/brutus.png'), title='SDChat', subtitle='Hi! How are you doing today?')

        self.text_input = toga.TextInput(style=Pack(flex=1))
        #send_button = toga.Button('Send')

        send_button = toga.Button(
            'Send',
            on_press=self.greet,
            style=Pack(padding_right=5)
        )

        input_box = toga.Box(
            children=[self.text_input, send_button],
            style=Pack(direction=ROW, alignment='center', padding=5)
        )

        main_box = toga.Box(children=[self.chat, input_box], style=Pack(direction=COLUMN))
        # main_box.add(send_button)

        self.main_window = toga.MainWindow(title=self.formal_name)

        self.main_window.content = main_box
        self.main_window.show()

    def greet(self, widget):
        print('Value: ', self.text_input.value)
        input_text = self.text_input.value

        self.chat.data.append(icon=toga.Icon('resources/user.png'), title='You', subtitle = input_text)

        # Chatbot
        y = e.clsTalk2Bot()
        ret_val = y.TalkNow(input_text)

        self.chat.data.append(icon=toga.Icon('resources/brutus.png'), title='SDChat', subtitle=ret_val)

        self.text_input.value = ''
        self.chat.scroll_to_bottom()


def main():
    return SDChat()

Let’s discuss a couple of essential lines from the above snippet –

self.chat = toga.DetailedList(data=[], style=Pack(flex=1))

self.chat.data.append(icon = toga.Icon('resources/brutus.png'), title='SDChat', subtitle='Hi! How are you doing today?')

This is the main display box, where you can see all the chat details. Also, by default, it shows the initial conversation starter.

self.text_input = toga.TextInput(style=Pack(flex=1))

This is the place where you will be typing your text.

send_button = toga.Button(
    'Send',
    on_press=self.greet,
    style=Pack(padding_right=5)
)

Here is the button that you are creating. As you can see, in almost all places I’ve provided the “style”, which is key to aligning objects inside your mobile app.

main_box = toga.Box(children=[self.chat, input_box], style=Pack(direction=COLUMN))

self.main_window = toga.MainWindow(title=self.formal_name)

self.main_window.content = main_box
self.main_window.show()

Finally, you need to bind all the components together & show the page.

Now, let’s discuss the function that I’ve created for my button –

def greet(self, widget):
    input_text = self.text_input.value

    self.chat.data.append(icon=toga.Icon('resources/user.png'), title='You', subtitle = input_text)

    # Chatbot
    y = e.clsTalk2Bot()
    ret_val = y.TalkNow(input_text)

    self.chat.data.append(icon=toga.Icon('resources/brutus.png'), title='SDChat', subtitle=ret_val)

    self.text_input.value = ''
    self.chat.scroll_to_bottom()

As you can see, our application captures the user input here & passes the input text to our chatbot. Once the response has been received & added to the chat, the input box is cleared & the view scrolls down to the end of the chat screen. This is required; otherwise, the user won’t be able to view the latest communication.
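
To make the flow of the handler easier to follow, here is a small sketch of the same logic without the Toga widgets. EchoBot is a hypothetical stand-in for the custom bot (only its TalkNow method is assumed), and the chat is modeled as a plain list of rows. The check for empty input is an addition of this sketch and is not present in the original handler.

```python
class EchoBot:
    """Hypothetical stand-in for the custom bot; only TalkNow() is assumed."""

    def TalkNow(self, text):
        return 'You said: ' + text


def handle_message(chat_rows, input_text, bot):
    """Append the user's message and the bot's reply to the chat rows."""
    text = input_text.strip()
    if not text:
        # Ignore empty submissions instead of sending them to the bot.
        return chat_rows
    chat_rows.append({'title': 'You', 'subtitle': text})
    chat_rows.append({'title': 'SDChat', 'subtitle': bot.TalkNow(text)})
    return chat_rows


rows = handle_message([], 'Hello', EchoBot())
for row in rows:
    print(row['title'] + ': ' + row['subtitle'])
```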

10. Run_MAC

So, as you can see, this is extremely easy to create & you can enhance it as per your need.

For more information on this framework, please go through the following link ->

https://pypi.org/project/beeware/0.3.0.dev3/

https://docs.beeware.org/en/latest/

https://beeware.org/project/about/

For more details, view the main page ->

https://beeware.org/

I’ll be posting another new post in the coming days. Till then, Happy Avenging! 😀

Note: All the data posted here are representational data & available over the internet & for educational purpose only.

Sending SMS using 3rd party API by integrating with custom-built BOT in Python

Hi Guys!

Today, we’re going to discuss how to send SMS through a popular 3rd-party API (Twilio) using Python 3.7.

Before that, you need to register with Twilio. By default, they will give you some credit in order to explore their API.

Then you can get a virtual number from them, which will be used to exchange SMS with your verified numbers under the trial account.

1. Booking Phone Number

The basic architecture is depicted as follows –

14. FeatureImage

How to get a verified number for your trial account?

Here is how you can do that –

10. VerifiedNumbers

You can create your own trial account by using this link.

Apart from that, you need to download & install Ngrok. It is available for multiple platforms; in our case, we’re using Windows.

Its purpose is to expose your local web service through a publicly reachable URL. I’ll explain that later.

You need to register & install that on your computer –

2. Ngrok

Once you download & install it, you can get the global link for any running local server application like this –

3. GetURL

This is the dummy link. I’ll hide the original link. However, every time when you restart the application, you’ll get a new link. So, you will be safe anyway. 🙂

4. UpdateLink

Once you get the link, you have to update that global link under the messaging section. Remember that you have to keep the “/sms” part at the end.

Let’s see our sample code. Here, I’ll be integrating my custom BOT developed in Python. However, I’ll only be calling that library; we’re not going to post or explain its scripts here.

1. serverSms.py (This is the server script, which uses the Flask framework & responds to the user’s text message through my custom BOT developed in Python.)

#!/usr/bin/env python
##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 03-Nov-2019              ####
####                                      ####
#### Objective: This script will respond  ####
#### by BOT created by me. And, reply to  ####
#### sender about their queries.          ####
#### We're using Twilio API for this.     ####
####                                      ####
##############################################

from flask import Flask, request, redirect
from twilio import twiml
from twilio.twiml.messaging_response import Message, MessagingResponse
import logging
from SDChatbots.clsTalk2Bot import clsTalk2Bot

app = Flask(__name__)

@app.route("/sms", methods=['GET', 'POST'])
def sms_ahoy_reply():
    """Respond to incoming messages with a friendly SMS."""
    # Start our response
    # resp = twiml.Response()
    message_body = request.form['Body']

    print(message_body)
    logging.info(message_body)

    y = clsTalk2Bot()
    ret_val = y.TalkNow(message_body)
    zMsg = str(ret_val)
    print('Response: ', str(zMsg))

    resp = MessagingResponse()

    # Add a message
    resp.message(zMsg)

    return str(resp)

if __name__ == "__main__":
    app.run(debug=True)

Key lines from the above scripts are –

@app.route("/sms", methods=['GET', 'POST'])

The route tells your application which function to trigger when a request arrives at a given URL path of your API. Here, any GET or POST request to “/sms” invokes sms_ahoy_reply.

message_body = request.form['Body']

Here, the application captures the body of the incoming SMS & prints it in your server log. We’ll see that when we run our application.
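
The incoming message reaches the server as form-encoded data, which Flask exposes through request.form. The following sketch decodes such a body using only the standard library, to show where the 'Body' value comes from. The sample body and the phone number are made up for illustration.

```python
from urllib.parse import parse_qs

# Made-up example of a form-encoded webhook body.
raw_body = 'From=%2B15550100&Body=Hello+bot%21'

fields = parse_qs(raw_body)
message_body = fields['Body'][0]   # same value as request.form['Body']
sender = fields['From'][0]

print(sender, '->', message_body)
```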

y = clsTalk2Bot()
ret_val = y.TalkNow(message_body)
zMsg = str(ret_val)

Now, the application calls my custom Python BOT, retrieves the response & converts it to a string before pushing the response SMS back to the user who originally sent the SMS.

resp = MessagingResponse()  # This is for Python 3.7 +

# Add a message
resp.message(zMsg)

return str(resp)

Finally, you prepare the return SMS & send it back to the user.
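
What the server returns is a small XML document (TwiML) that wraps the reply text. As a rough illustration of its shape, here is a sketch built with the standard library instead of the Twilio helper. build_reply is a hypothetical name, and the real library's output may differ in details such as the XML declaration.

```python
import xml.etree.ElementTree as ET


def build_reply(text):
    """Return a TwiML-style XML document containing one message."""
    response = ET.Element('Response')
    message = ET.SubElement(response, 'Message')
    message.text = text
    return ET.tostring(response, encoding='unicode')


print(build_reply('Hi there'))
```

Note that special characters such as & are escaped automatically, which matters because the BOT's reply is arbitrary text.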

For the old version, the following line might work –

resp = twiml.Response()

But, please verify this against the Twilio API documentation.

Let’s run our server application. You will see the following screen –

11. ServerResponse

Let’s see how the application responds when someone asks a question –

7.1. BotIntegratedSMS

And, let’s explore how our server application is receiving it & the response from the server –

6. ServerResponse

Note that, we’ll be only sending the text to SMS, not the statistics sent by my BOT marked in RED.  😀

Let’s check the response from the BOT –

7.2. BotIntegratedSMS

Yes! We did it. 😀

But, make sure you are regularly checking your billing as this will cost you money. Always, check the current balance –

9. BillingInfo

You can check the usage from the following tab –

12. Usage

You can create a billing alarm to monitor your usage –

13. BillingAlert

Let me know how you like it.

So, we’ll come out with another exciting post in the coming days!

N.B.: This is demonstrated for RnD/study purposes. All the data posted here are representational data & available over the internet.

Converting text to voice in Python

Hi Guys!

Today, we’ll be discussing how to convert text into voice using third-party APIs. This is particularly useful in cases where you want more realistic communication.

There are many such providers, where you can get an almost realistic voice for both males & females. However, most of them are subscription-based. So, you have to be very careful about your budget & how to proceed.

For testing purposes, I’ll be using the Voice RSS text-to-speech API (accessed through RapidAPI, as the configuration below shows) to simulate this.

Let’s look at the architecture of this process –

FlowS

As you can see, the user initiates the application & provides some input in the form of plain text. Once the data is given, the app sends it to the third-party API for processing. The third-party API verifies the authentication & then checks all the associated parameters before it starts generating the audio response. After that, it sends the payload, which is received by the calling Python application. Here, the payload is decoded into an audio file & finally played on the invoking computer.

This third-party API has lots of limitations. However, they are giving you the platform to test your concept.

As of now, they support the following languages – English, Chinese, Catalan, French, Finnish, Dutch, Danish, German, Italian, Japanese, Korean, Polish, Norwegian, Portuguese, Russian, Spanish & Swedish.

In our case, we’ll be checking with English.

To work with this, you need the following modules in Python. Note that base64 is part of the standard library, so only playsound & requests need to be installed –

  • playsound
  • requests
  • base64

Let’s see the directory structure –

1. Directory

Again, we are not going to discuss any script, which we’ve already discussed here.

Hence, we’re skipping clsL.py here.

1. clsConfig.py (This script contains all the parameters of the server.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 12-Oct-2019              ####
####                                      ####
#### Objective: This script is a config   ####
#### file, contains all the keys for      ####
#### the text-to-voice API. Application   ####
#### will process this information to     ####
#### call the third-party API.            ####
##############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'APP_ID': 1,
        'url': "https://voicerss-text-to-speech.p.rapidapi.com/",
        'host': "voicerss-text-to-speech.p.rapidapi.com",
        'api_key': "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
        'targetFile': "Bot_decode.mp3",
        'pitch_speed': "-6",
        'bot_language': "en-us",
        'audio_type': "mp3",
        'audio_freq': "22khz_8bit_stereo",
        'query_string_api': "hhhhhhhhhhhhhhhhhhhhhhhhhhhh",
        'b64_encoding': True,
        'APP_DESC_1': 'Text to voice conversion.',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'LOG_PATH': Curr_Path + sep + 'log' + sep
    }

For security reasons, sensitive information is masked with dummy values.

'api_key': "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",

'query_string_api': "hhhhhhhhhhhhhhhhhhhhhhhhhhhh",

These two pieces of information are private to each subscriber. Hence, I’ve removed them & replaced them with dummy values.

You have to fill them in with your own subscription details.
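
One common way to keep such keys out of the script altogether is to read them from environment variables. The following is a minimal sketch of that approach, not what the original config does. load_secret and the variable names RAPIDAPI_KEY and VOICERSS_KEY are hypothetical.

```python
import os


def load_secret(name, default='<not-set>'):
    """Read a secret from the environment, falling back to a placeholder."""
    return os.environ.get(name, default)


# Hypothetical variable names; pick whatever suits your setup.
secrets = {
    'api_key': load_secret('RAPIDAPI_KEY'),
    'query_string_api': load_secret('VOICERSS_KEY'),
}

print(sorted(secrets))
```

The resulting values could then be placed into the config dictionary in place of the hard-coded strings.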

2. clsText2Voice.py (This script converts the text data into an audio file using a GET request to the third-party API & then plays it using the playsound module.)

###############################################
#### Written By: SATYAKI DE                ####
#### Written On: 27-Oct-2019               ####
#### Modified On 27-Oct-2019               ####
####                                       ####
#### Objective: Main class converting      ####
#### text to voice using third-party API.  ####
###############################################

from playsound import playsound
import requests
import base64
from clsConfig import clsConfig as cf

class clsText2Voice:
    def __init__(self):
        self.url = cf.config['url']
        self.api_key = cf.config['api_key']
        self.targetFile = cf.config['targetFile']
        self.pitch_speed = cf.config['pitch_speed']
        self.bot_language = cf.config['bot_language']
        self.audio_type = cf.config['audio_type']
        self.audio_freq = cf.config['audio_freq']
        self.b64_encoding = cf.config['b64_encoding']
        self.query_string_api = cf.config['query_string_api']
        self.host = cf.config['host']

    def getAudio(self, srcString):
        try:
            url = self.url
            api_key = self.api_key
            tarFile = self.targetFile
            pitch_speed = self.pitch_speed
            bot_language = self.bot_language
            audio_type = self.audio_type
            audio_freq = self.audio_freq
            b64_encoding = self.b64_encoding
            query_string_api = self.query_string_api
            host = self.host

            querystring = {
                "r": pitch_speed,
                "c": audio_type,
                "f": audio_freq,
                "src": srcString,
                "hl": bot_language,
                "key": query_string_api,
                "b64": b64_encoding
            }

            headers = {
                'x-rapidapi-host': host,
                'x-rapidapi-key': api_key
            }

            response = requests.request("GET", url, headers=headers, params=querystring)

            # Converting to MP3
            targetFile = tarFile
            mp3File_64_decode = base64.decodebytes(bytes(response.text, encoding="utf-8"))
            mp3File_result = open(targetFile, 'wb')

            # create a writable mp3File and write the decoding result
            mp3File_result.write(mp3File_64_decode)
            mp3File_result.close()

            playsound(targetFile)

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

A few crucial lines from the above script –

querystring = {
    "r": pitch_speed,
    "c": audio_type,
    "f": audio_freq,
    "src": srcString,
    "hl": bot_language,
    "key": query_string_api,
    "b64": b64_encoding
}

You can configure the voice of the audio by adjusting these parameters. The text content is received in srcString, so whatever the user types is captured here & sent as part of the query string of the request.
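
To see roughly how these parameters end up in the request URL, you can encode the dictionary yourself with the standard library. This is only an illustration: the values mirror the config shown earlier, the key is a dummy, and the exact encoding performed by requests may differ slightly.

```python
from urllib.parse import urlencode

# Placeholder values mirroring the config shown earlier; the key is a dummy.
querystring = {
    "r": "-6",
    "c": "mp3",
    "f": "22khz_8bit_stereo",
    "src": "Hello world",
    "hl": "en-us",
    "key": "dummy-key",
    "b64": True,
}

encoded = urlencode(querystring)
print(encoded)
```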

response = requests.request("GET", url, headers=headers, params=querystring)

In this case, you will receive the audio in the form of base64-encoded text. Hence, you need to convert it back to a sound file with the following lines –

# Converting to MP3
targetFile = tarFile
mp3File_64_decode = base64.decodebytes(bytes(response.text, encoding="utf-8"))
mp3File_result = open(targetFile, 'wb')

# create a writable mp3File and write the decoding result
mp3File_result.write(mp3File_64_decode)
mp3File_result.close()

As you can see, we’ve extracted response.text & then decoded it into a bytes object to form the mp3 sound file at the receiving end.
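
The decode-and-write step can be tried in isolation. The sketch below performs the same round trip on stand-in bytes (not real audio) and uses a with block so that the file is closed even if the write fails. save_base64_audio and the target file name are hypothetical.

```python
import base64
import os
import tempfile


def save_base64_audio(b64_text, target_file):
    """Decode base64 text and write the raw bytes to target_file."""
    audio_bytes = base64.b64decode(b64_text)
    # The context manager closes the file even if the write fails.
    with open(target_file, 'wb') as out:
        out.write(audio_bytes)
    return len(audio_bytes)


# Stand-in payload: not real audio, just bytes to show the round trip.
fake_audio = b'\x00\x01demo-bytes'
payload = base64.b64encode(fake_audio).decode('ascii')

target = os.path.join(tempfile.gettempdir(), 'demo_decode.bin')
written = save_base64_audio(payload, target)
print(written, 'bytes written')
```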

Once we have our mp3 file ready, the following line simply plays the audio record.

playsound(targetFile)

Thus you can hear the actual voice.

3. callText2Voice.py (This is the main script that will invoke the text to voice API & then playback the audio once it gets the response from the third-party API.)

###############################################
#### Written By: SATYAKI DE                ####
#### Written On: 27-Oct-2019               ####
#### Modified On 27-Oct-2019               ####
####                                       ####
#### Objective: Main class converting      ####
#### text to voice using third-party API.  ####
###############################################

from clsConfig import clsConfig as cf
import clsL as cl
import logging
import datetime
import clsText2Voice as ct

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def main():
    try:
        ret_2 = ''
        debug_ind = 'Y'

        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'consolidatedTwitter.log', level=logging.INFO)

        # Initiating Log Class
        l = cl.clsL()

        # Moving previous day log files to archive directory
        log_dir = cf.config['LOG_PATH']

        tmpR0 = "*" * 157

        logging.info(tmpR0)
        tmpR9 = 'Start Time: ' + str(var)
        logging.info(tmpR9)
        logging.info(tmpR0)

        print("Log Directory::", log_dir)
        tmpR1 = 'Log Directory::' + log_dir
        logging.info(tmpR1)

        # Query using parameters
        rawQry = str(input('Enter your string:'))

        x1 = ct.clsText2Voice()
        ret_2 = x1.getAudio(rawQry)

        if ret_2 == 0:
            print("Successfully converted from text to voice!")
            logging.info("Successfully converted from text to voice!")
            print("*" * 157)
            logging.info(tmpR0)
        else:
            print("Failed to convert text to voice!")
            logging.info("Failed to convert text to voice!")
            print("*" * 157)
            logging.info(tmpR0)

        print("*" * 157)
        logging.info(tmpR0)

        tmpR10 = 'End Time: ' + datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        logging.info(tmpR10)
        logging.info(tmpR0)

    except ValueError:
        print("No relevant data to proceed!")
        logging.info("No relevant data to proceed!")

    except Exception as e:
        print("Top level Error: args:{0}, message:{1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()

Essential lines from the above script –

# Query using parameters
rawQry = str(input('Enter your string:'))

x1 = ct.clsText2Voice()
ret_2 = x1.getAudio(rawQry)

As you can see, the user passes the text content here, which is given to our class & it then plays the audio for that text.

Let’s see how it runs –

Input Text: Welcome to Satyaki De’s blog. This site mainly deals with the Python, SQL from different DBs & many useful areas from the leading cloud providers.

And, here is what the run command looks like under Windows OS –

2. Windows_Run

And, please find the sample voice that it generates –

So, we’ve done it! 😀

Let us know your comments on this.

So, we’ll come out with another exciting post in the coming days!

N.B.: This is demonstrated for RnD/study purposes. All the data posted here are representational data & available over the internet.

Explaining New Python Library – Regular Expression in JSON

Hi Guys!

As discussed, here is the continuation of the previous post. We’ll explain the regular expression from the library that I’ve created recently.

First, let me share the calling script for regular expression –

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 08-Sep-2019              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from dnpr.clsDnpr import clsDnpr
import datetime as dt
import json

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Lookup functions from
# Azure cloud SQL DB

def main():
    try:
        # Initializing the class
        t = clsDnpr()
        
        srcJson = [
                    {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
                    {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
                    {"FirstName": "Archi", "LastName": "Bose", "Sal": 500},
                    {"FirstName": "Archi", "LastName": "Bose", "Sal": 7000},
                    {"FirstName": "Deb", "LastName": "Sen", "Sal": 9500}
                  ]

        print("4. Checking regular expression functionality!")
        print()

        var13 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        print("Start Time: ", str(var13))

        print('::Function Regex_Like:: ')
        print()

        tarColumn = 'FirstName'
        print('Target Column for Regex_Like: ', tarColumn)
        inpPattern = r"\bSa"
        print('Input Pattern: ', str(inpPattern))

        # Invoking the regex_like function
        tarJson = t.regex_like(srcJson, tarColumn, inpPattern)

        print('End of Function Regex_Like!')
        print()

        print("*" * 157)
        print("Output Data: ")
        tarJsonFormat = json.dumps(tarJson, indent=1)
        print(str(tarJsonFormat))
        print("*" * 157)

        if not tarJson:
            print()
            print("No relevant output data!")
            print("*" * 157)
        else:
            print()
            print("Relevant output data comes!")
            print("*" * 157)

        var14 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        print("End Time: ", str(var14))

        var15 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        print("Start Time: ", str(var15))

        print('::Function Regex_Replace:: ')
        print()

        tarColumn = 'FirstName'
        print('Target Column for Regex_Replace: ', tarColumn)
        inpPattern = r"\bSa"
        print('Input Pattern: ', str(inpPattern))
        replaceString = 'Ka'
        print('Replacing Character: ', replaceString)

        # Invoking the regex_replace function
        tarJson = t.regex_replace(srcJson, tarColumn, inpPattern, replaceString)

        print('End of Function Regex_Replace!')
        print()

        print("*" * 157)
        print("Output Data: ")
        tarJsonFormat = json.dumps(tarJson, indent=1)
        print(str(tarJsonFormat))
        print("*" * 157)

        if not tarJson:
            print()
            print("No relevant output data!")
            print("*" * 157)
        else:
            print()
            print("Relevant output data comes!")
            print("*" * 157)

        var16 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        print("End Time: ", str(var16))

        var17 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        print("Start Time: ", str(var17))

        print('::Function Regex_Substr:: ')
        print()

        tarColumn = 'FirstName'
        print('Target Column for Regex_Substr: ', tarColumn)
        inpPattern = r"\bSa"
        print('Input Pattern: ', str(inpPattern))

        # Invoking the regex_substr function
        tarJson = t.regex_substr(srcJson, tarColumn, inpPattern)

        print('End of Function Regex_Substr!')
        print()

        print("*" * 157)
        print("Output Data: ")
        tarJsonFormat = json.dumps(tarJson, indent=1)
        print(str(tarJsonFormat))
        print("*" * 157)

        if not tarJson:
            print()
            print("No relevant output data!")
            print("*" * 157)
        else:
            print()
            print("Relevant output data comes!")
            print("*" * 157)

        var18 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        print("End Time: ", str(var18))

        print("=" * 157)
        print("End of regular expression function!")
        print("=" * 157)



    except ValueError:
        print("No relevant data to proceed!")

    except Exception as e:
        print("Top level Error: args:{0}, message:{1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()

As per the library, we’ll discuss the following functionalities –

  1. regex_like
  2. regex_replace
  3. regex_substr

Now, let us check how to call these functions.

1. regex_like:

Following is the base skeleton in order to invoke this function –

regex_like(Input Json, Target Column, Pattern To Match) return Output Json

Here are the key lines in the script –

srcJson = [
            {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
            {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
            {"FirstName": "Archi", "LastName": "Bose", "Sal": 500},
            {"FirstName": "Archi", "LastName": "Bose", "Sal": 7000},
            {"FirstName": "Deb", "LastName": "Sen", "Sal": 9500}
          ]

# Invoking the regex_like function
tarJson = t.regex_like(srcJson, tarColumn, inpPattern)
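
To make the idea concrete, here is a minimal sketch of what such a filter could look like. The library's internals are not shown in this post, so the function name regex_like_sketch and the choice of re.search are my assumptions, not the actual implementation.

```python
import re


def regex_like_sketch(src_json, column, pattern):
    """Hypothetical stand-in: keep only the records whose column matches the pattern."""
    compiled = re.compile(pattern)
    # Non-string or missing values are skipped rather than coerced to text
    return [
        row for row in src_json
        if isinstance(row.get(column), str) and compiled.search(row[column])
    ]


src_json = [
    {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
    {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
    {"FirstName": "Archi", "LastName": "Bose", "Sal": 500},
    {"FirstName": "Archi", "LastName": "Bose", "Sal": 7000},
    {"FirstName": "Deb", "LastName": "Sen", "Sal": 9500},
]

matched = regex_like_sketch(src_json, "FirstName", r"\bSa")
print([row["FirstName"] for row in matched])  # ['Satyaki', 'Satyaki']
```

Note that \bSa matches 'Sa' at any word boundary, not only at the start of the value. If you need to anchor the match to the start of the string, ^Sa is the stricter pattern.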

2. regex_replace:

Following is the base skeleton in order to invoke this function –

regex_replace(Input Json, Target Column, Pattern to Replace, Replacement String) return Output Json

Here are the key lines in the script –

tarColumn = 'FirstName'
print('Target Column for Regex_Replace: ', tarColumn)
inpPattern = r"\bSa"
print('Input Pattern: ', str(inpPattern))
replaceString = 'Ka'
print('Replacing Character: ', replaceString)

# Invoking the regex_replace function
tarJson = t.regex_replace(srcJson, tarColumn, inpPattern, replaceString)

As you can see, here we replace ‘Sa’ with ‘Ka’ wherever the target column matches the given pattern in the JSON.
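
The replacement step can be sketched with the standard re module as follows. This is only an illustration under my own assumptions: regex_replace_sketch is a hypothetical name, and I assume the function returns every record, with the substitution applied to the target column.

```python
import re


def regex_replace_sketch(src_json, column, pattern, replacement):
    """Hypothetical stand-in: replace every pattern match in the column."""
    compiled = re.compile(pattern)
    result = []
    for row in src_json:
        new_row = dict(row)  # copy, so the input records stay untouched
        value = new_row.get(column)
        if isinstance(value, str):
            new_row[column] = compiled.sub(replacement, value)
        result.append(new_row)
    return result


src_json = [
    {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
    {"FirstName": "Archi", "LastName": "Bose", "Sal": 500},
]

replaced = regex_replace_sketch(src_json, "FirstName", r"\bSa", "Ka")
print([row["FirstName"] for row in replaced])  # ['Katyaki', 'Archi']
```

Copying each record before changing it keeps the source JSON available for the next call, which matters when the same srcJson is reused across several functions.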

3. regex_substr:

Following is the base skeleton in order to invoke this function –

regex_substr(Input Json, Target Column, Pattern to substring) return Output Json

Here are the key lines –

tarColumn = 'FirstName'
print('Target Column for Regex_Substr: ', tarColumn)
inpPattern = r"\bSa"
print('Input Pattern: ', str(inpPattern))

# Invoking the regex_substr function
tarJson = t.regex_substr(srcJson, tarColumn, inpPattern)

In this case, we’ve removed the part of the string that matches the pattern & returned the final result as JSON.
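
A rough sketch of this behaviour is given below. It assumes that the function strips the matched text from the target column and returns every record; regex_substr_sketch is a hypothetical name, and the library's real logic may differ.

```python
import re


def regex_substr_sketch(src_json, column, pattern):
    """Hypothetical stand-in: strip the matched text from the column."""
    compiled = re.compile(pattern)
    result = []
    for row in src_json:
        new_row = dict(row)  # copy, so the input records stay untouched
        value = new_row.get(column)
        if isinstance(value, str):
            # Substituting an empty string removes every match
            new_row[column] = compiled.sub("", value)
        result.append(new_row)
    return result


src_json = [
    {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
    {"FirstName": "Deb", "LastName": "Sen", "Sal": 9500},
]

trimmed = regex_substr_sketch(src_json, "FirstName", r"\bSa")
print([row["FirstName"] for row in trimmed])  # ['tyaki', 'Deb']
```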

Let us first see the sample input JSON –

SourceJSON_Regex

Let us check how it looks when we run the calling script –

  • regex_like:
Regex_Like

This function retrieves the elements whose FirstName starts with ‘Sa’. As a result, we see the following two elements in the payload.

  • regex_replace:
Regex_Replace

In this case, we’re replacing ‘Sa’ with ‘Ka’ in any string that starts with ‘Sa’.

  • regex_substr:
Regex_Substr

As you can see, the FirstName of the first element changed from “Satyaki” to “tyaki”.

So, finally, we’ve achieved our target.

I’ll post the next exciting concept very soon.

Till then! Happy Avenging! 😀

N.B.: This is demonstrated for RnD/study purposes. All the data posted here are representational data & available over the internet.