RAG implementation of LLMs by using Python, Haystack & React (Part – 1)

Today, I’m starting a multi-part series on building an end-to-end LLM application that feeds source data to the model through a RAG (Retrieval-Augmented Generation) implementation. I’ll use the OpenAI Python SDK and Haystack embeddings in this case.

In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.

Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

Demo

Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.

As you can see, to enable this large & complex solution, we must first establish the capabilities to build applications powered by LLMs, Transformer models, vector search, and more. You can use state-of-the-art NLP models to perform question-answering, answer generation, semantic document search, or build tools capable of complex decision-making and query resolution. Hence, steps 1 & 2 show the data embedding & the creation of that informed repository. We’ll discuss those in the second part.

Once you have the informed repository, the system can interact with the end-users. As part of the query (shown in step 3), the prompt & the question are shared with the process engine, which then reduces the volume by fetching only the relevant context from our informed repository & receives the tuned context as part of the response (shown in steps 4, 5 & 6).

Then, this tuned context is shared with OpenAI to produce a better response, with a summary & concluding remarks that are user-friendly & easier for end-users to understand (shown in steps 8 & 9).
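
To make the flow concrete, here is a minimal offline sketch of the retrieve-then-generate idea. It is not the project’s code: the REPOSITORY list, the word-overlap scoring & the function names are all assumptions made for illustration, and the generator is a stand-in you would replace with a real LLM call.

```python
from typing import Callable, List

# Hypothetical stand-in for the "informed repository"; the project itself
# loads a FAISS index instead of a Python list.
REPOSITORY = [
    "The museum holds Egyptian art from the Old Kingdom.",
    "The armour collection includes Japanese helmets.",
    "European paintings include works by Dutch masters.",
]


def tokenize(text: str) -> set:
    """Lower-case the text and strip basic punctuation from each word."""
    return {word.strip(".,?!").lower() for word in text.split()}


def retrieve_context(question: str, documents: List[str], top_k: int = 2) -> List[str]:
    """Toy retrieval: rank documents by the number of words shared with the question."""
    question_words = tokenize(question)
    ranked = sorted(
        documents,
        key=lambda doc: len(question_words & tokenize(doc)),
        reverse=True,
    )
    return ranked[:top_k]


def build_prompt(question: str, context: List[str]) -> str:
    """Combine the reduced context and the question into one prompt."""
    return "Context:\n" + "\n".join(context) + "\n\nQuestion: " + question


def answer(question: str, generate: Callable[[str], str], top_k: int = 2) -> str:
    """Retrieve context first, then hand the smaller prompt to the generator."""
    context = retrieve_context(question, REPOSITORY, top_k)
    return generate(build_prompt(question, context))


# Offline stand-in for the LLM call: it simply returns the prompt it received.
prompt_seen = answer("Which Egyptian art is held?", generate=lambda prompt: prompt, top_k=1)
print(prompt_seen)
```

The point of the design is that only the top-ranked context reaches the model, which keeps the prompt small & the OpenAI call cheaper.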

The following are the important packages that are essential to this project –

pip install farm-haystack==1.19.0
pip install Flask==2.2.5
pip install Flask-Cors==4.0.0
pip install Flask-JWT-Extended==4.5.2
pip install Flask-Session==0.5.0
pip install openai==0.27.8
pip install pandas==2.0.3
pip install tensorflow==2.11.1

We have a front-end built with React, back-end APIs built with Python & Flask, and OpenAI to create this experience.

Today, we’ll go in reverse order. We’ll first discuss the main script & then explain all the other class scripts.

  • flaskServer.py (This is the main calling Python script to invoke the RAG-Server.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Jun-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### shortcut application created inside MAC         ####
#### environment including MacBook, IPad or IPhone.  ####
####                                                 ####
#########################################################

from flask import Flask, jsonify, request, session
from flask_cors import CORS
from werkzeug.security import check_password_hash, generate_password_hash
from flask_jwt_extended import JWTManager, jwt_required, create_access_token
import pandas as pd
from clsConfigClient import clsConfigClient as cf
import clsL as log
import clsContentScrapper as csc
import clsRAGOpenAI as crao
import csv
from datetime import timedelta
import os
import re
import json

########################################################
################    Global Area   ######################
########################################################
#Initiating Logging Instances
clog = log.clsL()

admin_key = cf.conf['ADMIN_KEY']
secret_key = cf.conf['SECRET_KEY']
session_path = cf.conf['SESSION_PATH']
sessionFile = cf.conf['SESSION_CACHE_FILE']

app = Flask(__name__)
CORS(app)  # This will enable CORS for all routes
app.config['JWT_SECRET_KEY'] = admin_key  # Change this!
app.secret_key = secret_key

jwt = JWTManager(app)

users = cf.conf['USER_NM']
passwd = cf.conf['USER_PWD']

cCScrapper = csc.clsContentScrapper()
cr = crao.clsRAGOpenAI()

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Define the aggregation functions
def join_unique(series):
    unique_vals = series.drop_duplicates().astype(str)
    return ', '.join(filter(lambda x: x != 'nan', unique_vals))

# Building the preaggregate cache
def groupImageWiki():
    try:
        base_path = cf.conf['OUTPUT_PATH']
        inputFile = cf.conf['CLEANED_FILE']
        outputFile = cf.conf['CLEANED_FILE_SHORT']
        subdir = cf.conf['SUBDIR_OUT']
        Ind = cf.conf['DEBUG_IND']

        inputCleanedFileLookUp = base_path + inputFile

        #Opening the file in dataframe
        df = pd.read_csv(inputCleanedFileLookUp)
        hash_values = df['Total_Hash'].unique()

        # Work on an explicit copy so the assignments below do not touch a slice of df
        dFin = df[['primaryImage','Wiki_URL','Total_Hash']].copy()

        # Ensure columns are strings and not NaN
        # Convert columns to string and replace 'nan' with an empty string
        dFin['primaryImage'] = dFin['primaryImage'].astype(str).replace('nan', '')
        dFin['Wiki_URL'] = dFin['Wiki_URL'].astype(str).replace('nan', '')

        # drop_duplicates() returns a new frame, so keep the result
        dFin = dFin.drop_duplicates()

        # Group by 'Total_Hash' and aggregate
        dfAgg = dFin.groupby('Total_Hash').agg({'primaryImage': join_unique,'Wiki_URL': join_unique}).reset_index()

        return dfAgg

    except Exception as e:
        x = str(e)
        print('Error: ', x)

        df = pd.DataFrame()

        return df

resDf = groupImageWiki()

########################################################
################  End  Global Area  ####################
########################################################

def extractRemoveUrls(hash_value):
    image_urls = ''
    wiki_urls = ''
    # Parse the inner message JSON string
    try:

        resDf['Total_Hash'] = resDf['Total_Hash'].astype(int)
        filtered_df = resDf[resDf['Total_Hash'] == int(hash_value)]

        if not filtered_df.empty:
            image_urls = filtered_df['primaryImage'].values[0]
            wiki_urls = filtered_df['Wiki_URL'].values[0]

        return image_urls, wiki_urls

    except Exception as e:
        x = str(e)
        print('extractRemoveUrls Error: ', x)
        return image_urls, wiki_urls

def isIncomplete(line):
    """Check if a line appears to be incomplete."""

    # Check if the line ends with certain patterns indicating it might be incomplete.
    incomplete_patterns = [': [Link](', ': Approximately ', ': ']
    return any(line.endswith(pattern) for pattern in incomplete_patterns)

def filterData(data):
    """Return only the complete lines from the data."""

    lines = data.split('\n')
    complete_lines = [line for line in lines if not isIncomplete(line)]

    return '\n'.join(complete_lines)

def updateCounter(sessionFile):
    try:
        counter = 0

        # Check if the CSV file exists
        if os.path.exists(sessionFile):
            with open(sessionFile, 'r') as f:
                reader = csv.reader(f)
                for row in reader:
                    # Assuming the counter is the first value in the CSV
                    counter = int(row[0])

        # Increment counter
        counter += 1

        # Write counter back to CSV
        with open(sessionFile, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow([counter])

        return counter
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

def getPreviousResult():
    try:
        fullFileName = session_path + sessionFile
        newCounterValue = updateCounter(fullFileName)

        return newCounterValue
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

@app.route('/login', methods=['POST'])
def login():
    username = request.json.get('username', None)
    password = request.json.get('password', None)

    print('User Name: ', str(username))
    print('Password: ', str(password))

    #if username not in users or not check_password_hash(users.get(username), password):
    if ((username not in users) or (password not in passwd)):
        return jsonify({'login': False}), 401

    access_token = create_access_token(identity=username)
    return jsonify(access_token=access_token)

@app.route('/chat', methods=['POST'])
def get_chat():
    try:
        #session["key"] = "1D98KI"
        #session_id = session.sid
        #print('Session Id: ', str(session_id))

        cnt = getPreviousResult()
        print('Running Session Count: ', str(cnt))

        username = request.json.get('username', None)
        message = request.json.get('message', None)

        print('User: ', str(username))
        print('Content: ', str(message))

        if cnt == 1:
            retList = cCScrapper.extractCatalog()
        else:
            hashValue, cleanedData = cr.getData(str(message))
            print('Main Hash Value:', str(hashValue))

            imageUrls, wikiUrls = extractRemoveUrls(hashValue)
            print('Image URLs: ', str(imageUrls))
            print('Wiki URLs: ', str(wikiUrls))
            print('Clean Text:')
            print(str(cleanedData))
            retList = '{"records":[{"Id":"' + str(cleanedData) + '", "Image":"' + str(imageUrls) + '", "Wiki": "' + str(wikiUrls) + '"}]}'

        response = {
            'message': retList
        }

        print('JSON: ', str(response))
        return jsonify(response)

    except Exception as e:
        x = str(e)

        response = {
            'message': 'Error: ' + x
        }
        return jsonify(response)

@app.route('/api/data', methods=['GET'])
@jwt_required()
def get_data():
    response = {
        'message': 'Hello from Flask!'
    }
    return jsonify(response)

if __name__ == '__main__':
    app.run(debug=True)

Let us understand some of the important sections of the above script –

Function – login():

The login function retrieves a ‘username’ and ‘password’ from a JSON request and prints them (handy for a demo, but credentials should not be logged in a real deployment). It then checks whether the username is missing from the configured users or the password is missing from the configured passwords, returning a 401 JSON response if so. Otherwise, it creates and returns a JWT access token in a JSON response.
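
Note that the script tests the username and the password separately for membership, so depending on how USER_NM and USER_PWD are stored in the config, a valid username paired with another user’s password might pass. The sketch below shows one way to verify the pair together against salted hashes. It uses only the standard library (hashlib & hmac) rather than the werkzeug helpers imported in the script, and the user names, passwords & function names are hypothetical.

```python
import hashlib
import hmac
import os


def hash_password(password: str, salt: bytes) -> bytes:
    """Derive a salted hash with PBKDF2 (standard library only)."""
    return hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, 100_000)


def make_user_store(plain_users: dict) -> dict:
    """Build {username: (salt, hash)} so plain passwords are never kept."""
    store = {}
    for name, password in plain_users.items():
        salt = os.urandom(16)
        store[name] = (salt, hash_password(password, salt))
    return store


def verify_login(store: dict, username, password) -> bool:
    """Return True only when this user exists AND the password matches that user."""
    if not isinstance(username, str) or not isinstance(password, str):
        return False
    record = store.get(username)
    if record is None:
        return False
    salt, expected = record
    # compare_digest avoids leaking information through comparison timing
    return hmac.compare_digest(hash_password(password, salt), expected)


# Hypothetical demo users; a real setup would load hashes from configuration.
USER_STORE = make_user_store({"alice": "wonderland", "bob": "builder"})

print(verify_login(USER_STORE, "alice", "wonderland"))
print(verify_login(USER_STORE, "alice", "builder"))
```

In the Flask route, a helper like verify_login could replace the two membership tests before create_access_token is called.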

Function – get_chat():

The get_chat function retrieves the running session count and user input from a JSON request. On the first call (a count of 1), it returns the catalog data; on later calls, it passes the user’s message to the RAG framework, which returns the hash value & the refined response from OpenAI, and then looks up the matching image URLs and wiki URLs. If an error arises, the function captures and returns the error as a JSON message.
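
In the script, the ‘records’ payload is assembled by string concatenation, which only parses on the client as long as the text contains no unescaped quotes or line breaks. As a hedged alternative, the sketch below builds the same shape with json.dumps, which handles the escaping. The function name & the sample values are assumptions; if you adopt this, the manual newline escaping done in clsRAGOpenAI would likely no longer be needed.

```python
import json


def build_records_payload(cleaned_text: str, image_urls: str, wiki_urls: str) -> str:
    """Serialise one record in the {"records": [...]} shape expected by the front-end."""
    record = {"Id": cleaned_text, "Image": image_urls, "Wiki": wiki_urls}
    return json.dumps({"records": [record]})


# Sample values are made up; note the embedded quotes and the line break.
payload = build_records_payload(
    'He said "hello"\nnext line',
    "https://example.com/a.jpg",
    "https://example.com/wiki",
)

# The payload round-trips cleanly, quotes and newline included.
parsed = json.loads(payload)
print(parsed["records"][0]["Id"])
```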

Function – updateCounter():

The updateCounter function checks if a given CSV file exists and retrieves its counter value. It then increments the counter and writes it back to the CSV. If any errors occur, an error message is printed, and the function returns a value of 1.

Function – extractRemoveUrls():

The extractRemoveUrls function attempts to filter a data frame, resDf, based on a provided hash value to extract image and wiki URLs. If the data frame contains matching entries, it retrieves the corresponding URLs. Any errors encountered are printed, but the function always returns the image and wiki URLs, even if they are empty.
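
The lookup idea can be sketched without pandas as well. The snippet below is a plain-Python stand-in for the pre-aggregated cache & the hash lookup, not the script’s implementation: the rows, URLs & function names are hypothetical, and unlike the original it also skips empty values when joining.

```python
def join_unique(values) -> str:
    """Join distinct, non-empty values in first-seen order, skipping 'nan'."""
    seen = []
    for value in values:
        text = str(value)
        if text and text != "nan" and text not in seen:
            seen.append(text)
    return ", ".join(seen)


def build_lookup(rows) -> dict:
    """Group rows by Total_Hash and pre-aggregate their image and wiki URLs."""
    grouped = {}
    for row in rows:
        entry = grouped.setdefault(int(row["Total_Hash"]), {"images": [], "wikis": []})
        entry["images"].append(row["primaryImage"])
        entry["wikis"].append(row["Wiki_URL"])
    return {
        key: (join_unique(entry["images"]), join_unique(entry["wikis"]))
        for key, entry in grouped.items()
    }


def extract_urls(lookup: dict, hash_value):
    """Return (image_urls, wiki_urls); empty strings when nothing matches."""
    try:
        return lookup.get(int(hash_value), ("", ""))
    except (TypeError, ValueError):
        return "", ""


# Made-up rows standing in for the cleaned CSV file.
ROWS = [
    {"Total_Hash": 101, "primaryImage": "https://example.com/img1.jpg", "Wiki_URL": "https://example.com/wiki/A"},
    {"Total_Hash": 101, "primaryImage": "https://example.com/img1.jpg", "Wiki_URL": "https://example.com/wiki/B"},
    {"Total_Hash": 202, "primaryImage": "nan", "Wiki_URL": "https://example.com/wiki/C"},
]
LOOKUP = build_lookup(ROWS)
print(extract_urls(LOOKUP, "101"))
```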

  • clsContentScrapper.py (This is the main class that brings the default options for the users if they agree with the initial prompt by the bot.)
#####################################################
#### Written By: SATYAKI DE                      ####
#### Written On: 27-May-2023                     ####
#### Modified On 28-May-2023                     ####
####                                             ####
#### Objective: This is the main calling         ####
#### python class that will invoke the           ####
#### LangChain of package to extract             ####
#### the transcript from the YouTube videos &    ####
#### then answer the questions based on the      ####
#### topics selected by the users.               ####
####                                             ####
#####################################################

from langchain.document_loaders import YoutubeLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.chat_models import ChatOpenAI
from langchain.chains import LLMChain

from langchain.prompts.chat import (
    ChatPromptTemplate,
    SystemMessagePromptTemplate,
    HumanMessagePromptTemplate,
)

from googleapiclient.discovery import build

import clsTemplate as ct
from clsConfigClient import clsConfigClient as cf

import os

from flask import jsonify
import requests

###############################################
###           Global Section                ###
###############################################
open_ai_Key = cf.conf['OPEN_AI_KEY']
os.environ["OPENAI_API_KEY"] = open_ai_Key
embeddings = OpenAIEmbeddings(openai_api_key=open_ai_Key)

YouTube_Key = cf.conf['YOUTUBE_KEY']
youtube = build('youtube', 'v3', developerKey=YouTube_Key)

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

###############################################
###    End of Global Section                ###
###############################################

class clsContentScrapper:
    def __init__(self):
        self.model_name = cf.conf['MODEL_NAME']
        self.temp_val = cf.conf['TEMP_VAL']
        self.max_cnt = int(cf.conf['MAX_CNT'])
        self.url = cf.conf['BASE_URL']
        self.header_token = cf.conf['HEADER_TOKEN']

    def extractCatalog(self):
        try:
            base_url = self.url
            header_token = self.header_token

            url = base_url + '/departments'

            print('Full URL: ', str(url))

            payload={}
            headers = {'Cookie': header_token}

            response = requests.request("GET", url, headers=headers, data=payload)

            x = response.text

            return x
        except Exception as e:
            # On failure, print the error and return its message to the caller
            x = str(e)
            print('Error: ', x)

            return x

Let us understand the core part of this class.

Function – extractCatalog():

The extractCatalog function uses specific headers to make a GET request to a constructed URL. The URL is derived by appending ‘/departments’ to a base_url, and a header token is used in the request headers. If successful, it returns the text of the response; if there’s an exception, it prints the error and returns the error message.

  • clsRAGOpenAI.py (This is the main class that brings the RAG-enabled context that is fed to OpenAI for fine-tuned response with less cost.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Jun-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### shortcut application created inside MAC         ####
#### environment including MacBook, IPad or IPhone.  ####
####                                                 ####
#########################################################

from haystack.document_stores.faiss import FAISSDocumentStore
from haystack.nodes import DensePassageRetriever
import openai

from clsConfigClient import clsConfigClient as cf
import clsL as log

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

import os
import re
###############################################
###           Global Section                ###
###############################################
Ind = cf.conf['DEBUG_IND']
queryModel = cf.conf['QUERY_MODEL']
passageModel = cf.conf['PASSAGE_MODEL']

#Initiating Logging Instances
clog = log.clsL()

os.environ["TOKENIZERS_PARALLELISM"] = "false"

vectorDBFileName = cf.conf['VECTORDB_FILE_NM']

indexFile = "vectorDB/" + str(vectorDBFileName) + '.faiss'
indexConfig = "vectorDB/" + str(vectorDBFileName) + ".json"

print('File: ', str(indexFile))
print('Config: ', str(indexConfig))

# Also, provide `config_path` parameter if you set it when calling the `save()` method:
new_document_store = FAISSDocumentStore.load(index_path=indexFile, config_path=indexConfig)

# Initialize Retriever
retriever = DensePassageRetriever(document_store=new_document_store,
                                  query_embedding_model=queryModel,
                                  passage_embedding_model=passageModel,
                                  use_gpu=False)


###############################################
###    End of Global Section                ###
###############################################

class clsRAGOpenAI:
    def __init__(self):
        self.basePath = cf.conf['DATA_PATH']
        self.fileName = cf.conf['FILE_NAME']
        self.Ind = cf.conf['DEBUG_IND']
        self.subdir = str(cf.conf['OUT_DIR'])
        self.base_url = cf.conf['BASE_URL']
        self.outputPath = cf.conf['OUTPUT_PATH']
        self.vectorDBPath = cf.conf['VECTORDB_PATH']
        self.openAIKey = cf.conf['OPEN_AI_KEY']
        self.temp = cf.conf['TEMP_VAL']
        self.modelName = cf.conf['MODEL_NAME']
        self.maxToken = cf.conf['MAX_TOKEN']

    def extractHash(self, text):
        try:
            # Regular expression pattern to match "Ref: {'<digits>'}" and capture the digits
            pattern = r"Ref: \{'(\d+)'\}"
            match = re.search(pattern, text)

            if match:
                return match.group(1)
            else:
                return None
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return None

    def removeSentencesWithNaN(self, text):
        try:
            # Split text into sentences using regular expression (raw string avoids invalid-escape warnings)
            sentences = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s', text)
            # Filter out sentences containing the standalone token 'nan' (so words like 'financial' are kept)
            filteredSentences = [sentence for sentence in sentences if not re.search(r'\bnan\b', sentence)]
            # Rejoin the sentences
            return ' '.join(filteredSentences)
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return ''

    def retrieveDocumentsReader(self, question, top_k=9):
        return retriever.retrieve(question, top_k=top_k)

    def generateAnswerWithGPT3(self, retrieved_docs, question):
        try:
            openai.api_key = self.openAIKey
            temp = self.temp
            modelName = self.modelName
            maxToken = self.maxToken

            documentsText = " ".join([doc.content for doc in retrieved_docs])

            filteredDocs = self.removeSentencesWithNaN(documentsText)
            hashValue = self.extractHash(filteredDocs)

            print('RAG Docs:: ')
            print(filteredDocs)
            #prompt = f"Given the following documents: {documentsText}, answer the question accurately based on the above data with the supplied http urls: {question}"

            # Set up a chat-style prompt with your data
            messages = [
                {"role": "system", "content": "You are a helpful assistant, answer the question accurately based on the above data with the supplied http urls. Only relevant content needs to publish. Please do not provide the facts or the texts that results crossing the max_token limits."},
                {"role": "user", "content": filteredDocs}
            ]

            # Chat style invoking the latest model
            response = openai.ChatCompletion.create(
                model=modelName,
                messages=messages,
                temperature = temp,
                max_tokens=maxToken
            )
            return hashValue, response.choices[0].message['content'].strip().replace('\n','\\n')
        except Exception as e:
            x = str(e)
            print('failed to get from OpenAI: ', x)
            # Return a pair so callers that unpack (hashValue, answer) keep working
            return None, 'Not Available!'

    def ragAnswerWithHaystackAndGPT3(self, question):
        retrievedDocs = self.retrieveDocumentsReader(question)
        return self.generateAnswerWithGPT3(retrievedDocs, question)

    def getData(self, strVal):
        try:
            print('*'*120)
            print('Index Your Data for Retrieval:')
            print('*'*120)

            print('Response from New Docs: ')
            print()

            hashValue, answer = self.ragAnswerWithHaystackAndGPT3(strVal)

            print('GPT3 Answer::')
            print(answer)
            print('Hash Value:')
            print(str(hashValue))

            print('*'*240)
            print('End Of Use RAG to Generate Answers:')
            print('*'*240)

            return hashValue, answer
        except Exception as e:
            x = str(e)
            print('Error: ', x)
            answer = x
            hashValue = 1

            return hashValue, answer

Let us understand some of the important blocks –

Function – ragAnswerWithHaystackAndGPT3():

The ragAnswerWithHaystackAndGPT3 function retrieves relevant documents for a given question using the retrieveDocumentsReader method. It then generates an answer for the query using GPT-3 with the retrieved documents via the generateAnswerWithGPT3 method. The final response is returned.

Function – generateAnswerWithGPT3():

The generateAnswerWithGPT3 function, given a list of retrieved documents and a question, communicates with OpenAI’s GPT-3 to generate an answer. It first processes the documents, filtering and extracting a hash value. Using a chat-style format, it prompts GPT-3 with the processed documents and captures its response. If an error occurs, an error message is printed, and “Not Available!” is returned.
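
Here is a small offline sketch of the preprocessing and prompt-building steps described above. It stops short of calling OpenAI. The sample text & function names are assumptions; also note that the class sends only the filtered documents in the user message, whereas this sketch appends the question as well, which is my assumption about the intent (the commented-out prompt in the class does include it).

```python
import re


def remove_sentences_with_nan(text: str) -> str:
    """Drop sentences that contain the standalone token 'nan'."""
    sentences = re.split(r"(?<=[.?])\s+", text)
    return " ".join(s for s in sentences if not re.search(r"\bnan\b", s))


def extract_hash(text: str):
    """Capture the digits from a marker shaped like: Ref: {'12345'}"""
    match = re.search(r"Ref: \{'(\d+)'\}", text)
    return match.group(1) if match else None


def build_messages(documents_text: str, question: str) -> list:
    """Build chat-style messages from the filtered documents and the question."""
    context = remove_sentences_with_nan(documents_text)
    return [
        {"role": "system", "content": "You are a helpful assistant. Answer only from the supplied documents."},
        {"role": "user", "content": f"Documents:\n{context}\n\nQuestion: {question}"},
    ]


# Made-up retrieved text in the shape the regular expressions expect.
sample = "Title: Bronze helmet. Medium: nan. Ref: {'12345'}. Period: Edo."

print(extract_hash(sample))
print(remove_sentences_with_nan(sample))
messages = build_messages(sample, "What is it?")
```

The resulting messages list is what you would pass as the messages argument of the chat completion call.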

Function – retrieveDocumentsReader():

The retrieveDocumentsReader function takes in a question and an optional parameter, top_k (defaulted to 9). It calls the retriever.retrieve method with the given parameters. The retrieval returns at most nine documents from the RAG engine, which are then fed to OpenAI.
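
To illustrate what top_k means, here is a toy ranking sketch. It assumes dot-product scoring between a query vector & passage vectors, which is how dense passage retrieval is commonly set up; the two-dimensional vectors & names below are made up, and the real retriever computes embeddings with the configured query & passage models.

```python
from typing import List, Sequence, Tuple


def dot(a: Sequence[float], b: Sequence[float]) -> float:
    """Dot product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))


def top_k_passages(
    query_vec: Sequence[float],
    passages: List[Tuple[str, Sequence[float]]],
    top_k: int = 9,
) -> List[str]:
    """Return the texts of the top_k highest-scoring passages (fewer if not enough exist)."""
    scored = [(dot(query_vec, vec), text) for text, vec in passages]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [text for _, text in scored[:top_k]]


# Hypothetical passages with hand-made 2-d "embeddings".
PASSAGES = [
    ("helmet", [1.0, 0.0]),
    ("painting", [0.0, 1.0]),
    ("armour", [0.9, 0.1]),
]

print(top_k_passages([1.0, 0.0], PASSAGES, top_k=2))
```

With only three passages available, the default top_k of 9 simply returns all three, which is why the text above says "at most nine".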

  • App.js (This is the main React script that creates the interface, handles the authentication & parses the data.)
// App.js
import React, { useState } from 'react';
import axios from 'axios';
import './App.css';

const App = () => {
  const [isLoggedIn, setIsLoggedIn] = useState(false);
  const [username, setUsername] = useState('');
  const [password, setPassword] = useState('');
  const [message, setMessage] = useState('');
  const [chatLog, setChatLog] = useState([{ sender: 'MuBot', message: 'Welcome to MuBot! Please explore the world of History from our brilliant collections! Do you want to proceed to see the catalog?'}]);

  const handleLogin = async (e) => {
    e.preventDefault();
    try {
      const response = await axios.post('http://localhost:5000/login', { username, password });
      if (response.status === 200) {
        setIsLoggedIn(true);
      }
    } catch (error) {
      console.error('Login error:', error);
    }
  };

  const sendMessage = async (username) => {
    if (message.trim() === '') return;

    // Create a new chat entry
    const newChatEntry = {
      sender: 'user',
      message: message.trim(),
    };

    // Clear the input field
    setMessage('');

    try {
      // Make API request to Python-based API
      const response = await axios.post('http://localhost:5000/chat', { message: newChatEntry.message }); // Replace with your API endpoint URL
      const responseData = response.data;

      // Print the response to the console for debugging
      console.log('API Response:', responseData);

      // Parse the nested JSON from the 'message' attribute
      const jsonData = JSON.parse(responseData.message);

      // Check if the data contains 'departments'
      if (jsonData.departments) {

        // Extract the 'departments' attribute from the parsed data
        const departments = jsonData.departments;

        // Extract the department names and create a single string with line breaks
        const botResponseText = departments.reduce((acc, department) => {return acc + department.departmentId + ' ' + department.displayName + '\n';}, '');

        // Update the chat log with the bot's response
        setChatLog((prevChatLog) => [...prevChatLog, { sender: 'user', message: message }, { sender: 'bot', message: botResponseText },]);
      }
      else if (jsonData.records)
      {
        // Data structure 2: Artwork information
        const records = jsonData.records;

        // Prepare chat entries
        const chatEntries = [];

        // Iterate through records and extract text, image, and wiki information
        records.forEach((record) => {
          const textInfo = Object.entries(record).map(([key, value]) => {
            if (key !== 'Image' && key !== 'Wiki') {
              return `${key}: ${value}`;
            }
            return null;
          }).filter((info) => info !== null).join('\n');

          const imageLink = record.Image;
          //const wikiLinks = JSON.parse(record.Wiki.replace(/'/g, '"'));
          //const wikiLinks = record.Wiki;
          const wikiLinks = record.Wiki.split(',').map(link => link.trim());

          console.log('Wiki:', wikiLinks);

          // Check if there is a valid image link
          const hasValidImage = imageLink && imageLink !== '[]';

          const imageElement = hasValidImage ? (
            <img src={imageLink} alt="Artwork" style={{ maxWidth: '100%' }} />
          ) : null;

          // Create JSX elements for rendering the wiki links (if available)
          const wikiElements = wikiLinks.map((link, index) => (
            <div key={index}>
              <a href={link} target="_blank" rel="noopener noreferrer">
                Wiki Link {index + 1}
              </a>
            </div>
          ));

          if (textInfo) {
            chatEntries.push({ sender: 'bot', message: textInfo });
          }

          if (imageElement) {
            chatEntries.push({ sender: 'bot', message: imageElement });
          }

          if (wikiElements.length > 0) {
            chatEntries.push({ sender: 'bot', message: wikiElements });
          }
        });

        // Update the chat log with the bot's response
        setChatLog((prevChatLog) => [...prevChatLog, { sender: 'user', message }, ...chatEntries, ]);
      }

    } catch (error) {
      console.error('Error sending message:', error);
    }
  };

  if (!isLoggedIn) {
    return (
      <div className="login-container">
        <h2>Welcome to the MuBot</h2>
        <form onSubmit={handleLogin} className="login-form">
          <input
            type="text"
            placeholder="Enter your name"
            value={username}
            onChange={(e) => setUsername(e.target.value)}
            required
          />
          <input
            type="password"
            placeholder="Enter your password"
            value={password}
            onChange={(e) => setPassword(e.target.value)}
            required
          />
          <button type="submit">Login</button>
        </form>
      </div>
    );
  }

  return (
    <div className="chat-container">
      <div className="chat-header">
        <h2>Hello, {username}</h2>
        <h3>Chat with MuBot</h3>
      </div>
      <div className="chat-log">
        {chatLog.map((chatEntry, index) => (
          <div
            key={index}
            className={`chat-entry ${chatEntry.sender === 'user' ? 'user' : 'bot'}`}
          >
            <span className="user-name">{chatEntry.sender === 'user' ? username : 'MuBot'}</span>
            <p className="chat-message">{chatEntry.message}</p>
          </div>
        ))}
      </div>
      <div className="chat-input">
        <input
          type="text"
          placeholder="Type your message..."
          value={message}
          onChange={(e) => setMessage(e.target.value)}
          onKeyPress={(e) => {
            if (e.key === 'Enter') {
              sendMessage();
            }
          }}
        />
        <button onClick={sendMessage}>Send</button>
      </div>
    </div>
  );
};

export default App;

Please find some of the important logic –

Function – handleLogin():

The handleLogin asynchronous function responds to an event by preventing its default action. It attempts to post a login request with a username and password to a local server endpoint. If the response is successful with a status of 200, it updates a state variable to indicate a successful login; otherwise, it logs any encountered errors.

Function – sendMessage():

The sendMessage asynchronous function is designed to handle the user’s chat interaction:

  1. If the message is empty (after trimming spaces), the function exits without further action.
  2. A chat entry object is created with the sender set as ‘user’ and the trimmed message.
  3. The input field’s message is cleared, and an API request is made to a local server endpoint with the chat message.
  4. If the API responds with a ‘departments’ attribute in its JSON, a bot response is crafted by iterating over department details.
  5. If the API responds with ‘records’ indicating artwork information, the bot crafts responses for each record, extracting text, images, and wiki links, and generating JSX elements for rendering them.
  6. After processing the API response, the chat log state is updated with the user’s original message and the bot’s responses.
  7. Errors, if encountered, are logged to the console.

This function enables interactive chat with bot responses that vary based on the nature of the data received from the API.
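
The same branching can be sketched in Python, which is handy for checking the /chat response shapes without the UI. This is only an illustration of the logic, not part of the project: the function name & sample payloads are assumptions, and unlike App.js it skips empty wiki links.

```python
import json


def summarize_chat_payload(message_field: str) -> list:
    """Turn the nested JSON in the 'message' attribute into (kind, content) chat entries."""
    data = json.loads(message_field)
    entries = []

    if data.get("departments"):
        # Shape 1: catalog of departments, one "id name" line per department
        lines = [f"{d['departmentId']} {d['displayName']}" for d in data["departments"]]
        entries.append(("text", "\n".join(lines)))
    elif data.get("records"):
        # Shape 2: artwork records with text, an image link and wiki links
        for record in data["records"]:
            text = "\n".join(
                f"{key}: {value}" for key, value in record.items() if key not in ("Image", "Wiki")
            )
            if text:
                entries.append(("text", text))
            image = record.get("Image")
            if image and image != "[]":
                entries.append(("image", image))
            links = [link.strip() for link in record.get("Wiki", "").split(",") if link.strip()]
            if links:
                entries.append(("wiki", links))
    return entries


# Made-up payloads in the two shapes the front-end handles.
departments_msg = json.dumps({"departments": [
    {"departmentId": 1, "displayName": "Sculpture"},
    {"departmentId": 2, "displayName": "Paintings"},
]})
records_msg = json.dumps({"records": [{
    "Id": "A bronze helmet.",
    "Image": "https://example.com/a.jpg",
    "Wiki": "https://example.com/w1, https://example.com/w2",
}]})

print(summarize_chat_payload(departments_msg))
print(summarize_chat_payload(records_msg))
```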


Let us explore the directory structure. Starting from the parent, the important child folders should look like this –


So, finally, we’ve done it.

I know that this post is relatively longer than my earlier posts. But I think you will get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Enable OpenAI chatbot with the selected YouTube video content using LangChain, FAISS & YouTube data-API.

Today, I’m very excited to demonstrate an effortless & new way to extract the transcript from YouTube videos & then answer the questions based on the topics selected by the users. In this post, I plan to take the user’s input first & then summarize the video content through useful advanced analytics with the help of LangChain & an OpenAI-based model.

In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.
Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

Demo

Isn’t it very exciting? This will lead to a whole new ballgame, where one can get critical decision-making information from these human sources along with their traditional advanced analytical data.

How will it help?

Let’s say, as per your historical data & analytics, the dashboard is recommending prod-A, prod-B & prod-C as the top three products for potential top-performing brands. However, you are getting some alerts from the TV news on prod-B due to recent incidents. So, in that case, you don’t want to continue with the prod-B investment. You may instead find a new product named prod-Z, which may reduce the risk of your investment.


What is LangChain?

LangChain is a framework for developing applications powered by language models. We believe that the most powerful and differentiated applications will not only call out to a language model but will also be:

  1. Data-aware: connect a language model to other sources of data
  2. Agentic: allow a language model to interact with its environment

The LangChain framework works around these principles.

To know more about this, please click the following link.

As you can see, this is one of the critical components in our solution, which will bind the OpenAI bot & it will feed the necessary data to provide the correct response.


What is FAISS?

Faiss is a library for efficient similarity search and clustering of dense vectors. It contains algorithms that search in sets of vectors of any size, up to ones that do not fit in RAM. It also has supporting code for evaluation and parameter tuning.

Faiss is written in C++ with complete wrappers for Python. Some of its most useful algorithms are available on both CPU & GPU. Facebook AI Research develops it.

To know more about this, please click the following link.


FLOW OF EVENTS:

Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.

Here are the steps that will follow in sequence –

  • The application will first get the topic that it needs to look up on YouTube & find the top 5 videos using the YouTube data-API.
  • Once the above step returns the list of video URLs, LangChain will drive the application to extract the transcripts from the videos & then split them into smaller chunks to keep the costly OpenAI calls small. During this time, it will invoke FAISS to create the document DBs.
  • Finally, it will send the relevant chunks to OpenAI along with your supplied template, so that the final analysis runs only on the small amount of data required for your query & returns the appropriate response at a lower cost.

CODE:

Why don’t we go through the code made accessible due to this new library for this particular use case?

  • clsConfigClient.py (This is the main calling Python script for the input parameters.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 28-May-2023 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### personal OpenAI-based video content ####
#### enable bot. ####
#### ####
################################################
import os
import platform as pl

class clsConfigClient(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'DATA_PATH': Curr_Path + sep + 'data' + sep,
        'MODEL_PATH': Curr_Path + sep + 'model' + sep,
        'TEMP_PATH': Curr_Path + sep + 'temp' + sep,
        'MODEL_DIR': 'model',
        'APP_DESC_1': 'LangChain Demo!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'FILE_NAME': 'Output.csv',
        'MODEL_NAME': 'gpt-3.5-turbo',
        'OPEN_AI_KEY': "sk-kfrjfijdrkidjkfjd9474nbfjfkfjfhfhf84i84hnfhjdbv6Bgvv",
        'YOUTUBE_KEY': "AIjfjfUYGe64hHJ-LOFO5u-mkso9pPOJGFU",
        'TITLE': "LangChain Demo!",
        'TEMP_VAL': 0.2,
        'PATH' : Curr_Path,
        'MAX_CNT' : 5,
        'OUT_DIR': 'data'
    }

Some of the key entries from the above scripts are as follows –

'MODEL_NAME': 'gpt-3.5-turbo',
'OPEN_AI_KEY': "sk-kfrjfijdrkidjkfjd9474nbfjfkfjfhfhf84i84hnfhjdbv6Bgvv",
'YOUTUBE_KEY': "AIjfjfUYGe64hHJ-LOFO5u-mkso9pPOJGFU",
'TEMP_VAL': 0.2,

From the above code snippet, one can understand that we need API keys for both YouTube & OpenAI. They have separate costs & usage, which I’ll share later in the post. Also, notice that the temperature is set to 0.2 (the OpenAI API accepts values between 0 & 2). A low value like this means our AI bot will be more consistent in its responses. And our application will use the GPT-3.5-turbo model for its analytic response.
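If you would rather not keep the keys inside the config script, one option is to read them from environment variables. The following is only a minimal sketch of that idea. The helper load_key & the variable names OPENAI_API_KEY & YOUTUBE_API_KEY are my assumptions for illustration; they are not part of the original config.

```python
import os

def load_key(env_name, default=""):
    """Return the value of an environment variable, or a default if it is unset."""
    return os.environ.get(env_name, default)

# Hypothetical subset of the conf dictionary, with the keys read from the
# environment instead of being hard-coded in the script.
conf_keys = {
    'MODEL_NAME': 'gpt-3.5-turbo',
    'OPEN_AI_KEY': load_key('OPENAI_API_KEY'),
    'YOUTUBE_KEY': load_key('YOUTUBE_API_KEY'),
    'TEMP_VAL': 0.2,
}

print(sorted(conf_keys.keys()))
```

With this approach, the keys stay out of the codebase that you share on GitHub.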

  • clsTemplate.py (Contains all the templates for OpenAI.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 27-May-2023 ####
#### Modified On: 28-May-2023 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the template for ####
#### OpenAI prompts to get the correct ####
#### response. ####
#### ####
################################################
# Template to use for the system message prompt
templateVal_1 = """
You are a helpful assistant that can answer questions about youtube videos
based on the video's transcript: {docs}
Only use the factual information from the transcript to answer the question.
If you feel like you don't have enough information to answer the question, say "I don't know".
Your answers should be verbose and detailed.
"""

The above code is self-explanatory. Here, we’re keeping the instructions that tell the OpenAI model to respond within these guidelines.
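To see how the {docs} & {question} placeholders get filled, here is a small sketch that uses plain Python string formatting. LangChain performs its own substitution through the prompt-template classes, so this only illustrates the idea. The template below is a shortened copy, and the sample transcript text & question are made up.

```python
# Shortened copy of the system template from clsTemplate.py
template_val = """
You are a helpful assistant that can answer questions about youtube videos
based on the video's transcript: {docs}
Only use the factual information from the transcript to answer the question.
"""

human_template = "Answer the following question: {question}"

# Made-up transcript text & question, for illustration only
sample_docs = "In this video we compare two budget laptops."
sample_question = "Which products are compared?"

# Fill the placeholders the same way the chain fills them at run time
system_text = template_val.format(docs=sample_docs)
human_text = human_template.format(question=sample_question)

print(system_text)
print(human_text)
```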

  • clsVideoContentScrapper.py (Main class to extract the transcript from the YouTube videos & then answer the questions based on the topics selected by the users.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 27-May-2023 ####
#### Modified On 28-May-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python class that will invoke the ####
#### LangChain of package to extract ####
#### the transcript from the YouTube videos & ####
#### then answer the questions based on the ####
#### topics selected by the users. ####
#### ####
#####################################################
from langchain.document_loaders import YoutubeLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.chat_models import ChatOpenAI
from langchain.chains import LLMChain
from langchain.prompts.chat import (
    ChatPromptTemplate,
    SystemMessagePromptTemplate,
    HumanMessagePromptTemplate,
)
from googleapiclient.discovery import build
import clsTemplate as ct
from clsConfigClient import clsConfigClient as cf
import os

###############################################
### Global Section ###
###############################################
open_ai_Key = cf.conf['OPEN_AI_KEY']
os.environ["OPENAI_API_KEY"] = open_ai_Key
embeddings = OpenAIEmbeddings(openai_api_key=open_ai_Key)
YouTube_Key = cf.conf['YOUTUBE_KEY']
youtube = build('youtube', 'v3', developerKey=YouTube_Key)

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn
###############################################
### End of Global Section ###
###############################################

class clsVideoContentScrapper:
    def __init__(self):
        self.model_name = cf.conf['MODEL_NAME']
        self.temp_val = cf.conf['TEMP_VAL']
        self.max_cnt = int(cf.conf['MAX_CNT'])

    def createDBFromYoutubeVideoUrl(self, video_url):
        try:
            loader = YoutubeLoader.from_youtube_url(video_url)
            transcript = loader.load()

            text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
            docs = text_splitter.split_documents(transcript)

            db = FAISS.from_documents(docs, embeddings)
            return db

        except Exception as e:
            x = str(e)
            print('Error: ', x)
            return ''

    def getResponseFromQuery(self, db, query, k=4):
        try:
            """
            gpt-3.5-turbo can handle up to 4097 tokens. Setting the chunksize to 1000 and k to 4 maximizes
            the number of tokens to analyze.
            """

            mod_name = self.model_name
            temp_val = self.temp_val

            docs = db.similarity_search(query, k=k)
            docs_page_content = " ".join([d.page_content for d in docs])

            chat = ChatOpenAI(model_name=mod_name, temperature=temp_val)

            # Template to use for the system message prompt
            template = ct.templateVal_1

            system_message_prompt = SystemMessagePromptTemplate.from_template(template)

            # Human question prompt
            human_template = "Answer the following question: {question}"
            human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)

            chat_prompt = ChatPromptTemplate.from_messages(
                [system_message_prompt, human_message_prompt]
            )

            chain = LLMChain(llm=chat, prompt=chat_prompt)

            response = chain.run(question=query, docs=docs_page_content)
            response = response.replace("\n", "")
            return response, docs

        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return '', ''

    def topFiveURLFromYouTube(self, service, **kwargs):
        try:
            video_urls = []
            channel_list = []
            results = service.search().list(**kwargs).execute()

            for item in results['items']:
                print("Title: ", item['snippet']['title'])
                print("Description: ", item['snippet']['description'])
                channel = item['snippet']['channelId']
                print("Channel Id: ", channel)

                # Fetch the channel name using the channel ID
                channel_response = service.channels().list(part='snippet',id=item['snippet']['channelId']).execute()
                channel_title = channel_response['items'][0]['snippet']['title']
                print("Channel Title: ", channel_title)
                channel_list.append(channel_title)

                print("Video Id: ", item['id']['videoId'])
                vidURL = "https://www.youtube.com/watch?v=" + item['id']['videoId']
                print("Video URL: " + vidURL)
                video_urls.append(vidURL)
                print("\n")

            return video_urls, channel_list

        except Exception as e:
            video_urls = []
            channel_list = []
            x = str(e)
            print('Error: ', x)

            return video_urls, channel_list

    def extractContentInText(self, topic, query):
        try:
            discussedTopic = []
            strKeyText = ''
            cnt = 0
            max_cnt = self.max_cnt

            urlList, channelList = self.topFiveURLFromYouTube(youtube, q=topic, part='id,snippet',maxResults=max_cnt,type='video')
            print('Returned List: ')
            print(urlList)
            print()

            for video_url in urlList:
                print('Processing Video: ')
                print(video_url)
                db = self.createDBFromYoutubeVideoUrl(video_url)

                response, docs = self.getResponseFromQuery(db, query)

                if len(response) > 0:
                    strKeyText = 'As per the topic discussed in ' + channelList[cnt] + ', '
                    discussedTopic.append(strKeyText + response)

                cnt += 1

            return discussedTopic
        except Exception as e:
            discussedTopic = []
            x = str(e)
            print('Error: ', x)

            return discussedTopic

Let us understand the key methods step by step in detail –

def topFiveURLFromYouTube(self, service, **kwargs):
    try:
        video_urls = []
        channel_list = []
        results = service.search().list(**kwargs).execute()

        for item in results['items']:
            print("Title: ", item['snippet']['title'])
            print("Description: ", item['snippet']['description'])
            channel = item['snippet']['channelId']
            print("Channel Id: ", channel)

            # Fetch the channel name using the channel ID
            channel_response = service.channels().list(part='snippet',id=item['snippet']['channelId']).execute()
            channel_title = channel_response['items'][0]['snippet']['title']
            print("Channel Title: ", channel_title)
            channel_list.append(channel_title)

            print("Video Id: ", item['id']['videoId'])
            vidURL = "https://www.youtube.com/watch?v=" + item['id']['videoId']
            print("Video URL: " + vidURL)
            video_urls.append(vidURL)
            print("\n")

        return video_urls, channel_list

    except Exception as e:
        video_urls = []
        channel_list = []
        x = str(e)
        print('Error: ', x)

        return video_urls, channel_list

The above code will fetch the most relevant YouTube URLs & bind them into a list along with the channel names & then share the lists with the main functions.
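To make the shape of the search result clearer, here is a small sketch that walks through a hand-written dictionary instead of a live API call. The dictionary mimics only the fields that the method reads. All the values are made up, & the helper build_video_urls is a hypothetical name. A real response from the YouTube data-API carries many more fields.

```python
# Hand-written stand-in for the result of service.search().list(...).execute().
# Only the fields used by topFiveURLFromYouTube are present; values are made up.
fake_results = {
    'items': [
        {'id': {'videoId': 'abc123'},
         'snippet': {'title': 'First video', 'channelId': 'chan-1'}},
        {'id': {'videoId': 'xyz789'},
         'snippet': {'title': 'Second video', 'channelId': 'chan-2'}},
    ]
}

def build_video_urls(results):
    """Turn each search hit into a watch URL, keeping the original order."""
    base = "https://www.youtube.com/watch?v="
    return [base + item['id']['videoId'] for item in results.get('items', [])]

video_urls = build_video_urls(fake_results)
print(video_urls)
```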

def createDBFromYoutubeVideoUrl(self, video_url):
    try:
        loader = YoutubeLoader.from_youtube_url(video_url)
        transcript = loader.load()

        text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
        docs = text_splitter.split_documents(transcript)

        db = FAISS.from_documents(docs, embeddings)
        return db

    except Exception as e:
        x = str(e)
        print('Error: ', x)
        return ''

The above Python code defines a function, createDBFromYoutubeVideoUrl, which creates a database of text documents from the transcript of a YouTube video. Here’s the explanation in simple English:

  1. The function createDBFromYoutubeVideoUrl is defined with one argument: video_url.
  2. The function uses a try-except block to handle any exceptions that may occur.
  3. Inside the try block, the following steps are performed:
  • First, it creates a YoutubeLoader object from the provided video_url. This object is responsible for fetching the data of the YouTube video specified by the URL.
  • The loader object then loads the transcript of the video. The transcript is the text version of everything spoken in the video.
  • It then creates a RecursiveCharacterTextSplitter object with a chunk_size of 1000 and a chunk_overlap of 100. This object splits the transcript into smaller chunks (documents) of text for easier processing. Each piece will be at most around 1000 characters long, and there will be an overlap of up to 100 characters between consecutive chunks.
  • The split_documents method of the text_splitter object splits the transcript into smaller documents. These documents are stored in the docs variable.
  • The FAISS.from_documents method is then called with docs and embeddings as arguments to create a FAISS (Facebook AI Similarity Search) index. This index is a database used for efficient similarity search and clustering of high-dimensional vectors, which, in this case, are the embeddings of the documents. The FAISS index is stored in the db variable.
  • Finally, the db variable is returned, representing the database created from the video transcript.

  4. If an exception occurs during the execution of the try block, the code execution moves to the except block:

  • Here, it first converts the exception e to a string x.
  • Then it prints an error message.
  • Finally, it returns an empty string as an indication of the error.
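To make chunk_size & chunk_overlap more concrete, here is a simplified fixed-window splitter. Please note that this is not how RecursiveCharacterTextSplitter works internally (it tries to break the text on separators such as paragraphs & lines first). The sketch only shows what the overlap between consecutive chunks means, & the function name split_with_overlap is hypothetical.

```python
def split_with_overlap(text, chunk_size=1000, chunk_overlap=100):
    """Cut text into windows of chunk_size characters. Each window starts
    chunk_size - chunk_overlap characters after the previous one."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once the current window has reached the end of the text
        if start + chunk_size >= len(text):
            break
    return chunks

# Tiny example with small numbers so that the overlap is easy to see
sample = "abcdefghijklmnopqrstuvwxyz"
chunks = split_with_overlap(sample, chunk_size=10, chunk_overlap=3)
print(chunks)
```

In this tiny example, the last three characters of each chunk are repeated at the start of the next one, so a sentence that falls on a chunk boundary is not lost completely.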

def getResponseFromQuery(self, db, query, k=4):
      try:
          """
          gpt-3.5-turbo can handle up to 4097 tokens. Setting the chunksize to 1000 and k to 4 maximizes
          the number of tokens to analyze.
          """

          mod_name = self.model_name
          temp_val = self.temp_val

          docs = db.similarity_search(query, k=k)
          docs_page_content = " ".join([d.page_content for d in docs])

          chat = ChatOpenAI(model_name=mod_name, temperature=temp_val)

          # Template to use for the system message prompt
          template = ct.templateVal_1

          system_message_prompt = SystemMessagePromptTemplate.from_template(template)

          # Human question prompt
          human_template = "Answer the following question: {question}"
          human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)

          chat_prompt = ChatPromptTemplate.from_messages(
              [system_message_prompt, human_message_prompt]
          )

          chain = LLMChain(llm=chat, prompt=chat_prompt)

          response = chain.run(question=query, docs=docs_page_content)
          response = response.replace("\n", "")
          return response, docs

      except Exception as e:
          x = str(e)
          print('Error: ', x)

          return '', ''

The Python function getResponseFromQuery searches a given database (db) for a specific query and then generates a response using a language model (GPT-3.5-turbo, as per the config). The answer is based on the content found and the particular question. Here is a simple English summary:

  1. The function getResponseFromQuery takes three parameters: db, query, and k. The k parameter is optional and defaults to 4 if not provided. db is the database to search, query is the question or prompt to analyze, and k is the number of similar items to return.
  2. The function initiates a try-except block for handling any errors that might occur.
  3. Inside the try block:
  • The function retrieves the model name and temperature value from the instance of the class this function is a part of.
  • The function then searches the db database for documents similar to the query and saves these in docs.
  • It concatenates the content of the returned documents into a single string docs_page_content.
  • It creates a ChatOpenAI object with the model name and temperature value.
  • It creates a system message prompt from a predefined template.
  • It creates a human message prompt, which carries the query.
  • It combines these two prompts to form a chat prompt.
  • An LLMChain object is then created using the ChatOpenAI object and the chat prompt.
  • This LLMChain object is used to generate a response to the query using the content of the documents found in the database. The answer is then formatted by removing all newline characters.
  • Finally, the function returns this response along with the matching documents.
  4. If any error occurs during these operations, the function goes to the except block where:
  • The error message is printed.
  • The function returns two empty strings to indicate an error occurred, and no response or documents could be produced.
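Conceptually, the similarity search step picks the k stored chunks whose embeddings are closest to the embedding of the query. The following brute-force sketch shows that idea with toy two-dimensional vectors & Euclidean distance. The real embeddings have far more dimensions, & FAISS uses optimized index structures instead of a plain loop. All the names & numbers here are made up for illustration.

```python
import math

def top_k_similar(query_vec, doc_vecs, k=4):
    """Return the indices of the k vectors closest to query_vec (Euclidean distance)."""
    distances = [
        (math.dist(query_vec, vec), idx) for idx, vec in enumerate(doc_vecs)
    ]
    distances.sort()
    return [idx for _, idx in distances[:k]]

# Toy 2-D "embeddings" for five chunks; the values are made up
doc_vecs = [(0.0, 0.0), (1.0, 1.0), (5.0, 5.0), (0.9, 1.2), (3.0, 0.0)]
chunk_texts = ["chunk A", "chunk B", "chunk C", "chunk D", "chunk E"]

# Pick the two nearest chunks & join their text, like docs_page_content
nearest = top_k_similar((1.0, 1.0), doc_vecs, k=2)
context = " ".join(chunk_texts[i] for i in nearest)
print(nearest, context)
```

Only the joined text of these nearest chunks goes to the model, which is what keeps the prompt small.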

def extractContentInText(self, topic, query):
    try:
        discussedTopic = []
        strKeyText = ''
        cnt = 0
        max_cnt = self.max_cnt

        urlList, channelList = self.topFiveURLFromYouTube(youtube, q=topic, part='id,snippet',maxResults=max_cnt,type='video')
        print('Returned List: ')
        print(urlList)
        print()

        for video_url in urlList:
            print('Processing Video: ')
            print(video_url)
            db = self.createDBFromYoutubeVideoUrl(video_url)

            response, docs = self.getResponseFromQuery(db, query)

            if len(response) > 0:
                strKeyText = 'As per the topic discussed in ' + channelList[cnt] + ', '
                discussedTopic.append(strKeyText + response)

            cnt += 1

        return discussedTopic
    except Exception as e:
        discussedTopic = []
        x = str(e)
        print('Error: ', x)

        return discussedTopic

This Python function, extractContentInText, aims to extract relevant content from the transcripts of the top YouTube videos on a specific topic and generate responses to a given query. Here’s the explanation in simple English:

  1. The function extractContentInText is defined with topic and query as parameters.
  2. It begins with a try-except block to catch and handle any possible exceptions.
  3. In the try block:
  • It initializes several variables: an empty list discussedTopic to store the extracted information, an empty string strKeyText to hold the prefix that names the channel, a counter cnt initialized at 0, and max_cnt retrieved from the instance to specify the maximum number of YouTube videos to consider.
  • It calls the topFiveURLFromYouTube function (defined previously) to get the URLs of the top videos on the given topic from YouTube. It also retrieves the list of channel names associated with these videos.
  • It prints the returned list of URLs.
  • Then, it starts a loop over each URL in the urlList.
    • For each URL, it prints the URL, then creates a database from the transcript of the YouTube video using the function createDBFromYoutubeVideoUrl.
    • It then uses the getResponseFromQuery function to get a response to the query based on the content of the database.
    • If the length of the response is greater than 0 (meaning there is a response), it forms a string strKeyText to indicate the channel that the topic was discussed on and then appends the answer to this string. This entire string is then added to the discussedTopic list.
    • It increments the counter cnt by one after each iteration.
  • Finally, it returns the discussedTopic list, which now contains relevant content extracted from the videos.
  4. If any error occurs during these operations, the function goes into the except block:
  • It first resets discussedTopic to an empty list.
  • Then it converts the exception e to a string and prints the error message.
  • Lastly, it returns the empty discussedTopic list, indicating that no content could be extracted due to the error.
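The pairing of each response with its channel name can also be written without the manual counter. The following is only a small sketch of that part of the loop, with made-up channel names & answers in place of the YouTube & OpenAI calls. The function name label_responses is hypothetical.

```python
def label_responses(channel_names, responses):
    """Prefix each non-empty response with the channel it came from."""
    labelled = []
    # zip walks both lists together, so no separate counter is needed
    for channel, response in zip(channel_names, responses):
        if response:  # skip the videos that produced no answer
            labelled.append('As per the topic discussed in ' + channel + ', ' + response)
    return labelled

# Made-up inputs; the second video returned no answer
channels = ['Channel One', 'Channel Two', 'Channel Three']
answers = ['It is fast.', '', 'It is cheap.']

result = label_responses(channels, answers)
for line in result:
    print(line)
```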

  • testLangChain.py (Main Python script to extract the transcript from the YouTube videos & then answer the questions based on the topics selected by the users.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 27-May-2023 ####
#### Modified On 28-May-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### clsVideoContentScrapper class to extract ####
#### the transcript from the YouTube videos. ####
#### ####
#####################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
import textwrap
import clsVideoContentScrapper as cvsc

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

######################################
### Get your global values ####
######################################
debug_ind = 'Y'

# Initiating Logging Instances
clog = cl.clsL()
data_path = cf.conf['DATA_PATH']
data_file_name = cf.conf['FILE_NAME']
cVCScrapper = cvsc.clsVideoContentScrapper()
######################################
#### Global Flag ########
######################################

def main():
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        #query = "What are they saying about Microsoft?"
        print('Please share your topic!')
        inputTopic = input('User: ')
        print('Please ask your questions?')
        inputQry = input('User: ')
        print()

        retList = cVCScrapper.extractContentInText(inputTopic, inputQry)
        cnt = 0

        for discussedTopic in retList:
            finText = str(cnt + 1) + ') ' + discussedTopic
            print()
            print(textwrap.fill(finText, width=150))

            cnt += 1

        r1 = len(retList)

        if r1 > 0:
            print()
            print('Successfully Scrapped!')
        else:
            print()
            print('Failed to Scrappe!')

        print('*'*120)
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()

Please find the key snippet –

def main():
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        #query = "What are they saying about Microsoft?"
        print('Please share your topic!')
        inputTopic = input('User: ')
        print('Please ask your questions?')
        inputQry = input('User: ')
        print()

        retList = cVCScrapper.extractContentInText(inputTopic, inputQry)
        cnt = 0

        for discussedTopic in retList:
            finText = str(cnt + 1) + ') ' + discussedTopic
            print()
            print(textwrap.fill(finText, width=150))

            cnt += 1

        r1 = len(retList)

        if r1 > 0:
            print()
            print('Successfully Scrapped!')
        else:
            print()
            print('Failed to Scrappe!')

        print('*'*120)
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()

The above main application will capture the topic from the user & then give the user a chance to ask a specific question on that topic. It then invokes the main class to extract the transcripts from YouTube, feeds them as a source using LangChain & finally delivers the responses. If there is no response, it will report that the scraping failed.

USAGE & COST FACTOR:

Please find the OpenAI usage –

Please find the YouTube API usage –


So, finally, we’ve done it.

I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Note: All the data & scenarios posted here are representational data & scenarios & available over the internet & for educational purposes only. Some of the images (except my photo) we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement & especially in the prediction quality. Sample video taken from Santrel Media & you would find the link over here.

Documenting undocumented python scripts using Python-OpenAI

Today, I will discuss another very impressive & innovative new AI, which is now operational in Python. We’ll document a dummy Python script that has no comments by using OpenAI’s ChatGPT model. But before we start, why don’t we see the demo first?

Demo

Great! Let us understand how we can leverage this by writing a tiny snippet using this new AI model.

Architecture:

Let us understand the flow of events –

The above diagram represents the newly released OpenAI ChatGPT, to which one needs to supply the code whose logic was never documented, for whatever reason. We need to provide these scripts (maybe in parts) as source code to be analyzed. Then it will use this new model to translate the code into English-like language & capture the logic/comments for that specific snippet.


Python Packages:

Following are the python packages that are necessary to develop this brilliant use case –

pip install pandas
pip install openai


Predicting ball movement from live sports using Open-CV Python & Kalman filter

Today, I’m going to discuss another Computer Vision installment. I’ll use Open CV & Kalman filter to predict a live ball movement of Cricket, one of the most popular sports in the Indian sub-continent, along with the UK & Australia. But before we start a deep dive, why don’t we first watch the demo?

Demo

Isn’t it exciting? Let’s explore it in detail.


Architecture:

Let us understand the flow of events –

The above diagram shows that the application, which uses Open CV, analyzes individual frames. It detects the cricket ball & finally, it tracks every movement by analyzing each frame & then it predicts (pink line) based on the supplied data points.


Python Packages:

Following are the python packages that are necessary to develop this brilliant use case –

pip install opencv-python
pip install numpy
pip install cvzone

CODE:

Let us now understand the code. For this use case, we will only discuss three python scripts. The application needs a few more scripts than these three, but we have already discussed them in some of the earlier posts. Hence, we will skip them here.

  • clsPredictBodyLine.py (The main class that will handle the prediction of Cricket balls in the real-time video feed.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 20-Nov-2022 ####
#### Modified On 30-Nov-2022 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### clsPredictBodyLine class to initiate ####
#### the prediction capability in real-time ####
#### & display the result from a live sports. ####
#####################################################
import cv2
import cvzone
from cvzone.ColorModule import ColorFinder
from clsKalmanFilter import clsKalmanFilter
from clsConfigClient import clsConfigClient as cf
import numpy as np
import math
import ssl
import time

# Bypassing SSL Authentication
try:
    _create_unverified_https_context = ssl._create_unverified_context
except AttributeError:
    # Legacy python that doesn't verify HTTPS certificates by default
    pass
else:
    # Handle target environment that doesn't support HTTPS verification
    ssl._create_default_https_context = _create_unverified_https_context

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

###############################################
### Global Section ###
###############################################
# Load Kalman filter to predict the trajectory
kf = clsKalmanFilter()

# Create the color ColorFinder
myColorFinder = ColorFinder(False)

posListX = []
posListY = []
xList = [item for item in range(0, 1300)]
prediction=False
###############################################
### End of Global Section ###
###############################################

class clsPredictBodyLine(object):
    def __init__(self):
        self.inputFile_1 = str(cf.conf['BASE_FILE'])
        self.inputFile_2 = str(cf.conf['BASE_IMAGE_FILE'])
        self.src_path = str(cf.conf['SRC_PATH'])
        self.hsvVals = cf.conf['HSV']
        self.pauseTime = cf.conf['PAUSE']
        self.pT1 = int(cf.conf['POINT_1'])
        self.pT2 = int(cf.conf['POINT_2'])
        self.pT3 = int(cf.conf['POINT_3'])
        self.pT4 = int(cf.conf['POINT_4'])

    def predStream(self, img, hsvVals, FrNo):
        try:
            pT1 = self.pT1
            pT2 = self.pT2
            pT3 = self.pT3
            pT4 = self.pT4

            #Find the color ball
            imgColor, mask = myColorFinder.update(img, hsvVals)

            #Find location of the red_ball
            imgContours, contours = cvzone.findContours(img, mask, minArea=500)

            if contours:
                posListX.append(contours[0]['center'][0])
                posListY.append(contours[0]['center'][1])

            if posListX:
                # Find the Coefficients
                A, B, C = np.polyfit(posListX, posListY, 2)

                for i, (posX, posY) in enumerate(zip(posListX, posListY)):
                    pos = (posX, posY)
                    cv2.circle(imgContours, pos, 10, (0,255,0), cv2.FILLED)

                    # Using Kalman Filter Prediction
                    predicted = kf.predict(posX, posY)
                    cv2.circle(imgContours, (predicted[0], predicted[1]), 12, (255,0,255), cv2.FILLED)

                    ballDetectFlag = True
                    if ballDetectFlag:
                        print('Balls Detected!')

                    if i == 0:
                        cv2.line(imgContours, pos, pos, (0,255,0), 5)
                        cv2.line(imgContours, predicted, predicted, (255,0,255), 5)
                    else:
                        predictedM = kf.predict(posListX[i-1], posListY[i-1])

                        cv2.line(imgContours, pos, (posListX[i-1], posListY[i-1]), (0,255,0), 5)
                        cv2.line(imgContours, predicted, predictedM, (255,0,255), 5)

                if len(posListX) < 10:
                    # Calculation for best place to ball
                    a1 = A
                    b1 = B
                    c1 = C - pT1
                    X1 = int((-b1 - math.sqrt(b1**2 - (4*a1*c1)))/(2*a1))
                    prediction1 = pT2 < X1 < pT3

                    a2 = A
                    b2 = B
                    c2 = C - pT4
                    X2 = int((-b2 - math.sqrt(b2**2 - (4*a2*c2)))/(2*a2))
                    prediction2 = pT2 < X2 < pT3

                    prediction = prediction1 | prediction2

                if prediction:
                    print('Good Length Ball!')
                    sMsg = "Good Length Ball – (" + str(FrNo) + ")"
                    cvzone.putTextRect(imgContours, sMsg, (50,150), scale=5, thickness=5, colorR=(0,200,0), offset=20)
                else:
                    print('Loose Ball!')
                    sMsg = "Loose Ball – (" + str(FrNo) + ")"
                    cvzone.putTextRect(imgContours, sMsg, (50,150), scale=5, thickness=5, colorR=(0,0,200), offset=20)

            return imgContours

        except Exception as e:
            x = str(e)
            print('Error predStream:', x)

            return img

    def processVideo(self, debugInd, var):
        try:
            cnt = 0
            lastRowFlag=True
            breakFlag = False
            pauseTime = self.pauseTime
            src_path = self.src_path
            inputFile_1 = self.inputFile_1
            inputFile_2 = self.inputFile_2
            hsvVals = self.hsvVals

            FileName_1 = src_path + inputFile_1
            FileName_2 = src_path + inputFile_2

            # Initialize the video
            cap = cv2.VideoCapture(FileName_1)

            while True:
                try:
                    if breakFlag:
                        break

                    # Grab the frames
                    success, img = cap.read()
                    time.sleep(pauseTime)
                    cnt+=1
                    print('*'*60)
                    print('Frame Number:', str(cnt))

                    if (cv2.waitKey(1) & 0xFF) == ord("q"):
                        break

                    if success:
                        imgContours = self.predStream(img, hsvVals, cnt)

                        if imgContours is None:
                            imgContours = img

                        imgColor = cv2.resize(imgContours, (0,0), None, 0.7, 0.7)

                        # Display
                        cv2.imshow("ImageColor", imgColor)
                        print('*'*60)
                    else:
                        breakFlag=True

                except Exception as e:
                    x = str(e)
                    print('Error Main:', x)

            cv2.destroyAllWindows()

            return 0

        except Exception as e:
            x = str(e)
            print('Error:', x)
            cv2.destroyAllWindows()

            return 1

Please find the key snippet from the above script –

kf = clsKalmanFilter()

The application is instantiating the modified Kalman filter.

myColorFinder = ColorFinder(False)

This command creates the color finder, & the flag passed to it controls the debug mode. The debug mode helps you create a proper mask when you want to isolate the color of any object you want to track. To use it, one needs to set the flag to True. And you will see the following screen. Click the next video to get the process to generate the accurate HSV.

In the end, you will get a similar entry to the below one –

And you can see the entry that is available in the config for the following parameter –

'HSV': {'hmin': 173, 'smin':177, 'vmin':57, 'hmax':178, 'smax':255, 'vmax':255},
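To make the role of these six numbers concrete, here is a minimal sketch of the range test that an HSV mask applies to every pixel. It assumes OpenCV's convention (hue from 0 to 179, saturation & value from 0 to 255) and inclusive bounds. The helper name in_hsv_range is hypothetical and is not part of cvzone.

```python
# Hypothetical helper (not a cvzone API): checks whether one HSV pixel
# falls inside the configured range. Bounds are assumed to be inclusive.
HSV_VALS = {'hmin': 173, 'smin': 177, 'vmin': 57, 'hmax': 178, 'smax': 255, 'vmax': 255}

def in_hsv_range(pixel, hsv_vals):
    h, s, v = pixel
    return (hsv_vals['hmin'] <= h <= hsv_vals['hmax']
            and hsv_vals['smin'] <= s <= hsv_vals['smax']
            and hsv_vals['vmin'] <= v <= hsv_vals['vmax'])

# A pinkish pixel inside the range and a pixel whose hue is far outside it
print(in_hsv_range((175, 200, 120), HSV_VALS))
print(in_hsv_range((30, 200, 120), HSV_VALS))
```

Pixels that pass this test stay white in the mask & everything else turns black, which is why a narrow hue window (173 to 178) is enough to single out the pink ball.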

The next important block is –

def predStream(self, img, hsvVals, FrNo):
    try:
        pT1 = self.pT1
        pT2 = self.pT2
        pT3 = self.pT3
        pT4 = self.pT4

The four points mentioned above define the best region for the ball to land in. A delivery pitched there forces the batsman to play a shot & carries roughly a 90% chance of getting him caught behind.


The snippets below will apply the mask & identify the contour of the objects which the program intends to track. In this case, we are talking about the pink cricket ball.

#Find the color ball
imgColor, mask = myColorFinder.update(img, hsvVals)

#Find location of the red_ball
imgContours, contours = cvzone.findContours(img, mask, minArea=500)

if contours:
    posListX.append(contours[0]['center'][0])
    posListY.append(contours[0]['center'][1])

The next key snippets are as follows –

if posListX:
    # Find the Coefficients
    A, B, C = np.polyfit(posListX, posListY, 2)

    for i, (posX, posY) in enumerate(zip(posListX, posListY)):
        pos = (posX, posY)
        cv2.circle(imgContours, pos, 10, (0,255,0), cv2.FILLED)

        # Using Kalman Filter Prediction
        predicted = kf.predict(posX, posY)
        cv2.circle(imgContours, (predicted[0], predicted[1]), 12, (255,0,255), cv2.FILLED)

        ballDetectFlag = True
        if ballDetectFlag:
            print('Balls Detected!')

        if i == 0:
            cv2.line(imgContours, pos, pos, (0,255,0), 5)
            cv2.line(imgContours, predicted, predicted, (255,0,255), 5)
        else:
            predictedM = kf.predict(posListX[i-1], posListY[i-1])

            cv2.line(imgContours, pos, (posListX[i-1], posListY[i-1]), (0,255,0), 5)
            cv2.line(imgContours, predicted, predictedM, (255,0,255), 5)

The above lines draw both the observed trajectory & the trajectory predicted by the Kalman filter on top of the frame in real time.

The next block is as follows –

if len(posListX) < 10:

    # Calculation for best place to ball
    a1 = A
    b1 = B
    c1 = C - pT1

    X1 = int((- b1 - math.sqrt(b1**2 - (4*a1*c1)))/(2*a1))
    prediction1 = pT2 < X1 < pT3

    a2 = A
    b2 = B
    c2 = C - pT4

    X2 = int((- b2 - math.sqrt(b2**2 - (4*a2*c2)))/(2*a2))
    prediction2 = pT2 < X2 < pT3

    prediction = prediction1 | prediction2

if prediction:
    print('Good Length Ball!')
    sMsg = "Good Length Ball - (" + str(FrNo) + ")"
    cvzone.putTextRect(imgContours, sMsg, (50,150), scale=5, thickness=5, colorR=(0,200,0), offset=20)
else:
    print('Loose Ball!')
    sMsg = "Loose Ball - (" + str(FrNo) + ")"
    cvzone.putTextRect(imgContours, sMsg, (50,150), scale=5, thickness=5, colorR=(0,0,200), offset=20)
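
The check above solves the fitted parabola y = A*x^2 + B*x + C for the x at which the ball reaches a given height (pT1 or pT4) & then tests whether that x falls between pT2 & pT3. Below is a minimal, self-contained sketch of the same idea. The function names & sample numbers are hypothetical, and unlike the original it returns None when the parabola never reaches the target height instead of raising an error.

```python
import math

def crossing_x(a, b, c, y_target):
    """Solve a*x**2 + b*x + c = y_target using the same (-b - sqrt) branch
    as the snippet above. Returns None if there is no real solution."""
    c_shift = c - y_target
    disc = b**2 - 4*a*c_shift
    if a == 0 or disc < 0:
        return None
    return int((-b - math.sqrt(disc)) / (2*a))

def is_good_length(a, b, c, y1, y2, x_min, x_max):
    """True if the curve crosses either target height inside (x_min, x_max)."""
    hits = [crossing_x(a, b, c, y) for y in (y1, y2)]
    return any(x is not None and x_min < x < x_max for x in hits)

# Hypothetical curve y = x**2 - 10x with two target heights
print(crossing_x(1, -10, 0, -16))
print(is_good_length(1, -10, 0, -16, -21, 1, 5))
```

Guarding against a negative discriminant matters in practice, because a nearly straight trajectory fitted from only a few points may never reach the target height.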
  • predictBodyLine.py (The main python script that will invoke the class to predict Cricket balls in the real-time video feed.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 25-Nov-2022 ####
#### Modified On 30-Nov-2022 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### clsPredictBodyLine class to initiate ####
#### the predict capability in real-time ####
#### from a cricket (Sports) streaming. ####
#####################################################
# We keep the setup code in a different class as shown below.
import clsPredictBodyLine as pbdl
from clsConfigClient import clsConfigClient as cf
import datetime
import logging
def main():
    try:
        # Other useful variables
        debugInd = 'Y'
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        var1 = datetime.datetime.now()
        print('Start Time: ', str(var))
        # End of useful variables
        # Initiating Log Class
        general_log_path = str(cf.conf['LOG_PATH'])
        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'predBodyLine.log', level=logging.INFO)
        print('Started predicting best bodyline deliveries from the Cricket Streaming!')
        # Passing source data csv file
        x1 = pbdl.clsPredictBodyLine()
        # Execute all the pass
        r1 = x1.processVideo(debugInd, var)
        if (r1 == 0):
            print('Successfully predicted body-line deliveries!')
        else:
            print('Failed to predict body-line deliveries!')
        var2 = datetime.datetime.now()
        # Elapsed time between the start & the end of the run
        c = var2 - var1
        minutes = c.total_seconds() / 60
        print('Total difference in minutes: ', str(minutes))
        print('End Time: ', str(var2))
    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()

Here is the final key snippet –

# Passing source data csv file
x1 = pbdl.clsPredictBodyLine()

# Execute all the pass
r1 = x1.processVideo(debugInd, var)

if (r1 == 0):
    print('Successfully predicted body-line deliveries!')
else:
    print('Failed to predict body-line deliveries!')

The above lines will first instantiate the main class & then invoke its processVideo method.

If you want to know more about the Kalman filter, you can find it here.

So, finally, we’ve done it.


FOLDER STRUCTURE:

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Note: All the data & scenarios posted here are representational, available over the internet & meant for educational purposes only. Some of the images (except my photo) we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement, especially in the prediction quality.

Realtime reading from a Streaming using Computer Vision

This week, we’re going to extend one of our earlier posts & try to read the entire text from a live stream using computer vision. If you want to view the previous post, please click the following link.

But, before we proceed, why don’t we view the demo first?

Demo

Architecture:

Let us understand the architecture flow –

Architecture flow

The above diagram shows that the application, which uses Open-CV, analyzes individual frames from the source, extracts the complete text within the video & displays it on top of the target screen, besides printing the same in the console.

Python Packages:

pip install imutils==0.5.4
pip install matplotlib==3.5.2
pip install numpy==1.21.6
pip install opencv-contrib-python==4.6.0.66
pip install opencv-contrib-python-headless==4.6.0.66
pip install opencv-python==4.6.0.66
pip install opencv-python-headless==4.6.0.66
pip install pandas==1.3.5
pip install Pillow==9.1.1
pip install pytesseract==0.3.9
pip install python-dateutil==2.8.2

CODE:

Let us now understand the code. For this use case, we will only discuss three Python scripts. We need more than these three; however, we have already discussed the rest in some of the earlier posts. Hence, we will skip them here.

  • clsReadingTextFromStream.py (This is the main class of python script that will extract the text from the WebCAM streaming in real-time.)


##################################################
#### Written By: SATYAKI DE ####
#### Written On: 22-Jul-2022 ####
#### Modified On 25-Jul-2022 ####
#### ####
#### Objective: This is the main class of ####
#### python script that will invoke the ####
#### extraction of texts from a WebCAM. ####
#### ####
##################################################
# Importing necessary packages
from clsConfig import clsConfig as cf
from imutils.object_detection import non_max_suppression
import numpy as np
import pytesseract
import imutils
import time
import cv2
###############################################
### Global Section ###
###############################################
# Two output layer names for the text detector model
lNames = cf.conf['LAYER_DET']
# Tesseract OCR text param values
strVal = "-l " + str(cf.conf['LANG']) + " --oem " + str(cf.conf['OEM_VAL']) + " --psm " + str(cf.conf['PSM_VAL']) + ""
config = (strVal)
###############################################
### End of Global Section ###
###############################################
class clsReadingTextFromStream:
    def __init__(self):
        self.sep = str(cf.conf['SEP'])
        self.Curr_Path = str(cf.conf['INIT_PATH'])
        self.CacheL = int(cf.conf['CACHE_LIM'])
        self.modelPath = str(cf.conf['MODEL_PATH']) + str(cf.conf['MODEL_FILE_NAME'])
        self.minConf = float(cf.conf['MIN_CONFIDENCE'])
        self.wt = int(cf.conf['WIDTH'])
        self.ht = int(cf.conf['HEIGHT'])
        self.pad = float(cf.conf['PADDING'])
        self.title = str(cf.conf['TITLE'])
        self.Otitle = str(cf.conf['ORIG_TITLE'])
        self.drawTag = cf.conf['DRAW_TAG']
        self.aRange = int(cf.conf['ASCII_RANGE'])
        self.sParam = cf.conf['SUBTRACT_PARAM']

    def findBoundBox(self, boxes, res, rW, rH, orig, origW, origH, pad):
        try:
            # Loop over the bounding boxes
            for (spX, spY, epX, epY) in boxes:
                # Scale the bounding box coordinates based on the respective
                # ratios
                spX = int(spX * rW)
                spY = int(spY * rH)
                epX = int(epX * rW)
                epY = int(epY * rH)
                # To obtain a better OCR of the text we can potentially
                # apply a bit of padding surrounding the bounding box.
                # And, computing the deltas in both the x and y directions
                dX = int((epX - spX) * pad)
                dY = int((epY - spY) * pad)
                # Apply padding to each side of the bounding box, respectively
                spX = max(0, spX - dX)
                spY = max(0, spY - dY)
                epX = min(origW, epX + (dX * 2))
                epY = min(origH, epY + (dY * 2))
                # Extract the actual padded ROI
                roi = orig[spY:epY, spX:epX]
                # Choose the proper OCR Config
                text = pytesseract.image_to_string(roi, config=config)
                # Add the bounding box coordinates and OCR'd text to the list
                # of results
                res.append(((spX, spY, epX, epY), text))
            # Sort the results bounding box coordinates from top to bottom
            res = sorted(res, key=lambda r:r[0][1])
            return res
        except Exception as e:
            x = str(e)
            print(x)
            return res
    def predictText(self, imgScore, imgGeo):
        try:
            minConf = self.minConf
            # Initializing the bounding box rectangles & confidence score by
            # extracting the rows & columns from the imgScore volume.
            (numRows, numCols) = imgScore.shape[2:4]
            rects = []
            confScore = []
            for y in range(0, numRows):
                # Extract the imgScore probabilities to derive potential
                # bounding box coordinates that surround text
                imgScoreData = imgScore[0, 0, y]
                xVal0 = imgGeo[0, 0, y]
                xVal1 = imgGeo[0, 1, y]
                xVal2 = imgGeo[0, 2, y]
                xVal3 = imgGeo[0, 3, y]
                anglesData = imgGeo[0, 4, y]
                for x in range(0, numCols):
                    # If our score does not have sufficient probability,
                    # ignore it
                    if imgScoreData[x] < minConf:
                        continue
                    # Compute the offset factor as our resulting feature
                    # maps will be 4x smaller than the input frame
                    (offX, offY) = (x * 4.0, y * 4.0)
                    # Extract the rotation angle for the prediction and
                    # then compute the sin and cosine
                    angle = anglesData[x]
                    cos = np.cos(angle)
                    sin = np.sin(angle)
                    # Derive the width and height of the bounding box from
                    # imgGeo
                    h = xVal0[x] + xVal2[x]
                    w = xVal1[x] + xVal3[x]
                    # Compute both the starting and ending (x, y)-coordinates
                    # for the text prediction bounding box
                    epX = int(offX + (cos * xVal1[x]) + (sin * xVal2[x]))
                    epY = int(offY - (sin * xVal1[x]) + (cos * xVal2[x]))
                    spX = int(epX - w)
                    spY = int(epY - h)
                    # Adding bounding box coordinates and probability score
                    # to the respective lists
                    rects.append((spX, spY, epX, epY))
                    confScore.append(imgScoreData[x])
            # return a tuple of the bounding boxes and associated confScore
            return (rects, confScore)
        except Exception as e:
            x = str(e)
            print(x)
            rects = []
            confScore = []
            return (rects, confScore)
    def processStream(self, debugInd, var):
        try:
            sep = self.sep
            Curr_Path = self.Curr_Path
            CacheL = self.CacheL
            modelPath = self.modelPath
            minConf = self.minConf
            wt = self.wt
            ht = self.ht
            pad = self.pad
            title = self.title
            Otitle = self.Otitle
            drawTag = self.drawTag
            aRange = self.aRange
            sParam = self.sParam
            val = 0
            # Initialize the video stream and allow the camera sensor to warm up
            print("[INFO] Starting video stream...")
            cap = cv2.VideoCapture(0)
            # Loading the pre-trained text detector
            print("[INFO] Loading Text Detector...")
            net = cv2.dnn.readNet(modelPath)
            # Loop over the frames from the video stream
            while True:
                try:
                    # Grab the frame from our video stream and resize it
                    success, frame = cap.read()
                    orig = frame.copy()
                    (origH, origW) = frame.shape[:2]
                    # Setting new width and height and then determine the ratio in change
                    # for both the width and height
                    (newW, newH) = (wt, ht)
                    rW = origW / float(newW)
                    rH = origH / float(newH)
                    # Resize the frame and grab the new frame dimensions
                    frame = cv2.resize(frame, (newW, newH))
                    (H, W) = frame.shape[:2]
                    # Construct a blob from the frame and then perform a forward pass of
                    # the model to obtain the two output layer sets
                    blob = cv2.dnn.blobFromImage(frame, 1.0, (W, H), sParam, swapRB=True, crop=False)
                    net.setInput(blob)
                    (confScore, imgGeo) = net.forward(lNames)
                    # Decode the predictions, then apply non-maxima suppression to
                    # suppress weak, overlapping bounding boxes
                    (rects, confidences) = self.predictText(confScore, imgGeo)
                    boxes = non_max_suppression(np.array(rects), probs=confidences)
                    # Initialize the list of results
                    res = []
                    # Getting BoundingBox boundaries
                    res = self.findBoundBox(boxes, res, rW, rH, orig, origW, origH, pad)
                    for ((spX, spY, epX, epY), text) in res:
                        # Display the text OCR by using Tesseract APIs
                        print("Reading Text::")
                        print("=" *60)
                        print(text)
                        print("=" *60)
                        # Removing the non-ASCII text so it can draw the text on the frame
                        # using OpenCV, then draw the text and a bounding box surrounding
                        # the text region of the input frame
                        text = "".join([c if ord(c) < aRange else "" for c in text]).strip()
                        output = orig.copy()
                        cv2.rectangle(output, (spX, spY), (epX, epY), drawTag, 2)
                        cv2.putText(output, text, (spX, spY - 20), cv2.FONT_HERSHEY_SIMPLEX, 1.2, drawTag, 3)
                        # Show the output frame
                        cv2.imshow(title, output)
                        #cv2.imshow(Otitle, frame)
                    # If the `q` key was pressed, break from the loop
                    if cv2.waitKey(1) == ord('q'):
                        break
                    val = 0
                except Exception as e:
                    x = str(e)
                    print(x)
                    val = 1
            # Performing cleanup at the end
            cap.release()
            cv2.destroyAllWindows()
            return val
        except Exception as e:
            x = str(e)
            print('Error:', x)
            return 1

Please find the key snippet from the above script –

# Two output layer names for the text detector model

lNames = cf.conf['LAYER_DET']

# Tesseract OCR text param values

strVal = "-l " + str(cf.conf['LANG']) + " --oem " + str(cf.conf['OEM_VAL']) + " --psm " + str(cf.conf['PSM_VAL']) + ""
config = (strVal)

The first line contains the names of the two output layers of the text detector model. The first layer gives the probability that a region contains text & the second one is used to derive the bounding box coordinates of the predicted text.

The second line contains various options for the Tesseract APIs. You need to understand these options in detail to make them work. These are the essential options for our use case –

  • Language – The intended language, for example, English, Spanish, Hindi, Bengali, etc.
  • OEM value – In this case, the application will use 4 to indicate the LSTM neural net model for OCR.
  • PSM value – In this case, the selected value is 7, indicating that the application treats the ROI as a single line of text.

For more details, please refer to the config file.
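
As a rough illustration of how the option string is assembled, the sketch below builds the same kind of value from a small dictionary. The dictionary & its values (eng, 1, 7) are placeholders rather than the contents of the actual config file, and build_tesseract_config is a hypothetical helper. Only -l, --oem & --psm are standard Tesseract command-line options.

```python
def build_tesseract_config(conf):
    """Assemble the option string passed to pytesseract via `config=`."""
    return "-l {} --oem {} --psm {}".format(conf['LANG'], conf['OEM_VAL'], conf['PSM_VAL'])

# Placeholder values for illustration only; use the ones from your config file
sample_conf = {'LANG': 'eng', 'OEM_VAL': 1, 'PSM_VAL': 7}
config = build_tesseract_config(sample_conf)
print(config)  # -l eng --oem 1 --psm 7
```

Note that the long options need two plain hyphens. A typographic dash pasted from a web page will not be recognized by Tesseract.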

print("[INFO] Loading Text Detector...")
net = cv2.dnn.readNet(modelPath)

The above lines load the pre-trained text detector model into memory for evaluation.

# Setting new width and height and then determine the ratio in change
# for both the width and height
(newW, newH) = (wt, ht)
rW = origW / float(newW)
rH = origH / float(newH)

# Resize the frame and grab the new frame dimensions
frame = cv2.resize(frame, (newW, newH))
(H, W) = frame.shape[:2]

# Construct a blob from the frame and then perform a forward pass of
# the model to obtain the two output layer sets
blob = cv2.dnn.blobFromImage(frame, 1.0, (W, H), sParam, swapRB=True, crop=False)
net.setInput(blob)
(confScore, imgGeo) = net.forward(lNames)

# Decode the predictions, then apply non-maxima suppression to
# suppress weak, overlapping bounding boxes
(rects, confidences) = self.predictText(confScore, imgGeo)
boxes = non_max_suppression(np.array(rects), probs=confidences)

The above lines prepare each frame for detection: they resize it to the configured width & height & then run a forward pass of the model to obtain the two output layer sets. The application then decodes the predictions & applies non-maxima suppression to remove the weak, overlapping bounding boxes. In short, this will identify the potential text regions & put a bounding box around each of them.
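
To illustrate what the suppression step does, here is a simplified greedy non-maxima suppression in plain Python. It is not the imutils implementation: it uses intersection-over-union with an assumed threshold of 0.3, and the function names are hypothetical.

```python
def iou(a, b):
    """Intersection-over-union of two (x1, y1, x2, y2) boxes."""
    ix1, iy1 = max(a[0], b[0]), max(a[1], b[1])
    ix2, iy2 = min(a[2], b[2]), min(a[3], b[3])
    inter = max(0, ix2 - ix1) * max(0, iy2 - iy1)
    area_a = (a[2] - a[0]) * (a[3] - a[1])
    area_b = (b[2] - b[0]) * (b[3] - b[1])
    union = area_a + area_b - inter
    return inter / union if union > 0 else 0.0

def simple_nms(boxes, scores, thresh=0.3):
    """Visit boxes from highest to lowest score and keep a box only if it
    does not overlap an already kept box by more than `thresh`."""
    order = sorted(range(len(boxes)), key=lambda i: scores[i], reverse=True)
    kept = []
    for i in order:
        if all(iou(boxes[i], boxes[k]) <= thresh for k in kept):
            kept.append(i)
    return [boxes[i] for i in kept]

# Two near-duplicate detections of the same word plus one separate word
boxes = [(10, 10, 110, 60), (12, 12, 112, 62), (200, 200, 300, 250)]
scores = [0.9, 0.8, 0.7]
print(simple_nms(boxes, scores))
```

In this sample, the second box overlaps the first almost completely, so only the higher-scoring one survives along with the separate third box.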

# Initialize the list of results
res = []

# Getting BoundingBox boundaries
res = self.findBoundBox(boxes, res, rW, rH, orig, origW, origH, pad)

The above function will create the bounding box surrounding the predicted text regions. Also, we will capture the expected text inside the result variable.

for (spX, spY, epX, epY) in boxes:
  # Scale the bounding box coordinates based on the respective
  # ratios
  spX = int(spX * rW)
  spY = int(spY * rH)
  epX = int(epX * rW)
  epY = int(epY * rH)

  # To obtain a better OCR of the text we can potentially
  # apply a bit of padding surrounding the bounding box.
  # And, computing the deltas in both the x and y directions
  dX = int((epX - spX) * pad)
  dY = int((epY - spY) * pad)

  # Apply padding to each side of the bounding box, respectively
  spX = max(0, spX - dX)
  spY = max(0, spY - dY)
  epX = min(origW, epX + (dX * 2))
  epY = min(origH, epY + (dY * 2))

  # Extract the actual padded ROI
  roi = orig[spY:epY, spX:epX]

Now, the application scales the bounding boxes back to the original frame size using the previously computed ratios, so that the actual text recognition runs on the full-resolution image. In this process, the application also pads the bounding boxes & then extracts the padded region of interest.
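
The arithmetic is easier to follow with numbers. The sketch below repeats the same scale & pad steps on a single box in plain Python. The function name scale_and_pad_box, the frame sizes & the padding value are hypothetical and chosen only for illustration.

```python
def scale_and_pad_box(box, rW, rH, origW, origH, pad):
    """Map a box from the resized frame back to the original frame,
    then pad it and clamp it to the frame borders."""
    spX, spY, epX, epY = box
    spX, epX = int(spX * rW), int(epX * rW)
    spY, epY = int(spY * rH), int(epY * rH)
    dX = int((epX - spX) * pad)
    dY = int((epY - spY) * pad)
    return (max(0, spX - dX), max(0, spY - dY),
            min(origW, epX + dX * 2), min(origH, epY + dY * 2))

# Assumed sizes: a 1280x720 camera frame resized to 320x320 for the detector
origW, origH = 1280, 720
rW, rH = origW / 320.0, origH / 320.0   # 4.0 and 2.25
print(scale_and_pad_box((10, 20, 110, 60), rW, rH, origW, origH, pad=0.125))
```

Here the box grows from 100x40 pixels in the detector frame to a padded region in the camera frame, and the left edge is clamped to 0 because the padding would otherwise push it outside the image.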

# Choose the proper OCR Config
text = pytesseract.image_to_string(roi, config=config)

# Add the bounding box coordinates and OCR'd text to the list
# of results
res.append(((spX, spY, epX, epY), text))

Using OCR options, the application extracts the text within the video frame & adds that to the res list.

# Sort the results bounding box coordinates from top to bottom
res = sorted(res, key=lambda r:r[0][1])

It then returns the results, sorted from top to bottom, to the calling function.

for ((spX, spY, epX, epY), text) in res:
  # Display the text OCR by using Tesseract APIs
  print("Reading Text::")
  print("=" *60)
  print(text)
  print("=" *60)

  # Removing the non-ASCII text so it can draw the text on the frame
  # using OpenCV, then draw the text and a bounding box surrounding
  # the text region of the input frame
  text = "".join([c if ord(c) < aRange else "" for c in text]).strip()
  output = orig.copy()

  cv2.rectangle(output, (spX, spY), (epX, epY), drawTag, 2)
  cv2.putText(output, text, (spX, spY - 20), cv2.FONT_HERSHEY_SIMPLEX, 1.2, drawTag, 3)

  # Show the output frame
  cv2.imshow(title, output)

Finally, it fetches each potential text region along with its text & then draws both on top of the source video. It also removes the non-ASCII characters at this stage, since OpenCV cannot draw them on the frame & they would otherwise show up as cryptic text.
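
The character filter can be tried on its own. This is a minimal sketch of the same idea, assuming the ASCII_RANGE value in the config is 128. Both the helper name keep_ascii & the sample string are hypothetical.

```python
def keep_ascii(text, ascii_range=128):
    """Drop every character whose code point is at or above `ascii_range`,
    then trim surrounding whitespace."""
    return "".join(c for c in text if ord(c) < ascii_range).strip()

# The rupee sign, the check mark and the trailing newline are removed
print(keep_ascii("Total: ₹500 ✓\n"))  # Total: 500
```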

  • readingVideo.py (Main calling script.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 22-Jul-2022 ####
#### Modified On 25-Jul-2022 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### clsReadingTextFromStream class to initiate ####
#### the reading capability in real-time ####
#### & display text via Web-CAM. ####
#####################################################
# We keep the setup code in a different class as shown below.
import clsReadingTextFromStream as rtfs
from clsConfig import clsConfig as cf
import datetime
import logging
###############################################
### Global Section ###
###############################################
# Instantiating all the main class
x1 = rtfs.clsReadingTextFromStream()
###############################################
### End of Global Section ###
###############################################
def main():
    try:
        # Other useful variables
        debugInd = 'Y'
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        var1 = datetime.datetime.now()
        print('Start Time: ', str(var))
        # End of useful variables
        # Initiating Log Class
        general_log_path = str(cf.conf['LOG_PATH'])
        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'readingTextFromVideo.log', level=logging.INFO)
        print('Started reading text from videos!')
        # Execute all the pass
        r1 = x1.processStream(debugInd, var)
        if (r1 == 0):
            print('Successfully read text from the Live Stream!')
        else:
            print('Failed to read text from the Live Stream!')
        var2 = datetime.datetime.now()
        # Elapsed time between the start & the end of the run
        c = var2 - var1
        minutes = c.total_seconds() / 60
        print('Total difference in minutes: ', str(minutes))
        print('End Time: ', str(var2))
    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()

Please find the key snippet –

# Instantiating all the main class

x1 = rtfs.clsReadingTextFromStream()

# Execute all the pass
r1 = x1.processStream(debugInd, var)

if (r1 == 0):
    print('Successfully read text from the Live Stream!')
else:
    print('Failed to read text from the Live Stream!')

The above lines instantiate the main class & then invoke its processStream function to extract the text from the live streaming video, printing a message that says whether the run succeeded.

FOLDER STRUCTURE:

Here is the folder structure that contains all the files & directories in macOS –

You will get the complete codebase in the following GitHub link.

Unfortunately, I cannot upload the model due to its size. I will share it on a need basis.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Note: All the data & scenarios posted here are representational, available over the internet & meant for educational purposes only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement, especially in the prediction quality.

Real-time augmented reality (AR) using Python-based Computer Vision

Hi Team,

Today, I’m going to discuss another Computer Vision installment. I’ll discuss how to implement Augmented Reality using Open-CV Computer Vision with full audio. We will be using part of a Bengali OTT Series called “Feludar Goendagiri” entirely for educational purposes & also as a tribute to the legendary director, the late Satyajit Ray. To know more about him, please click the following link.

Why don’t we see the demo first before jumping into the technical details?


Demo

Architecture:

Let us understand the architecture –

Process Flow

The above diagram shows that the application, which uses the Open-CV, analyzes individual frames from the source & blends that with the video trailer. Finally, it creates another video by correctly mixing the source audio.

Python Packages:

Following are the python packages that are necessary to develop this brilliant use case –

pip install opencv-python
pip install pygame

CODE:

Let us now understand the code. For this use case, we will only discuss three Python scripts. We need more than these three; however, we have already discussed the rest in some of the earlier posts. Hence, we will skip them here.

  • clsAugmentedReality.py (This is the main class of python script that will embed the source video with the WebCAM streams in real-time.)


##################################################
#### Written By: SATYAKI DE ####
#### Written On: 20-Jun-2022 ####
#### Modified On 25-Jun-2022 ####
#### ####
#### Objective: This is the main class of ####
#### python script that will embed the source ####
#### video with the WebCAM streams in ####
#### real-time. ####
##################################################
# Importing necessary packages
import numpy as np
import cv2
from clsConfig import clsConfig as cf
# Initialize our cached reference points
CACHED_REF_PTS = None
class clsAugmentedReality:
    def __init__(self):
        self.TOP_LEFT_X = int(cf.conf['TOP_LEFT_X'])
        self.TOP_LEFT_Y = int(cf.conf['TOP_LEFT_Y'])
        self.TOP_RIGHT_X = int(cf.conf['TOP_RIGHT_X'])
        self.TOP_RIGHT_Y = int(cf.conf['TOP_RIGHT_Y'])
        self.BOTTOM_RIGHT_X = int(cf.conf['BOTTOM_RIGHT_X'])
        self.BOTTOM_RIGHT_Y = int(cf.conf['BOTTOM_RIGHT_Y'])
        self.BOTTOM_LEFT_X = int(cf.conf['BOTTOM_LEFT_X'])
        self.BOTTOM_LEFT_Y = int(cf.conf['BOTTOM_LEFT_Y'])

    def getWarpImages(self, frame, source, cornerIDs, arucoDict, arucoParams, zoomFlag, useCache=False):
        try:
            # Assigning values
            TOP_LEFT_X = self.TOP_LEFT_X
            TOP_LEFT_Y = self.TOP_LEFT_Y
            TOP_RIGHT_X = self.TOP_RIGHT_X
            TOP_RIGHT_Y = self.TOP_RIGHT_Y
            BOTTOM_RIGHT_X = self.BOTTOM_RIGHT_X
            BOTTOM_RIGHT_Y = self.BOTTOM_RIGHT_Y
            BOTTOM_LEFT_X = self.BOTTOM_LEFT_X
            BOTTOM_LEFT_Y = self.BOTTOM_LEFT_Y
            # Grab a reference to our cached reference points
            global CACHED_REF_PTS
            if source is None:
                raise
            # Grab the width and height of the frame and source image,
            # respectively
            # Extracting Frame from Camera
            # Extracting Source from Video
            (imgH, imgW) = frame.shape[:2]
            (srcH, srcW) = source.shape[:2]
            # Detect Aruco markers in the input frame
            (corners, ids, rejected) = cv2.aruco.detectMarkers(frame, arucoDict, parameters=arucoParams)
            print('Ids: ', str(ids))
            print('Rejected: ', str(rejected))
            # if we *did not* find our four ArUco markers, initialize an
            # empty IDs list, otherwise flatten the ID list
            print('Detecting Corners: ', str(len(corners)))
            ids = np.array([]) if len(corners) != 4 else ids.flatten()
            # Initialize our list of reference points
            refPts = []
            refPtTL1 = []
            # Loop over the IDs of the ArUco markers in Top-Left, Top-Right,
            # Bottom-Right, and Bottom-Left order
            for i in cornerIDs:
                # Grab the index of the corner with the current ID
                j = np.squeeze(np.where(ids == i))
                # If we receive an empty list instead of an integer index,
                # then we could not find the marker with the current ID
                if j.size == 0:
                    continue
                # Otherwise, append the corner (x, y)-coordinates to our list
                # of reference points
                corner = np.squeeze(corners[j])
                refPts.append(corner)
            # Check to see if we failed to find the four ArUco markers
            if len(refPts) != 4:
                # If we are allowed to use cached reference points, fall
                # back on them
                if useCache and CACHED_REF_PTS is not None:
                    refPts = CACHED_REF_PTS
                # Otherwise, we cannot use the cache and/or there are no
                # previous cached reference points, so return early
                else:
                    return None
            # If we are allowed to use cached reference points, then update
            # the cache with the current set
            if useCache:
                CACHED_REF_PTS = refPts
            # Unpack our Aruco reference points and use the reference points
            # to define the Destination transform matrix, making sure the
            # points are specified in Top-Left, Top-Right, Bottom-Right, and
            # Bottom-Left order
            (refPtTL, refPtTR, refPtBR, refPtBL) = refPts
            dstMat = [refPtTL[0], refPtTR[1], refPtBR[2], refPtBL[3]]
            dstMat = np.array(dstMat)
            # For zoom option recalculating all the 4 points
            refPtTL1_L_X = refPtTL[0][0]-TOP_LEFT_X
            refPtTL1_L_Y = refPtTL[0][1]-TOP_LEFT_Y
            refPtTL1.append((refPtTL1_L_X,refPtTL1_L_Y))
            refPtTL1_R_X = refPtTL[1][0]+TOP_RIGHT_X
            refPtTL1_R_Y = refPtTL[1][1]+TOP_RIGHT_Y
            refPtTL1.append((refPtTL1_R_X,refPtTL1_R_Y))
            refPtTD1_L_X = refPtTL[2][0]+BOTTOM_RIGHT_X
            refPtTD1_L_Y = refPtTL[2][1]+BOTTOM_RIGHT_Y
            refPtTL1.append((refPtTD1_L_X,refPtTD1_L_Y))
            refPtTD1_R_X = refPtTL[3][0]-BOTTOM_LEFT_X
            refPtTD1_R_Y = refPtTL[3][1]+BOTTOM_LEFT_Y
            refPtTL1.append((refPtTD1_R_X,refPtTD1_R_Y))
            dstMatMod = [refPtTL1[0], refPtTL1[1], refPtTL1[2], refPtTL1[3]]
            dstMatMod = np.array(dstMatMod)
            # Define the transform matrix for the *source* image in Top-Left,
            # Top-Right, Bottom-Right, and Bottom-Left order
            srcMat = np.array([[0, 0], [srcW, 0], [srcW, srcH], [0, srcH]])
            # Compute the homography matrix and then warp the source image to
            # the destination based on the homography depending upon the
            # zoom flag
            if zoomFlag == 1:
                (H, _) = cv2.findHomography(srcMat, dstMat)
            else:
                (H, _) = cv2.findHomography(srcMat, dstMatMod)
            warped = cv2.warpPerspective(source, H, (imgW, imgH))
            # Construct a mask for the source image now that the perspective
            # warp has taken place (we'll need this mask to copy the source
            # image into the destination)
            mask = np.zeros((imgH, imgW), dtype="uint8")
            if zoomFlag == 1:
                cv2.fillConvexPoly(mask, dstMat.astype("int32"), (255, 255, 255), cv2.LINE_AA)
            else:
                cv2.fillConvexPoly(mask, dstMatMod.astype("int32"), (255, 255, 255), cv2.LINE_AA)
            # This optional step will give the source image a black
            # border surrounding it when applied to the source image, you
            # can apply a dilation operation
            rect = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
            mask = cv2.dilate(mask, rect, iterations=2)
            # Create a three channel version of the mask by stacking it
            # depth-wise, such that we can copy the warped source image
            # into the input image
            maskScaled = mask.copy() / 255.0
            maskScaled = np.dstack([maskScaled] * 3)
            # Copy the warped source image into the input image by
            # (1) Multiplying the warped image and masked together,
            # (2) Then multiplying the original input image with the
            # mask (giving more weight to the input where there
            # are not masked pixels), and
            # (3) Adding the resulting multiplications together
            warpedMultiplied = cv2.multiply(warped.astype("float"), maskScaled)
            imageMultiplied = cv2.multiply(frame.astype(float), 1.0 - maskScaled)
            output = cv2.add(warpedMultiplied, imageMultiplied)
            output = output.astype("uint8")
            # Return the output frame to the calling function
            return output
        except Exception as e:
            # Deliberately raising the issue
            # That way the control goes to main calling methods
            # exception section
            raise

Please find the key snippet from the above script –

(imgH, imgW) = frame.shape[:2]
(srcH, srcW) = source.shape[:2]

# Detect Aruco markers in the input frame
(corners, ids, rejected) = cv2.aruco.detectMarkers(frame, arucoDict, parameters=arucoParams)

Identifying the Aruco markers is key here. The above lines help the program detect all four corners.

However, let us discuss more on the Aruco markers & strategies that I’ve used for several different surfaces.

As you can see, the right-hand side Aruco marker is tiny compared to the left one. Hence, that one will be ideal for a curved surface like a coffee mug or a bottle rather than a flat surface.

Also, we’ve demonstrated the zoom capability with the smaller Aruco marker, which will augment almost double the original surface area.

Let us understand why we need that. As you know, the surface of an object like a bottle is curved. Hence, relatively large Aruco markers placed at its four corners will be difficult for any camera to detect at the same time.

Hence, we need a process where four closely placed corners can be extrapolated mathematically to a relatively larger projected area, while the markers themselves remain easily detectable by any WebCAM.

Let’s observe the following figure –

Simulated Extrapolated corners

As you can see, the original positions of the four corners are represented by the following points, i.e., (x1, y1), (x2, y2), (x3, y3) & (x4, y4).

And these positions are very close to each other. Hence, it will be easier for the camera to detect all the points (like a plain surface) without many retries.

And later, you can add specific values of x & y to them to get the derived four corners as shown in the above figures through the following points, i.e. (x1.1, y1.1), (x2.1, y2.1), (x3.1, y3.1) & (x4.1, y4.1).
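
The extrapolation described above can be sketched in a few lines of plain Python. This is only an illustration: extrapolate_corners is a hypothetical helper, it assumes image coordinates where y grows downward, and it takes positive offsets that always push each corner outward, whereas the class reads signed offsets from the config file.

```python
def extrapolate_corners(corners, offsets):
    """Push four corners outward. Both lists are in Top-Left, Top-Right,
    Bottom-Right, Bottom-Left order; each offset is a positive (dx, dy)."""
    (tl, tr, br, bl) = corners
    (o_tl, o_tr, o_br, o_bl) = offsets
    return [
        (tl[0] - o_tl[0], tl[1] - o_tl[1]),  # top-left moves left & up
        (tr[0] + o_tr[0], tr[1] - o_tr[1]),  # top-right moves right & up
        (br[0] + o_br[0], br[1] + o_br[1]),  # bottom-right moves right & down
        (bl[0] - o_bl[0], bl[1] + o_bl[1]),  # bottom-left moves left & down
    ]

# A 40x40 pixel marker stretched by 50 pixels sideways & 30 pixels vertically
small = [(100, 100), (140, 100), (140, 140), (100, 140)]
print(extrapolate_corners(small, [(50, 30)] * 4))
```

The derived points play the role of (x1.1, y1.1) to (x4.1, y4.1) in the figure: the camera only has to detect the small marker, while the video is projected onto the larger area.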

# Loop over the IDs of the ArUco markers in Top-Left, Top-Right,
# Bottom-Right, and Bottom-Left order
for i in cornerIDs:
  # Grab the index of the corner with the current ID
  j = np.squeeze(np.where(ids == i))

  # If we receive an empty list instead of an integer index,
  # then we could not find the marker with the current ID
  if j.size == 0:
    continue

  # Otherwise, append the corner (x, y)-coordinates to our list
  # of reference points
  corner = np.squeeze(corners[j])
  refPts.append(corner)

# Check to see if we failed to find the four ArUco markers
if len(refPts) != 4:
  # If we are allowed to use cached reference points, fall
  # back on them
  if useCache and CACHED_REF_PTS is not None:
    refPts = CACHED_REF_PTS

  # Otherwise, we cannot use the cache and/or there are no
  # previous cached reference points, so return early
  else:
    return None

# If we are allowed to use cached reference points, then update
# the cache with the current set
if useCache:
  CACHED_REF_PTS = refPts

# Unpack our Aruco reference points and use the reference points
# to define the Destination transform matrix, making sure the
# points are specified in Top-Left, Top-Right, Bottom-Right, and
# Bottom-Left order
(refPtTL, refPtTR, refPtBR, refPtBL) = refPts
dstMat = [refPtTL[0], refPtTR[1], refPtBR[2], refPtBL[3]]
dstMat = np.array(dstMat)

In the above snippet, the application loops through the expected marker IDs, collects the corners of each detected Aruco marker as reference points & falls back to the cached points when a marker is missing. These reference points are later used to define the destination transformation matrix.
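To make the selection of the destination points easier to follow, here is a simplified sketch that uses plain Python structures instead of the arrays returned by OpenCV. The helper names build_destination & square are hypothetical, the cache fallback is left out, and the sample coordinates are made up. Only the marker IDs are taken from the script.

```python
def build_destination(detected, corner_ids):
    """Pick the outer corner of each of the four markers.

    detected maps a marker ID to its four corners in Top-Left,
    Top-Right, Bottom-Right, Bottom-Left order. Returns None when any
    expected marker is missing (the real script may use cached points).
    """
    ref_pts = []
    for marker_id in corner_ids:
        if marker_id not in detected:
            return None
        ref_pts.append(detected[marker_id])

    (ref_tl, ref_tr, ref_br, ref_bl) = ref_pts
    # Top-left corner of the top-left marker, top-right corner of the
    # top-right marker, and so on
    return [ref_tl[0], ref_tr[1], ref_br[2], ref_bl[3]]


def square(x, y, size=10):
    """Four corners of an axis-aligned square marker (illustration only)."""
    return [(x, y), (x + size, y), (x + size, y + size), (x, y + size)]


corner_ids = (923, 1001, 241, 1007)
detected = {
    923: square(0, 0),     # top-left marker
    1001: square(90, 0),   # top-right marker
    241: square(90, 90),   # bottom-right marker
    1007: square(0, 90),   # bottom-left marker
}
dst = build_destination(detected, corner_ids)
print(dst)
```

Taking the outer corner of each marker means the projected video covers the full area enclosed by the four markers, including the markers themselves.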

# For zoom option recalculating all the 4 points
refPtTL1_L_X = refPtTL[0][0]-TOP_LEFT_X
refPtTL1_L_Y = refPtTL[0][1]-TOP_LEFT_Y

refPtTL1.append((refPtTL1_L_X,refPtTL1_L_Y))

refPtTL1_R_X = refPtTL[1][0]+TOP_RIGHT_X
refPtTL1_R_Y = refPtTL[1][1]+TOP_RIGHT_Y

refPtTL1.append((refPtTL1_R_X,refPtTL1_R_Y))

refPtTD1_L_X = refPtTL[2][0]+BOTTOM_RIGHT_X
refPtTD1_L_Y = refPtTL[2][1]+BOTTOM_RIGHT_Y

refPtTL1.append((refPtTD1_L_X,refPtTD1_L_Y))

refPtTD1_R_X = refPtTL[3][0]-BOTTOM_LEFT_X
refPtTD1_R_Y = refPtTL[3][1]+BOTTOM_LEFT_Y

refPtTL1.append((refPtTD1_R_X,refPtTD1_R_Y))

dstMatMod = [refPtTL1[0], refPtTL1[1], refPtTL1[2], refPtTL1[3]]
dstMatMod = np.array(dstMatMod)

The above snippet calculates the revised points for the zoom capability discussed in the earlier figure.

# Define the transform matrix for the *source* image in Top-Left,
# Top-Right, Bottom-Right, and Bottom-Left order
srcMat = np.array([[0, 0], [srcW, 0], [srcW, srcH], [0, srcH]])

The above snippet defines the four corner points of the source frame from the video trailer. They serve as the source side of the transformation.
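To make the role of these corner points concrete, here is a minimal sketch of how a 3x3 homography maps a point from the source image onto the destination frame. The matrix H below is written by hand purely for illustration (a scale of 0.5 plus a shift). In the script, the matrix is estimated by cv2.findHomography from the source & destination corners. The function name apply_homography is hypothetical.

```python
def apply_homography(H, point):
    """Map (x, y) through a 3x3 homography, including the perspective divide."""
    x, y = point
    xh = H[0][0] * x + H[0][1] * y + H[0][2]
    yh = H[1][0] * x + H[1][1] * y + H[1][2]
    w = H[2][0] * x + H[2][1] * y + H[2][2]
    return (xh / w, yh / w)


# Hand-written example matrix: scale by 0.5, then shift by (100, 50)
H = [
    [0.5, 0.0, 100.0],
    [0.0, 0.5, 50.0],
    [0.0, 0.0, 1.0],
]

src_w, src_h = 640, 360
# Same Top-Left, Top-Right, Bottom-Right, Bottom-Left order as srcMat
src_corners = [(0, 0), (src_w, 0), (src_w, src_h), (0, src_h)]
dst_corners = [apply_homography(H, p) for p in src_corners]
print(dst_corners)
```

A real homography between a flat video frame and markers seen at an angle usually has a non-trivial last row, which is why the division by w matters.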

# Compute the homography matrix and then warp the source image to
# the destination based on the homography depending upon the
# zoom flag
if zoomFlag == 1:
  (H, _) = cv2.findHomography(srcMat, dstMat)
else:
  (H, _) = cv2.findHomography(srcMat, dstMatMod)

warped = cv2.warpPerspective(source, H, (imgW, imgH))

# Construct a mask for the source image now that the perspective
# warp has taken place (we'll need this mask to copy the source
# image into the destination)
mask = np.zeros((imgH, imgW), dtype="uint8")
if zoomFlag == 1:
  cv2.fillConvexPoly(mask, dstMat.astype("int32"), (255, 255, 255), cv2.LINE_AA)
else:
  cv2.fillConvexPoly(mask, dstMatMod.astype("int32"), (255, 255, 255), cv2.LINE_AA)

# This step is optional: to give the source image a black border
# once it is placed on the frame, you can apply a dilation
# operation to the mask
rect = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
mask = cv2.dilate(mask, rect, iterations=2)

# Create a three channel version of the mask by stacking it
# depth-wise, such that we can copy the warped source image
# into the input image
maskScaled = mask.copy() / 255.0
maskScaled = np.dstack([maskScaled] * 3)

# Copy the warped source image into the input image by
# (1) Multiplying the warped image and masked together,
# (2) Then multiplying the original input image with the
#     mask (giving more weight to the input where there
#     are not masked pixels), and
# (3) Adding the resulting multiplications together
warpedMultiplied = cv2.multiply(warped.astype("float"), maskScaled)
imageMultiplied = cv2.multiply(frame.astype(float), 1.0 - maskScaled)
output = cv2.add(warpedMultiplied, imageMultiplied)
output = output.astype("uint8")

Finally, depending upon the zoom flag, the application warps the source image onto either the original or the extrapolated corners & blends it into the WebCAM frame, optionally surrounded by a black border.
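The blending step above can be reduced to a per-pixel formula: the mask value decides how much of the warped video & how much of the camera frame ends up in the output. The sketch below works on single pixels in plain Python to show the arithmetic. The function name blend_pixel & the sample colors are illustrative assumptions, and the script itself does this on whole arrays with cv2.multiply & cv2.add.

```python
def blend_pixel(warped_px, frame_px, mask_value):
    """Blend one 3-channel pixel of the warped video into the camera frame.

    mask_value is 0..255: 255 inside the projected area, 0 outside,
    and in between on anti-aliased edges.
    """
    alpha = mask_value / 255.0
    return tuple(
        int(w * alpha + f * (1.0 - alpha))
        for w, f in zip(warped_px, frame_px)
    )


video_px = (10, 20, 30)      # pixel from the warped trailer frame
camera_px = (200, 200, 200)  # pixel from the WebCAM frame

inside = blend_pixel(video_px, camera_px, 255)   # fully video
outside = blend_pixel(video_px, camera_px, 0)    # fully camera
edge = blend_pixel(video_px, camera_px, 128)     # mix of both
print(inside, outside, edge)
```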

  • clsEmbedVideoWithStream.py (This is the main class of the Python script. It invokes the clsAugmentedReality class to initiate augmented reality after splitting the audio & video & then projects them via the Web-CAM with a seamless broadcast.)


##################################################
#### Written By: SATYAKI DE ####
#### Written On: 22-Jun-2022 ####
#### Modified On 25-Jun-2022 ####
#### ####
#### Objective: This is the main class of ####
#### python script that will invoke the ####
#### clsAugmentedReality class to initiate ####
#### augment reality after splitting the ####
#### audio & video & then project them via ####
#### the Web-CAM with a seamless broadcast. ####
##################################################
# Importing necessary packages
import clsAugmentedReality as ar
from clsConfig import clsConfig as cf
from imutils.video import VideoStream
from collections import deque
import imutils
import time
import cv2
import subprocess
import os
import pygame
import threading
import sys
###############################################
### Global Section ###
###############################################
# Instantiating the dependant class
x1 = ar.clsAugmentedReality()
###############################################
### End of Global Section ###
###############################################
class BreakLoop(Exception):
  pass

class clsEmbedVideoWithStream:
  def __init__(self):
    self.sep = str(cf.conf['SEP'])
    self.Curr_Path = str(cf.conf['INIT_PATH'])
    self.FileName = str(cf.conf['FILE_NAME'])
    self.CacheL = int(cf.conf['CACHE_LIM'])
    self.FileName_1 = str(cf.conf['FILE_NAME_1'])
    self.audioLen = int(cf.conf['audioLen'])
    self.audioFreq = float(cf.conf['audioFreq'])
    self.videoFrame = float(cf.conf['videoFrame'])
    self.stopFlag=cf.conf['stopFlag']
    self.zFlag=int(cf.conf['zoomFlag'])
    self.title = str(cf.conf['TITLE'])

  def playAudio(self, audioFile, audioLen, freq, stopFlag=False):
    try:
      pygame.mixer.init()
      pygame.init()
      pygame.mixer.music.load(audioFile)

      pygame.mixer.music.set_volume(10)

      val = int(audioLen)
      i = 0

      while i < val:
        pygame.mixer.music.play(loops=0, start=float(i))
        time.sleep(freq)

        i = i + 1

        if (i >= val):
          raise BreakLoop

        if (stopFlag==True):
          raise BreakLoop

      return 0
    except BreakLoop as s:
      return 0
    except Exception as e:
      x = str(e)
      print(x)

      return 1

  def extractAudio(self, video_file, output_ext="mp3"):
    try:
      """Converts video to audio directly using `ffmpeg` command
      with the help of subprocess module"""
      filename, ext = os.path.splitext(video_file)
      subprocess.call(["ffmpeg", "-y", "-i", video_file, f"{filename}.{output_ext}"],
                      stdout=subprocess.DEVNULL,
                      stderr=subprocess.STDOUT)

      return 0
    except Exception as e:
      x = str(e)
      print('Error: ', x)

      return 1

  def processStream(self, debugInd, var):
    try:
      sep = self.sep
      Curr_Path = self.Curr_Path
      FileName = self.FileName
      CacheL = self.CacheL
      FileName_1 = self.FileName_1
      audioLen = self.audioLen
      audioFreq = self.audioFreq
      videoFrame = self.videoFrame
      stopFlag = self.stopFlag
      zFlag = self.zFlag
      title = self.title

      print('audioFreq:')
      print(str(audioFreq))

      print('videoFrame:')
      print(str(videoFrame))

      # Construct the source for Video & Temporary Audio
      videoFile = Curr_Path + sep + 'Video' + sep + FileName
      audioFile = Curr_Path + sep + 'Video' + sep + FileName_1

      # Load the Aruco dictionary and grab the Aruco parameters
      print("[INFO] initializing marker detector...")
      arucoDict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_ARUCO_ORIGINAL)
      arucoParams = cv2.aruco.DetectorParameters_create()

      # Initialize the video file stream
      print("[INFO] accessing video stream...")
      vf = cv2.VideoCapture(videoFile)

      x = self.extractAudio(videoFile)

      if x == 0:
        print('Successfully Audio extracted from the source file!')
      else:
        print('Failed to extract the source audio!')

      # Initialize a queue to maintain the next frame from the video stream
      Q = deque(maxlen=128)

      # We need to have a frame in our queue to start our augmented reality
      # pipeline, so read the next frame from our video file source and add
      # it to our queue
      (grabbed, source) = vf.read()
      Q.appendleft(source)

      # Initialize the video stream and allow the camera sensor to warm up
      print("[INFO] starting video stream...")
      vs = VideoStream(src=0).start()

      time.sleep(2.0)
      flg = 0

      t = threading.Thread(target=self.playAudio, args=(audioFile, audioLen, audioFreq, stopFlag,))
      t.daemon = True

      try:
        # Loop over the frames from the video stream
        while len(Q) > 0:
          try:
            # Grab the frame from our video stream and resize it
            frame = vs.read()
            frame = imutils.resize(frame, width=1020)

            # Attempt to find the ArUCo markers in the frame, and provided
            # they are found, take the current source image and warp it onto
            # input frame using our augmented reality technique
            warped = x1.getWarpImages(
              frame, source,
              cornerIDs=(923, 1001, 241, 1007),
              arucoDict=arucoDict,
              arucoParams=arucoParams,
              zoomFlag=zFlag,
              useCache=CacheL > 0)

            # If the warped frame is not None, then we know (1) we found the
            # four ArUCo markers and (2) the perspective warp was successfully
            # applied
            if warped is not None:
              # Set the frame to the output augment reality frame and then
              # grab the next video file frame from our queue
              frame = warped
              source = Q.popleft()

              if flg == 0:
                t.start()
                flg = flg + 1

            # For speed/efficiency, we can use a queue to keep the next video
            # frame queue ready for us; the trick is to ensure the queue is
            # always (or nearly) full
            if len(Q) != Q.maxlen:
              # Read the next frame from the video file stream
              (grabbed, nextFrame) = vf.read()

              # If the frame was read (meaning we are not at the end of the
              # video file stream), add the frame to our queue
              if grabbed:
                Q.append(nextFrame)

            # Show the output frame
            cv2.imshow(title, frame)

            time.sleep(videoFrame)

            # If the `q` key was pressed, break from the loop
            if cv2.waitKey(2) & 0xFF == ord('q'):
              stopFlag = True
              break
          except BreakLoop:
            raise BreakLoop
          except Exception as e:
            pass

          if (len(Q) == Q.maxlen):
            time.sleep(2)
            break
      except BreakLoop as s:
        print('Processed completed!')

        # Performing cleanup at the end
        cv2.destroyAllWindows()
        vs.stop()
      except Exception as e:
        x = str(e)
        print(x)

        # Performing cleanup at the end
        cv2.destroyAllWindows()
        vs.stop()

      return 0
    except Exception as e:
      x = str(e)
      print('Error:', x)

      return 1

Please find the key snippets from the above script –

def playAudio(self, audioFile, audioLen, freq, stopFlag=False):
  try:
    pygame.mixer.init()
    pygame.init()
    pygame.mixer.music.load(audioFile)

    pygame.mixer.music.set_volume(10)

    val = int(audioLen)
    i = 0

    while i < val:
      pygame.mixer.music.play(loops=0, start=float(i))
      time.sleep(freq)

      i = i + 1

      if (i >= val):
        raise BreakLoop

      if (stopFlag==True):
        raise BreakLoop

    return 0
  except BreakLoop as s:
    return 0
  except Exception as e:
    x = str(e)
    print(x)

    return 1

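The above function initializes the pygame library to play the audio that has been extracted from the video file as a separate step. It restarts playback from the next offset after every sleep interval until the configured audio length is reached.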

def extractAudio(self, video_file, output_ext="mp3"):
    try:
        """Converts video to audio directly using `ffmpeg` command
        with the help of subprocess module"""
        filename, ext = os.path.splitext(video_file)
        subprocess.call(["ffmpeg", "-y", "-i", video_file, f"{filename}.{output_ext}"],
                        stdout=subprocess.DEVNULL,
                        stderr=subprocess.STDOUT)

        return 0
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

The above function temporarily extracts the audio file from the source trailer video.
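One detail worth knowing here: subprocess.call returns the exit code of ffmpeg but does not raise an exception when that code is non-zero, so a failed conversion can still look like a success. The following is a hedged sketch of one way to check for that. The names build_ffmpeg_command & extract_audio_checked and the sample path are hypothetical, and the ffmpeg arguments are the same ones used in the function above.

```python
import os
import shutil
import subprocess


def build_ffmpeg_command(video_file, output_ext="mp3"):
    """Build the same ffmpeg argument list that the script uses."""
    base, _ = os.path.splitext(video_file)
    return ["ffmpeg", "-y", "-i", video_file, f"{base}.{output_ext}"]


def extract_audio_checked(video_file, output_ext="mp3"):
    """Return 0 only when ffmpeg is available and exits successfully."""
    if shutil.which("ffmpeg") is None:
        print("Error: ffmpeg was not found on the PATH")
        return 1

    result = subprocess.run(
        build_ffmpeg_command(video_file, output_ext),
        stdout=subprocess.DEVNULL,
        stderr=subprocess.STDOUT,
    )
    return 0 if result.returncode == 0 else 1


# Only builds the command here; nothing is executed
cmd = build_ffmpeg_command("Video/trailer.mp4")
print(cmd)
```

Keeping the command construction separate from the execution also makes it easy to print or log the exact command when the extraction fails.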

# Initialize the video file stream
print("[INFO] accessing video stream...")
vf = cv2.VideoCapture(videoFile)

x = self.extractAudio(videoFile)

if x == 0:
    print('Successfully Audio extracted from the source file!')
else:
    print('Failed to extract the source audio!')

# Initialize a queue to maintain the next frame from the video stream
Q = deque(maxlen=128)

# We need to have a frame in our queue to start our augmented reality
# pipeline, so read the next frame from our video file source and add
# it to our queue
(grabbed, source) = vf.read()
Q.appendleft(source)

# Initialize the video stream and allow the camera sensor to warm up
print("[INFO] starting video stream...")
vs = VideoStream(src=0).start()

time.sleep(2.0)
flg = 0

The above snippet opens the video file after invoking the audio extraction & reads its first frame. It then uses a bounded queue (a deque holding at most 128 frames) to buffer the upcoming video frames for better performance. And finally, it starts consuming the live video stream from the WebCAM so that the trailer video can be augmented on top of it.
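The queue pattern can be illustrated without a camera or a video file. In the sketch below, a generator stands in for the video file & the queue is kept small so that the flow is easy to trace. The names frame_source, frames & shown are hypothetical, and strings are used in place of real image frames.

```python
from collections import deque


def frame_source(total):
    """Stand-in for the video file: yields a fixed number of fake frames."""
    for n in range(total):
        yield f"frame-{n}"


frames = frame_source(5)
queue = deque(maxlen=3)

# Prime the queue with the first frame so that the loop can start
queue.append(next(frames))

shown = []
while len(queue) > 0:
    # Take the oldest buffered frame and "display" it
    current = queue.popleft()
    shown.append(current)

    # Top the queue up again while the source still has frames
    if len(queue) != queue.maxlen:
        next_frame = next(frames, None)
        if next_frame is not None:
            queue.append(next_frame)

print(shown)
```

The loop ends naturally once the source is exhausted & the queue has drained, which is the same condition the main loop relies on with `while len(Q) > 0`.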

t = threading.Thread(target=self.playAudio, args=(audioFile, audioLen, audioFreq, stopFlag,))
t.daemon = True

Now, the application has instantiated a daemon thread to spin off the audio play function. The reason is to avoid any impact of the audio playback on the performance & the video frame rate of the main loop.
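Note that a plain boolean such as stopFlag is handed to the thread once, at creation time, so reassigning it later in the main loop is not seen by the audio thread. One possible alternative, shown here only as a sketch & not as what the script does, is a threading.Event shared by both sides. The names play_loop, stop_event & log are hypothetical, and a list append stands in for the actual audio playback.

```python
import threading
import time


def play_loop(stop_event, ticks, interval, log):
    """Stand-in for the audio loop: one tick per interval until told to stop."""
    for i in range(ticks):
        if stop_event.is_set():
            break
        log.append(i)  # the real function would play audio here
        # wait() sleeps for the interval but returns early once the event is set
        stop_event.wait(interval)


stop_event = threading.Event()
log = []

t = threading.Thread(
    target=play_loop,
    args=(stop_event, 1000, 0.01, log),
    daemon=True,
)
t.start()

time.sleep(0.05)   # the main loop would be showing frames here
stop_event.set()   # for example, when the `q` key is pressed
t.join(timeout=1.0)

print("ticks played before stop:", len(log))
```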

while len(Q) > 0:
  try:
    # Grab the frame from our video stream and resize it
    frame = vs.read()
    frame = imutils.resize(frame, width=1020)

    # Attempt to find the ArUCo markers in