Validating source data against RAG-response using OpenAI, GloVe & FAISS in Python

Today, I’ll present another exciting capability of LLM architectures that addresses one crucial question: how valid is the response generated by an LLM against your own data? This validation is critical when the response informs business-growth decisions & you need to take the right action at the right time.

Why not view the demo before going through it?

Demo

Isn’t it exciting? Great! Let us understand this in detail.

The first dotted box (extreme-left) represents the area that talks about the data ingestion from different sources, including third-party PDFs. It is expected that organizations should have ready-to-digest data sources. Examples: Data Lake, Data Mart, One Lake, or any other equivalent platforms. Those PDFs will provide additional insights beyond the conventional advanced analytics.

You need to have some kind of OCR solution that will extract all the relevant information in the form of text from the documents. 

The next important part is how you define the chunking & embedding of data chunks into the Vector DB. Chunking & indexing strategies, along with the overlap between adjacent chunks, play a crucial role in tying those segregated pieces into a single context that will be fed to your preferred LLM.
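To make the idea of overlapping chunks concrete, here is a minimal sketch. It is not taken from the scripts in this post (which treat each line of a text file as one document); the function name chunk_text and its parameters are assumptions used only for illustration.

```python
def chunk_text(text, chunk_size=200, overlap=50):
    """Split text into character windows; consecutive chunks share `overlap` characters."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once the current window already reaches the end of the text
        if start + chunk_size >= len(text):
            break
    return chunks


sample = "abcdefghijklmnopqrstuvwxyz"
chunks = chunk_text(sample, chunk_size=10, overlap=4)
for chunk in chunks:
    print(chunk)
```

Because each chunk repeats the tail of the previous one, a sentence that straddles a boundary still appears whole in at least one chunk, provided it is no longer than the overlap.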

This system employs a vector similarity search to browse through unstructured information and concurrently accesses the database to retrieve the context, ensuring that the responses are not only comprehensive but also anchored in validated knowledge.

This approach is particularly vital for addressing multi-hop questions, where a single query can be broken down into multiple sub-questions and may require information from numerous documents to generate an accurate answer.


pip install openai==0.27.8
pip install pandas==2.0.3
pip install tensorflow==2.11.1
pip install faiss-cpu==1.7.4
pip install gensim==4.3.2

Let us understand the key class & snippets.

  • clsFeedVectorDB.py (This is the main class that will invoke the FAISS framework to contextualize the docs inside the vector DB with the source file name to validate the answer from Gen AI using GloVe.6B embedding models.)

Let us understand some of the key snippets from the above script (Full scripts will be available in the GitHub Repo) –

# Sample function to convert text to a vector
def text2Vector(self, text):
    # Lowercase the text, split it on whitespace & keep only the words known to the model
    words = [word for word in text.lower().split() if word in self.model]

    # If no words are in the model, return a zero vector with the same (1, d) shape
    if not words:
        return np.zeros((1, self.model.vector_size), dtype=np.float32)

    # Compute the average of the word vectors
    vector = np.mean([self.model[word] for word in words], axis=0)
    return vector.reshape(1, -1)

This code is for a function called “text2Vector” that takes some text as input and converts it into a numerical vector. Let me break it down step by step:

  • It starts by taking some text as input, and this text is expected to be a sentence or a piece of text.
  • The text is converted to lowercase and then split into individual words on whitespace.
  • It checks if each word is present in the pre-trained word-embedding model (the GloVe embeddings in this post). If a word is not in the model, it’s ignored.
  • If none of the words from the input text are found in the model, the function returns a vector filled with zeros. This vector has the same size as the word vectors in the model.
  • If there are words from the input text in the model, the function calculates the average vector of these words. It does this by taking the word vectors for each word found in the model and computing their mean (average). This results in a single vector that represents the input text.
  • Finally, the function reshapes this vector into a 2D array with one row and as many columns as there are elements in the vector. FAISS expects its inputs as 2D arrays with one row per vector, so this shape can be passed directly to a search call.

So, in simple terms, this function takes a piece of text, looks up the word vectors for the words in that text, and calculates the average of those vectors to create a single numerical representation of the text. If none of the words are found in the model, it returns a vector of zeros.
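The averaging step can be tried out without downloading any embeddings. The sketch below uses a tiny hand-made dictionary (toy_model, an assumption standing in for the real GloVe model) & a standalone function text_to_vector that mirrors the logic described above.

```python
import numpy as np

# Hypothetical 3-dimensional embeddings standing in for the GloVe model
toy_model = {
    "sales": np.array([1.0, 0.0, 2.0], dtype=np.float32),
    "grew": np.array([3.0, 2.0, 0.0], dtype=np.float32),
}


def text_to_vector(text, model, dim=3):
    """Average the vectors of the known words; return zeros if none are known."""
    words = [w for w in text.lower().split() if w in model]
    if not words:
        return np.zeros((1, dim), dtype=np.float32)
    return np.mean([model[w] for w in words], axis=0).reshape(1, -1)


vec = text_to_vector("Sales grew quickly", toy_model)
print(vec)        # mean of the "sales" and "grew" vectors; "quickly" is ignored
print(vec.shape)  # one row, one column per embedding dimension
```

Note that word order is lost in the average, so two sentences made of the same words map to the same vector.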

    def genData(self):
        try:
            basePath = self.basePath
            modelFileName = self.modelFileName
            vectorDBPath = self.vectorDBPath
            vectorDBFileName = self.vectorDBFileName

            # Create a FAISS index
            dimension = int(cf.conf['NO_OF_MODEL_DIM'])  # Assuming 100-dimensional vectors 
            index = faiss.IndexFlatL2(dimension)

            print('*' * 240)
            print('Vector Index Your Data for Retrieval:')
            print('*' * 240)

            FullVectorDBname = vectorDBPath + vectorDBFileName
            indexFile = str(vectorDBPath) + str(vectorDBFileName) + '.index'

            print('File: ', str(indexFile))

            data = {}
            # List all files in the specified directory
            files = os.listdir(basePath)

            # Filter out files that are not text files
            text_files = [file for file in files if file.endswith('.txt')]

            # Read each text file
            for file in text_files:
                file_path = os.path.join(basePath, file)
                print('*' * 240)
                print('Processing File:')
                print(str(file_path))
                try:
                    # Attempt to open with utf-8 encoding
                    # Use a separate handle name so that 'file' keeps holding the file name
                    with open(file_path, 'r', encoding='utf-8') as fh:
                        for line_number, line in enumerate(fh, start=1):
                            # Assume each line is a separate document
                            vector = self.text2Vector(line)
                            vector = vector.reshape(-1)
                            index_id = index.ntotal

                            index.add(np.array([vector]))  # Adding the vector to the index
                            data[index_id] = {'text': line, 'line_number': line_number, 'file_name': file_path}  # Storing the line and file name
                except UnicodeDecodeError:
                    # If utf-8 fails, try a different encoding
                    try:
                        with open(file_path, 'r', encoding='ISO-8859-1') as fh:
                            for line_number, line in enumerate(fh, start=1):
                                # Assume each line is a separate document
                                vector = self.text2Vector(line)
                                vector = vector.reshape(-1)
                                index_id = index.ntotal
                                index.add(np.array([vector]))  # Adding the vector to the index
                                data[index_id] = {'text': line, 'line_number': line_number, 'file_name': file_path}  # Storing the line and file name
                    except Exception as e:
                        print(f"Could not read file {file}: {e}")
                        continue

                print('*' * 240)

            # Save the data dictionary using pickle
            dataCache = vectorDBPath + modelFileName
            with open(dataCache, 'wb') as f:
                pickle.dump(data, f)

            # Save the index and data for later use
            faiss.write_index(index, indexFile)

            print('*' * 240)

            return 0

        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

This code defines a function called genData, and its purpose is to prepare and store data for later retrieval using a FAISS index. Let’s break down what it does step by step:

  • It starts by assigning several variables, such as basePath, modelFileName, vectorDBPath, and vectorDBFileName. These hold the input directory & the output paths and file names.
  • It creates a FAISS index with the dimension read from the configuration (100-dimensional vectors in this case) using faiss.IndexFlatL2, which performs an exact search by L2 (Euclidean) distance. FAISS is a library for efficient similarity search and clustering of high-dimensional data.
  • It prints the name of the file where the index will be stored and initializes an empty dictionary called data to store information about the processed text data.
  • It lists all the files in the directory specified by basePath and keeps only the files that have a “.txt” extension.
  • It then reads each of these text files one by one. For each file:
    • It attempts to open the file with UTF-8 encoding.
    • It reads the file line by line, treating each line as a separate document.
    • For each line, it calls the function text2Vector to convert the text into a numerical vector representation. This vector is added to the FAISS index.
    • It also stores the line’s text, line number, and file name in the data dictionary, keyed by the vector’s position in the index.
    • If UTF-8 decoding fails, it reopens the file with a different encoding, “ISO-8859-1”, and repeats the same process of reading and storing data.
    • If the file still cannot be read, it prints an error message and continues with the next file.
  • Once all the files are processed, it saves the data dictionary using the pickle library to a file specified by dataCache.
  • It also saves the FAISS index to a file specified by indexFile.
  • Finally, it returns 0 if the process completes successfully or 1 if there was an error during execution.

In summary, this function reads text files, converts their contents into numerical vectors, and builds a FAISS index for efficient similarity search. It also saves the processed data and the index for later use. If there are any issues during the process, it prints error messages but continues processing other files.
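One detail worth a closer look is the encoding fallback. When a file is decoded while it is being indexed, a decoding error that surfaces part-way through a large file can leave some lines already added before the second attempt starts from the top. A minimal sketch of one way around this, assuming hypothetical helper names read_lines & collect_records, is to decode the whole file first and only then build the metadata:

```python
import os
import tempfile


def read_lines(path, encodings=("utf-8", "ISO-8859-1")):
    """Return all lines of a file, trying each encoding in turn."""
    for encoding in encodings:
        try:
            with open(path, "r", encoding=encoding) as fh:
                return fh.readlines()  # decode the whole file before returning
        except UnicodeDecodeError:
            continue
    raise ValueError(f"Could not decode {path} with any of {encodings}")


def collect_records(path):
    """Build row-id -> metadata entries shaped like the data dictionary in genData."""
    records = {}
    for line_number, line in enumerate(read_lines(path), start=1):
        records[len(records)] = {"text": line, "line_number": line_number, "file_name": path}
    return records


# Usage: a Latin-1 encoded file that is not valid UTF-8
with tempfile.TemporaryDirectory() as tmp:
    sample_path = os.path.join(tmp, "sample.txt")
    with open(sample_path, "wb") as fh:
        fh.write("café sales\nsecond line\n".encode("ISO-8859-1"))
    records = collect_records(sample_path)

print(len(records))
print(records[0]["text"])
```

The vectors would then be added to the index in the same order as the records, so that each row id keeps pointing at the right line.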

  • clsRAGOpenAI.py (This is the main class that will invoke the RAG class, which will get the contexts with references including source files, line numbers, and source texts. This will help the customer to validate the source against the OpenAI response to understand & control the data bias & other potential critical issues.)

Let us understand some of the key snippets from the above script (Full scripts will be available in the GitHub Repo) –

def ragAnswerWithHaystackAndGPT3(self, queryVector, k, question):
    modelName = self.modelName
    maxToken = self.maxToken
    temp = self.temp

    # Assuming getTopKContexts is a method that returns the top K contexts
    contexts = self.getTopKContexts(queryVector, k)
    messages = []

    # Add contexts as system messages
    for file_name, line_number, text in contexts:
        messages.append({"role": "system", "content": f"Document: {file_name} \nLine Number: {line_number} \nContent: {text}"})

    prompt = self.generateOpenaiPrompt(queryVector, k)
    prompt = prompt + "Question: " + str(question) + ". \n Answer based on the above documents."

    # Add user question
    messages.append({"role": "user", "content": prompt})

    # Create chat completion
    completion = client.chat.completions.create(
        model=modelName,
        messages=messages,
        temperature=temp,
        max_tokens=maxToken
    )

    # The first choice in the response holds the generated answer
    last_response = completion.choices[0].message.content
    source_references = ['FileName: ' + str(context[0]) + ' - Line Numbers: ' + str(context[1]) + ' - Source Text (Reference): ' + str(context[2]) for context in contexts]

    return last_response, source_references

This code defines a function called ragAnswerWithHaystackAndGPT3. Its purpose is to combine the contexts retrieved from the vector index with an OpenAI chat model to generate an answer to a user’s question. Let’s break down what it does step by step:

  • It starts by assigning several variables, such as modelName, maxToken, and temp. These hold the model name & the generation settings (maximum tokens & temperature) for the OpenAI model.
  • It calls a method getTopKContexts to retrieve the top K contexts (lines of text from the indexed source files) related to the user’s query. These contexts are stored in the contexts variable.
  • It initializes an empty list called messages to store messages that will be used in the conversation with the model.
  • It iterates through each context and adds it as a system message to the messages list. These system messages provide information about the documents or sources being used in the conversation.
  • It builds a prompt by calling generateOpenaiPrompt, which combines a template with the same retrieved contexts, and then appends the user’s question. This prompt is added as a user message to the messages list, so the model sees the contexts first, followed by the question.
  • It makes a request to the model using the client.chat.completions.create method, passing in the model name, the constructed messages, and other settings such as temperature and maximum tokens.
  • After receiving a response, it reads the generated answer from the first choice in the response (completion.choices[0].message.content).
  • It also constructs a list of source references to the documents used in generating the answer. This information includes the file name, line numbers, and source text for each context.
  • Finally, it returns the generated answer (last_response) and the source references to the caller.

In summary, this function takes a user’s query, retrieves relevant contexts or documents, sets up a conversation with GPT-3 that includes the query and contexts, and then uses GPT-3 to generate an answer. It also provides references to the sources used in generating the answer.
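The message assembly can be exercised without calling the OpenAI API. The sketch below is only an illustration: build_messages, format_references & the sample contexts are hypothetical names and data, but the message layout follows the function above.

```python
def build_messages(contexts, prompt):
    """Return chat messages: one system message per context, then the user prompt."""
    messages = [
        {
            "role": "system",
            "content": f"Document: {file_name} \nLine Number: {line_number} \nContent: {text}",
        }
        for file_name, line_number, text in contexts
    ]
    messages.append({"role": "user", "content": prompt})
    return messages


def format_references(contexts):
    """Return one human-readable reference string per retrieved context."""
    return [
        f"FileName: {file_name} - Line Numbers: {line_number} - Source Text (Reference): {text}"
        for file_name, line_number, text in contexts
    ]


# Hypothetical retrieved contexts: (file_name, line_number, text)
contexts = [
    ("q3_report.txt", 12, "Revenue grew 8% in Q3."),
    ("q3_report.txt", 40, "Churn fell to 2%."),
]
messages = build_messages(contexts, "Question: How did revenue change? \n Answer based on the above documents.")
references = format_references(contexts)

print(len(messages))
print(references[0])
```

Returning the references next to the answer is what lets a reader check each claim in the response against the exact file & line it came from.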

    def getTopKContexts(self, queryVector, k):
        try:
            # FAISS returns L2 distances & row ids; -1 marks an empty slot when fewer than k vectors exist
            distances, indices = index.search(queryVector, k)
            resDict = [(data[i]['file_name'], data[i]['line_number'], data[i]['text']) for i in indices[0] if i != -1]
            return resDict
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            # Return an empty list so that callers can still iterate over the result
            return []

This code defines a function called getTopKContexts. Its purpose is to retrieve the top K relevant contexts or pieces of information from a pre-built index based on a query vector. Here’s a breakdown of what it does:

  1. It takes two parameters as input: queryVector, which is a numerical vector representing a query, and k, which specifies how many relevant contexts to retrieve.
  2. Inside a try-except block, it attempts the following steps:
    • It uses the index.search method to find the K vectors closest to the given queryVector. This method returns two arrays: distances (the L2 distance of each match from the query, where a smaller value means a closer match) and indices (the positions of the closest contexts in the data). When the index holds fewer than K vectors, FAISS fills the remaining positions with -1, so those entries are skipped.
    • It creates a list called “resDict”, which contains tuples for each of the top K contexts. Each tuple contains three pieces of information: the file name (file_name), the line number (line_number), and the text content (text) of the context. These details are extracted from a data dictionary.
  3. If the process completes successfully, it returns the list of top K contexts (resDict) to the caller.
  4. If there’s an exception (an error) during this process, it prints the error message and returns an empty list, so the callers that loop over the result keep working.

In summary, this function takes a query vector and finds the K most relevant contexts or pieces of information based on their similarity to the query. It returns these contexts as a list of tuples containing file names, line numbers, and text content. If there’s an error, it prints an error message and returns an empty list.
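To see what an exact L2 search does under the hood, here is a small NumPy stand-in. It is not the FAISS API; top_k_contexts, the toy vectors & the metadata are assumptions for illustration, and the distance is the squared Euclidean distance, which is also what faiss.IndexFlatL2 reports.

```python
import numpy as np


def top_k_contexts(query_vector, vectors, metadata, k):
    """Exact nearest-neighbour lookup by squared L2 distance (smaller means closer)."""
    distances = np.sum((vectors - query_vector) ** 2, axis=1)
    k = min(k, len(vectors))
    nearest = np.argsort(distances)[:k]
    return [
        (metadata[int(i)]["file_name"], metadata[int(i)]["line_number"], metadata[int(i)]["text"])
        for i in nearest
    ]


# Hypothetical indexed vectors and the metadata stored for each row id
vectors = np.array([[0.0, 0.0], [1.0, 1.0], [5.0, 5.0]], dtype=np.float32)
metadata = {
    0: {"file_name": "a.txt", "line_number": 1, "text": "first line"},
    1: {"file_name": "a.txt", "line_number": 2, "text": "second line"},
    2: {"file_name": "b.txt", "line_number": 1, "text": "other file"},
}

query = np.array([[0.9, 1.2]], dtype=np.float32)
results = top_k_contexts(query, vectors, metadata, k=2)
for file_name, line_number, text in results:
    print(file_name, line_number, text)
```

A flat index compares the query with every stored vector, which is simple and exact but scales linearly with the number of lines indexed.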

def generateOpenaiPrompt(self, queryVector, k):
    contexts = self.getTopKContexts(queryVector, k)
    template = ct.templateVal_1
    prompt = template
    for file_name, line_number, text in contexts:
        prompt += f"Document: {file_name}\n Line Number: {line_number} \n Content: {text}\n\n"
    return prompt

This code defines a function called generateOpenaiPrompt. Its purpose is to create a prompt or a piece of text that combines a template with information from the top K relevant contexts retrieved earlier. Let’s break down what it does:

  1. It starts by calling the getTopKContexts function to obtain the top K relevant contexts based on a given queryVector.
  2. It initializes a variable called template with a predefined template value (likely defined elsewhere in the code).
  3. It sets the prompt variable to the initial template.
  4. Then, it enters a loop where it iterates through each of the relevant contexts retrieved earlier (contexts are typically documents or text snippets).
  5. For each context, it appends information to the prompt. Specifically, it adds lines to the prompt that include:
    • The document’s file name (Document: [file_name]).
    • The line number within the document (Line Number: [line_number]).
    • The content of the context itself (Content: [text]).
  6. It adds some extra spacing (newlines) between each context to ensure readability.
  7. Finally, it returns the complete prompt, which is a combination of the template and information from the relevant contexts.

In summary, this function takes a query vector, retrieves relevant contexts, and creates a prompt by combining a template with information from these contexts. This prompt can then be used as input for an AI model or system, likely for generating responses or answers based on the provided context.
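The resulting prompt is easier to picture with a concrete string. In the sketch below, TEMPLATE is a made-up placeholder (the real template text lives in the author’s template module) & generate_prompt is a hypothetical standalone version that takes the contexts directly instead of a query vector.

```python
# Hypothetical template; the actual wording used by the post is not shown here
TEMPLATE = "Use only the documents below to answer.\n\n"


def generate_prompt(contexts, question, template=TEMPLATE):
    """Append each context to the template, then add the user's question."""
    prompt = template
    for file_name, line_number, text in contexts:
        prompt += f"Document: {file_name}\n Line Number: {line_number} \n Content: {text}\n\n"
    return prompt + "Question: " + str(question) + ". \n Answer based on the above documents."


# Hypothetical retrieved contexts: (file_name, line_number, text)
contexts = [("q3_report.txt", 12, "Revenue grew 8% in Q3.")]
prompt = generate_prompt(contexts, "How did revenue change")
print(prompt)
```

Keeping the file name & line number inside the prompt gives the model the same references that are later shown to the user.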

Let us understand the directory structure of this entire application –


To learn more about this package, please visit the following GitHub link.

So, finally, we’ve done it. I know that this post is relatively smaller than my earlier post. But, I think, you can get a good hack to improve some of your long-running jobs by applying this trick.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

RAG implementation of LLMs by using Python, Haystack & React (Part – 1)

Today, I will share a new post in a multi-part series about creating end-to-end LLM applications that feed on source data through a RAG implementation. I’ll also use the OpenAI Python SDK and Haystack embeddings in this case.

In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.

Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

Demo

Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.

As you can see, to enable this large & complex solution, we must first establish the capabilities to build applications powered by LLMs, Transformer models, vector search, and more. You can use state-of-the-art NLP models to perform question-answering, answer generation, semantic document search, or build tools capable of complex decision-making and query resolution. Hence, steps no. 1 & 2 showcased the data embedding & creating that informed repository. We’ll be discussing that in our second part.

Once you have the informed repository, the system can interact with the end-users. As part of the query (shown in step 3), the prompt & the question are shared with the process engine, which then reduces the volume by fetching only the relevant context from our informed repository & receives the tuned context as part of the response (shown in steps 4, 5 & 6).

Then, this tuned context is shared with OpenAI to produce a better response, summary & concluding remarks that are user-friendly & easier for end-users to understand (shown in steps 8 & 9).

The following are the important packages that are essential to this project –

pip install farm-haystack==1.19.0
pip install Flask==2.2.5
pip install Flask-Cors==4.0.0
pip install Flask-JWT-Extended==4.5.2
pip install Flask-Session==0.5.0
pip install openai==0.27.8
pip install pandas==2.0.3
pip install tensorflow==2.11.1

We have both a front-end built with React & back-end APIs built with Python Flask, along with OpenAI, to create this experience.

Today, we’ll be going in reverse mode. We first discuss the main script & then explain all the other class scripts.

  • flaskServer.py (This is the main calling Python script to invoke the RAG-Server.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Jun-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### shortcut application created inside MAC         ####
#### environment including MacBook, IPad or IPhone.  ####
####                                                 ####
#########################################################

from flask import Flask, jsonify, request, session
from flask_cors import CORS
from werkzeug.security import check_password_hash, generate_password_hash
from flask_jwt_extended import JWTManager, jwt_required, create_access_token
import pandas as pd
from clsConfigClient import clsConfigClient as cf
import clsL as log
import clsContentScrapper as csc
import clsRAGOpenAI as crao
import csv
from datetime import timedelta
import os
import re
import json

########################################################
################    Global Area   ######################
########################################################
#Initiating Logging Instances
clog = log.clsL()

admin_key = cf.conf['ADMIN_KEY']
secret_key = cf.conf['SECRET_KEY']
session_path = cf.conf['SESSION_PATH']
sessionFile = cf.conf['SESSION_CACHE_FILE']

app = Flask(__name__)
CORS(app)  # This will enable CORS for all routes
app.config['JWT_SECRET_KEY'] = admin_key  # Change this!
app.secret_key = secret_key

jwt = JWTManager(app)

users = cf.conf['USER_NM']
passwd = cf.conf['USER_PWD']

cCScrapper = csc.clsContentScrapper()
cr = crao.clsRAGOpenAI()

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Define the aggregation functions
def join_unique(series):
    unique_vals = series.drop_duplicates().astype(str)
    return ', '.join(filter(lambda x: x != 'nan', unique_vals))

# Building the preaggregate cache
def groupImageWiki():
    try:
        base_path = cf.conf['OUTPUT_PATH']
        inputFile = cf.conf['CLEANED_FILE']
        outputFile = cf.conf['CLEANED_FILE_SHORT']
        subdir = cf.conf['SUBDIR_OUT']
        Ind = cf.conf['DEBUG_IND']

        inputCleanedFileLookUp = base_path + inputFile

        #Opening the file in dataframe
        df = pd.read_csv(inputCleanedFileLookUp)
        hash_values = df['Total_Hash'].unique()

        # Work on a copy so that the assignments below do not touch a view of df
        dFin = df[['primaryImage','Wiki_URL','Total_Hash']].copy()

        # Ensure columns are strings and not NaN
        # Convert columns to string and replace 'nan' with an empty string
        dFin['primaryImage'] = dFin['primaryImage'].astype(str).replace('nan', '')
        dFin['Wiki_URL'] = dFin['Wiki_URL'].astype(str).replace('nan', '')

        # drop_duplicates returns a new frame, so assign the result back
        dFin = dFin.drop_duplicates()

        # Group by 'Total_Hash' and aggregate
        dfAgg = dFin.groupby('Total_Hash').agg({'primaryImage': join_unique,'Wiki_URL': join_unique}).reset_index()

        return dfAgg

    except Exception as e:
        x = str(e)
        print('Error: ', x)

        df = pd.DataFrame()

        return df

resDf = groupImageWiki()

########################################################
################  End  Global Area  ####################
########################################################

def extractRemoveUrls(hash_value):
    image_urls = ''
    wiki_urls = ''
    # Parse the inner message JSON string
    try:

        resDf['Total_Hash'] = resDf['Total_Hash'].astype(int)
        filtered_df = resDf[resDf['Total_Hash'] == int(hash_value)]

        if not filtered_df.empty:
            image_urls = filtered_df['primaryImage'].values[0]
            wiki_urls = filtered_df['Wiki_URL'].values[0]

        return image_urls, wiki_urls

    except Exception as e:
        x = str(e)
        print('extractRemoveUrls Error: ', x)
        return image_urls, wiki_urls

def isIncomplete(line):
    """Check if a line appears to be incomplete."""

    # Check if the line ends with certain patterns indicating it might be incomplete.
    incomplete_patterns = [': [Link](', ': Approximately ', ': ']
    return any(line.endswith(pattern) for pattern in incomplete_patterns)

def filterData(data):
    """Return only the complete lines from the data."""

    lines = data.split('\n')
    complete_lines = [line for line in lines if not isIncomplete(line)]

    return '\n'.join(complete_lines)

def updateCounter(sessionFile):
    try:
        counter = 0

        # Check if the CSV file exists
        if os.path.exists(sessionFile):
            with open(sessionFile, 'r') as f:
                reader = csv.reader(f)
                for row in reader:
                    # Assuming the counter is the first value in the CSV
                    counter = int(row[0])

        # Increment counter
        counter += 1

        # Write counter back to CSV
        with open(sessionFile, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow([counter])

        return counter
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

def getPreviousResult():
    try:
        fullFileName = session_path + sessionFile
        newCounterValue = updateCounter(fullFileName)

        return newCounterValue
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

@app.route('/login', methods=['POST'])
def login():
    username = request.json.get('username', None)
    password = request.json.get('password', None)

    print('User Name: ', str(username))
    # Do not print the password: credentials should never reach the logs

    #if username not in users or not check_password_hash(users.get(username), password):
    if ((username not in users) or (password not in passwd)):
        return jsonify({'login': False}), 401

    access_token = create_access_token(identity=username)
    return jsonify(access_token=access_token)

@app.route('/chat', methods=['POST'])
def get_chat():
    try:
        #session["key"] = "1D98KI"
        #session_id = session.sid
        #print('Session Id: ', str(session_id))

        cnt = getPreviousResult()
        print('Running Session Count: ', str(cnt))

        username = request.json.get('username', None)
        message = request.json.get('message', None)

        print('User: ', str(username))
        print('Content: ', str(message))

        if cnt == 1:
            retList = cCScrapper.extractCatalog()
        else:
            hashValue, cleanedData = cr.getData(str(message))
            print('Main Hash Value:', str(hashValue))

            imageUrls, wikiUrls = extractRemoveUrls(hashValue)
            print('Image URLs: ', str(imageUrls))
            print('Wiki URLs: ', str(wikiUrls))
            print('Clean Text:')
            print(str(cleanedData))
            # json.dumps escapes quotes & newlines inside the values, keeping the payload valid JSON
            retList = json.dumps({"records": [{"Id": str(cleanedData), "Image": str(imageUrls), "Wiki": str(wikiUrls)}]})

        response = {
            'message': retList
        }

        print('JSON: ', str(response))
        return jsonify(response)

    except Exception as e:
        x = str(e)

        response = {
            'message': 'Error: ' + x
        }
        return jsonify(response)

@app.route('/api/data', methods=['GET'])
@jwt_required()
def get_data():
    response = {
        'message': 'Hello from Flask!'
    }
    return jsonify(response)

if __name__ == '__main__':
    app.run(debug=True)

Let us understand some of the important sections of the above script –

Function – login():

The login function retrieves a ‘username’ and ‘password’ from the JSON request body and logs the username. If the username is not among the configured users or the password is not among the configured passwords, it returns a failure JSON response with HTTP status 401. Otherwise, it creates an access token for that username and returns it in a JSON response.
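The commented-out line in login() points at a stricter check based on a stored password hash. The sketch below shows that idea with the Python standard library only; it is a stand-in rather than the author’s implementation, and hash_password, verify_password, is_valid_login & the demo user store are all hypothetical.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # assumed work factor for this sketch


def hash_password(password, salt=None):
    """Return (salt, digest) for a password using PBKDF2-HMAC-SHA256."""
    salt = salt or os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return salt, digest


def verify_password(password, salt, expected_digest):
    """Recompute the digest and compare it in constant time."""
    _, digest = hash_password(password, salt)
    return hmac.compare_digest(digest, expected_digest)


# Hypothetical user store: username -> (salt, digest)
user_store = {"demo_user": hash_password("s3cret")}


def is_valid_login(username, password):
    """Accept the login only if the password matches the hash stored for that user."""
    record = user_store.get(username)
    if record is None:
        return False
    salt, digest = record
    return verify_password(password, salt, digest)


print(is_valid_login("demo_user", "s3cret"))
print(is_valid_login("demo_user", "wrong"))
```

Unlike two independent membership checks, this ties each password to its own username, so a valid password for one account cannot be used to log in as another.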

Function – get_chat():

The get_chat function retrieves the running session count and user input from a JSON request. On the first call (a session count of 1), it returns the catalog data. On later calls, it passes the user’s message to the RAG class, which returns a hash value & the refined response from OpenAI, and then looks up the image URLs and wiki URLs that match that hash value. If an error arises, the function captures and returns the error as a JSON message.
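A response body like the one returned by get_chat is safest to build with json.dumps, because model output often contains quotes or newlines that would break a hand-assembled JSON string. A minimal sketch, with build_records_payload & the sample values as assumptions:

```python
import json


def build_records_payload(cleaned_data, image_urls, wiki_urls):
    """Serialize one record; json.dumps escapes quotes and newlines in the values."""
    return json.dumps(
        {"records": [{"Id": str(cleaned_data), "Image": str(image_urls), "Wiki": str(wiki_urls)}]}
    )


# Hypothetical model output containing both quotes and a newline
text = 'The "Starry Night" painting\nis on display.'
payload = build_records_payload(text, "https://example.com/a.jpg", "https://example.com/wiki")

# The payload parses back to exactly the same text
parsed = json.loads(payload)
print(parsed["records"][0]["Id"] == text)
```

The front-end can then parse the message field with a standard JSON parser, whatever the model happened to write.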

Function – updateCounter():

The updateCounter function checks if a given CSV file exists and retrieves its counter value. It then increments the counter and writes it back to the CSV. If any errors occur, an error message is printed, and the function returns a value of 1.

Function – extractRemoveUrls():

The extractRemoveUrls function attempts to filter a data frame, resDf, based on a provided hash value to extract image and wiki URLs. If the data frame contains matching entries, it retrieves the corresponding URLs. Any errors encountered are printed, but the function always returns the image and wiki URLs, even if they are empty.

  • clsContentScrapper.py (This is the main class that brings the default options for the users if they agree with the initial prompt by the bot.)
#####################################################
#### Written By: SATYAKI DE                      ####
#### Written On: 27-May-2023                     ####
#### Modified On 28-May-2023                     ####
####                                             ####
#### Objective: This is the main calling         ####
#### python class that will invoke the           ####
#### LangChain of package to extract             ####
#### the transcript from the YouTube videos &    ####
#### then answer the questions based on the      ####
#### topics selected by the users.               ####
####                                             ####
#####################################################

from langchain.document_loaders import YoutubeLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.chat_models import ChatOpenAI
from langchain.chains import LLMChain

from langchain.prompts.chat import (
    ChatPromptTemplate,
    SystemMessagePromptTemplate,
    HumanMessagePromptTemplate,
)

from googleapiclient.discovery import build

import clsTemplate as ct
from clsConfigClient import clsConfigClient as cf

import os

from flask import jsonify
import requests

###############################################
###           Global Section                ###
###############################################
open_ai_Key = cf.conf['OPEN_AI_KEY']
os.environ["OPENAI_API_KEY"] = open_ai_Key
embeddings = OpenAIEmbeddings(openai_api_key=open_ai_Key)

YouTube_Key = cf.conf['YOUTUBE_KEY']
youtube = build('youtube', 'v3', developerKey=YouTube_Key)

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

###############################################
###    End of Global Section                ###
###############################################

class clsContentScrapper:
    def __init__(self):
        self.model_name = cf.conf['MODEL_NAME']
        self.temp_val = cf.conf['TEMP_VAL']
        self.max_cnt = int(cf.conf['MAX_CNT'])
        self.url = cf.conf['BASE_URL']
        self.header_token = cf.conf['HEADER_TOKEN']

    def extractCatalog(self):
        try:
            base_url = self.url
            header_token = self.header_token

            url = base_url + '/departments'

            print('Full URL: ', str(url))

            payload={}
            headers = {'Cookie': header_token}

            response = requests.request("GET", url, headers=headers, data=payload)

            x = response.text

            return x
        except Exception as e:
            discussedTopic = []
            x = str(e)
            print('Error: ', x)

            return x

Let us understand the core part that is required from this class.

Function – extractCatalog():

The extractCatalog function uses specific headers to make a GET request to a constructed URL. The URL is derived by appending ‘/departments’ to a base_url, and a header token is used in the request headers. If successful, it returns the text of the response; if there’s an exception, it prints the error and returns the error message.
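As a minimal sketch of the same request set-up, the snippet below only builds the ‘/departments’ URL & the Cookie header without sending anything. It uses the standard library instead of requests, and the function name build_catalog_request, the example URL & the token value are assumptions made for illustration only.

```python
from urllib.request import Request


def build_catalog_request(base_url: str, header_token: str) -> Request:
    """Prepare (but do not send) the GET request for the department catalog."""
    # Avoid a double slash when the configured base URL already ends with '/'
    url = base_url.rstrip('/') + '/departments'

    # The token travels in the Cookie header, as in extractCatalog()
    return Request(url, headers={'Cookie': header_token}, method='GET')


# Hypothetical values, used only to show the resulting request
req = build_catalog_request('https://example.org/api/', 'session=abc')
print(req.get_method(), req.full_url)
```

When the request is actually sent with requests, passing a timeout (for example, requests.get(url, headers=headers, timeout=10)) & checking response.status_code may help to separate network failures from API errors.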

  • clsRAGOpenAI.py (This is the main class that retrieves the RAG context & feeds it to OpenAI for a more focused response at a lower cost.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Jun-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### shortcut application created inside MAC         ####
#### environment including MacBook, IPad or IPhone.  ####
####                                                 ####
#########################################################

from haystack.document_stores.faiss import FAISSDocumentStore
from haystack.nodes import DensePassageRetriever
import openai

from clsConfigClient import clsConfigClient as cf
import clsL as log

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

import os
import re
###############################################
###           Global Section                ###
###############################################
Ind = cf.conf['DEBUG_IND']
queryModel = cf.conf['QUERY_MODEL']
passageModel = cf.conf['PASSAGE_MODEL']

#Initiating Logging Instances
clog = log.clsL()

os.environ["TOKENIZERS_PARALLELISM"] = "false"

vectorDBFileName = cf.conf['VECTORDB_FILE_NM']

indexFile = "vectorDB/" + str(vectorDBFileName) + '.faiss'
indexConfig = "vectorDB/" + str(vectorDBFileName) + ".json"

print('File: ', str(indexFile))
print('Config: ', str(indexConfig))

# Also, provide `config_path` parameter if you set it when calling the `save()` method:
new_document_store = FAISSDocumentStore.load(index_path=indexFile, config_path=indexConfig)

# Initialize Retriever
retriever = DensePassageRetriever(document_store=new_document_store,
                                  query_embedding_model=queryModel,
                                  passage_embedding_model=passageModel,
                                  use_gpu=False)


###############################################
###    End of Global Section                ###
###############################################

class clsRAGOpenAI:
    def __init__(self):
        self.basePath = cf.conf['DATA_PATH']
        self.fileName = cf.conf['FILE_NAME']
        self.Ind = cf.conf['DEBUG_IND']
        self.subdir = str(cf.conf['OUT_DIR'])
        self.base_url = cf.conf['BASE_URL']
        self.outputPath = cf.conf['OUTPUT_PATH']
        self.vectorDBPath = cf.conf['VECTORDB_PATH']
        self.openAIKey = cf.conf['OPEN_AI_KEY']
        self.temp = cf.conf['TEMP_VAL']
        self.modelName = cf.conf['MODEL_NAME']
        self.maxToken = cf.conf['MAX_TOKEN']

    def extractHash(self, text):
        try:
            # Regular expression pattern to match "Ref: {'<digits>'}" & capture the digits
            pattern = r"Ref: \{'(\d+)'\}"
            match = re.search(pattern, text)

            if match:
                return match.group(1)
            else:
                return None
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return None

    def removeSentencesWithNaN(self, text):
        try:
            # Split text into sentences using regular expression
            sentences = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s', text)
            # Filter out sentences containing 'nan'
            filteredSentences = [sentence for sentence in sentences if 'nan' not in sentence]
            # Rejoin the sentences
            return ' '.join(filteredSentences)
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return ''

    def retrieveDocumentsReader(self, question, top_k=9):
        return retriever.retrieve(question, top_k=top_k)

    def generateAnswerWithGPT3(self, retrieved_docs, question):
        try:
            openai.api_key = self.openAIKey
            temp = self.temp
            modelName = self.modelName
            maxToken = self.maxToken

            documentsText = " ".join([doc.content for doc in retrieved_docs])

            filteredDocs = self.removeSentencesWithNaN(documentsText)
            hashValue = self.extractHash(filteredDocs)

            print('RAG Docs:: ')
            print(filteredDocs)
            #prompt = f"Given the following documents: {documentsText}, answer the question accurately based on the above data with the supplied http urls: {question}"

            # Set up a chat-style prompt with your data
            messages = [
                {"role": "system", "content": "You are a helpful assistant, answer the question accurately based on the above data with the supplied http urls. Only relevant content needs to publish. Please do not provide the facts or the texts that results crossing the max_token limits."},
                {"role": "user", "content": filteredDocs}
            ]

            # Chat style invoking the latest model
            response = openai.ChatCompletion.create(
                model=modelName,
                messages=messages,
                temperature = temp,
                max_tokens=maxToken
            )
            return hashValue, response.choices[0].message['content'].strip().replace('\n','\\n')
        except Exception as e:
            x = str(e)
            print('failed to get from OpenAI: ', x)
            # Return a (hashValue, answer) pair so that callers can always unpack two values
            return None, 'Not Available!'

    def ragAnswerWithHaystackAndGPT3(self, question):
        retrievedDocs = self.retrieveDocumentsReader(question)
        return self.generateAnswerWithGPT3(retrievedDocs, question)

    def getData(self, strVal):
        try:
            print('*'*120)
            print('Index Your Data for Retrieval:')
            print('*'*120)

            print('Response from New Docs: ')
            print()

            hashValue, answer = self.ragAnswerWithHaystackAndGPT3(strVal)

            print('GPT3 Answer::')
            print(answer)
            print('Hash Value:')
            print(str(hashValue))

            print('*'*240)
            print('End Of Use RAG to Generate Answers:')
            print('*'*240)

            return hashValue, answer
        except Exception as e:
            x = str(e)
            print('Error: ', x)
            answer = x
            hashValue = 1

            return hashValue, answer

Let us understand some of the important blocks –

Function – ragAnswerWithHaystackAndGPT3():

The ragAnswerWithHaystackAndGPT3 function retrieves relevant documents for a given question using the retrieveDocumentsReader method. It then generates an answer for the query using GPT-3 with the retrieved documents via the generateAnswerWithGPT3 method. The final response is returned.

Function – generateAnswerWithGPT3():

The generateAnswerWithGPT3 function, given a list of retrieved documents and a question, communicates with OpenAI’s GPT-3 to generate an answer. It first processes the documents, filtering and extracting a hash value. Using a chat-style format, it prompts GPT-3 with the processed documents and captures its response. If an error occurs, an error message is printed, and “Not Available!” is returned.
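The pre-processing step described above can be tried in isolation. The following is a minimal sketch that reuses the two regular expressions from the class as stand-alone functions; the function names & the sample text are assumptions made for illustration only.

```python
import re


def extract_hash(text):
    """Return the digits inside "Ref: {'<digits>'}" or None when absent."""
    match = re.search(r"Ref: \{'(\d+)'\}", text)
    return match.group(1) if match else None


def remove_sentences_with_nan(text):
    """Drop every sentence that contains the substring 'nan'."""
    sentences = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s', text)
    return ' '.join(s for s in sentences if 'nan' not in s)


# Hypothetical retrieved text, used only to show the behaviour
sample = "Title: Vase. Medium: nan. Ref: {'12345'}"
cleaned = remove_sentences_with_nan(sample)

print(cleaned)
print(extract_hash(cleaned))
```

One thing to keep in mind is that the filter is a plain substring check. Hence, a sentence containing a word such as “Finance” is dropped as well, since that word contains ‘nan’.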

Function – retrieveDocumentsReader():

The retrieveDocumentsReader function takes in a question and an optional parameter, top_k (defaulted to 9). It calls the retriever.retrieve method with the given parameters. The retrieval returns at most nine documents from the RAG engine, which are then fed to OpenAI.
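To get a feel for what top_k means, here is a minimal sketch of a top-k search over dot-product scores in plain Python. It is not how Haystack’s DensePassageRetriever or FAISS is implemented; the function name, the toy vectors & the document ids are assumptions made for illustration only.

```python
import heapq


def top_k_by_similarity(query_vec, doc_vecs, top_k=9):
    """Return the ids of the top_k documents with the highest dot-product score."""
    def dot(a, b):
        return sum(x * y for x, y in zip(a, b))

    # Score every document against the query vector
    scored = [(dot(query_vec, vec), doc_id) for doc_id, vec in doc_vecs.items()]

    # Keep only the best top_k scores, best first
    return [doc_id for _, doc_id in heapq.nlargest(top_k, scored)]


# Toy two-dimensional "embeddings"
docs = {'a': [1.0, 0.0], 'b': [0.0, 1.0], 'c': [0.5, 0.5]}
print(top_k_by_similarity([1.0, 0.0], docs, top_k=2))
```

A higher top_k gives the LLM more context to work with, but it also increases the prompt size & therefore the cost of every call.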

  • App.js (This is the main React script that will create the interface & parse the data, apart from handling the authentication.)
// App.js
import React, { useState } from 'react';
import axios from 'axios';
import './App.css';

const App = () => {
  const [isLoggedIn, setIsLoggedIn] = useState(false);
  const [username, setUsername] = useState('');
  const [password, setPassword] = useState('');
  const [message, setMessage] = useState('');
  const [chatLog, setChatLog] = useState([{ sender: 'MuBot', message: 'Welcome to MuBot! Please explore the world of History from our brilliant collections! Do you want to proceed to see the catalog?'}]);

  const handleLogin = async (e) => {
    e.preventDefault();
    try {
      const response = await axios.post('http://localhost:5000/login', { username, password });
      if (response.status === 200) {
        setIsLoggedIn(true);
      }
    } catch (error) {
      console.error('Login error:', error);
    }
  };

  const sendMessage = async (username) => {
    if (message.trim() === '') return;

    // Create a new chat entry
    const newChatEntry = {
      sender: 'user',
      message: message.trim(),
    };

    // Clear the input field
    setMessage('');

    try {
      // Make API request to Python-based API
      const response = await axios.post('http://localhost:5000/chat', { message: newChatEntry.message }); // Replace with your API endpoint URL
      const responseData = response.data;

      // Print the response to the console for debugging
      console.log('API Response:', responseData);

      // Parse the nested JSON from the 'message' attribute
      const jsonData = JSON.parse(responseData.message);

      // Check if the data contains 'departments'
      if (jsonData.departments) {

        // Extract the 'departments' attribute from the parsed data
        const departments = jsonData.departments;

        // Extract the department names and create a single string with line breaks
        const botResponseText = departments.reduce((acc, department) => {return acc + department.departmentId + ' ' + department.displayName + '\n';}, '');

        // Update the chat log with the bot's response
        setChatLog((prevChatLog) => [...prevChatLog, { sender: 'user', message: message }, { sender: 'bot', message: botResponseText },]);
      }
      else if (jsonData.records)
      {
        // Data structure 2: Artwork information
        const records = jsonData.records;

        // Prepare chat entries
        const chatEntries = [];

        // Iterate through records and extract text, image, and wiki information
        records.forEach((record) => {
          const textInfo = Object.entries(record).map(([key, value]) => {
            if (key !== 'Image' && key !== 'Wiki') {
              return `${key}: ${value}`;
            }
            return null;
          }).filter((info) => info !== null).join('\n');

          const imageLink = record.Image;
          //const wikiLinks = JSON.parse(record.Wiki.replace(/'/g, '"'));
          //const wikiLinks = record.Wiki;
          const wikiLinks = record.Wiki.split(',').map(link => link.trim());

          console.log('Wiki:', wikiLinks);

          // Check if there is a valid image link
          const hasValidImage = imageLink && imageLink !== '[]';

          const imageElement = hasValidImage ? (
            <img src={imageLink} alt="Artwork" style={{ maxWidth: '100%' }} />
          ) : null;

          // Create JSX elements for rendering the wiki links (if available)
          const wikiElements = wikiLinks.map((link, index) => (
            <div key={index}>
              <a href={link} target="_blank" rel="noopener noreferrer">
                Wiki Link {index + 1}
              </a>
            </div>
          ));

          if (textInfo) {
            chatEntries.push({ sender: 'bot', message: textInfo });
          }

          if (imageElement) {
            chatEntries.push({ sender: 'bot', message: imageElement });
          }

          if (wikiElements.length > 0) {
            chatEntries.push({ sender: 'bot', message: wikiElements });
          }
        });

        // Update the chat log with the bot's response
        setChatLog((prevChatLog) => [...prevChatLog, { sender: 'user', message }, ...chatEntries, ]);
      }

    } catch (error) {
      console.error('Error sending message:', error);
    }
  };

  if (!isLoggedIn) {
    return (
      <div className="login-container">
        <h2>Welcome to the MuBot</h2>
        <form onSubmit={handleLogin} className="login-form">
          <input
            type="text"
            placeholder="Enter your name"
            value={username}
            onChange={(e) => setUsername(e.target.value)}
            required
          />
          <input
            type="password"
            placeholder="Enter your password"
            value={password}
            onChange={(e) => setPassword(e.target.value)}
            required
          />
          <button type="submit">Login</button>
        </form>
      </div>
    );
  }

  return (
    <div className="chat-container">
      <div className="chat-header">
        <h2>Hello, {username}</h2>
        <h3>Chat with MuBot</h3>
      </div>
      <div className="chat-log">
        {chatLog.map((chatEntry, index) => (
          <div
            key={index}
            className={`chat-entry ${chatEntry.sender === 'user' ? 'user' : 'bot'}`}
          >
            <span className="user-name">{chatEntry.sender === 'user' ? username : 'MuBot'}</span>
            <p className="chat-message">{chatEntry.message}</p>
          </div>
        ))}
      </div>
      <div className="chat-input">
        <input
          type="text"
          placeholder="Type your message..."
          value={message}
          onChange={(e) => setMessage(e.target.value)}
          onKeyPress={(e) => {
            if (e.key === 'Enter') {
              sendMessage();
            }
          }}
        />
        <button onClick={sendMessage}>Send</button>
      </div>
    </div>
  );
};

export default App;

Please find some of the important logic –

Function – handleLogin():

The handleLogin asynchronous function responds to an event by preventing its default action. It attempts to post a login request with a username and password to a local server endpoint. If the response is successful with a status of 200, it updates a state variable to indicate a successful login; otherwise, it logs any encountered errors.

Function – sendMessage():

The sendMessage asynchronous function is designed to handle the user’s chat interaction:

  1. If the message is empty (after trimming spaces), the function exits without further action.
  2. A chat entry object is created with the sender set as ‘user’ and the trimmed message.
  3. The input field’s message is cleared, and an API request is made to a local server endpoint with the chat message.
  4. If the API responds with a ‘departments’ attribute in its JSON, a bot response is crafted by iterating over department details.
  5. If the API responds with ‘records’ indicating artwork information, the bot crafts responses for each record, extracting text, images, and wiki links, and generating JSX elements for rendering them.
  6. After processing the API response, the chat log state is updated with the user’s original message and the bot’s responses.
  7. Errors, if encountered, are logged to the console.

This function enables interactive chat with bot responses that vary based on the nature of the data received from the API.
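The branching on the shape of the API response can also be sketched in Python, to stay consistent with the rest of the code in this post. This is only an illustrative mirror of the React logic; the function name build_bot_entries & the ‘image’/‘links’ keys are assumptions, since the real component renders JSX elements instead.

```python
import json


def build_bot_entries(response_data):
    """Turn the API response into a list of bot chat entries."""
    # The payload arrives as a JSON string nested inside the 'message' attribute
    payload = json.loads(response_data["message"])

    # Data structure 1: department catalog
    if payload.get("departments"):
        text = "".join(
            f"{d['departmentId']} {d['displayName']}\n" for d in payload["departments"]
        )
        return [{"sender": "bot", "message": text}]

    # Data structure 2: artwork records
    entries = []
    for record in payload.get("records", []):
        text = "\n".join(
            f"{k}: {v}" for k, v in record.items() if k not in ("Image", "Wiki")
        )
        if text:
            entries.append({"sender": "bot", "message": text})

        image = record.get("Image")
        if image and image != "[]":
            entries.append({"sender": "bot", "image": image})

        links = [l.strip() for l in record.get("Wiki", "").split(",") if l.strip()]
        if links:
            entries.append({"sender": "bot", "links": links})

    return entries
```

Keeping this parsing in one small function makes it easier to test the two response shapes without starting the user interface.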


Let us explore the directory structure. Starting from the parent, some of the important child folders should look like this –


So, finally, we’ve done it.

I know that this post is relatively longer than my earlier posts. But, I think, you will get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Detecting real-time human emotions using Open-CV, DeepFace & Python

Hi Guys,

Today, I’ll be presenting another exciting installment on Computer Vision. Our focus will be on getting a sense of human emotions. Let me explain. This post will demonstrate how to read/detect human emotions by analyzing video frames with computer vision. We will be using part of a Bengali movie called “Ganashatru (An Enemy of the People)” entirely for educational purposes & also as a tribute to the legendary director, the late Satyajit Ray. To know more about him, please click the following link.

Why don’t we see the demo first before jumping into the technical details?

Demo

Architecture:

Let us understand the architecture –

Process Flow

From the above diagram, one can see that the application, which uses both Open-CV & DeepFace, analyzes individual frames from the source. It then predicts the emotions & adds the label to the target B&W frames. Finally, it creates another video by mixing those frames with the source audio.

Python Packages:

Following are the python packages that are necessary to develop this brilliant use case –

pip install deepface
pip install opencv-python
pip install ffpyplayer

CODE:

Let us now understand the code. For this use case, we will only discuss the key Python scripts. The application needs more scripts than these, but we have already discussed the rest in some of the earlier posts. Hence, we will skip them here.

  • clsConfig.py (This script is the config file that contains all the keys & paths used by the application.)


################################################
#### Written By: SATYAKI DE                 ####
#### Written On: 15-May-2020                ####
#### Modified On: 22-Apr-2022               ####
####                                        ####
#### Objective: This script is a config     ####
#### file, contains all the keys for        ####
#### Machine-Learning & streaming dashboard.####
####                                        ####
################################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'FILE_NAME': 'GonoshotruClimax',
        'SRC_PATH': Curr_Path + sep + 'data' + sep,
        'FINAL_PATH': Curr_Path + sep + 'Target' + sep,
        'APP_DESC_1': 'Video Emotion Capture!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'SUBDIR': 'data',
        'SEP': sep,
        'VIDEO_FILE_EXTN': '.mp4',
        'AUDIO_FILE_EXTN': '.mp3',
        'IMAGE_FILE_EXTN': '.jpg',
        'TITLE': "Gonoshotru – Emotional Analysis"
    }

All the above inputs are generic & used as normal parameters.

  • clsFaceEmotionDetect.py (This python class will track the human emotions after splitting the audio from the video & put that label on top of the video frame.)


##################################################
#### Written By: SATYAKI DE                   ####
#### Written On: 17-Apr-2022                  ####
#### Modified On 20-Apr-2022                  ####
####                                          ####
#### Objective: This python class will        ####
#### track the human emotions after splitting ####
#### the audio from the video & put that      ####
#### label on top of the video frame.         ####
####                                          ####
##################################################

from imutils.video import FileVideoStream
from imutils.video import FPS
import numpy as np
import imutils
import time
import cv2

from clsConfig import clsConfig as cf
from deepface import DeepFace
import clsL as cl

import subprocess
import sys
import os

# Initiating Log class
l = cl.clsL()

class clsFaceEmotionDetect:
    def __init__(self):
        self.sep = str(cf.conf['SEP'])
        self.Curr_Path = str(cf.conf['INIT_PATH'])
        self.FileName = str(cf.conf['FILE_NAME'])
        self.VideoFileExtn = str(cf.conf['VIDEO_FILE_EXTN'])
        self.ImageFileExtn = str(cf.conf['IMAGE_FILE_EXTN'])

    def convert_video_to_audio_ffmpeg(self, video_file, output_ext="mp3"):
        try:
            """Converts video to audio directly using `ffmpeg` command
            with the help of subprocess module"""
            filename, ext = os.path.splitext(video_file)
            subprocess.call(["ffmpeg", "-y", "-i", video_file, f"{filename}.{output_ext}"],
                            stdout=subprocess.DEVNULL,
                            stderr=subprocess.STDOUT)

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

    def readEmotion(self, debugInd, var):
        try:
            sep = self.sep
            Curr_Path = self.Curr_Path
            FileName = self.FileName
            VideoFileExtn = self.VideoFileExtn
            ImageFileExtn = self.ImageFileExtn
            font = cv2.FONT_HERSHEY_SIMPLEX

            # Load Video
            videoFile = Curr_Path + sep + 'Video' + sep + FileName + VideoFileExtn
            temp_path = Curr_Path + sep + 'Temp' + sep

            # Extracting the audio from the source video
            x = self.convert_video_to_audio_ffmpeg(videoFile)

            if x == 0:
                print('Successfully Audio extracted from the source file!')
            else:
                print('Failed to extract the source audio!')

            # Loading the haarcascade xml class
            faceCascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

            # start the file video stream thread and allow the buffer to
            # start to fill
            print("[INFO] Starting video file thread...")
            fvs = FileVideoStream(videoFile).start()
            time.sleep(1.0)

            cnt = 0

            # start the FPS timer
            fps = FPS().start()

            try:
                # loop over frames from the video file stream
                while fvs.more():
                    cnt += 1

                    # grab the frame from the threaded video file stream, resize
                    # it, and convert it to grayscale (while still retaining 3
                    # channels)
                    try:
                        frame = fvs.read()
                    except Exception as e:
                        x = str(e)
                        print('Error: ', x)

                    frame = imutils.resize(frame, width=720)
                    cv2.imshow("Gonoshotru - Source", frame)

                    # Enforce Detection to False will continue the sequence even when there is no face
                    result = DeepFace.analyze(frame, enforce_detection=False, actions = ['emotion'])

                    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                    frame = np.dstack([frame, frame, frame])

                    faces = faceCascade.detectMultiScale(image=frame, scaleFactor=1.1, minNeighbors=4, minSize=(80,80), flags=cv2.CASCADE_SCALE_IMAGE)

                    # Draw a rectangle around the face
                    for (x, y, w, h) in faces:
                        cv2.rectangle(frame, (x, y), (x + w, y + h), (0,255,0), 2)

                    # Use puttext method for inserting live emotion on video
                    cv2.putText(frame, result['dominant_emotion'], (50,390), font, 3, (0,0,255), 2, cv2.LINE_4)

                    # display the size of the queue on the frame
                    #cv2.putText(frame, "Queue Size: {}".format(fvs.Q.qsize()), (10, 30), font, 0.6, (0, 255, 0), 2)

                    cv2.imwrite(temp_path+'frame-' + str(cnt) + ImageFileExtn, frame)

                    # show the frame and update the FPS counter
                    cv2.imshow("Gonoshotru - Emotional Analysis", frame)
                    fps.update()

                    if cv2.waitKey(2) & 0xFF == ord('q'):
                        break
            except Exception as e:
                x = str(e)
                print('Error: ', x)
                print('No more frame exists!')

            # stop the timer and display FPS information
            fps.stop()
            print("[INFO] Elasped Time: {:.2f}".format(fps.elapsed()))
            print("[INFO] Approx. FPS: {:.2f}".format(fps.fps()))

            # do a bit of cleanup
            cv2.destroyAllWindows()
            fvs.stop()

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

Key snippets from the above scripts –

def convert_video_to_audio_ffmpeg(self, video_file, output_ext="mp3"):
    try:
        """Converts video to audio directly using `ffmpeg` command
        with the help of subprocess module"""
        filename, ext = os.path.splitext(video_file)
        subprocess.call(["ffmpeg", "-y", "-i", video_file, f"{filename}.{output_ext}"],
                        stdout=subprocess.DEVNULL,
                        stderr=subprocess.STDOUT)

        return 0
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

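The above snippet represents an audio extraction function that extracts the audio from the source video with ffmpeg & stores it next to the source file, using the same base name with the .mp3 extension. Note that subprocess.call() does not raise an error when ffmpeg exits with a failure, so the function returns 0 as long as the ffmpeg command could be launched.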
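As a minimal sketch of a slightly stricter variant, the snippet below separates building the ffmpeg command from running it & reports a failure whenever ffmpeg exits with a non-zero return code. The function names are assumptions made for illustration only, and ffmpeg is assumed to be available on the PATH.

```python
import os
import subprocess


def build_ffmpeg_audio_command(video_file, output_ext="mp3"):
    """Build the ffmpeg command that writes the audio next to the video file."""
    filename, _ = os.path.splitext(video_file)
    return ["ffmpeg", "-y", "-i", video_file, f"{filename}.{output_ext}"]


def extract_audio(video_file, output_ext="mp3"):
    """Return 0 when ffmpeg succeeds, 1 when it exits with an error."""
    cmd = build_ffmpeg_audio_command(video_file, output_ext)
    completed = subprocess.run(cmd,
                               stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT)

    return 0 if completed.returncode == 0 else 1


print(build_ffmpeg_audio_command("Video/GonoshotruClimax.mp4"))
```

With this variant, a missing input file or an unsupported codec shows up as a failed status instead of passing silently.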

# Loading the haarcascade xml class
faceCascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

Next, the application loads the Haar cascade frontal-face classifier that ships with Open-CV. It is a widely used pre-trained model for face detection, which our application requires.

fvs = FileVideoStream(videoFile).start()

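FileVideoStream reads & decodes the frames in a separate thread & keeps them in a queue. As a result, the main loop usually waits less for the next frame than it would with a plain cv2.VideoCapture() read loop.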
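The idea of reading ahead in a background thread can be sketched with the standard library alone. The class below is not the imutils implementation; the name BufferedFrameReader, the queue size & the use of plain numbers in place of frames are assumptions made for illustration only.

```python
import queue
import threading


class BufferedFrameReader:
    """Fill a bounded queue from a background thread while the caller consumes it."""

    _DONE = object()  # sentinel that marks the end of the stream

    def __init__(self, frames, queue_size=128):
        self._frames = frames
        self._queue = queue.Queue(maxsize=queue_size)
        self._thread = threading.Thread(target=self._fill, daemon=True)

    def start(self):
        self._thread.start()
        return self

    def _fill(self):
        # put() blocks while the queue is full, so memory use stays bounded
        for frame in self._frames:
            self._queue.put(frame)
        self._queue.put(self._DONE)

    def __iter__(self):
        while True:
            frame = self._queue.get()
            if frame is self._DONE:
                return
            yield frame


# Plain numbers stand in for decoded frames here
for frame in BufferedFrameReader(range(5), queue_size=2).start():
    print(frame)
```

The producer thread keeps decoding while the consumer is busy with the emotion analysis, which is where the time saving comes from.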

# start the FPS timer
fps = FPS().start()

The application then invokes FPS().start(), which initiates the FPS timer.

# loop over frames from the video file stream
while fvs.more():

The application checks fvs.more() to find out whether any frames remain in the video file. Until it reaches the EOF, it will keep reading individual frames.

try:
    frame = fvs.read()
except Exception as e:
    x = str(e)
    print('Error: ', x)

The application reads the individual frames one at a time. If a read fails, it prints the error instead of terminating the main program. This exception strategy is beneficial near the end of the video, where there is no longer any frame to read & the read of the end frame would otherwise make the entire application throw an error.
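A complementary guard is to stop the loop as soon as a read does not yield a usable frame. The sketch below assumes that a failed read surfaces as None, which is an assumption & may depend on the imutils version. The helper name iter_frames is hypothetical, and it works with any object that offers more() & read().

```python
def iter_frames(stream):
    """Yield frames until the stream is exhausted or hands back an empty frame."""
    while stream.more():
        frame = stream.read()

        # Assumption: an unreadable frame is reported as None
        if frame is None:
            break

        yield frame
```

With such a helper, the main loop could be written as “for frame in iter_frames(fvs):”, so that the resize & analysis steps never receive an empty frame.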

frame = imutils.resize(frame, width=720)
cv2.imshow("Gonoshotru - Source", frame)

At this point, the application resizes the frame to a width of 720 pixels for better performance. Furthermore, it displays this video feed in a window identified as the source.

# Enforce Detection to False will continue the sequence even when there is no face
result = DeepFace.analyze(frame, enforce_detection=False, actions = ['emotion'])

Next, the application uses the deepface machine-learning API to analyze the subject’s face & predict its emotions.

frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
frame = np.dstack([frame, frame, frame])

faces = faceCascade.detectMultiScale(image=frame, scaleFactor=1.1, minNeighbors=4, minSize=(80,80), flags=cv2.CASCADE_SCALE_IMAGE)

The detectMultiScale function detects the faces. It returns one rectangle with coordinates (x, y, w, h) for every detected face.

It takes three common arguments: the input image, scaleFactor, and minNeighbors.

scaleFactor specifies how much the image size is reduced at each scale. In a group photo, faces near the camera appear larger than the ones behind. Scanning the image at several scales compensates for that.

minNeighbors specifies how many neighboring detections each candidate rectangle needs in order to be retained as a face. One may have to tweak these values to get the best results.
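To see the effect of scaleFactor, the sketch below lists the face sizes that would be probed between a minimum size & the frame size when every step grows by the given factor. This is a simplified model & not Open-CV’s exact internal procedure; the function name detection_scales is hypothetical.

```python
def detection_scales(min_size, max_size, scale_factor=1.1):
    """List the window sizes (in pixels) visited from min_size up to max_size."""
    sizes = []
    size = float(min_size)

    while size <= max_size:
        sizes.append(int(size))
        size *= scale_factor

    return sizes


# From the 80-pixel minimum used in the post up to a 100-pixel face
print(detection_scales(80, 100, 1.1))

# A smaller factor probes more sizes, which is slower but less likely to miss a face
print(len(detection_scales(80, 720, 1.1)), len(detection_scales(80, 720, 1.05)))
```

In this model, a factor closer to 1 means more scales to scan, which is why a lower scaleFactor tends to trade speed for detection quality.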

# Draw a rectangle around the face
for (x, y, w, h) in faces:
    cv2.rectangle(frame, (x, y), (x + w, y + h), (0,255,0), 2)

As discussed above, the application now draws a rectangle around each detected face using the values of x, y, w, & h.

# Use puttext method for inserting live emotion on video
cv2.putText(frame, result['dominant_emotion'], (50,390), font, 3, (0,0,255), 2, cv2.LINE_4)

Finally, the application captures the dominant emotion from the deepface API & puts it on top of the target video frame.
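Depending on the installed deepface version, DeepFace.analyze may return either a single dictionary or a list with one dictionary per detected face. The helper below is a minimal sketch that accepts both shapes; the function name & the default label are assumptions made for illustration only.

```python
def dominant_emotion(result, default="unknown"):
    """Read 'dominant_emotion' from either a dict or a list of dicts."""
    if isinstance(result, list):
        if not result:
            return default

        # Use the first detected face, as the post labels one emotion per frame
        result = result[0]

    return result.get("dominant_emotion", default)


print(dominant_emotion({"dominant_emotion": "happy"}))
print(dominant_emotion([{"dominant_emotion": "sad"}]))
```

The returned label can then be passed to cv2.putText in place of result['dominant_emotion'].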

# write the labelled frame into the temporary folder
cv2.imwrite(temp_path+'frame-' + str(cnt) + ImageFileExtn, frame)

# show the frame and update the FPS counter
cv2.imshow("Gonoshotru - Emotional Analysis", frame)
fps.update()

The application also writes the individual frames into a temporary folder, where they will later be consumed & mixed with the source audio.
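Since the frames are named frame-1, frame-2, … frame-10 and so on, whichever step stitches them back together has to sort them by number & not alphabetically. The clsFrame2Video class is not shown in this post, so the helper below is only a hypothetical sketch of that ordering step.

```python
import re


def frame_number(path):
    """Extract the numeric counter from a 'frame-<n>' file name."""
    match = re.search(r'frame-(\d+)', path)
    return int(match.group(1)) if match else -1


def sort_frames(paths):
    """Order the frame files by their counter instead of alphabetically."""
    return sorted(paths, key=frame_number)


files = ['Temp/frame-10.jpg', 'Temp/frame-2.jpg', 'Temp/frame-1.jpg']
print(sort_frames(files))
```

A plain sorted(files) would place frame-10 before frame-2, which would shuffle the frames in the final video.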

if cv2.waitKey(2) & 0xFF == ord('q'):
    break

At any given point, if the user wants to quit, the above snippet will allow them to do so by simply pressing the ‘q’-button on the keyboard.

  • clsVideoPlay.py (This script will play the video along with audio in sync.)


###############################################
#### Updated By: SATYAKI DE                ####
#### Updated On: 17-Apr-2022               ####
####                                       ####
#### Objective: This script will play the  ####
#### video along with audio in sync.       ####
####                                       ####
###############################################

import os
import platform as pl
import cv2
import numpy as np
import glob
import re
import ffmpeg
import time
from clsConfig import clsConfig as cf
from ffpyplayer.player import MediaPlayer

import logging

os_det = pl.system()
if os_det == "Windows":
    sep = '\\'
else:
    sep = '/'

class clsVideoPlay:
    def __init__(self):
        self.fileNmFin = str(cf.conf['FILE_NAME'])
        self.final_path = str(cf.conf['FINAL_PATH'])
        self.title = str(cf.conf['TITLE'])
        self.VideoFileExtn = str(cf.conf['VIDEO_FILE_EXTN'])

    def videoP(self, file):
        try:
            cap = cv2.VideoCapture(file)
            player = MediaPlayer(file)
            start_time = time.time()

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                _, val = player.get_frame(show=False)
                if val == 'eof':
                    break

                cv2.imshow(file, frame)

                # Wait only as long as the video is ahead of the wall clock
                elapsed = (time.time() - start_time) * 1000  # msec
                play_time = int(cap.get(cv2.CAP_PROP_POS_MSEC))
                sleep = max(1, int(play_time - elapsed))

                if cv2.waitKey(sleep) & 0xFF == ord("q"):
                    break

            player.close_player()
            cap.release()
            cv2.destroyAllWindows()

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

    def stream(self, dInd, var):
        try:
            VideoFileExtn = self.VideoFileExtn
            fileNmFin = self.fileNmFin + VideoFileExtn
            final_path = self.final_path
            title = self.title

            FullFileName = final_path + fileNmFin

            ret = self.videoP(FullFileName)

            if ret == 0:
                print('Successfully Played the Video!')

                return 0
            else:
                return 1
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

Let us explore the key snippet –

cap = cv2.VideoCapture(file)
player = MediaPlayer(file)

In the above snippet, the application first reads the video & at the same time, it will create an instance of the MediaPlayer.

play_time = int(cap.get(cv2.CAP_PROP_POS_MSEC))

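The application uses cv2.CAP_PROP_POS_MSEC, the current position of the video in milliseconds, & compares it with the elapsed wall-clock time to decide how long to wait before showing the next frame. This keeps the video in step with the audio.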
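The wait time in the playback loop is the gap between the video position & the elapsed wall-clock time, with a floor of one millisecond. Here is that arithmetic as a small stand-alone function; the name wait_ms & the sample numbers are assumptions made for illustration only.

```python
def wait_ms(play_time_ms, start_time_s, now_s):
    """Milliseconds to wait so that the video does not run ahead of the clock."""
    elapsed_ms = (now_s - start_time_s) * 1000

    # Never pass 0 or a negative value to cv2.waitKey(), hence the floor of 1
    return max(1, int(play_time_ms - elapsed_ms))


# Video is at 1000 ms, but only 500 ms have passed: wait for the difference
print(wait_ms(1000, 10.0, 10.5))

# Video is behind the clock: wait the minimum & catch up
print(wait_ms(1000, 10.0, 12.0))
```

When the video falls behind, the loop shows the frames as fast as it can until the position catches up with the clock again.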

  • peopleEmotionRead.py (This is the main calling python script that will invoke the class to initiate the model to read the real-time human emotions from video.)


##################################################
#### Written By: SATYAKI DE                   ####
#### Written On: 17-Jan-2022                  ####
#### Modified On 20-Apr-2022                  ####
####                                          ####
#### Objective: This is the main calling      ####
#### python script that will invoke the       ####
#### clsFaceEmotionDetect class to initiate   ####
#### the model to read the real-time          ####
#### human emotions from video or even from   ####
#### Web-CAM & predict it continuously.       ####
##################################################

# We keep the setup code in a different class as shown below.
import clsFaceEmotionDetect as fed
import clsFrame2Video as fv
import clsVideoPlay as vp

from clsConfig import clsConfig as cf

import datetime
import logging

###############################################
###           Global Section                ###
###############################################
# Instantiating all the three classes

x1 = fed.clsFaceEmotionDetect()
x2 = fv.clsFrame2Video()
x3 = vp.clsVideoPlay()

###############################################
###    End of Global Section                ###
###############################################

def main():
    try:
        # Other useful variables
        debugInd = 'Y'
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        var1 = datetime.datetime.now()

        print('Start Time: ', str(var))
        # End of useful variables

        # Initiating Log Class
        general_log_path = str(cf.conf['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'restoreVideo.log', level=logging.INFO)

        print('Started Capturing Real-Time Human Emotions!')

        # Execute all the pass
        r1 = x1.readEmotion(debugInd, var)
        r2 = x2.convert2Vid(debugInd, var)
        r3 = x3.stream(debugInd, var)

        if ((r1 == 0) and (r2 == 0) and (r3 == 0)):
            print('Successfully identified human emotions!')
        else:
            print('Failed to identify the human emotions!')

        var2 = datetime.datetime.now()

        c = var2 - var1
        minutes = c.total_seconds() / 60
        print('Total difference in minutes: ', str(minutes))

        print('End Time: ', str(var2))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()

The key snippets from the above script are as follows –

# Instantiating all the three classes

x1 = fed.clsFaceEmotionDetect()
x2 = fv.clsFrame2Video()
x3 = vp.clsVideoPlay()

As one can see from the above snippet, all the major classes are instantiated & loaded into the memory.

# Execute all the pass
r1 = x1.readEmotion(debugInd, var)
r2 = x2.convert2Vid(debugInd, var)
r3 = x3.stream(debugInd, var)

All the responses are captured into the corresponding variables, which are later checked for the success status.
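The status check that follows those calls can be sketched as below. This is only an illustration: the three step functions are hypothetical stand-ins for readEmotion, convert2Vid & stream, and it assumes each one returns 0 on success, as the comparison in the script implies.

```python
import datetime

def run_steps(steps):
    """Run each step in order & collect its return code."""
    return [step() for step in steps]

def all_succeeded(codes):
    """True only when every step reported 0."""
    return all(code == 0 for code in codes)

# Hypothetical stand-ins for the three class methods
def read_emotion():
    return 0

def convert_to_video():
    return 0

def stream_video():
    return 0

start = datetime.datetime.now()
codes = run_steps([read_emotion, convert_to_video, stream_video])
elapsed_minutes = (datetime.datetime.now() - start).total_seconds() / 60

if all_succeeded(codes):
    print('Successfully identified human emotions!')
else:
    print('Failed to identify the human emotions!')

print('Total difference in minutes: ', elapsed_minutes)
```

Collecting the codes in a list keeps the check short even if more steps are added later.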


Let us capture & compare the emotions in a screenshot for better understanding –

Emotion Analysis

So, one can see that most of the frames from the video, including the frame posted above, correctly identify the human emotions.


FOLDER STRUCTURE:

Here is the folder structure that contains all the files & directories in macOS –

Directory

So, we’ve done it.

You will get the complete codebase in the following GitHub link.

If you want to know more about this legendary director & his famous work, please visit the following link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 😀

Note: All the data & scenarios posted here are representational, available over the internet & for educational purposes only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim the ownership of these images. There is always room for improvement, especially in the prediction quality.

Prepare analytics based on streaming data from Twitter using Python

Hi Guys!

Today, we will be projecting an analytics storyline based on streaming data from Twitter’s developer account.

I want to make it clear that this is solely for educational purposes & no data analysis has been provided to any agency or third-party app. So, when you plan to use this API, make sure that you strictly follow these rules.

In order to create a streaming channel from Twitter, you need to create one developer account.

As I’m a huge soccer fan, I would like to refer to one soccer-related account on Twitter for this. In this case, we’ll be checking BA Analytics.

6. Origin_Site

Please find the steps to create one developer account –

Step -1: 

You have to go to the following link. Over there you need to submit the request in order to create the account. You need to provide proper justification as to why you need that account. I’m not going into those forms. They are self-explanatory.

Once your developer account is activated, you need to click the following link as shown below –

1. TwitterSetup

Once you click that, it will lead you to the following page –

2. TwitterSetup - Continue

If you don’t have any app, the first page will look something like the above page.

Step 2:

3. TwiterSetup - Continue

Now, you need to fill in the following details. For security reasons, I’ll be hiding sensitive data here.

Step 3:

4. TwitterSetUp - Continue

After creating that, you need to go to the next tab, i.e., Keys & tokens. The initial screen will only have Consumer API keys.

Step 4:

To generate the Access token, you need to click the create button from the above screenshot & then the new page will look like this –

5. TwitterSetUp - Continue

Our program will be using all these pieces of information.

So, now we’re ready for our Python program.

In order to access Twitter API through python, you need to install the following package –

pip install python-twitter

Let’s see the directory structure –

7. Directory

Let’s check only the relevant scripts here. We’re not going to discuss clsL.py, as we’ve already discussed it. Please refer to the old post.

1. clsConfig.py (This script contains all the parameters of the server.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 12-Oct-2019              ####
####                                      ####
#### Objective: This script is a config   ####
#### file, contains all the keys for      ####
#### the Twitter API. Application will    ####
#### use this information to fetch &      ####
#### process the data from Twitter.       ####
##############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'APP_ID': 1,
        'EMAIL_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcEmail.json',
        'TWITTER_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcTwitter.json',
        'HR_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcHR.json',
        'ACCESS_TOKEN': '99999999-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX',
        'ACCESS_SECRET': 'YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY',
        'CONSUMER_KEY': "aaaaaaaaaaaaaaaaaaaaaaa",
        'CONSUMER_SECRET': 'HHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH',
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'APP_DESC_1': 'Feedback Communication',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path
    }

For security reasons, I’ve replaced the original keys with dummy keys. You have to fill in your own keys.
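If you would rather not keep real keys inside the script at all, one option is to read them from environment variables. The following is a minimal sketch of that idea & not part of the original clsConfig; the TWITTER_* variable names & both helper functions are assumptions made for illustration. It also shows os.path.join as an alternative to choosing the separator by hand.

```python
import os

KEY_NAMES = ['ACCESS_TOKEN', 'ACCESS_SECRET', 'CONSUMER_KEY', 'CONSUMER_SECRET']

def load_twitter_keys(env=None):
    """Read the four credentials from TWITTER_* variables (assumed names)."""
    env = os.environ if env is None else env
    missing = ['TWITTER_' + k for k in KEY_NAMES if not env.get('TWITTER_' + k)]
    if missing:
        raise KeyError('Missing environment variables: ' + ', '.join(missing))
    return {k: env['TWITTER_' + k] for k in KEY_NAMES}

def build_paths(base_dir):
    """Build directory entries with a trailing separator on any OS."""
    return {
        'ARCH_DIR': os.path.join(base_dir, 'arch', ''),
        'PROFILE_PATH': os.path.join(base_dir, 'profile', ''),
        'LOG_PATH': os.path.join(base_dir, 'log', ''),
    }

# Dummy values, only to show the shape of the result
dummy_env = {
    'TWITTER_ACCESS_TOKEN': 'token',
    'TWITTER_ACCESS_SECRET': 'secret',
    'TWITTER_CONSUMER_KEY': 'key',
    'TWITTER_CONSUMER_SECRET': 'consumer-secret',
}
keys = load_twitter_keys(dummy_env)
paths = build_paths('app')
print(sorted(keys))
print(paths['LOG_PATH'])
```

The resulting dictionaries could then be merged into the config dictionary, so the rest of the application would not need to change.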

2. clsTwitter.py (This script will fetch data from Twitter API & process the same & send it to the calling method.)

###############################################
#### Written By: SATYAKI DE                ####
#### Written On: 12-Oct-2019               ####
#### Modified On 12-Oct-2019               ####
####                                       ####
#### Objective: Main class fetching sample ####
#### data from Twitter API.                ####
###############################################

import twitter
from clsConfig import clsConfig as cf
import json
import re
import string
import logging

class clsTwitter:
    def __init__(self):
        self.access_token = cf.config['ACCESS_TOKEN']
        self.access_secret = cf.config['ACCESS_SECRET']
        self.consumer_key = cf.config['CONSUMER_KEY']
        self.consumer_secret = cf.config['CONSUMER_SECRET']

    def find_element(self, srcJson, key):
        """Pull all values of specified key from nested JSON."""
        arr = []

        def fetch(srcJson, arr, key):
            """Recursively search for values of key in JSON tree."""
            if isinstance(srcJson, dict):
                for k, v in srcJson.items():
                    if isinstance(v, (dict, list)):
                        fetch(v, arr, key)
                    elif k == key:
                        arr.append(v)
            elif isinstance(srcJson, list):
                for item in srcJson:
                    fetch(item, arr, key)
            return arr

        finJson = fetch(srcJson, arr, key)
        return finJson

    def searchQry(self, rawQry):
        try:
            fin_dict = {}
            finJson = ''
            res = ''
            cnt = 0

            # Parameters to invoke Twitter API
            ACCESS_TOKEN = self.access_token
            ACCESS_SECRET = self.access_secret
            CONSUMER_KEY = self.consumer_key
            CONSUMER_SECRET = self.consumer_secret

            tmpR20 = 'Raw Query: ' + str(rawQry)
            logging.info(tmpR20)

            finJson = '['

            if rawQry == '':
                print('No data to proceed!')
                logging.info('No data to proceed!')
            else:
                t = twitter.Api(
                                  consumer_key = CONSUMER_KEY,
                                  consumer_secret = CONSUMER_SECRET,
                                  access_token_key = ACCESS_TOKEN,
                                  access_token_secret = ACCESS_SECRET
                               )

                response = t.GetSearch(raw_query=rawQry)
                print('Total Records fetched:', str(len(response)))

                for i in response:

                    # Converting them to json
                    data = str(i)
                    res_json = json.loads(data)

                    # Calling individual key
                    id = res_json['id']
                    tmpR19 = 'Id: ' + str(id)
                    logging.info(tmpR19)

                    try:
                        f_count = res_json['quoted_status']['user']['followers_count']
                    except:
                        f_count = 0
                    tmpR21 = 'Followers Count: ' + str(f_count)
                    logging.info(tmpR21)

                    try:
                        r_count = res_json['quoted_status']['retweet_count']
                    except:
                        r_count = 0
                    tmpR22 = 'Retweet Count: ' + str(r_count)
                    logging.info(tmpR22)

                    text = self.find_element(res_json, 'text')

                    for j in text:
                        strF = re.sub(f'[^{re.escape(string.printable)}]', '', str(j))
                        pat = re.compile(r'[\t\n]')
                        strG = pat.sub("", strF)
                        res = "".join(strG)

                    # Forming return dictionary
                    #fin_dict.update({id:'id', f_count: 'followerCount', r_count: 'reTweetCount', res: 'msgPost'})
                    if cnt == 0:
                        finJson = finJson + '{"id":' + str(id) + ',"followerCount":' + str(f_count) + ',"reTweetCount":' + str(r_count) + ', "msgPost":"' + str(res) + '"}'
                    else:
                        finJson = finJson + ', {"id":' + str(id) + ',"followerCount":' + str(f_count) + ',"reTweetCount":' + str(r_count) + ', "msgPost":"' + str(res) + '"}'

                    cnt += 1

            finJson = finJson + ']'

            jdata = json.dumps(finJson)
            ResJson = json.loads(jdata)

            return ResJson

        except Exception as e:
            ResJson = ''
            x = str(e)
            print(x)

            logging.info(x)
            ResJson = {'errorDetails' : x}

            return ResJson

The key lines from this snippet are as follows –

def find_element(self, srcJson, key):
    """Pull all values of specified key from nested JSON."""
    arr = []

    def fetch(srcJson, arr, key):
        """Recursively search for values of key in JSON tree."""
        if isinstance(srcJson, dict):
            for k, v in srcJson.items():
                if isinstance(v, (dict, list)):
                    fetch(v, arr, key)
                elif k == key:
                    arr.append(v)
        elif isinstance(srcJson, list):
            for item in srcJson:
                fetch(item, arr, key)
        return arr

    finJson = fetch(srcJson, arr, key)
    return finJson

This function recursively searches the supplied JSON for a specific key & returns all the matching values as a list. This is particularly useful when your elements don’t have any fixed position.
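To see the recursive search in action, here is a small standalone version of the same logic run against a made-up payload. The sample dictionary is hypothetical & only loosely shaped like a tweet. Note that a value is collected only when it is neither a dictionary nor a list, exactly as in the method above.

```python
def find_element(src_json, key):
    """Collect every scalar value stored under `key` in a nested structure."""
    found = []

    def fetch(node):
        if isinstance(node, dict):
            for k, v in node.items():
                if isinstance(v, (dict, list)):
                    fetch(v)              # keep walking into nested containers
                elif k == key:
                    found.append(v)       # scalar value under the wanted key
        elif isinstance(node, list):
            for item in node:
                fetch(item)

    fetch(src_json)
    return found

# Hypothetical payload, only for illustration
sample_tweet = {
    'id': 1,
    'text': 'outer',
    'quoted_status': {'text': 'inner', 'user': {'name': 'x'}},
    'entities': [{'text': 'listed'}],
}

texts = find_element(sample_tweet, 'text')
print(texts)  # ['outer', 'inner', 'listed']
```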

t = twitter.Api(
                  consumer_key = CONSUMER_KEY,
                  consumer_secret = CONSUMER_SECRET,
                  access_token_key = ACCESS_TOKEN,
                  access_token_secret = ACCESS_SECRET
               )

response = t.GetSearch(raw_query=rawQry)

In this case, the Python application will receive the JSON response using the new Twitter API.

id = res_json['id']
try:
    f_count = res_json['quoted_status']['user']['followers_count']
except:
    f_count = 0
try:
    r_count = res_json['quoted_status']['retweet_count']
except:
    r_count = 0

Fetching specific fixed-position elements from the API response.

text = self.find_element(res_json, 'text')

Fetching the dynamic position based element using our customized function.

for j in text:
    strF = re.sub(f'[^{re.escape(string.printable)}]', '', str(j))
    pat = re.compile(r'[\t\n]')
    strG = pat.sub("", strF)
    res = "".join(strG)

Removing non-printable characters, tabs & newlines from the extracted text field in order to get clean data.
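The same two-step cleanup can be tried on its own, as in the sketch below. The clean_text helper & the sample string are made up for illustration. The first pattern drops anything outside string.printable, and the second one drops tabs & newlines; ordinary spaces are kept.

```python
import re
import string

# Anything that is not in string.printable
NON_PRINTABLE = re.compile(f'[^{re.escape(string.printable)}]')
# Tabs & newlines
TABS_NEWLINES = re.compile(r'[\t\n]')

def clean_text(raw):
    """Strip non-printable characters first, then tabs & newlines."""
    printable_only = NON_PRINTABLE.sub('', str(raw))
    return TABS_NEWLINES.sub('', printable_only)

sample = 'Goal!\t\u26bd Great\nmatch'
cleaned = clean_text(sample)
print(cleaned)  # Goal! Greatmatch
```

Compiling the patterns once, outside the loop, avoids rebuilding them for every message.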

if cnt == 0:
    finJson = finJson + '{"id":' + str(id) + ',"followerCount":' + str(f_count) + ',"reTweetCount":' + str(r_count) + ', "msgPost":"' + str(res) + '"}'
else:
    finJson = finJson + ', {"id":' + str(id) + ',"followerCount":' + str(f_count) + ',"reTweetCount":' + str(r_count) + ', "msgPost":"' + str(res) + '"}'

Finally, generating a JSON string dynamically.

jdata = json.dumps(finJson)
ResJson = json.loads(jdata)

And, returning the JSON to our calling program.
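As an alternative to concatenating the JSON string by hand, one could collect plain dictionaries & let json.dumps do the serialization. This is a sketch of that option, not the approach used in the class above; build_payload & the sample rows are hypothetical. The main benefit is that a message containing double quotes is escaped correctly.

```python
import json

def build_payload(rows):
    """Serialize a list of tweet summaries into a JSON array string."""
    records = [
        {
            'id': row['id'],
            'followerCount': row['followerCount'],
            'reTweetCount': row['reTweetCount'],
            'msgPost': row['msgPost'],
        }
        for row in rows
    ]
    return json.dumps(records)

# Hypothetical rows, only for illustration
rows = [
    {'id': 1, 'followerCount': 10, 'reTweetCount': 2, 'msgPost': 'He said "goal"!'},
    {'id': 2, 'followerCount': 25, 'reTweetCount': 0, 'msgPost': 'Half time'},
]

payload = build_payload(rows)
parsed = json.loads(payload)
print(parsed[0]['msgPost'])  # He said "goal"!
```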

3. callTwitterAPI.py (This is the main script that will invoke the Twitter API & then project the analytic report based on the available Twitter data.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 12-Oct-2019              ####
#### Modified On 12-Oct-2019              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from clsConfig import clsConfig as cf
import pandas as p
import clsL as cl
import logging
import datetime
import json
import clsTwitter as ct

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

def getMaximumFollower(df):
    try:
        d1 = df['followerCount'].max()
        d1_max_str = int(d1)

        return d1_max_str
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = 0

        return dt_part1

def getMaximumRetweet(df):
    try:
        d1 = df['reTweetCount'].max()
        d1_max_str = int(d1)

        return d1_max_str
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = ''

        return dt_part1

# Lookup functions from
# Azure cloud SQL DB

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def main():
    try:
        dfSrc = p.DataFrame()
        df_ret = p.DataFrame()
        ret_2 = ''
        debug_ind = 'Y'

        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'consolidatedTwitter.log', level=logging.INFO)

        # Initiating Log Class
        l = cl.clsL()

        # Moving previous day log files to archive directory
        arch_dir = cf.config['ARCH_DIR']
        log_dir = cf.config['LOG_PATH']

        tmpR0 = "*" * 157

        logging.info(tmpR0)
        tmpR9 = 'Start Time: ' + str(var)
        logging.info(tmpR9)
        logging.info(tmpR0)

        print("Archive Directory:: ", arch_dir)
        print("Log Directory::", log_dir)
        tmpR1 = 'Log Directory::' + log_dir
        logging.info(tmpR1)

        # Query using parameters
        rawQry = 'q=from%3ABlades_analytic&src=typd'

        x1 = ct.clsTwitter()
        ret_2 = x1.searchQry(rawQry)

        # Capturing the JSON Payload
        res = json.loads(ret_2)

        # Converting dictionary to Pandas Dataframe
        df_ret = p.read_json(ret_2, orient='records')

        # Resetting the column orders as per JSON
        df_ret = df_ret[list(res[0].keys())]

        l.logr('1.Twitter_' + var + '.csv', debug_ind, df_ret, 'log')

        print('Realtime Twitter Data:: ')
        logging.info('Realtime Twitter Data:: ')
        print(df_ret)
        print()

        # Checking execution status
        ret_val_2 = df_ret.shape[0]

        if ret_val_2 == 0:
            print("Twitter hasn't returned any rows. Please check your queries!")
            logging.info("Twitter hasn't returned any rows. Please check your queries!")
            print("*" * 157)
            logging.info(tmpR0)
        else:
            print("Successfully fetched rows!")
            logging.info("Successfully fetched rows!")
            print("*" * 157)
            logging.info(tmpR0)

        print('Finding Story points..')
        print("*" * 157)
        logging.info('Finding Story points..')
        logging.info(tmpR0)

        # Performing Basic Aggregate
        # 1. Find the user who has maximum Followers
        df_ret['MaxFollower'] = getMaximumFollower(df_ret)

        # 2. Find the user who has maximum Re-Tweets
        df_ret['MaxTweet'] = getMaximumRetweet(df_ret)

        # Getting Status
        df_MaxFollower = df_ret[(df_ret['followerCount'] == df_ret['MaxFollower'])]

        # Dropping Columns
        df_MaxFollower.drop(['reTweetCount'], axis=1, inplace=True)
        df_MaxFollower.drop(['MaxTweet'], axis=1, inplace=True)

        l.logr('2.Twitter_Maximum_Follower_' + var + '.csv', debug_ind, df_MaxFollower, 'log')

        print('Maximum Follower:: ')
        print(df_MaxFollower)
        print("*" * 157)
        logging.info(tmpR0)

        df_MaxTwitter = df_ret[(df_ret['reTweetCount'] == df_ret['MaxTweet'])]
        print()

        # Dropping Columns
        df_MaxTwitter.drop(['followerCount'], axis=1, inplace=True)
        df_MaxTwitter.drop(['MaxFollower'], axis=1, inplace=True)

        l.logr('3.Twitter_Maximum_Retweet_' + var + '.csv', debug_ind, df_MaxTwitter, 'log')

        print('Maximum Re-Tweet:: ')
        print(df_MaxTwitter)
        print("*" * 157)
        logging.info(tmpR0)

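        # Capturing the current time, as var holds the start time
        tmpR10 = 'End Time: ' + datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")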
        logging.info(tmpR10)
        logging.info(tmpR0)

    except ValueError:
        print("No relevant data to proceed!")
        logging.info("No relevant data to proceed!")

    except Exception as e:
        # Python 3 exceptions have no .message attribute, hence using str(e)
        print("Top level Error: args:{0}, message:{1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()

And, here are the key lines –

x1 = ct.clsTwitter()
ret_2 = x1.searchQry(rawQry)

Our application is instantiating the newly developed class.
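The raw query passed to searchQry is a URL-encoded string. If you prefer not to type the encoded form by hand, the standard library can build it. The helper name below is made up for illustration; the account name & the src value are the ones used in the script.

```python
from urllib.parse import urlencode

def build_raw_query(account, source='typd'):
    """Build a URL-encoded raw query for tweets posted by one account."""
    return urlencode({'q': 'from:' + account, 'src': source})

raw_qry = build_raw_query('Blades_analytic')
print(raw_qry)  # q=from%3ABlades_analytic&src=typd
```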

# Capturing the JSON Payload
res = json.loads(ret_2)

# Converting dictionary to Pandas Dataframe
df_ret = p.read_json(ret_2, orient='records')

# Resetting the column orders as per JSON
df_ret = df_ret[list(res[0].keys())]

Converting the JSON to pandas dataframe for our analytic data point.

def getMaximumFollower(df):
    try:
        d1 = df['followerCount'].max()
        d1_max_str = int(d1)

        return d1_max_str
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = 0

        return dt_part1

def getMaximumRetweet(df):
    try:
        d1 = df['reTweetCount'].max()
        d1_max_str = int(d1)

        return d1_max_str
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = ''

        return dt_part1

These two functions, declared above in the calling script, return the maximum follower count & the maximum re-tweet count from our returned dataset.

# Getting Status
df_MaxFollower = df_ret[(df_ret['followerCount'] == df_ret['MaxFollower'])]

And, this is how our application will fetch the rows with the maximum re-tweet count –

df_MaxTwitter = df_ret[(df_ret['reTweetCount'] == df_ret['MaxTweet'])]

And, you can customize your output by dropping unwanted columns in the specific dataset.
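The filter-and-drop steps can be tried on a tiny dataset, as in the sketch below. The sample payload is made up, and the max value is kept in a plain variable instead of an extra column, which is a small deviation from the script. Using drop(columns=...) returns a new dataframe, so the original one stays untouched.

```python
import json
import pandas as pd

# Hypothetical payload shaped like the output of searchQry
payload = json.dumps([
    {'id': 1, 'followerCount': 120, 'reTweetCount': 4, 'msgPost': 'first'},
    {'id': 2, 'followerCount': 950, 'reTweetCount': 1, 'msgPost': 'second'},
    {'id': 3, 'followerCount': 300, 'reTweetCount': 9, 'msgPost': 'third'},
])

df_ret = pd.DataFrame(json.loads(payload))

# Rows holding the maximum follower count, without the re-tweet column
max_follower = int(df_ret['followerCount'].max())
df_max_follower = df_ret[df_ret['followerCount'] == max_follower].drop(columns=['reTweetCount'])

# Rows holding the maximum re-tweet count, without the follower column
max_retweet = int(df_ret['reTweetCount'].max())
df_max_retweet = df_ret[df_ret['reTweetCount'] == max_retweet].drop(columns=['followerCount'])

print(df_max_follower)
print(df_max_retweet)
```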

And, here is the output on Windows, which looks like –

8. WindowsRun

And, here is the windows log directory –

WindowsRunLog

So, we’ve achieved our target data point.

So, we’ll come out with another exciting post in the coming days!

N.B.: This is demonstrated for RnD/study purposes. All the data posted here are representational data & available over the internet.

Explaining New Python Library – Regular Expression in JSON

Hi Guys!

As discussed, here is the continuation of the previous post. We’ll explain the regular expression functions from the library that I’ve created recently.

First, let me share the calling script for regular expression –

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 08-Sep-2019              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from dnpr.clsDnpr import clsDnpr
import datetime as dt
import json

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Lookup functions from
# Azure cloud SQL DB

def main():
    try:
        # Initializing the class
        t = clsDnpr()
        
        srcJson = [
                    {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
                    {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
                    {"FirstName": "Archi", "LastName": "Bose", "Sal": 500},
                    {"FirstName": "Archi", "LastName": "Bose", "Sal": 7000},
                    {"FirstName": "Deb", "LastName": "Sen", "Sal": 9500}
                  ]

        print("4. Checking regular expression functionality!")
        print()

        var13 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        print("Start Time: ", str(var13))

        print('::Function Regex_Like:: ')
        print()

        tarColumn = 'FirstName'
        print('Target Column for Regex_Like: ', tarColumn)
        inpPattern = r"\bSa"
        print('Input Pattern: ', str(inpPattern))

        # Invoking the regex_like function
        tarJson = t.regex_like(srcJson, tarColumn, inpPattern)

        print('End of Function Regex_Like!')
        print()

        print("*" * 157)
        print("Output Data: ")
        tarJsonFormat = json.dumps(tarJson, indent=1)
        print(str(tarJsonFormat))
        print("*" * 157)

        if not tarJson:
            print()
            print("No relevant output data!")
            print("*" * 157)
        else:
            print()
            print("Relevant output data comes!")
            print("*" * 157)

        var14 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        print("End Time: ", str(var14))

        var15 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        print("Start Time: ", str(var15))

        print('::Function Regex_Replace:: ')
        print()

        tarColumn = 'FirstName'
        print('Target Column for Regex_Replace: ', tarColumn)
        inpPattern = r"\bSa"
        print('Input Pattern: ', str(inpPattern))
        replaceString = 'Ka'
        print('Replacing Character: ', replaceString)

        # Invoking the regex_replace function
        tarJson = t.regex_replace(srcJson, tarColumn, inpPattern, replaceString)

        print('End of Function Regex_Replace!')
        print()

        print("*" * 157)
        print("Output Data: ")
        tarJsonFormat = json.dumps(tarJson, indent=1)
        print(str(tarJsonFormat))
        print("*" * 157)

        if not tarJson:
            print()
            print("No relevant output data!")
            print("*" * 157)
        else:
            print()
            print("Relevant output data comes!")
            print("*" * 157)

        var16 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        print("End Time: ", str(var16))

        var17 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        print("Start Time: ", str(var17))

        print('::Function Regex_Substr:: ')
        print()

        tarColumn = 'FirstName'
        print('Target Column for Regex_Substr: ', tarColumn)
        inpPattern = r"\bSa"
        print('Input Pattern: ', str(inpPattern))

        # Invoking the regex_substr function
        tarJson = t.regex_substr(srcJson, tarColumn, inpPattern)

        print('End of Function Regex_Substr!')
        print()

        print("*" * 157)
        print("Output Data: ")
        tarJsonFormat = json.dumps(tarJson, indent=1)
        print(str(tarJsonFormat))
        print("*" * 157)

        if not tarJson:
            print()
            print("No relevant output data!")
            print("*" * 157)
        else:
            print()
            print("Relevant output data comes!")
            print("*" * 157)

        var18 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        print("End Time: ", str(var18))

        print("=" * 157)
        print("End of regular expression function!")
        print("=" * 157)



    except ValueError:
        print("No relevant data to proceed!")

    except Exception as e:
        # Python 3 exceptions have no .message attribute, hence using str(e)
        print("Top level Error: args:{0}, message:{1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()

As per the library, we’ll discuss the following functionalities –

  1. regex_like
  2. regex_replace
  3. regex_substr

Now, let us check how to call these functions.

1. regex_like:

Following is the base skeleton in order to invoke this function –

regex_like(Input Json, Target Column, Pattern To Match) return Output Json

Here are the key lines in the script –

srcJson = [
            {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
            {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
            {"FirstName": "Archi", "LastName": "Bose", "Sal": 500},
            {"FirstName": "Archi", "LastName": "Bose", "Sal": 7000},
            {"FirstName": "Deb", "LastName": "Sen", "Sal": 9500}
          ]

# Invoking the regex_like function
tarJson = t.regex_like(srcJson, tarColumn, inpPattern)
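To give a feel for what such a filter does, here is a rough equivalent written with the standard re module. It is a sketch under assumptions & not the library’s actual implementation; the function name regex_like_sketch is made up. It keeps the records whose target column matches the pattern.

```python
import re

def regex_like_sketch(src_json, column, pattern):
    """Keep the records whose `column` value matches `pattern` somewhere."""
    compiled = re.compile(pattern)
    return [row for row in src_json if compiled.search(str(row.get(column, '')))]

srcJson = [
    {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
    {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
    {"FirstName": "Archi", "LastName": "Bose", "Sal": 500},
    {"FirstName": "Archi", "LastName": "Bose", "Sal": 7000},
    {"FirstName": "Deb", "LastName": "Sen", "Sal": 9500}
]

matched = regex_like_sketch(srcJson, 'FirstName', r"\bSa")
print(len(matched))  # 2
```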

2. regex_replace:

Following is the base skeleton in order to invoke this function –

regex_replace(Input Json, Target Column, Pattern to Replace, Replacement String) return Output Json

Here are the key lines in the script –

tarColumn = 'FirstName'
print('Target Column for Regex_Replace: ', tarColumn)
inpPattern = r"\bSa"
print('Input Pattern: ', str(inpPattern))
replaceString = 'Ka'
print('Replacing Character: ', replaceString)

# Invoking the regex_replace function
tarJson = t.regex_replace(srcJson, tarColumn, inpPattern, replaceString)

As you can see, here ‘Sa’ is replaced with ‘Ka’, provided it matches the specific pattern in the JSON.
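A rough standard-library equivalent of this replacement is sketched below. Again, this is only an illustration & not the library’s own code; the function name is made up, and returning every record (changed or not) is an assumption.

```python
import re

def regex_replace_sketch(src_json, column, pattern, replacement):
    """Return copies of the records with `pattern` replaced in `column`."""
    compiled = re.compile(pattern)
    result = []
    for row in src_json:
        new_row = dict(row)  # copy, so the source records stay untouched
        if column in new_row:
            new_row[column] = compiled.sub(replacement, str(new_row[column]))
        result.append(new_row)
    return result

sample = [
    {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
    {"FirstName": "Archi", "LastName": "Bose", "Sal": 500}
]

replaced = regex_replace_sketch(sample, 'FirstName', r"\bSa", 'Ka')
print(replaced[0]['FirstName'])  # Katyaki
```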

3. regex_substr:

Following is the base skeleton in order to invoke this function –

regex_substr(Input Json, Target Column, Pattern to substring) return Output Json

Here are the key lines –

tarColumn = 'FirstName'
print('Target Column for Regex_Substr: ', tarColumn)
inpPattern = r"\bSa"
print('Input Pattern: ', str(inpPattern))

# Invoking the regex_substr function
tarJson = t.regex_substr(srcJson, tarColumn, inpPattern)

In this case, we’ve removed the matched part from the string in the target column & returned the final result as JSON.
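Judging by the run output shown later in this post, where “Satyaki” becomes “tyaki”, the function appears to strip the matched part from the value. Under that assumption, a rough standard-library equivalent could look like the sketch below. The function name is made up, and the library’s real behaviour may differ in the details.

```python
import re

def regex_substr_sketch(src_json, column, pattern):
    """Return copies of the records with the matched part removed from `column`."""
    compiled = re.compile(pattern)
    result = []
    for row in src_json:
        new_row = dict(row)  # copy, so the source records stay untouched
        if column in new_row:
            new_row[column] = compiled.sub('', str(new_row[column]))
        result.append(new_row)
    return result

sample = [
    {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
    {"FirstName": "Deb", "LastName": "Sen", "Sal": 9500}
]

trimmed = regex_substr_sketch(sample, 'FirstName', r"\bSa")
print(trimmed[0]['FirstName'])  # tyaki
```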

Let us first see the sample input JSON –

SourceJSON_Regex

Let us check how it looks when we run the calling script –

  • regex_like:
Regex_Like

This function will retrieve the elements that start with ‘Sa’. As a result, we’ll see the following two elements in the payload.

  • regex_replace:
Regex_Replace

In this case, we’re replacing ‘Sa’ with ‘Ka’ in any string that starts with ‘Sa’.

  • regex_substr:
Regex_Substr

As you can see, the FirstName of the first element changed from “Satyaki” to “tyaki”.

So, finally, we’ve achieved our target.

I’ll post the next exciting concept very soon.

Till then! Happy Avenging! 😀

N.B.: This is demonstrated for RnD/study purposes. All the data posted here are representational data & available over the internet.