Enabling & Exploring Stable Defussion – Part 3

Before we dive into the details of this post, let us provide the previous two links that precede it.

Enabling & Exploring Stable Defussion – Part 1

Enabling & Exploring Stable Defussion – Part 2

For, reference, we’ll share the demo before deep dive into the actual follow-up analysis in the below section –


Now, let us continue our discussions from where we left.

class clsText2Image:
    def __init__(self, pipe, output_path, filename):

        self.pipe = pipe
        
        # More aggressive attention slicing
        self.pipe.enable_attention_slicing(slice_size=1)

        self.output_path = f"{output_path}{filename}"
        
        # Warm up the pipeline
        self._warmup()
    
    def _warmup(self):
        """Warm up the pipeline to optimize memory allocation"""
        with torch.no_grad():
            _ = self.pipe("warmup", num_inference_steps=1, height=512, width=512)
        torch.mps.empty_cache()
        gc.collect()
    
    def generate(self, prompt, num_inference_steps=12, guidance_scale=3.0):
        try:
            torch.mps.empty_cache()
            gc.collect()
            
            with torch.autocast(device_type="mps"):
                with torch.no_grad():
                    image = self.pipe(
                        prompt,
                        num_inference_steps=num_inference_steps,
                        guidance_scale=guidance_scale,
                        height=1024,
                        width=1024,
                    ).images[0]
            
            image.save(self.output_path)
            return 0
        except Exception as e:
            print(f'Error: {str(e)}')
            return 1
        finally:
            torch.mps.empty_cache()
            gc.collect()

    def genImage(self, prompt):
        try:

            # Initialize generator
            x = self.generate(prompt)

            if x == 0:
                print('Successfully processed first pass!')
            else:
                print('Failed complete first pass!')
                raise 

            return 0

        except Exception as e:
            print(f"\nAn unexpected error occurred: {str(e)}")

            return 1

This is the initialization method for the clsText2Image class:

  • Takes a pre-configured pipe (text-to-image pipeline), an output_path, and a filename.
  • Enables more aggressive memory optimization by setting “attention slicing.”
  • Prepares the full file path for saving generated images.
  • Calls a _warmup method to pre-load the pipeline and optimize memory allocation.

This private method warms up the pipeline:

  • Sends a dummy “warmup” request with basic parameters to allocate memory efficiently.
  • Clears any cached memory (torch.mps.empty_cache()) and performs garbage collection (gc.collect()).
  • Ensures smoother operation for future image generation tasks.

This method generates an image from a text prompt:

  • Clears memory cache and performs garbage collection before starting.
  • Uses the text-to-image pipeline (pipe) to generate an image:
    • Takes the prompt, number of inference steps, and guidance scale as input.
    • Outputs an image at 1024×1024 resolution.
  • Saves the generated image to the specified output path.
  • Returns 0 on success or 1 on failure.
  • Ensures cleanup by clearing memory and collecting garbage, even in case of errors.

This method simplifies image generation:

  • Calls the generate method with the given prompt.
  • Prints a success message if the image is generated (0 return value).
  • On failure, logs the error and raises an exception.
  • Returns 0 on success or 1 on failure.
class clsImage2Video:
    def __init__(self, pipeline):
        
        # Optimize model loading
        torch.mps.empty_cache()
        self.pipeline = pipeline

    def generate_frames(self, pipeline, init_image, prompt, duration_seconds=10):
        try:
            torch.mps.empty_cache()
            gc.collect()

            base_frames = []
            img = Image.open(init_image).convert("RGB").resize((1024, 1024))
            
            for _ in range(10):
                result = pipeline(
                    prompt=prompt,
                    image=img,
                    strength=0.45,
                    guidance_scale=7.5,
                    num_inference_steps=25
                ).images[0]

                base_frames.append(np.array(result))
                img = result
                torch.mps.empty_cache()

            frames = []
            for i in range(len(base_frames)-1):
                frame1, frame2 = base_frames[i], base_frames[i+1]
                for t in np.linspace(0, 1, int(duration_seconds*24/10)):
                    frame = (1-t)*frame1 + t*frame2
                    frames.append(frame.astype(np.uint8))
            
            return frames
        except Exception as e:
            frames = []
            print(f'Error: {str(e)}')

            return frames
        finally:
            torch.mps.empty_cache()
            gc.collect()

    # Main method
    def genVideo(self, prompt, inputImage, targetVideo, fps):
        try:
            print("Starting animation generation...")
            
            init_image_path = inputImage
            output_path = targetVideo
            fps = fps
            
            frames = self.generate_frames(
                pipeline=self.pipeline,
                init_image=init_image_path,
                prompt=prompt,
                duration_seconds=20
            )
            
            imageio.mimsave(output_path, frames, fps=30)

            print("Animation completed successfully!")

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

This initializes the clsImage2Video class:

  • Clears the GPU cache to optimize memory before loading.
  • Sets up the pipeline for generating frames, which uses an image-to-video transformation model.

This function generates frames for a video:

  • Starts by clearing GPU memory and running garbage collection.
  • Loads the init_image, resizes it to 1024×1024 pixels, and converts it to RGB format.
  • Iteratively applies the pipeline to transform the image:
    • Uses the prompt and specified parameters like strengthguidance_scale, and num_inference_steps.
    • Stores the resulting frames in a list.
  • Interpolates between consecutive frames to create smooth transitions:
    • Uses linear blending for smooth animation across a specified duration and frame rate (24 fps for 10 segments).
  • Returns the final list of generated frames or an empty list if an error occurs.
  • Always clears memory after execution.

This is the main function for creating a video from an image and text prompt:

  • Logs the start of the animation generation process.
  • Calls generate_frames() with the given pipelineinputImage, and prompt to create frames.
  • Saves the generated frames as a video using the imageio library, setting the specified frame rate (fps).
  • Logs a success message and returns 0 if the process is successful.
  • On error, logs the issue and returns 1.

Now, let us understand the performance. But, before that let us explore the device on which we’ve performed these stress test that involves GPU & CPUs as well.

And, here is the performance stats –

From the above snapshot, we can clearly communicate that the GPU is 100% utilized. However, the CPU has shown a significant % of availability.

As you can see, the first pass converts the input prompt to intermediate images within 1 min 30 sec. However, the second pass constitutes multiple hops (11 hops) on an avg 22 seconds. Overall, the application will finish in 5 minutes 36 seconds for a 10-second video clip.


So, we’ve done it.

You can find the detailed code at the GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 🙂

Building solutions using LLM AutoGen in Python – Part 1

Today, I’ll be publishing a series of posts on LLM agents and how they can help you improve your delivery capabilities for various tasks.

Also, we’re providing the demo here –

Isn’t it exciting?


The application will interact with the AutoGen agents, use underlying Open AI APIs to follow the instructions, generate the steps, and then follow that path to generate the desired code. Finally, it will execute the generated scripts if the first outcome of the demo satisfies users.


Let us understand some of the key snippets –

# Create the assistant agent
assistant = autogen.AssistantAgent(
    name="AI_Assistant",
    llm_config={
        "config_list": config_list,
    }
)

Purpose: This line creates an AI assistant agent named “AI_Assistant”.

Function: It uses a language model configuration provided in config_list to define how the assistant behaves.

Role: The assistant serves as the primary agent who will coordinate with other agents to solve problems.

user_proxy = autogen.UserProxyAgent(
    name="Admin",
    system_message=templateVal_1,
    human_input_mode="TERMINATE",
    max_consecutive_auto_reply=10,
    is_termination_msg=lambda x: x.get("content", "").rstrip().endswith("TERMINATE"),
    code_execution_config={
        "work_dir": WORK_DIR,
        "use_docker": False,
    },
)

Purpose: This code creates a user proxy agent named “Admin”.

Function:

  • System Message: Uses templateVal_1 as its initial message to set the context.
  • Human Input Mode: Set to "TERMINATE", meaning it will keep interacting until a termination condition is met.
  • Auto-Reply Limit: Can automatically reply up to 10 times without human intervention.
  • Termination Condition: A message is considered a termination message if it ends with the word “TERMINATE”.
  • Code Execution: Configured to execute code in the directory specified by WORK_DIR without using Docker.

Role: Acts as an intermediary between the user and the assistant, handling interactions and managing the conversation flow.

engineer = autogen.AssistantAgent(
    name="Engineer",
    llm_config={
        "config_list": config_list,
    },
    system_message=templateVal_2,
)

Purpose: Creates an assistant agent named “Engineer”.

Function: Uses templateVal_2 as its system message to define its expertise in engineering matters.

Role: Specializes in technical and engineering aspects of the problem.

game_designer = autogen.AssistantAgent(
    name="GameDesigner",
    llm_config={
        "config_list": config_list,
    },
    system_message=templateVal_3,
)

Purpose: Creates an assistant agent named “GameDesigner”.

Function: Uses templateVal_3 to set its focus on game design.

Role: Provides insights and solutions related to game design aspects.

planner = autogen.AssistantAgent(
    name="Planer",
    llm_config={
        "config_list": config_list,
    },
    system_message=templateVal_4,
)

Purpose: Creates an assistant agent named “Planer” (likely intended to be “Planner”).

Function: Uses templateVal_4 to define its role in planning.

Role: Responsible for organizing and planning tasks to solve the problem.

critic = autogen.AssistantAgent(
    name="Critic",
    llm_config={
        "config_list": config_list,
    },
    system_message=templateVal_5,
)

Purpose: Creates an assistant agent named “Critic”.

Function: Uses templateVal_5 to set its function as a critic.

Role: Provide feedback, critique solutions, and help improve the overall response.

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(__name__)

Purpose: Configures the logging system.

Function: Sets the logging level to only capture error messages to avoid cluttering the output.

Role: Helps in debugging by capturing and displaying error messages.

def buildAndPlay(self, inputPrompt):
    try:
        user_proxy.initiate_chat(
            assistant,
            message=f"We need to solve the following problem: {inputPrompt}. "
                    "Please coordinate with the admin, engineer, game_designer, planner and critic to provide a comprehensive solution. "
        )

        return 0
    except Exception as e:
        x = str(e)
        print('Error: <<Real-time Translation>>: ', x)

        return 1

Purpose: Defines a method to initiate the problem-solving process.

Function:

  • Parameters: Takes inputPrompt, which is the problem to be solved.
  • Action:
    • Calls user_proxy.initiate_chat() to start a conversation between the user proxy agent and the assistant agent.
    • Sends a message requesting coordination among all agents to provide a comprehensive solution to the problem.
  • Error Handling: If an exception occurs, it prints an error message and returns 1.

Role: Initiates collaboration among all agents to solve the provided problem.

Agents Setup: Multiple agents with specialized roles are created.
Initiating Conversation: The buildAndPlay method starts a conversation, asking agents to collaborate.
Problem Solving: Agents communicate and coordinate to provide a comprehensive solution to the input problem.
Error Handling: The system captures and logs any errors that occur during execution.


We’ll continue to discuss this topic in the upcoming post.

I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 🙂

Navigating the Future of Work: Insights from the Argyle AI Summit

At the recent Argyle AI Summit, a prestigious event in the AI industry, I had the honor of participating as a speaker alongside esteemed professionals like Misha Leybovich from Google Labs. The summit, coordinated by Sylvia Das Chagas, a former senior AI conversation designer at CVS Health, provided an enlightening platform to discuss the evolving role of AI in talent management. Our session focused on the theme “Driving Talent with AI,” addressing some of the most pressing questions in the field. Frequently, relevant use cases were shared in detail to support these threads.

To view the actual page, please click the following link.

One of the critical topics we explored was AI’s impact on talent management in the upcoming year. AI’s influence in hiring and retention is becoming increasingly significant. For example, AI-powered tools can now analyze vast amounts of data to identify the best candidates for a role, going beyond traditional resume screening. In retention, AI is instrumental in identifying patterns that indicate an employee’s likelihood to leave, enabling proactive measures.

A burning question in AI is how leaders address fears that AI might replace manual jobs. We discussed the importance of leaders framing AI as a complement to human skills rather than a replacement. AI enhances employee capabilities by automating mundane tasks, allowing employees to focus on more creative and strategic work.

Regarding new AI tools that organizations should watch out for, the conversation highlighted tools that enhance remote collaboration and workplace inclusivity. Tools like virtual meeting assistants that can transcribe, translate, and summarize meetings in real time are becoming invaluable in today’s global work environment.

AI’s role in boosting employee motivation and productivity was another focal point. We discussed how AI-driven career development programs can offer personalized learning paths, helping employees grow and stay motivated.

Incorporating multiple languages in tools like ChatGPT was highlighted as a critical step towards inclusivity. This expansion allows a broader range of employees to interact with AI tools in their native language, fostering a more inclusive workplace environment.

Lastly, we tackled the challenge of addressing employees’ reluctance to change. Emphasizing the importance of transparent communication and education about AI’s benefits was identified as key. Organizations can alleviate fears and encourage a more accepting attitude towards AI by involving employees in the AI implementation process and providing training.

The Argyle AI Summit offered a compelling glimpse into the future of AI in talent management. The session provided valuable insights for leaders looking to harness AI’s potential to enhance talent management strategies by discussing real-world examples and strategies. To gain more in-depth knowledge and perspectives shared during this summit, I encourage interested parties to visit the recorded session link for a more comprehensive understanding.

Or, you can directly view it from here –


I would greatly appreciate your feedback on the insights shared during the summit. Your thoughts and perspectives are invaluable as we continue to explore and navigate the evolving landscape of AI in the workplace.

Text2SQL Data Extractor (T2SDE) using Python & Open AI LLM

Today, I will share a new post that will contextualize the source files & then read the data into the pandas data frame, and then dynamically create the SQL & execute it. Then, fetch the data from the sources based on the query generated dynamically. This project is for the advanced Python developer and data Science Newbie.

In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.

Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

Demo

Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.


The application will take the metadata captured from source data dynamically. It blends the metadata and enhances the prompt to pass to the Flask server. The Flask server has all the limits of contexts.

Once the application receives the correct generated SQL, it will then apply the SQL using the SQLAlchemy package to get the desired results.

The following are the important packages that are essential to this project –

pip install openai==1.6.1
pip install pandas==2.1.4
pip install Flask==3.0.0
pip install SQLAlchemy==2.0.23

We’ll have both the server and the main application. Today, we’ll be going in reverse mode. We first discuss the main script & then explain all the other class scripts.

  • 1_invokeSQLServer.py (This is the main calling Python script to invoke the OpenAI-Server.)

Please find some of the key snippet from this discussion –

@app.route('/message', methods=['POST'])
def message():
    input_text = request.json.get('input_text', None)
    session_id = request.json.get('session_id', None)

    print('*' * 240)
    print('User Input:')
    print(str(input_text))
    print('*' * 240)

    # Retrieve conversation history from the session or database
    conversation_history = session.get(session_id, [])

    # Add the new message to the conversation history
    conversation_history.append(input_text)

    # Call OpenAI API with the updated conversation
    response = client.with_options(max_retries=0).chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": input_text,
            }
        ],
        model=cf.conf['MODEL_NAME'],
    )

    # Extract the content from the first choice's message
    chat_response = response.choices[0].message.content
    print('*' * 240)
    print('Resposne::')
    print(chat_response)
    print('*' * 240)

    conversation_history.append(chat_response)

    # Store the updated conversation history in the session or database
    session[session_id] = conversation_history

    return chat_response

This code defines a web application route that handles POST requests sent to the /message endpoint:

  1. Route Declaration: The @app.route('/message', methods=['POST']) part specifies that the function message() is executed when the server receives a POST request at the /message URL.
  2. Function Definition: Inside the message() function:
    • It retrieves two pieces of data from the request’s JSON body: input_text (the user’s input message) and session_id (a unique identifier for the user’s session).
    • It prints the user’s input message, surrounded by lines of asterisks for emphasis.
  3. Conversation History Management:
    • The code retrieves the conversation history associated with the given session_id. This history is a list of messages.
    • It then adds the new user message (input_text) to this conversation history.
  4. OpenAI API Call:
    • The function makes a call to the OpenAI API, passing the user’s message. It specifies not to retry the request if it fails (max_retries=0).
    • The model used for the OpenAI API call is taken from some configurations (cf.conf['MODEL_NAME']).
  5. Processing API Response:
    • The response from the OpenAI API is processed to extract the content of the chat response.
    • This chat response is printed.
  6. Updating Conversation History:
    • The chat response is added to the conversation history.
    • The updated conversation history is then stored back in the session or database, associated with the session_id.
  7. Returning the Response: Finally, the function returns the chat response.

  • clsDynamicSQLProcess.py (This Python class generates the SQL & then executes the flask server to invoke the OpenAI-Server.)

Now, let us understand the few important piece of snippet –

def text2SQLBegin(self, DBFileNameList, fileDBPath, srcQueryPrompt, joinCond, debugInd='N'):

        question = srcQueryPrompt
        create_table_statement = ''
        jStr = ''

        print('DBFileNameList::', DBFileNameList)
        print('prevSessionDBFileNameList::', self.prevSessionDBFileNameList)

        if set(self.prevSessionDBFileNameList) == set(DBFileNameList):
            self.flag = 'Y'
        else:
            self.flag = 'N'

        if self.flag == 'N':

            for i in DBFileNameList:
                DBFileName = i

                FullDBname = fileDBPath + DBFileName
                print('File: ', str(FullDBname))

                tabName, _ = DBFileName.split('.')

                # Reading the source data
                df = pd.read_csv(FullDBname)

                # Convert all string columns to lowercase
                df = df.apply(lambda x: x.str.lower() if x.dtype == "object" else x)

                # Convert DataFrame to SQL table
                df.to_sql(tabName, con=engine, index=False)

                # Create a MetaData object and reflect the existing database
                metadata = MetaData()
                metadata.reflect(bind=engine)

                # Access the 'users' table from the reflected metadata
                table = metadata.tables[tabName]

                # Generate the CREATE TABLE statement
                create_table_statement = create_table_statement + str(CreateTable(table)) + '; \n'

                tabName = ''

            for joinS in joinCond:
                jStr = jStr + joinS + '\n'

            self.prevSessionDBFileNameList = DBFileNameList
            self.prev_create_table_statement = create_table_statement

            masterSessionDBFileNameList = self.prevSessionDBFileNameList
            mast_create_table_statement = self.prev_create_table_statement

        else:
            masterSessionDBFileNameList = self.prevSessionDBFileNameList
            mast_create_table_statement = self.prev_create_table_statement

        inputPrompt = (templateVal_1 + mast_create_table_statement + jStr + templateVal_2).format(question=question)

        if debugInd == 'Y':
            print('INPUT PROMPT::')
            print(inputPrompt)

        print('*' * 240)
        print('Find the Generated SQL:')
        print()

        DBFileNameList = []
        create_table_statement = ''

        return inputPrompt
  1. Function Overview: The text2SQLBegin function processes a list of database file names (DBFileNameList), a file path (fileDBPath), a query prompt (srcQueryPrompt), join conditions (joinCond), and a debug indicator (debugInd) to generate SQL commands.
  2. Initial Setup: It starts by initializing variables for the question, the SQL table creation statement, and a string for join conditions.
  3. Debug Prints: The function prints the current and previous session database file names for debugging purposes.
  4. Flag Setting: A flag is set to ‘Y’ if the current session’s database file names match the previous session’s; otherwise, it’s set to ‘N’.
  5. Processing New Session Data: If the flag is ‘N’, indicating new session data:
    • For each database file, it reads the data, converts string columns to lowercase, and creates a corresponding SQL table in a database using the pandas library.
    • Metadata is generated for each table and a CREATE TABLE SQL statement is created.
  6. Join Conditions and Statement Aggregation: Join conditions are concatenated, and previous session information is updated with the current session’s data.
  7. Handling Repeated Sessions: If the session data is repeated (flag is ‘Y’), it uses the previous session’s SQL table creation statements and database file names.
  8. Final Input Prompt Creation: It constructs the final input prompt by combining template values with the create table statement, join conditions, and the original question.
  9. Debug Printing: If debug mode is enabled, it prints the final input prompt.
  10. Conclusion: The function clears the DBFileNameList and create_table_statement variables, and returns the constructed input prompt.
  def text2SQLEnd(self, srcContext, debugInd='N'):
      url = self.url

      payload = json.dumps({"input_text": srcContext,"session_id": ""})
      headers = {'Content-Type': 'application/json', 'Cookie': cf.conf['HEADER_TOKEN']}

      response = requests.request("POST", url, headers=headers, data=payload)

      return response.text

The text2SQLEnd function sends an HTTP POST request to a specified URL and returns the response. It takes two parameters: srcContext which contains the input text, and an optional debugInd for debugging purposes. The function constructs the request payload by converting the input text and an empty session ID to JSON format. It sets the request headers, including a content type of ‘application/json’ and a token from the configuration file. The function then sends the POST request using the requests library and returns the text content of the response.

  def sql2Data(self, srcSQL):
      # Executing the query on top of your data
      resultSQL = pd.read_sql_query(srcSQL, con=engine)

      return resultSQL

The sql2Data function is designed to execute a SQL query on a database and return the result. It takes a single parameter, srcSQL, which contains the SQL query to be executed. The function uses the pandas library to run the provided SQL query (srcSQL) against a database connection (engine). It then returns the result of this query, which is typically a DataFrame object containing the data retrieved from the database.

def genData(self, srcQueryPrompt, fileDBPath, DBFileNameList, joinCond, debugInd='N'):
    try:
        authorName = self.authorName
        website = self.website
        var = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

        print('*' * 240)
        print('SQL Start Time: ' + str(var))
        print('*' * 240)

        print('*' * 240)
        print()

        if debugInd == 'Y':
            print('Author Name: ', authorName)
            print('For more information, please visit the following Website: ', website)
            print()

            print('*' * 240)
        print('Your Data for Retrieval:')
        print('*' * 240)

        if debugInd == 'Y':

            print()
            print('Converted File to Dataframe Sample:')
            print()

        else:
            print()

        context = self.text2SQLBegin(DBFileNameList, fileDBPath, srcQueryPrompt, joinCond, debugInd)
        srcSQL = self.text2SQLEnd(context, debugInd)

        print(srcSQL)
        print('*' * 240)
        print()
        resDF = self.sql2Data(srcSQL)

        print('*' * 240)
        print('SQL End Time: ' + str(var))
        print('*' * 240)

        return resDF

    except Exception as e:
        x = str(e)
        print('Error: ', x)

        df = pd.DataFrame()

        return df
  1. Initialization and Debug Information: The function begins by initializing variables like authorName, website, and a timestamp (var). It then prints the start time of the SQL process. If the debug indicator (debugInd) is ‘Y’, it prints additional information like the author’s name and website.
  2. Generating SQL Context and Query: The function calls text2SQLBegin with various parameters (file paths, database file names, query prompt, join conditions, and the debug indicator) to generate an SQL context. Then it calls text2SQLEnd with this context and the debug indicator to generate the actual SQL query.
  3. Executing the SQL Query: It prints the generated SQL query for visibility, especially in debug mode. The query is then executed by calling sql2Data, which returns the result as a data frame (resDF).
  4. Finalization and Error Handling: After executing the query, it prints the SQL end time. In case of any exceptions during the process, it catches the error, prints it, and returns an empty DataFrame.
  5. Return Value: The function returns the DataFrame (resDF) containing the results of the executed SQL query. If an error occurs, it returns an empty DataFrame instead.

Let us explore the directory structure starting from the parent to some of the important child folder should look like this –

Let us understand the important screenshots of this entire process –


So, finally, we’ve done it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Validating source data against RAG-response using Open AI, GloVe, FAISS using Python

Today, I’ll be presenting another exciting capability of architecture in the world of LLMs, where you need to answer one crucial point & that is how valid the response generated by these LLMs is against your data. This response is critical when discussing business growth & need to take the right action at the right time.

Why not view the demo before going through it?

Demo

Isn’t it exciting? Great! Let us understand this in detail.

The first dotted box (extreme-left) represents the area that talks about the data ingestion from different sources, including third-party PDFs. It is expected that organizations should have ready-to-digest data sources. Examples: Data Lake, Data Mart, One Lake, or any other equivalent platforms. Those PDFs will provide additional insights beyond the conventional advanced analytics.

You need to have some kind of OCR solution that will extract all the relevant information in the form of text from the documents. 

The next important part is how you define the chunking & embedding of data chunks into Vector DB. Chunking & indexing strategies, along with the overlapping chain, play a crucial importance in tying that segregated piece of context into a single context that will be fed into the source for your preferred LLMs.

This system employs a vector similarity search to browse through unstructured information and concurrently accesses the database to retrieve the context, ensuring that the responses are not only comprehensive but also anchored in validated knowledge.

This approach is particularly vital for addressing multi-hop questions, where a single query can be broken down into multiple sub-questions and may require information from numerous documents to generate an accurate answer.


pip install openai==0.27.8
pip install pandas==2.0.3
pip install tensorflow==2.11.1
pip install faiss-cpu==1.7.4
pip install gensim==4.3.2

Let us understand the key class & snippets.

  • clsFeedVectorDB.py (This is the main class that will invoke the Faiss framework to contextualize the docs inside the vector DB with the source file name to validate the answer from Gen AI using Globe.6B embedding models.)

Let us understand some of the key snippets from the above script (Full scripts will be available in the GitHub Repo) –

# Sample function to convert text to a vector
def text2Vector(self, text):
    # Encode the text using the tokenizer
    words = [word for word in text.lower().split() if word in self.model]

    # If no words in the model, return a zero vector
    if not words:
        return np.zeros(self.model.vector_size)

    # Compute the average of the word vectors
    vector = np.mean([self.model[word] for word in words], axis=0)
    return vector.reshape(1, -1)

This code is for a function called “text2Vector” that takes some text as input and converts it into a numerical vector. Let me break it down step by step:

  • It starts by taking some text as input, and this text is expected to be a sentence or a piece of text.
  • The text is then split into individual words, and each word is converted to lowercase.
  • It checks if each word is present in a pre-trained language model (probably a word embedding model like Word2Vec or GloVe). If a word is not in the model, it’s ignored.
  • If none of the words from the input text are found in the model, the function returns a vector filled with zeros. This vector has the same size as the word vectors in the model.
  • If there are words from the input text in the model, the function calculates the average vector of these words. It does this by taking the word vectors for each word found in the model and computing their mean (average). This results in a single vector that represents the input text.
  • Finally, the function reshapes this vector into a 2D array with one row and as many columns as there are elements in the vector. The reason for this reshaping is often related to compatibility with other parts of the code or libraries used in the project.

So, in simple terms, this function takes a piece of text, looks up the word vectors for the words in that text, and calculates the average of those vectors to create a single numerical representation of the text. If none of the words are found in the model, it returns a vector of zeros.

    def genData(self):
        try:
            basePath = self.basePath
            modelFileName = self.modelFileName
            vectorDBPath = self.vectorDBPath
            vectorDBFileName = self.vectorDBFileName

            # Create a FAISS index
            dimension = int(cf.conf['NO_OF_MODEL_DIM'])  # Assuming 100-dimensional vectors 
            index = faiss.IndexFlatL2(dimension)

            print('*' * 240)
            print('Vector Index Your Data for Retrieval:')
            print('*' * 240)

            FullVectorDBname = vectorDBPath + vectorDBFileName
            indexFile = str(vectorDBPath) + str(vectorDBFileName) + '.index'

            print('File: ', str(indexFile))

            data = {}
            # List all files in the specified directory
            files = os.listdir(basePath)

            # Filter out files that are not text files
            text_files = [file for file in files if file.endswith('.txt')]

            # Read each text file
            for file in text_files:
                file_path = os.path.join(basePath, file)
                print('*' * 240)
                print('Processing File:')
                print(str(file_path))
                try:
                    # Attempt to open with utf-8 encoding
                    with open(file_path, 'r', encoding='utf-8') as file:
                        for line_number, line in enumerate(file, start=1):
                            # Assume each line is a separate document
                            vector = self.text2Vector(line)
                            vector = vector.reshape(-1)
                            index_id = index.ntotal

                            index.add(np.array([vector]))  # Adding the vector to the index
                            data[index_id] = {'text': line, 'line_number': line_number, 'file_name': file_path}  # Storing the line and file name
                except UnicodeDecodeError:
                    # If utf-8 fails, try a different encoding
                    try:
                        with open(file_path, 'r', encoding='ISO-8859-1') as file:
                            for line_number, line in enumerate(file, start=1):
                                # Assume each line is a separate document
                                vector = self.text2Vector(line)
                                vector = vector.reshape(-1)
                                index_id = index.ntotal
                                index.add(np.array([vector]))  # Adding the vector to the index
                                data[index_id] = {'text': line, 'line_number': line_number, 'file_name': file_path}  # Storing the line and file name
                    except Exception as e:
                        print(f"Could not read file {file}: {e}")
                        continue

                print('*' * 240)

            # Save the data dictionary using pickle
            dataCache = vectorDBPath + modelFileName
            with open(dataCache, 'wb') as f:
                pickle.dump(data, f)

            # Save the index and data for later use
            faiss.write_index(index, indexFile)

            print('*' * 240)

            return 0

        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1
  • This code defines a function called genData, and its purpose is to prepare and store data for later retrieval using a FAISS index. Let’s break down what it does step by step:
  • It starts by assigning several variables, such as basePath, modelFileName, vectorDBPath, and vectorDBFileName. These variables likely contain file paths and configuration settings.
  • It creates a FAISS index with a specified dimension (assuming 100-dimensional vectors in this case) using faiss.IndexFlatL2. FAISS is a library for efficient similarity search and clustering of high-dimensional data.
  • It prints the file name and lines where the index will be stored. It initializes an empty dictionary called data to store information about the processed text data.
  • It lists all the files in a directory specified by basePath. It filters out only the files that have a “.txt” extension as text files.
  • It then reads each of these text files one by one. For each file:
  1. It attempts to open the file with UTF-8 encoding.
    • It reads the file line by line.
    • For each line, it calls a function text2Vector to convert the text into a numerical vector representation. This vector is added to the FAISS index.
    • It also stores some information about the line, such as the line number and the file name, in the data dictionary.
    • If there is an issue with UTF-8 encoding, it tries to open the file with a different encoding, “ISO-8859-1”. The same process of reading and storing data continues.
  • If there are any exceptions (errors) during this process, it prints an error message but continues processing other files.
  • Once all the files are processed, it saves the data dictionary using the pickle library to a file specified by dataCache.
  • It also saves the FAISS index to a file specified by indexFile.
  • Finally, it returns 0 if the process completes successfully or 1 if there was an error during execution.

In summary, this function reads text files, converts their contents into numerical vectors, and builds a FAISS index for efficient similarity search. It also saves the processed data and the index for later use. If there are any issues during the process, it prints error messages but continues processing other files.

  • clsRAGOpenAI.py (This is the main class that will invoke the RAG class, which will get the contexts with references including source files, line numbers, and source texts. This will help the customer to validate the source against the OpenAI response to understand & control the data bias & other potential critical issues.)

Let us understand some of the key snippets from the above script (Full scripts will be available in the GitHub Repo) –

def ragAnswerWithHaystackAndGPT3(self, queryVector, k, question):
    modelName = self.modelName
    maxToken = self.maxToken
    temp = self.temp

    # Assuming getTopKContexts is a method that returns the top K contexts
    contexts = self.getTopKContexts(queryVector, k)
    messages = []

    # Add contexts as system messages
    for file_name, line_number, text in contexts:
        messages.append({"role": "system", "content": f"Document: {file_name} \nLine Number: {line_number} \nContent: {text}"})

    prompt = self.generateOpenaiPrompt(queryVector, k)
    prompt = prompt + "Question: " + str(question) + ". \n Answer based on the above documents."

    # Add user question
    messages.append({"role": "user", "content": prompt})

    # Create chat completion
    completion = client.chat.completions.create(
    model=modelName,
    messages=messages,
    temperature = temp,
    max_tokens = maxToken
    )

    # Assuming the last message in the response is the answer
    last_response = completion.choices[0].message.content
    source_refernces = ['FileName: ' + str(context[0]) + ' - Line Numbers: ' + str(context[1]) + ' - Source Text (Reference): ' + str(context[2]) for context in contexts]

    return last_response, source_refernces
  • This code defines a function called ragAnswerWithHaystackAndGPT3. Its purpose is to use a combination of the Haystack search method and OpenAI’s GPT-3 model to generate an answer to a user’s question. Let’s break down what it does step by step:
  • It starts by assigning several variables, such as modelName, maxToken, and temp. These variables likely contain model-specific information and settings for GPT-3.
  • It calls a method getTopKContexts to retrieve the top K contexts (which are likely documents or pieces of text) related to the user’s query. These contexts are stored in the contexts variable.
  • It initializes an empty list called messages to store messages that will be used in the conversation with the GPT-3 model.
  • It iterates through each context and adds them as system messages to the messages list. These system messages provide information about the documents or sources being used in the conversation.
  • It creates a prompt that combines the query, retrieved contexts, and the user’s question. This prompt is then added as a user message to the messages list. It effectively sets up the conversation for GPT-3, where the user’s question is followed by context.
  • It makes a request to the GPT-3 model using the client.chat.completions.create method, passing in the model name, the constructed messages, and other settings such as temperature and maximum tokens.
  • After receiving a response from GPT-3, it assumes that the last message in the response contains the answer generated by the model.
  • It also constructs source_references, which is a list of references to the documents or sources used in generating the answer. This information includes the file name, line numbers, and source text for each context.
  • Finally, it returns the generated answer (last_response) and the source references to the caller.

In summary, this function takes a user’s query, retrieves relevant contexts or documents, sets up a conversation with GPT-3 that includes the query and contexts, and then uses GPT-3 to generate an answer. It also provides references to the sources used in generating the answer.

    def getTopKContexts(self, queryVector, k):
        try:
            distances, indices = index.search(queryVector, k)
            resDict = [(data[i]['file_name'], data[i]['line_number'], data[i]['text']) for i in indices[0]]
            return resDict
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return x

This code defines a function called getTopKContexts. Its purpose is to retrieve the top K relevant contexts or pieces of information from a pre-built index based on a query vector. Here’s a breakdown of what it does:

  1. It takes two parameters as input: queryVector, which is a numerical vector representing a query, and k, which specifies how many relevant contexts to retrieve.
  2. Inside a try-except block, it attempts the following steps:
    • It uses the index.search method to find the top K closest contexts to the given queryVector. This method returns two arrays: distances (measuring how similar the contexts are to the query) and indices (indicating the positions of the closest contexts in the data).
    • It creates a list called “resDict", which contains tuples for each of the top K contexts. Each tuple contains three pieces of information: the file name (file_name), the line number (line_number), and the text content (text) of the context. These details are extracted from a data dictionary.
  3. If the process completes successfully, it returns the list of top K contexts (resDict) to the caller.
  4. If there’s an exception (an error) during this process, it captures the error message as a string (x), prints the error message, and then returns the error message itself.

In summary, this function takes a query vector and finds the K most relevant contexts or pieces of information based on their similarity to the query. It returns these contexts as a list of tuples containing file names, line numbers, and text content. If there’s an error, it prints an error message and returns the error message string.

def generateOpenaiPrompt(self, queryVector, k):
    contexts = self.getTopKContexts(queryVector, k)
    template = ct.templateVal_1
    prompt = template
    for file_name, line_number, text in contexts:
        prompt += f"Document: {file_name}\n Line Number: {line_number} \n Content: {text}\n\n"
    return prompt

This code defines a function called generateOpenaiPrompt. Its purpose is to create a prompt or a piece of text that combines a template with information from the top K relevant contexts retrieved earlier. Let’s break down what it does:

  1. It starts by calling the getTopKContexts function to obtain the top K relevant contexts based on a given queryVector.
  2. It initializes a variable called template with a predefined template value (likely defined elsewhere in the code).
  3. It sets the prompt variable to the initial template.
  4. Then, it enters a loop where it iterates through each of the relevant contexts retrieved earlier (contexts are typically documents or text snippets).
  5. For each context, it appends information to the prompt. Specifically, it adds lines to the prompt that include:
    • The document’s file name (Document: [file_name]).
    • The line number within the document (Line Number: [line_number]).
    • The content of the context itself (Content: [text]).
  6. It adds some extra spacing (newlines) between each context to ensure readability.
  7. Finally, it returns the complete – prompt, which is a combination of the template and information from the relevant contexts.

In summary, this function takes a query vector, retrieves relevant contexts, and creates a prompt by combining a template with information from these contexts. This prompt can then be used as input for an AI model or system, likely for generating responses or answers based on the provided context.

Let us understand the directory structure of this entire application –


To learn more about this package, please visit the following GitHub link.

So, finally, we’ve done it. I know that this post is relatively smaller than my earlier post. But, I think, you can get a good hack to improve some of your long-running jobs by applying this trick.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Enable OpenAI chatbot with the selected YouTube video content using LangChain, FAISS & YouTube data-API.

Today, I’m very excited to demonstrate an effortless & new way to extract the transcript from YouTube videos & then answer the questions based on the topics selected by the users. In this post, I plan to deal with the user inputs to consider the case first & then it can summarize the video content through useful advanced analytics with the help of the LangChain & OpenAI-based model.

In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.
Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

Demo

Isn’t it very exciting? This will lead to a whole new ballgame, where one can get critical decision-making information from these human sources along with their traditional advanced analytical data.

How will it help?

Let’s say as per your historical data & analytics, the dashboard is recommending prod-A, prod-B & prod-C as the top three products for potential top-performing brands. Whereas, you are getting some alerts from the TV news on prod-B due to the recent incidents. So, in that case, you don’t want to continue with the prod-B investment. You may find a new product named prod-Z. That may reduce the risk of your investment.


What is LangChain?

LangChain is a framework for developing applications powered by language models. We believe that the most powerful and differentiated applications will not only call out to a language model but will also be:

  1. Data-aware: connect a language model to other sources of data
  2. Agentic: allow a language model to interact with its environment

The LangChain framework works around these principles.

To know more about this, please click the following link.

As you can see, this is one of the critical components in our solution, which will bind the OpenAI bot & it will feed the necessary data to provide the correct response.


What is FAISS?

Faiss is a library for efficient similarity search and clustering of dense vectors. It contains algorithms that search in sets of vectors of any size, up to ones that do not fit in RAM. It also has supporting code for evaluation and parameter tuning.

Faiss developed using C++ with complete wrappers for Python—some of the most beneficial algorithms available both on CPU & in GPU as well. Facebook AI Research develops it.

To know more about this, please click the following link.


FLOW OF EVENTS:

Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.

Here are the steps that will follow in sequence –

  • The application will first get the topic on which it needs to look from YouTube & find the top 5 videos using the YouTube data-API.
  • Once the application returns a list of websites from the above step, LangChain will drive the application will extract the transcripts from the video & then optimize the response size in smaller chunks to address the costly OpenAI calls. During this time, it will invoke FAISS to create document DBs.
  • Finally, it will send those chunks to OpenAI for the best response based on your supplied template that performs the final analysis with small data required for your query & gets the appropriate response with fewer costs.

CODE:

Why don’t we go through the code made accessible due to this new library for this particular use case?

  • clsConfigClient.py (This is the main calling Python script for the input parameters.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 28-May-2023 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### personal OpenAI-based video content ####
#### enable bot. ####
#### ####
################################################
import os
import platform as pl
class clsConfigClient(object):
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
conf = {
'APP_ID': 1,
'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
'LOG_PATH': Curr_Path + sep + 'log' + sep,
'DATA_PATH': Curr_Path + sep + 'data' + sep,
'MODEL_PATH': Curr_Path + sep + 'model' + sep,
'TEMP_PATH': Curr_Path + sep + 'temp' + sep,
'MODEL_DIR': 'model',
'APP_DESC_1': 'LangChain Demo!',
'DEBUG_IND': 'N',
'INIT_PATH': Curr_Path,
'FILE_NAME': 'Output.csv',
'MODEL_NAME': 'gpt-3.5-turbo',
'OPEN_AI_KEY': "sk-kfrjfijdrkidjkfjd9474nbfjfkfjfhfhf84i84hnfhjdbv6Bgvv",
'YOUTUBE_KEY': "AIjfjfUYGe64hHJ-LOFO5u-mkso9pPOJGFU",
'TITLE': "LangChain Demo!",
'TEMP_VAL': 0.2,
'PATH' : Curr_Path,
'MAX_CNT' : 5,
'OUT_DIR': 'data'
}

Some of the key entries from the above scripts are as follows –

'MODEL_NAME': 'gpt-3.5-turbo',
'OPEN_AI_KEY': "sk-kfrjfijdrkidjkfjd9474nbfjfkfjfhfhf84i84hnfhjdbv6Bgvv",
'YOUTUBE_KEY': "AIjfjfUYGe64hHJ-LOFO5u-mkso9pPOJGFU",
'TEMP_VAL': 0.2,

From the above code snippet, one can understand that we need both the API keys for YouTube & OpenAI. And they have separate costs & usage, which I’ll share later in the post. Also, notice that the temperature sets to 0.2 ( range between 0 to 1). That means our AI bot will be consistent in response. And our application will use the GPT-3.5-turbo model for its analytic response.

  • clsTemplate.py (Contains all the templates for OpenAI.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 27-May-2023 ####
#### Modified On: 28-May-2023 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the template for ####
#### OpenAI prompts to get the correct ####
#### response. ####
#### ####
################################################
# Template to use for the system message prompt
templateVal_1 = """
You are a helpful assistant that that can answer questions about youtube videos
based on the video's transcript: {docs}
Only use the factual information from the transcript to answer the question.
If you feel like you don't have enough information to answer the question, say "I don't know".
Your answers should be verbose and detailed.
"""

view raw

clsTemplate.py

hosted with ❤ by GitHub

The above code is self-explanatory. Here, we’re keeping the correct instructions for our OpenAI to respond within these guidelines.

  • clsVideoContentScrapper.py (Main class to extract the transcript from the YouTube videos & then answer the questions based on the topics selected by the users.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 27-May-2023 ####
#### Modified On 28-May-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python class that will invoke the ####
#### LangChain of package to extract ####
#### the transcript from the YouTube videos & ####
#### then answer the questions based on the ####
#### topics selected by the users. ####
#### ####
#####################################################
from langchain.document_loaders import YoutubeLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.chat_models import ChatOpenAI
from langchain.chains import LLMChain
from langchain.prompts.chat import (
ChatPromptTemplate,
SystemMessagePromptTemplate,
HumanMessagePromptTemplate,
)
from googleapiclient.discovery import build
import clsTemplate as ct
from clsConfigClient import clsConfigClient as cf
import os
###############################################
### Global Section ###
###############################################
open_ai_Key = cf.conf['OPEN_AI_KEY']
os.environ["OPENAI_API_KEY"] = open_ai_Key
embeddings = OpenAIEmbeddings(openai_api_key=open_ai_Key)
YouTube_Key = cf.conf['YOUTUBE_KEY']
youtube = build('youtube', 'v3', developerKey=YouTube_Key)
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
###############################################
### End of Global Section ###
###############################################
class clsVideoContentScrapper:
def __init__(self):
self.model_name = cf.conf['MODEL_NAME']
self.temp_val = cf.conf['TEMP_VAL']
self.max_cnt = int(cf.conf['MAX_CNT'])
def createDBFromYoutubeVideoUrl(self, video_url):
try:
loader = YoutubeLoader.from_youtube_url(video_url)
transcript = loader.load()
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
docs = text_splitter.split_documents(transcript)
db = FAISS.from_documents(docs, embeddings)
return db
except Exception as e:
x = str(e)
print('Error: ', x)
return ''
def getResponseFromQuery(self, db, query, k=4):
try:
"""
gpt-3.5-turbo can handle up to 4097 tokens. Setting the chunksize to 1000 and k to 4 maximizes
the number of tokens to analyze.
"""
mod_name = self.model_name
temp_val = self.temp_val
docs = db.similarity_search(query, k=k)
docs_page_content = " ".join([d.page_content for d in docs])
chat = ChatOpenAI(model_name=mod_name, temperature=temp_val)
# Template to use for the system message prompt
template = ct.templateVal_1
system_message_prompt = SystemMessagePromptTemplate.from_template(template)
# Human question prompt
human_template = "Answer the following question: {question}"
human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)
chat_prompt = ChatPromptTemplate.from_messages(
[system_message_prompt, human_message_prompt]
)
chain = LLMChain(llm=chat, prompt=chat_prompt)
response = chain.run(question=query, docs=docs_page_content)
response = response.replace("\n", "")
return response, docs
except Exception as e:
x = str(e)
print('Error: ', x)
return '', ''
def topFiveURLFromYouTube(self, service, **kwargs):
try:
video_urls = []
channel_list = []
results = service.search().list(**kwargs).execute()
for item in results['items']:
print("Title: ", item['snippet']['title'])
print("Description: ", item['snippet']['description'])
channel = item['snippet']['channelId']
print("Channel Id: ", channel)
# Fetch the channel name using the channel ID
channel_response = service.channels().list(part='snippet',id=item['snippet']['channelId']).execute()
channel_title = channel_response['items'][0]['snippet']['title']
print("Channel Title: ", channel_title)
channel_list.append(channel_title)
print("Video Id: ", item['id']['videoId'])
vidURL = "https://www.youtube.com/watch?v=&quot; + item['id']['videoId']
print("Video URL: " + vidURL)
video_urls.append(vidURL)
print("\n")
return video_urls, channel_list
except Exception as e:
video_urls = []
channel_list = []
x = str(e)
print('Error: ', x)
return video_urls, channel_list
def extractContentInText(self, topic, query):
try:
discussedTopic = []
strKeyText = ''
cnt = 0
max_cnt = self.max_cnt
urlList, channelList = self.topFiveURLFromYouTube(youtube, q=topic, part='id,snippet',maxResults=max_cnt,type='video')
print('Returned List: ')
print(urlList)
print()
for video_url in urlList:
print('Processing Video: ')
print(video_url)
db = self.createDBFromYoutubeVideoUrl(video_url)
response, docs = self.getResponseFromQuery(db, query)
if len(response) > 0:
strKeyText = 'As per the topic discussed in ' + channelList[cnt] + ', '
discussedTopic.append(strKeyText + response)
cnt += 1
return discussedTopic
except Exception as e:
discussedTopic = []
x = str(e)
print('Error: ', x)
return discussedTopic

Let us understand the key methods step by step in detail –

def topFiveURLFromYouTube(self, service, **kwargs):
    try:
        video_urls = []
        channel_list = []
        results = service.search().list(**kwargs).execute()

        for item in results['items']:
            print("Title: ", item['snippet']['title'])
            print("Description: ", item['snippet']['description'])
            channel = item['snippet']['channelId']
            print("Channel Id: ", channel)

            # Fetch the channel name using the channel ID
            channel_response = service.channels().list(part='snippet',id=item['snippet']['channelId']).execute()
            channel_title = channel_response['items'][0]['snippet']['title']
            print("Channel Title: ", channel_title)
            channel_list.append(channel_title)

            print("Video Id: ", item['id']['videoId'])
            vidURL = "https://www.youtube.com/watch?v=" + item['id']['videoId']
            print("Video URL: " + vidURL)
            video_urls.append(vidURL)
            print("\n")

        return video_urls, channel_list

    except Exception as e:
        video_urls = []
        channel_list = []
        x = str(e)
        print('Error: ', x)

        return video_urls, channel_list

The above code will fetch the most relevant YouTube URLs & bind them into a list along with the channel names & then share the lists with the main functions.

def createDBFromYoutubeVideoUrl(self, video_url):
    try:
        loader = YoutubeLoader.from_youtube_url(video_url)
        transcript = loader.load()

        text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
        docs = text_splitter.split_documents(transcript)

        db = FAISS.from_documents(docs, embeddings)
        return db

    except Exception as e:
        x = str(e)
        print('Error: ', x)
        return ''

The provided Python code defines a function createDBFromYoutubeVideoUrl which appears to create a database of text documents from the transcript of a YouTube video. Here’s the explanation in simple English:

  1. The function createDBFromYoutubeVideoUrl has defined with one argument: video_url.
  2. The function uses a try-except block to handle any potential exceptions or errors that may occur.
  3. Inside the try block, the following steps are going to perform:
  • First, it creates a YoutubeLoader object from the provided video_url. This object is likely responsible for interacting with the YouTube video specified by the URL.
  • The loader object then loads the transcript of the video. This object is the text version of everything spoken in the video.
  • It then creates a RecursiveCharacterTextSplitter object with a specified chunk_size of 1000 and chunk_overlap of 100. This object may split the transcript into smaller chunks (documents) of text for easier processing or analysis. Each piece will be around 1000 characters long, and there will overlap of 100 characters between consecutive chunks.
  • The split_documents method of the text_splitter object will split the transcript into smaller documents. These documents are stored in the docs variable.
  • The FAISS.from_documents method is then called with docs and embeddings as arguments to create a FAISS (Facebook AI Similarity Search) index. This index is a database used for efficient similarity search and clustering of high-dimensional vectors, which in this case, are the embeddings of the documents. The FAISS index is stored in the db variable.
  • Finally, the db variable is returned, representing the created database from the video transcript.

4. If an exception occurs during the execution of the try block, the code execution moves to the except block:

  • Here, it first converts the exception e to a string x.
  • Then it prints an error message.
  • Finally, it returns an empty string as an indication of the error.

def getResponseFromQuery(self, db, query, k=4):
      try:
          """
          gpt-3.5-turbo can handle up to 4097 tokens. Setting the chunksize to 1000 and k to 4 maximizes
          the number of tokens to analyze.
          """

          mod_name = self.model_name
          temp_val = self.temp_val

          docs = db.similarity_search(query, k=k)
          docs_page_content = " ".join([d.page_content for d in docs])

          chat = ChatOpenAI(model_name=mod_name, temperature=temp_val)

          # Template to use for the system message prompt
          template = ct.templateVal_1

          system_message_prompt = SystemMessagePromptTemplate.from_template(template)

          # Human question prompt
          human_template = "Answer the following question: {question}"
          human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)

          chat_prompt = ChatPromptTemplate.from_messages(
              [system_message_prompt, human_message_prompt]
          )

          chain = LLMChain(llm=chat, prompt=chat_prompt)

          response = chain.run(question=query, docs=docs_page_content)
          response = response.replace("\n", "")
          return response, docs

      except Exception as e:
          x = str(e)
          print('Error: ', x)

          return '', ''

The Python function getResponseFromQuery is designed to search a given database (db) for a specific query and then generate a response using a language model (possibly GPT-3.5-turbo). The answer is based on the content found and the particular question. Here is a simple English summary:

  1. The function getResponseFromQuery takes three parameters: db, query, and k. The k parameter is optional and defaults to 4 if not provided. db is the database to search, the query is the question or prompts to analyze, and k is the number of similar items to return.
  2. The function initiates a try-except block for handling any errors that might occur.
  3. Inside the try block:
  • The function retrieves the model name and temperature value from the instance of the class this function is a part of.
  • The function then searches the db database for documents similar to the query and saves these in docs.
  • It concatenates the content of the returned documents into a single string docs_page_content.
  • It creates a ChatOpenAI object with the model name and temperature value.
  • It creates a system message prompt from a predefined template.
  • It creates a human message prompt, which is the query.
  • It combines these two prompts to form a chat prompt.
  • An LLMChain object is then created using the ChatOpenAI object and the chat prompt.
  • This LLMChain object is used to generate a response to the query using the content of the documents found in the database. The answer is then formatted by replacing all newline characters with empty strings.
  • Finally, the function returns this response along with the original documents.
  1. If any error occurs during these operations, the function goes to the except block where:
  • The error message is printed.
  • The function returns two empty strings to indicate an error occurred, and no response or documents could be produced.

def extractContentInText(self, topic, query):
    try:
        discussedTopic = []
        strKeyText = ''
        cnt = 0
        max_cnt = self.max_cnt

        urlList, channelList = self.topFiveURLFromYouTube(youtube, q=topic, part='id,snippet',maxResults=max_cnt,type='video')
        print('Returned List: ')
        print(urlList)
        print()

        for video_url in urlList:
            print('Processing Video: ')
            print(video_url)
            db = self.createDBFromYoutubeVideoUrl(video_url)

            response, docs = self.getResponseFromQuery(db, query)

            if len(response) > 0:
                strKeyText = 'As per the topic discussed in ' + channelList[cnt] + ', '
                discussedTopic.append(strKeyText + response)

            cnt += 1

        return discussedTopic
    except Exception as e:
        discussedTopic = []
        x = str(e)
        print('Error: ', x)

        return discussedTopic

This Python function, extractContentInText, is aimed to extract relevant content from the transcripts of top YouTube videos on a specific topic and generate responses to a given query. Here’s a simple English translation:

  1. The function extractContentInText is defined with topic and query as parameters.
  2. It begins with a try-except block to catch and handle any possible exceptions.
  3. In the try block:
  • It initializes several variables: an empty list discussedTopic to store the extracted information, an empty string strKeyText to keep specific parts of the content, a counter cnt initialized at 0, and max_cnt retrieved from the self-object to specify the maximum number of YouTube videos to consider.
  • It calls the topFiveURLFromYouTube function (defined previously) to get the URLs of the top videos on the given topic from YouTube. It also retrieves the list of channel names associated with these videos.
  • It prints the returned list of URLs.
  • Then, it starts a loop over each URL in the urlList.
    • For each URL, it prints the URL, then creates a database from the transcript of the YouTube video using the function createDBFromYoutubeVideoUrl.
    • It then uses the getResponseFromQuery function to get a response to the query based on the content of the database.
    • If the length of the response is greater than 0 (meaning there is a response), it forms a string strKeyText to indicate the channel that the topic was discussed on and then appends the answer to this string. This entire string is then added to the discussedTopic list.
    • It increments the counter cnt by one after each iteration.
    • Finally, it returns the discussedTopic list, which now contains relevant content extracted from the videos.
  1. If any error occurs during these operations, the function goes into the except block:
  • It first resets discussedTopic to an empty list.
  • Then it converts the exception e to a string and prints the error message.
  • Lastly, it returns the empty discussedTopic list, indicating that no content could be extracted due to the error.
  • testLangChain.py (Main Python script to extract the transcript from the YouTube videos & then answer the questions based on the topics selected by the users.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 27-May-2023 ####
#### Modified On 28-May-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### clsVideoContentScrapper class to extract ####
#### the transcript from the YouTube videos. ####
#### ####
#####################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
import textwrap
import clsVideoContentScrapper as cvsc
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
######################################
### Get your global values ####
######################################
debug_ind = 'Y'
# Initiating Logging Instances
clog = cl.clsL()
data_path = cf.conf['DATA_PATH']
data_file_name = cf.conf['FILE_NAME']
cVCScrapper = cvsc.clsVideoContentScrapper()
######################################
#### Global Flag ########
######################################
def main():
try:
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*'*120)
print('Start Time: ' + str(var))
print('*'*120)
#query = "What are they saying about Microsoft?"
print('Please share your topic!')
inputTopic = input('User: ')
print('Please ask your questions?')
inputQry = input('User: ')
print()
retList = cVCScrapper.extractContentInText(inputTopic, inputQry)
cnt = 0
for discussedTopic in retList:
finText = str(cnt + 1) + ') ' + discussedTopic
print()
print(textwrap.fill(finText, width=150))
cnt += 1
r1 = len(retList)
if r1 > 0:
print()
print('Successfully Scrapped!')
else:
print()
print('Failed to Scrappe!')
print('*'*120)
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('End Time: ' + str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

Please find the key snippet –

def main():
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        #query = "What are they saying about Microsoft?"
        print('Please share your topic!')
        inputTopic = input('User: ')
        print('Please ask your questions?')
        inputQry = input('User: ')
        print()

        retList = cVCScrapper.extractContentInText(inputTopic, inputQry)
        cnt = 0

        for discussedTopic in retList:
            finText = str(cnt + 1) + ') ' + discussedTopic
            print()
            print(textwrap.fill(finText, width=150))

            cnt += 1

        r1 = len(retList)

        if r1 > 0:
            print()
            print('Successfully Scrapped!')
        else:
            print()
            print('Failed to Scrappe!')

        print('*'*120)
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()

The above main application will capture the topics from the user & then will give the user a chance to ask specific questions on the topics, invoking the main class to extract the transcript from YouTube & then feed it as a source using ChainLang & finally deliver the response. If there is no response, then it will skip the overall options.

USAGE & COST FACTOR:

Please find the OpenAI usage –

Please find the YouTube API usage –


So, finally, we’ve done it.

I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Note: All the data & scenarios posted here are representational data & scenarios & available over the internet & for educational purposes only. Some of the images (except my photo) we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement & especially in the prediction quality. Sample video taken from Santrel Media & you would find the link over here.

Tuning your model using the python-based low-code machine-learning library PyCaret

Today, I’ll discuss another important topic before I will share the excellent use case next month, as I still need some time to finish that one. We’ll see how we can leverage the brilliant capability of a low-code machine-learning library named PyCaret.

But before going through the details, why don’t we view the demo & then go through it?

Demo

Architecture:

Let us understand the flow of events –

As one can see, the initial training requests are triggered from the PyCaret-driven training models. And the application can successfully process & identify the best models out of the other combinations.

Python Packages:

Following are the python packages that are necessary to develop this use case –

pip install pandas
pip install pycaret

PyCaret is dependent on a combination of other popular python packages. So, you need to install them successfully to run this package.

CODE:

  • clsConfigClient.py (Main configuration file)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 31-Mar-2023 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### personal AI-driven voice assistant. ####
#### ####
################################################
import os
import platform as pl
class clsConfigClient(object):
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
conf = {
'APP_ID': 1,
'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
'LOG_PATH': Curr_Path + sep + 'log' + sep,
'DATA_PATH': Curr_Path + sep + 'data' + sep,
'MODEL_PATH': Curr_Path + sep + 'model' + sep,
'TEMP_PATH': Curr_Path + sep + 'temp' + sep,
'MODEL_DIR': 'model',
'APP_DESC_1': 'PyCaret Training!',
'DEBUG_IND': 'N',
'INIT_PATH': Curr_Path,
'FILE_NAME': 'Titanic.csv',
'MODEL_NAME': 'PyCaret-ft-personal-2023-03-31-04-29-53',
'TITLE': "PyCaret Training!",
'PATH' : Curr_Path,
'OUT_DIR': 'data'
}

I’m skipping this section as it is self-explanatory.


  • clsTrainModel.py (This is the main class that contains the core logic of low-code machine-learning library to evaluate the best model for your solutions.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 31-Mar-2023 ####
#### Modified On 31-Mar-2023 ####
#### ####
#### Objective: This is the main class that ####
#### contains the core logic of low-code ####
#### machine-learning library to evaluate the ####
#### best model for your solutions. ####
#### ####
#####################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
# Import necessary libraries
import pandas as p
from pycaret.classification import *
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
######################################
### Get your global values ####
######################################
debug_ind = 'Y'
# Initiating Logging Instances
clog = cl.clsL()
###############################################
### End of Global Section ###
###############################################
class clsTrainModel:
def __init__(self):
self.model_path = cf.conf['MODEL_PATH']
self.model_name = cf.conf['MODEL_NAME']
def trainModel(self, FullFileName):
try:
df = p.read_csv(FullFileName)
row_count = int(df.shape[0])
print('Number of rows: ', str(row_count))
print(df)
# Initialize the setup in PyCaret
clf_setup = setup(
data=df,
target="Survived",
train_size=0.8, # 80% for training, 20% for testing
categorical_features=["Sex", "Embarked"],
ordinal_features={"Pclass": ["1", "2", "3"]},
ignore_features=["Name", "Ticket", "Cabin", "PassengerId"],
#silent=True, # Set to False for interactive setup
)
# Compare various models
best_model = compare_models()
# Create a specific model (e.g., Random Forest)
rf_model = create_model("rf")
# Hyperparameter tuning
tuned_rf_model = tune_model(rf_model)
# Evaluate model performance
plot_model(tuned_rf_model, plot="confusion_matrix")
plot_model(tuned_rf_model, plot="auc")
# Finalize the model (train on the complete dataset)
final_rf_model = finalize_model(tuned_rf_model)
# Make predictions on new data
new_data = df.drop("Survived", axis=1)
predictions = predict_model(final_rf_model, data=new_data)
# Writing into the Model
FullModelName = self.model_path + self.model_name
print('Model Output @:: ', str(FullModelName))
print()
# Save the fine-tuned model
save_model(final_rf_model, FullModelName)
return 0
except Exception as e:
x = str(e)
print('Error: ', x)
return 1

Let us understand the code in simple terms –

  1. Import necessary libraries and load the Titanic dataset.
  2. Initialize the PyCaret setup, specifying the target variable, train-test split, categorical and ordinal features, and features to ignore.
  3. Compare various models to find the best-performing one.
  4. Create a specific model (Random Forest in this case).
  5. Perform hyper-parameter tuning on the Random Forest model.
  6. Evaluate the model’s performance using a confusion matrix and AUC-ROC curve.
  7. Finalize the model by training it on the complete dataset.
  8. Make predictions on new data.
  9. Save the trained model for future use.

  • trainPYCARETModel.py (This is the main calling python script that will invoke the training class of PyCaret package.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 31-Mar-2023 ####
#### Modified On 31-Mar-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### training class of Pycaret package. ####
#### ####
#####################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
import clsTrainModel as tm
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
######################################
### Get your global values ####
######################################
debug_ind = 'Y'
# Initiating Logging Instances
clog = cl.clsL()
data_path = cf.conf['DATA_PATH']
data_file_name = cf.conf['FILE_NAME']
tModel = tm.clsTrainModel()
######################################
#### Global Flag ########
######################################
def main():
try:
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*'*120)
print('Start Time: ' + str(var))
print('*'*120)
FullFileName = data_path + data_file_name
r1 = tModel.trainModel(FullFileName)
if r1 == 0:
print('Successfully Trained!')
else:
print('Failed to Train!')
print('*'*120)
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('End Time: ' + str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

The above code is pretty self-explanatory as well.


  • testPYCARETModel.py (This is the main calling python script that will invoke the testing script for PyCaret package.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 31-Mar-2023 ####
#### Modified On 31-Mar-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### testing script for PyCaret package. ####
#### ####
#####################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
from pycaret.classification import load_model, predict_model
import pandas as p
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
######################################
### Get your global values ####
######################################
debug_ind = 'Y'
# Initiating Logging Instances
clog = cl.clsL()
model_path = cf.conf['MODEL_PATH']
model_name = cf.conf['MODEL_NAME']
######################################
#### Global Flag ########
######################################
def main():
try:
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*'*120)
print('Start Time: ' + str(var))
print('*'*120)
FullFileName = model_path + model_name
# Load the saved model
loaded_model = load_model(FullFileName)
# Prepare new data for testing (make sure it has the same columns as the original data)
new_data = p.DataFrame({
"Pclass": [3, 1],
"Sex": ["male", "female"],
"Age": [22, 38],
"SibSp": [1, 1],
"Parch": [0, 0],
"Fare": [7.25, 71.2833],
"Embarked": ["S", "C"]
})
# Make predictions using the loaded model
predictions = predict_model(loaded_model, data=new_data)
# Display the predictions
print(predictions)
print('*'*120)
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('End Time: ' + str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

In this code, the application uses the stored model & then forecasts based on the optimized PyCaret model tuning.

Conclusion:

The above code demonstrates an end-to-end binary classification pipeline using the PyCaret library for the Titanic dataset. The goal is to predict whether a passenger survived based on the available features. Here are some conclusions you can draw from the code and data:

  1. Ease of use: The code showcases how PyCaret simplifies the machine learning process, from data preprocessing to model training, evaluation, and deployment. With just a few lines of code, you can perform tasks that would require much more effort using lower-level libraries.
  2. Model selection: The compare_models() function provides a quick and easy way to compare various machine learning algorithms and identify the best-performing one based on the chosen evaluation metric (accuracy by default). This selection helps you select a suitable model for the given problem.
  3. Hyper-parameter tuning: The tune_model() function automates the process of hyper-parameter tuning to improve model performance. We tuned a Random Forest model to optimize its predictive power in the example.
  4. Model evaluation: PyCaret provides several built-in visualization tools for assessing model performance. In the example, we used a confusion matrix and AUC-ROC curve to evaluate the performance of the tuned Random Forest model.
  5. Model deployment: The example demonstrates how to make predictions using the trained model and save the model for future use. This deployment showcases how PyCaret can streamline the process of deploying a machine-learning model in a production environment.

It is important to note that the conclusions drawn from the code and data are specific to the Titanic dataset and the chosen features. Adjust the feature engineering, preprocessing, and model selection steps for different datasets or problems accordingly. However, the general workflow and benefits provided by PyCaret would remain the same.


So, finally, we’ve done it.

I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Note: All the data & scenarios posted here are representational data & scenarios & available over the internet & for educational purposes only. Some of the images (except my photo) we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement & especially in the prediction quality.

Realtime reading from a Streaming using Computer Vision

This week we’re going to extend one of our earlier posts & trying to read an entire text from streaming using computer vision. If you want to view the previous post, please click the following link.

But, before we proceed, why don’t we view the demo first?

Demo

Architecture:

Let us understand the architecture flow –

Architecture flow

The above diagram shows that the application, which uses the Open-CV, analyzes individual frames from the source & extracts the complete text within the video & displays it on top of the target screen besides prints the same in the console.

Python Packages:

pip install imutils==0.5.4
pip install matplotlib==3.5.2
pip install numpy==1.21.6
pip install opencv-contrib-python==4.6.0.66
pip install opencv-contrib-python-headless==4.6.0.66
pip install opencv-python==4.6.0.66
pip install opencv-python-headless==4.6.0.66
pip install pandas==1.3.5
pip install Pillow==9.1.1
pip install pytesseract==0.3.9
pip install python-dateutil==2.8.2

CODE:

Let us now understand the code. For this use case, we will only discuss three python scripts. However, we need more than these three. However, we have already discussed them in some of the early posts. Hence, we will skip them here.

  • clsReadingTextFromStream.py (This is the main class of python script that will extract the text from the WebCAM streaming in real-time.)


##################################################
#### Written By: SATYAKI DE ####
#### Written On: 22-Jul-2022 ####
#### Modified On 25-Jul-2022 ####
#### ####
#### Objective: This is the main class of ####
#### python script that will invoke the ####
#### extraction of texts from a WebCAM. ####
#### ####
##################################################
# Importing necessary packages
from clsConfig import clsConfig as cf
from imutils.object_detection import non_max_suppression
import numpy as np
import pytesseract
import imutils
import time
import cv2
import time
###############################################
### Global Section ###
###############################################
# Two output layer names for the text detector model
lNames = cf.conf['LAYER_DET']
# Tesseract OCR text param values
strVal = "-l " + str(cf.conf['LANG']) + " –oem " + str(cf.conf['OEM_VAL']) + " –psm " + str(cf.conf['PSM_VAL']) + ""
config = (strVal)
###############################################
### End of Global Section ###
###############################################
class clsReadingTextFromStream:
def __init__(self):
self.sep = str(cf.conf['SEP'])
self.Curr_Path = str(cf.conf['INIT_PATH'])
self.CacheL = int(cf.conf['CACHE_LIM'])
self.modelPath = str(cf.conf['MODEL_PATH']) + str(cf.conf['MODEL_FILE_NAME'])
self.minConf = float(cf.conf['MIN_CONFIDENCE'])
self.wt = int(cf.conf['WIDTH'])
self.ht = int(cf.conf['HEIGHT'])
self.pad = float(cf.conf['PADDING'])
self.title = str(cf.conf['TITLE'])
self.Otitle = str(cf.conf['ORIG_TITLE'])
self.drawTag = cf.conf['DRAW_TAG']
self.aRange = int(cf.conf['ASCII_RANGE'])
self.sParam = cf.conf['SUBTRACT_PARAM']
def findBoundBox(self, boxes, res, rW, rH, orig, origW, origH, pad):
try:
# Loop over the bounding boxes
for (spX, spY, epX, epY) in boxes:
# Scale the bounding box coordinates based on the respective
# ratios
spX = int(spX * rW)
spY = int(spY * rH)
epX = int(epX * rW)
epY = int(epY * rH)
# To obtain a better OCR of the text we can potentially
# apply a bit of padding surrounding the bounding box.
# And, computing the deltas in both the x and y directions
dX = int((epX – spX) * pad)
dY = int((epY – spY) * pad)
# Apply padding to each side of the bounding box, respectively
spX = max(0, spX – dX)
spY = max(0, spY – dY)
epX = min(origW, epX + (dX * 2))
epY = min(origH, epY + (dY * 2))
# Extract the actual padded ROI
roi = orig[spY:epY, spX:epX]
# Choose the proper OCR Config
text = pytesseract.image_to_string(roi, config=config)
# Add the bounding box coordinates and OCR'd text to the list
# of results
res.append(((spX, spY, epX, epY), text))
# Sort the results bounding box coordinates from top to bottom
res = sorted(res, key=lambda r:r[0][1])
return res
except Exception as e:
x = str(e)
print(x)
return res
def predictText(self, imgScore, imgGeo):
try:
minConf = self.minConf
# Initializing the bounding box rectangles & confidence score by
# extracting the rows & columns from the imgScore volume.
(numRows, numCols) = imgScore.shape[2:4]
rects = []
confScore = []
for y in range(0, numRows):
# Extract the imgScore probabilities to derive potential
# bounding box coordinates that surround text
imgScoreData = imgScore[0, 0, y]
xVal0 = imgGeo[0, 0, y]
xVal1 = imgGeo[0, 1, y]
xVal2 = imgGeo[0, 2, y]
xVal3 = imgGeo[0, 3, y]
anglesData = imgGeo[0, 4, y]
for x in range(0, numCols):
# If our score does not have sufficient probability,
# ignore it
if imgScoreData[x] < minConf:
continue
# Compute the offset factor as our resulting feature
# maps will be 4x smaller than the input frame
(offX, offY) = (x * 4.0, y * 4.0)
# Extract the rotation angle for the prediction and
# then compute the sin and cosine
angle = anglesData[x]
cos = np.cos(angle)
sin = np.sin(angle)
# Derive the width and height of the bounding box from
# imgGeo
h = xVal0[x] + xVal2[x]
w = xVal1[x] + xVal3[x]
# Compute both the starting and ending (x, y)-coordinates
# for the text prediction bounding box
epX = int(offX + (cos * xVal1[x]) + (sin * xVal2[x]))
epY = int(offY – (sin * xVal1[x]) + (cos * xVal2[x]))
spX = int(epX – w)
spY = int(epY – h)
# Adding bounding box coordinates and probability score
# to the respective lists
rects.append((spX, spY, epX, epY))
confScore.append(imgScoreData[x])
# return a tuple of the bounding boxes and associated confScore
return (rects, confScore)
except Exception as e:
x = str(e)
print(x)
rects = []
confScore = []
return (rects, confScore)
def processStream(self, debugInd, var):
try:
sep = self.sep
Curr_Path = self.Curr_Path
CacheL = self.CacheL
modelPath = self.modelPath
minConf = self.minConf
wt = self.wt
ht = self.ht
pad = self.pad
title = self.title
Otitle = self.Otitle
drawTag = self.drawTag
aRange = self.aRange
sParam = self.sParam
val = 0
# Initialize the video stream and allow the camera sensor to warm up
print("[INFO] Starting video stream…")
cap = cv2.VideoCapture(0)
# Loading the pre-trained text detector
print("[INFO] Loading Text Detector…")
net = cv2.dnn.readNet(modelPath)
# Loop over the frames from the video stream
while True:
try:
# Grab the frame from our video stream and resize it
success, frame = cap.read()
orig = frame.copy()
(origH, origW) = frame.shape[:2]
# Setting new width and height and then determine the ratio in change
# for both the width and height
(newW, newH) = (wt, ht)
rW = origW / float(newW)
rH = origH / float(newH)
# Resize the frame and grab the new frame dimensions
frame = cv2.resize(frame, (newW, newH))
(H, W) = frame.shape[:2]
# Construct a blob from the frame and then perform a forward pass of
# the model to obtain the two output layer sets
blob = cv2.dnn.blobFromImage(frame, 1.0, (W, H), sParam, swapRB=True, crop=False)
net.setInput(blob)
(confScore, imgGeo) = net.forward(lNames)
# Decode the predictions, then apply non-maxima suppression to
# suppress weak, overlapping bounding boxes
(rects, confidences) = self.predictText(confScore, imgGeo)
boxes = non_max_suppression(np.array(rects), probs=confidences)
# Initialize the list of results
res = []
# Getting BoundingBox boundaries
res = self.findBoundBox(boxes, res, rW, rH, orig, origW, origH, pad)
for ((spX, spY, epX, epY), text) in res:
# Display the text OCR by using Tesseract APIs
print("Reading Text::")
print("=" *60)
print(text)
print("=" *60)
# Removing the non-ASCII text so it can draw the text on the frame
# using OpenCV, then draw the text and a bounding box surrounding
# the text region of the input frame
text = "".join([c if ord(c) < aRange else "" for c in text]).strip()
output = orig.copy()
cv2.rectangle(output, (spX, spY), (epX, epY), drawTag, 2)
cv2.putText(output, text, (spX, spY – 20), cv2.FONT_HERSHEY_SIMPLEX, 1.2, drawTag, 3)
# Show the output frame
cv2.imshow(title, output)
#cv2.imshow(Otitle, frame)
# If the `q` key was pressed, break from the loop
if cv2.waitKey(1) == ord('q'):
break
val = 0
except Exception as e:
x = str(e)
print(x)
val = 1
# Performing cleanup at the end
cap.release()
cv2.destroyAllWindows()
return val
except Exception as e:
x = str(e)
print('Error:', x)
return 1

Please find the key snippet from the above script –

# Two output layer names for the text detector model

lNames = cf.conf['LAYER_DET']

# Tesseract OCR text param values

strVal = "-l " + str(cf.conf['LANG']) + " --oem " + str(cf.conf['OEM_VAL']) + " --psm " + str(cf.conf['PSM_VAL']) + ""
config = (strVal)

The first line contains the two output layers’ names for the text detector model. Among them, the first one indicates the outcome possibilities & the second one use to derive the bounding box coordinates of the predicted text.

The second line contains various options for the tesseract APIs. You need to understand the opportunities in detail to make them work. These are the essential options for our use case –

  • Language – The intended language, for example, English, Spanish, Hindi, Bengali, etc.
  • OEM flag – In this case, the application will use 4 to indicate LSTM neural net model for OCR.
  • OEM Value – In this case, the selected value is 7, indicating that the application treats the ROI as a single line of text.

For more details, please refer to the config file.

print("[INFO] Loading Text Detector...")
net = cv2.dnn.readNet(modelPath)

The above lines bring the already created model & load it to memory for evaluation.

# Setting new width and height and then determine the ratio in change
# for both the width and height
(newW, newH) = (wt, ht)
rW = origW / float(newW)
rH = origH / float(newH)

# Resize the frame and grab the new frame dimensions
frame = cv2.resize(frame, (newW, newH))
(H, W) = frame.shape[:2]

# Construct a blob from the frame and then perform a forward pass of
# the model to obtain the two output layer sets
blob = cv2.dnn.blobFromImage(frame, 1.0, (W, H), sParam, swapRB=True, crop=False)
net.setInput(blob)
(confScore, imgGeo) = net.forward(lNames)

# Decode the predictions, then apply non-maxima suppression to
# suppress weak, overlapping bounding boxes
(rects, confidences) = self.predictText(confScore, imgGeo)
boxes = non_max_suppression(np.array(rects), probs=confidences)

The above lines are more of preparing individual frames to get the bounding box by resizing the height & width followed by a forward pass of the model to obtain two output layer sets. And then apply the non-maxima suppression to remove the weak, overlapping bounding box by interpreting the prediction. In short, this will identify the potential text region & put the bounding box surrounding it.

# Initialize the list of results
res = []

# Getting BoundingBox boundaries
res = self.findBoundBox(boxes, res, rW, rH, orig, origW, origH, pad)

The above function will create the bounding box surrounding the predicted text regions. Also, we will capture the expected text inside the result variable.

for (spX, spY, epX, epY) in boxes:
  # Scale the bounding box coordinates based on the respective
  # ratios
  spX = int(spX * rW)
  spY = int(spY * rH)
  epX = int(epX * rW)
  epY = int(epY * rH)

  # To obtain a better OCR of the text we can potentially
  # apply a bit of padding surrounding the bounding box.
  # And, computing the deltas in both the x and y directions
  dX = int((epX - spX) * pad)
  dY = int((epY - spY) * pad)

  # Apply padding to each side of the bounding box, respectively
  spX = max(0, spX - dX)
  spY = max(0, spY - dY)
  epX = min(origW, epX + (dX * 2))
  epY = min(origH, epY + (dY * 2))

  # Extract the actual padded ROI
  roi = orig[spY:epY, spX:epX]

Now, the application will scale the bounding boxes based on the previously computed ratio for actual text recognition. In this process, the application also padded the bounding boxes & then extracted the padded region of interest.

# Choose the proper OCR Config
text = pytesseract.image_to_string(roi, config=config)

# Add the bounding box coordinates and OCR'd text to the list
# of results
res.append(((spX, spY, epX, epY), text))

Using OCR options, the application extracts the text within the video frame & adds that to the res list.

# Sort the results bounding box coordinates from top to bottom
res = sorted(res, key=lambda r:r[0][1])

It then sends a sorted output to the primary calling functions.

for ((spX, spY, epX, epY), text) in res:
  # Display the text OCR by using Tesseract APIs
  print("Reading Text::")
  print("=" *60)
  print(text)
  print("=" *60)

  # Removing the non-ASCII text so it can draw the text on the frame
  # using OpenCV, then draw the text and a bounding box surrounding
  # the text region of the input frame
  text = "".join([c if ord(c) < aRange else "" for c in text]).strip()
  output = orig.copy()

  cv2.rectangle(output, (spX, spY), (epX, epY), drawTag, 2)
  cv2.putText(output, text, (spX, spY - 20), cv2.FONT_HERSHEY_SIMPLEX, 1.2, drawTag, 3)

  # Show the output frame
  cv2.imshow(title, output)

Finally, it fetches the potential text region along with the text & then prints on top of the source video. Also, it removed some non-printable characters during this time to avoid any cryptic texts.

  • readingVideo.py (Main calling script.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 22-Jul-2022 ####
#### Modified On 25-Jul-2022 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### clsReadingTextFromStream class to initiate ####
#### the reading capability in real-time ####
#### & display text via Web-CAM. ####
#####################################################
# We keep the setup code in a different class as shown below.
import clsReadingTextFromStream as rtfs
from clsConfig import clsConfig as cf
import datetime
import logging
###############################################
### Global Section ###
###############################################
# Instantiating all the main class
x1 = rtfs.clsReadingTextFromStream()
###############################################
### End of Global Section ###
###############################################
def main():
try:
# Other useful variables
debugInd = 'Y'
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
var1 = datetime.datetime.now()
print('Start Time: ', str(var))
# End of useful variables
# Initiating Log Class
general_log_path = str(cf.conf['LOG_PATH'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'readingTextFromVideo.log', level=logging.INFO)
print('Started reading text from videos!')
# Execute all the pass
r1 = x1.processStream(debugInd, var)
if (r1 == 0):
print('Successfully read text from the Live Stream!')
else:
print('Failed to read text from the Live Stream!')
var2 = datetime.datetime.now()
c = var2 – var1
minutes = c.total_seconds() / 60
print('Total difference in minutes: ', str(minutes))
print('End Time: ', str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

view raw

readingVideo.py

hosted with ❤ by GitHub

Please find the key snippet –

# Instantiating all the main class

x1 = rtfs.clsReadingTextFromStream()

# Execute all the pass
r1 = x1.processStream(debugInd, var)

if (r1 == 0):
    print('Successfully read text from the Live Stream!')
else:
    print('Failed to read text from the Live Stream!')

The above lines instantiate the main calling class & then invoke the function to get the desired extracted text from the live streaming video if that is successful.

FOLDER STRUCTURE:

Here is the folder structure that contains all the files & directories in MAC O/S –

You will get the complete codebase in the following Github link.

Unfortunately, I cannot upload the model due to it’s size. I will share on the need basis.

I’ll bring some more exciting topic in the coming days from the Python verse. Please share & subscribe my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Note: All the data & scenario posted here are representational data & scenarios & available over the internet & for educational purpose only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim the ownership of these images. There is an always room for improvement & especially the prediction quality.

Real-time augmented reality (AR) using Python-based Computer Vision

Hi Team,

Today, I’m going to discuss another Computer Vision installment. I’ll discuss how to implement Augmented Reality using Open-CV Computer Vision with full audio. We will be using part of a Bengali OTT Series called “Feludar Goendagiri” entirely for educational purposes & also as a tribute to the great legendary director, late Satyajit Roy. To know more about him, please click the following link.

Why don’t we see the demo first before jumping into the technical details?


Demo

Architecture:

Let us understand the architecture –

Process Flow

The above diagram shows that the application, which uses the Open-CV, analyzes individual frames from the source & blends that with the video trailer. Finally, it creates another video by correctly mixing the source audio.

Python Packages:

Following are the python packages that are necessary to develop this brilliant use case –

pip install opencv-python
pip install pygame

CODE:

Let us now understand the code. For this use case, we will only discuss three python scripts. However, we need more than these three. However, we have already discussed them in some of the early posts. Hence, we will skip them here.

  • clsAugmentedReality.py (This is the main class of python script that will embed the source video with the WebCAM streams in real-time.)


##################################################
#### Written By: SATYAKI DE ####
#### Written On: 20-Jun-2022 ####
#### Modified On 25-Jun-2022 ####
#### ####
#### Objective: This is the main class of ####
#### python script that will embed the source ####
#### video with the WebCAM streams in ####
#### real-time. ####
##################################################
# Importing necessary packages
import numpy as np
import cv2
from clsConfig import clsConfig as cf
# Initialize our cached reference points
CACHED_REF_PTS = None
class clsAugmentedReality:
def __init__(self):
self.TOP_LEFT_X = int(cf.conf['TOP_LEFT_X'])
self.TOP_LEFT_Y = int(cf.conf['TOP_LEFT_Y'])
self.TOP_RIGHT_X = int(cf.conf['TOP_RIGHT_X'])
self.TOP_RIGHT_Y = int(cf.conf['TOP_RIGHT_Y'])
self.BOTTOM_RIGHT_X = int(cf.conf['BOTTOM_RIGHT_X'])
self.BOTTOM_RIGHT_Y = int(cf.conf['BOTTOM_RIGHT_Y'])
self.BOTTOM_LEFT_X = int(cf.conf['BOTTOM_LEFT_X'])
self.BOTTOM_LEFT_Y = int(cf.conf['BOTTOM_LEFT_Y'])
def getWarpImages(self, frame, source, cornerIDs, arucoDict, arucoParams, zoomFlag, useCache=False):
try:
# Assigning values
TOP_LEFT_X = self.TOP_LEFT_X
TOP_LEFT_Y = self.TOP_LEFT_Y
TOP_RIGHT_X = self.TOP_RIGHT_X
TOP_RIGHT_Y = self.TOP_RIGHT_Y
BOTTOM_RIGHT_X = self.BOTTOM_RIGHT_X
BOTTOM_RIGHT_Y = self.BOTTOM_RIGHT_Y
BOTTOM_LEFT_X = self.BOTTOM_LEFT_X
BOTTOM_LEFT_Y = self.BOTTOM_LEFT_Y
# Grab a reference to our cached reference points
global CACHED_REF_PTS
if source is None:
raise
# Grab the width and height of the frame and source image,
# respectively
# Extracting Frame from Camera
# Exracting Source from Video
(imgH, imgW) = frame.shape[:2]
(srcH, srcW) = source.shape[:2]
# Detect Aruco markers in the input frame
(corners, ids, rejected) = cv2.aruco.detectMarkers(frame, arucoDict, parameters=arucoParams)
print('Ids: ', str(ids))
print('Rejected: ', str(rejected))
# if we *did not* find our four ArUco markers, initialize an
# empty IDs list, otherwise flatten the ID list
print('Detecting Corners: ', str(len(corners)))
ids = np.array([]) if len(corners) != 4 else ids.flatten()
# Initialize our list of reference points
refPts = []
refPtTL1 = []
# Loop over the IDs of the ArUco markers in Top-Left, Top-Right,
# Bottom-Right, and Bottom-Left order
for i in cornerIDs:
# Grab the index of the corner with the current ID
j = np.squeeze(np.where(ids == i))
# If we receive an empty list instead of an integer index,
# then we could not find the marker with the current ID
if j.size == 0:
continue
# Otherwise, append the corner (x, y)-coordinates to our list
# of reference points
corner = np.squeeze(corners[j])
refPts.append(corner)
# Check to see if we failed to find the four ArUco markers
if len(refPts) != 4:
# If we are allowed to use cached reference points, fall
# back on them
if useCache and CACHED_REF_PTS is not None:
refPts = CACHED_REF_PTS
# Otherwise, we cannot use the cache and/or there are no
# previous cached reference points, so return early
else:
return None
# If we are allowed to use cached reference points, then update
# the cache with the current set
if useCache:
CACHED_REF_PTS = refPts
# Unpack our Aruco reference points and use the reference points
# to define the Destination transform matrix, making sure the
# points are specified in Top-Left, Top-Right, Bottom-Right, and
# Bottom-Left order
(refPtTL, refPtTR, refPtBR, refPtBL) = refPts
dstMat = [refPtTL[0], refPtTR[1], refPtBR[2], refPtBL[3]]
dstMat = np.array(dstMat)
# For zoom option recalculating all the 4 points
refPtTL1_L_X = refPtTL[0][0]-TOP_LEFT_X
refPtTL1_L_Y = refPtTL[0][1]-TOP_LEFT_Y
refPtTL1.append((refPtTL1_L_X,refPtTL1_L_Y))
refPtTL1_R_X = refPtTL[1][0]+TOP_RIGHT_X
refPtTL1_R_Y = refPtTL[1][1]+TOP_RIGHT_Y
refPtTL1.append((refPtTL1_R_X,refPtTL1_R_Y))
refPtTD1_L_X = refPtTL[2][0]+BOTTOM_RIGHT_X
refPtTD1_L_Y = refPtTL[2][1]+BOTTOM_RIGHT_Y
refPtTL1.append((refPtTD1_L_X,refPtTD1_L_Y))
refPtTD1_R_X = refPtTL[3][0]-BOTTOM_LEFT_X
refPtTD1_R_Y = refPtTL[3][1]+BOTTOM_LEFT_Y
refPtTL1.append((refPtTD1_R_X,refPtTD1_R_Y))
dstMatMod = [refPtTL1[0], refPtTL1[1], refPtTL1[2], refPtTL1[3]]
dstMatMod = np.array(dstMatMod)
# Define the transform matrix for the *source* image in Top-Left,
# Top-Right, Bottom-Right, and Bottom-Left order
srcMat = np.array([[0, 0], [srcW, 0], [srcW, srcH], [0, srcH]])
# Compute the homography matrix and then warp the source image to
# the destination based on the homography depending upon the
# zoom flag
if zoomFlag == 1:
(H, _) = cv2.findHomography(srcMat, dstMat)
else:
(H, _) = cv2.findHomography(srcMat, dstMatMod)
warped = cv2.warpPerspective(source, H, (imgW, imgH))
# Construct a mask for the source image now that the perspective
# warp has taken place (we'll need this mask to copy the source
# image into the destination)
mask = np.zeros((imgH, imgW), dtype="uint8")
if zoomFlag == 1:
cv2.fillConvexPoly(mask, dstMat.astype("int32"), (255, 255, 255), cv2.LINE_AA)
else:
cv2.fillConvexPoly(mask, dstMatMod.astype("int32"), (255, 255, 255), cv2.LINE_AA)
# This optional step will give the source image a black
# border surrounding it when applied to the source image, you
# can apply a dilation operation
rect = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
mask = cv2.dilate(mask, rect, iterations=2)
# Create a three channel version of the mask by stacking it
# depth-wise, such that we can copy the warped source image
# into the input image
maskScaled = mask.copy() / 255.0
maskScaled = np.dstack([maskScaled] * 3)
# Copy the warped source image into the input image by
# (1) Multiplying the warped image and masked together,
# (2) Then multiplying the original input image with the
# mask (giving more weight to the input where there
# are not masked pixels), and
# (3) Adding the resulting multiplications together
warpedMultiplied = cv2.multiply(warped.astype("float"), maskScaled)
imageMultiplied = cv2.multiply(frame.astype(float), 1.0 – maskScaled)
output = cv2.add(warpedMultiplied, imageMultiplied)
output = output.astype("uint8")
# Return the output frame to the calling function
return output
except Exception as e:
# Delibarately raising the issue
# That way the control goes to main calling methods
# exception section
raise

Please find the key snippet from the above script –

(imgH, imgW) = frame.shape[:2]
(srcH, srcW) = source.shape[:2]

# Detect Aruco markers in the input frame
(corners, ids, rejected) = cv2.aruco.detectMarkers(frame, arucoDict, parameters=arucoParams)

Identifying the Aruco markers are key here. The above lines help the program detect all four corners.

However, let us discuss more on the Aruco markers & strategies that I’ve used for several different surfaces.

As you can see, the right-hand side Aruco marker is tiny compared to the left one. Hence, that one will be ideal for a curve surface like Coffee Mug, Bottle rather than a flat surface.

Also, we’ve demonstrated the zoom capability with the smaller Aruco marker that will Augment almost double the original surface area.

Let us understand why we need that; as you know, any spherical surface like a bottle is round-shaped. Hence, detecting relatively more significant Aruco markers in four corners will be difficult for any camera to identify.

Hence, we need a process where close four corners can be extrapolated mathematically to relatively larger projected areas easily detectable by any WebCAM.

Let’s observe the following figure –

Simulated Extrapolated corners

As you can see that the original position of the four corners is represented using the following points, i.e., (x1, y1), (x2, y2), (x3, y3) & (x4, y4).

And these positions are very close to each other. Hence, it will be easier for the camera to detect all the points (like a plain surface) without many retries.

And later, you can add specific values of x & y to them to get the derived four corners as shown in the above figures through the following points, i.e. (x1.1, y1.1), (x2.1, y2.1), (x3.1, y3.1) & (x4.1, y4.1).

# Loop over the IDs of the ArUco markers in Top-Left, Top-Right,
# Bottom-Right, and Bottom-Left order
for i in cornerIDs:
  # Grab the index of the corner with the current ID
  j = np.squeeze(np.where(ids == i))

  # If we receive an empty list instead of an integer index,
  # then we could not find the marker with the current ID
  if j.size == 0:
    continue

  # Otherwise, append the corner (x, y)-coordinates to our list
  # of reference points
  corner = np.squeeze(corners[j])
  refPts.append(corner)

# Check to see if we failed to find the four ArUco markers
if len(refPts) != 4:
  # If we are allowed to use cached reference points, fall
  # back on them
  if useCache and CACHED_REF_PTS is not None:
    refPts = CACHED_REF_PTS

  # Otherwise, we cannot use the cache and/or there are no
  # previous cached reference points, so return early
  else:
    return None

# If we are allowed to use cached reference points, then update
# the cache with the current set
if useCache:
  CACHED_REF_PTS = refPts

# Unpack our Aruco reference points and use the reference points
# to define the Destination transform matrix, making sure the
# points are specified in Top-Left, Top-Right, Bottom-Right, and
# Bottom-Left order
(refPtTL, refPtTR, refPtBR, refPtBL) = refPts
dstMat = [refPtTL[0], refPtTR[1], refPtBR[2], refPtBL[3]]
dstMat = np.array(dstMat)

In the above snippet, the application will scan through all the points & try to detect Aruco markers & then create a list of reference points, which will later be used to define the destination transformation matrix.

# For zoom option recalculating all the 4 points
refPtTL1_L_X = refPtTL[0][0]-TOP_LEFT_X
refPtTL1_L_Y = refPtTL[0][1]-TOP_LEFT_Y

refPtTL1.append((refPtTL1_L_X,refPtTL1_L_Y))

refPtTL1_R_X = refPtTL[1][0]+TOP_RIGHT_X
refPtTL1_R_Y = refPtTL[1][1]+TOP_RIGHT_Y

refPtTL1.append((refPtTL1_R_X,refPtTL1_R_Y))

refPtTD1_L_X = refPtTL[2][0]+BOTTOM_RIGHT_X
refPtTD1_L_Y = refPtTL[2][1]+BOTTOM_RIGHT_Y

refPtTL1.append((refPtTD1_L_X,refPtTD1_L_Y))

refPtTD1_R_X = refPtTL[3][0]-BOTTOM_LEFT_X
refPtTD1_R_Y = refPtTL[3][1]+BOTTOM_LEFT_Y

refPtTL1.append((refPtTD1_R_X,refPtTD1_R_Y))

dstMatMod = [refPtTL1[0], refPtTL1[1], refPtTL1[2], refPtTL1[3]]
dstMatMod = np.array(dstMatMod)

The above snippets calculate the revised points for the zoom-out capabilities as discussed in one of the earlier figures.

# Define the transform matrix for the *source* image in Top-Left,
# Top-Right, Bottom-Right, and Bottom-Left order
srcMat = np.array([[0, 0], [srcW, 0], [srcW, srcH], [0, srcH]])

The above snippet will create a transformation matrix for the video trailer.

# Compute the homography matrix and then warp the source image to
# the destination based on the homography depending upon the
# zoom flag
if zoomFlag == 1:
  (H, _) = cv2.findHomography(srcMat, dstMat)
else:
  (H, _) = cv2.findHomography(srcMat, dstMatMod)

warped = cv2.warpPerspective(source, H, (imgW, imgH))

# Construct a mask for the source image now that the perspective
# warp has taken place (we'll need this mask to copy the source
# image into the destination)
mask = np.zeros((imgH, imgW), dtype="uint8")
if zoomFlag == 1:
  cv2.fillConvexPoly(mask, dstMat.astype("int32"), (255, 255, 255), cv2.LINE_AA)
else:
  cv2.fillConvexPoly(mask, dstMatMod.astype("int32"), (255, 255, 255), cv2.LINE_AA)

# This optional step will give the source image a black
# border surrounding it when applied to the source image, you
# can apply a dilation operation
rect = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
mask = cv2.dilate(mask, rect, iterations=2)

# Create a three channel version of the mask by stacking it
# depth-wise, such that we can copy the warped source image
# into the input image
maskScaled = mask.copy() / 255.0
maskScaled = np.dstack([maskScaled] * 3)

# Copy the warped source image into the input image by
# (1) Multiplying the warped image and masked together,
# (2) Then multiplying the original input image with the
#     mask (giving more weight to the input where there
#     are not masked pixels), and
# (3) Adding the resulting multiplications together
warpedMultiplied = cv2.multiply(warped.astype("float"), maskScaled)
imageMultiplied = cv2.multiply(frame.astype(float), 1.0 - maskScaled)
output = cv2.add(warpedMultiplied, imageMultiplied)
output = output.astype("uint8")

Finally, depending upon the zoom flag, the application will create a warped image surrounded by an optionally black border.

  • clsEmbedVideoWithStream.py (This is the main class of python script that will invoke the clsAugmentedReality class to initiate augment reality after splitting the audio & video & then project them via the Web-CAM with a seamless broadcast.)


##################################################
#### Written By: SATYAKI DE ####
#### Written On: 22-Jun-2022 ####
#### Modified On 25-Jun-2022 ####
#### ####
#### Objective: This is the main class of ####
#### python script that will invoke the ####
#### clsAugmentedReality class to initiate ####
#### augment reality after splitting the ####
#### audio & video & then project them via ####
#### the Web-CAM with a seamless broadcast. ####
##################################################
# Importing necessary packages
import clsAugmentedReality as ar
from clsConfig import clsConfig as cf
from imutils.video import VideoStream
from collections import deque
import imutils
import time
import cv2
import subprocess
import os
import pygame
import time
import threading
import sys
###############################################
### Global Section ###
###############################################
# Instantiating the dependant class
x1 = ar.clsAugmentedReality()
###############################################
### End of Global Section ###
###############################################
class BreakLoop(Exception):
pass
class clsEmbedVideoWithStream:
def __init__(self):
self.sep = str(cf.conf['SEP'])
self.Curr_Path = str(cf.conf['INIT_PATH'])
self.FileName = str(cf.conf['FILE_NAME'])
self.CacheL = int(cf.conf['CACHE_LIM'])
self.FileName_1 = str(cf.conf['FILE_NAME_1'])
self.audioLen = int(cf.conf['audioLen'])
self.audioFreq = float(cf.conf['audioFreq'])
self.videoFrame = float(cf.conf['videoFrame'])
self.stopFlag=cf.conf['stopFlag']
self.zFlag=int(cf.conf['zoomFlag'])
self.title = str(cf.conf['TITLE'])
def playAudio(self, audioFile, audioLen, freq, stopFlag=False):
try:
pygame.mixer.init()
pygame.init()
pygame.mixer.music.load(audioFile)
pygame.mixer.music.set_volume(10)
val = int(audioLen)
i = 0
while i < val:
pygame.mixer.music.play(loops=0, start=float(i))
time.sleep(freq)
i = i + 1
if (i >= val):
raise BreakLoop
if (stopFlag==True):
raise BreakLoop
return 0
except BreakLoop as s:
return 0
except Exception as e:
x = str(e)
print(x)
return 1
def extractAudio(self, video_file, output_ext="mp3"):
try:
"""Converts video to audio directly using `ffmpeg` command
with the help of subprocess module"""
filename, ext = os.path.splitext(video_file)
subprocess.call(["ffmpeg", "-y", "-i", video_file, f"{filename}.{output_ext}"],
stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT)
return 0
except Exception as e:
x = str(e)
print('Error: ', x)
return 1
def processStream(self, debugInd, var):
try:
sep = self.sep
Curr_Path = self.Curr_Path
FileName = self.FileName
CacheL = self.CacheL
FileName_1 = self.FileName_1
audioLen = self.audioLen
audioFreq = self.audioFreq
videoFrame = self.videoFrame
stopFlag = self.stopFlag
zFlag = self.zFlag
title = self.title
print('audioFreq:')
print(str(audioFreq))
print('videoFrame:')
print(str(videoFrame))
# Construct the source for Video & Temporary Audio
videoFile = Curr_Path + sep + 'Video' + sep + FileName
audioFile = Curr_Path + sep + 'Video' + sep + FileName_1
# Load the Aruco dictionary and grab the Aruco parameters
print("[INFO] initializing marker detector…")
arucoDict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_ARUCO_ORIGINAL)
arucoParams = cv2.aruco.DetectorParameters_create()
# Initialize the video file stream
print("[INFO] accessing video stream…")
vf = cv2.VideoCapture(videoFile)
x = self.extractAudio(videoFile)
if x == 0:
print('Successfully Audio extracted from the source file!')
else:
print('Failed to extract the source audio!')
# Initialize a queue to maintain the next frame from the video stream
Q = deque(maxlen=128)
# We need to have a frame in our queue to start our augmented reality
# pipeline, so read the next frame from our video file source and add
# it to our queue
(grabbed, source) = vf.read()
Q.appendleft(source)
# Initialize the video stream and allow the camera sensor to warm up
print("[INFO] starting video stream…")
vs = VideoStream(src=0).start()
time.sleep(2.0)
flg = 0
t = threading.Thread(target=self.playAudio, args=(audioFile, audioLen, audioFreq, stopFlag,))
t.daemon = True
try:
# Loop over the frames from the video stream
while len(Q) > 0:
try:
# Grab the frame from our video stream and resize it
frame = vs.read()
frame = imutils.resize(frame, width=1020)
# Attempt to find the ArUCo markers in the frame, and provided
# they are found, take the current source image and warp it onto
# input frame using our augmented reality technique
warped = x1.getWarpImages(
frame, source,
cornerIDs=(923, 1001, 241, 1007),
arucoDict=arucoDict,
arucoParams=arucoParams,
zoomFlag=zFlag,
useCache=CacheL > 0)
# If the warped frame is not None, then we know (1) we found the
# four ArUCo markers and (2) the perspective warp was successfully
# applied
if warped is not None:
# Set the frame to the output augment reality frame and then
# grab the next video file frame from our queue
frame = warped
source = Q.popleft()
if flg == 0:
t.start()
flg = flg + 1
# For speed/efficiency, we can use a queue to keep the next video
# frame queue ready for us — the trick is to ensure the queue is
# always (or nearly full)
if len(Q) != Q.maxlen:
# Read the next frame from the video file stream
(grabbed, nextFrame) = vf.read()
# If the frame was read (meaning we are not at the end of the
# video file stream), add the frame to our queue
if grabbed:
Q.append(nextFrame)
# Show the output frame
cv2.imshow(title, frame)
time.sleep(videoFrame)
# If the `q` key was pressed, break from the loop
if cv2.waitKey(2) & 0xFF == ord('q'):
stopFlag = True
break
except BreakLoop:
raise BreakLoop
except Exception as e:
pass
if (len(Q) == Q.maxlen):
time.sleep(2)
break
except BreakLoop as s:
print('Processed completed!')
# Performing cleanup at the end
cv2.destroyAllWindows()
vs.stop()
except Exception as e:
x = str(e)
print(x)
# Performing cleanup at the end
cv2.destroyAllWindows()
vs.stop()
return 0
except Exception as e:
x = str(e)
print('Error:', x)
return 1

Please find the key snippet from the above script –

def playAudio(self, audioFile, audioLen, freq, stopFlag=False):
  try:
    pygame.mixer.init()
    pygame.init()
    pygame.mixer.music.load(audioFile)

    pygame.mixer.music.set_volume(10)

    val = int(audioLen)
    i = 0

    while i < val:
      pygame.mixer.music.play(loops=0, start=float(i))
      time.sleep(freq)

      i = i + 1

      if (i >= val):
        raise BreakLoop

      if (stopFlag==True):
        raise BreakLoop

    return 0
  except BreakLoop as s:
    return 0
  except Exception as e:
    x = str(e)
    print(x)

    return 1

The above function will initiate the pygame library to run the sound of the video file that has been extracted as part of a separate process.

def extractAudio(self, video_file, output_ext="mp3"):
    try:
        """Converts video to audio directly using `ffmpeg` command
        with the help of subprocess module"""
        filename, ext = os.path.splitext(video_file)
        subprocess.call(["ffmpeg", "-y", "-i", video_file, f"{filename}.{output_ext}"],
                        stdout=subprocess.DEVNULL,
                        stderr=subprocess.STDOUT)

        return 0
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

The above function temporarily extracts the audio file from the source trailer video.

# Initialize the video file stream
print("[INFO] accessing video stream...")
vf = cv2.VideoCapture(videoFile)

x = self.extractAudio(videoFile)

if x == 0:
    print('Successfully Audio extracted from the source file!')
else:
    print('Failed to extract the source audio!')

# Initialize a queue to maintain the next frame from the video stream
Q = deque(maxlen=128)

# We need to have a frame in our queue to start our augmented reality
# pipeline, so read the next frame from our video file source and add
# it to our queue
(grabbed, source) = vf.read()
Q.appendleft(source)

# Initialize the video stream and allow the camera sensor to warm up
print("[INFO] starting video stream...")
vs = VideoStream(src=0).start()

time.sleep(2.0)
flg = 0

The above snippets read the frames from the video file after invoking the audio extraction. Then, it uses a Queue method to store all the video frames for better performance. And finally, it starts consuming the standard streaming video from the WebCAM to augment the trailer video on top of it.

t = threading.Thread(target=self.playAudio, args=(audioFile, audioLen, audioFreq, stopFlag,))
t.daemon = True

Now, the application has instantiated an orphan thread to spin off the audio play function. The reason is to void the performance & video frame frequency impact on top of it.

while len(Q) > 0:
  try:
    # Grab the frame from our video stream and resize it
    frame = vs.read()
    frame = imutils.resize(frame, width=1020)

    # Attempt to find the ArUCo markers in the frame, and provided
    # they are found, take the current source image and warp it onto
    # input frame using our augmented reality technique
    warped = x1.getWarpImages(
      frame, source,
      cornerIDs=(923, 1001, 241, 1007),
      arucoDict=arucoDict,
      arucoParams=arucoParams,
      zoomFlag=zFlag,
      useCache=CacheL > 0)

    # If the warped frame is not None, then we know (1) we found the
    # four ArUCo markers and (2) the perspective warp was successfully
    # applied
    if warped is not None:
      # Set the frame to the output augment reality frame and then
      # grab the next video file frame from our queue
      frame = warped
      source = Q.popleft()

      if flg == 0:

        t.start()
        flg = flg + 1

    # For speed/efficiency, we can use a queue to keep the next video
    # frame queue ready for us -- the trick is to ensure the queue is
    # always (or nearly full)
    if len(Q) != Q.maxlen:
      # Read the next frame from the video file stream
      (grabbed, nextFrame) = vf.read()

      # If the frame was read (meaning we are not at the end of the
      # video file stream), add the frame to our queue
      if grabbed:
        Q.append(nextFrame)

    # Show the output frame
    cv2.imshow(title, frame)
    time.sleep(videoFrame)

    # If the `q` key was pressed, break from the loop
    if cv2.waitKey(2) & 0xFF == ord('q'):
      stopFlag = True
      break

  except BreakLoop:
    raise BreakLoop
  except Exception as e:
    pass

  if (len(Q) == Q.maxlen):
    time.sleep(2)
    break

The final segment will call the getWarpImages function to get the Augmented image on top of the video. It also checks for the upcoming frames & whether the source video is finished or not. In case of the end, the application will initiate a break method to come out from the infinite WebCAM read. Also, there is a provision for manual exit by pressing the ‘Q’ from the MacBook keyboard.

# Performing cleanup at the end
cv2.destroyAllWindows()
vs.stop()

It is always advisable to close your camera & remove any temporarily available windows that are still left once the application finishes the process.

  • augmentedMovieTrailer.py (Main calling script)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 22-Jun-2022 ####
#### Modified On 25-Jun-2022 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### clsEmbedVideoWithStream class to initiate ####
#### the augmented reality in real-time ####
#### & display a trailer on top of any surface ####
#### via Web-CAM. ####
#####################################################
# We keep the setup code in a different class as shown below.
import clsEmbedVideoWithStream as evws
from clsConfig import clsConfig as cf
import datetime
import logging
###############################################
### Global Section ###
###############################################
# Instantiating all the main class
x1 = evws.clsEmbedVideoWithStream()
###############################################
### End of Global Section ###
###############################################
def main():
try:
# Other useful variables
debugInd = 'Y'
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
var1 = datetime.datetime.now()
print('Start Time: ', str(var))
# End of useful variables
# Initiating Log Class
general_log_path = str(cf.conf['LOG_PATH'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'augmentedMovieTrailer.log', level=logging.INFO)
print('Started augmenting videos!')
# Execute all the pass
r1 = x1.processStream(debugInd, var)
if (r1 == 0):
print('Successfully identified human emotions!')
else:
print('Failed to identify the human emotions!')
var2 = datetime.datetime.now()
c = var2 – var1
minutes = c.total_seconds() / 60
print('Total difference in minutes: ', str(minutes))
print('End Time: ', str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

The above script will initially instantiate the main calling class & then invoke the processStream function to create the Augmented Reality.


FOLDER STRUCTURE:

Here is the folder structure that contains all the files & directories in MAC O/S –

Directory Structure

You will get the complete codebase in the following Github link.

If you want to know more about this legendary director & his famous work, please visit the following link.

I’ll bring some more exciting topic in the coming days from the Python verse. Please share & subscribe my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Note: All the data & scenario posted here are representational data & scenarios & available over the internet & for educational purpose only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim the ownership of these images. There is an always room for improvement & especially the prediction quality.

Real-time Zoom-In/Zoom-Out using Python-based Computer Vision

Hi Guys,

Today, I’ll be using another exciting installment of Computer Vision. The application will read the real-time human hand gesture to control WebCAM’s zoom-in or zoom-out capability.

Why don’t we see the demo first before jumping into the technical details?

Demo

Architecture:

Let us understand the architecture –

Broad Diagram

As one can see, the application reads individual frames from WebCAM & then map the human hand gestures with a media pipe. And finally, calculate the distance between particular pipe points projected on human hands.

Let’s take another depiction of the experiment to better understand the above statement.

Camera & Subject Position

Python Packages:

Following are the python packages that are necessary to develop this brilliant use case –

pip install mediapipe
pip install opencv-python

CODE:

Let us now understand the code. For this use case, we will only discuss three python scripts. However, we need more than these three. However, we have already discussed them in some of the early posts. Hence, we will skip them here.

  1. clsConfig.py (Configuration script for the application.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 24-May-2022 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Machine-Learning & streaming dashboard.####
#### ####
################################################
import os
import platform as pl
class clsConfig(object):
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
conf = {
'APP_ID': 1,
'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
'LOG_PATH': Curr_Path + sep + 'log' + sep,
'REPORT_PATH': Curr_Path + sep + 'report',
'SRC_PATH': Curr_Path + sep + 'data' + sep,
'FINAL_PATH': Curr_Path + sep + 'Target' + sep,
'APP_DESC_1': 'Hand Gesture Zoom Control!',
'DEBUG_IND': 'N',
'INIT_PATH': Curr_Path,
'SUBDIR': 'data',
'SEP': sep,
'TITLE': "Human Hand Gesture Controlling App",
'minVal':0.01,
'maxVal':1
}

view raw

clsConfig.py

hosted with ❤ by GitHub

2. clsVideoZoom.py (This script will zoom the video streaming depending upon the hand gestures.)


##################################################
#### Written By: SATYAKI DE ####
#### Written On: 23-May-2022 ####
#### Modified On 24-May-2022 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### clsVideoZoom class to initiate ####
#### the model to read the real-time ####
#### human hand gesture from video ####
#### Web-CAM & control zoom-in & zoom-out. ####
##################################################
import mediapipe as mp
import cv2
import time
import clsHandMotionScanner as hms
import math
import imutils
import numpy as np
from clsConfig import clsConfig as cf
class clsVideoZoom():
def __init__(self):
self.title = str(cf.conf['TITLE'])
self.minVal = float(cf.conf['minVal'])
self.maxVal = int(cf.conf['maxVal'])
def zoomVideo(self, image, Iscale=1):
try:
scale=Iscale
#get the webcam size
height, width, channels = image.shape
#prepare the crop
centerX,centerY=int(height/2),int(width/2)
radiusX,radiusY= int(scale*centerX),int(scale*centerY)
minX,maxX=centerX-radiusX,centerX+radiusX
minY,maxY=centerY-radiusY,centerY+radiusY
cropped = image[minX:maxX, minY:maxY]
resized_cropped = cv2.resize(cropped, (width, height))
return resized_cropped
except Exception as e:
x = str(e)
return image
def runSensor(self):
try:
pTime = 0
cTime = 0
zRange = 0
zRangeBar = 0
cap = cv2.VideoCapture(0)
detector = hms.clsHandMotionScanner(detectionCon=0.7)
while True:
success,img = cap.read()
img = imutils.resize(img, width=720)
#img = detector.findHands(img, draw=False)
#lmList = detector.findPosition(img, draw=False)
img = detector.findHands(img)
lmList = detector.findPosition(img, draw=False)
if len(lmList) != 0:
print('*'*60)
#print(lmList[4], lmList[8])
#print('*'*60)
x1, y1 = lmList[4][1], lmList[4][2]
x2, y2 = lmList[8][1], lmList[8][2]
cx, cy = (x1+x2)//2, (y1+y2)//2
cv2.circle(img, (x1,y1), 15, (255,0,255), cv2.FILLED)
cv2.circle(img, (x2,y2), 15, (255,0,255), cv2.FILLED)
cv2.line(img, (x1,y1), (x2,y2), (255,0,255), 3)
cv2.circle(img, (cx,cy), 15, (255,0,255), cv2.FILLED)
lenVal = math.hypot(x2-x1, y2-y1)
print('Length:', str(lenVal))
print('*'*60)
# Hand Range is from 50 to 270
# Camera Zoom Range is 0.01, 1
minVal = self.minVal
maxVal = self.maxVal
zRange = np.interp(lenVal, [50, 270], [minVal, maxVal])
zRangeBar = np.interp(lenVal, [50, 270], [400, 150])
print('Range: ', str(zRange))
if lenVal < 50:
cv2.circle(img, (cx,cy), 15, (0,255,0), cv2.FILLED)
cv2.rectangle(img, (50, 150), (85, 400), (255,0,0), 3)
cv2.rectangle(img, (50, int(zRangeBar)), (85, 400), (255,0,0), cv2.FILLED)
cTime = time.time()
fps = 1/(cTime-pTime)
pTime = cTime
image = cv2.flip(img, flipCode=1)
cv2.putText(image, str(int(fps)), (10, 70), cv2.FONT_HERSHEY_PLAIN, 3, (255, 0, 255), 3)
cv2.imshow("Original Source",image)
# Creating the new zoom video
cropImg = self.zoomVideo(img, zRange)
cv2.putText(cropImg, str(int(fps)), (10, 70), cv2.FONT_HERSHEY_PLAIN, 3, (255, 0, 255), 3)
cv2.imshow("Zoomed Source",cropImg)
if cv2.waitKey(1) == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
return 0
except Exception as e:
x = str(e)
print('Error:', x)
return 1

view raw

clsVideoZoom.py

hosted with ❤ by GitHub

Key snippets from the above scripts –

def zoomVideo(self, image, Iscale=1):
    try:
        scale=Iscale

        #get the webcam size
        height, width, channels = image.shape

        #prepare the crop
        centerX,centerY=int(height/2),int(width/2)
        radiusX,radiusY= int(scale*centerX),int(scale*centerY)

        minX,maxX=centerX-radiusX,centerX+radiusX
        minY,maxY=centerY-radiusY,centerY+radiusY

        cropped = image[minX:maxX, minY:maxY]
        resized_cropped = cv2.resize(cropped, (width, height))

        return resized_cropped

    except Exception as e:
        x = str(e)

        return image

The above method will zoom in & zoom out depending upon the scale value that the human hand gesture will receive.

cap = cv2.VideoCapture(0)
detector = hms.clsHandMotionScanner(detectionCon=0.7)

The following lines will read the individual frames from webCAM. Instantiate another open-source customized class, which will find the hand’s position.

img = detector.findHands(img)
lmList = detector.findPosition(img, draw=False)

And captured the hand position depending upon the movements.

x1, y1 = lmList[4][1], lmList[4][2]
x2, y2 = lmList[8][1], lmList[8][2]

cx, cy = (x1+x2)//2, (y1+y2)//2

cv2.circle(img, (x1,y1), 15, (255,0,255), cv2.FILLED)
cv2.circle(img, (x2,y2), 15, (255,0,255), cv2.FILLED)

To understand the above lines, let’s look into the following diagram –

Source: Mediapipe

As one can see, the thumbs tip value is 4 & Index fingertip is 8. The application will mark these points with a solid circle.

lenVal = math.hypot(x2-x1, y2-y1)

The above line will calculate the distance between the thumbs tip & index fingertip.

# Camera Zoom Range is 0.01, 1

minVal = self.minVal
maxVal = self.maxVal

zRange = np.interp(lenVal, [50, 270], [minVal, maxVal])
zRangeBar = np.interp(lenVal, [50, 270], [400, 150])

In the above lines, the application will translate the values captured between the two fingertips & then translate them into a more meaningful camera zoom range from 0.01 to 1.

if lenVal < 50:
    cv2.circle(img, (cx,cy), 15, (0,255,0), cv2.FILLED)

The application will not consider a value below 50 as 0.01 for the WebCAM start value.

cTime = time.time()
fps = 1/(cTime-pTime)
pTime = cTime


image = cv2.flip(img, flipCode=1)
cv2.putText(image, str(int(fps)), (10, 70), cv2.FONT_HERSHEY_PLAIN, 3, (255, 0, 255), 3)
cv2.imshow("Original Source",image)

# Creating the new zoom video
cropImg = self.zoomVideo(img, zRange)
cv2.putText(cropImg, str(int(fps)), (10, 70), cv2.FONT_HERSHEY_PLAIN, 3, (255, 0, 255), 3)
cv2.imshow("Zoomed Source",cropImg)

The application will capture the frame rate & share the original video frame and the test frame, where it will zoom in or out depending on the hand gesture.

3. clsHandMotionScanner.py (This is an enhance version of open source script, which will capture the hand position.)


##################################################
#### Written By: SATYAKI DE ####
#### Modified On 23-May-2022 ####
#### ####
#### Objective: This is the main calling ####
#### python class that will capture the ####
#### human hand gesture on real-time basis ####
#### and that will enable the video zoom ####
#### capability of the feed directly coming ####
#### out of a Web-CAM. ####
##################################################
import mediapipe as mp
import cv2
import time
class clsHandMotionScanner():
def __init__(self, mode=False, maxHands=2, detectionCon=0.5, modelComplexity=1, trackCon=0.5):
self.mode = mode
self.maxHands = maxHands
self.detectionCon = detectionCon
self.modelComplex = modelComplexity
self.trackCon = trackCon
self.mpHands = mp.solutions.hands
self.hands = self.mpHands.Hands(self.mode, self.maxHands,self.modelComplex,self.detectionCon, self.trackCon)
# it gives small dots onhands total 20 landmark points
self.mpDraw = mp.solutions.drawing_utils
def findHands(self, img, draw=True):
try:
# Send rgb image to hands
imgRGB = cv2.cvtColor(img,cv2.COLOR_BGR2RGB)
self.results = self.hands.process(imgRGB)
# process the frame
if self.results.multi_hand_landmarks:
for handLms in self.results.multi_hand_landmarks:
if draw:
#Draw dots and connect them
self.mpDraw.draw_landmarks(img,handLms,self.mpHands.HAND_CONNECTIONS)
return img
except Exception as e:
x = str(e)
print('Error: ', x)
return img
def findPosition(self, img, handNo=0, draw=True):
try:
lmlist = []
# check wether any landmark was detected
if self.results.multi_hand_landmarks:
#Which hand are we talking about
myHand = self.results.multi_hand_landmarks[handNo]
# Get id number and landmark information
for id, lm in enumerate(myHand.landmark):
# id will give id of landmark in exact index number
# height width and channel
h,w,c = img.shape
#find the position
cx,cy = int(lm.x*w), int(lm.y*h) #center
#print(id,cx,cy)
lmlist.append([id,cx,cy])
# Draw circle for 0th landmark
if draw:
cv2.circle(img,(cx,cy), 15 , (255,0,255), cv2.FILLED)
return lmlist
except Exception as e:
x = str(e)
print('Error: ', x)
lmlist = []
return lmlist

Key snippets from the above script –

def findHands(self, img, draw=True):
    try:
        # Send rgb image to hands
        imgRGB = cv2.cvtColor(img,cv2.COLOR_BGR2RGB)
        self.results = self.hands.process(imgRGB)

        # process the frame
        if self.results.multi_hand_landmarks:
            for handLms in self.results.multi_hand_landmarks:

                if draw:
                    #Draw dots and connect them
                    self.mpDraw.draw_landmarks(img,handLms,self.mpHands.HAND_CONNECTIONS)

        return img
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return img

The above function will identify individual key points & marked them as dots on top of human hands.

def findPosition(self, img, handNo=0, draw=True):
      try:
          lmlist = []

          # check wether any landmark was detected
          if self.results.multi_hand_landmarks:
              #Which hand are we talking about
              myHand = self.results.multi_hand_landmarks[handNo]
              # Get id number and landmark information
              for id, lm in enumerate(myHand.landmark):
                  # id will give id of landmark in exact index number
                  # height width and channel
                  h,w,c = img.shape
                  #find the position - center
                  cx,cy = int(lm.x*w), int(lm.y*h) 
                  lmlist.append([id,cx,cy])

              # Draw circle for 0th landmark
              if draw:
                  cv2.circle(img,(cx,cy), 15 , (255,0,255), cv2.FILLED)

          return lmlist
      except Exception as e:
          x = str(e)
          print('Error: ', x)

          lmlist = []
          return lmlist

The above line will capture the position of each media pipe point along with the x & y coordinate & store them in a list, which will be later parsed for main use case.

4. viewHandMotion.py (Main calling script.)


##################################################
#### Written By: SATYAKI DE ####
#### Written On: 23-May-2022 ####
#### Modified On 23-May-2022 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### clsVideoZoom class to initiate ####
#### the model to read the real-time ####
#### hand movements gesture that enables ####
#### video zoom control. ####
##################################################
import time
import clsVideoZoom as vz
from clsConfig import clsConfig as cf
import datetime
import logging
###############################################
### Global Section ###
###############################################
# Instantiating the base class
x1 = vz.clsVideoZoom()
###############################################
### End of Global Section ###
###############################################
def main():
try:
# Other useful variables
debugInd = 'Y'
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
var1 = datetime.datetime.now()
print('Start Time: ', str(var))
# End of useful variables
# Initiating Log Class
general_log_path = str(cf.conf['LOG_PATH'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'visualZoom.log', level=logging.INFO)
print('Started Visual-Zoom Emotions!')
r1 = x1.runSensor()
if (r1 == 0):
print('Successfully identified visual zoom!')
else:
print('Failed to identify the visual zoom!')
var2 = datetime.datetime.now()
c = var2 – var1
minutes = c.total_seconds() / 60
print('Total difference in minutes: ', str(minutes))
print('End Time: ', str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

The above lines are self-explanatory. So, I’m not going to discuss anything on this script.


FOLDER STRUCTURE:

Here is the folder structure that contains all the files & directories in MAC O/S –

Directory

So, we’ve done it.


You will get the complete codebase in the following Github link.

I’ll bring some more exciting topic in the coming days from the Python verse. Please share & subscribe my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Note: All the data & scenario posted here are representational data & scenarios & available over the internet & for educational purpose only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim the ownership of these images. There is an always room for improvement & especially the prediction quality.