Detecting real-time human emotions using Open-CV, DeepFace & Python

Hi Guys,

Today, I’ll be presenting another exciting installment on Computer Vision. Our focus will be on getting a sense of human emotions. Let me explain. This post will demonstrate how to read/detect human emotions by analyzing video frames with computer vision. We will be using part of a Bengali movie called “Ganashatru (An Enemy of the People)” entirely for educational purposes & also as a tribute to the legendary director, the late Satyajit Ray. To know more about him, please click the following link.

Why don’t we see the demo first before jumping into the technical details?

Demo

Architecture:

Let us understand the architecture –

Process Flow

From the above diagram, one can see that the application, which uses both Open-CV & DeepFace, analyzes individual frames from the source. It then predicts the emotions & adds the label to the target B&W frames. Finally, it creates another video by mixing the source audio back in.

Python Packages:

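The following Python packages are necessary to develop this use case. The scripts below also import imutils, & the audio extraction calls the ffmpeg command-line tool, so both need to be available –

pip install deepface
pip install opencv-python
pip install ffpyplayer
pip install imutils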

CODE:

Let us now understand the code. For this use case, we will discuss only the key Python scripts. The application needs a few more scripts, but we have already discussed them in earlier posts, so we will skip them here.

  • clsConfig.py (This script contains all the configuration parameters for the application.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 22-Apr-2022 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Machine-Learning & streaming dashboard.####
#### ####
################################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'FILE_NAME': 'GonoshotruClimax',
        'SRC_PATH': Curr_Path + sep + 'data' + sep,
        'FINAL_PATH': Curr_Path + sep + 'Target' + sep,
        'APP_DESC_1': 'Video Emotion Capture!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'SUBDIR': 'data',
        'SEP': sep,
        'VIDEO_FILE_EXTN': '.mp4',
        'AUDIO_FILE_EXTN': '.mp3',
        'IMAGE_FILE_EXTN': '.jpg',
        'TITLE': "Gonoshotru – Emotional Analysis"
    }

All the above inputs are generic & used as normal parameters.
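To see how these parameters come together, here is a minimal sketch that builds the final video path the same way the player class later does (FINAL_PATH + FILE_NAME + VIDEO_FILE_EXTN). The base directory & the small `conf` dictionary are hypothetical stand-ins for this example, & `os.path.join` is used here in place of the manual separator handling.

```python
import os

# Hypothetical base directory, made up for this example only.
base_dir = os.path.join("home", "user", "emotion_app")

# Hypothetical stand-in for a few entries of clsConfig.conf.
conf = {
    "INIT_PATH": base_dir,
    "FILE_NAME": "GonoshotruClimax",
    "VIDEO_FILE_EXTN": ".mp4",
    # Joining with an empty last component adds the trailing separator.
    "FINAL_PATH": os.path.join(base_dir, "Target", ""),
}

def video_path(conf):
    """Concatenate folder, file name & extension into one path."""
    return conf["FINAL_PATH"] + conf["FILE_NAME"] + conf["VIDEO_FILE_EXTN"]

print(video_path(conf))
```

Letting `os.path.join` pick the separator is one possible way to avoid the explicit Windows check; the original config keeps that check & works as well.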

  • clsFaceEmotionDetect.py (This python class will track the human emotions after splitting the audio from the video & put that label on top of the video frame.)


##################################################
#### Written By: SATYAKI DE ####
#### Written On: 17-Apr-2022 ####
#### Modified On 20-Apr-2022 ####
#### ####
#### Objective: This python class will ####
#### track the human emotions after splitting ####
#### the audio from the video & put that ####
#### label on top of the video frame. ####
#### ####
##################################################

from imutils.video import FileVideoStream
from imutils.video import FPS
import numpy as np
import imutils
import time
import cv2

from clsConfig import clsConfig as cf
from deepface import DeepFace
import clsL as cl

import subprocess
import sys
import os

# Initiating Log class
l = cl.clsL()

class clsFaceEmotionDetect:
    def __init__(self):
        self.sep = str(cf.conf['SEP'])
        self.Curr_Path = str(cf.conf['INIT_PATH'])
        self.FileName = str(cf.conf['FILE_NAME'])
        self.VideoFileExtn = str(cf.conf['VIDEO_FILE_EXTN'])
        self.ImageFileExtn = str(cf.conf['IMAGE_FILE_EXTN'])

    def convert_video_to_audio_ffmpeg(self, video_file, output_ext="mp3"):
        try:
            """Converts video to audio directly using `ffmpeg` command
            with the help of subprocess module"""
            filename, ext = os.path.splitext(video_file)
            subprocess.call(["ffmpeg", "-y", "-i", video_file, f"{filename}.{output_ext}"],
                            stdout=subprocess.DEVNULL,
                            stderr=subprocess.STDOUT)

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

    def readEmotion(self, debugInd, var):
        try:
            sep = self.sep
            Curr_Path = self.Curr_Path
            FileName = self.FileName
            VideoFileExtn = self.VideoFileExtn
            ImageFileExtn = self.ImageFileExtn
            font = cv2.FONT_HERSHEY_SIMPLEX

            # Load Video
            videoFile = Curr_Path + sep + 'Video' + sep + FileName + VideoFileExtn
            temp_path = Curr_Path + sep + 'Temp' + sep

            # Extracting the audio from the source video
            x = self.convert_video_to_audio_ffmpeg(videoFile)

            if x == 0:
                print('Successfully Audio extracted from the source file!')
            else:
                print('Failed to extract the source audio!')

            # Loading the haarcascade xml class
            faceCascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

            # start the file video stream thread and allow the buffer to
            # start to fill
            print("[INFO] Starting video file thread…")
            fvs = FileVideoStream(videoFile).start()
            time.sleep(1.0)
            cnt = 0

            # start the FPS timer
            fps = FPS().start()

            try:
                # loop over frames from the video file stream
                while fvs.more():
                    cnt += 1
                    # grab the frame from the threaded video file stream, resize
                    # it, and convert it to grayscale (while still retaining 3
                    # channels)
                    try:
                        frame = fvs.read()
                    except Exception as e:
                        x = str(e)
                        print('Error: ', x)

                    frame = imutils.resize(frame, width=720)
                    cv2.imshow("Gonoshotru – Source", frame)

                    # Enforce Detection to False will continue the sequence even when there is no face
                    result = DeepFace.analyze(frame, enforce_detection=False, actions = ['emotion'])

                    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                    frame = np.dstack([frame, frame, frame])

                    faces = faceCascade.detectMultiScale(image=frame, scaleFactor=1.1, minNeighbors=4, minSize=(80,80), flags=cv2.CASCADE_SCALE_IMAGE)

                    # Draw a rectangle around the face
                    for (x, y, w, h) in faces:
                        cv2.rectangle(frame, (x, y), (x + w, y + h), (0,255,0), 2)

                    # Use puttext method for inserting live emotion on video
                    cv2.putText(frame, result['dominant_emotion'], (50,390), font, 3, (0,0,255), 2, cv2.LINE_4)

                    # display the size of the queue on the frame
                    #cv2.putText(frame, "Queue Size: {}".format(fvs.Q.qsize()), (10, 30), font, 0.6, (0, 255, 0), 2)

                    # write the annotated frame to the temporary folder
                    cv2.imwrite(temp_path+'frame-' + str(cnt) + ImageFileExtn, frame)

                    # show the frame and update the FPS counter
                    cv2.imshow("Gonoshotru – Emotional Analysis", frame)
                    fps.update()

                    if cv2.waitKey(2) & 0xFF == ord('q'):
                        break
            except Exception as e:
                x = str(e)
                print('Error: ', x)
                print('No more frame exists!')

            # stop the timer and display FPS information
            fps.stop()
            print("[INFO] Elapsed Time: {:.2f}".format(fps.elapsed()))
            print("[INFO] Approx. FPS: {:.2f}".format(fps.fps()))

            # do a bit of cleanup
            cv2.destroyAllWindows()
            fvs.stop()

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

Key snippets from the above scripts –

def convert_video_to_audio_ffmpeg(self, video_file, output_ext="mp3"):
    try:
        """Converts video to audio directly using `ffmpeg` command
        with the help of subprocess module"""
        filename, ext = os.path.splitext(video_file)
        subprocess.call(["ffmpeg", "-y", "-i", video_file, f"{filename}.{output_ext}"],
                        stdout=subprocess.DEVNULL,
                        stderr=subprocess.STDOUT)

        return 0
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

The above snippet is the audio-extraction function. It calls the ffmpeg command-line tool to extract the audio from the source file & stores it next to the source video, with the same base name & the requested extension (mp3 by default).
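One thing to keep in mind is that subprocess.call() does not raise an error when ffmpeg exits with a failure code, so the function above returns 0 in that case too. The following is a minimal sketch of a stricter variant, not the code used in this post; the helper names `build_ffmpeg_command` & `extract_audio` are made up for this example.

```python
import os
import subprocess

def build_ffmpeg_command(video_file, output_ext="mp3"):
    """Return the ffmpeg argument list & the audio file it will write."""
    base, _ = os.path.splitext(video_file)
    output_file = f"{base}.{output_ext}"
    # -y overwrites an existing output file, -i names the input file.
    return ["ffmpeg", "-y", "-i", video_file, output_file], output_file

def extract_audio(video_file, output_ext="mp3"):
    """Return 0 on success, 1 if ffmpeg is missing or exits with an error."""
    cmd, _ = build_ffmpeg_command(video_file, output_ext)
    try:
        completed = subprocess.run(cmd,
                                   stdout=subprocess.DEVNULL,
                                   stderr=subprocess.STDOUT)
    except OSError:
        # Raised, for example, when the ffmpeg binary is not on the PATH.
        return 1
    return 0 if completed.returncode == 0 else 1

cmd, audio_file = build_ffmpeg_command("Video/GonoshotruClimax.mp4")
print(audio_file)
```

Splitting the command construction from the execution also makes the output path easy to check without running ffmpeg.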

# Loading the haarcascade xml class
faceCascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

Next, the application loads the Haar cascade frontal-face classifier that ships with OpenCV, which it needs for face detection.

fvs = FileVideoStream(videoFile).start()

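FileVideoStream (from imutils) reads the frames in a separate thread & buffers them in a queue, so the main loop usually spends less time waiting on file I/O than it would with a plain cv2.VideoCapture() read loop.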
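To illustrate the idea behind a threaded reader, here is a small sketch built only on the standard library. It is not the imutils implementation; the function name `start_reader` is made up, & plain integers stand in for decoded frames.

```python
import queue
import threading

def start_reader(frame_source, max_buffer=128):
    """Read items from frame_source in a background thread.

    Returns a generator that yields the buffered items in order.
    """
    buffer = queue.Queue(maxsize=max_buffer)
    sentinel = object()  # marks the end of the stream

    def worker():
        for frame in frame_source:
            buffer.put(frame)   # blocks while the buffer is full
        buffer.put(sentinel)

    threading.Thread(target=worker, daemon=True).start()

    def frames():
        while True:
            item = buffer.get()  # blocks until the reader has produced one
            if item is sentinel:
                return
            yield item

    return frames()

# The "processing" step here just doubles each stand-in frame.
processed = [frame * 2 for frame in start_reader(range(5))]
print(processed)
```

While the main loop is busy with one frame, the background thread can already fetch the next ones, which is where the speed-up comes from.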

# start the FPS timer
fps = FPS().start()

The application then invokes FPS().start(), which starts the FPS timer.

# loop over frames from the video file stream
while fvs.more():

The application will check using fvs.more() to find the EOF of the video file. Until then, it will try to read individual frames.

try:
    frame = fvs.read()
except Exception as e:
    x = str(e)
    print('Error: ', x)

The application reads one frame at a time. If the read fails, it prints the error instead of terminating the main program. This is useful near the end of the video, where a failed read of the last frame could otherwise bring down the entire application.

frame = imutils.resize(frame, width=720)
cv2.imshow("Gonoshotru - Source", frame)

At this point, the application resizes the frame to a width of 720 pixels to improve performance. It then displays this feed in a window labeled as the source.

# Enforce Detection to False will continue the sequence even when there is no face
result = DeepFace.analyze(frame, enforce_detection=False, actions = ['emotion'])

Next, the application uses the DeepFace machine-learning API to analyze the subject's face & predict its emotion. With enforce_detection set to False, the sequence continues even on frames where no face is found.
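The code in this post reads the result as a dictionary. As far as I know, some newer DeepFace releases return a list with one dictionary per detected face instead, so treat that as an assumption to verify against your installed version. The helper below is a hypothetical sketch that accepts both shapes; the sample payloads are hand-written & not real DeepFace output.

```python
def dominant_emotion(result, default="unknown"):
    """Return the dominant emotion from a DeepFace-style result.

    Accepts a single dict or a list of dicts (one per face).
    """
    if isinstance(result, list):
        if not result:
            return default
        result = result[0]  # use the first detected face
    return result.get("dominant_emotion", default)

print(dominant_emotion({"dominant_emotion": "happy"}))
print(dominant_emotion([{"dominant_emotion": "sad"}]))
print(dominant_emotion([]))
```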

frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
frame = np.dstack([frame, frame, frame])

faces = faceCascade.detectMultiScale(image=frame, scaleFactor=1.1, minNeighbors=4, minSize=(80,80), flags=cv2.CASCADE_SCALE_IMAGE)

The detectMultiScale function detects the faces. It returns one rectangle, given as (x, y, w, h), for each detected face.

It takes three common arguments: the input image, scaleFactor, and minNeighbors.

scaleFactor specifies how much the image size is reduced at each scale. In a group photo, the faces near the camera appear larger than the ones behind. Scanning the image at several scales compensates for that.

minNeighbors specifies how many neighboring candidate rectangles a detection needs in order to be retained as a face. One may have to tweak these values to get the best results.
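To get a feel for what scaleFactor does, the sketch below lists the window sizes a multi-scale detector would step through between minSize & the frame height. It is only an illustration of the idea, not OpenCV's actual implementation; the function name & the frame height of 405 pixels are assumptions made for this example.

```python
def pyramid_scales(frame_size, min_size=80, scale_factor=1.1):
    """List the search-window sizes from min_size up to frame_size.

    Each step grows the window by scale_factor, which is equivalent to
    shrinking the image by the same factor.
    """
    sizes = []
    size = float(min_size)
    while size <= frame_size:
        sizes.append(round(size))
        size *= scale_factor
    return sizes

fine = pyramid_scales(frame_size=405, scale_factor=1.1)
coarse = pyramid_scales(frame_size=405, scale_factor=1.3)
print(len(fine), len(coarse))
```

A scaleFactor close to 1 means more scales to scan, so detection is slower but less likely to miss a face whose size falls between two steps.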

# Draw a rectangle around the face
for (x, y, w, h) in faces:
    cv2.rectangle(frame, (x, y), (x + w, y + h), (0,255,0), 2)

As discussed above, the application now draws a green rectangle around each detected face, using the x, y, w, & h values it received.

# Use puttext method for inserting live emotion on video
cv2.putText(frame, result['dominant_emotion'], (50,390), font, 3, (0,0,255), 2, cv2.LINE_4)

Finally, the application takes the dominant emotion from the DeepFace result & writes it on top of the target frame.

# write the annotated frame to the temporary folder
cv2.imwrite(temp_path+'frame-' + str(cnt) + ImageFileExtn, frame)

# show the frame and update the FPS counter
cv2.imshow("Gonoshotru - Emotional Analysis", frame)
fps.update()

The application also writes each annotated frame into a temporary folder, from where the frames are later consumed & mixed with the source audio.
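Because the files are named frame-1, frame-2, ... frame-10 without zero padding, whichever step reads them back has to sort them by number & not alphabetically. The clsFrame2Video class is not shown in this post, so the following is only a hypothetical sketch of such a sort.

```python
import re

def frame_number(path):
    """Extract the numeric counter from names like 'frame-12.jpg'."""
    match = re.search(r"frame-(\d+)", path)
    return int(match.group(1)) if match else -1

names = ["frame-10.jpg", "frame-2.jpg", "frame-1.jpg"]
ordered = sorted(names, key=frame_number)
print(ordered)
```

A plain sorted(names) would place frame-10 before frame-2, which would scramble the frame order in the final video.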

if cv2.waitKey(2) & 0xFF == ord('q'):
    break

At any given point, if the user wants to quit, the above snippet lets them do so by pressing the ‘q’ key on the keyboard.

  • clsVideoPlay.py (This script will play the video along with audio in sync.)


###############################################
#### Updated By: SATYAKI DE ####
#### Updated On: 17-Apr-2022 ####
#### ####
#### Objective: This script will play the ####
#### video along with audio in sync. ####
#### ####
###############################################

import os
import platform as pl
import cv2
import numpy as np
import glob
import re
import ffmpeg
import time
from clsConfig import clsConfig as cf
from ffpyplayer.player import MediaPlayer

import logging

os_det = pl.system()
if os_det == "Windows":
    sep = '\\'
else:
    sep = '/'

class clsVideoPlay:
    def __init__(self):
        self.fileNmFin = str(cf.conf['FILE_NAME'])
        self.final_path = str(cf.conf['FINAL_PATH'])
        self.title = str(cf.conf['TITLE'])
        self.VideoFileExtn = str(cf.conf['VIDEO_FILE_EXTN'])

    def videoP(self, file):
        try:
            cap = cv2.VideoCapture(file)
            player = MediaPlayer(file)
            start_time = time.time()

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                _, val = player.get_frame(show=False)
                if val == 'eof':
                    break

                cv2.imshow(file, frame)

                elapsed = (time.time() - start_time) * 1000  # msec
                play_time = int(cap.get(cv2.CAP_PROP_POS_MSEC))
                sleep = max(1, int(play_time - elapsed))
                if cv2.waitKey(sleep) & 0xFF == ord("q"):
                    break

            player.close_player()
            cap.release()
            cv2.destroyAllWindows()

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

    def stream(self, dInd, var):
        try:
            VideoFileExtn = self.VideoFileExtn
            fileNmFin = self.fileNmFin + VideoFileExtn
            final_path = self.final_path
            title = self.title

            FullFileName = final_path + fileNmFin

            ret = self.videoP(FullFileName)

            if ret == 0:
                print('Successfully Played the Video!')
                return 0
            else:
                return 1
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

Let us explore the key snippet –

cap = cv2.VideoCapture(file)
player = MediaPlayer(file)

In the above snippet, the application first opens the video with cv2.VideoCapture & at the same time creates an instance of the MediaPlayer for the same file, which takes care of the audio.

play_time = int(cap.get(cv2.CAP_PROP_POS_MSEC))

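The application uses cv2.CAP_PROP_POS_MSEC, the current position of the video in milliseconds, to synchronize video and audio. It compares that position with the time elapsed since playback started & waits for the difference before showing the next frame.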
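The wait calculation from videoP can be looked at in isolation. The sketch below wraps it in a small function (the name `wait_ms` is made up for this example) so that the two cases, video ahead of the clock & video behind the clock, are easy to try out.

```python
def wait_ms(play_time_ms, elapsed_ms):
    """Milliseconds to wait before showing the next frame.

    The result is never below 1, because cv2.waitKey(0) would wait
    indefinitely for a key press.
    """
    return max(1, int(play_time_ms - elapsed_ms))

print(wait_ms(1000, 960))   # video is ahead of the clock, so wait
print(wait_ms(1000, 1200))  # video is behind, so wait the minimum
```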

  • peopleEmotionRead.py (This is the main calling python script that will invoke the class to initiate the model to read the real-time human emotions from video.)


##################################################
#### Written By: SATYAKI DE ####
#### Written On: 17-Jan-2022 ####
#### Modified On 20-Apr-2022 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### clsFaceEmotionDetect class to initiate ####
#### the model to read the real-time ####
#### human emotions from video or even from ####
#### Web-CAM & predict it continuously. ####
##################################################

# We keep the setup code in a different class as shown below.
import clsFaceEmotionDetect as fed
import clsFrame2Video as fv
import clsVideoPlay as vp

from clsConfig import clsConfig as cf

import datetime
import logging

###############################################
### Global Section ###
###############################################
# Instantiating all the three classes

x1 = fed.clsFaceEmotionDetect()
x2 = fv.clsFrame2Video()
x3 = vp.clsVideoPlay()

###############################################
### End of Global Section ###
###############################################

def main():
    try:
        # Other useful variables
        debugInd = 'Y'
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        var1 = datetime.datetime.now()

        print('Start Time: ', str(var))
        # End of useful variables

        # Initiating Log Class
        general_log_path = str(cf.conf['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'restoreVideo.log', level=logging.INFO)

        print('Started Capturing Real-Time Human Emotions!')

        # Execute all the pass
        r1 = x1.readEmotion(debugInd, var)
        r2 = x2.convert2Vid(debugInd, var)
        r3 = x3.stream(debugInd, var)

        if ((r1 == 0) and (r2 == 0) and (r3 == 0)):
            print('Successfully identified human emotions!')
        else:
            print('Failed to identify the human emotions!')

        var2 = datetime.datetime.now()

        c = var2 - var1
        minutes = c.total_seconds() / 60
        print('Total difference in minutes: ', str(minutes))

        print('End Time: ', str(var2))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()

The key snippets from the above script are as follows –

# Instantiating all the three classes

x1 = fed.clsFaceEmotionDetect()
x2 = fv.clsFrame2Video()
x3 = vp.clsVideoPlay()

As one can see from the above snippet, all the major classes are instantiated & loaded into the memory.

# Execute all the pass
r1 = x1.readEmotion(debugInd, var)
r2 = x2.convert2Vid(debugInd, var)
r3 = x3.stream(debugInd, var)

All the responses are captured in the corresponding variables, which are later checked for success status.


Let us capture & compare the emotions in a screenshot for better understanding –

Emotion Analysis

So, one can see that, in most of the frames from the video & in the frame posted above, the application identifies the human emotions correctly.


FOLDER STRUCTURE:

Here is the folder structure that contains all the files & directories in macOS –

Directory

So, we’ve done it.

You will get the complete codebase in the following Github link.

If you want to know more about this legendary director & his famous work, please visit the following link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my posts & let me know your feedback.

Till then, Happy Avenging! 😀

Note: All the data & scenarios posted here are representational, available over the internet & for educational purposes only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement, especially in the prediction quality.

Prepare analytics based on streaming data from Twitter using Python

Hi Guys!

Today, we will be projecting an analytics storyline based on streaming data from Twitter’s developer account.

I want to make it clear that this is solely for educational purposes & no data analysis has been provided to any agency or third-party apps. So, when you are planning to use this API, make sure that you strictly follow these rules.

In order to create a streaming channel from Twitter, you need to create one developer account.

As I’m a huge soccer fan, I would like to refer to one soccer place on Twitter for this. In this case, we’ll be checking BA Analytics for this.

6. Origin_Site

Please find the steps to create one developer account –

Step 1:

You have to go to the following link. Over there you need to submit the request in order to create the account. You need to provide proper justification as to why you need that account. I’m not going into those forms. They are self-explanatory.

Once your developer account is activated, you need to click the following link as shown below –

1. TwitterSetup

Once you click that, it will lead you to the following page –

2. TwitterSetup - Continue

If you don’t have any app, the first page will look something like the above page.

Step 2:

3. TwiterSetup - Continue

Now, you need to fill in the following details. For security reasons, I’ll be hiding sensitive data here.

Step 3:

4. TwitterSetUp - Continue

After creating that, you need to go to the next tab, i.e., Keys & tokens. The initial screen will only have the Consumer API keys.

Step 4:

To generate the Access token, you need to click the create button from the above screenshot & then the new page will look like this –

5. TwitterSetUp - Continue

Our program will be using all these pieces of information.

So, now we’re ready for our Python program.

In order to access Twitter API through python, you need to install the following package –

pip install python-twitter

Let’s see the directory structure –

7. Directory

Let’s check only the relevant scripts here. We’re not going to discuss clsL.py, as we’ve already covered it. Please refer to the old post.

1. clsConfig.py (This script contains all the parameters of the server.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 12-Oct-2019              ####
####                                      ####
#### Objective: This script is a config   ####
#### file, contains all the keys for      ####
#### azure cosmos db. Application will    ####
#### process these information & perform  ####
#### various CRUD operation on Cosmos DB. ####
##############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'APP_ID': 1,
        'EMAIL_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcEmail.json',
        'TWITTER_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcTwitter.json',
        'HR_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcHR.json',
        'ACCESS_TOKEN': '99999999-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX',
        'ACCESS_SECRET': 'YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY',
        'CONSUMER_KEY': "aaaaaaaaaaaaaaaaaaaaaaa",
        'CONSUMER_SECRET': 'HHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH',
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'APP_DESC_1': 'Feedback Communication',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path
    }

For security reasons, I’ve replaced the original keys with dummy keys. You have to fill in your own keys.

2. clsTwitter.py (This script will fetch data from Twitter API & process the same & send it to the calling method.)

###############################################
#### Written By: SATYAKI DE                ####
#### Written On: 12-Oct-2019               ####
#### Modified On 12-Oct-2019               ####
####                                       ####
#### Objective: Main class fetching sample ####
#### data from Twitter API.                ####
###############################################

import twitter
from clsConfig import clsConfig as cf
import json
import re
import string
import logging

class clsTwitter:
    def __init__(self):
        self.access_token = cf.config['ACCESS_TOKEN']
        self.access_secret = cf.config['ACCESS_SECRET']
        self.consumer_key = cf.config['CONSUMER_KEY']
        self.consumer_secret = cf.config['CONSUMER_SECRET']

    def find_element(self, srcJson, key):
        """Pull all values of specified key from nested JSON."""
        arr = []

        def fetch(srcJson, arr, key):
            """Recursively search for values of key in JSON tree."""
            if isinstance(srcJson, dict):
                for k, v in srcJson.items():
                    if isinstance(v, (dict, list)):
                        fetch(v, arr, key)
                    elif k == key:
                        arr.append(v)
            elif isinstance(srcJson, list):
                for item in srcJson:
                    fetch(item, arr, key)
            return arr

        finJson = fetch(srcJson, arr, key)
        return finJson

    def searchQry(self, rawQry):
        try:
            fin_dict = {}
            finJson = ''
            res = ''
            cnt = 0

            # Parameters to invoke Twitter API
            ACCESS_TOKEN = self.access_token
            ACCESS_SECRET = self.access_secret
            CONSUMER_KEY = self.consumer_key
            CONSUMER_SECRET = self.consumer_secret

            tmpR20 = 'Raw Query: ' + str(rawQry)
            logging.info(tmpR20)

            finJson = '['

            if rawQry == '':
                print('No data to proceed!')
                logging.info('No data to proceed!')
            else:
                t = twitter.Api(
                                  consumer_key = CONSUMER_KEY,
                                  consumer_secret = CONSUMER_SECRET,
                                  access_token_key = ACCESS_TOKEN,
                                  access_token_secret = ACCESS_SECRET
                               )

                response = t.GetSearch(raw_query=rawQry)
                print('Total Records fetched:', str(len(response)))

                for i in response:

                    # Converting them to json
                    data = str(i)
                    res_json = json.loads(data)

                    # Calling individual key
                    id = res_json['id']
                    tmpR19 = 'Id: ' + str(id)
                    logging.info(tmpR19)

                    try:
                        f_count = res_json['quoted_status']['user']['followers_count']
                    except:
                        f_count = 0
                    tmpR21 = 'Followers Count: ' + str(f_count)
                    logging.info(tmpR21)

                    try:
                        r_count = res_json['quoted_status']['retweet_count']
                    except:
                        r_count = 0
                    tmpR22 = 'Retweet Count: ' + str(r_count)
                    logging.info(tmpR22)

                    text = self.find_element(res_json, 'text')

                    for j in text:
                        strF = re.sub(f'[^{re.escape(string.printable)}]', '', str(j))
                        pat = re.compile(r'[\t\n]')
                        strG = pat.sub("", strF)
                        res = "".join(strG)

                    # Forming return dictionary
                    #fin_dict.update({id:'id', f_count: 'followerCount', r_count: 'reTweetCount', res: 'msgPost'})
                    if cnt == 0:
                        finJson = finJson + '{"id":' + str(id) + ',"followerCount":' + str(f_count) + ',"reTweetCount":' + str(r_count) + ', "msgPost":"' + str(res) + '"}'
                    else:
                        finJson = finJson + ', {"id":' + str(id) + ',"followerCount":' + str(f_count) + ',"reTweetCount":' + str(r_count) + ', "msgPost":"' + str(res) + '"}'

                    cnt += 1

            finJson = finJson + ']'

            jdata = json.dumps(finJson)
            ResJson = json.loads(jdata)

            return ResJson

        except Exception as e:
            ResJson = ''
            x = str(e)
            print(x)

            logging.info(x)
            ResJson = {'errorDetails' : x}

            return ResJson

The key lines from this snippet are as follows –

def find_element(self, srcJson, key):
    """Pull all values of specified key from nested JSON."""
    arr = []

    def fetch(srcJson, arr, key):
        """Recursively search for values of key in JSON tree."""
        if isinstance(srcJson, dict):
            for k, v in srcJson.items():
                if isinstance(v, (dict, list)):
                    fetch(v, arr, key)
                elif k == key:
                    arr.append(v)
        elif isinstance(srcJson, list):
            for item in srcJson:
                fetch(item, arr, key)
        return arr

    finJson = fetch(srcJson, arr, key)
    return finJson

This function recursively searches the supplied JSON for a specific key & returns all the matching values. It is particularly useful when your elements don’t have a fixed position in the response.
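Here is a standalone sketch of the same search, run against a small hand-written payload. The sample dictionary is made up for this example & is not a real Twitter response.

```python
def find_element(src_json, key):
    """Collect every non-container value stored under `key`, at any depth."""
    found = []

    def fetch(node):
        if isinstance(node, dict):
            for k, v in node.items():
                if isinstance(v, (dict, list)):
                    fetch(v)          # descend into nested containers
                elif k == key:
                    found.append(v)   # plain value under the wanted key
        elif isinstance(node, list):
            for item in node:
                fetch(item)

    fetch(src_json)
    return found

sample = {
    "id": 1,
    "text": "outer tweet",
    "quoted_status": {"text": "quoted tweet", "user": {"name": "someone"}},
    "entities": [{"text": "tag"}],
}
print(find_element(sample, "text"))
```

Note that, as in the original function, a key whose value is itself a dictionary or a list is not returned as a match; only its contents are searched.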

t = twitter.Api(
                  consumer_key = CONSUMER_KEY,
                  consumer_secret = CONSUMER_SECRET,
                  access_token_key = ACCESS_TOKEN,
                  access_token_secret = ACCESS_SECRET
               )

response = t.GetSearch(raw_query=rawQry)

In this case, the Python application creates the API client with our keys & receives the search response from the Twitter API.

id = res_json['id']
try:
    f_count = res_json['quoted_status']['user']['followers_count']
except:
    f_count = 0
try:
    r_count = res_json['quoted_status']['retweet_count']
except:
    r_count = 0

Fetching the elements that sit at a fixed position in the API response. If the tweet has no quoted status, both counts default to 0.

text = self.find_element(res_json, 'text')

Fetching the dynamic position based element using our customized function.

for j in text:
    strF = re.sub(f'[^{re.escape(string.printable)}]', '', str(j))
    pat = re.compile(r'[\t\n]')
    strG = pat.sub("", strF)
    res = "".join(strG)

Removing non-printable characters, tabs & newlines from the extracted text field in order to get clean data. Note that res is overwritten in every iteration, so only the last text value found is kept.
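The two regular expressions can be tried out on their own. The sketch below wraps them in a helper (the name `clean_text` & the sample string are made up for this example).

```python
import re
import string

# Anything outside string.printable (accents, emojis, ...) is dropped.
NON_PRINTABLE = re.compile(f"[^{re.escape(string.printable)}]")
# Tabs & newlines are printable, so they need a second pass.
TABS_NEWLINES = re.compile(r"[\t\n]")

def clean_text(raw):
    """Strip non-printable characters, then tabs & newlines."""
    return TABS_NEWLINES.sub("", NON_PRINTABLE.sub("", str(raw)))

print(repr(clean_text("Gôal!\tWhat a\nmatch ⚽")))
```

Ordinary spaces are kept, & the tab or newline is removed without a replacement, so words that were separated only by a newline end up joined.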

if cnt == 0:
    finJson = finJson + '{"id":' + str(id) + ',"followerCount":' + str(f_count) + ',"reTweetCount":' + str(r_count) + ', "msgPost":"' + str(res) + '"}'
else:
    finJson = finJson + ', {"id":' + str(id) + ',"followerCount":' + str(f_count) + ',"reTweetCount":' + str(r_count) + ', "msgPost":"' + str(res) + '"}'

Finally, generating a JSON string dynamically.

jdata = json.dumps(finJson)
ResJson = json.loads(jdata)

And, returning the JSON to our calling program.
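Building the JSON by string concatenation works as long as the message contains no double quotes or backslashes. As an alternative, & not the approach used in this post, one could collect plain dictionaries & let json.dumps handle the escaping. The function name & the sample rows below are made up for this sketch.

```python
import json

def build_payload(rows):
    """rows: iterable of (id, follower_count, retweet_count, message)."""
    records = [
        {"id": i, "followerCount": f, "reTweetCount": r, "msgPost": m}
        for i, f, r, m in rows
    ]
    # json.dumps escapes quotes & backslashes inside the messages.
    return json.dumps(records)

payload = build_payload([(1, 10, 2, 'He said "goal"'), (2, 0, 0, "plain")])
parsed = json.loads(payload)
print(parsed[0]["msgPost"])
```

The returned value is still a JSON string with the same keys in the same order, so the calling program could keep parsing it the same way.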

3. callTwitterAPI.py (This is the main script that will invoke the Twitter API & then project the analytic report based on the available Twitter data.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 12-Oct-2019              ####
#### Modified On 12-Oct-2019              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from clsConfig import clsConfig as cf
import pandas as p
import clsL as cl
import logging
import datetime
import json
import clsTwitter as ct

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

def getMaximumFollower(df):
    try:
        d1 = df['followerCount'].max()
        d1_max_str = int(d1)

        return d1_max_str
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = 0

        return dt_part1

def getMaximumRetweet(df):
    try:
        d1 = df['reTweetCount'].max()
        d1_max_str = int(d1)

        return d1_max_str
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = ''

        return dt_part1

# Lookup functions from
# Azure cloud SQL DB

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def main():
    try:
        dfSrc = p.DataFrame()
        df_ret = p.DataFrame()
        ret_2 = ''
        debug_ind = 'Y'

        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'consolidatedTwitter.log', level=logging.INFO)

        # Initiating Log Class
        l = cl.clsL()

        # Moving previous day log files to archive directory
        arch_dir = cf.config['ARCH_DIR']
        log_dir = cf.config['LOG_PATH']

        tmpR0 = "*" * 157

        logging.info(tmpR0)
        tmpR9 = 'Start Time: ' + str(var)
        logging.info(tmpR9)
        logging.info(tmpR0)

        print("Archive Directory:: ", arch_dir)
        print("Log Directory::", log_dir)
        tmpR1 = 'Log Directory::' + log_dir
        logging.info(tmpR1)

        # Query using parameters
        rawQry = 'q=from%3ABlades_analytic&src=typd'

        x1 = ct.clsTwitter()
        ret_2 = x1.searchQry(rawQry)

        # Capturing the JSON Payload
        res = json.loads(ret_2)

        # Converting dictionary to Pandas Dataframe
        df_ret = p.read_json(ret_2, orient='records')

        # Resetting the column orders as per JSON
        df_ret = df_ret[list(res[0].keys())]

        l.logr('1.Twitter_' + var + '.csv', debug_ind, df_ret, 'log')

        print('Realtime Twitter Data:: ')
        logging.info('Realtime Twitter Data:: ')
        print(df_ret)
        print()

        # Checking execution status
        ret_val_2 = df_ret.shape[0]

        if ret_val_2 == 0:
            print("Twitter hasn't returned any rows. Please check your queries!")
            logging.info("Twitter hasn't returned any rows. Please check your queries!")
            print("*" * 157)
            logging.info(tmpR0)
        else:
            print("Successfully fetched rows!")
            logging.info("Successfully fetched rows!")
            print("*" * 157)
            logging.info(tmpR0)

        print('Finding Story points..')
        print("*" * 157)
        logging.info('Finding Story points..')
        logging.info(tmpR0)

        # Performing Basic Aggregate
        # 1. Find the user who has maximum Followers
        df_ret['MaxFollower'] = getMaximumFollower(df_ret)

        # 2. Find the user who has maximum Re-Tweets
        df_ret['MaxTweet'] = getMaximumRetweet(df_ret)

        # Getting Status
        df_MaxFollower = df_ret[(df_ret['followerCount'] == df_ret['MaxFollower'])]

        # Dropping Columns
        # Re-assigning instead of inplace avoids pandas' SettingWithCopyWarning on a filtered frame
        df_MaxFollower = df_MaxFollower.drop(['reTweetCount', 'MaxTweet'], axis=1)

        l.logr('2.Twitter_Maximum_Follower_' + var + '.csv', debug_ind, df_MaxFollower, 'log')

        print('Maximum Follower:: ')
        print(df_MaxFollower)
        print("*" * 157)
        logging.info(tmpR0)

        df_MaxTwitter = df_ret[(df_ret['reTweetCount'] == df_ret['MaxTweet'])]
        print()

        # Dropping Columns
        # Re-assigning instead of inplace avoids pandas' SettingWithCopyWarning on a filtered frame
        df_MaxTwitter = df_MaxTwitter.drop(['followerCount', 'MaxFollower'], axis=1)

        l.logr('3.Twitter_Maximum_Retweet_' + var + '.csv', debug_ind, df_MaxTwitter, 'log')

        print('Maximum Re-Tweet:: ')
        print(df_MaxTwitter)
        print("*" * 157)
        logging.info(tmpR0)

        tmpR10 = 'End Time: ' + str(var)
        logging.info(tmpR10)
        logging.info(tmpR0)

    except ValueError:
        print("No relevant data to proceed!")
        logging.info("No relevant data to proceed!")

    except Exception as e:
        # Python 3 exceptions have no .message attribute; str(e) gives the message text
        print("Top level Error: args:{0}, message:{1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()

And, here are the key lines –

x1 = ct.clsTwitter()
ret_2 = x1.searchQry(rawQry)

Our application instantiates the newly developed class & calls its searchQry method with the raw query.

# Capturing the JSON Payload
res = json.loads(ret_2)

# Converting dictionary to Pandas Dataframe
df_ret = p.read_json(ret_2, orient='records')

# Resetting the column orders as per JSON
df_ret = df_ret[list(res[0].keys())]

Here, we convert the JSON payload into a pandas dataframe & restore the column order of the original JSON. This dataframe is our analytic dataset.
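To make the conversion concrete, here is a minimal sketch under a few assumptions. The payload below is made-up sample data, and the userName field is hypothetical; only followerCount & reTweetCount come from the script. It also wraps the string in StringIO, since recent pandas releases deprecate passing a literal JSON string to read_json.

```python
import json
from io import StringIO

import pandas as pd

# Hypothetical payload standing in for the string returned by searchQry()
sample_payload = json.dumps([
    {"userName": "user_a", "followerCount": 120, "reTweetCount": 4},
    {"userName": "user_b", "followerCount": 340, "reTweetCount": 9},
])

# Parse once to capture the key order of the first record
records = json.loads(sample_payload)

# Build the dataframe from the same payload
df_sample = pd.read_json(StringIO(sample_payload), orient="records")

# Re-apply the column order found in the JSON
df_sample = df_sample[list(records[0].keys())]

print(df_sample)
```

Reading the key order from the first record only works when every record carries the same keys, which is what the calling script also assumes.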

def getMaximumFollower(df):
    try:
        d1 = df['followerCount'].max()
        d1_max_str = int(d1)

        return d1_max_str
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = 0

        return dt_part1

def getMaximumRetweet(df):
    try:
        d1 = df['reTweetCount'].max()
        d1_max_str = int(d1)

        return d1_max_str
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = ''

        return dt_part1

These two functions, declared in the calling script, return the maximum follower count & the maximum re-tweet count from our returned dataset. If anything goes wrong, they print the error & return a default value instead.
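Since both functions share the same shape, they could be folded into one helper. The following is only a sketch of that idea, not the code used in this post: column_max, its default argument & the sample dataframe are all hypothetical.

```python
import pandas as pd

# Made-up sample data; only the two column names come from the script
df_demo = pd.DataFrame({
    "followerCount": [120, 340, 95],
    "reTweetCount": [4, 9, 9],
})

def column_max(df, column, default=0):
    """Return the column maximum as an int, or `default` if it cannot be computed."""
    try:
        return int(df[column].max())
    except (KeyError, ValueError, TypeError) as e:
        # KeyError: missing column; ValueError/TypeError: empty or non-numeric data
        print(str(e))
        return default

# Assigning a scalar repeats the same value on every row
df_demo["MaxFollower"] = column_max(df_demo, "followerCount")
df_demo["MaxTweet"] = column_max(df_demo, "reTweetCount")

print(df_demo)
```

Because a scalar assignment broadcasts to every row, each row can later be compared against the maximum with a plain equality filter.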

# Getting Status
df_MaxFollower = df_ret[(df_ret['followerCount'] == df_ret['MaxFollower'])]

And, this is how our application fetches the rows with the maximum re-tweet count –

df_MaxTwitter = df_ret[(df_ret['reTweetCount'] == df_ret['MaxTweet'])]

And, you can customize your output by dropping unwanted columns in the specific dataset.
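The filter-then-drop step can be sketched as follows. This is an illustration on made-up rows (the userName column & its values are hypothetical), written with a non-inplace drop so that pandas does not warn about modifying a filtered slice.

```python
import pandas as pd

# Made-up sample rows; userName is a hypothetical column
df_demo = pd.DataFrame({
    "userName": ["user_a", "user_b", "user_c"],
    "followerCount": [120, 340, 95],
    "reTweetCount": [4, 9, 9],
})
df_demo["MaxFollower"] = int(df_demo["followerCount"].max())
df_demo["MaxTweet"] = int(df_demo["reTweetCount"].max())

# Keep the rows that hold the maximum, then drop the columns we do not need
top_followers = (
    df_demo[df_demo["followerCount"] == df_demo["MaxFollower"]]
    .drop(columns=["reTweetCount", "MaxTweet"])
)
top_retweets = (
    df_demo[df_demo["reTweetCount"] == df_demo["MaxTweet"]]
    .drop(columns=["followerCount", "MaxFollower"])
)

print(top_followers)
print(top_retweets)
```

Note that a tie on the maximum returns more than one row, as the two users sharing 9 re-tweets show here.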

And, here is the output on Windows, which looks like –

8. WindowsRun

And, here is the windows log directory –

WindowsRunLog

So, we’ve achieved our target data point.

So, we’ll come out with another exciting post in the coming days!

N.B.: This is demonstrated for RnD/study purposes. All the data posted here are representational data & available over the internet.

Explaining New Python Library – Regular Expression in JSON

Hi Guys!

As discussed, here is the continuation of the previous post. We’ll explain the regular expression from the library that I’ve created recently.

First, let me share the calling script for regular expression –

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 08-Sep-2019              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from dnpr.clsDnpr import clsDnpr
import datetime as dt
import json

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Lookup functions from
# Azure cloud SQL DB

def main():
    try:
        # Initializing the class
        t = clsDnpr()
        
        srcJson = [
                    {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
                    {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
                    {"FirstName": "Archi", "LastName": "Bose", "Sal": 500},
                    {"FirstName": "Archi", "LastName": "Bose", "Sal": 7000},
                    {"FirstName": "Deb", "LastName": "Sen", "Sal": 9500}
                  ]

        print("4. Checking regular expression functionality!")
        print()

        var13 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        print("Start Time: ", str(var13))

        print('::Function Regex_Like:: ')
        print()

        tarColumn = 'FirstName'
        print('Target Column for Regex_Like: ', tarColumn)
        inpPattern = r"\bSa"
        print('Input Pattern: ', str(inpPattern))

        # Invoking the regex_like function
        tarJson = t.regex_like(srcJson, tarColumn, inpPattern)

        print('End of Function Regex_Like!')
        print()

        print("*" * 157)
        print("Output Data: ")
        tarJsonFormat = json.dumps(tarJson, indent=1)
        print(str(tarJsonFormat))
        print("*" * 157)

        if not tarJson:
            print()
            print("No relevant output data!")
            print("*" * 157)
        else:
            print()
            print("Relevant output data comes!")
            print("*" * 157)

        var14 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        print("End Time: ", str(var14))

        var15 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        print("Start Time: ", str(var15))

        print('::Function Regex_Replace:: ')
        print()

        tarColumn = 'FirstName'
        print('Target Column for Regex_Replace: ', tarColumn)
        inpPattern = r"\bSa"
        print('Input Pattern: ', str(inpPattern))
        replaceString = 'Ka'
        print('Replacing Character: ', replaceString)

        # Invoking the regex_replace function
        tarJson = t.regex_replace(srcJson, tarColumn, inpPattern, replaceString)

        print('End of Function Regex_Replace!')
        print()

        print("*" * 157)
        print("Output Data: ")
        tarJsonFormat = json.dumps(tarJson, indent=1)
        print(str(tarJsonFormat))
        print("*" * 157)

        if not tarJson:
            print()
            print("No relevant output data!")
            print("*" * 157)
        else:
            print()
            print("Relevant output data comes!")
            print("*" * 157)

        var16 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        print("End Time: ", str(var16))

        var17 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        print("Start Time: ", str(var17))

        print('::Function Regex_Substr:: ')
        print()

        tarColumn = 'FirstName'
        print('Target Column for Regex_Substr: ', tarColumn)
        inpPattern = r"\bSa"
        print('Input Pattern: ', str(inpPattern))

        # Invoking the regex_substr function
        tarJson = t.regex_substr(srcJson, tarColumn, inpPattern)

        print('End of Function Regex_Substr!')
        print()

        print("*" * 157)
        print("Output Data: ")
        tarJsonFormat = json.dumps(tarJson, indent=1)
        print(str(tarJsonFormat))
        print("*" * 157)

        if not tarJson:
            print()
            print("No relevant output data!")
            print("*" * 157)
        else:
            print()
            print("Relevant output data comes!")
            print("*" * 157)

        var18 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        print("End Time: ", str(var18))

        print("=" * 157)
        print("End of regular expression function!")
        print("=" * 157)



    except ValueError:
        print("No relevant data to proceed!")

    except Exception as e:
        # Python 3 exceptions have no .message attribute; str(e) gives the message text
        print("Top level Error: args:{0}, message:{1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()

As per the library, we’ll discuss the following functionalities –

  1. regex_like
  2. regex_replace
  3. regex_substr

Now, let us check how to call these functions.

1. regex_like:

Following is the base skeleton in order to invoke this function –

regex_like(Input Json, Target Column, Pattern To Match) return Output Json

Here are the key lines in the script –

srcJson = [
            {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
            {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
            {"FirstName": "Archi", "LastName": "Bose", "Sal": 500},
            {"FirstName": "Archi", "LastName": "Bose", "Sal": 7000},
            {"FirstName": "Deb", "LastName": "Sen", "Sal": 9500}
          ]

# Invoking the regex_like function
tarJson = t.regex_like(srcJson, tarColumn, inpPattern)
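For readers who want a feel for what such a filter does, here is a minimal sketch built on Python's re module. It is not the library's code: regex_like_sketch is a hypothetical stand-in that assumes the function keeps every element whose target column matches the pattern.

```python
import re

# Same sample records as in the calling script
src_rows = [
    {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
    {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
    {"FirstName": "Archi", "LastName": "Bose", "Sal": 500},
    {"FirstName": "Archi", "LastName": "Bose", "Sal": 7000},
    {"FirstName": "Deb", "LastName": "Sen", "Sal": 9500},
]

def regex_like_sketch(rows, column, pattern):
    """Hypothetical stand-in: keep rows whose `column` value matches `pattern`."""
    compiled = re.compile(pattern)
    # str() lets the same filter run against numeric columns such as Sal
    return [row for row in rows if compiled.search(str(row.get(column, "")))]

matched = regex_like_sketch(src_rows, "FirstName", r"\bSa")
print(matched)
```

With the sample data, only the two "Satyaki" records survive the filter.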

2. regex_replace:

Following is the base skeleton in order to invoke this function –

regex_replace(Input Json, Target Column, Pattern to Replace, Replacement String) return Output Json

Here are the key lines in the script –

tarColumn = 'FirstName'
print('Target Column for Regex_Replace: ', tarColumn)
inpPattern = r"\bSa"
print('Input Pattern: ', str(inpPattern))
replaceString = 'Ka'
print('Replacing Character: ', replaceString)

# Invoking the regex_replace function
tarJson = t.regex_replace(srcJson, tarColumn, inpPattern, replaceString)

As you can see, here we replace ‘Sa’ with ‘Ka’, provided the value matches the specific pattern in the JSON.
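The same idea can be sketched with re.sub. Again, this is an assumption about the behavior rather than the library's implementation: regex_replace_sketch is a hypothetical name, and the sketch copies each record so that the source list stays untouched.

```python
import re

src_rows = [
    {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
    {"FirstName": "Archi", "LastName": "Bose", "Sal": 500},
]

def regex_replace_sketch(rows, column, pattern, replacement):
    """Hypothetical stand-in: replace `pattern` with `replacement` in one column."""
    compiled = re.compile(pattern)
    result = []
    for row in rows:
        new_row = dict(row)  # copy so the source records stay untouched
        if isinstance(new_row.get(column), str):
            new_row[column] = compiled.sub(replacement, new_row[column])
        result.append(new_row)
    return result

replaced = regex_replace_sketch(src_rows, "FirstName", r"\bSa", "Ka")
print(replaced)
```

Records that do not match the pattern, such as "Archi", pass through unchanged.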

3. regex_substr:

Following is the base skeleton in order to invoke this function –

regex_substr(Input Json, Target Column, Pattern to substring) return Output Json

Here are the key lines –

tarColumn = 'FirstName'
print('Target Column for Regex_Substr: ', tarColumn)
inpPattern = r"\bSa"
print('Input Pattern: ', str(inpPattern))

# Invoking the regex_substr function
tarJson = t.regex_substr(srcJson, tarColumn, inpPattern)

In this case, we’ve removed the matched part of the string in the target column & returned the final result as JSON.
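Going by the result reported in this post (“Satyaki” becoming “tyaki”), the behavior can be approximated by removing the first match from the target column. The sketch below rests on that assumption; regex_substr_sketch is a hypothetical name, and the real library may treat non-matching records differently.

```python
import re

src_rows = [
    {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
    {"FirstName": "Deb", "LastName": "Sen", "Sal": 9500},
]

def regex_substr_sketch(rows, column, pattern):
    """Hypothetical stand-in: strip the first match of `pattern` from one column."""
    compiled = re.compile(pattern)
    result = []
    for row in rows:
        new_row = dict(row)  # copy so the source records stay untouched
        if isinstance(new_row.get(column), str):
            # count=1 removes only the first occurrence of the pattern
            new_row[column] = compiled.sub("", new_row[column], count=1)
        result.append(new_row)
    return result

trimmed = regex_substr_sketch(src_rows, "FirstName", r"\bSa")
print(trimmed)
```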

Let us first see the sample input JSON –

SourceJSON_Regex

Let us check how it looks when we run the calling script –

  • regex_like:
Regex_Like

This function retrieves the elements whose FirstName starts with ‘Sa’. As a result, we see the two matching elements in the payload.

  • regex_replace:
Regex_Replace

In this case, we’re replacing the leading ‘Sa’ of any matching string with ‘Ka’.

  • regex_substr:
Regex_Substr

As you can see, the FirstName of the first element changed from “Satyaki” to “tyaki”.
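All three calls use the same pattern, r"\bSa", so it is worth a quick look at what it matches. The short check below uses Python's re module directly; the sample names other than “Satyaki” are made up for illustration.

```python
import re

pattern = re.compile(r"\bSa")

# Made-up names, apart from "Satyaki", to probe the pattern
samples = ["Satyaki", "Maria Santos", "LiSa", "satyaki"]

# True when 'Sa' appears at the start of any word in the string
hits = {name: bool(pattern.search(name)) for name in samples}
print(hits)
# {'Satyaki': True, 'Maria Santos': True, 'LiSa': False, 'satyaki': False}
```

The \b marks a word boundary, so the pattern matches ‘Sa’ at the start of any word, not only at the start of the whole string. It is also case-sensitive unless a flag such as re.IGNORECASE is used.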

So, finally, we’ve achieved our target.

I’ll post the next exciting concept very soon.

Till then! Happy Avenging! 😀

N.B.: This is demonstrated for RnD/study purposes. All the data posted here are representational data & available over the internet.