Personal Virtual Assistant (SJ) implemented using python-based OpenAI, Rev_AI & PyTtSX3.

Today, I will discuss our Virtual personal assistant (SJ) with a combination of AI-driven APIs, which is now operational in Python. We will use the three most potent APIs using OpenAI, Rev-AI & Pyttsx3. Why don’t we see the demo first?

Great! Let us understand we can leverage this by writing a tiny snippet using this new AI model.

Architecture:

Let us understand the flow of events –

The application first invokes the API to capture the audio spoken through the audio device & then translate that into text, which is later parsed & shared as input by the openai for the response of the posted queries. Once, OpenAI shares the response, the python-based engine will take the response & using pyttsx3 to convert them to voice.


Python Packages:

Following are the python packages that are necessary to develop this brilliant use case –

pip install openai==0.25.0
pip install PyAudio==0.2.13
pip install playsound==1.3.0
pip install pandas==1.5.2
pip install rev-ai==2.17.1
pip install six==1.16.0
pip install websocket-client==0.59.0

CODE:

Let us now understand the code. For this use case, we will only discuss three python scripts. However, we need more than these three. However, we have already discussed them in some of the early posts. Hence, we will skip them here.

  • clsConfigClient.py (Main configuration file)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 31-Dec-2022 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### personal AI-driven voice assistant. ####
#### ####
################################################
import os
import platform as pl
class clsConfigClient(object):
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
conf = {
'APP_ID': 1,
'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
'LOG_PATH': Curr_Path + sep + 'log' + sep,
'REPORT_PATH': Curr_Path + sep + 'output' + sep,
'REPORT_DIR': 'output',
'SRC_PATH': Curr_Path + sep + 'data' + sep,
'CODE_PATH': Curr_Path + sep + 'Code' + sep,
'APP_DESC_1': 'Personal Voice Assistant (SJ)!',
'DEBUG_IND': 'N',
'INIT_PATH': Curr_Path,
'TITLE': "Personal Voice Assistant (SJ)!",
'PATH' : Curr_Path,
'OPENAI_API_KEY': "sk-aapwfMWDuFE5XXXUr2BH",
'REVAI_API_KEY': "02ks6kFhEKjdhdure8474JJAJJ945958_h8P_DEKDNkK6DwNNNHU17aRtCw",
'MODEL_NAME': "code-davinci-002",
"speedSpeech": 170,
"speedPitch": 0.8,
"soundRate": 44100,
"contentType": "audio/x-raw",
"layout": "interleaved",
"format": "S16LE",
"channels": 1
}

A few of the essential entries from the above snippet, which one should be looked for, are –

'OPENAI_API_KEY': "sk-aapwfMWDuFE5XXXUr2BH",
'REVAI_API_KEY': "02ks6kFhEKjdhdure8474JJAJJ945958_h8P_DEKDNkK6DwNNNHU17aRtCw",
'MODEL_NAME': "code-davinci-002",
"speedSpeech": 170,
"speedPitch": 0.8,
"soundRate": 44100,
"contentType": "audio/x-raw",
"layout": "interleaved",
"format": "S16LE",
"channels": 1

Note that, all the API-key are not real. You need to generate your own key.

  • clsText2Voice.py (The python script that will convert text to voice)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 27-Oct-2019 ####
#### Modified On 28-Jan-2023 ####
#### ####
#### Objective: Main class converting ####
#### text to voice using third-party API. ####
###############################################
import pyttsx3
from clsConfigClient import clsConfigClient as cf
class clsText2Voice:
def __init__(self):
self.speedSpeech = cf.conf['speedSpeech']
self.speedPitch = cf.conf['speedPitch']
def getAudio(self, srcString):
try:
speedSpeech = self.speedSpeech
speedPitch = self.speedPitch
engine = pyttsx3.init()
# Set the speed of the speech (in words per minute)
engine.setProperty('rate', speedSpeech)
# Set the pitch of the speech (1.0 is default)
engine.setProperty('pitch', speedPitch)
# Converting to MP3
engine.say(srcString)
engine.runAndWait()
return 0
except Exception as e:
x = str(e)
print('Error: ', x)
return 1

Some of the important snippet will be as follows –

def getAudio(self, srcString):
    try:
        speedSpeech = self.speedSpeech
        speedPitch = self.speedPitch
        
        engine = pyttsx3.init()

        # Set the speed of the speech (in words per minute)
        engine.setProperty('rate', speedSpeech)

        # Set the pitch of the speech (1.0 is default)
        engine.setProperty('pitch', speedPitch)

        # Converting to MP3
        engine.say(srcString)
        engine.runAndWait()

        return 0

The code is a function that generates speech audio from a given string using the Pyttsx3 library in Python. The function sets the speech rate and pitch using the “speedSpeech” and “speedPitch” properties of the calling object, initializes the Pyttsx3 engine, sets the speech rate and pitch on the engine, speaks the given string, and waits for the speech to finish. The function returns 0 after the speech is finished.


  • clsChatEngine.py (This python script will invoke the ChatGPT OpenAI class to initiate the response of the queries in python.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Dec-2022 ####
#### Modified On 28-Jan-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### ChatGPT OpenAI class to initiate the ####
#### response of the queries in python. ####
#####################################################
import os
import openai
import json
from clsConfigClient import clsConfigClient as cf
import sys
import errno
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
###############################################
### Global Section ###
###############################################
CODE_PATH=str(cf.conf['CODE_PATH'])
MODEL_NAME=str(cf.conf['MODEL_NAME'])
###############################################
### End of Global Section ###
###############################################
class clsChatEngine:
def __init__(self):
self.OPENAI_API_KEY=str(cf.conf['OPENAI_API_KEY'])
def findFromSJ(self, text):
try:
OPENAI_API_KEY = self.OPENAI_API_KEY
# ChatGPT API_KEY
openai.api_key = OPENAI_API_KEY
print('22'*60)
try:
# Getting response from ChatGPT
response = openai.Completion.create(
engine=MODEL_NAME,
prompt=text,
max_tokens=64,
top_p=1.0,
n=3,
temperature=0,
frequency_penalty=0.0,
presence_penalty=0.0,
stop=["\"\"\""]
)
except IOError as e:
if e.errno == errno.EPIPE:
pass
print('44'*60)
res = response.choices[0].text
return res
except IOError as e:
if e.errno == errno.EPIPE:
pass
except Exception as e:
x = str(e)
print(x)
print('66'*60)
return x

Key snippets from the above-script are as follows –

def findFromSJ(self, text):
      try:
          OPENAI_API_KEY = self.OPENAI_API_KEY

          # ChatGPT API_KEY
          openai.api_key = OPENAI_API_KEY

          print('22'*60)

          try:
              # Getting response from ChatGPT
              response = openai.Completion.create(
              engine=MODEL_NAME,
              prompt=text,
              max_tokens=64,
              top_p=1.0,
              n=3,
              temperature=0,
              frequency_penalty=0.0,
              presence_penalty=0.0,
              stop=["\"\"\""]
              )
          except IOError as e:
              if e.errno == errno.EPIPE:
                  pass

          print('44'*60)
          res = response.choices[0].text

          return res

      except IOError as e:
          if e.errno == errno.EPIPE:
              pass

      except Exception as e:
          x = str(e)
          print(x)

          print('66'*60)

          return x

The code is a function that uses OpenAI’s ChatGPT model to generate text based on a given prompt text. The function takes the text to be completed as input and uses an API key stored in the OPENAI_API_KEY property of the calling object to request OpenAI’s API. If the request is successful, the function returns the top completion generated by the model, as stored in the text field of the first item in the choices list of the API response.

The function includes error handling for IOError and Exception. If an IOError occurs, the function checks if the error number is errno.EPIPE and, if it is, returns without doing anything. If an Exception occurs, the function converts the error message to a string and prints it, then returns the string.


  • clsVoice2Text.py (This python script will invoke the Rev-AI class to initiate the transformation of audio into the text.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Dec-2022 ####
#### Modified On 28-Jan-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### Rev-AI class to initiate the transformation ####
#### of audio into the text. ####
#####################################################
import pyaudio
from rev_ai.models import MediaConfig
from rev_ai.streamingclient import RevAiStreamingClient
from six.moves import queue
import ssl
import json
import pandas as p
import clsMicrophoneStream as ms
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
# Initiating Log class
l = cl.clsL()
# Bypassing SSL Authentication
try:
_create_unverified_https_context = ssl._create_unverified_context
except AttributeError:
# Legacy python that doesn't verify HTTPS certificates by default
pass
else:
# Handle target environment that doesn't support HTTPS verification
ssl._create_default_https_context = _create_unverified_https_context
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
######################################
### Insert your access token here ####
######################################
debug_ind = 'Y'
################################################################
### Sampling rate of your microphone and desired chunk size ####
################################################################
class clsVoice2Text:
def __init__(self):
self.OPENAI_API_KEY=str(cf.conf['OPENAI_API_KEY'])
self.rate = cf.conf['soundRate']
def processVoice(self, var):
try:
OPENAI_API_KEY = self.OPENAI_API_KEY
accessToken = cf.conf['REVAI_API_KEY']
rate = self.rate
chunk = int(rate/10)
################################################################
### Creates a media config with the settings set for a raw ####
### microphone input ####
################################################################
sampleMC = MediaConfig('audio/x-raw', 'interleaved', 44100, 'S16LE', 1)
streamclient = RevAiStreamingClient(accessToken, sampleMC)
#####################################################################
### Opens microphone input. The input will stop after a keyboard ####
### interrupt. ####
#####################################################################
with ms.clsMicrophoneStream(rate, chunk) as stream:
#####################################################################
### Uses try method to enable users to manually close the stream ####
#####################################################################
try:
response_gen = ''
response = ''
finalText = ''
#########################################################################
### Starts the server connection and thread sending microphone audio ####
#########################################################################
response_gen = streamclient.start(stream.generator())
###################################################
### Iterates through responses and prints them ####
###################################################
for response in response_gen:
try:
print('JSON:')
print(response)
r = json.loads(response)
df = p.json_normalize(r["elements"])
l.logr('1.df_' + var + '.csv', debug_ind, df, 'log')
column_name = "confidence"
if column_name in df.columns:
print('DF:: ')
print(df)
finalText = "".join(df["value"])
print("TEXT:")
print(finalText)
df = p.DataFrame()
raise Exception
except Exception as e:
x = str(e)
break
streamclient.end()
return finalText
except Exception as e:
x = str(e)
#######################################
### Ends the WebSocket connection. ####
#######################################
streamclient.end()
return ''
except Exception as e:
x = str(e)
print('Error: ', x)
streamclient.end()
return x

Here is the important snippet from the above code –

def processVoice(self, var):
      try:
          OPENAI_API_KEY = self.OPENAI_API_KEY
          accessToken = cf.conf['REVAI_API_KEY']
          rate = self.rate
          chunk = int(rate/10)

          ################################################################
          ### Creates a media config with the settings set for a raw  ####
          ### microphone input                                        ####
          ################################################################

          sampleMC = MediaConfig('audio/x-raw', 'interleaved', 44100, 'S16LE', 1)

          streamclient = RevAiStreamingClient(accessToken, sampleMC)

          #####################################################################
          ### Opens microphone input. The input will stop after a keyboard ####
          ### interrupt.                                                   ####
          #####################################################################

          with ms.clsMicrophoneStream(rate, chunk) as stream:

              #####################################################################
              ### Uses try method to enable users to manually close the stream ####
              #####################################################################

              try:
                  response_gen = ''
                  response = ''
                  finalText = ''
                  
                  ############################################
                  ### Starts the server connection        ####
                  ### and thread sending microphone audio #### 
                  ############################################

                  response_gen = streamclient.start(stream.generator())

                  ###################################################
                  ### Iterates through responses and prints them ####
                  ###################################################

                  for response in response_gen:
                      try:
                          print('JSON:')
                          print(response)

                          r = json.loads(response)

                          df = p.json_normalize(r["elements"])
                          l.logr('1.df_' + var + '.csv', debug_ind, df, 'log')
                          column_name = "confidence"

                          if column_name in df.columns:
                              print('DF:: ')
                              print(df)

                              finalText = "".join(df["value"])
                              print("TEXT:")
                              print(finalText)

                              df = p.DataFrame()

                              raise Exception

                      except Exception as e:
                          x = str(e)
                          break

                  streamclient.end()

                  return finalText

              except Exception as e:
                  x = str(e)
                  #######################################
                  ### Ends the WebSocket connection. ####
                  #######################################

                  streamclient.end()

                  return ''

      except Exception as e:
          x = str(e)
          print('Error: ', x)

          streamclient.end()

          return x

The code is a python function called processVoice() that processes a user’s voice input using the Rev.AI API. The function takes in one argument, “var,” which is not used in the code.

  1. Let us understand the code –
    • First, the function sets several variables, including the Rev.AI API access token, the sample rate, and the chunk size for the audio input.
    • Then, it creates a media configuration object for raw microphone input.
    • A RevAiStreamingClient object is created using the access token and the media configuration.
    • The code opens the microphone input using a statement and the microphone stream class.
    • Within the statement, the code starts the server connection and a thread that sends microphone audio to the server.
    • The code then iterates through the responses from the server, normalizing the JSON response and storing the values in a pandas data-frame.
    • If the “confidence” column exists in the data-frame, the code joins all the values to form the final text and raises an exception.
      • If there is an exception, the WebSocket connection is ended, and the final text is returned.
      • If there is any error, the WebSocket connection is also ended, and an empty string or the error message is returned.

  • clsMicrophoneStream.py (This python script invoke the rev_ai template to capture the chunk voice data & stream it to the service for text translation & return the response to app.)


#####################################################
#### Modified By: SATYAKI DE ####
#### Modified On 28-Jan-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### rev_ai template to capture the chunk voice ####
#### data & stream it to the service for text ####
#### translation & return the response to app. ####
#####################################################
import pyaudio
from rev_ai.models import MediaConfig
from six.moves import queue
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
class clsMicrophoneStream(object):
#############################################
### Opens a recording stream as a ####
### generator yielding the audio chunks. ####
#############################################
def __init__(self, rate, chunk):
self._rate = rate
self._chunk = chunk
##################################################
### Create a thread-safe buffer of audio data ####
##################################################
self._buff = queue.Queue()
self.closed = True
def __enter__(self):
self._audio_interface = pyaudio.PyAudio()
self._audio_stream = self._audio_interface.open(
format=pyaudio.paInt16,
#########################################################
### The API currently only supports 1-channel (mono) ####
### audio. ####
#########################################################
channels=1, rate=self._rate,
input=True, frames_per_buffer=self._chunk,
####################################################################
### Run the audio stream asynchronously to fill the buffer ####
### object. Run the audio stream asynchronously to fill the ####
### buffer object. This is necessary so that the input device's ####
### buffer doesn't overflow while the calling thread makes ####
### network requests, etc. ####
####################################################################
stream_callback=self._fill_buffer,
)
self.closed = False
return self
def __exit__(self, type, value, traceback):
self._audio_stream.stop_stream()
self._audio_stream.close()
self.closed = True
###############################################################
### Signal the generator to terminate so that the client's ####
### streaming_recognize method will not block the process ####
### termination. ####
###############################################################
self._buff.put(None)
self._audio_interface.terminate()
def _fill_buffer(self, in_data, frame_count, time_info, status_flags):
##############################################################
### Continuously collect data from the audio stream, into ####
### the buffer. ####
##############################################################
self._buff.put(in_data)
return None, pyaudio.paContinue
def generator(self):
while not self.closed:
######################################################################
### Use a blocking get() to ensure there's at least one chunk of ####
### data, and stop iteration if the chunk is None, indicating the ####
### end of the audio stream. ####
######################################################################
chunk = self._buff.get()
if chunk is None:
return
data = [chunk]
##########################################################
### Now consume whatever other data's still buffered. ####
##########################################################
while True:
try:
chunk = self._buff.get(block=False)
if chunk is None:
return
data.append(chunk)
except queue.Empty:
break
yield b''.join(data)

The key snippet from the above script are as follows –

def __enter__(self):
    self._audio_interface = pyaudio.PyAudio()
    self._audio_stream = self._audio_interface.open(
        format=pyaudio.paInt16,

        #########################################################
        ### The API currently only supports 1-channel (mono) ####
        ### audio.                                           ####
        #########################################################

        channels=1, rate=self._rate,
        input=True, frames_per_buffer=self._chunk,

        ####################################################################
        ### Run the audio stream asynchronously to fill the buffer      ####
        ### object. Run the audio stream asynchronously to fill the     ####
        ### buffer object. This is necessary so that the input device's ####
        ### buffer doesn't overflow while the calling thread makes      ####
        ### network requests, etc.                                      ####
        ####################################################################

        stream_callback=self._fill_buffer,
    )

    self.closed = False

    return self

This code is a part of a context manager class (clsMicrophoneStream) and implements the __enter__ method of the class. The method sets up a PyAudio object and opens an audio stream using the PyAudio object. The audio stream is configured to have the following properties:

  • Format: 16-bit integer (paInt16)
  • Channels: 1 (mono)
  • Rate: The rate specified in the instance of the ms.clsMicrophoneStream class.
  • Input: True, meaning the audio stream is an input stream, not an output stream.
  • Frames per buffer: The chunk specified in the instance of the ms.clsMicrophoneStream class.
  • Stream callback: The method self._fill_buffer will be called when the buffer needs more data.

The self.closed attribute is set to False to indicate that the stream is open. The method returns the instance of the class (self).

def __exit__(self, type, value, traceback):
    self._audio_stream.stop_stream()
    self._audio_stream.close()
    self.closed = True

    ###############################################################
    ### Signal the generator to terminate so that the client's ####
    ### streaming_recognize method will not block the process  ####
    ### termination.                                           ####
    ###############################################################

    self._buff.put(None)
    self._audio_interface.terminate()

The exit method implements the “exit” behavior of a Python context manager. It is automatically called when the context manager is exited using the statement.

The method stops and closes the audio stream, sets the closed attribute to True, and places None in the buffer. The terminate method of the PyAudio interface is then called to release any resources used by the audio stream.

def _fill_buffer(self, in_data, frame_count, time_info, status_flags):

    ##############################################################
    ### Continuously collect data from the audio stream, into ####
    ### the buffer.                                           ####
    ##############################################################

    self._buff.put(in_data)
    return None, pyaudio.paContinue

The _fill_buffer method is a callback function that runs asynchronously to continuously collect data from the audio stream and add it to the buffer.

The _fill_buffer method takes four arguments:

  • in_data: the raw audio data collected from the audio stream.
  • frame_count: the number of frames of audio data that was collected.
  • time_info: information about the timing of the audio data.
  • status_flags: flags that indicate the status of the audio stream.

The method adds the collected in_data to the buffer using the put method of the buffer object. It returns a tuple of None and pyaudio.paContinue to indicate that the audio stream should continue.

def generator(self):
    while not self.closed:
        ######################################################################
        ### Use a blocking get() to ensure there's at least one chunk of  ####
        ### data, and stop iteration if the chunk is None, indicating the ####
        ### end of the audio stream.                                      ####
        ######################################################################

        chunk = self._buff.get()
        if chunk is None:
            return
        data = [chunk]

        ##########################################################
        ### Now consume whatever other data's still buffered. ####
        ##########################################################

        while True:
            try:
                chunk = self._buff.get(block=False)
                if chunk is None:
                    return
                data.append(chunk)
            except queue.Empty:
                break

        yield b''.join(data)

The logic of the code “def generator(self):” is as follows:

The function generator is an infinite loop that runs until self.closed is True. Within the loop, it uses a blocking get() method of the buffer object (self._buff) to retrieve a chunk of audio data. If the retrieved chunk is None, it means the end of the audio stream has been reached, and the function returns.

If the retrieved chunk is not None, it appends it to the data list. The function then enters another inner loop that continues to retrieve chunks from the buffer using the non-blocking get() method until there are no more chunks left. Finally, the function yields the concatenated chunks of data as a single-byte string.


  • SJVoiceAssistant.py (Main calling python script)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Dec-2022 ####
#### Modified On 31-Jan-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### multiple classes to initiate the ####
#### AI-enabled personal assistant, which would ####
#### display & answer the queries through voice. ####
#####################################################
import pyaudio
from six.moves import queue
import ssl
import json
import pandas as p
import clsMicrophoneStream as ms
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
import clsChatEngine as ce
import clsText2Voice as tv
import clsVoice2Text as vt
#from signal import signal, SIGPIPE, SIG_DFL
#signal(SIGPIPE,SIG_DFL)
###################################################
##### Adding the Instantiating Global classes #####
###################################################
x2 = ce.clsChatEngine()
x3 = tv.clsText2Voice()
x4 = vt.clsVoice2Text()
# Initiating Log class
l = cl.clsL()
###################################################
##### End of Global Classes #######
###################################################
# Bypassing SSL Authentication
try:
_create_unverified_https_context = ssl._create_unverified_context
except AttributeError:
# Legacy python that doesn't verify HTTPS certificates by default
pass
else:
# Handle target environment that doesn't support HTTPS verification
ssl._create_default_https_context = _create_unverified_https_context
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
######################################
### Insert your access token here ####
######################################
debug_ind = 'Y'
######################################
#### Global Flag ########
######################################
def main():
try:
spFlag = True
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*'*120)
print('Start Time: ' + str(var))
print('*'*120)
exitComment = 'THANKS.'
while True:
try:
finalText = ''
if spFlag == True:
finalText = x4.processVoice(var)
else:
pass
val = finalText.upper().strip()
print('Main Return: ', val)
print('Exit Call: ', exitComment)
print('Length of Main Return: ', len(val))
print('Length of Exit Call: ', len(exitComment))
if val == exitComment:
break
elif finalText == '':
spFlag = True
else:
print('spFlag::',spFlag)
print('Inside: ', finalText)
resVal = x2.findFromSJ(finalText)
print('ChatGPT Response:: ')
print(resVal)
resAud = x3.getAudio(resVal)
spFlag = False
except Exception as e:
pass
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*'*120)
print('End Time: ' + str(var1))
print('SJ Voice Assistant exited successfully!')
print('*'*120)
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

And, the key snippet from the above script –

def main():
    try:
        spFlag = True

        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        exitComment = 'THANKS.'

        while True:
            try:
                finalText = ''

                if spFlag == True:
                    finalText = x4.processVoice(var)
                else:
                    pass

                val = finalText.upper().strip()

                print('Main Return: ', val)
                print('Exit Call: ', exitComment)
                print('Length of Main Return: ', len(val))
                print('Length of Exit Call: ', len(exitComment))

                if val == exitComment:
                    break
                elif finalText == '':
                    spFlag = True
                else:
                    print('spFlag::',spFlag)
                    print('Inside: ', finalText)
                    resVal = x2.findFromSJ(finalText)

                    print('ChatGPT Response:: ')
                    print(resVal)

                    resAud = x3.getAudio(resVal)
                    spFlag = False
            except Exception as e:
                pass

        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('End Time: ' + str(var1))
        print('SJ Voice Assistant exited successfully!')
        print('*'*120)

    except Exception as e:
        x = str(e)
        print('Error: ', x)

The code is a Python script that implements a voice-based chatbot (likely named “SJ Voice Assistant”). The code performs the following operations:

  1. Initialize the string “exitComment” to “THANKS.” and set the “spFlag” to True.
  2. Start an infinite loop until a specific condition breaks the loop.
  3. In the loop, try to process the input voice with a function called “processVoice()” from an object “x4”. Store the result in “finalText.”
  4. Convert “finalText” to upper case, remove leading/trailing whitespaces, and store it in “val.” Print “Main Return” and “Exit Call” with their length.
  5. If “val” equals “exitComment,” break the loop. Suppose “finalText” is an empty string; set “spFlag” to True. Otherwise, perform further processing: a. Call the function “findFromSJ()” from an object “x2” with the input “finalText.” Store the result in “resVal.” b. Call the function “getAudio()” from an object “x3” with the input “resVal.” Store the result in “resAud.” Set “spFlag” to False.
  6. If an exception occurs, catch it and pass (do nothing).
  7. Finally the application will exit by displaying the following text – “SJ Voice Assistant exited successfully!”
  8. If an exception occurs outside the loop, catch it and print the error message.

So, finally, we’ve done it.

I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Note: All the data & scenarios posted here are representational data & scenarios & available over the internet & for educational purposes only. Some of the images (except my photo) we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement & especially in the prediction quality.

Real-time augmented reality (AR) using Python-based Computer Vision

Hi Team,

Today, I’m going to discuss another Computer Vision installment. I’ll discuss how to implement Augmented Reality using Open-CV Computer Vision with full audio. We will be using part of a Bengali OTT Series called “Feludar Goendagiri” entirely for educational purposes & also as a tribute to the great legendary director, late Satyajit Roy. To know more about him, please click the following link.

Why don’t we see the demo first before jumping into the technical details?


Demo

Architecture:

Let us understand the architecture –

Process Flow

The above diagram shows that the application, which uses the Open-CV, analyzes individual frames from the source & blends that with the video trailer. Finally, it creates another video by correctly mixing the source audio.

Python Packages:

Following are the python packages that are necessary to develop this brilliant use case –

pip install opencv-python
pip install pygame

CODE:

Let us now understand the code. For this use case, we will only discuss three python scripts. However, we need more than these three. However, we have already discussed them in some of the early posts. Hence, we will skip them here.

  • clsAugmentedReality.py (This is the main class of python script that will embed the source video with the WebCAM streams in real-time.)


##################################################
#### Written By: SATYAKI DE ####
#### Written On: 20-Jun-2022 ####
#### Modified On 25-Jun-2022 ####
#### ####
#### Objective: This is the main class of ####
#### python script that will embed the source ####
#### video with the WebCAM streams in ####
#### real-time. ####
##################################################
# Importing necessary packages
import numpy as np
import cv2
from clsConfig import clsConfig as cf
# Initialize our cached reference points
CACHED_REF_PTS = None
class clsAugmentedReality:
def __init__(self):
self.TOP_LEFT_X = int(cf.conf['TOP_LEFT_X'])
self.TOP_LEFT_Y = int(cf.conf['TOP_LEFT_Y'])
self.TOP_RIGHT_X = int(cf.conf['TOP_RIGHT_X'])
self.TOP_RIGHT_Y = int(cf.conf['TOP_RIGHT_Y'])
self.BOTTOM_RIGHT_X = int(cf.conf['BOTTOM_RIGHT_X'])
self.BOTTOM_RIGHT_Y = int(cf.conf['BOTTOM_RIGHT_Y'])
self.BOTTOM_LEFT_X = int(cf.conf['BOTTOM_LEFT_X'])
self.BOTTOM_LEFT_Y = int(cf.conf['BOTTOM_LEFT_Y'])
def getWarpImages(self, frame, source, cornerIDs, arucoDict, arucoParams, zoomFlag, useCache=False):
try:
# Assigning values
TOP_LEFT_X = self.TOP_LEFT_X
TOP_LEFT_Y = self.TOP_LEFT_Y
TOP_RIGHT_X = self.TOP_RIGHT_X
TOP_RIGHT_Y = self.TOP_RIGHT_Y
BOTTOM_RIGHT_X = self.BOTTOM_RIGHT_X
BOTTOM_RIGHT_Y = self.BOTTOM_RIGHT_Y
BOTTOM_LEFT_X = self.BOTTOM_LEFT_X
BOTTOM_LEFT_Y = self.BOTTOM_LEFT_Y
# Grab a reference to our cached reference points
global CACHED_REF_PTS
if source is None:
raise
# Grab the width and height of the frame and source image,
# respectively
# Extracting Frame from Camera
# Exracting Source from Video
(imgH, imgW) = frame.shape[:2]
(srcH, srcW) = source.shape[:2]
# Detect Aruco markers in the input frame
(corners, ids, rejected) = cv2.aruco.detectMarkers(frame, arucoDict, parameters=arucoParams)
print('Ids: ', str(ids))
print('Rejected: ', str(rejected))
# if we *did not* find our four ArUco markers, initialize an
# empty IDs list, otherwise flatten the ID list
print('Detecting Corners: ', str(len(corners)))
ids = np.array([]) if len(corners) != 4 else ids.flatten()
# Initialize our list of reference points
refPts = []
refPtTL1 = []
# Loop over the IDs of the ArUco markers in Top-Left, Top-Right,
# Bottom-Right, and Bottom-Left order
for i in cornerIDs:
# Grab the index of the corner with the current ID
j = np.squeeze(np.where(ids == i))
# If we receive an empty list instead of an integer index,
# then we could not find the marker with the current ID
if j.size == 0:
continue
# Otherwise, append the corner (x, y)-coordinates to our list
# of reference points
corner = np.squeeze(corners[j])
refPts.append(corner)
# Check to see if we failed to find the four ArUco markers
if len(refPts) != 4:
# If we are allowed to use cached reference points, fall
# back on them
if useCache and CACHED_REF_PTS is not None:
refPts = CACHED_REF_PTS
# Otherwise, we cannot use the cache and/or there are no
# previous cached reference points, so return early
else:
return None
# If we are allowed to use cached reference points, then update
# the cache with the current set
if useCache:
CACHED_REF_PTS = refPts
# Unpack our Aruco reference points and use the reference points
# to define the Destination transform matrix, making sure the
# points are specified in Top-Left, Top-Right, Bottom-Right, and
# Bottom-Left order
(refPtTL, refPtTR, refPtBR, refPtBL) = refPts
dstMat = [refPtTL[0], refPtTR[1], refPtBR[2], refPtBL[3]]
dstMat = np.array(dstMat)
# For zoom option recalculating all the 4 points
refPtTL1_L_X = refPtTL[0][0]TOP_LEFT_X
refPtTL1_L_Y = refPtTL[0][1]TOP_LEFT_Y
refPtTL1.append((refPtTL1_L_X,refPtTL1_L_Y))
refPtTL1_R_X = refPtTL[1][0]+TOP_RIGHT_X
refPtTL1_R_Y = refPtTL[1][1]+TOP_RIGHT_Y
refPtTL1.append((refPtTL1_R_X,refPtTL1_R_Y))
refPtTD1_L_X = refPtTL[2][0]+BOTTOM_RIGHT_X
refPtTD1_L_Y = refPtTL[2][1]+BOTTOM_RIGHT_Y
refPtTL1.append((refPtTD1_L_X,refPtTD1_L_Y))
refPtTD1_R_X = refPtTL[3][0]BOTTOM_LEFT_X
refPtTD1_R_Y = refPtTL[3][1]+BOTTOM_LEFT_Y
refPtTL1.append((refPtTD1_R_X,refPtTD1_R_Y))
dstMatMod = [refPtTL1[0], refPtTL1[1], refPtTL1[2], refPtTL1[3]]
dstMatMod = np.array(dstMatMod)
# Define the transform matrix for the *source* image in Top-Left,
# Top-Right, Bottom-Right, and Bottom-Left order
srcMat = np.array([[0, 0], [srcW, 0], [srcW, srcH], [0, srcH]])
# Compute the homography matrix and then warp the source image to
# the destination based on the homography depending upon the
# zoom flag
if zoomFlag == 1:
(H, _) = cv2.findHomography(srcMat, dstMat)
else:
(H, _) = cv2.findHomography(srcMat, dstMatMod)
warped = cv2.warpPerspective(source, H, (imgW, imgH))
# Construct a mask for the source image now that the perspective
# warp has taken place (we'll need this mask to copy the source
# image into the destination)
mask = np.zeros((imgH, imgW), dtype="uint8")
if zoomFlag == 1:
cv2.fillConvexPoly(mask, dstMat.astype("int32"), (255, 255, 255), cv2.LINE_AA)
else:
cv2.fillConvexPoly(mask, dstMatMod.astype("int32"), (255, 255, 255), cv2.LINE_AA)
# This optional step will give the source image a black
# border surrounding it when applied to the source image, you
# can apply a dilation operation
rect = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
mask = cv2.dilate(mask, rect, iterations=2)
# Create a three channel version of the mask by stacking it
# depth-wise, such that we can copy the warped source image
# into the input image
maskScaled = mask.copy() / 255.0
maskScaled = np.dstack([maskScaled] * 3)
# Copy the warped source image into the input image by
# (1) Multiplying the warped image and masked together,
# (2) Then multiplying the original input image with the
# mask (giving more weight to the input where there
# are not masked pixels), and
# (3) Adding the resulting multiplications together
warpedMultiplied = cv2.multiply(warped.astype("float"), maskScaled)
imageMultiplied = cv2.multiply(frame.astype(float), 1.0 maskScaled)
output = cv2.add(warpedMultiplied, imageMultiplied)
output = output.astype("uint8")
# Return the output frame to the calling function
return output
except Exception as e:
# Delibarately raising the issue
# That way the control goes to main calling methods
# exception section
raise

Please find the key snippet from the above script –

(imgH, imgW) = frame.shape[:2]
(srcH, srcW) = source.shape[:2]

# Detect Aruco markers in the input frame
(corners, ids, rejected) = cv2.aruco.detectMarkers(frame, arucoDict, parameters=arucoParams)

Identifying the Aruco markers are key here. The above lines help the program detect all four corners.

However, let us discuss more on the Aruco markers & strategies that I’ve used for several different surfaces.

As you can see, the right-hand side Aruco marker is tiny compared to the left one. Hence, that one will be ideal for a curve surface like Coffee Mug, Bottle rather than a flat surface.

Also, we’ve demonstrated the zoom capability with the smaller Aruco marker that will Augment almost double the original surface area.

Let us understand why we need that; as you know, any spherical surface like a bottle is round-shaped. Hence, detecting relatively more significant Aruco markers in four corners will be difficult for any camera to identify.

Hence, we need a process where close four corners can be extrapolated mathematically to relatively larger projected areas easily detectable by any WebCAM.

Let’s observe the following figure –

Simulated Extrapolated corners

As you can see that the original position of the four corners is represented using the following points, i.e., (x1, y1), (x2, y2), (x3, y3) & (x4, y4).

And these positions are very close to each other. Hence, it will be easier for the camera to detect all the points (like a plain surface) without many retries.

And later, you can add specific values of x & y to them to get the derived four corners as shown in the above figures through the following points, i.e. (x1.1, y1.1), (x2.1, y2.1), (x3.1, y3.1) & (x4.1, y4.1).

# Loop over the IDs of the ArUco markers in Top-Left, Top-Right,
# Bottom-Right, and Bottom-Left order
for i in cornerIDs:
  # Grab the index of the corner with the current ID
  j = np.squeeze(np.where(ids == i))

  # If we receive an empty list instead of an integer index,
  # then we could not find the marker with the current ID
  if j.size == 0:
    continue

  # Otherwise, append the corner (x, y)-coordinates to our list
  # of reference points
  corner = np.squeeze(corners[j])
  refPts.append(corner)

# Check to see if we failed to find the four ArUco markers
if len(refPts) != 4:
  # If we are allowed to use cached reference points, fall
  # back on them
  if useCache and CACHED_REF_PTS is not None:
    refPts = CACHED_REF_PTS

  # Otherwise, we cannot use the cache and/or there are no
  # previous cached reference points, so return early
  else:
    return None

# If we are allowed to use cached reference points, then update
# the cache with the current set
if useCache:
  CACHED_REF_PTS = refPts

# Unpack our Aruco reference points and use the reference points
# to define the Destination transform matrix, making sure the
# points are specified in Top-Left, Top-Right, Bottom-Right, and
# Bottom-Left order
(refPtTL, refPtTR, refPtBR, refPtBL) = refPts
dstMat = [refPtTL[0], refPtTR[1], refPtBR[2], refPtBL[3]]
dstMat = np.array(dstMat)

In the above snippet, the application will scan through all the points & try to detect Aruco markers & then create a list of reference points, which will later be used to define the destination transformation matrix.

# For zoom option recalculating all the 4 points
refPtTL1_L_X = refPtTL[0][0]-TOP_LEFT_X
refPtTL1_L_Y = refPtTL[0][1]-TOP_LEFT_Y

refPtTL1.append((refPtTL1_L_X,refPtTL1_L_Y))

refPtTL1_R_X = refPtTL[1][0]+TOP_RIGHT_X
refPtTL1_R_Y = refPtTL[1][1]+TOP_RIGHT_Y

refPtTL1.append((refPtTL1_R_X,refPtTL1_R_Y))

refPtTD1_L_X = refPtTL[2][0]+BOTTOM_RIGHT_X
refPtTD1_L_Y = refPtTL[2][1]+BOTTOM_RIGHT_Y

refPtTL1.append((refPtTD1_L_X,refPtTD1_L_Y))

refPtTD1_R_X = refPtTL[3][0]-BOTTOM_LEFT_X
refPtTD1_R_Y = refPtTL[3][1]+BOTTOM_LEFT_Y

refPtTL1.append((refPtTD1_R_X,refPtTD1_R_Y))

dstMatMod = [refPtTL1[0], refPtTL1[1], refPtTL1[2], refPtTL1[3]]
dstMatMod = np.array(dstMatMod)

The above snippets calculate the revised points for the zoom-out capabilities as discussed in one of the earlier figures.

# Define the transform matrix for the *source* image in Top-Left,
# Top-Right, Bottom-Right, and Bottom-Left order
srcMat = np.array([[0, 0], [srcW, 0], [srcW, srcH], [0, srcH]])

The above snippet will create a transformation matrix for the video trailer.

# Compute the homography matrix and then warp the source image to
# the destination based on the homography depending upon the
# zoom flag
if zoomFlag == 1:
  (H, _) = cv2.findHomography(srcMat, dstMat)
else:
  (H, _) = cv2.findHomography(srcMat, dstMatMod)

warped = cv2.warpPerspective(source, H, (imgW, imgH))

# Construct a mask for the source image now that the perspective
# warp has taken place (we'll need this mask to copy the source
# image into the destination)
mask = np.zeros((imgH, imgW), dtype="uint8")
if zoomFlag == 1:
  cv2.fillConvexPoly(mask, dstMat.astype("int32"), (255, 255, 255), cv2.LINE_AA)
else:
  cv2.fillConvexPoly(mask, dstMatMod.astype("int32"), (255, 255, 255), cv2.LINE_AA)

# This optional step will give the source image a black
# border surrounding it when applied to the source image, you
# can apply a dilation operation
rect = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
mask = cv2.dilate(mask, rect, iterations=2)

# Create a three channel version of the mask by stacking it
# depth-wise, such that we can copy the warped source image
# into the input image
maskScaled = mask.copy() / 255.0
maskScaled = np.dstack([maskScaled] * 3)

# Copy the warped source image into the input image by
# (1) Multiplying the warped image and masked together,
# (2) Then multiplying the original input image with the
#     mask (giving more weight to the input where there
#     are not masked pixels), and
# (3) Adding the resulting multiplications together
warpedMultiplied = cv2.multiply(warped.astype("float"), maskScaled)
imageMultiplied = cv2.multiply(frame.astype(float), 1.0 - maskScaled)
output = cv2.add(warpedMultiplied, imageMultiplied)
output = output.astype("uint8")

Finally, depending upon the zoom flag, the application will create a warped image surrounded by an optionally black border.

  • clsEmbedVideoWithStream.py (This is the main class of python script that will invoke the clsAugmentedReality class to initiate augment reality after splitting the audio & video & then project them via the Web-CAM with a seamless broadcast.)


##################################################
#### Written By: SATYAKI DE ####
#### Written On: 22-Jun-2022 ####
#### Modified On 25-Jun-2022 ####
#### ####
#### Objective: This is the main class of ####
#### python script that will invoke the ####
#### clsAugmentedReality class to initiate ####
#### augment reality after splitting the ####
#### audio & video & then project them via ####
#### the Web-CAM with a seamless broadcast. ####
##################################################
# Importing necessary packages
import clsAugmentedReality as ar
from clsConfig import clsConfig as cf
from imutils.video import VideoStream
from collections import deque
import imutils
import time
import cv2
import subprocess
import os
import pygame
import time
import threading
import sys
###############################################
### Global Section ###
###############################################
# Instantiating the dependant class
x1 = ar.clsAugmentedReality()
###############################################
### End of Global Section ###
###############################################
class BreakLoop(Exception):
pass
class clsEmbedVideoWithStream:
def __init__(self):
self.sep = str(cf.conf['SEP'])
self.Curr_Path = str(cf.conf['INIT_PATH'])
self.FileName = str(cf.conf['FILE_NAME'])
self.CacheL = int(cf.conf['CACHE_LIM'])
self.FileName_1 = str(cf.conf['FILE_NAME_1'])
self.audioLen = int(cf.conf['audioLen'])
self.audioFreq = float(cf.conf['audioFreq'])
self.videoFrame = float(cf.conf['videoFrame'])
self.stopFlag=cf.conf['stopFlag']
self.zFlag=int(cf.conf['zoomFlag'])
self.title = str(cf.conf['TITLE'])
def playAudio(self, audioFile, audioLen, freq, stopFlag=False):
try:
pygame.mixer.init()
pygame.init()
pygame.mixer.music.load(audioFile)
pygame.mixer.music.set_volume(10)
val = int(audioLen)
i = 0
while i < val:
pygame.mixer.music.play(loops=0, start=float(i))
time.sleep(freq)
i = i + 1
if (i >= val):
raise BreakLoop
if (stopFlag==True):
raise BreakLoop
return 0
except BreakLoop as s:
return 0
except Exception as e:
x = str(e)
print(x)
return 1
def extractAudio(self, video_file, output_ext="mp3"):
try:
"""Converts video to audio directly using `ffmpeg` command
with the help of subprocess module"""
filename, ext = os.path.splitext(video_file)
subprocess.call(["ffmpeg", "-y", "-i", video_file, f"{filename}.{output_ext}"],
stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT)
return 0
except Exception as e:
x = str(e)
print('Error: ', x)
return 1
def processStream(self, debugInd, var):
try:
sep = self.sep
Curr_Path = self.Curr_Path
FileName = self.FileName
CacheL = self.CacheL
FileName_1 = self.FileName_1
audioLen = self.audioLen
audioFreq = self.audioFreq
videoFrame = self.videoFrame
stopFlag = self.stopFlag
zFlag = self.zFlag
title = self.title
print('audioFreq:')
print(str(audioFreq))
print('videoFrame:')
print(str(videoFrame))
# Construct the source for Video & Temporary Audio
videoFile = Curr_Path + sep + 'Video' + sep + FileName
audioFile = Curr_Path + sep + 'Video' + sep + FileName_1
# Load the Aruco dictionary and grab the Aruco parameters
print("[INFO] initializing marker detector…")
arucoDict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_ARUCO_ORIGINAL)
arucoParams = cv2.aruco.DetectorParameters_create()
# Initialize the video file stream
print("[INFO] accessing video stream…")
vf = cv2.VideoCapture(videoFile)
x = self.extractAudio(videoFile)
if x == 0:
print('Successfully Audio extracted from the source file!')
else:
print('Failed to extract the source audio!')
# Initialize a queue to maintain the next frame from the video stream
Q = deque(maxlen=128)
# We need to have a frame in our queue to start our augmented reality
# pipeline, so read the next frame from our video file source and add
# it to our queue
(grabbed, source) = vf.read()
Q.appendleft(source)
# Initialize the video stream and allow the camera sensor to warm up
print("[INFO] starting video stream…")
vs = VideoStream(src=0).start()
time.sleep(2.0)
flg = 0
t = threading.Thread(target=self.playAudio, args=(audioFile, audioLen, audioFreq, stopFlag,))
t.daemon = True
try:
# Loop over the frames from the video stream
while len(Q) > 0:
try:
# Grab the frame from our video stream and resize it
frame = vs.read()
frame = imutils.resize(frame, width=1020)
# Attempt to find the ArUCo markers in the frame, and provided
# they are found, take the current source image and warp it onto
# input frame using our augmented reality technique
warped = x1.getWarpImages(
frame, source,
cornerIDs=(923, 1001, 241, 1007),
arucoDict=arucoDict,
arucoParams=arucoParams,
zoomFlag=zFlag,
useCache=CacheL > 0)
# If the warped frame is not None, then we know (1) we found the
# four ArUCo markers and (2) the perspective warp was successfully
# applied
if warped is not None:
# Set the frame to the output augment reality frame and then
# grab the next video file frame from our queue
frame = warped
source = Q.popleft()
if flg == 0:
t.start()
flg = flg + 1
# For speed/efficiency, we can use a queue to keep the next video
# frame queue ready for us — the trick is to ensure the queue is
# always (or nearly full)
if len(Q) != Q.maxlen:
# Read the next frame from the video file stream
(grabbed, nextFrame) = vf.read()
# If the frame was read (meaning we are not at the end of the
# video file stream), add the frame to our queue
if grabbed:
Q.append(nextFrame)
# Show the output frame
cv2.imshow(title, frame)
time.sleep(videoFrame)
# If the `q` key was pressed, break from the loop
if cv2.waitKey(2) & 0xFF == ord('q'):
stopFlag = True
break
except BreakLoop:
raise BreakLoop
except Exception as e:
pass
if (len(Q) == Q.maxlen):
time.sleep(2)
break
except BreakLoop as s:
print('Processed completed!')
# Performing cleanup at the end
cv2.destroyAllWindows()
vs.stop()
except Exception as e:
x = str(e)
print(x)
# Performing cleanup at the end
cv2.destroyAllWindows()
vs.stop()
return 0
except Exception as e:
x = str(e)
print('Error:', x)
return 1

Please find the key snippet from the above script –

def playAudio(self, audioFile, audioLen, freq, stopFlag=False):
  try:
    pygame.mixer.init()
    pygame.init()
    pygame.mixer.music.load(audioFile)

    pygame.mixer.music.set_volume(10)

    val = int(audioLen)
    i = 0

    while i < val:
      pygame.mixer.music.play(loops=0, start=float(i))
      time.sleep(freq)

      i = i + 1

      if (i >= val):
        raise BreakLoop

      if (stopFlag==True):
        raise BreakLoop

    return 0
  except BreakLoop as s:
    return 0
  except Exception as e:
    x = str(e)
    print(x)

    return 1

The above function will initiate the pygame library to run the sound of the video file that has been extracted as part of a separate process.

def extractAudio(self, video_file, output_ext="mp3"):
    try:
        """Converts video to audio directly using `ffmpeg` command
        with the help of subprocess module"""
        filename, ext = os.path.splitext(video_file)
        subprocess.call(["ffmpeg", "-y", "-i", video_file, f"{filename}.{output_ext}"],
                        stdout=subprocess.DEVNULL,
                        stderr=subprocess.STDOUT)

        return 0
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

The above function temporarily extracts the audio file from the source trailer video.

# Initialize the video file stream
print("[INFO] accessing video stream...")
vf = cv2.VideoCapture(videoFile)

x = self.extractAudio(videoFile)

if x == 0:
    print('Successfully Audio extracted from the source file!')
else:
    print('Failed to extract the source audio!')

# Initialize a queue to maintain the next frame from the video stream
Q = deque(maxlen=128)

# We need to have a frame in our queue to start our augmented reality
# pipeline, so read the next frame from our video file source and add
# it to our queue
(grabbed, source) = vf.read()
Q.appendleft(source)

# Initialize the video stream and allow the camera sensor to warm up
print("[INFO] starting video stream...")
vs = VideoStream(src=0).start()

time.sleep(2.0)
flg = 0

The above snippets read the frames from the video file after invoking the audio extraction. Then, it uses a Queue method to store all the video frames for better performance. And finally, it starts consuming the standard streaming video from the WebCAM to augment the trailer video on top of it.

t = threading.Thread(target=self.playAudio, args=(audioFile, audioLen, audioFreq, stopFlag,))
t.daemon = True

Now, the application has instantiated an orphan thread to spin off the audio play function. The reason is to void the performance & video frame frequency impact on top of it.

while len(Q) > 0:
  try:
    # Grab the frame from our video stream and resize it
    frame = vs.read()
    frame = imutils.resize(frame, width=1020)

    # Attempt to find the ArUCo markers in the frame, and provided
    # they are found, take the current source image and warp it onto
    # input frame using our augmented reality technique
    warped = x1.getWarpImages(
      frame, source,
      cornerIDs=(923, 1001, 241, 1007),
      arucoDict=arucoDict,
      arucoParams=arucoParams,
      zoomFlag=zFlag,
      useCache=CacheL > 0)

    # If the warped frame is not None, then we know (1) we found the
    # four ArUCo markers and (2) the perspective warp was successfully
    # applied
    if warped is not None:
      # Set the frame to the output augment reality frame and then
      # grab the next video file frame from our queue
      frame = warped
      source = Q.popleft()

      if flg == 0:

        t.start()
        flg = flg + 1

    # For speed/efficiency, we can use a queue to keep the next video
    # frame queue ready for us -- the trick is to ensure the queue is
    # always (or nearly full)
    if len(Q) != Q.maxlen:
      # Read the next frame from the video file stream
      (grabbed, nextFrame) = vf.read()

      # If the frame was read (meaning we are not at the end of the
      # video file stream), add the frame to our queue
      if grabbed:
        Q.append(nextFrame)

    # Show the output frame
    cv2.imshow(title, frame)
    time.sleep(videoFrame)

    # If the `q` key was pressed, break from the loop
    if cv2.waitKey(2) & 0xFF == ord('q'):
      stopFlag = True
      break

  except BreakLoop:
    raise BreakLoop
  except Exception as e:
    pass

  if (len(Q) == Q.maxlen):
    time.sleep(2)
    break

The final segment will call the getWarpImages function to get the Augmented image on top of the video. It also checks for the upcoming frames & whether the source video is finished or not. In case of the end, the application will initiate a break method to come out from the infinite WebCAM read. Also, there is a provision for manual exit by pressing the ‘Q’ from the MacBook keyboard.

# Performing cleanup at the end
cv2.destroyAllWindows()
vs.stop()

It is always advisable to close your camera & remove any temporarily available windows that are still left once the application finishes the process.

  • augmentedMovieTrailer.py (Main calling script)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 22-Jun-2022 ####
#### Modified On 25-Jun-2022 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### clsEmbedVideoWithStream class to initiate ####
#### the augmented reality in real-time ####
#### & display a trailer on top of any surface ####
#### via Web-CAM. ####
#####################################################
# We keep the setup code in a different class as shown below.
import clsEmbedVideoWithStream as evws
from clsConfig import clsConfig as cf
import datetime
import logging
###############################################
### Global Section ###
###############################################
# Instantiating all the main class
x1 = evws.clsEmbedVideoWithStream()
###############################################
### End of Global Section ###
###############################################
def main():
try:
# Other useful variables
debugInd = 'Y'
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
var1 = datetime.datetime.now()
print('Start Time: ', str(var))
# End of useful variables
# Initiating Log Class
general_log_path = str(cf.conf['LOG_PATH'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'augmentedMovieTrailer.log', level=logging.INFO)
print('Started augmenting videos!')
# Execute all the pass
r1 = x1.processStream(debugInd, var)
if (r1 == 0):
print('Successfully identified human emotions!')
else:
print('Failed to identify the human emotions!')
var2 = datetime.datetime.now()
c = var2 var1
minutes = c.total_seconds() / 60
print('Total difference in minutes: ', str(minutes))
print('End Time: ', str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

The above script will initially instantiate the main calling class & then invoke the processStream function to create the Augmented Reality.


FOLDER STRUCTURE:

Here is the folder structure that contains all the files & directories in MAC O/S –

Directory Structure

You will get the complete codebase in the following Github link.

If you want to know more about this legendary director & his famous work, please visit the following link.

I’ll bring some more exciting topic in the coming days from the Python verse. Please share & subscribe my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Note: All the data & scenario posted here are representational data & scenarios & available over the internet & for educational purpose only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim the ownership of these images. There is an always room for improvement & especially the prediction quality.

Real-time Zoom-In/Zoom-Out using Python-based Computer Vision

Hi Guys,

Today, I’ll be using another exciting installment of Computer Vision. The application will read the real-time human hand gesture to control WebCAM’s zoom-in or zoom-out capability.

Why don’t we see the demo first before jumping into the technical details?

Demo

Architecture:

Let us understand the architecture –

Broad Diagram

As one can see, the application reads individual frames from WebCAM & then map the human hand gestures with a media pipe. And finally, calculate the distance between particular pipe points projected on human hands.

Let’s take another depiction of the experiment to better understand the above statement.

Camera & Subject Position

Python Packages:

Following are the python packages that are necessary to develop this brilliant use case –

pip install mediapipe
pip install opencv-python

CODE:

Let us now understand the code. For this use case, we will only discuss three python scripts. However, we need more than these three. However, we have already discussed them in some of the early posts. Hence, we will skip them here.

  1. clsConfig.py (Configuration script for the application.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 24-May-2022 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Machine-Learning & streaming dashboard.####
#### ####
################################################
import os
import platform as pl
class clsConfig(object):
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
conf = {
'APP_ID': 1,
'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
'LOG_PATH': Curr_Path + sep + 'log' + sep,
'REPORT_PATH': Curr_Path + sep + 'report',
'SRC_PATH': Curr_Path + sep + 'data' + sep,
'FINAL_PATH': Curr_Path + sep + 'Target' + sep,
'APP_DESC_1': 'Hand Gesture Zoom Control!',
'DEBUG_IND': 'N',
'INIT_PATH': Curr_Path,
'SUBDIR': 'data',
'SEP': sep,
'TITLE': "Human Hand Gesture Controlling App",
'minVal':0.01,
'maxVal':1
}

view raw

clsConfig.py

hosted with ❤ by GitHub

2. clsVideoZoom.py (This script will zoom the video streaming depending upon the hand gestures.)


##################################################
#### Written By: SATYAKI DE ####
#### Written On: 23-May-2022 ####
#### Modified On 24-May-2022 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### clsVideoZoom class to initiate ####
#### the model to read the real-time ####
#### human hand gesture from video ####
#### Web-CAM & control zoom-in & zoom-out. ####
##################################################
import mediapipe as mp
import cv2
import time
import clsHandMotionScanner as hms
import math
import imutils
import numpy as np
from clsConfig import clsConfig as cf
class clsVideoZoom():
def __init__(self):
self.title = str(cf.conf['TITLE'])
self.minVal = float(cf.conf['minVal'])
self.maxVal = int(cf.conf['maxVal'])
def zoomVideo(self, image, Iscale=1):
try:
scale=Iscale
#get the webcam size
height, width, channels = image.shape
#prepare the crop
centerX,centerY=int(height/2),int(width/2)
radiusX,radiusY= int(scale*centerX),int(scale*centerY)
minX,maxX=centerXradiusX,centerX+radiusX
minY,maxY=centerYradiusY,centerY+radiusY
cropped = image[minX:maxX, minY:maxY]
resized_cropped = cv2.resize(cropped, (width, height))
return resized_cropped
except Exception as e:
x = str(e)
return image
def runSensor(self):
try:
pTime = 0
cTime = 0
zRange = 0
zRangeBar = 0
cap = cv2.VideoCapture(0)
detector = hms.clsHandMotionScanner(detectionCon=0.7)
while True:
success,img = cap.read()
img = imutils.resize(img, width=720)
#img = detector.findHands(img, draw=False)
#lmList = detector.findPosition(img, draw=False)
img = detector.findHands(img)
lmList = detector.findPosition(img, draw=False)
if len(lmList) != 0:
print('*'*60)
#print(lmList[4], lmList[8])
#print('*'*60)
x1, y1 = lmList[4][1], lmList[4][2]
x2, y2 = lmList[8][1], lmList[8][2]
cx, cy = (x1+x2)//2, (y1+y2)//2
cv2.circle(img, (x1,y1), 15, (255,0,255), cv2.FILLED)
cv2.circle(img, (x2,y2), 15, (255,0,255), cv2.FILLED)
cv2.line(img, (x1,y1), (x2,y2), (255,0,255), 3)
cv2.circle(img, (cx,cy), 15, (255,0,255), cv2.FILLED)
lenVal = math.hypot(x2x1, y2y1)
print('Length:', str(lenVal))
print('*'*60)
# Hand Range is from 50 to 270
# Camera Zoom Range is 0.01, 1
minVal = self.minVal
maxVal = self.maxVal
zRange = np.interp(lenVal, [50, 270], [minVal, maxVal])
zRangeBar = np.interp(lenVal, [50, 270], [400, 150])
print('Range: ', str(zRange))
if lenVal < 50:
cv2.circle(img, (cx,cy), 15, (0,255,0), cv2.FILLED)
cv2.rectangle(img, (50, 150), (85, 400), (255,0,0), 3)
cv2.rectangle(img, (50, int(zRangeBar)), (85, 400), (255,0,0), cv2.FILLED)
cTime = time.time()
fps = 1/(cTimepTime)
pTime = cTime
image = cv2.flip(img, flipCode=1)
cv2.putText(image, str(int(fps)), (10, 70), cv2.FONT_HERSHEY_PLAIN, 3, (255, 0, 255), 3)
cv2.imshow("Original Source",image)
# Creating the new zoom video
cropImg = self.zoomVideo(img, zRange)
cv2.putText(cropImg, str(int(fps)), (10, 70), cv2.FONT_HERSHEY_PLAIN, 3, (255, 0, 255), 3)
cv2.imshow("Zoomed Source",cropImg)
if cv2.waitKey(1) == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
return 0
except Exception as e:
x = str(e)
print('Error:', x)
return 1

view raw

clsVideoZoom.py

hosted with ❤ by GitHub

Key snippets from the above scripts –

def zoomVideo(self, image, Iscale=1):
    try:
        scale=Iscale

        #get the webcam size
        height, width, channels = image.shape

        #prepare the crop
        centerX,centerY=int(height/2),int(width/2)
        radiusX,radiusY= int(scale*centerX),int(scale*centerY)

        minX,maxX=centerX-radiusX,centerX+radiusX
        minY,maxY=centerY-radiusY,centerY+radiusY

        cropped = image[minX:maxX, minY:maxY]
        resized_cropped = cv2.resize(cropped, (width, height))

        return resized_cropped

    except Exception as e:
        x = str(e)

        return image

The above method will zoom in & zoom out depending upon the scale value that the human hand gesture will receive.

cap = cv2.VideoCapture(0)
detector = hms.clsHandMotionScanner(detectionCon=0.7)

The following lines will read the individual frames from webCAM. Instantiate another open-source customized class, which will find the hand’s position.

img = detector.findHands(img)
lmList = detector.findPosition(img, draw=False)

And captured the hand position depending upon the movements.

x1, y1 = lmList[4][1], lmList[4][2]
x2, y2 = lmList[8][1], lmList[8][2]

cx, cy = (x1+x2)//2, (y1+y2)//2

cv2.circle(img, (x1,y1), 15, (255,0,255), cv2.FILLED)
cv2.circle(img, (x2,y2), 15, (255,0,255), cv2.FILLED)

To understand the above lines, let’s look into the following diagram –

Source: Mediapipe

As one can see, the thumbs tip value is 4 & Index fingertip is 8. The application will mark these points with a solid circle.

lenVal = math.hypot(x2-x1, y2-y1)

The above line will calculate the distance between the thumbs tip & index fingertip.

# Camera Zoom Range is 0.01, 1

minVal = self.minVal
maxVal = self.maxVal

zRange = np.interp(lenVal, [50, 270], [minVal, maxVal])
zRangeBar = np.interp(lenVal, [50, 270], [400, 150])

In the above lines, the application will translate the values captured between the two fingertips & then translate them into a more meaningful camera zoom range from 0.01 to 1.

if lenVal < 50:
    cv2.circle(img, (cx,cy), 15, (0,255,0), cv2.FILLED)

The application will not consider a value below 50 as 0.01 for the WebCAM start value.

cTime = time.time()
fps = 1/(cTime-pTime)
pTime = cTime


image = cv2.flip(img, flipCode=1)
cv2.putText(image, str(int(fps)), (10, 70), cv2.FONT_HERSHEY_PLAIN, 3, (255, 0, 255), 3)
cv2.imshow("Original Source",image)

# Creating the new zoom video
cropImg = self.zoomVideo(img, zRange)
cv2.putText(cropImg, str(int(fps)), (10, 70), cv2.FONT_HERSHEY_PLAIN, 3, (255, 0, 255), 3)
cv2.imshow("Zoomed Source",cropImg)

The application will capture the frame rate & share the original video frame and the test frame, where it will zoom in or out depending on the hand gesture.

3. clsHandMotionScanner.py (This is an enhance version of open source script, which will capture the hand position.)


##################################################
#### Written By: SATYAKI DE ####
#### Modified On 23-May-2022 ####
#### ####
#### Objective: This is the main calling ####
#### python class that will capture the ####
#### human hand gesture on real-time basis ####
#### and that will enable the video zoom ####
#### capability of the feed directly coming ####
#### out of a Web-CAM. ####
##################################################
import mediapipe as mp
import cv2
import time
class clsHandMotionScanner():
def __init__(self, mode=False, maxHands=2, detectionCon=0.5, modelComplexity=1, trackCon=0.5):
self.mode = mode
self.maxHands = maxHands
self.detectionCon = detectionCon
self.modelComplex = modelComplexity
self.trackCon = trackCon
self.mpHands = mp.solutions.hands
self.hands = self.mpHands.Hands(self.mode, self.maxHands,self.modelComplex,self.detectionCon, self.trackCon)
# it gives small dots onhands total 20 landmark points
self.mpDraw = mp.solutions.drawing_utils
def findHands(self, img, draw=True):
try:
# Send rgb image to hands
imgRGB = cv2.cvtColor(img,cv2.COLOR_BGR2RGB)
self.results = self.hands.process(imgRGB)
# process the frame
if self.results.multi_hand_landmarks:
for handLms in self.results.multi_hand_landmarks:
if draw:
#Draw dots and connect them
self.mpDraw.draw_landmarks(img,handLms,self.mpHands.HAND_CONNECTIONS)
return img
except Exception as e:
x = str(e)
print('Error: ', x)
return img
def findPosition(self, img, handNo=0, draw=True):
try:
lmlist = []
# check wether any landmark was detected
if self.results.multi_hand_landmarks:
#Which hand are we talking about
myHand = self.results.multi_hand_landmarks[handNo]
# Get id number and landmark information
for id, lm in enumerate(myHand.landmark):
# id will give id of landmark in exact index number
# height width and channel
h,w,c = img.shape
#find the position
cx,cy = int(lm.x*w), int(lm.y*h) #center
#print(id,cx,cy)
lmlist.append([id,cx,cy])
# Draw circle for 0th landmark
if draw:
cv2.circle(img,(cx,cy), 15 , (255,0,255), cv2.FILLED)
return lmlist
except Exception as e:
x = str(e)
print('Error: ', x)
lmlist = []
return lmlist

Key snippets from the above script –

def findHands(self, img, draw=True):
    try:
        # Send rgb image to hands
        imgRGB = cv2.cvtColor(img,cv2.COLOR_BGR2RGB)
        self.results = self.hands.process(imgRGB)

        # process the frame
        if self.results.multi_hand_landmarks:
            for handLms in self.results.multi_hand_landmarks:

                if draw:
                    #Draw dots and connect them
                    self.mpDraw.draw_landmarks(img,handLms,self.mpHands.HAND_CONNECTIONS)

        return img
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return img

The above function will identify individual key points & marked them as dots on top of human hands.

def findPosition(self, img, handNo=0, draw=True):
      try:
          lmlist = []

          # check wether any landmark was detected
          if self.results.multi_hand_landmarks:
              #Which hand are we talking about
              myHand = self.results.multi_hand_landmarks[handNo]
              # Get id number and landmark information
              for id, lm in enumerate(myHand.landmark):
                  # id will give id of landmark in exact index number
                  # height width and channel
                  h,w,c = img.shape
                  #find the position - center
                  cx,cy = int(lm.x*w), int(lm.y*h) 
                  lmlist.append([id,cx,cy])

              # Draw circle for 0th landmark
              if draw:
                  cv2.circle(img,(cx,cy), 15 , (255,0,255), cv2.FILLED)

          return lmlist
      except Exception as e:
          x = str(e)
          print('Error: ', x)

          lmlist = []
          return lmlist

The above line will capture the position of each media pipe point along with the x & y coordinate & store them in a list, which will be later parsed for main use case.

4. viewHandMotion.py (Main calling script.)


##################################################
#### Written By: SATYAKI DE ####
#### Written On: 23-May-2022 ####
#### Modified On 23-May-2022 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### clsVideoZoom class to initiate ####
#### the model to read the real-time ####
#### hand movements gesture that enables ####
#### video zoom control. ####
##################################################
import time
import clsVideoZoom as vz
from clsConfig import clsConfig as cf
import datetime
import logging
###############################################
### Global Section ###
###############################################
# Instantiating the base class
x1 = vz.clsVideoZoom()
###############################################
### End of Global Section ###
###############################################
def main():
try:
# Other useful variables
debugInd = 'Y'
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
var1 = datetime.datetime.now()
print('Start Time: ', str(var))
# End of useful variables
# Initiating Log Class
general_log_path = str(cf.conf['LOG_PATH'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'visualZoom.log', level=logging.INFO)
print('Started Visual-Zoom Emotions!')
r1 = x1.runSensor()
if (r1 == 0):
print('Successfully identified visual zoom!')
else:
print('Failed to identify the visual zoom!')
var2 = datetime.datetime.now()
c = var2 var1
minutes = c.total_seconds() / 60
print('Total difference in minutes: ', str(minutes))
print('End Time: ', str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

The above lines are self-explanatory. So, I’m not going to discuss anything on this script.


FOLDER STRUCTURE:

Here is the folder structure that contains all the files & directories in MAC O/S –

Directory

So, we’ve done it.


You will get the complete codebase in the following Github link.

I’ll bring some more exciting topic in the coming days from the Python verse. Please share & subscribe my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Note: All the data & scenario posted here are representational data & scenarios & available over the internet & for educational purpose only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim the ownership of these images. There is an always room for improvement & especially the prediction quality.

Detecting real-time human emotions using Open-CV, DeepFace & Python

Hi Guys,

Today, I’ll be using another exciting installment of Computer Vision. Our focus will be on getting a sense of human emotions. Let me explain. This post will demonstrate how to read/detect human emotions by analyzing computer vision videos. We will be using part of a Bengali Movie called “Ganashatru (An enemy of the people)” entirely for educational purposes & also as a tribute to the great legendary director late Satyajit Roy. To know more about him, please click the following link.

Why don’t we see the demo first before jumping into the technical details?

Demo

Architecture:

Let us understand the architecture –

Process Flow

From the above diagram, one can see that the application, which uses both the Open-CV & DeepFace, analyzes individual frames from the source. Then predicts the emotions & adds the label in the target B&W frames. Finally, it creates another video by correctly mixing the source audio.

Python Packages:

Following are the python packages that are necessary to develop this brilliant use case –

pip install deepface
pip install opencv-python
pip install ffpyplayer

CODE:

Let us now understand the code. For this use case, we will only discuss three python scripts. However, we need more than these three. However, we have already discussed them in some of the early posts. Hence, we will skip them here.

  • clsConfig.py (This script will play the video along with audio in sync.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 22-Apr-2022 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Machine-Learning & streaming dashboard.####
#### ####
################################################
import os
import platform as pl
class clsConfig(object):
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
conf = {
'APP_ID': 1,
'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
'LOG_PATH': Curr_Path + sep + 'log' + sep,
'REPORT_PATH': Curr_Path + sep + 'report',
'FILE_NAME': 'GonoshotruClimax',
'SRC_PATH': Curr_Path + sep + 'data' + sep,
'FINAL_PATH': Curr_Path + sep + 'Target' + sep,
'APP_DESC_1': 'Video Emotion Capture!',
'DEBUG_IND': 'N',
'INIT_PATH': Curr_Path,
'SUBDIR': 'data',
'SEP': sep,
'VIDEO_FILE_EXTN': '.mp4',
'AUDIO_FILE_EXTN': '.mp3',
'IMAGE_FILE_EXTN': '.jpg',
'TITLE': "Gonoshotru – Emotional Analysis"
}

view raw

clsConfig.py

hosted with ❤ by GitHub

All the above inputs are generic & used as normal parameters.

  • clsFaceEmotionDetect.py (This python class will track the human emotions after splitting the audio from the video & put that label on top of the video frame.)


##################################################
#### Written By: SATYAKI DE ####
#### Written On: 17-Apr-2022 ####
#### Modified On 20-Apr-2022 ####
#### ####
#### Objective: This python class will ####
#### track the human emotions after splitting ####
#### the audio from the video & put that ####
#### label on top of the video frame. ####
#### ####
##################################################
from imutils.video import FileVideoStream
from imutils.video import FPS
import numpy as np
import imutils
import time
import cv2
from clsConfig import clsConfig as cf
from deepface import DeepFace
import clsL as cl
import subprocess
import sys
import os
# Initiating Log class
l = cl.clsL()
class clsFaceEmotionDetect:
def __init__(self):
self.sep = str(cf.conf['SEP'])
self.Curr_Path = str(cf.conf['INIT_PATH'])
self.FileName = str(cf.conf['FILE_NAME'])
self.VideoFileExtn = str(cf.conf['VIDEO_FILE_EXTN'])
self.ImageFileExtn = str(cf.conf['IMAGE_FILE_EXTN'])
def convert_video_to_audio_ffmpeg(self, video_file, output_ext="mp3"):
try:
"""Converts video to audio directly using `ffmpeg` command
with the help of subprocess module"""
filename, ext = os.path.splitext(video_file)
subprocess.call(["ffmpeg", "-y", "-i", video_file, f"{filename}.{output_ext}"],
stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT)
return 0
except Exception as e:
x = str(e)
print('Error: ', x)
return 1
def readEmotion(self, debugInd, var):
try:
sep = self.sep
Curr_Path = self.Curr_Path
FileName = self.FileName
VideoFileExtn = self.VideoFileExtn
ImageFileExtn = self.ImageFileExtn
font = cv2.FONT_HERSHEY_SIMPLEX
# Load Video
videoFile = Curr_Path + sep + 'Video' + sep + FileName + VideoFileExtn
temp_path = Curr_Path + sep + 'Temp' + sep
# Extracting the audio from the source video
x = self.convert_video_to_audio_ffmpeg(videoFile)
if x == 0:
print('Successfully Audio extracted from the source file!')
else:
print('Failed to extract the source audio!')
# Loading the haarcascade xml class
faceCascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
# start the file video stream thread and allow the buffer to
# start to fill
print("[INFO] Starting video file thread…")
fvs = FileVideoStream(videoFile).start()
time.sleep(1.0)
cnt = 0
# start the FPS timer
fps = FPS().start()
try:
# loop over frames from the video file stream
while fvs.more():
cnt += 1
# grab the frame from the threaded video file stream, resize
# it, and convert it to grayscale (while still retaining 3
# channels)
try:
frame = fvs.read()
except Exception as e:
x = str(e)
print('Error: ', x)
frame = imutils.resize(frame, width=720)
cv2.imshow("Gonoshotru – Source", frame)
# Enforce Detection to False will continue the sequence even when there is no face
result = DeepFace.analyze(frame, enforce_detection=False, actions = ['emotion'])
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
frame = np.dstack([frame, frame, frame])
faces = faceCascade.detectMultiScale(image=frame, scaleFactor=1.1, minNeighbors=4, minSize=(80,80), flags=cv2.CASCADE_SCALE_IMAGE)
# Draw a rectangle around the face
for (x, y, w, h) in faces:
cv2.rectangle(frame, (x, y), (x + w, y + h), (0,255,0), 2)
# Use puttext method for inserting live emotion on video
cv2.putText(frame, result['dominant_emotion'], (50,390), font, 3, (0,0,255), 2, cv2.LINE_4)
# display the size of the queue on the frame
#cv2.putText(frame, "Queue Size: {}".format(fvs.Q.qsize()), (10, 30), font, 0.6, (0, 255, 0), 2)
cv2.imwrite(temp_path+'frame-' + str(cnt) + ImageFileExtn, frame)
# show the frame and update the FPS counter
cv2.imshow("Gonoshotru – Emotional Analysis", frame)
fps.update()
if cv2.waitKey(2) & 0xFF == ord('q'):
break
except Exception as e:
x = str(e)
print('Error: ', x)
print('No more frame exists!')
# stop the timer and display FPS information
fps.stop()
print("[INFO] Elasped Time: {:.2f}".format(fps.elapsed()))
print("[INFO] Approx. FPS: {:.2f}".format(fps.fps()))
# do a bit of cleanup
cv2.destroyAllWindows()
fvs.stop()
return 0
except Exception as e:
x = str(e)
print('Error: ', x)
return 1

Key snippets from the above scripts –

def convert_video_to_audio_ffmpeg(self, video_file, output_ext="mp3"):
    try:
        """Converts video to audio directly using `ffmpeg` command
        with the help of subprocess module"""
        filename, ext = os.path.splitext(video_file)
        subprocess.call(["ffmpeg", "-y", "-i", video_file, f"{filename}.{output_ext}"],
                        stdout=subprocess.DEVNULL,
                        stderr=subprocess.STDOUT)

        return 0
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

The above snippet represents an Audio extraction function that will extract the audio from the source file & store it in the specified directory.

# Loading the haarcascade xml class
faceCascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

Now, Loading is one of the best classes for face detection, which our applications require.

fvs = FileVideoStream(videoFile).start()

Using FileVideoStream will enable our application to process the video faster than cv2.VideoCapture() method.

# start the FPS timer
fps = FPS().start()

The application then invokes the FPS.Start() that will initiate the FPS timer.

# loop over frames from the video file stream
while fvs.more():

The application will check using fvs.more() to find the EOF of the video file. Until then, it will try to read individual frames.

try:
    frame = fvs.read()
except Exception as e:
    x = str(e)
    print('Error: ', x)

The application will read individual frames. In case of any issue, it will capture the correct error without terminating the main program at the beginning. This exception strategy is beneficial when there is no longer any frame to read & yet due to the end frame issue, the entire application throws an error.

frame = imutils.resize(frame, width=720)
cv2.imshow("Gonoshotru - Source", frame)

At this point, the application is resizing the frame for better resolution & performance. Furthermore, identify this video feed as a source.

# Enforce Detection to False will continue the sequence even when there is no face
result = DeepFace.analyze(frame, enforce_detection=False, actions = ['emotion'])

Finally, the application has used the deepface machine-learning API to analyze the subject face & trying to predict its emotions.

frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
frame = np.dstack([frame, frame, frame])

faces = faceCascade.detectMultiScale(image=frame, scaleFactor=1.1, minNeighbors=4, minSize=(80,80), flags=cv2.CASCADE_SCALE_IMAGE)

detectMultiScale function can use to detect the faces. This function will return a rectangle with coordinates (x, y, w, h) around the detected face.

It takes three common arguments — the input image, scaleFactor, and minNeighbours.

scaleFactor specifies how much the image size reduces with each scale. There may be more faces near the camera in a group photo than others. Naturally, such faces would appear more prominent than the ones behind. This factor compensates for that.

minNeighbours specifies how many neighbors each candidate rectangle should have to retain. One may have to tweak these values to get the best results. This parameter specifies the number of neighbors a rectangle should have to be called a face.

# Draw a rectangle around the face
for (x, y, w, h) in faces:
    cv2.rectangle(frame, (x, y), (x + w, y + h), (0,255,0), 2)

As discussed above, the application is now calculating the square’s boundary after receiving the values of x, y, w, & h.

# Use puttext method for inserting live emotion on video
cv2.putText(frame, result['dominant_emotion'], (50,390), font, 3, (0,0,255), 2, cv2.LINE_4)

Finally, capture the dominant emotion from the deepface API & post it on top of the target video.

# display the size of the queue on the frame
cv2.imwrite(temp_path+'frame-' + str(cnt) + ImageFileExtn, frame)

# show the frame and update the FPS counter
cv2.imshow("Gonoshotru - Emotional Analysis", frame)
fps.update()

Also, writing individual frames into a temporary folder, where later they will be consumed & mixed with the source audio.

if cv2.waitKey(2) & 0xFF == ord('q'):
    break

At any given point, if the user wants to quit, the above snippet will allow them by simply pressing either the escape-button or ‘q’-button from the keyboard.

  • clsVideoPlay.py (This script will play the video along with audio in sync.)


###############################################
#### Updated By: SATYAKI DE ####
#### Updated On: 17-Apr-2022 ####
#### ####
#### Objective: This script will play the ####
#### video along with audio in sync. ####
#### ####
###############################################
import os
import platform as pl
import cv2
import numpy as np
import glob
import re
import ffmpeg
import time
from clsConfig import clsConfig as cf
from ffpyplayer.player import MediaPlayer
import logging
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
class clsVideoPlay:
def __init__(self):
self.fileNmFin = str(cf.conf['FILE_NAME'])
self.final_path = str(cf.conf['FINAL_PATH'])
self.title = str(cf.conf['TITLE'])
self.VideoFileExtn = str(cf.conf['VIDEO_FILE_EXTN'])
def videoP(self, file):
try:
cap = cv2.VideoCapture(file)
player = MediaPlayer(file)
start_time = time.time()
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
_, val = player.get_frame(show=False)
if val == 'eof':
break
cv2.imshow(file, frame)
elapsed = (time.time() start_time) * 1000 # msec
play_time = int(cap.get(cv2.CAP_PROP_POS_MSEC))
sleep = max(1, int(play_time elapsed))
if cv2.waitKey(sleep) & 0xFF == ord("q"):
break
player.close_player()
cap.release()
cv2.destroyAllWindows()
return 0
except Exception as e:
x = str(e)
print('Error: ', x)
return 1
def stream(self, dInd, var):
try:
VideoFileExtn = self.VideoFileExtn
fileNmFin = self.fileNmFin + VideoFileExtn
final_path = self.final_path
title = self.title
FullFileName = final_path + fileNmFin
ret = self.videoP(FullFileName)
if ret == 0:
print('Successfully Played the Video!')
return 0
else:
return 1
except Exception as e:
x = str(e)
print('Error: ', x)
return 1

view raw

clsVideoPlay.py

hosted with ❤ by GitHub

Let us explore the key snippet –

cap = cv2.VideoCapture(file)
player = MediaPlayer(file)

In the above snippet, the application first reads the video & at the same time, it will create an instance of the MediaPlayer.

play_time = int(cap.get(cv2.CAP_PROP_POS_MSEC))

The application uses cv2.CAP_PROP_POS_MSEC to synchronize video and audio.

  • peopleEmotionRead.py (This is the main calling python script that will invoke the class to initiate the model to read the real-time human emotions from video.)


##################################################
#### Written By: SATYAKI DE ####
#### Written On: 17-Jan-2022 ####
#### Modified On 20-Apr-2022 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### clsFaceEmotionDetect class to initiate ####
#### the model to read the real-time ####
#### human emotions from video or even from ####
#### Web-CAM & predict it continuously. ####
##################################################
# We keep the setup code in a different class as shown below.
import clsFaceEmotionDetect as fed
import clsFrame2Video as fv
import clsVideoPlay as vp
from clsConfig import clsConfig as cf
import datetime
import logging
###############################################
### Global Section ###
###############################################
# Instantiating all the three classes
x1 = fed.clsFaceEmotionDetect()
x2 = fv.clsFrame2Video()
x3 = vp.clsVideoPlay()
###############################################
### End of Global Section ###
###############################################
def main():
try:
# Other useful variables
debugInd = 'Y'
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
var1 = datetime.datetime.now()
print('Start Time: ', str(var))
# End of useful variables
# Initiating Log Class
general_log_path = str(cf.conf['LOG_PATH'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'restoreVideo.log', level=logging.INFO)
print('Started Capturing Real-Time Human Emotions!')
# Execute all the pass
r1 = x1.readEmotion(debugInd, var)
r2 = x2.convert2Vid(debugInd, var)
r3 = x3.stream(debugInd, var)
if ((r1 == 0) and (r2 == 0) and (r3 == 0)):
print('Successfully identified human emotions!')
else:
print('Failed to identify the human emotions!')
var2 = datetime.datetime.now()
c = var2 var1
minutes = c.total_seconds() / 60
print('Total difference in minutes: ', str(minutes))
print('End Time: ', str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

The key-snippet from the above script are as follows –

# Instantiating all the three classes

x1 = fed.clsFaceEmotionDetect()
x2 = fv.clsFrame2Video()
x3 = vp.clsVideoPlay()

As one can see from the above snippet, all the major classes are instantiated & loaded into the memory.

# Execute all the pass
r1 = x1.readEmotion(debugInd, var)
r2 = x2.convert2Vid(debugInd, var)
r3 = x3.stream(debugInd, var)

All the responses are captured into the corresponding variables, which later check for success status.


Let us capture & compare the emotions in a screenshot for better understanding –

Emotion Analysis

So, one can see that most of the frames from the video & above-posted frame correctly identify the human emotions.


FOLDER STRUCTURE:

Here is the folder structure that contains all the files & directories in MAC O/S –

Directory

So, we’ve done it.

You will get the complete codebase in the following Github link.

If you want to know more about this legendary director & his famous work, please visit the following link.

I’ll bring some more exciting topic in the coming days from the Python verse. Please share & subscribe my post & let me know your feedback.

Till then, Happy Avenging! 😀

Note: All the data & scenario posted here are representational data & scenarios & available over the internet & for educational purpose only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim the ownership of these images. There is an always room for improvement & especially the prediction quality.